盘古BPM体验地址    盘古BPM交流群盘古BPM交流群号:963222735

《Flowable基础十八 Flowable 消息队列异步执行器 》

分享牛 3593℃

基于消息队列的异步执行器 Message Queue based Async Executor

    异步执行器设计思路保证了,可以用消息队列轻松接管线程池的工作,并用于处理异步作业。

    跑分显示消息队列相比基于线程池的异步执行器,性能出众,吞吐量大。但是,会需要额外的架构组件,当然也就增加了安装配置、维护及监控的复杂度。对于多数用户来说,基于线程池的异步执行器,性能已经足够用了。但能够知道在性能要求增长之后,仍有可用方案,也是挺好的。

    目前,立即可用的唯一选择是带有JMS的Spring。首先支持Spring的原因是,Spring提供了非常好的功能,抚平了使用线程以及处理多个消息消费者造成的伤痛。然而集成很简单,因此可以轻松改用任何其他消息队列实现或协议(Stomp、AMPQ等等)。我们欢迎反馈下一个应该实现什么。

(使用消息队列后)当引擎创建一个新的异步作业时,会在消息队列中放入一条包含有作业标识的消息(处在一个事务提交监听器之下,这样就可以确保该作业条目已经提交至数据库)。之后一个消息消费者可以获取作业标识,并获取及执行该作业。异步执行器不再创建线程池,而是会在另一个单独线程中插入及查询定时器。当定时器到时触发时,将会被移至异步作业表,现在也就会同时向消息队列发送一条消息。“重置过期”线程会按照一般逻辑解锁作业,因为消息队列也可能失败。只不过不是“解锁”作业,而是重发消息。异步执行器不再轮询异步作业。

实现由两个类组成:

  • 一个org.flowable.engine.impl.asyncexecutor.JobManager接口的实现,用于向消息队列发送消息,以代替将其发送至线程池。

  • 一个javax.jms.MessageListener接口的实现,用于从消息队列中消费消息,并使用消息中的作业标识获取及执行该作业。

首先,在你的项目中添加flowable-jms-spring-executor依赖:

 <dependency>
  <groupId>org.flowable</groupId>
  <artifactId>flowable-jms-spring-executor</artifactId>
  <version>${flowable.version}</version>
 </dependency>

要启用基于消息队列的异步执行器,需要在流程引擎配置中进行如下设置:

  • asyncExecutorActivate仍然需要设置为true

  • asyncExecutorMessageQueueMode需要设置为true

  • org.flowable.spring.executor.jms.MessageBasedJobManager必须作为JobManager注入

下面是一个基于Java配置的完整例子,使用ActiveMQ作为消息中间件。

请注意:

  • MessageBasedJobManager需要注入一个配置了正确的connectionFactoryJMSTemplate

  • 我们使用Spring的MessageListenerContainer概念,因为它大幅简化了线程与多消费者的使用。

@Configurationpublic class SpringJmsConfig {

  @Bean
  public DataSource dataSource() {
    // Omitted 已省略
  }

  @Bean(name = "transactionManager")
  public PlatformTransactionManager transactionManager() {
    DataSourceTransactionManager transactionManager = new DataSourceTransactionManager();
    transactionManager.setDataSource(dataSource());
    return transactionManager;
  }

  @Bean
  public SpringProcessEngineConfiguration processEngineConfiguration() {
    SpringProcessEngineConfiguration configuration = new SpringProcessEngineConfiguration();
    configuration.setDataSource(dataSource());
    configuration.setTransactionManager(transactionManager());
    configuration.setDatabaseSchemaUpdate(SpringProcessEngineConfiguration.DB_SCHEMA_UPDATE_TRUE);
    configuration.setAsyncExecutorMessageQueueMode(true);
    configuration.setAsyncExecutorActivate(true);
    configuration.setJobManager(jobManager());
    return configuration;
  }

  @Bean
  public ProcessEngine processEngine() {
    return processEngineConfiguration().buildProcessEngine();
  }

  @Bean
  public MessageBasedJobManager jobManager() {
    MessageBasedJobManager jobManager = new MessageBasedJobManager();
    jobManager.setJmsTemplate(jmsTemplate());
    return jobManager;
  }

  @Bean
  public ConnectionFactory connectionFactory() {
      ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
      activeMQConnectionFactory.setUseAsyncSend(true);
      activeMQConnectionFactory.setAlwaysSessionAsync(true);
      return new CachingConnectionFactory(activeMQConnectionFactory);
  }

  @Bean
  public JmsTemplate jmsTemplate() {
      JmsTemplate jmsTemplate = new JmsTemplate();
      jmsTemplate.setDefaultDestination(new ActiveMQQueue("flowable-jobs"));
      jmsTemplate.setConnectionFactory(connectionFactory());
      return jmsTemplate;
  }

  @Bean
  public MessageListenerContainer messageListenerContainer() {
      DefaultMessageListenerContainer messageListenerContainer = new DefaultMessageListenerContainer();
      messageListenerContainer.setConnectionFactory(connectionFactory());
      messageListenerContainer.setDestinationName("flowable-jobs");
      messageListenerContainer.setMessageListener(jobMessageListener());
      messageListenerContainer.setConcurrentConsumers(2);
      messageListenerContainer.start();
      return messageListenerContainer;
  }

  @Bean
  public JobMessageListener jobMessageListener() {
    JobMessageListener jobMessageListener = new JobMessageListener();
    jobMessageListener.setProcessEngineConfiguration(processEngineConfiguration());
    return jobMessageListener;
  }
}

在上面的代码中,flowable-jms-spring-executor模块提供的只有JobMessageListenerMessageBasedJobManager两个类。其他的所有代码都来自Spring。因此,如果想要替换为其他的队列/协议,就需要替换这些类。

转载请注明:分享牛 » 《Flowable基础十八 Flowable 消息队列异步执行器 》