盘古BPM体验地址    盘古BPM交流群盘古BPM交流群号:963222735

Flowable消息队列实战

分享牛 4845℃

    Flowable异步方式处理历史数据实战一文中详细介绍了如何使用flowable异步功能,其中这种方式的原理无外乎就是开启历史定时器,然后定期轮训历史作业,然后解析并将其插入到不同的历史表。这种方式本质还是使用的线程池以及数据库的轮训处理方式,flowable在6.1.2版本之后引入了消息队列机制来处理历史作业。jms被用作传输协议,当然了任何协议都是有效的。这个设置的架构思想如下图所示:


对于上图的解释如下:

1.不使用轮训数据库的方式,而是采用将消息发到到消息队列,并指示消息作业已经准备ok。

2.历史作业数据与运行数据是在同一个事务中的。这样就可以避免脏数据。

3.Flowable提供了一个消息监听器,这样监听器就可以监听到指定的队列消息并进行处理。比如作业的获取,将其存储到历史表中。

4.如果我们实际项目开发中,不希望将历史数据放置到flowable引擎表中,那我们可以扩展相应的代码,将历史作业以及历史数据存放到其他介质,比如mongodb或者es等搜索引擎。如果使用这种方式,那么消息监听器可以完全与流程引擎解耦,也就是各做各的事情。架构示意图如下所示:


    架构思路稍微有点变化,其他的不变化。

    依赖

        因为我们现在要使用jms相关的东西,在这里呢我们使用的是ActiveMQ容器,因为我们需要引入包,如下所示:

<!-- JMS -->
<dependency>
<groupId>org.flowable</groupId>
<artifactId>flowable-jms-spring-executor</artifactId>
<version>${flowable.version}</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.15.0</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-spring</artifactId>
<version>5.15.0</version>
</dependency>

code

    下面我们开始编码。

  1. 创建jmsTemplate,该方法如下所示:

public JmsTemplate jmsTemplate() {
JmsTemplate jmsTemplate = new JmsTemplate();
jmsTemplate.setDefaultDestination(new ActiveMQQueue("flowable-history-jobs"));
jmsTemplate.setConnectionFactory(connectionFactory());
return jmsTemplate;
}

    我们使用的队列名称是flowable-history-jobs。

2.创建jmsTemplate,该方法如下所示:

	  public ConnectionFactory connectionFactory() {
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory("tcp://192.168.209.129:61616");
activeMQConnectionFactory.setUseAsyncSend(true);
activeMQConnectionFactory.setAlwaysSessionAsync(true);
activeMQConnectionFactory.setStatsEnabled(true);
return new CachingConnectionFactory(activeMQConnectionFactory);
}

3.创建jobManager,该方法如下所示:

public MessageBasedJobManager jobManager() {
MessageBasedJobManager jobManager = new MessageBasedJobManager();
jobManager.setHistoryJmsTemplate(jmsTemplate());
jobManager.setJmsTemplate(jmsTemplate());
return jobManager;
}

4.开始配置引擎配置类,示例代码如下:

config.setJobManager(jobManager());
config.setAsyncHistoryExecutorMessageQueueMode(true);

5.启动一个流程实例,实例代码如下:

@Test
public void start1() {
Authentication.setAuthenticatedUserId("分享牛");
String processDefinitionId="myProcess:2:10004";
runtimeService.startProcessInstanceById(processDefinitionId);
Authentication.setAuthenticatedUserId(null);
}

6.观察ActiveMQ的变化吗,我们在浏览器中输入http://192.168.209.129:8161/admin/queues.jsp,第一次访问的时候,可能要输入用户名以及密码,默认是admin/admin。消息队列的数据如下图所示:

    消息队列中已经有5个消息了,那我们怎么处理这些消息呢?可以自己写代码实现或者直接使用flowable提供的对应包(我们使用该方式),实例代码如下:

@Bean
public MessageListenerContainer messageListenerContainer() {
DefaultMessageListenerContainer messageListenerContainer = new DefaultMessageListenerContainer();
messageListenerContainer.setConnectionFactory(connectionFactory());
messageListenerContainer.setDestinationName("flowable-history-jobs");
messageListenerContainer.setMessageListener(historyJobsMessageListener());
messageListenerContainer.setConcurrentConsumers(10);
messageListenerContainer.start();
return messageListenerContainer;
}

@Bean
public HistoryJobMessageListener historyJobsMessageListener() {
HistoryJobMessageListener historyJobMessageListener = new HistoryJobMessageListener();
historyJobMessageListener.setProcessEngineConfiguration(processEngineConfiguration());
return historyJobMessageListener;
}

注意:上面的代码需要在引擎启动的时候,就一直去监听消息队列,当消息队列有消息的时候,代码就会自动触发执行。

转载请注明:分享牛 » Flowable消息队列实战