ExecutorPerTenantAsyncExecutor executeAsyncJob tenant mismatch
Describe the bug
I am using a single process engine with a single database/schema and multiple tenants, set up with an ExecutorPerTenantAsyncExecutor.
The TenantAwareAcquireAsyncJobsDueRunnable for one tenant (determined by the TenantInfoHolder) fetches jobs for multiple tenants, because the acquire SQL has no tenant WHERE condition (see the forum link below). It then picks the async executor (ExecutorPerTenantAsyncExecutor.determineAsyncExecutor) based on the TenantInfoHolder value, while the provided JobInfo may belong to a different tenant altogether.
As a result, one tenant's jobs are executed on the thread pool of another tenant.
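The mismatch can be reproduced in isolation with a minimal sketch. Everything here is a hypothetical stand-in, not Flowable code: `Job` and `RecordingExecutor` replace JobInfo and the per-tenant AsyncExecutor, and the tenant names are made up. It assumes the context tenant is one of the registered tenants.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class TenantMismatchSketch {

    /** Hypothetical stand-in for JobInfo; only the id and tenant matter here. */
    static final class Job {
        final String id;
        final String tenantId;

        Job(String id, String tenantId) {
            this.id = id;
            this.tenantId = tenantId;
        }
    }

    /** Hypothetical stand-in for a per-tenant executor; it records what it was handed. */
    static final class RecordingExecutor {
        final List<Job> executed = new ArrayList<>();

        boolean executeAsyncJob(Job job) {
            executed.add(job);
            return true;
        }
    }

    /** Jobs as an acquire query without a tenant filter might return them. */
    static List<Job> sampleJobs() {
        return Arrays.asList(new Job("job-1", "tenantA"), new Job("job-2", "tenantB"));
    }

    /**
     * Routes every acquired job to the executor of the acquiring thread's tenant
     * and returns how many jobs ended up on an executor owned by another tenant.
     */
    static int misroutedWhenRoutingByContext(String contextTenantId, List<Job> acquired) {
        Map<String, RecordingExecutor> tenantExecutors = new LinkedHashMap<>();
        tenantExecutors.put("tenantA", new RecordingExecutor());
        tenantExecutors.put("tenantB", new RecordingExecutor());

        // The executor is chosen from the context tenant, regardless of each job's tenant.
        RecordingExecutor chosen = tenantExecutors.get(contextTenantId);
        for (Job job : acquired) {
            chosen.executeAsyncJob(job);
        }

        // Count jobs whose tenant differs from the tenant owning the executor.
        int misrouted = 0;
        for (Map.Entry<String, RecordingExecutor> entry : tenantExecutors.entrySet()) {
            for (Job job : entry.getValue().executed) {
                if (!entry.getKey().equals(job.tenantId)) {
                    misrouted++;
                }
            }
        }
        return misrouted;
    }

    public static void main(String[] args) {
        System.out.println("misrouted jobs: "
                + misroutedWhenRoutingByContext("tenantA", sampleJobs()));
    }
}
```

With the two sample jobs and `tenantA` as the context tenant, the `tenantB` job lands on the `tenantA` executor, so one job is counted as misrouted.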
Expected behavior
determineAsyncExecutor should take JobInfo.tenantId into account instead of relying only on the TenantInfoHolder value.
Code
Old:
    @Override
    public boolean executeAsyncJob(JobInfo job) {
        return determineAsyncExecutor().executeAsyncJob(job);
    }
Idea:
    @Override
    public boolean executeAsyncJob(JobInfo job) {
        // Prefer the tenant stored on the job; fall back to the context tenant.
        String contextTenantId = tenantInfoHolder.getCurrentTenantId();
        String tenantId = StringUtils.defaultIfBlank(job.getTenantId(), contextTenantId);
        if (StringUtils.isBlank(tenantId)) {
            // No tenant can be determined, so reject the job.
            return false;
        }
        final AsyncExecutor asyncExecutor = tenantExecutors.get(tenantId);
        if (asyncExecutor == null) {
            // No executor is registered for this tenant, so reject the job.
            return false;
        }
        return asyncExecutor.executeAsyncJob(job);
    }
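The tenant resolution rule in the idea above can be checked on its own. This is a minimal sketch using only the JDK; `resolveTenantId` and `isBlank` are hypothetical helpers that approximate `StringUtils.defaultIfBlank` and `StringUtils.isBlank` from Apache Commons Lang (here "blank" means null, empty, or only characters removed by `String.trim()`).

```java
final class TenantResolutionSketch {

    /** Approximation of StringUtils.isBlank using String.trim(). */
    static boolean isBlank(String value) {
        return value == null || value.trim().isEmpty();
    }

    /**
     * Returns the job's tenant if it has one, otherwise the context tenant,
     * otherwise null to signal that the job should be rejected.
     */
    static String resolveTenantId(String jobTenantId, String contextTenantId) {
        if (!isBlank(jobTenantId)) {
            return jobTenantId;
        }
        if (!isBlank(contextTenantId)) {
            return contextTenantId;
        }
        return null;
    }

    public static void main(String[] args) {
        // The job's own tenant wins over the acquiring thread's tenant.
        System.out.println(resolveTenantId("tenantB", "tenantA"));
        // A job without a tenant falls back to the context tenant.
        System.out.println(resolveTenantId("", "tenantA"));
        // Nothing to route by: the caller would return false.
        System.out.println(resolveTenantId(null, " "));
    }
}
```

The fallback keeps jobs without a tenant id runnable on whichever tenant acquired them; whether that is desirable depends on how tenant-less jobs should be handled in a given setup.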
Additional context
Related link: https://forum.flowable.org/t/asyncjobexecutor-doesnt-work-as-expected-with-multitenancy/6424/4
I am currently using Flowable 6.3.1, but I think newer versions are also affected.
I did some digging: maybe TenantAwareAcquireAsyncJobsDueRunnable.offerJobs should filter by tenant?
    @Override
    protected List<JobInfoEntity> offerJobs(AcquiredJobEntities acquiredJobs) {
        List<JobInfoEntity> rejected = new ArrayList<>();
        for (JobInfoEntity job : acquiredJobs.getJobs()) {
            // skip jobs not for this tenant
            if (!Objects.equals(job.getTenantId(), this.tenantId)) {
                rejected.add(job);
                continue;
            }
            boolean jobSuccessFullyOffered = asyncExecutor.executeAsyncJob(job);
            if (!jobSuccessFullyOffered) {
                rejected.add(job);
            }
        }
        return rejected;
    }
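The filter condition alone can be exercised with plain strings. This is a minimal sketch, not Flowable code: `rejectedTenantIds` is a hypothetical helper that applies the same null-safe `Objects.equals` comparison to a list of job tenant ids.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;

final class TenantFilterSketch {

    /** Returns the tenant ids of the jobs that a runnable for ownTenantId would reject. */
    static List<String> rejectedTenantIds(String ownTenantId, List<String> acquiredJobTenantIds) {
        List<String> rejected = new ArrayList<>();
        for (String jobTenantId : acquiredJobTenantIds) {
            // Objects.equals is null-safe, so a job without a tenant id is simply "not equal".
            if (!Objects.equals(jobTenantId, ownTenantId)) {
                rejected.add(jobTenantId);
            }
        }
        return rejected;
    }

    public static void main(String[] args) {
        List<String> acquired = Arrays.asList("tenantA", "tenantB", "tenantA", null);
        System.out.println(rejectedTenantIds("tenantA", acquired));
    }
}
```

One consequence of this comparison is that a job with a null tenant id is rejected by every tenant-specific runnable, so with this filter such jobs would need to be handled some other way.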