基于消息队列的异步执行器 Message Queue based Async Executor
异步执行器设计思路保证了,可以用消息队列轻松接管线程池的工作,并用于处理异步作业。
跑分显示消息队列相比基于线程池的异步执行器,性能出众,吞吐量大。但是,会需要额外的架构组件,当然也就增加了安装配置、维护及监控的复杂度。对于多数用户来说,基于线程池的异步执行器,性能已经足够用了。但能够知道在性能要求增长之后,仍有可用方案,也是挺好的。
目前,立即可用的唯一选择是带有JMS的Spring。首先支持Spring的原因是,Spring提供了非常好的功能,抚平了使用线程以及处理多个消息消费者造成的伤痛。然而集成很简单,因此可以轻松改用任何其他消息队列实现或协议(Stomp、AMPQ等等)。我们欢迎反馈下一个应该实现什么。
(使用消息队列后)当引擎创建一个新的异步作业时,会在消息队列中放入一条包含有作业标识的消息(处在一个事务提交监听器之下,这样就可以确保该作业条目已经提交至数据库)。之后一个消息消费者可以获取作业标识,并获取及执行该作业。异步执行器不再创建线程池,而是会在另一个单独线程中插入及查询定时器。当定时器到时触发时,将会被移至异步作业表,现在也就会同时向消息队列发送一条消息。“重置过期”线程会按照一般逻辑解锁作业,因为消息队列也可能失败。只不过不是“解锁”作业,而是重发消息。异步执行器不再轮询异步作业。
实现由两个类组成:
-
一个org.flowable.engine.impl.asyncexecutor.JobManager接口的实现,用于向消息队列发送消息,以代替将其发送至线程池。
-
一个javax.jms.MessageListener接口的实现,用于从消息队列中消费消息,并使用消息中的作业标识获取及执行该作业。
首先,在你的项目中添加flowable-jms-spring-executor依赖:
<dependency> <groupId>org.flowable</groupId> <artifactId>flowable-jms-spring-executor</artifactId> <version>${flowable.version}</version> </dependency>
要启用基于消息队列的异步执行器,需要在流程引擎配置中进行如下设置:
-
asyncExecutorActivate仍然需要设置为true
-
asyncExecutorMessageQueueMode需要设置为true
-
org.flowable.spring.executor.jms.MessageBasedJobManager必须作为JobManager注入
下面是一个基于Java配置的完整例子,使用ActiveMQ作为消息中间件。
请注意:
-
MessageBasedJobManager需要注入一个配置了正确的connectionFactory的JMSTemplate。
-
我们使用Spring的MessageListenerContainer概念,因为它大幅简化了线程与多消费者的使用。
@Configurationpublic class SpringJmsConfig { @Bean public DataSource dataSource() { // Omitted 已省略 } @Bean(name = "transactionManager") public PlatformTransactionManager transactionManager() { DataSourceTransactionManager transactionManager = new DataSourceTransactionManager(); transactionManager.setDataSource(dataSource()); return transactionManager; } @Bean public SpringProcessEngineConfiguration processEngineConfiguration() { SpringProcessEngineConfiguration configuration = new SpringProcessEngineConfiguration(); configuration.setDataSource(dataSource()); configuration.setTransactionManager(transactionManager()); configuration.setDatabaseSchemaUpdate(SpringProcessEngineConfiguration.DB_SCHEMA_UPDATE_TRUE); configuration.setAsyncExecutorMessageQueueMode(true); configuration.setAsyncExecutorActivate(true); configuration.setJobManager(jobManager()); return configuration; } @Bean public ProcessEngine processEngine() { return processEngineConfiguration().buildProcessEngine(); } @Bean public MessageBasedJobManager jobManager() { MessageBasedJobManager jobManager = new MessageBasedJobManager(); jobManager.setJmsTemplate(jmsTemplate()); return jobManager; } @Bean public ConnectionFactory connectionFactory() { ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616"); activeMQConnectionFactory.setUseAsyncSend(true); activeMQConnectionFactory.setAlwaysSessionAsync(true); return new CachingConnectionFactory(activeMQConnectionFactory); } @Bean public JmsTemplate jmsTemplate() { JmsTemplate jmsTemplate = new JmsTemplate(); jmsTemplate.setDefaultDestination(new ActiveMQQueue("flowable-jobs")); jmsTemplate.setConnectionFactory(connectionFactory()); return jmsTemplate; } @Bean public MessageListenerContainer messageListenerContainer() { DefaultMessageListenerContainer messageListenerContainer = new DefaultMessageListenerContainer(); messageListenerContainer.setConnectionFactory(connectionFactory()); messageListenerContainer.setDestinationName("flowable-jobs"); messageListenerContainer.setMessageListener(jobMessageListener()); messageListenerContainer.setConcurrentConsumers(2); messageListenerContainer.start(); return messageListenerContainer; } @Bean public JobMessageListener jobMessageListener() { JobMessageListener jobMessageListener = new JobMessageListener(); jobMessageListener.setProcessEngineConfiguration(processEngineConfiguration()); return jobMessageListener; } }
在上面的代码中,flowable-jms-spring-executor模块提供的只有JobMessageListener与MessageBasedJobManager两个类。其他的所有代码都来自Spring。因此,如果想要替换为其他的队列/协议,就需要替换这些类。