Using RedisTenancyBootstrapper causes problems with queue worker

Open pr4xx opened this issue 1 year ago • 10 comments

Bug description

After queue:restart has been called, every subsequent queue:work process exits after processing a single job if that job was dispatched in a tenant context.

Steps to reproduce

See this demo repository: https://github.com/pr4xx/tenancy-problem-demo

Expected behavior

queue:work does not stop itself

Laravel version

11.9

stancl/tenancy version

3.8

pr4xx avatar Jun 03 '24 13:06 pr4xx

For context, we are currently looking for a way to solve this cleanly without removing the optimizations in the queue bootstrapper. As a quick fix, you can either avoid the Redis bootstrapper (and use only, e.g., the cache bootstrapper), or use your own QueueTenancyBootstrapper with this line changed to if (true) {: https://github.com/archtechx/tenancy/blob/8f9c7efa4584007f41048448d0a11f572a1d3239/src/Bootstrappers/QueueTenancyBootstrapper.php#L74

stancl avatar Aug 01 '24 12:08 stancl

The more I think about this, the more I feel it's best to just remove the optimization altogether, since optimizations that cause issues aren't good optimizations. The overhead of initializing tenancy is also very low compared to how long it takes the queue worker to pick up a job, so there's little to lose. If we do that, the bootstrapper also becomes a lot simpler:

diff --git a/src/Bootstrappers/QueueTenancyBootstrapper.php b/src/Bootstrappers/QueueTenancyBootstrapper.php
index f747faea..5297509c 100644
--- a/src/Bootstrappers/QueueTenancyBootstrapper.php
+++ b/src/Bootstrappers/QueueTenancyBootstrapper.php
@@ -4,7 +4,6 @@ declare(strict_types=1);
 
 namespace Stancl\Tenancy\Bootstrappers;
 
-use Illuminate\Support\Str;
 use Illuminate\Config\Repository;
 use Illuminate\Queue\QueueManager;
 use Stancl\Tenancy\Contracts\Tenant;
@@ -25,16 +24,6 @@ class QueueTenancyBootstrapper implements TenancyBootstrapper
     /** @var QueueManager */
     protected $queue;
 
-    /**
-     * Don't persist the same tenant across multiple jobs even if they have the same tenant ID.
-     *
-     * This is useful when you're changing the tenant's state (e.g. properties in the `data` column) and want the next job to initialize tenancy again
-     * with the new data. Features like the Tenant Config are only executed when tenancy is initialized, so the re-initialization is needed in some cases.
-     *
-     * @var bool
-     */
-    public static $forceRefresh = false;
-
     /**
      * The normal constructor is only executed after tenancy is bootstrapped.
      * However, we're registering a hook to initialize tenancy. Therefore,
@@ -42,7 +31,7 @@ class QueueTenancyBootstrapper implements TenancyBootstrapper
      */
     public static function __constructStatic(Application $app)
     {
-        static::setUpJobListener($app->make(Dispatcher::class), $app->runningUnitTests());
+        static::setUpJobListener($app->make(Dispatcher::class));
     }
 
     public function __construct(Repository $config, QueueManager $queue)
@@ -53,87 +42,30 @@ class QueueTenancyBootstrapper implements TenancyBootstrapper
         $this->setUpPayloadGenerator();
     }
 
-    protected static function setUpJobListener($dispatcher, $runningTests)
+    protected static function setUpJobListener($dispatcher)
     {
-        $previousTenant = null;
+        $dispatcher->listen(JobProcessing::class, function ($event) {
+            $tenant = $event->job->payload()['tenant_id'] ?? null;
 
-        $dispatcher->listen(JobProcessing::class, function ($event) use (&$previousTenant) {
-            $previousTenant = tenant();
-
-            static::initializeTenancyForQueue($event->job->payload()['tenant_id'] ?? null);
+            if ($tenant) {
+                tenancy()->initialize($tenant);
+            }
         });
 
-        $dispatcher->listen(JobRetryRequested::class, function ($event) use (&$previousTenant) {
-            $previousTenant = tenant();
+        $dispatcher->listen(JobRetryRequested::class, function ($event) {
+            $tenant = $event->payload()['tenant_id'] ?? null;
 
-            static::initializeTenancyForQueue($event->payload()['tenant_id'] ?? null);
+            if ($tenant) {
+                tenancy()->initialize($tenant);
+            }
         });
 
-        // If we're running tests, we make sure to clean up after any artisan('queue:work') calls
-        $revertToPreviousState = function ($event) use (&$previousTenant, $runningTests) {
-            if ($runningTests) {
-                static::revertToPreviousState($event, $previousTenant);
-            }
+        $revertToCentralContext = function () {
+            tenancy()->end();
         };
 
-        $dispatcher->listen(JobProcessed::class, $revertToPreviousState); // artisan('queue:work') which succeeds
-        $dispatcher->listen(JobFailed::class, $revertToPreviousState); // artisan('queue:work') which fails
-    }
-
-    protected static function initializeTenancyForQueue($tenantId)
-    {
-        if (! $tenantId) {
-            // The job is not tenant-aware
-            if (tenancy()->initialized) {
-                // Tenancy was initialized, so we revert back to the central context
-                tenancy()->end();
-            }
-
-            return;
-        }
-
-        if (static::$forceRefresh) {
-            // Re-initialize tenancy between all jobs
-            if (tenancy()->initialized) {
-                tenancy()->end();
-            }
-
-            tenancy()->initialize(tenancy()->find($tenantId));
-
-            return;
-        }
-
-        if (tenancy()->initialized) {
-            // Tenancy is already initialized
-            if (tenant()->getTenantKey() === $tenantId) {
-                // It's initialized for the same tenant (e.g. dispatchNow was used, or the previous job also ran for this tenant)
-                return;
-            }
-        }
-
-        // Tenancy was either not initialized, or initialized for a different tenant.
-        // Therefore, we initialize it for the correct tenant.
-        tenancy()->initialize(tenancy()->find($tenantId));
-    }
-
-    protected static function revertToPreviousState($event, &$previousTenant)
-    {
-        $tenantId = $event->job->payload()['tenant_id'] ?? null;
-
-        // The job was not tenant-aware
-        if (! $tenantId) {
-            return;
-        }
-
-        // Revert back to the previous tenant
-        if (tenant() && $previousTenant && $previousTenant->isNot(tenant())) {
-            tenancy()->initialize($previousTenant);
-        }
-
-        // End tenancy
-        if (tenant() && (! $previousTenant)) {
-            tenancy()->end();
-        }
+        $dispatcher->listen(JobProcessed::class, $revertToCentralContext); // artisan('queue:work') which succeeds
+        $dispatcher->listen(JobFailed::class, $revertToCentralContext); // artisan('queue:work') which fails
     }
 
     protected function setUpPayloadGenerator()

The only issue is that removing $forceRefresh is a breaking change, so I should probably keep the property even if it no longer does anything.

stancl avatar Aug 06 '24 00:08 stancl

@pr4xx If you could test if this version of the bootstrapper works for you:

<?php

declare(strict_types=1);

namespace Stancl\Tenancy\Bootstrappers;

use Illuminate\Config\Repository;
use Illuminate\Queue\QueueManager;
use Stancl\Tenancy\Contracts\Tenant;
use Illuminate\Queue\Events\JobFailed;
use Illuminate\Queue\Events\JobProcessed;
use Illuminate\Queue\Events\JobProcessing;
use Illuminate\Contracts\Events\Dispatcher;
use Illuminate\Queue\Events\JobRetryRequested;
use Illuminate\Support\Testing\Fakes\QueueFake;
use Illuminate\Contracts\Foundation\Application;
use Stancl\Tenancy\Contracts\TenancyBootstrapper;

class QueueTenancyBootstrapper implements TenancyBootstrapper
{
    /** @var Repository */
    protected $config;

    /** @var QueueManager */
    protected $queue;

    /**
     * The normal constructor is only executed after tenancy is bootstrapped.
     * However, we're registering a hook to initialize tenancy. Therefore,
     * we need to register the hook at service provider execution time.
     */
    public static function __constructStatic(Application $app)
    {
        static::setUpJobListener($app->make(Dispatcher::class));
    }

    public function __construct(Repository $config, QueueManager $queue)
    {
        $this->config = $config;
        $this->queue = $queue;

        $this->setUpPayloadGenerator();
    }

    protected static function setUpJobListener($dispatcher)
    {
        $dispatcher->listen(JobProcessing::class, function ($event) {
            $tenant = $event->job->payload()['tenant_id'] ?? null;

            if ($tenant) {
                tenancy()->initialize($tenant);
            }
        });

        $dispatcher->listen(JobRetryRequested::class, function ($event) {
            $tenant = $event->payload()['tenant_id'] ?? null;

            if ($tenant) {
                tenancy()->initialize($tenant);
            }
        });

        $revertToCentralContext = function () {
            tenancy()->end();
        };

        $dispatcher->listen(JobProcessed::class, $revertToCentralContext); // artisan('queue:work') which succeeds
        $dispatcher->listen(JobFailed::class, $revertToCentralContext); // artisan('queue:work') which fails
    }

    protected function setUpPayloadGenerator()
    {
        $bootstrapper = &$this;

        if (! $this->queue instanceof QueueFake) {
            $this->queue->createPayloadUsing(function ($connection) use (&$bootstrapper) {
                return $bootstrapper->getPayload($connection);
            });
        }
    }

    public function bootstrap(Tenant $tenant)
    {
        //
    }

    public function revert()
    {
        //
    }

    public function getPayload(string $connection)
    {
        if (! tenancy()->initialized) {
            return [];
        }

        if ($this->config["queue.connections.$connection.central"]) {
            return [];
        }

        $id = tenant()->getTenantKey();

        return [
            'tenant_id' => $id,
        ];
    }
}

stancl avatar Aug 06 '24 01:08 stancl

@stancl Hey there, thanks for the update!

I tried your code and ran into an issue: since the new code no longer tracks a $previousTenant, my short test failed when dispatching a job within a tenant context with QUEUE_CONNECTION set to sync. The job reverts to the central context when it finishes, but in this scenario my controller code is not done yet. The controller then tries to hit the tenant database (outside the job, which has already completed) and fails because the bootstrapper called tenancy()->end().

This might not be the best test case, because I use Redis as my queue connection in production, but it would be nice to have for local development. I guess restoring the previous tenant cannot be skipped?

niconico291 avatar Aug 06 '24 11:08 niconico291

Thanks for testing the code and letting me know about this. I see, so it seems like we want to do this?

  • If queue connection is sync, we do want to revert to the previous tenant (but we can do so unconditionally, removing a lot of the original optimizations)
  • If queue connection is not sync, we revert to the central context after each job

stancl avatar Aug 06 '24 12:08 stancl

It seems like we should revert back to any previous state. I am not sure about that split between sync and not sync. Though I cannot think of any case outside of sync where it would matter tbh. Is the overhead of storing any previous tenant, unaware of the queue driver, problematic?

niconico291 avatar Aug 06 '24 12:08 niconico291

It's not, but then if we reverted to the previous tenant in a queue worker context we'd just get back to the original issue here - the queue worker remaining in the central context. This could be solved by keeping track of the original state properly, since it'd be central in the queue worker but tenant in your sync example, but that might be error prone so I'd prefer to simplify or remove the optimizations as much as possible.

stancl avatar Aug 06 '24 12:08 stancl

We can try checking for sync and then not revert to central. Doing nothing would probably be enough I think?

niconico291 avatar Aug 06 '24 12:08 niconico291

Just wanted to confirm: this solved the issue for us in production, using the Redis tenancy bootstrapper together with the version of the QueueTenancyBootstrapper posted above.

AdamRollinson avatar Aug 09 '24 15:08 AdamRollinson

I guess the easiest way to make this work in both contexts is to keep the existing logic for reverting to the previous context, since it's well written and tested, and just make it execute every time. That leaves forceRefresh deprecated and ignored.
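As a rough illustration of "always revert to the previous context", here is a minimal sketch in plain PHP. FakeTenancy and runJobRestoringPrevious are hypothetical stand-ins invented for this example; they are not part of stancl/tenancy or Laravel, and the real bootstrapper hooks into the JobProcessing, JobProcessed and JobFailed events instead of wrapping the job in a function.

```php
<?php

declare(strict_types=1);

// Hypothetical stand-in for tenancy(): it only tracks the current tenant ID.
final class FakeTenancy
{
    public ?string $current = null;

    public function initialize(string $tenantId): void
    {
        $this->current = $tenantId;
    }

    public function end(): void
    {
        $this->current = null;
    }
}

// Hypothetical helper: runs one job, then restores whatever context was active before it.
function runJobRestoringPrevious(FakeTenancy $tenancy, ?string $jobTenantId, callable $job): void
{
    // Remembered when the job starts (the JobProcessing step).
    $previous = $tenancy->current;

    if ($jobTenantId !== null) {
        $tenancy->initialize($jobTenantId);
    } else {
        $tenancy->end(); // the job is not tenant-aware
    }

    try {
        $job();
    } finally {
        // Runs whether the job succeeds or fails (the JobProcessed / JobFailed step).
        if ($previous !== null) {
            $tenancy->initialize($previous);
        } else {
            $tenancy->end();
        }
    }
}

// Sync connection: a controller running as tenant "acme" dispatches a job inline.
$sync = new FakeTenancy();
$sync->initialize('acme');
runJobRestoringPrevious($sync, 'acme', function (): void {});
$syncAfter = $sync->current; // the tenant context is restored for the rest of the request

// Queue worker: the process starts in the central context.
$worker = new FakeTenancy();
runJobRestoringPrevious($worker, 'acme', function (): void {});
$workerAfter = $worker->current; // the worker is back in the central context
```

With this shape, the sync case keeps the controller in its tenant after the job, while a worker that started in the central context returns to it after every job, without needing to check which queue driver is in use.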

stancl avatar Aug 09 '24 15:08 stancl

@pr4xx I'm trying to build an automated solution that can test for queue edge cases in our CI by running a full queue worker setup in an actual Laravel application in Docker, and I've noticed some additional strange behavior. The latest 3.x works as intended, but 3.8.4 misbehaves: not only does it fail to restart when it should, it also seems to restart when it shouldn't.

This isn't really critical so if it'd require you to spend time on this, you don't need to, but I'm wondering if you've noticed any extra queue worker restarts. The behavior I think I'm seeing is that:

  1. If a worker is restarted (properly, in the central context),
  2. Then at any point after the restart, a job is dispatched for a new tenant,
  3. The queue worker will restart again.

It will not happen again for that tenant (unless cache is cleared). It only happens once, until the illuminate:queue:restart cache key is populated. The issue basically is that Laravel compares after each job whether the latest illuminate:queue:restart != $lastRestart (same key, just fetched ONCE at the start of the process), and if they don't match a restart is triggered and the key is populated.

Anyway, not a huge deal, just wondering if this has ever been observed by others too.

Edit: Upon further investigation, it seems to be much worse than that. It doesn't happen just once; it seems to happen ... all the time? Not sure if I'm testing this correctly, but the dumped cache values make sense of it: the last legitimate restart, using the central queue:restart, takes place at timestamp t1, while the accidental restart happens at timestamp t2 and updates the tenant's cache. The central and tenant cache values are therefore different, and the two sides of the illuminate:queue:restart != $lastRestart check have a 0% chance of ever matching.
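The mismatch described above can be reproduced in isolation. The following is a simplified simulation, not Laravel or stancl/tenancy code: PrefixedCache is a hypothetical in-memory cache, and it assumes that the worker reads a tenant-scoped key once it is left in a tenant context. Only the comparison itself mirrors the check in Illuminate\Queue\Worker.

```php
<?php

declare(strict_types=1);

// Hypothetical in-memory cache whose keys are scoped by a prefix, standing in
// for a cache store that a tenancy bootstrapper has scoped to the current tenant.
final class PrefixedCache
{
    /** @var array<string, int> */
    private array $store = [];

    public string $prefix = 'central:';

    public function get(string $key): ?int
    {
        return $this->store[$this->prefix . $key] ?? null;
    }

    public function put(string $key, int $value): void
    {
        $this->store[$this->prefix . $key] = $value;
    }
}

// Same loose comparison as the worker's check: restart when the cached
// value differs from the one read when the worker process started.
function queueShouldRestart(PrefixedCache $cache, ?int $lastRestart): bool
{
    return $cache->get('illuminate:queue:restart') != $lastRestart;
}

$cache = new PrefixedCache();

// queue:restart runs in the central context at t1.
$cache->put('illuminate:queue:restart', 1735622302);

// The worker boots in the central context and reads the value once.
$lastRestart = $cache->get('illuminate:queue:restart');
$afterCentralJob = queueShouldRestart($cache, $lastRestart); // values match, no restart

// A tenant job runs and the worker is left in the tenant context.
$cache->prefix = 'tenant_acme:';
$afterTenantJob = queueShouldRestart($cache, $lastRestart); // tenant key is empty, so a restart is triggered

// The accidental restart at t2 writes to the tenant's cache only.
$cache->put('illuminate:queue:restart', 1735622400);

// The next worker boots centrally (reads t1), then runs another tenant job (sees t2).
$cache->prefix = 'central:';
$lastRestart = $cache->get('illuminate:queue:restart');
$cache->prefix = 'tenant_acme:';
$stillMismatched = queueShouldRestart($cache, $lastRestart); // t2 != t1, so it restarts again
```

The timestamps are illustrative; the point is only that the value read at startup and the value read after a tenant job come from different keys, so they never agree.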

stancl avatar Dec 31 '24 03:12 stancl

For context, here's the simplified output of my queue testing script on 3.8.4:

Setup complete, starting tests...

Dispatching job from tenant context...
queue-1  |   2024-12-31 05:18:11 App\Jobs\FooJob cmPHyLX8t7mXvs98LKhl5sW2b6C9IHJS  RUNNING
queue-1  |   2024-12-31 05:18:11 App\Jobs\FooJob cmPHyLX8t7mXvs98LKhl5sW2b6C9IHJS  245.13ms DONE
OK: User created in tenant

Dispatching job from central context...
queue-1  |   2024-12-31 05:18:17 App\Jobs\FooJob gjB8rHbLkYEDYQFVOwAXrH1LCjgLbVFI  RUNNING
queue-1  |   2024-12-31 05:18:17 App\Jobs\FooJob gjB8rHbLkYEDYQFVOwAXrH1LCjgLbVFI  244.88ms DONE
OK: User created in central

Running queue:restart (after a central job)...
queue-1  | "Queue should restart!" // vendor/laravel/framework/src/Illuminate/Queue/Worker.php:703
queue-1  | "illuminate:queue:restart: 1735622302" // vendor/laravel/framework/src/Illuminate/Queue/Worker.php:704
queue-1  | "$lastRestart: " // vendor/laravel/framework/src/Illuminate/Queue/Worker.php:705
queue-1 exited with code 0
OK: Queue worker has exited

Starting queue worker again...
[+] Restarting 1/1
 ✔ Container queue-tester-queue-1  Started                                                                                                                                                              0.1s

Dispatching job from tenant context...
queue-1  |   2024-12-31 05:18:31 App\Jobs\FooJob ObTAKP2PDwOuRBmMSwAKx7jh3MU7pAKL  RUNNING
queue-1  |   2024-12-31 05:18:32 App\Jobs\FooJob ObTAKP2PDwOuRBmMSwAKx7jh3MU7pAKL  243.34ms DONE
queue-1  | "Queue should restart!" // vendor/laravel/framework/src/Illuminate/Queue/Worker.php:703
queue-1  | "illuminate:queue:restart: " // vendor/laravel/framework/src/Illuminate/Queue/Worker.php:704
queue-1  | "$lastRestart: 1735622302" // vendor/laravel/framework/src/Illuminate/Queue/Worker.php:705
queue-1 exited with code 0
OK: User created in tenant

WARN: Queue worker restarted after running a tenant job post-restart (https://github.com/archtechx/tenancy/issues/1229#issuecomment-2566111616) following assertions will likely fail.
[+] Running 2/2
 ✔ Container queue-tester-redis-1  Healthy                                                                                                                                                              0.5s
 ✔ Container queue-tester-queue-1  Started                                                                                                                                                              0.1s
Dispatching job from central context...
queue-1  |   2024-12-31 05:18:40 App\Jobs\FooJob sNZI23SQvJ1vzkMkkmrhvE9P6BZ51W26  RUNNING
queue-1  |   2024-12-31 05:18:41 App\Jobs\FooJob sNZI23SQvJ1vzkMkkmrhvE9P6BZ51W26  259.75ms DONE
OK: User created in central

Dispatching job from tenant context...
queue-1  |   2024-12-31 05:18:44 App\Jobs\FooJob WUcStPSDdmpCVvGlBdZLCH7JCZly6bgU  RUNNING
queue-1  |   2024-12-31 05:18:44 App\Jobs\FooJob WUcStPSDdmpCVvGlBdZLCH7JCZly6bgU  237.55ms DONE
queue-1  | "Queue should restart!" // vendor/laravel/framework/src/Illuminate/Queue/Worker.php:703
queue-1  | "illuminate:queue:restart: " // vendor/laravel/framework/src/Illuminate/Queue/Worker.php:704
queue-1  | "$lastRestart: 1735622302" // vendor/laravel/framework/src/Illuminate/Queue/Worker.php:705
queue-1 exited with code 0
OK: User created in tenant

WARN: ANOTHER extra restart took place after running a tenant job
[+] Running 2/2
 ✔ Container queue-tester-redis-1  Healthy                                                                                                                                                              0.5s
 ✔ Container queue-tester-queue-1  Started                                                                                                                                                              0.1s
queue-1  | "Queue should restart!" // vendor/laravel/framework/src/Illuminate/Queue/Worker.php:703
queue-1  | "illuminate:queue:restart: " // vendor/laravel/framework/src/Illuminate/Queue/Worker.php:704
queue-1  | "$lastRestart: 1735622302" // vendor/laravel/framework/src/Illuminate/Queue/Worker.php:705
queue-1 exited with code 0
[+] Restarting 1/1
 ✔ Container queue-tester-queue-1  Started                                                                                                                                                              0.1s
Dispatching job from tenant context...
queue-1  |   2024-12-31 05:18:56 App\Jobs\FooJob Nauopp7lxNEUerS3r2BAFgqK5D33p77K  RUNNING
queue-1  |   2024-12-31 05:18:56 App\Jobs\FooJob Nauopp7lxNEUerS3r2BAFgqK5D33p77K  244.43ms DONE
OK: User created in tenant

Running queue:restart (after a tenant job)...
ERR: Queue worker has NOT exited!

That's with this change to Illuminate\Queue\Worker:

protected function queueShouldRestart($lastRestart)
{
    if ($this->getTimestampOfLastQueueRestart() != $lastRestart) { // added this section
        dump("Queue should restart!");
        dump("illuminate:queue:restart: " . $this->getTimestampOfLastQueueRestart());
        dump('$lastRestart: ' . $lastRestart);
    }

    return $this->getTimestampOfLastQueueRestart() != $lastRestart;
}

stancl avatar Dec 31 '24 05:12 stancl

Just for completeness, this happens on older Laravel versions too, including the one used in your repo:

[Screenshot: CleanShot 2024-12-31 at 8 20 57@2x]

I saw some variability between Laravel versions (in terms of when the tests fail) while debugging this test script, but that seems to have been due to other causes.

stancl avatar Dec 31 '24 07:12 stancl