盘古BPM体验地址    盘古BPM交流群盘古BPM交流群号:963222735

《Flowable基础十二 Flowable 异步执行器 Async Executor ​ 配置异步执行器 》

分享牛 5247℃


异步执行器 Async Executor

    Flowable V5版本中,在已有的作业执行器(job executor)之外,还提供了异步执行器(async executor)。异步执行器已被许多Flowable的用户及我们的跑分证明,性能比老的作业执行器好。

    从Flowable V6起,将只存在异步执行器。在V6中,已经为性能优化及接口封装,对异步执行器进行了完全的重构,当然仍然与现有API兼容。

 异步执行器的设计 Async Executor design

    有两种作业类型:定时器(例如边界事件或用户任务中的定时器)以及异步操作(带有flowable:async="true"属性的服务任务)。

    定时器(timer)很容易解释:它们在ACT_RU_TIMER_JOB表中持久化,并带有给定的到期日期。异步执行器中有一个线程,周期性地检查是否有需要触发的定时器(也就是说,到期日期在当前时间“之前”)。当需要触发定时器时,将会移除该定时器,并创建一个异步作业(async job)。

    异步作业在执行流程实例步骤时插入数据库(也就是说,在某个API调用时)。如果当前Flowable引擎启用了异步执行器,则该异步作业将被锁定(locked)。这意味着会在ACT_RU_JOB表中插入一个作业条目,并设置其lock owner(锁持有人)lock expiration time(锁到期时间)。在API调用成功后触发的事务监听器(transaction commit listener),将会触发同一引擎中的异步执行器,让其执行该作业(因此可以保证数据库中已经保存了数据)。为了实现这个目标,异步执行器有一个(可配置的)线程池,可以从其中取出一个线程用于执行作业,并使流程可以异步进行。如果Flowable引擎未启用异步执行器,则异步作业仍会插入ACT_RU_JOB表,但不会被锁定。

与检查定时器的线程类似,异步执行器中也有一个用于“获取”新的异步作业的线程。这里的异步作业,指的是表中存储,但未被锁定的作业。这个线程会为当前Flowable引擎锁定这些作业,并将其发送至异步执行器。

用于执行作业的线程池从一个内存队列中获取作业。当队列满了时(可配置),作业将会被解锁,并重新插入数据库。这样,其他的异步执行器就可以重新操作它们。

如果在执行作业期间发生了异常,这个异步作业将会转化为一个定时器作业,并带有一个到期日期。之后,它将会像普通定时器作业一样被获取,并重新变回异步作业,以实现重试。当一个作业已经重试了(可配置)几次,仍然失败,则作业被视为“死亡(dead)”,并被移至ACT_RU_DEADLETTER_JOB表。“死信(deadletter)”的概念在各种其他系统中也广泛使用。管理员需要检查失败作业的异常信息,并进行相应操作。

流程定义与流程实例都可以被暂停。这些定义或实例所关联的暂停作业,将被移至ACT_RU_SUSPENDED_JOB表,以确保用于获取作业的查询语句中的where条件尽量少。

综上所述:对于熟悉作业/异步执行器的旧实现的人来说,主要目标是让“获取用的查询”尽可能简单。在过去(V6以前),所有作业类型/状态使用一张表。为了满足所有使用场景,最终导致“where”条件十分庞大。现在已经解决了这个问题,我们的跑分也证明这个新设计带来了更好的性能,也更有弹性。

 配置异步执行器 Async executor configuration

异步执行器是一个高度可配置的组件。我们建议先查看异步执行器的默认配置,检查它们是否符合你的流程的要求。

另外,也可以扩展默认的实现,或者用你自己实现的org.flowable.engine.impl.asyncexecutor.AsyncExecutor接口代替。

可以在流程引擎配置中,使用setter设置下列参数:

Table 263. 异步执行器配置选项 Async executor configuration options
名称 默认值 描述

asyncExecutorThreadPoolQueueSize

100

在获取待执行的作业之后,线程池中某个线程实际执行作业之前,放置这些作业的队列的长度。

asyncExecutorCorePoolSize

2

用于执行作业的线程池中,最小的活动(kept alive)线程数量。

asyncExecutorMaxPoolSize

10

用于执行作业的线程池中,创建线程的最大数量。

asyncExecutorThreadKeepAliveTime

5000

在销毁执行作业所用的线程前,需要保持活动的时间(以毫秒计)。设置为>0的值会消耗资源,但在有大量执行作业的时候,可以避免总是创建新线程。如果设置为0,则线程会在执行作业完成之后立刻被销毁。

asyncExecutorNumberOfRetries

3

在被移入“死信”表之前,一个作业的最大重试次数。

asyncExecutorMaxTimerJobsPerAcquisition

1

在一次获取用查询中,获取的定时器作业的数量。默认值为1,因为这样可以降低潜在的乐观锁异常情况。较大的数值性能较好,但在不同的引擎间发生乐观锁异常的几率也会变大。

asyncExecutorMaxAsyncJobsDuePerAcquisition

1

在一次获取用查询中,获取的异步作业的数量。默认值为1,因为这样可以降低潜在的乐观锁异常情况。较大的数值性能较好,但在不同的引擎间发生乐观锁异常的几率也会变大。

asyncExecutorDefaultTimerJobAcquireWaitTime

10000

获取定时器作业的线程在两次获取用查询之间等待的时间(以毫秒记)。只在上一次查询未找到新的定时器作业,或者获取的作业数量少于asyncExecutorMaxTimerJobsPerAcquisition中设置的值时,才会等待。

asyncExecutorDefaultAsyncJobAcquireWaitTime

10000

获取异步作业的线程在两次获取用查询之间等待的时间(以毫秒记)。只在上一次查询未找到新的定时器作业,或者获取的作业数量少于asyncExecutorMaxAsyncJobsDuePerAcquisition中设置的值时,才会等待。

asyncExecutorDefaultQueueSizeFullWaitTime

0

在内部作业队列已满之后,执行下一次查询之前,获取用线程(包括定时器作业及异步作业)将等待的时间(以毫秒记)。默认值为0(为保证向后兼容性)。设置为较大的值,可以让异步执行器可以先执行掉一些队列作业。

asyncExecutorTimerLockTimeInMillis

5分钟

一个定时器作业在被异步执行器获取之后锁定的时间(以毫秒记)。在这段时间内,其它异步执行器不会尝试获取或锁定这个作业。

asyncExecutorAsyncJobLockTimeInMillis

5分钟

一个异步作业在被异步执行器获取之后锁定的时间(以毫秒记)。在这段时间内,其它异步执行器不会尝试获取或锁定这个作业。

asyncExecutorSecondsToWaitOnShutdown

60

当请求关闭执行器(或流程引擎)后,等待执行作业的线程池安全关闭的时间(以秒记)。

asyncExecutorResetExpiredJobsInterval

60秒钟

在两次“过期作业(expired job)”检查之间等待的时间(以毫秒记)。过期作业指的是已经锁定(某个执行器已经为其写入了锁持有人以及过期时间),但一直没有完成的作业。在检查中,过期作业将会重新可用,也就是会移除其锁持有人以及过期时间。这样其他执行器就可以重新获取它。如果锁(过期)时间在当前时间之前,则该作业被视作过期。

asyncExecutorResetExpiredJobsPageSize

3

异步执行器的“重置过期(reset expired)”线程一次获取的作业数量。

转载请注明:分享牛 » 《Flowable基础十二 Flowable 异步执行器 Async Executor ​ 配置异步执行器 》