Smart_Engine使用指南
Smart_Engine使用指南
SmartEngine 是一个轻量级的业务编排引擎。它在阿里巴巴集团中被广泛使用。可用于编排微服务架构中的多个服务,以极高性能的方式启动/发送流程实例,存储成本低,也可用于传统的流程审批场景。
1、知识科普
1.1、流程(process)
smartEngine中最基础的就是要有一个process,我们将自己的业务逻辑通过xml配置的方式进行编写,汇总成一个process,在一个项目中可以有多个process,每个process有唯一的标示
1.1.1、流程基本概念
名词 | 概念 |
---|---|
开始节点 | 一个流程的开始,只允许有一个 |
结束节点 | 一个流程的结束,可以有多个 |
流转 | 从一个流程流向另一个流程的关系描述,一般包括原流程和目标流程加条件 |
网关 | 用于控制流程的交互,可以是互斥,并行,包容等,就行if/else、fork/join |
1.1.2、BPMN
BPMN(Business Process Modeling Notation),业务流程建模与标注,可以用其定义的一系列业务组件,组成业务流程图。
smartEngine遵从BPMN的设计理念,我们设计流程的时候可以借助BPMN的UI工具进行方便直观的设计
流对象(Flow Objects):
名词 | 概念 |
---|---|
事件 | 指业务流程过程中发生的事情,有开始/中间/结束 |
活动 | 任务,包括任务和子流程两类 |
网关 | 用于表示流程的分支与合并,有互斥网关/并行网关/包容网关/事件网关 |
数据(Data):
- 数据对象(Data Objects)
- 数据输入(Data Inputs)
- 数据输出(Data Outputs)
- 数据存储(Data Stores)
连接对象(Connecting Objects):
名词 | 概念 |
---|---|
顺序流 | 用一个带实心箭头的实心线表示,用于指定活动执行的顺序 |
信息流 | 任务,包括任务和子流程两类 |
关联 | 用于表示流程的分支与合并,有互斥网关/并行网关/包容网关/事件网关 |
SmartEngine中支持了BPMN中的StartEvent、EndEvent、SequenceFlow、ExclusiveGateway、ServiceTask、ReceiveTask
1.2、SmartEngine配置文件
<?xml version="1.0" encoding="UTF-8"?>
<definitions xmlns:smart="http://smartengine.org/schema/process" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns="http://www.omg.org/spec/BPMN/20100524/MODEL" targetNamespace="Examples">
<process id="exclusiveTest" version="1.0.0">
<startEvent id="theStart">
</startEvent>
<sequenceFlow id="flow1" sourceRef="theStart" targetRef="submitTask"/>
<userTask id="submitTask" name="SubmitTask">
</userTask>
<sequenceFlow id="flowFromSubmitTask" sourceRef="submitTask" targetRef="auditTask"/>
<userTask id="auditTask" name="AuditTask">
</userTask>
<sequenceFlow id="flowFromAuditTask" sourceRef="auditTask" targetRef="exclusiveGw1"/>
<exclusiveGateway id="exclusiveGw1" name="Exclusive Gateway 1"/>
<sequenceFlow id="flow2" sourceRef="exclusiveGw1"
targetRef="executeTask">
<conditionExpression xsi:type="mvel">approve == 'agree'
</conditionExpression>
</sequenceFlow>
<sequenceFlow id="flow3" sourceRef="exclusiveGw1"
targetRef="advancedAuditTask">
<conditionExpression xsi:type="mvel">approve == 'upgrade'
</conditionExpression>
</sequenceFlow>
<serviceTask id="executeTask" name="ExecuteTask"
smart:class="com.alibaba.simplest.bpm.util.AuditProcessServiceTaskDelegation">
</serviceTask>
<serviceTask id="advancedAuditTask" name="ExecuteTask"
smart:class="com.alibaba.simplest.bpm.util.AdvancedAuditServiceTaskDelegation">
</serviceTask>
<sequenceFlow id="flow4" sourceRef="executeTask" targetRef="theEnd"/>
<sequenceFlow id="flow5" sourceRef="advancedAuditTask" targetRef="theEnd"/>
<endEvent id="theEnd"/>
</process>
</definitions>
名词 | 概念 |
---|---|
process | 流程标示,id="exclusiveTest" version=“1.0.0" 两个字段唯一区分一个流程 |
startEvent | 表示流程开始节点,只允许有一个开始节点 |
endEvent | 表示流程结束节点,可以有多个结束节点 |
sequenceFlow | 表示环节流转关系,sourceRef起始节点,targetRef目标节点,即流转到下一个节点 |
exclusiveGateway | 互斥网关,通过conditionExpression进行判断,只能选择一个sequenceFlow继续执行 |
parallelGateway | 并行网关,parallelGateway 首先必须成对出现,分别承担fork 和join 职责。 其次,在join时需要实现分布式锁接口:LockStrategy。第三,fork 默认是顺序遍历多个sequeceFlow,但是你如果需要使用并发fork功能的话,则需要实现该接口:ExecutorService。 |
serviceTask | 服务任务,用来表示执行一个服务,改服务需要继承JavaDelegation,smart:class="”来进行指定类名,到达改节点时,会自动执行该服务的方法,并流转到下一个节点 |
receiveTask | 接收任务。在引擎遇到此类型的节点时,引擎执行会自动暂停,等待外部调用signal方法,当调用signal方法时,会驱动流程当前节点离开。 在离开该节点时,引擎会自动执行 smart:class 这个 delegation。 在一般业务场景中,我们通常使用receiveTask来表示等需要等待外部回调的节点。 |
userTask | 表示用户任务节点,仅用于DataBase模式。该节点需要人工参与处理,并且通常需要在待办列表中展示。 在Custom 模式下,建议使用receiveTask来代替。 |
上面的配置文件使用BPMN展示:
1.3、接入模式
模式 | 注释 |
---|---|
Custom | 服务于高并发海量数据低成本的业务流程治理针对微服务架构体系下的服务编排更偏向某一过程,中间不需要持久化状态 |
DataBase | 聚焦于服务于传统的审批流场景,比如工单,请假,会签等流程需要持久化,流程可能中断,需要人工干预等可以用作状态机 |
1.4、主要类介绍
1.4.1、 API操作类
SmartEngine:核心操作实例,所有的流程引擎相关的服务都是从该类进行获取,是整个操作的入口
ProcessEngineConfiguration: 引擎初始化的配置类,可以定制一些扩展
InstanceAccessor: 必须实现该接口。该接口主要用于获取业务的Delegation对象。在生产环境中,强烈建议结合Spring 获取这个Bean
IdGenerator: ID生成器
RepositoryCommandService: 流程定义的解析与加载,将流程定义从配置文件中实例化出来
RepositoryQueryService: 从内存中的流程容器里获取某一id:version的流程定义实例
DeploymentCommandService: 将流程定义文件持久化到数据库里面,并负责调用RepositoryCommandService 完成解析。
DeploymentQueryService: 获取存储到DB中流程定义内容
ProcessCommandService: 流程实例管理服务,执行流程到某一个节点
ProcessQueryService: 流程实例查询服务
ActivityQueryService: 活动实例查询服务
ExecutionCommandService: 驱动引擎流转服务,主要支持signal,markDone,jump和retry 等。 该服务区别于 TaskCommandService,主要负责驱动 ReceiveTask 这样暂停型的节点
ExecutionQueryService: 执行实例查询服务
TaskCommandService: 主要负责人工任务处理服务,主要支持transfer,markDone,add/remove TaskAssigneeCandidate。在TaskCommandService内部实现中,调用了ExecutionCommandService 方法。 该类仅用于DataBase模式
TaskQueryService: 任务实例查询服务
VariableQueryService: 变量实例查询服务
TaskAssigneeQueryService: 主要负责查询人工任务的处理者。仅用于DataBase模式
1.4.2、核心领域对象
类名 | 中文名称 | 注释 |
---|---|---|
DeploymentInstance | 部署实例 | 描述这个流程定义是谁发布的,当前处于什么状态 |
ProcessDefinition | 流程定义 | 描述一个流程有几个环节,之间的流转关系是什么样子的 |
ProcessInstance | 流程实例 | 可以简单理解为我们常见的一个工单 |
ActivityInstance | 活动实例 | 主要是描述流程实例(工单)的流转轨迹 |
ExecutionInstance | 执行实例 | 主要根据该实例的状态,来判断当前流程处在哪个节点上 |
TaskInstance | 任务实例 | 用来表示人工任务处理的.可以理解为一个需要人工参与处理的环节 |
TaskAssigneeInstance | 任务处理 | 用来表示当前任务共有几个处理者。通常在代办列表中用到此实体 |
VariableInstance | 变量实例 | 用来存储流程实例上下文 |
2、接入指南
2.1、代码接入
2.1.1、引入依赖
Custom:
<dependency>
<groupId>com.alibaba.smart.framework</groupId>
<artifactId>smart-engine-extension-storage-custom</artifactId>
<version>2.6.0</version>
</dependency>
Database:
<dependency>
<groupId>com.alibaba.smart.framework</groupId>
<artifactId>smart-engine-extension-storage-mysql</artifactId>
<version>2.6.0</version>
</dependency>
2.1.2、Spring集成
@Slf4j
@Order(LOWEST_PRECEDENCE)
@Configuration
@ConditionalOnClass(SmartEngine.class)
public class SmartEngineAutoConfiguration implements ApplicationContextAware {
private ApplicationContext applicationContext;
@Bean
@ConditionalOnMissingBean
public SmartEngine smartEngine() {
// 使用引擎配置模板,对流程引擎做一些定制化的配置
ProcessEngineConfiguration processEngineConfiguration = new DefaultProcessEngineConfiguration();
// 配置bean实例访问服务
processEngineConfiguration.setInstanceAccessor(new CustomInstanceAccessService());
// 配置id生成器
processEngineConfiguration.setIdGenerator(new SnowFlowIdGenerator());
// 实例化引擎
SmartEngine smartEngine = new DefaultSmartEngine();
smartEngine.init(processEngineConfiguration);
// 加载流程到引擎中
deployProcessDefinition(smartEngine);
return smartEngine;
}
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = applicationContext;
}
/**
* 使用spring容器获取bean实例
*/
private class CustomInstanceAccessService implements InstanceAccessor {
@Override
public Object access(String name) {
if (StringUtils.isBlank(name)) {
return null;
}
// 指定的包名
if (StringUtils.contains(name, '.')) {
try {
return applicationContext.getBean(Class.forName(name));
} catch (ClassNotFoundException e) {
log.error("{}加载失败", name);
throw new RuntimeException("类加载失败");
}
}
// 指定的bean名称
return applicationContext.getBean(name);
}
}
/**
* 从配置文件中加载流程定义
* @param smartEngine
*/
private void deployProcessDefinition(SmartEngine smartEngine) {
RepositoryCommandService repositoryCommandService = smartEngine
.getRepositoryCommandService();
PathMatchingResourcePatternResolver resolver = new PathMatchingResourcePatternResolver();
try {
Resource[] resources = resolver.getResources("classpath*:/smart-engine/*.xml");
for (Resource resource : resources) {
InputStream inputStream = resource.getInputStream();
repositoryCommandService.deploy(inputStream);
IOUtil.closeQuietly(inputStream);
}
} catch (Exception e) {
throw new EngineException(e);
}
}
}
2.1.3、Database模式额外配置
在启动类上增加ComponentScan和MapperScan
@MapperScan(basePackages = {"com.alibaba.smart.framework.engine.persister.database"})
@ComponentScan(basePackages = {"com.alibaba.smart.framework.engine.persister", "com.manwang.smartengine.demo"})
@SpringBootApplication
public class DatabaseApplication {
public static void main(String[] args) {
SpringApplication.run(DatabaseApplication.class, args);
}
}
在配置文件中增加mybatis配置文件扫描路径
mybatis.mapper-locations=classpath*:mybatis/sqlmap/*.xml
还需有创建数据库表,脚本文件在https://github.com/alibaba/SmartEngine/tree/master/extension/storage/storage-mysql/src/main/resources/sql 文件夹中,执行完表如下图
2.2、DataBase模式接入测试
模拟订单审核业务场景,以审批流场景为例
2.2.1、流程模板定义
可以借助Camunda Modeler绘制流程,在官网 进行下载安装
下载完成打开使用Camunda Platform7进行绘制,下面是示列流程,不知道怎么绘制的可以在网上搜一下教程,还是很简单的
对于serviceTask可以编辑指定运行的服务名称
对于互斥网关,需要编辑条件以便于走不同的节点
绘制完成,我们点击左下角的XML,将xml配置文件copy到项目中resources/smart-engine/文件夹下保存成xml格式的文件
2.2.2、Case测试
1、从配置文件中加载流程模板
@SpringBootTest
public class SmartTest {
@Autowired
private SmartEngine smartEngine;
@Test
@DisplayName("读取配置文件实例化流程模板")
public void testRepositoryCommandService() {
RepositoryCommandService repositoryCommandService = smartEngine.getRepositoryCommandService();
ProcessDefinitionSource definitionSource = repositoryCommandService.deploy("smart-engine/order.bpmn20.xml");
Assertions.assertEquals(1, definitionSource.getProcessDefinitionList().size());
Assertions.assertEquals(14, definitionSource.getProcessDefinitionList().get(0).getBaseElementList().size());
}
}
成功加载流程定义模板
这里只是测试手动加载模板,实际上我们在AutoConfiguration中已经进行全部加载了
2、运行流程
根据上面在xml中配置的类名,建立运行服务的Class
@Slf4j
@Component
public class CreateOrderJavaDelegation implements JavaDelegation {
@Override
public void execute(ExecutionContext executionContext) {
log.info("创建了一个订单");
}
}
使用流程管理服务运行一个流程
@Test
@DisplayName("流程实例管理")
public void testProcessCommandService() {
Map<String, Object> request = new HashMap<>();
request.put(ProcessConstant.ENGINE_ACTION, "the_start");
ProcessCommandService processCommandService = smartEngine.getProcessCommandService();
ProcessInstance processInstance = processCommandService.start("order", "1.0.0", request);
}
执行结果:
上面的报错是因为我们下一个节点是userTask,但我们实例化引擎的时候并没有扩展taskAssigneeService
3、增加TaskAssigneeDispatcher配置
自定义TaskAssigneeDispatcher
@Slf4j
public class OrderTaskAssigneeDispatcher implements TaskAssigneeDispatcher {
@Override
public List<TaskAssigneeCandidateInstance> getTaskAssigneeCandidateInstance(Activity activity, Map<String, Object> request) {
List<TaskAssigneeCandidateInstance> assigneeLists = getTaskAssignees(activity.getId());
sendMessage(activity.getId(), request);
return assigneeLists;
}
/**
* 对外发送一个需要人工处理的消息
*
* @param id
* @param request
*/
private void sendMessage(String id, Map<String, Object> request) {
log.info("{}任务等待人工处理", id);
}
/**
* 根据活动返回对应的审核人列表
*
* @param id
* @return
*/
private List<TaskAssigneeCandidateInstance> getTaskAssignees(String id) {
List<TaskAssigneeCandidateInstance> taskAssigneeCandidateInstanceList = new ArrayList<TaskAssigneeCandidateInstance>();
TaskAssigneeCandidateInstance taskAssigneeCandidateInstance = new TaskAssigneeCandidateInstance();
taskAssigneeCandidateInstance.setAssigneeId("1121");
taskAssigneeCandidateInstance.setAssigneeType(AssigneeTypeConstant.USER);
taskAssigneeCandidateInstanceList.add(taskAssigneeCandidateInstance);
return taskAssigneeCandidateInstanceList;
}
}
将实现类增加到引擎配置里面
@Bean
@ConditionalOnMissingBean
public SmartEngine smartEngine() {
// 使用引擎配置模板,对流程引擎做一些定制化的配置
ProcessEngineConfiguration processEngineConfiguration = new DefaultProcessEngineConfiguration();
// 配置bean实例访问服务
processEngineConfiguration.setInstanceAccessor(new CustomInstanceAccessService());
// 配置id生成器
processEngineConfiguration.setIdGenerator(new SnowFlowIdGenerator());
// 配置userTask任务指派处理器
processEngineConfiguration.setTaskAssigneeDispatcher(new OrderTaskAssigneeDispatcher());
// 实例化引擎
SmartEngine smartEngine = new DefaultSmartEngine();
smartEngine.init(processEngineConfiguration);
// 加载流程到引擎中
deployProcessDefinition(smartEngine);
return smartEngine;
}
再次运行上述列子,执行成功
执行成功后,数据库会有对应的执行记录:
se_process_instance(本次运行的流程记录)
se_execution_instance(本次运行时执行的节点记录)
4、用户审核成功测试
@Test
@DisplayName("唤起人工审核节点")
public void testExecutionCommandService() {
// 针对userTask进行唤醒
ExecutionCommandService executionCommandService = smartEngine.getExecutionCommandService();
// 流程查询器
ProcessQueryService processQueryService = smartEngine.getProcessQueryService();
// 执行记录查询器
ExecutionQueryService executionQueryService = smartEngine.getExecutionQueryService();
// id即为数据库中se_process_instance表中的id
ProcessInstance processInstance = processQueryService.findById("297181418952327168");
// 获取当前激活的节点
List<ExecutionInstance> activeExecutionInstances = executionQueryService.findActiveExecutionList(processInstance.getInstanceId());
Map<String, Object> request = new HashMap<>();
request.put("approve", "agree");
for (ExecutionInstance activeExecutionInstance : activeExecutionInstances) {
if (activeExecutionInstance.getProcessDefinitionActivityId().equals("Activity_0sovwe9")) {
executionCommandService.signal(activeExecutionInstance.getInstanceId(), request);
}
}
}
执行成功后,再去查看数据库中数据情况
se_process_instance中的status已经为completed
se_execution_instance中新增了三条记录
2.3、Custom模式接入测试
模拟服务调用组装数据进行测试,以服务编排场景为例
2.3.1、设计BPMN流程图
并行网关需要成对出现,一个开始一个结束
和上面一样,将xml格式的内容copy到项目resources下
2.3.2、编写serviceTask业务类
对应上面流程中的6个serviceTask
整个执行过程中的数据由ExecutionContext上下文承接,里面包含request和response对象可以用来在各个节点中流转数据
@Component
@Slf4j
public class BuildUserInfoJavaDelegation implements JavaDelegation {
@Override
public void execute(ExecutionContext executionContext) {
log.info("Task5 组装用户信息 request={},response={}", executionContext.getRequest(), executionContext.getResponse());
User user = (User) executionContext.getResponse().get(GetUserParamConstants.USER_PARAM);
UserPlate userPlate = (UserPlate) executionContext.getResponse().get(GetUserParamConstants.USER_PLATE_PARAM);
UserAddress userAddress = (UserAddress) executionContext.getResponse().get(GetUserParamConstants.USER_ADDRESS_PARAM);
UserCar userCar = (UserCar) executionContext.getResponse().get(GetUserParamConstants.USER_CAR_PARAM);
executionContext.getResponse().put(GetUserParamConstants.USER_RESULT_PARAM, buildUser(user, userPlate, userAddress, userCar));
}
private UserResponse buildUser(User user, UserPlate userPlate, UserAddress userAddress, UserCar userCar) {
UserResponse userResponse = new UserResponse();
userResponse.setUserId(user.getId());
userResponse.setUserName(user.getName());
userResponse.setPlate(userPlate.getPlate());
userResponse.setAddress(userAddress.getAddress());
userResponse.setPhoneNum(userAddress.getPhoneNum());
userResponse.setModel(userCar.getModel());
userResponse.setVehicle(userCar.getVehicle());
return userResponse;
}
}
2.3.3、运行流程
custom模式下,运行的代码块需要在Session中进行,固定格式:
try {
PersisterSession.create();
// 流程代码
} finally {
PersisterSession.destroySession();
}
TestCase:
@SpringBootTest
@Slf4j
public class GetUserTest {
@Autowired
private SmartEngine smartEngine;
@Test
public void testGetUser() {
try {
PersisterSession.create();
ProcessCommandService processCommandService = smartEngine.getProcessCommandService();
Map<String, Object> request = new HashMap<>();
Map<String, Object> response = new HashMap<>();
request.put(GetUserParamConstants.USER_ID_PARAM, UUID.randomUUID().toString());
ProcessInstance processInstance = processCommandService.start("get_user", "1.0.0", request, response);
if (!processInstance.getStatus().equals(InstanceStatus.completed)) {
throw new RuntimeException("运行异常");
}
UserResponse userResponse = (UserResponse) response.get(GetUserParamConstants.USER_RESULT_PARAM);
if (userResponse == null) {
log.warn("未获取到用户信息");
} else {
log.info("{}", userResponse);
}
} finally {
PersisterSession.destroySession();
}
}
}
运行结果失败了,原因是在并发网关情况下,需要扩展锁的实现
使用guava简单实现一把资源锁,多实例情况下请使用分布式锁
public class JavaLockStrategyImpl implements LockStrategy {
private static final Striped<Lock> PROCESS_LOCK = Striped.lazyWeakLock(1024);
@Override
public void tryLock(String processInstanceId, ExecutionContext context) throws LockException {
PROCESS_LOCK.get(processInstanceId).lock();
}
@Override
public void unLock(String processInstanceId, ExecutionContext context) throws LockException {
Lock lock = PROCESS_LOCK.get(processInstanceId);
if (lock != null) {
lock.unlock();
}
}
}
SmartEngineAutoConfiguration配置:
@Bean
@ConditionalOnMissingBean
public SmartEngine constructSmartEngine() {
ProcessEngineConfiguration processEngineConfiguration = new DefaultProcessEngineConfiguration();
processEngineConfiguration.setInstanceAccessor(new CustomInstanceAccessService());
// 并发网关需要实现分布式锁
processEngineConfiguration.setLockStrategy(new JavaLockStrategyImpl());
SmartEngine smartEngine = new DefaultSmartEngine();
smartEngine.init(processEngineConfiguration);
deployProcessDefinition(smartEngine);
return smartEngine;
}
再次运行,执行成功:
多次执行你会发现,对于task 2/3/4 实际上是串行执行的,并没有达到并发执行的效果
2.3.4、并发执行测试
定义线程池:
smart_engine提供默认线程池配置和每个并行网关自定义线程池两种方式
@Configuration
public class ExecutorPoolConfig {
@Bean
public ExecutorService defaultExecutor() {
return new ThreadPoolExecutor(
20,
100,
60, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(1024),
new ThreadFactoryBuilder().setNameFormat("default-poll-%d").build(),
new ThreadPoolExecutor.AbortPolicy()
);
}
@Bean
public ExecutorService useExecutor() {
return new ThreadPoolExecutor(
10,
50,
60, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(100),
new ThreadFactoryBuilder().setNameFormat("user-poll-%d").build(),
new ThreadPoolExecutor.AbortPolicy()
);
}
}
并行网关配置线程池需要在xml中绑定
<bpmn:parallelGateway id="Gateway_1dvqgzv" poolName="get_user_executor" timeout="2000">
并行网关总共有以下配置:
- poolName::线程池名称,与代码对应
- timeout::超时时间
- skipTimeoutExp:超时时是否跳过
- strategy:执行策略(all:执行所有/ any:返回最快的任意一个)
在SmartEngineAutoConfiguration配置线程池:
@Bean
@ConditionalOnMissingBean
public SmartEngine constructSmartEngine() {
ProcessEngineConfiguration processEngineConfiguration = new DefaultProcessEngineConfiguration();
processEngineConfiguration.setInstanceAccessor(new CustomInstanceAccessService());
// 并发网关需要实现分布式锁
processEngineConfiguration.setLockStrategy(new JavaLockStrategyImpl());
// 并发线程池配置
// 默认线程池
processEngineConfiguration.setExecutorService((ExecutorService) applicationContext.getBean("defaultExecutor"));
// 自定义线程池,map的key与xml配置的poolName对应
Map<String, ExecutorService> executorMap = new HashMap<>();
executorMap.put("get_user_executor", (ExecutorService) applicationContext.getBean("useExecutor"));
processEngineConfiguration.setExecutorServiceMap(executorMap);
// 开启服务编排
processEngineConfiguration.getOptionContainer().put(ConfigurationOption.SERVICE_ORCHESTRATION_OPTION);
SmartEngine smartEngine = new DefaultSmartEngine();
smartEngine.init(processEngineConfiguration);
deployProcessDefinition(smartEngine);
return smartEngine;
}
再次执行case:
可以多执行几次,或者每个任务的sleep一下,测试一下并发效果
3、实战
单开篇幅再聊 *_ *