盘古BPM体验地址    盘古BPM交流群盘古BPM交流群号:963222735

Flowable异步方式处理历史数据实战

分享牛 3959℃

     Flowable异步方式处理历史数据中详细说明了flowable异步数据处理的架构演变。本文我们重点看一下怎么使用。大概步骤如下:

1.建立flowable项目(6.1.2包)

2.测试功能以及数据库表的变化。

1.新建项目

    我们以maven项目的构建过程为例进行说明,首先引入相应的依赖包,如下所示:

<dependency>
	<groupId>org.flowable</groupId>
	<artifactId>flowable-engine</artifactId>
	<version>${flowable.version}</version>
</dependency>
<dependency>
	<groupId>org.flowable</groupId>
	<artifactId>flowable-spring</artifactId>
	<version>${flowable.version}</version>
</dependency>
		
<dependency>
	<groupId>com.h2database</groupId>
	<artifactId>h2</artifactId>
	<version>1.3.176</version>
</dependency>
		
<dependency>
	<groupId>com.zaxxer</groupId>
	<artifactId>HikariCP</artifactId>
	<version>2.6.3</version>
</dependency>

注意:

    flowable版本是6.1.2,如果不打算使用HikariCP(数据库连接包)可以使用其他的,比如druid。

1.1部署流程

    流程的部署实例代码如下:

@Test
	public void addInputStreamTest() throws IOException {
		// 定义的文件信息的流读取
		InputStream inputStream = DeploymentBuilderTest.class.getClassLoader()
				.getResource("com/shareniu/shareniu_flowable6/ch2/leave.bpmn").openStream();
		// 流程定义的分类
		String category = "shareniu_addInputStream";
		// 构造DeploymentBuilder对象
		DeploymentBuilder deploymentBuilder = repositoryService
				.createDeployment().category(category)
				.addInputStream(resourceName, inputStream);
		// 部署
		Deployment deploy = deploymentBuilder.deploy();
		System.out.println(deploy);
	}

1.2流程引擎xml配置

    流程引擎相关配置实例代码如下:

<bean id="processEngineConfiguration"
		class="org.flowable.engine.impl.cfg.StandaloneProcessEngineConfiguration">
		<property name="dataSource" ref="dataSource" />
		<property name="databaseSchemaUpdate" value="true" />
		<property name="asyncHistoryEnabled" value="true" />
		<property name="asyncHistoryExecutorActivate" value="true" />
		<property name="activityFontName" value="宋体" />
		<property name="labelFontName" value="宋体" />
		<property name="annotationFontName" value="宋体" />
	</bean>

    asyncHistoryEnabled:用于开启异步处理数据功能。

    asyncHistoryExecutorActivate:用于开启异步作业执行器。如果不开启,那么定时器不执行,异步处理数据的功能无法使用。关于这一点一定要注意。

1.3启动流程实例

    上面的流程文档部署之后,接下来我们部署流程之后,就启动流程实例。实例代码如下:

	@Test
	public void start1() {
		Authentication.setAuthenticatedUserId("分享牛");
		String processDefinitionId="myProcess:2:10004";
		runtimeService.startProcessInstanceById(processDefinitionId);
		Authentication.setAuthenticatedUserId(null);
	}

1.4数据库的变化

    上面的代码执行完毕之后,我们来看一下历史表(比如act_hi_taskinst)并没有数据,act_ge_bytearray表新增了很多数据,那我们找几个来看一下,实例代码如下:

{
    "type": "process-instance-start",
    "data": {
        "processInstanceId": "35001",
        "processDefinitionId": "myProcess:2:10004",
        "startUserId": "分享牛",
        "startActivityId": "startevent1",
        "__timeStamp": "2017-08-17T08:33:48.023Z",
        "tenantId": "",
        "processDefinitionName": "My process",
        "startTime": "2017-08-17T08:33:47.944Z",
        "id": "35001",
        "processDefinitionVersion": "2",
        "processDefinitionKey": "myProcess"
    }
}

    type:该数据对应的定时器事件,不同的异步作业处理器处理的事件不同。操作的表也不同,关于这一点后续的源码分析会详细说明。再次不再累赘。

    data:数据,定时器、轮训处理作业的时候,会查询到这些数据,然后根据type找到对应的事件处理类,然后将data中的一个个列转化为数据库历史表中的一个个列进行插入处理,这一点一定要有印象。

1.5异步处理高级特性

    我们也可以在上文1.2中配置如下信息:

<bean id="processEngineConfiguration"
		class="org.flowable.engine.impl.cfg.StandaloneProcessEngineConfiguration">
		<property name="dataSource" ref="dataSource" />
		<property name="databaseSchemaUpdate" value="true" />
		<property name="asyncHistoryEnabled" value="true" />
		<property name="asyncHistoryExecutorActivate" value="true" />
	        <property name="asyncHistoryJsonGroupingEnabled" value="true" />
		<property name="asyncHistoryJsonGzipCompressionEnabled" value="true" />
		<property name="AsyncHistoryJsonGroupingThreshold" value="10" />
		<property name="activityFontName" value="宋体" />
		<property name="labelFontName" value="宋体" />
		<property name="annotationFontName" value="宋体" />
	</bean>

asyncHistoryJsonGroupingEnabled:开启json数据分组。

AsyncHistoryJsonGroupingThreshold:10

asyncHistoryJsonGzipCompressionEnabled:开启json压缩。

    第一个和第二个设置是相关的:如果启用了JSON分组,那么将为所有的历史事件创建一个作业,当然,这是一个更大的JSON有效负载。默认情况下,这是每个事件的一个工作。

    第二个设置定义了应用该分组的阈值。基准测试表明,只有在一个事务中有大量的历史动作时,分组的额外开销才有意义。在这个示例中,当在同一个事务中有超过10个异步历史作业时,就会创建一个作业(如果没有设置的话,也是默认的)。注意,由于一些历史数据的粒度,这种情况比我们想象的要快。

    第三个设置使gzip压缩(使用JDK GZIPOutputStream)在JSON存储的异步历史作业中。在启用分组时,基准测试表明这是有意义的,原因有二。首先,将不同作业的JSON组合在相同的JSON数据中增加了大量的重复,这对于应用压缩是非常完美的,对于一个历史动作来说,这并不是一个JSON字符串的情况。其次,应用压缩使用额外的CPU资源,只有当JSON的实际压缩大小大大降低了存储资源时才有意义。

    关于更多的原理以及源码分析我们后续详细单独章节进行讲解。



转载请注明:分享牛 » Flowable异步方式处理历史数据实战