当前位置: 首页 > news >正文

Workflow轻量级工作流引擎实现

1.背景

Workflow 是一款极致轻量、高性能、易集成、可扩展的流程引擎,专注于纯内存执行,无状态设计,确保运行高效、简洁。

借助 Workflow,开发人员可以通过 YAML 直观地设计业务流程,将复杂的业务逻辑可视化,搭建起业务设计人员与开发工程师之间的桥梁。

Workflow 通过解耦业务流程与具体操作,使业务流程管理更加灵活,降低新流程上线对现有业务的影响,极大提升企业的敏捷性可维护性

2.使用工作流对比分析

提到工作流,印象里都是 OA 系统各种请假审批流。事实上,广义上的工作流是对工作流程及其各操作步骤之间业务规则的抽象、概括、描述。简单理解,我们为了实现某个业务目标,抽象拆解出来的一系列步骤及这些步骤之间的协作关系,就是工作流。例如订单发货流、程序构建流等。业界通常用 BPMN 流程图来描述一个工作流。

2.1.没有工作流时的任务协作

以实现一个用户购买逻辑为例,如果不应用工作流模型,我们串联多个任务(步骤)一般是通过显示的代码调用:

校验、支付、发货一气呵成,流畅自然。正喝着枸杞红枣,产品一脸笑意跑过来:“我们新搞个充会员卡的业务,大概步骤就校验 -> 推荐 -> 支付 -> 充值。校验和支付前面都做过了,应该很快实现吧?”

精通 if-else 的你,听完的瞬间就已经构思好了代码:

一通写下来,总感觉哪里不对,“为什么加新的任务节点,要改已有的代码呢?这不符合开闭原则啊!”.

2.2.应用工作流模型的任务协作

工作流模型正是为了解决这类问题而生:分离任务的实现和任务的协作关系。上面同样的用户购物逻辑,有了工作流模型,各个任务只实现自己原子的逻辑,任务协作关系使用流程图来表达。

当新的逻辑需要复用已有任务节点时,只需要要调整流程图,无需修改已有代码。

3.流程模型关系

4.流程引擎架构

5.流程解析步骤

6.使用方式

6.1.快速开始

6.1.1.业务流程定义

6.1.2.业务流程定义转换为流程描述文件

此流程定义文件需要拷贝到工程的 resources/flow 目录下

processDefinition:
  id: "leave_process"
  version: "1.0"
  startStateId: "launch"
  states:
    - id: "launch"
      type: "Start"
      nextStateId: "leader_approval"
      processors:
        - "com.damon.leave.EmployeeLeaveProcessor"
      extendParams:
        assigneeUser: "181987"
        actions:
          - submit

    - id: "leader_approval"
      type: "UserTask"
      nextStateId: "leave_day_gateway"
      processors:
        - "com.damon.leave.LeaderApprovalProcessor"
      extendParams:
        assigneeUser: "20000"
        actions:
          - agree
          - reject

    - id: "leave_day_gateway"
      type: "ExclusiveGateway"
      conditions:
        - nextStateConditionParser: "com.damon.leave.LeaveDaysLessOrEqualThan3ConditionParser"
          nextStateId: "End"
        - nextStateConditionParser: "com.damon.leave.LeaveDaysGreaterThan3ConditionParser"
          nextStateId: "director_approval"

    - id: "director_approval"
      type: "UserTask"
      processors:
        - "com.damon.leave.DirectorApprovalProcessor"
      nextStateId: "End"
      extendParams:
        assigneeUser: "20000"
        actions:
          - agree
          - reject

    - id: "End"
      type: "End"
      nextStateId: null

注意:processors 和 nextStateConditionParser 关联的类文件路径需要和实际运行类的路径保持一致,否则执行的时候会出现找不类的错误。

6.1.3.实现节点处理器

@Component
public class EmployeeLeaveProcessor implements IProcessor {
    @Override
    public void process(RuntimeContext context) {
        System.out.println("用户提交表单内容:" + context.getVariables());
    }
}
@Component
public class LeaderApprovalProcessor implements IProcessor {
    @Override
    public void process(RuntimeContext context) {
        System.out.println("组长审批内容:" + context.getVariables());
    }
}
@Component
public class DirectorApprovalProcessor implements IProcessor {
    @Override
    public void process(RuntimeContext context) {
        System.out.println("总监审批内容:" + context.getVariables());
    }
}

6.1.4.实现条件处理

/**
 * 请假天数大于3天条件解析器
 */
@Component
public class LeaveDaysGreaterThan3ConditionParser implements IConditionParser {
    @Overrid
    public boolean test(RuntimeContext context) {
        Map<String, Object> variables = context.getVariables();
        int leaveDays = (int) variables.get("leave_days");
        return leaveDays > 3;
    }
}
/**
 * 请假天数小等于3天条件解析器
 */
@Component
public class LeaveDaysLessOrEqualThan3ConditionParser implements IConditionParser {
    @Override
    public boolean test(RuntimeContext context) {
        Map<String, Object> variables = context.getVariables();
        int leaveDays = (int) variables.get("leave_days");
        return leaveDays <= 3;
    }
}

6.1.5.配置流程引擎

@SpringBootApplication
@ComponentScan(basePackages = "com.damon", nameGenerator = WorkflowBeanNameGenerator.class)
public class Application {
    @Bean
    public ProcessEngine processEngine() {
        return new ProcessEngine.Builder().evaluator(DefaultEvaluator.build()).build();
    }
    
    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}

6.1.5.运行示例

package com.damon.leave;

import com.damon.performance.evaluation.workflow.Application;
import com.damon.performance.evaluation.workflow.ComplexProcessResult;
import com.damon.performance.evaluation.workflow.ProcessEngine;
import com.damon.performance.evaluation.workflow.config.StateIdentifier;
import com.damon.performance.evaluation.workflow.evaluator.DefaultEvaluator;
import org.junit.Assert;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

import java.util.HashMap;
import java.util.Map;

@SpringBootTest(classes = Application.class)
public class LeaveTest {
    @Autowired
    private ProcessEngine engine;
    /**
     * 请假小于3天场景
     */
    @Test
    public void leaveLessThanOrEquals3DaysTest() {
        Map<String, Object> variables = new HashMap<>();
        String busId = "1000";
        variables.put("leave_days", 2);
        //1.发起流程
        ComplexProcessResult result = engine.process("leave_process:1.0", variables, busId);
        //流转到组长审批, 结果为: leave_process:1.0>leader_approval
        Assert.assertEquals("leave_process:1.0>leader_approval", result.getNextStates().get(0).getNextStateFullPaths());
        //2.组长审批
        ComplexProcessResult result2 = engine.process(
                StateIdentifier.buildFromFullPaths("leave_process:1.0>leader_approval"),
                variables, busId
        );
        // 流程结束, 结果为: leave_process:1.0>End
        Assert.assertEquals("leave_process:1.0>End", result2.getNextStates().get(0).getNextStateFullPaths());

        System.out.println("流程是否结束:" + result2.isCompleted());
    }

    /**
     * 请假大于3天场景
     */
    @Test
    public void leaveGreaterThan3DaysTest() {
        Map<String, Object> variables = new HashMap<>();
        String busId = "1000";
        variables.put("leave_days", 4);
        //1.发起流程
        ComplexProcessResult result = engine.process("leave_process:1.0", variables, busId);
        //流转到组长审批, 结果为: leave_process:1.0>leader_approval
        Assert.assertEquals("leave_process:1.0>leader_approval", result.getNextStates().get(0).getNextStateFullPaths());


        //2.组长审批
        ComplexProcessResult result2 = engine.process(
                StateIdentifier.buildFromFullPaths("leave_process:1.0>leader_approval"),
                variables,
                busId
        );
        // 流转到总监审批, 结果为: leave_process:1.0>director_approval
        Assert.assertEquals("leave_process:1.0>director_approval", result2.getNextStates().get(0).getNextStateFullPaths());

        //3.总监审批
        ComplexProcessResult result3 = engine.process(
                StateIdentifier.buildFromFullPaths("leave_process:1.0>director_approval"),
                variables,
                busId
        );
        // 流程结束, 结果为: leave_process:1.0>End
        Assert.assertEquals("leave_process:1.0>End", result3.getNextStates().get(0).getNextStateFullPaths());

        System.out.println("流程是否结束:" + result3.isCompleted());
    }
}

6.2.支持的功能

支持功能

功能描述

备注

条件分支

排他分支用于在流程中实现决策,即根据条件选择一个分支执行。

并行分支

并行分支允许将流程分成多条分支,也可以把多条分支汇聚到一起。其功能是基于进入和外出顺序流的,即可以分叉(fork)成多个并行分支,也可以汇聚(join)多个并行分支。

子流程

主流程节点设置子流程,子流程节点会自动进入子流程,子流程结束后,主流程节点会自动跳转。

驳回

将审批重置发送给某节点,重新审批。驳回也叫退回,也可以分退回申请人、退回上一步、任意退回等

当前驳回的策略由业务系统自己实现。实现 IProcessRollback 接口即可

撤回

在当前办理人尚未处理文件前,允许上一节点提交人员执行拿回

当前撤回的策略由业务系统自己实现。实现 IProcessCallback 接口即可

6.3.流程属性描述

编码

名称

描述

示例

id

ID

流程任务 ID

name

名称

流程任务名称

type

任务类型

UserTask(用户任务)、ExclusiveGateway(排它网关)、End(结束)、ParallelStartGateway(并行网关开始)、ParallelEndGateway(并行网关结束)、SubProcess(子流程)

nextStateId

下一个流程任务 ID

processors

任务处理器

当前流程任务的业务处理器(可配置多个)

nextStateConditionParser

下一个任务条件解析器

用于判断当前任务是否可以流转到下一个流程任务,类似工作流里面的多人会签,当多个人都完成的时候才可以流程到下一个流程任务节点。

nextStateCondition

下一个任务条件判断

与 nextStateConditionParser 不同的是它不用创建条件解析器,可以直接使用 Groovy、Javascirpt 脚本进行判断。

nextStateConditionScriptType

任务条件判断的脚步类型

不填写默认是 Javascirpt,也可以指定 Groovy

extendParams

节点的扩展参数

用于描述流程任务的扩展参数,比如标注当前流程任务那些角色可以处理、哪些资源可以编辑。

conditions

多条件分支

只能用于任务类型是:ExclusiveGateway(排它网关)、ParallelStartGateway(并行开始网关)。

6.4.核心 API 使用

6.4.1.配置流程引擎

@Bean
public ProcessEngine processEngine() {
    return new ProcessEngine.Builder().evaluator(DefaultEvaluator.build()).build();
}

6.4.2.开启流程

Map<String, Object> variables = new HashMap<>();
String busId = "123456";
ComplexProcessResult result = engine.process("PerformanceEvaluation:1.0", variables,busId);
//下一个任务: PerformanceEvaluation:1.0>employee_self_assessment

6.4.3.处理流程任务

Map<String, Object> variables = new HashMap<>();
String busId = "123456";
ComplexProcessResult result = engine.process(
    StateIdentifier.buildFromFullPaths("PerformanceEvaluation:1.0>employee_self_assessment"),
    variables, busId
 );

6.4.4.获取流程任务描述信息

//获取员工自评任务节点描述信息
State = engine.getState("PerformanceEvaluation:1.0", "employee_self_assessment");

6.4.5.回退流程

需要实现 IProcessRollback 接口才能支持回退功能

//从分管主O评价任务回退到主O评价任务
StateIdentifier currentStateIdentifier = StateIdentifier.buildFromFullPaths("PerformanceEvaluation:1.0>secondary_owner_evaluation");
String busId = "123456";
ComplexProcessResult result = engine.rollback(currentStateIdentifier, busId);
//返回上一个任务: PerformanceEvaluation:1.0>primary_owner_evaluation

6.4.6.任务处理器

任务处理器主要用于记录流程步骤时做的业务行为,同时记录业务数据。例如:用户的当前步骤是请假,假如用户提交一个请假单,那么当前 EmployeeLeaveProcessor 的处理器的作用就是记录请假单信息。

@Component
public class EmployeeLeaveProcessor implements IProcessor {
    @Override
    public void process(RuntimeContext context) {
        System.out.println("用户提交表单内容:" + context.getVariables());
        //记录请假单信息
    }
}

6.4.7.条件解析器

用于判断当前任务是否可以流转到下一个流程任务,例如:员工请假大于 3 天需要总监审批,小于等于 3 天只需要组长审批即可。当员工提交请假申请单且组长进行审批完成的时候,就需要根据表单参数判断是否走总监审批还是结束流程。

/**
 * 请假天数大于3天条件解析器
 */
@Component
public class LeaveDaysGreaterThan3ConditionParser implements IConditionParser {
    @Override
    public boolean test(RuntimeContext context) {
        Map<String, Object> variables = context.getVariables();
        int leaveDays = (int) variables.get("leave_days");
        return leaveDays > 3;
    }
}

6.5.FAQ

6.5.1.如何实现有状态的流程引擎?

目前,流程引擎是 纯内存 模式,并不会自动记录流程的处理结果或下一个待处理任务。这部分需要由业务系统自行维护。在流程执行时,业务系统需要 记录当前步骤及其对应的下一个步骤,确保在后续处理时能够准确定位待办任务。这样,当某个任务需要被处理时,只需根据 当前步骤 和 处理人 进行任务分配,即可继续推进流程。

Map<String, Object> variables = new HashMap<>();
String busId = "1000";
variables.put("leave_days", 2);
//1.发起流程
ComplexProcessResult result = engine.process("leave_process:1.0", variables, busId);
//2.当前步骤处理完成后的下一个处理步骤
List<NextState> nextStates = result.getNextStates();
//3.记录下一个处理步骤到业务系统

 6.5.2.如何与业务数据关联?

在启动流程或处理任务时,可以传入 busId(业务唯一标识),流程节点处理器内部即可获取该 busId,从而实现与具体业务数据的关联。这使得流程引擎可以无缝对接业务系统,确保每个流程节点都能够正确映射到相关的业务数据。

//开启流程和流程处理时可以传入busId
engine.process("leave_process:1.0", variables, busId)
@Component
public class EmployeeLeaveProcessor implements IProcessor {
    @Override
    public void process(RuntimeContext context) {
        String busId = context.getBusinessId();
    }
}

6.5.3.如何实现流程的驳回?

实现 IProcessRollback 接口时,需要解决 流程驳回 的问题,而当前的 工作流引擎是纯内存的,不会自动记录流转历史。因此,流程驳回需要依赖 业务系统记录的流程任务流转日志 来确定可驳回的上一个步骤。

1. 业务系统必须记录流程流转历史

  • 每次任务流转时,业务系统应记录 当前步骤、处理人、操作时间、下一步流转信息 等。

  • 这样在驳回时,可以根据 busId 查询流转记录,找到可驳回的步骤。

2. 驳回时的逻辑

  • 通过 busId 查询业务系统的流转日志,找到当前任务的 上一个可驳回节点

  • 执行 驳回操作,修改流程状态,并通知相关处理人重新处理。

public interface IProcessRollback {
    List<StateIdentifier> rollback(StateIdentifier currentStateIdentifier, String businessId);
}

配置驳回实现到流程引擎

@Bean
public ProcessEngine processEngine(IProcessRollback processRollback) {
    return new ProcessEngine.Builder().processRollback(processRollback).build();
}

6.5.4.如何实现会签功能

会签功能其实就是一个特殊的条件解析器。假设某个节点需要张三、李四两个人完成才可以到达下一个节点。我们只需要再条件解析器里面判断张三、李四是否都完成了签署,如果都完成了签署, 则可以进入下一个节点。

这边有一个特殊的地方就是,条件解析器必须得继承 RedissionSafeConditionParser,因为会签会存在多个人并行的情况,需要依赖 redis 的分布式锁来解决问题。

@Component
public class CountersignConditionParser extends RedissionSafeConditionParser {
    @Override
    public boolean test(RuntimeContext context) {
       //1.查询当前节点的处理人都有谁
       //2.执行业务判断所有的人是否都已完成会签
       //3.如果都完成签署返回true,否则返回false
    }
}

https://github.com/654894017/workflow

相关文章:

  • C++高并发内存池ConcurrenMemoPool
  • mysql8.0.17以下驱动导致mybatis blob映射String乱码问题分析与解决
  • gis系统中如何提高shp大文件加载效率
  • B端可视化像企业数据的透视镜,看清关键信息
  • C 语 言 --- 指 针 3
  • jangow靶机笔记(Vulnhub)
  • 深度学习数据预处理:Dataset类的全面解析与实战指南
  • 在Windows创建虚拟环境如何在pycharm中配置使用
  • 【滑动窗口】最⼤连续 1 的个数 III(medium)
  • MLA(多头潜在注意力)原理概述
  • leetcode 2563. 统计公平数对的数目 中等
  • turtle库绘制进阶图形
  • 【Canvas与旗帜】标准英国米字旗
  • 深入解析进程与线程:区别、联系及Java实现
  • 【大模型框架】LLAMA-FACTORY使用总结
  • 【工控基础】工业相机设置中,增益和数字增益有什么区别?
  • 网络爬虫和前端相关知识
  • 数据结构——栈以及相应的操作
  • 健康养生:拥抱美好生活的基石
  • 9 C 语言变量详解:声明与定于、初始化与赋值、printf 输出与 scanf 输入、关键字、标识符命名规范
  • 影子调查丨义门陈遗址建筑被“没收”风波
  • 观察|智驾监管升级挤掉宣传水分,行业或加速驶入安全快车道
  • 建投读书会·东西汇流|西风东渐中的上海营造
  • 不断深化“数字上海”建设!上海市数据发展管理工作领导小组会议举行
  • 疼痛管理“童”样重要,解读围术期疼痛管理
  • 全球建筑瞭望|与自然共呼吸的溪谷石舍与海边公共空间