Dynamic scheduled tasks
Hi @rosa, I really appreciate the work you have done here and I am eager to switch over to solid_queue. There is only one thing left, which is holding me back.
Other gems like resque-scheduler or sidekiq-scheduler offer the ability to dynamically add or remove tasks to the schedule (see https://github.com/resque/resque-scheduler?tab=readme-ov-file#dynamic-schedules). Since everything seems to be stored in the database for solid_queue, this should be quite easy to achieve.
Maybe it is only a documentation issue and it is already possible (this would be awesome 🤞 ). Happy to hear from you!
@wollistik, no, this is not supported yet, you didn't miss it 😅
This is something we can consider adding for sure. May I ask what's your use case for it?
Hi @rosa, we have some cron like jobs, where the user is able to
- Disable/Enable running the job
- Change the schedule on when and how often the job is performed
Therefore the dynamic scheduling feature was really nice to implement these requirements.
But after I thought about this the last two days, I might be completely redesign this feature and come up with a different solution, because it was rarely used and still required some quirks to get it working.
Ahh got it! That makes sense 👍 Yes, I thought a bit more about it yesterday and this is something I want to implement eventually 😊
Maybe this helps, but the way I implemented dynamic cron schedules is by having one job run every minute and check if the cron schedule matches the one stored in my database table, with my table containing cron expressions. I use Fugit to check if the cron matches the current time.
@abrunner94 Even I have done a similar thing. Mine is not exactly the cron like scheduling but I am having different schedules. First task is scheduled to be executed after 15 days and then onwards every alternate days. So in the execution of first job I enqueue next job and in that job I keep enqueueing next jobs every 2 days as per condition.
SolidQueue is great. I’ve never embraced a new Rails addition more warmly than this one. Thanks to every contributor!
I took at look into dynamic recurring jobs and I think SolidQueue is really close to supporting them, at least how I envision. There are just a couple of missing parts which I will try to detail below.
TLDR; The Scheduler is to recurring_tasks like a media player is to playing your music/video playlist. It currently only supports (static) playlists predefined in a config file, which is only loaded at application startup. It would be awesome if we could update our playlist (recurring_tasks) while music is playing and without having to restart the playback.
The dirty details of exploration and a hacky workaround
I wanted to dive into SolidQueue in hope to uncover what it will take to run dynamic recurring tasks. First as a reminder, the README tells us that the Scheduler handles recurring tasks. Currently it seems that only static recurring tasks work out of the box while dynamic has some plumbing in place. What is a static vs. dynamic recurring task? If I understand correctly,
-
Static recurring tasks are defined in a configuration file that defaults to
config/recurring.yml. Tasks within are loaded and persisted into the solid queue database as aSolidQueue::RecurringTaskAR object withstatic: true. -
Dynamic recurring tasks are simply
SolidQueue::RecurringTaskwithstatic: false, and they are not really supported yet.
I will give an example of how to create a dynamic recurring task in my use case described way down below. But even if you created one now it will not run automatically. Remember, the Scheduler holds the list of recurring tasks which:
- currently only knows about static recurring tasks (loaded from config file) and
- only gets loaded upon boot
So here is one way to solve (1). In SolidQueue::Configuration#recurring_tasks lets return static + dynamic. BTW, we're just hacking this to work.
# in lib/solid_queue/configuration.rb
def recurring_tasks
@recurring_tasks ||= recurring_tasks_config.map do |id, options|
RecurringTask.from_configuration(id, **options)
end.select(&:valid?)
+ @recurring_tasks + RecurringTask.where(static: false)
end
So now assuming that we have at least one static recurring job and a dynamic one then all we need to is restart our application for the Scheduler to know about them. Yeah I know, that's not ideal so lets try to address that now.
I could not figure out if or how to reach the instance of the Scheduler in an attempt update the attributes with a reloaded recurring_tasks list. So instead, what if we just restart the Scheduler right after we create our dynamic recurring tasks hoping that it will reload the recurring tasks list?
# after dynamic recurring task creation...
SolidQueue::Process.where(kind: "Scheduler").all.map(&:deregister)
Assuming that SolidQueue::Process is public API, deregister will restart the Scheduler process fine, but it can take the Supervisor up to a minute which totally works for my use case but maybe not others. Regardless, we still have a problem. The Supervisor holds the Configuration (with recurring_tasks) from boot up, meaning the newly spawned Scheduler process will not have our newly created dynamic recurring task. No bueno.
To address that, first we can update Configuration so that we can tell it to reload and return a fresh recurring_tasks list.
# in lib/solid_queue/configuration.rb
+ def reload_recurring_tasks
+ recurring_tasks
+ end
private
...
Then lets go to the Supervisor#replace_fork and just before we fork the new Scheduler process we make sure it will have a fresh reloaded list of recurring tasks.
# in lib/solid_queue/supervisor.rb
def replace_fork(pid, status)
SolidQueue.instrument(:replace_fork, supervisor_pid: ::Process.pid, pid: pid, status: status) do |payload|
if terminated_fork = forks.delete(pid)
payload[:fork] = terminated_fork
handle_claimed_jobs_by(terminated_fork, status)
+ if configured_processes[pid].kind == :scheduler
+ configured_processes[pid].attributes[:recurring_tasks] = configuration.reload_recurring_tasks
+ end
start_process(configured_processes.delete(pid))
end
end
end
I tried to find a better way. This feels pretty hackish but it does get the job done. But now we seem to have everything in place for my use case.
Final thoughts. I think what it boils down to is that it would be awesome if the Scheduler could reload the recurring_tasks as they get updated. It could poll for updates periodically. It could receive a signal that triggers a reload. It could have some after hooks that trigger a reload. Whatever it is, I am super excited for it.
Use Case
I have an Event AR that has a start_time and end_time. When an event is created I would like to schedule a one-time job at the start_time. I call this job LiveEventDispatcherJob. When it runs at the scheduled event.start_time it creates some other jobs, one of which is the dynamic recurring task LiveEventPollingJob which runs every 15 seconds or so. There is also a LiveEventCleanupJob which runs at event.end_time which deletes the recurring task and, somewhat unfortunately, restarts the Scheduler just to refresh its recurring_tasks list.
# config/recurring.yml
development:
periodic_hello:
command: "puts 'Hello, static recurring world!'"
priority: 2
schedule: every 10 minutes
Beware if you are doing development for dynamic recurring tasks. The current SolidQueue
Supervisorwill not start aSchedulerprocess if there are no recurring jobs. As a workaround I was using the above dummy recurring task.
class Event < ApplicationRecord
after_create :create_event
validates :name, :start_time, :end_time, presence: true
private
def create_event
LiveEventDispatcherJob.set(wait_until: self.start_time).perform_later(event_id: self.id)
end
end
class LiveEventDispatcherJob < ApplicationJob
queue_as :default
POLLING_SCHEDULE = "every 15 seconds"
def perform(event_id:)
puts "dispatching for the start of live event with id: #{event_id}"
event = Event.find(event_id)
# What I wish ActiveJob had (passing schedule to set):
# LiveEventPollingJob.set(schedule: POLLING_SCHEDULE).perform_later(event_id: event.id)
# What I am doing instead:
SolidQueue::RecurringTask.find_or_create_by(
static: false,
key: "LiveEventPollingEvent#{event.id}",
schedule: POLLING_SCHEDULE,
class_name: "LiveEventPollingJob",
arguments: [ { event_id: event.id } ]
)
# Restart the Scheduler to pick up dynamic recurring task changes
SolidQueue::Process.where(kind: "Scheduler").all.map(&:deregister)
# Schedule the cleanup job
LiveEventCleanupJob.set(wait_until: event.end_time).perform_later(event_id: event.id)
end
end
class LiveEventPollingJob < ApplicationJob
queue_as :default
def perform(event_id:)
puts "polling for live event with id: #{event_id}"
sleep 5
end
end
class LiveEventCleanupJob < ApplicationJob
queue_as :default
def perform(event_id:)
puts "cleaning up after the end of live event with id: #{event_id}"
SolidQueue::RecurringTask.where(key: "LiveEventPollingEvent#{event_id}").destroy_all
# ...
# Restart the Scheduler to pick up dynamic recurring task changes
SolidQueue::Process.where(kind: "Scheduler").all.map(&:deregister)
end
end
So my hacks aside, I think SolidQueue is very close to supporting dynamic recurring tasks, at least for the my use case. Thank you SolidQueue team.