Flowable异步方式处理历史数据中详细说明了flowable异步数据处理的架构演变。本文我们重点看一下怎么使用。大概步骤如下:
1.建立flowable项目(6.1.2包)
2.测试功能以及数据库表的变化。
1.新建项目
我们以maven项目的构建过程为例进行说明,首先引入相应的依赖包,如下所示:
<dependency> <groupId>org.flowable</groupId> <artifactId>flowable-engine</artifactId> <version>${flowable.version}</version> </dependency> <dependency> <groupId>org.flowable</groupId> <artifactId>flowable-spring</artifactId> <version>${flowable.version}</version> </dependency> <dependency> <groupId>com.h2database</groupId> <artifactId>h2</artifactId> <version>1.3.176</version> </dependency> <dependency> <groupId>com.zaxxer</groupId> <artifactId>HikariCP</artifactId> <version>2.6.3</version> </dependency>
注意:
flowable版本是6.1.2,如果不打算使用HikariCP(数据库连接包)可以使用其他的,比如druid。
1.1部署流程
流程的部署实例代码如下:
@Test public void addInputStreamTest() throws IOException { // 定义的文件信息的流读取 InputStream inputStream = DeploymentBuilderTest.class.getClassLoader() .getResource("com/shareniu/shareniu_flowable6/ch2/leave.bpmn").openStream(); // 流程定义的分类 String category = "shareniu_addInputStream"; // 构造DeploymentBuilder对象 DeploymentBuilder deploymentBuilder = repositoryService .createDeployment().category(category) .addInputStream(resourceName, inputStream); // 部署 Deployment deploy = deploymentBuilder.deploy(); System.out.println(deploy); }
1.2流程引擎xml配置
流程引擎相关配置实例代码如下:
<bean id="processEngineConfiguration" class="org.flowable.engine.impl.cfg.StandaloneProcessEngineConfiguration"> <property name="dataSource" ref="dataSource" /> <property name="databaseSchemaUpdate" value="true" /> <property name="asyncHistoryEnabled" value="true" /> <property name="asyncHistoryExecutorActivate" value="true" /> <property name="activityFontName" value="宋体" /> <property name="labelFontName" value="宋体" /> <property name="annotationFontName" value="宋体" /> </bean>
asyncHistoryEnabled:用于开启异步处理数据功能。
asyncHistoryExecutorActivate:用于开启异步作业执行器。如果不开启,那么定时器不执行,异步处理数据的功能无法使用。关于这一点一定要注意。
1.3启动流程实例
上面的流程文档部署之后,接下来我们部署流程之后,就启动流程实例。实例代码如下:
@Test public void start1() { Authentication.setAuthenticatedUserId("分享牛"); String processDefinitionId="myProcess:2:10004"; runtimeService.startProcessInstanceById(processDefinitionId); Authentication.setAuthenticatedUserId(null); }
1.4数据库的变化
上面的代码执行完毕之后,我们来看一下历史表(比如act_hi_taskinst)并没有数据,act_ge_bytearray表新增了很多数据,那我们找几个来看一下,实例代码如下:
{ "type": "process-instance-start", "data": { "processInstanceId": "35001", "processDefinitionId": "myProcess:2:10004", "startUserId": "分享牛", "startActivityId": "startevent1", "__timeStamp": "2017-08-17T08:33:48.023Z", "tenantId": "", "processDefinitionName": "My process", "startTime": "2017-08-17T08:33:47.944Z", "id": "35001", "processDefinitionVersion": "2", "processDefinitionKey": "myProcess" } }
type:该数据对应的定时器事件,不同的异步作业处理器处理的事件不同。操作的表也不同,关于这一点后续的源码分析会详细说明。再次不再累赘。
data:数据,定时器、轮训处理作业的时候,会查询到这些数据,然后根据type找到对应的事件处理类,然后将data中的一个个列转化为数据库历史表中的一个个列进行插入处理,这一点一定要有印象。
1.5异步处理高级特性
我们也可以在上文1.2中配置如下信息:
<bean id="processEngineConfiguration" class="org.flowable.engine.impl.cfg.StandaloneProcessEngineConfiguration"> <property name="dataSource" ref="dataSource" /> <property name="databaseSchemaUpdate" value="true" /> <property name="asyncHistoryEnabled" value="true" /> <property name="asyncHistoryExecutorActivate" value="true" /> <property name="asyncHistoryJsonGroupingEnabled" value="true" /> <property name="asyncHistoryJsonGzipCompressionEnabled" value="true" /> <property name="AsyncHistoryJsonGroupingThreshold" value="10" /> <property name="activityFontName" value="宋体" /> <property name="labelFontName" value="宋体" /> <property name="annotationFontName" value="宋体" /> </bean>
asyncHistoryJsonGroupingEnabled:开启json数据分组。
AsyncHistoryJsonGroupingThreshold:10
asyncHistoryJsonGzipCompressionEnabled:开启json压缩。
第一个和第二个设置是相关的:如果启用了JSON分组,那么将为所有的历史事件创建一个作业,当然,这是一个更大的JSON有效负载。默认情况下,这是每个事件的一个工作。
第二个设置定义了应用该分组的阈值。基准测试表明,只有在一个事务中有大量的历史动作时,分组的额外开销才有意义。在这个示例中,当在同一个事务中有超过10个异步历史作业时,就会创建一个作业(如果没有设置的话,也是默认的)。注意,由于一些历史数据的粒度,这种情况比我们想象的要快。
第三个设置使gzip压缩(使用JDK GZIPOutputStream)在JSON存储的异步历史作业中。在启用分组时,基准测试表明这是有意义的,原因有二。首先,将不同作业的JSON组合在相同的JSON数据中增加了大量的重复,这对于应用压缩是非常完美的,对于一个历史动作来说,这并不是一个JSON字符串的情况。其次,应用压缩使用额外的CPU资源,只有当JSON的实际压缩大小大大降低了存储资源时才有意义。
关于更多的原理以及源码分析我们后续详细单独章节进行讲解。
转载请注明:分享牛 » Flowable异步方式处理历史数据实战