concurrent-ruby

Copy select thread local state of calling thread into workers

Open dueckes opened this issue 10 years ago • 8 comments

This is a suggestion.

I was recently faced with a problem in a Rails app using Futures: log output produced by a Future task did not contain the request UUID. It turns out that Rails stores this log-related data in thread locals. To work around this, I decided to patch the ThreadPoolExecutor Worker to copy select thread-local state from the invoking thread to the worker thread when it receives a message to execute.
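For anyone who wants to see the underlying behavior in isolation, here is a minimal sketch in plain Ruby, without Rails or concurrent-ruby. The :request_uuid key is a stand-in chosen for illustration, not the key Rails actually uses.

```ruby
# Stand-in for the request UUID that Rails keeps in a thread local.
# The key name is illustrative only.
Thread.current[:request_uuid] = "abc-123"

# A new thread starts with its own, empty set of locals, so the value
# set by the caller is not visible inside it.
seen_in_worker = Thread.new { Thread.current[:request_uuid] }.value

puts "caller sees: #{Thread.current[:request_uuid].inspect}"
puts "worker sees: #{seen_in_worker.inspect}"
```

The same thing happens with pooled workers, which is why the request UUID goes missing from log lines written inside a Future.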

Suggestion: through configuration / call arguments, allow workers to copy select thread state prior to performing tasks.

I believe thread synchronization models in other languages / tools support this; for instance, .NET allows thread context to be shared among threads: https://msdn.microsoft.com/magazine/gg598924.aspx.

Do the ruby-concurrency team believe this idea has merit?

dueckes avatar Oct 29 '15 22:10 dueckes

That's a very interesting idea. Thank you for suggesting it. Because we are so close to the 1.0 release I've put a feature freeze in place, so we won't be able to give this serious consideration until after. I'm definitely going to read the link you posted and do some research.

jdantonio avatar Oct 30 '15 13:10 jdantonio

Would something like the following work?

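def post_rails_job(pool, *args, &job)
  # Read the caller's value before handing the job to the pool.
  request_uuid = Thread.current[:rails_request_uuid]
  # Block parameters are renamed so they do not shadow the method's own.
  pool.post(request_uuid, job, *args) do |uuid, task, *task_args|
    begin
      Thread.current[:rails_request_uuid] = uuid
      task.call(*task_args)
    ensure
      Thread.current[:rails_request_uuid] = nil
    end
  end
end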

or am I missing something? I am not sure why the Worker patch was needed.

pitr-ch avatar Oct 30 '15 17:10 pitr-ch

I'm a little lost on your suggestion, as I'm not sure of the most appropriate way to use it. A goal for my changes was to make the sharing of thread state as non-invasive as possible for callers.

My patch currently hides the sharing of thread state from calling code, so my use of futures is dead-easy:

Concurrent.future { Model.find(id) } # uses Rails logger

Sharing of thread state is achieved by a worker patch, highlights of which follow:

module WorkerPatch

  def self.included(mod)
    mod.send(:attr_reader, :thread)
    mod.send(:alias_method, :push_without_thread_local_copy, :<<)
    mod.send(:alias_method, :<<, :push_with_thread_local_copy)
  end

  def push_with_thread_local_copy(message)
    @thread.copy_missing_local_variables_from(Thread.current) # Thread patch (not shown) that copies state
    self.push_without_thread_local_copy(message)
  end

end

::Concurrent::RubyThreadPoolExecutor::Worker.send(:include, WorkerPatch)

If the concurrent-ruby gem were to change to allow callers to specify shared thread state, I would suggest introducing a mechanism to configure the shared thread state along these lines:

# A closure that allows callers to arbitrarily change thread state
Concurrent.shared_thread_context = -> (main_thread, worker_thread) do
  worker_thread[:a_thread_local_key] = main_thread[:a_thread_local_key]
end

# Complementary approach: automatically copies these thread local values
Concurrent.shared_thread_context = %i[a_thread_local_key]

From a caller's perspective, the use of futures, etc., would then remain unchanged.
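To make the two configuration forms above concrete, here is a rough sketch of how either one could be normalized into a single callable. Concurrent.shared_thread_context does not exist in the gem; build_context_copier is a hypothetical helper, and a bare Thread with a Queue stands in for a pool worker.

```ruby
# Hypothetical helper: accepts either a closure or a list of keys and
# returns a callable taking (source_thread, worker_thread).
def build_context_copier(config)
  case config
  when Proc  then config
  when Array then ->(from, to) { config.each { |key| to[key] = from[key] } }
  else raise ArgumentError, "expected a Proc or an Array of keys"
  end
end

copier = build_context_copier(%i[a_thread_local_key])

# Stand-in for an idle pool worker waiting for its next job.
inbox = Queue.new
worker = Thread.new { inbox.pop.call }

Thread.current[:a_thread_local_key] = "request-42"
copier.call(Thread.current, worker) # copy at push time, as in the worker patch
inbox << -> { Thread.current[:a_thread_local_key] }

seen_in_worker = worker.value
puts seen_in_worker
```

Note that this copies into a worker that is idle. A real pool would also have to decide when to clear the copied values so they do not leak into the next job.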

dueckes avatar Nov 03 '15 23:11 dueckes

Ah sorry, my mistake. The example does not help with Futures. I see your problem and we should address it; however, the suggestion uses global state, which should be avoided. It's hard to track, and it leaks to all jobs in the gem even if some of them are not called from Rails scope.

I would suggest the following approach for now:

module RailsConcurrent
  def in_rails_scope(&block)
    # Read the value now: by the time the job runs, the calling thread
    # may already be serving another request.
    value = Thread.current[:a_thread_local_key]
    lambda do
      begin
        Thread.current[:a_thread_local_key] = value
        block.call
      ensure
        Thread.current[:a_thread_local_key] = nil
      end
    end
  end

  def future(*args, &block)
    Concurrent.future(*args, &in_rails_scope(&block))
  end

  extend self
end

RailsConcurrent.future { Model.find(id) } # uses Rails logger

In the next concurrent-ruby release we could consider introducing a future factory class (or module) which would hold shared configuration such as which thread-local information should be transferred. Concurrent would still be the way to construct a future with default settings; it would just be easier to create other factories like RailsConcurrent in the above example.
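A rough sketch of what such a factory might look like, assuming a per-instance key list in place of global state. ScopedFutureFactory and its keys: and runner: parameters are hypothetical; the default runner uses a plain Thread so the sketch runs without the gem, and something like ->(job) { Concurrent::Future.execute(&job) } could presumably be passed instead.

```ruby
# Hypothetical factory, not part of concurrent-ruby.
class ScopedFutureFactory
  def initialize(keys:, runner: ->(job) { Thread.new(&job) })
    @keys = keys     # thread-local keys to carry over
    @runner = runner # how the wrapped job is actually started
  end

  def future(&block)
    # Snapshot the caller's values now, before the job is handed off.
    snapshot = @keys.to_h { |key| [key, Thread.current[key]] }
    job = lambda do
      begin
        snapshot.each { |key, value| Thread.current[key] = value }
        block.call
      ensure
        # Clear the keys so they do not leak into the worker's next job.
        snapshot.each_key { |key| Thread.current[key] = nil }
      end
    end
    @runner.call(job)
  end
end

rails_futures = ScopedFutureFactory.new(keys: %i[a_thread_local_key])
Thread.current[:a_thread_local_key] = "request-42"
value_in_job = rails_futures.future { Thread.current[:a_thread_local_key] }.value
puts value_in_job
```

Each factory instance carries its own key list, so jobs created outside Rails scope are unaffected.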

pitr-ch avatar Nov 11 '15 11:11 pitr-ch

Hi guys,

Rails keeps several values in the current thread, which may change the execution result. Here is an example:

> I18n.locale = :de
> I18n.t('yes')
=> 'Ja'
> Concurrent::Promise.execute { I18n.t('yes') }.wait.value
=> 'Yes'

It would be cool to have the possibility to tell Concurrent's structures to copy thread-local variables in the current release as well. Thank you for the cool gem!

ABorovenskyi avatar Jan 03 '17 16:01 ABorovenskyi

Moving to 1.1 and giving it a higher priority.

pitr-ch avatar Jan 10 '17 21:01 pitr-ch

As a note, https://bugs.ruby-lang.org/issues/19078 is a solution for this issue, available from Ruby 3.2. It probably doesn't just work for thread pools, though, where threads are created long before the task. Maybe a good use case for Fiber#storage=, cc @ioquatix. I.e., on pool.post, we'd get the caller's Fiber storage with Fiber.current.storage, and inside the thread we'd use Fiber.current.storage = storage + restore the storage before in ensure.

Another possibility is to create an extra Fiber for the job with Fiber.new(storage: storage), but that's very expensive on non-CRuby non-Loom.
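As a rough illustration of the first option, assuming Ruby 3.2 or later: a single long-lived thread fed by a Queue stands in for a pool worker, and post_with_storage is a hypothetical stand-in for pool.post. Fiber#storage= is documented as experimental, so Ruby may print a warning when this runs.

```ruby
# Requires Ruby 3.2+ for Fiber#storage, Fiber#storage= and Fiber[].
jobs = Queue.new

# The worker starts before any request context exists, as in a real pool.
worker = Thread.new do
  while (item = jobs.pop) # pop returns nil once the queue is closed and empty
    storage, job, result = item
    previous = Fiber.current.storage
    begin
      Fiber.current.storage = storage  # adopt the caller's storage
      result << job.call
    ensure
      Fiber.current.storage = previous # put back what was there before
    end
  end
end

# Hypothetical stand-in for pool.post: captures the caller's storage.
def post_with_storage(jobs, &job)
  result = Queue.new
  jobs << [Fiber.current.storage, job, result]
  result
end

Fiber[:request_uuid] = "abc-123"
uuid_in_worker = post_with_storage(jobs) { Fiber[:request_uuid] }.pop

jobs.close
worker.join
puts uuid_in_worker
```

Error handling is left out: if a job raises, the worker thread dies and the caller's pop would block.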

eregon avatar Dec 21 '22 13:12 eregon

Yes, it's a use case for Fiber.current.storage= - i.e. a request loop on a thread (this was given as an example in my original proposal). It would make sense for a thread pool where the parent scope should be cleared out for each new job. Kotlin has a concept of "work which has a context" and if you execute that work on a different thread or fiber, it automatically carries that context. In our case, we'd need the thread pool to capture the current storage before executing, and assign it within the thread/fiber when executing the work.

> restore the storage before in ensure.

This really shouldn't matter in a thread pool implementation. I mean, it's extra work for no reason if you are immediately assigning in a loop (unless GC is an issue).

> Another possibility is to create an extra Fiber for the job with Fiber.new(storage: storage), but that's very expensive on non-CRuby non-Loom.

Well, I personally think this should be faster :) How long until Loom arrives?

ioquatix avatar Dec 21 '22 21:12 ioquatix