
Job with concurrency key blocked when it shouldn't have been, eventually ran after `duration` expired

Open leondmello opened this issue 1 year ago • 14 comments

We are observing that our jobs are sometimes unnecessarily blocked. In the following example, there are only 2 jobs for the same concurrency key. The first one completes almost immediately, in just over 100 milliseconds (see its finished_at value). The second one takes over 30 minutes; we are guessing it is blocked for the concurrency control duration. The scheduled_at time of the second job is well after the first job completes. As per our understanding, the second job should have started immediately. We confirmed that none of the job processes bounced during that time.

... indicates redacted data.

Concurrency Config

limits_concurrency to: 1, key: ->(action, params) do
  ...
end, duration: 30.minutes

Queue Config

dispatchers:
    - polling_interval: 1
      batch_size: 500
workers:
    - queues: [<queue of blocked job>, "*"]
      threads: 3
      processes: <%= ENV.fetch("JOB_CONCURRENCY", 1) %>
      polling_interval: 0.1

Console queries

SolidQueue::Job.where("DATE_PART('minutes', finished_at - scheduled_at) >= 30").count
=> 1

SolidQueue::Job.where(concurrency_key: '...').select(:active_job_id, :scheduled_at, :finished_at).order(:finished_at)
=>
[#<SolidQueue::Job:0x00007f062d151a50
  active_job_id: "60916418-e7a3-4f1e-9171-357b6e52c154",
  scheduled_at: "2024-12-16 01:20:09.441199000 +0000",
  finished_at: "2024-12-16 01:20:09.648985000 +0000",
  id: nil>,
 #<SolidQueue::Job:0x00007f062d151910
  active_job_id: "3c4f34da-9db7-430e-9cbe-c1214f98ccb9",
  scheduled_at: "2024-12-16 01:54:49.287841000 +0000",
  finished_at: "2024-12-16 02:24:50.264702000 +0000",
  id: nil>]
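As a quick cross-check of the "blocked for the duration" guess, the gap can be computed from the values in this report. This is a minimal sketch using plain Ruby `Time` arithmetic, not part of the original console session. The start time comes from the "Performing" log line in this report, and it assumes that log timestamp is in UTC like the records.

```ruby
require "time"

# Timestamps copied from the second job's record in the console output.
scheduled_at = Time.parse("2024-12-16 01:54:49.287841000 +0000")
finished_at  = Time.parse("2024-12-16 02:24:50.264702000 +0000")

# Start of execution, from the "Performing" log line for the same job.
# Assumption: the log timestamp is UTC, like the database records.
performing_at = Time.iso8601("2024-12-16T02:24:50.154361Z")

configured_duration = 30 * 60 # seconds, from `duration: 30.minutes`

wait_before_start = performing_at - scheduled_at # Float seconds
total_elapsed     = finished_at - scheduled_at

puts format("waited %.2f s before starting", wait_before_start)
puts format("that is %.2f s more than the configured duration", wait_before_start - configured_duration)
puts format("scheduled_at to finished_at: %.2f s", total_elapsed)
```

The wait lands within about a second of the configured 30 minutes, which is consistent with the job only being released once the concurrency duration expired.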

Logs about the second job execution

I, [2024-12-16T02:24:50.154361 #84]  INFO -- : [ActiveJob] [...] [3c4f34da-9db7-430e-9cbe-c1214f98ccb9] Performing ... (Job ID: 3c4f34da-9db7-430e-9cbe-c1214f98ccb9) from SolidQueue(...) enqueued at 2024-12-16T01:54:49.287885003Z with arguments: ...
I, [2024-12-16T02:24:50.264573 #84]  INFO -- : [ActiveJob] [...] [3c4f34da-9db7-430e-9cbe-c1214f98ccb9] Performed ... (Job ID: 3c4f34da-9db7-430e-9cbe-c1214f98ccb9) from SolidQueue(...) in 110.35ms

leondmello avatar Dec 16 '24 21:12 leondmello

As per our understanding, the second job should have started immediately.

That's right. The first job should have unblocked the second job here 🤔 Would it be possible for you to enable Active Record logs at the debug level to see what's happening when the first job runs and tries to unblock it?

rosa avatar Dec 16 '24 21:12 rosa

That would be a lot of logging, unfortunately. And this isn't that common.

leondmello avatar Dec 16 '24 21:12 leondmello

I'll see if I can add it in our staging environment. My plan is to add an initializer which basically does

module SolidQueue
  class Job
    def unblock_next_blocked_job
      ActiveRecord.verbose_query_logs = true
      super
    ensure
      ActiveRecord.verbose_query_logs = false
    end
  end
end

leondmello avatar Dec 16 '24 23:12 leondmello

@rosa Are debug logs prefixed with SolidQueue::Semaphore sufficient? (Or maybe some more prefixes?) I went with a different approach, which right now allows debug logs prefixed with SolidQueue::

class FilteredSqLogger < ActiveSupport::Logger
  def debug(progname = nil, &block)
    # to_s guards against a nil progname (e.g. when only a block is passed)
    super if progname.to_s.match?(/^\s+SolidQueue::/)
  end
end

leondmello avatar Dec 17 '24 17:12 leondmello

I think you could try SolidQueue::Semaphore and SolidQueue::BlockedExecution. I think any clues would involve any of those records.

rosa avatar Dec 17 '24 18:12 rosa

~Ended up going with /^\s*SolidQueue::(?!Process)/ since Process was the most noisy.~ (That was too noisy as well 🙂) I will post an update to this issue once we see it happen again.

leondmello avatar Dec 17 '24 19:12 leondmello

Happened again

Records (... is for obfuscation, I searched with the proper concurrency key)

SolidQueue::Job.where(concurrency_key: '...').select(:id, :scheduled_at, :finished_at)
=>
[#<SolidQueue::Job:0x00007f0eaf044c40 id: 17031406, scheduled_at: "2024-12-23 11:54:57.301722000 +0000", finished_at: "2024-12-23 11:54:57.793273000 +0000">,
 #<SolidQueue::Job:0x00007f0eaf044b00 id: 17031411, scheduled_at: "2024-12-23 11:54:57.832971000 +0000", finished_at: "2024-12-23 12:01:15.875778000 +0000">]

Logs for that key

D, [2024-12-23T11:54:57.872605 #97] DEBUG -- :   SolidQueue::Semaphore Update All (64.5ms)  UPDATE "solid_queue_semaphores" SET value = value + 1, expires_at = '2024-12-23 11:57:57.807857' WHERE "solid_queue_semaphores"."key" = '...' AND "solid_queue_semaphores"."value" < 1

D, [2024-12-23T11:54:57.879150 #97] DEBUG -- :   SolidQueue::BlockedExecution Load (5.8ms)  SELECT "solid_queue_blocked_executions".* FROM "solid_queue_blocked_executions" WHERE "solid_queue_blocked_executions"."concurrency_key" = '...' ORDER BY "solid_queue_blocked_executions"."priority" ASC, "solid_queue_blocked_executions"."job_id" ASC LIMIT 1 FOR UPDATE SKIP LOCKED

D, [2024-12-23T12:01:14.997407 #94] DEBUG -- :   SolidQueue::Semaphore Pluck (5.1ms)  SELECT "solid_queue_semaphores"."key", "solid_queue_semaphores"."value" FROM "solid_queue_semaphores" WHERE "solid_queue_semaphores"."key" = '...'

D, [2024-12-23T12:01:15.008779 #94] DEBUG -- :   SolidQueue::BlockedExecution Load (5.6ms)  SELECT "solid_queue_blocked_executions".* FROM "solid_queue_blocked_executions" WHERE "solid_queue_blocked_executions"."concurrency_key" = '...' ORDER BY "solid_queue_blocked_executions"."priority" ASC, "solid_queue_blocked_executions"."job_id" ASC LIMIT 1 FOR UPDATE SKIP LOCKED

D, [2024-12-23T12:01:15.380319 #94] DEBUG -- :   SolidQueue::Semaphore Load (7.0ms)  SELECT "solid_queue_semaphores".* FROM "solid_queue_semaphores" WHERE "solid_queue_semaphores"."key" = '...' LIMIT 1

D, [2024-12-23T12:01:15.469601 #94] DEBUG -- :   SolidQueue::Semaphore Insert (67.5ms)  INSERT INTO "solid_queue_semaphores" ("key","value","expires_at","created_at","updated_at") VALUES ('...', 0, '2024-12-23 12:04:15.380905', CURRENT_TIMESTAMP, CURRENT_TIMESTAMP) ON CONFLICT ("key") DO NOTHING RETURNING "id"

D, [2024-12-23T12:01:15.902346 #97] DEBUG -- :   SolidQueue::Semaphore Update All (8.4ms)  UPDATE "solid_queue_semaphores" SET value = value + 1, expires_at = '2024-12-23 12:04:15.893182' WHERE "solid_queue_semaphores"."key" = '...' AND "solid_queue_semaphores"."value" < 1

D, [2024-12-23T12:01:15.908124 #97] DEBUG -- :   SolidQueue::BlockedExecution Load (5.1ms)  SELECT "solid_queue_blocked_executions".* FROM "solid_queue_blocked_executions" WHERE "solid_queue_blocked_executions"."concurrency_key" = '...' ORDER BY "solid_queue_blocked_executions"."priority" ASC, "solid_queue_blocked_executions"."job_id" ASC LIMIT 1 FOR UPDATE SKIP LOCKED

leondmello avatar Dec 23 '24 13:12 leondmello

Ohhhh, this is super useful, @leondmello! I think I know what's going on, thanks to your logs. Looks like a tricky race condition. Basically, your first job finishes while the second one is still being enqueued: not early enough to have released the semaphore before the second job checks it, but early enough that it doesn't see the second job as blocked yet. Here's the first job finishing:

finished_at: "2024-12-23 11:54:57.793273000 +0000"

Then, when that job finishes, it unblocks the semaphore, here:

D, [2024-12-23T11:54:57.872605 #97] DEBUG --

Here's the second job enqueued:

scheduled_at: "2024-12-23 11:54:57.832971000 +0000"

At that point the semaphore hasn't been updated yet (it was only updated at 11:54:57.872605), so the second job gets blocked. However, when this SELECT runs, the blocked execution can't have been created yet:

D, [2024-12-23T11:54:57.879150 #97] DEBUG -- :   SolidQueue::BlockedExecution Load (5.8ms)  SELECT "solid_queue_blocked_executions".* FROM "solid_queue_blocked_executions" WHERE "solid_queue_blocked_executions"."concurrency_key" = '...' ORDER BY "solid_queue_blocked_executions"."priority" ASC, "solid_queue_blocked_executions"."job_id" ASC LIMIT 1 FOR UPDATE SKIP LOCKED

So it can't get unblocked, because it's not completely blocked yet. Ahh, tricky!
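To make the ordering easier to see, the four timestamps quoted above can be sorted into one timeline. This is only an illustrative sketch. It assumes the log timestamps (which carry no zone) are in UTC like the job records, and a log timestamp marks when the line was written, so the offsets are approximate.

```ruby
require "time"

# Timestamps taken from the records and debug logs quoted in this thread.
# Assumption: the zone-less log timestamps are UTC, like the job records.
events = {
  "job 17031411 scheduled_at (second job enqueued)"  => "2024-12-23T11:54:57.832971Z",
  "BlockedExecution Load logged (lookup on release)" => "2024-12-23T11:54:57.879150Z",
  "job 17031406 finished_at (first job done)"        => "2024-12-23T11:54:57.793273Z",
  "Semaphore Update All logged (semaphore released)" => "2024-12-23T11:54:57.872605Z"
}

# Parse each timestamp, then order the events chronologically.
timeline = events.transform_values { |stamp| Time.iso8601(stamp) }
                 .sort_by { |_label, time| time }
origin = timeline.first.last

# Print each event with its offset from the first one, in milliseconds.
timeline.each do |label, time|
  puts format("+%6.1f ms  %s", (time - origin) * 1000, label)
end
```

The second job is enqueued between the first job finishing and the semaphore being released, and the lookup for blocked executions follows only a few milliseconds after the release.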

What isolation level are you running?

rosa avatar Dec 27 '24 16:12 rosa

What isolation level are you running?

We have this in our application.rb,

    config.active_support.isolation_level = :fiber

But I don't know why that was added some time ago. Trying to find out.

leondmello avatar Dec 27 '24 17:12 leondmello

@leondmello, no, no worries about that. I meant the transaction isolation level in your DB (I imagined you're using PostgreSQL because of the logs), but thinking more about it, it shouldn't make any difference, so no worries about it.

It's a tricky one, I'm not sure right now about how to fix it but I'll keep thinking about it.

rosa avatar Dec 27 '24 17:12 rosa

Thinking more about this and how to fix it... Claiming the semaphore and enqueuing the job happen in a transaction, but if the semaphore already exists it's only checked, not locked. So it can be released while the transaction is ongoing, and you wouldn't know, because you'd have read it just once at the beginning. This holds regardless of whether READ COMMITTED or REPEATABLE READ (the most common isolation levels) is used.

I think, to fix this, we'd have to take a lock there, when checking the semaphore, and release it when the transaction is committed 🤔 However, this might introduce a new kind of deadlock I can't see right now (I've fixed a couple of deadlocks here in the past), so I need to think carefully about it.
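A rough in-memory sketch of that locking idea, for illustration only. This is not Solid Queue code: `LockedToyQueue` and its methods are hypothetical, and a `Mutex` stands in for whatever database lock a real fix would take. It says nothing about the deadlock risk mentioned above. It only shows that when "check the semaphore, then record the blocked job" and "release the semaphore, then look for a blocked job" both run under the same lock, the second job ends up runnable whichever side gets the lock first.

```ruby
# Toy model: a Mutex stands in for a lock held until the transaction commits.
class LockedToyQueue
  attr_reader :ready, :blocked

  def initialize
    @lock = Mutex.new
    @semaphore_free = true
    @ready = []
    @blocked = []
  end

  # Enqueue: the semaphore check and the blocked-execution write share one lock.
  def enqueue(job)
    @lock.synchronize do
      if @semaphore_free
        @semaphore_free = false
        @ready << job
      else
        sleep 0.05 # widen the window in which the race would otherwise occur
        @blocked << job
      end
    end
  end

  # Finish: release the semaphore and unblock the next job under the same lock.
  def finish
    @lock.synchronize do
      @semaphore_free = true
      job = @blocked.shift
      if job
        @semaphore_free = false
        @ready << job
      end
    end
  end
end

queue = LockedToyQueue.new
queue.enqueue(:job1) # job1 claims the semaphore

# job2 is enqueued while job1 finishes, in an unpredictable order.
[Thread.new { queue.enqueue(:job2) }, Thread.new { queue.finish }].each(&:join)

puts "ready: #{queue.ready.inspect}, still blocked: #{queue.blocked.inspect}"
```

If the finishing job wins the lock, the second job finds the semaphore free and runs directly. If the enqueue wins, the blocked record exists by the time the finishing job looks for it.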

rosa avatar Dec 30 '24 13:12 rosa

@rosa Any luck fixing this one?

leondmello avatar Feb 06 '25 16:02 leondmello

Hey @leondmello, unfortunately not 😞 I've got an idea that might work, but I haven't been able to test it carefully as I think it's going to cause problems under certain load. I'm now focused on other different work so I won't have a lot of time in the next couple of months, but will try to get back to this soon.

rosa avatar Feb 06 '25 17:02 rosa

@rosa We have the same problem.

  • [job1] scheduled
  • [job2] scheduled
  • [job1] finished
  • [job1] release BlockedExecution (but it doesn't exist yet)
  • [job2] create BlockedExecution (won't be released until the block timeout passes)

madmax avatar May 06 '25 11:05 madmax
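The sequence in the list above can be replayed step by step in a toy, single-threaded model. This is a hedged sketch rather than Solid Queue's actual implementation: `ToyQueue` and its method names are made up for illustration, and the steps are called by hand in the problematic order.

```ruby
# Toy in-memory model of the semaphore and blocked executions (hypothetical names).
class ToyQueue
  attr_reader :blocked, :ready, :semaphore_free

  def initialize
    @semaphore_free = true
    @blocked = []
    @ready = []
  end

  # Enqueue, step 1: try to claim the semaphore; false means the job must block.
  def claim_semaphore
    return false unless @semaphore_free

    @semaphore_free = false
    true
  end

  # Enqueue, step 2 (only when the claim failed): record the blocked execution.
  def create_blocked_execution(job)
    @blocked << job
  end

  # Job finished: release the semaphore, then hand it to the next blocked job, if any.
  def release_and_unblock
    @semaphore_free = true
    job = @blocked.shift
    return nil unless job

    @semaphore_free = false
    @ready << job
    job
  end
end

queue = ToyQueue.new
job1_claimed = queue.claim_semaphore     # [job1] scheduled: claims the semaphore
job2_claimed = queue.claim_semaphore     # [job2] scheduled: claim fails, must block
unblocked    = queue.release_and_unblock # [job1] finished: nothing is blocked yet
queue.create_blocked_execution(:job2)    # [job2] blocked execution created too late

puts "job2 claimed semaphore: #{job2_claimed}"
puts "unblocked on release:   #{unblocked.inspect}"
puts "left blocked:           #{queue.blocked.inspect}"
puts "semaphore free:         #{queue.semaphore_free}"
```

At the end the semaphore is free, yet job2 is still recorded as blocked and nothing is left to release it. In the real system that matches a job waiting until the block timeout passes.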