
ExecutorPerTenantAsyncExecutor executeAsyncJob tenant mismatch

Open FranPregernik opened this issue 3 years ago • 1 comments

Describe the bug

I am using a single process engine with a single database/schema and multiple tenants, set up with an ExecutorPerTenantAsyncExecutor.

The TenantAwareAcquireAsyncJobsDueRunnable for one tenant (determined by TenantInfoHolder) fetches jobs for multiple tenants, because the SQL has no tenant WHERE condition (see the forum link under Additional context). The async executor is then chosen (ExecutorPerTenantAsyncExecutor.determineAsyncExecutor) from the TenantInfoHolder value, even though the provided jobInfo may belong to a different tenant altogether.

As a result, one tenant's job is executed on another tenant's thread pool.
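To make the mismatch concrete, here is a minimal, self-contained sketch. `SketchJob`, `SketchExecutor`, and `PerTenantRouter` are hypothetical stand-ins, not Flowable classes; they only model the idea of a per-tenant executor map that is looked up either by the acquiring thread's tenant or by the job's own tenant.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for Flowable's JobInfo; not the real API.
interface SketchJob {
    String getTenantId();
}

// Hypothetical stand-in for a per-tenant AsyncExecutor; not the real API.
interface SketchExecutor {
    boolean executeAsyncJob(SketchJob job);
}

final class PerTenantRouter {
    private final Map<String, SketchExecutor> tenantExecutors = new HashMap<>();

    void register(String tenantId, SketchExecutor executor) {
        tenantExecutors.put(tenantId, executor);
    }

    // Behavior as described in this issue: only the acquiring thread's
    // (context) tenant is consulted, the job's tenant is ignored.
    boolean routeByContext(SketchJob job, String contextTenantId) {
        SketchExecutor executor = tenantExecutors.get(contextTenantId);
        return executor != null && executor.executeAsyncJob(job);
    }

    // Alternative: the job's own tenant decides which pool runs it.
    boolean routeByJobTenant(SketchJob job) {
        SketchExecutor executor = tenantExecutors.get(job.getTenantId());
        return executor != null && executor.executeAsyncJob(job);
    }
}
```

With executors registered for tenants "A" and "B", a job whose tenant is "B" that was acquired while the context tenant is "A" lands on A's executor under `routeByContext`, and on B's executor under `routeByJobTenant`.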

Expected behavior

determineAsyncExecutor should take JobInfo.tenantId into account instead of the TenantInfoHolder value.

Code

Old:

    @Override
    public boolean executeAsyncJob(JobInfo job) {
        return determineAsyncExecutor().executeAsyncJob(job);
    }

Idea:

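    @Override
    public boolean executeAsyncJob(JobInfo job) {
        // Prefer the job's own tenant; fall back to the context tenant only if the job has none.
        String contextTenantId = tenantInfoHolder.getCurrentTenantId();
        String tenantId = StringUtils.defaultIfBlank(job.getTenantId(), contextTenantId);

        // No tenant could be determined: report the job as not offered.
        if (StringUtils.isBlank(tenantId)) {
            return false;
        }

        // No executor registered for this tenant: report the job as not offered.
        final AsyncExecutor asyncExecutor = tenantExecutors.get(tenantId);
        if (asyncExecutor == null) {
            return false;
        }

        return asyncExecutor.executeAsyncJob(job);
    }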

Additional context

Related link: https://forum.flowable.org/t/asyncjobexecutor-doesnt-work-as-expected-with-multitenancy/6424/4

Currently using Flowable 6.3.1, but I think newer versions are also affected.

FranPregernik, Feb 22 '22 11:02

I did some digging and maybe TenantAwareAcquireAsyncJobsDueRunnable.offerJobs should filter by tenant?

    @Override
    protected List<JobInfoEntity> offerJobs(AcquiredJobEntities acquiredJobs) {
        List<JobInfoEntity> rejected = new ArrayList<>();
        for (JobInfoEntity job : acquiredJobs.getJobs()) {
            // skip jobs not for this tenant
            if (!Objects.equals(job.getTenantId(), this.tenantId)) {
                rejected.add(job);
                continue;
            }
            
            boolean jobSuccessFullyOffered = asyncExecutor.executeAsyncJob(job);
            if (!jobSuccessFullyOffered) {
                rejected.add(job);
            }
        }
        return rejected;
    }
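
The filtering idea can also be tried in isolation. The following is only a sketch under stated assumptions: `TenantFilterSketch` and `AcquiredJob` are hypothetical names, not Flowable types, and the executor is modeled as a plain `Predicate` that returns true when it accepts a job.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.function.Predicate;

final class TenantFilterSketch {

    // Hypothetical minimal job type; stands in for Flowable's JobInfoEntity.
    static final class AcquiredJob {
        final String id;
        final String tenantId;

        AcquiredJob(String id, String tenantId) {
            this.id = id;
            this.tenantId = tenantId;
        }
    }

    // Offers only the jobs whose tenant matches ownTenantId to the executor.
    // Returns the rejected jobs: those of other tenants (never offered) plus
    // those the executor refused.
    static List<AcquiredJob> offerJobs(List<AcquiredJob> acquired,
                                       String ownTenantId,
                                       Predicate<AcquiredJob> executor) {
        List<AcquiredJob> rejected = new ArrayList<>();
        for (AcquiredJob job : acquired) {
            boolean sameTenant = Objects.equals(job.tenantId, ownTenantId);
            // Short-circuit: the executor is never called for a foreign tenant's job.
            if (!sameTenant || !executor.test(job)) {
                rejected.add(job);
            }
        }
        return rejected;
    }
}
```

What should happen to the rejected list afterwards (for example, making those jobs available to the right tenant's acquire thread) is outside this sketch.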

FranPregernik, Feb 22 '22 12:02