Job with concurrency key blocked when it shouldn't have been; it eventually ran after `duration` expired
We are observing that our jobs are sometimes unnecessarily blocked.
In the following example, there are only 2 jobs for the same concurrency key. The first one completes almost immediately, about 200 milliseconds after being scheduled (see its finished_at value).
The second one takes over 30 minutes; we are guessing it is blocked by the concurrency control `duration`. The second job's scheduled_at time is well after the first job completed.
As per our understanding, the second job should have started immediately.
We confirmed that none of the job processes restarted (bounced) during that time.
... indicates redacted data.
Concurrency Config
limits_concurrency to: 1, key: ->(action, params) do
...
end, duration: 30.minutes
Queue Config
dispatchers:
  - polling_interval: 1
    batch_size: 500
workers:
  - queues: [<queue of blocked job>, "*"]
    threads: 3
    processes: <%= ENV.fetch("JOB_CONCURRENCY", 1) %>
    polling_interval: 0.1
Console queries
SolidQueue::Job.where("DATE_PART('minutes', finished_at - scheduled_at) >= 30").count
=> 1
SolidQueue::Job.where(concurrency_key: '...').select(:active_job_id, :scheduled_at, :finished_at).order(:finished_at)
=>
[#<SolidQueue::Job:0x00007f062d151a50
active_job_id: "60916418-e7a3-4f1e-9171-357b6e52c154",
scheduled_at: "2024-12-16 01:20:09.441199000 +0000",
finished_at: "2024-12-16 01:20:09.648985000 +0000",
id: nil>,
#<SolidQueue::Job:0x00007f062d151910
active_job_id: "3c4f34da-9db7-430e-9cbe-c1214f98ccb9",
scheduled_at: "2024-12-16 01:54:49.287841000 +0000",
finished_at: "2024-12-16 02:24:50.264702000 +0000",
id: nil>]
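As a quick sanity check on the numbers above, the durations can be computed directly from the two records' timestamps. This is a minimal sketch in plain Ruby using only the standard `time` library; the variable names are illustrative and not part of the original report.

```ruby
require "time"

# Timestamps copied from the two SolidQueue::Job records above.
job1_scheduled = Time.parse("2024-12-16 01:20:09.441199000 +0000")
job1_finished  = Time.parse("2024-12-16 01:20:09.648985000 +0000")
job2_scheduled = Time.parse("2024-12-16 01:54:49.287841000 +0000")
job2_finished  = Time.parse("2024-12-16 02:24:50.264702000 +0000")

# Time#- returns the difference in seconds as a Float.
job1_total = job1_finished - job1_scheduled
job2_total = job2_finished - job2_scheduled
gap        = job2_scheduled - job1_finished # time between job 1 finishing and job 2 being scheduled

puts format("job 1: %.1f ms", job1_total * 1000)
puts format("job 2: %.1f s (%.2f min)", job2_total, job2_total / 60)
puts format("gap:   %.1f s (%.2f min)", gap, gap / 60)
```

With these values, the first job finishes about 208 ms after being scheduled, the second job is scheduled about 34.7 minutes after the first one finished, and it then takes about 30 minutes to finish, which matches the configured `duration: 30.minutes`.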
Logs about the second job execution
I, [2024-12-16T02:24:50.154361 #84] INFO -- : [ActiveJob] [...] [3c4f34da-9db7-430e-9cbe-c1214f98ccb9] Performing ... (Job ID: 3c4f34da-9db7-430e-9cbe-c1214f98ccb9) from SolidQueue(...) enqueued at 2024-12-16T01:54:49.287885003Z with arguments: ...
I, [2024-12-16T02:24:50.264573 #84] INFO -- : [ActiveJob] [...] [3c4f34da-9db7-430e-9cbe-c1214f98ccb9] Performed ... (Job ID: 3c4f34da-9db7-430e-9cbe-c1214f98ccb9) from SolidQueue(...) in 110.35ms
As per our understanding, the second job should have started immediately.
That's right. The first job should have unblocked the second job here. Would it be possible for you to enable Active Record logs at the debug level to see what happens when the first job runs and tries to unblock the second one?
That would unfortunately mean a lot of logging, and this isn't that common.
I'll see if I can add it in our staging environment. My plan is to add an initializer that basically does this:
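module SolidQueueUnblockLogging
  def unblock_next_blocked_job
    previous = ActiveRecord.verbose_query_logs
    # Adds source locations to logged queries; the queries themselves are
    # only logged if the Active Record logger is at debug level.
    ActiveRecord.verbose_query_logs = true
    super
  ensure
    ActiveRecord.verbose_query_logs = previous
  end
end

# Prepend from to_prepare instead of reopening the class in an initializer, so
# the patch targets the autoloaded SolidQueue::Job and survives code reloads.
Rails.application.config.to_prepare do
  SolidQueue::Job.prepend(SolidQueueUnblockLogging)
end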
@rosa Are debug logs prefixed with SolidQueue::Semaphore sufficient, or should I include other prefixes too?
I went with a different approach, which currently lets through only debug logs prefixed with SolidQueue:::
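class FilteredSqLogger < ActiveSupport::Logger
  # Keep only debug messages that start with a SolidQueue:: model name, e.g.
  # "  SolidQueue::Semaphore Update All (...)"; other levels are not filtered.
  def debug(progname = nil, &block)
    message = block ? block.call : progname
    # &nil stops the original block from being forwarded and evaluated again.
    super(message, &nil) if message.to_s.match?(/^\s+SolidQueue::/)
  end
end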
I'd try SolidQueue::Semaphore and SolidQueue::BlockedExecution; any clues will most likely involve one of those records.
~Ended up going with /^\s*SolidQueue::(?!Process)/ since Process was the noisiest.~ (That was too noisy as well.)
I will post an update to this issue once we see the problem again.
Happened again
Records (`...` is for obfuscation; I searched with the actual concurrency key)
SolidQueue::Job.where(concurrency_key: '...').select(:id, :scheduled_at, :finished_at)
=>
[#<SolidQueue::Job:0x00007f0eaf044c40 id: 17031406, scheduled_at: "2024-12-23 11:54:57.301722000 +0000", finished_at: "2024-12-23 11:54:57.793273000 +0000">,
#<SolidQueue::Job:0x00007f0eaf044b00 id: 17031411, scheduled_at: "2024-12-23 11:54:57.832971000 +0000", finished_at: "2024-12-23 12:01:15.875778000 +0000">]
Logs for that key
D, [2024-12-23T11:54:57.872605 #97] DEBUG -- : SolidQueue::Semaphore Update All (64.5ms) UPDATE "solid_queue_semaphores" SET value = value + 1, expires_at = '2024-12-23 11:57:57.807857' WHERE "solid_queue_semaphores"."key" = '...' AND "solid_queue_semaphores"."value" < 1
D, [2024-12-23T11:54:57.879150 #97] DEBUG -- : SolidQueue::BlockedExecution Load (5.8ms) SELECT "solid_queue_blocked_executions".* FROM "solid_queue_blocked_executions" WHERE "solid_queue_blocked_executions"."concurrency_key" = '...' ORDER BY "solid_queue_blocked_executions"."priority" ASC, "solid_queue_blocked_executions"."job_id" ASC LIMIT 1 FOR UPDATE SKIP LOCKED
D, [2024-12-23T12:01:14.997407 #94] DEBUG -- : SolidQueue::Semaphore Pluck (5.1ms) SELECT "solid_queue_semaphores"."key", "solid_queue_semaphores"."value" FROM "solid_queue_semaphores" WHERE "solid_queue_semaphores"."key" = '...'
D, [2024-12-23T12:01:15.008779 #94] DEBUG -- : SolidQueue::BlockedExecution Load (5.6ms) SELECT "solid_queue_blocked_executions".* FROM "solid_queue_blocked_executions" WHERE "solid_queue_blocked_executions"."concurrency_key" = '...' ORDER BY "solid_queue_blocked_executions"."priority" ASC, "solid_queue_blocked_executions"."job_id" ASC LIMIT 1 FOR UPDATE SKIP LOCKED
D, [2024-12-23T12:01:15.380319 #94] DEBUG -- : SolidQueue::Semaphore Load (7.0ms) SELECT "solid_queue_semaphores".* FROM "solid_queue_semaphores" WHERE "solid_queue_semaphores"."key" = '...' LIMIT 1
D, [2024-12-23T12:01:15.469601 #94] DEBUG -- : SolidQueue::Semaphore Insert (67.5ms) INSERT INTO "solid_queue_semaphores" ("key","value","expires_at","created_at","updated_at") VALUES ('...', 0, '2024-12-23 12:04:15.380905', CURRENT_TIMESTAMP, CURRENT_TIMESTAMP) ON CONFLICT ("key") DO NOTHING RETURNING "id"
D, [2024-12-23T12:01:15.902346 #97] DEBUG -- : SolidQueue::Semaphore Update All (8.4ms) UPDATE "solid_queue_semaphores" SET value = value + 1, expires_at = '2024-12-23 12:04:15.893182' WHERE "solid_queue_semaphores"."key" = '...' AND "solid_queue_semaphores"."value" < 1
D, [2024-12-23T12:01:15.908124 #97] DEBUG -- : SolidQueue::BlockedExecution Load (5.1ms) SELECT "solid_queue_blocked_executions".* FROM "solid_queue_blocked_executions" WHERE "solid_queue_blocked_executions"."concurrency_key" = '...' ORDER BY "solid_queue_blocked_executions"."priority" ASC, "solid_queue_blocked_executions"."job_id" ASC LIMIT 1 FOR UPDATE SKIP LOCKED
Ohhhh, this is super useful, @leondmello! Thanks to your logs, I think I know what's going on: it looks like a tricky race condition. Basically, your first job finishes while the second one is still being enqueued and blocked. It finishes too late to have released the semaphore before the second job checks it, but too early to see the second job as blocked. Here's the first job finishing:
finished_at: "2024-12-23 11:54:57.793273000 +0000"
Then, when that job finishes, it releases the semaphore here:
D, [2024-12-23T11:54:57.872605 #97] DEBUG --
Here's the second job enqueued:
scheduled_at: "2024-12-23 11:54:57.832971000 +0000"
At that point the semaphore hasn't been released yet (that only happens at 11:54:57.872605), so the second job gets blocked. However, when this SELECT runs, the blocked execution must not have existed yet:
D, [2024-12-23T11:54:57.879150 #97] DEBUG -- : SolidQueue::BlockedExecution Load (5.8ms) SELECT "solid_queue_blocked_executions".* FROM "solid_queue_blocked_executions" WHERE "solid_queue_blocked_executions"."concurrency_key" = '...' ORDER BY "solid_queue_blocked_executions"."priority" ASC, "solid_queue_blocked_executions"."job_id" ASC LIMIT 1 FOR UPDATE SKIP LOCKED
So it can't get unblocked, because it's not completely blocked yet. Ahh, tricky!
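The interleaving described above can be replayed as a toy, single-threaded Ruby sketch. This is not SolidQueue's code: the semaphore is modelled as a plain Integer counting free slots, and the `blocked` array stands in for committed rows in `solid_queue_blocked_executions`; the method name `replay_race` is hypothetical.

```ruby
# Toy replay of the race; NOT SolidQueue's implementation.
def replay_race
  semaphore = 0 # free slots: job 1 is running and holds the only one (to: 1)
  blocked = []  # stand-in for committed blocked executions

  # Job 2 is enqueued: it checks the semaphore, finds no free slot and
  # decides to block, but hasn't recorded its blocked execution yet.
  job2_must_block = semaphore.zero?

  # Job 1 finishes: it releases its slot and looks for a blocked execution
  # to unblock. There is none yet, so nothing gets unblocked.
  semaphore += 1
  unblocked = blocked.shift

  # Job 2's enqueue completes: its blocked execution now exists, even though
  # a slot is free and nothing is left to release it.
  blocked << :job2 if job2_must_block

  { semaphore: semaphore, blocked: blocked, unblocked: unblocked }
end

p replay_race
```

The replay ends with a free slot and job 2 still blocked, which is the state the logs suggest; in the report, the job only ran after the concurrency `duration` expired.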
What isolation level are you running?
What isolation level are you running?
We have this in our application.rb:
config.active_support.isolation_level = :fiber
But I don't know why that was added some time ago; I'm trying to find out.
@leondmello, no worries about that. I meant the transaction isolation level in your database (I assumed you're using PostgreSQL because of the logs), but thinking more about it, it shouldn't make any difference.
It's a tricky one. I'm not sure yet how to fix it, but I'll keep thinking about it.
Thinking more about this and how to fix it: claiming the semaphore and enqueuing the job happen in a single transaction, but if the semaphore already exists, it is only checked, not locked. So it can be released while the transaction is still in progress, and the enqueuing side wouldn't notice, regardless of whether READ COMMITTED or REPEATABLE READ (the most common isolation levels) is in use, because the semaphore is read only once, at the beginning.
I think, to fix this, we'd have to take a lock when checking the semaphore and release it when the transaction commits. However, this might introduce a new kind of deadlock that I can't see right now (I've fixed a couple of deadlocks here in the past), so I need to think carefully about it.
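To illustrate the idea of holding a lock from the semaphore check until the blocked execution is recorded, here is a toy in-process Ruby sketch. It is an assumption-laden model, not SolidQueue's implementation or a proposed patch: a `Mutex` stands in for a database row lock, the class name `ToyConcurrencyLimit` is hypothetical, and a single in-process lock cannot show the deadlock risks mentioned above.

```ruby
# Toy model of "lock while checking the semaphore"; NOT SolidQueue's code.
class ToyConcurrencyLimit
  attr_reader :running, :blocked

  def initialize(limit)
    @free = limit
    @running = []
    @blocked = []
    @lock = Mutex.new # stand-in for a DB lock held until the enqueue commits
  end

  # Check-and-block is atomic: a release can't slip in between deciding
  # to block and recording the blocked job.
  def enqueue(job)
    @lock.synchronize do
      if @free.positive?
        @free -= 1
        @running << job
      else
        @blocked << job
      end
    end
  end

  # Release the slot, then hand it to the oldest blocked job, if any.
  def finish(job)
    @lock.synchronize do
      @running.delete(job)
      @free += 1
      if (next_job = @blocked.shift)
        @free -= 1
        @running << next_job
      end
    end
  end
end

# Race "job 1 finishes" against "job 2 is enqueued" many times.
stranded = 0
1_000.times do
  limit = ToyConcurrencyLimit.new(1)
  limit.enqueue(:job1)
  threads = [Thread.new { limit.finish(:job1) }, Thread.new { limit.enqueue(:job2) }]
  threads.each(&:join)
  stranded += 1 unless limit.running == [:job2] && limit.blocked.empty?
end
puts "stranded runs: #{stranded}"
```

Whichever thread wins the lock, job 2 ends up running: if the release comes first, job 2 finds a free slot; if the enqueue comes first, the release sees the blocked job and hands the slot over.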
@rosa Any luck fixing this one?
Hey @leondmello, unfortunately not. I've got an idea that might work, but I haven't been able to test it carefully, and I think it's going to cause problems under certain loads. I'm now focused on other work, so I won't have much time in the next couple of months, but I'll try to get back to this soon.
@rosa We have the same problem.
- [job1] scheduled
- [job2] scheduled
- [job1] finished
- [job1] release BlockedExecution (but it doesn't exist yet)
- [job2] create BlockedExecution (it won't be released until the block timeout passes)