Unexpected timing behavior for a daily periodic job
Following https://github.com/riverqueue/river/discussions/521#discussioncomment-10281093, I've implemented a job that I want to run at least once a day around a certain time like this:
// every 5 minutes from 6-7pm
schedule, err := cron.ParseStandard("CRON_TZ=America/Chicago */5 18 * * *")
if err != nil {
	panic(err)
}
const uniquePeriod = 24 * time.Hour
return river.NewPeriodicJob(
	schedule,
	func() (river.JobArgs, *river.InsertOpts) {
		return ExecJob{}, &river.InsertOpts{
			UniqueOpts: river.UniqueOpts{
				ByPeriod: uniquePeriod,
			},
		}
	},
	&river.PeriodicJobOpts{RunOnStart: true},
)
My assumption was that attempting the insert every 5 minutes for the whole hour would be enough to get a reliable once-daily insertion, but on some days the job does not run. Even more confusingly, I am seeing some insertions that I don't think should happen, given the uniqueness period:
The time between some of the created_at values is far less than 24 hours.
Am I doing something wrong here? Is River just the wrong tool if I want a reliable once-daily cron job?
I tried to reproduce the problem described here and put together a self-contained program very similar to your code (it runs once every minute instead of once every five minutes):
package main

import (
	"context"
	"fmt"
	"log/slog"
	"os"
	"time"

	"github.com/jackc/pgx/v5/pgxpool"
	"github.com/robfig/cron/v3"

	"github.com/riverqueue/river"
	"github.com/riverqueue/river/riverdriver/riverpgxv5"
)

type NothingUsefulArgs struct{}

func (NothingUsefulArgs) Kind() string { return "nothing_useful" }

type NothingUsefulWorker struct {
	river.WorkerDefaults[NothingUsefulArgs]
}

func (w *NothingUsefulWorker) Work(ctx context.Context, job *river.Job[NothingUsefulArgs]) error {
	fmt.Printf("working, but doing nothing useful\n")
	return nil
}

func main() {
	ctx := context.Background()

	dbPool, err := pgxpool.New(ctx, "postgres://localhost:5432/river_dev")
	if err != nil {
		panic(err)
	}
	defer dbPool.Close()

	workers := river.NewWorkers()
	river.AddWorker(workers, &NothingUsefulWorker{})

	// logger := slog.New(&slogutil.SlogMessageOnlyHandler{Level: slog.LevelInfo})
	logger := slog.New(slog.NewTextHandler(os.Stdout, nil))

	schedule, err := cron.ParseStandard("CRON_TZ=America/Los_Angeles */1 22 * * *")
	if err != nil {
		panic(err)
	}

	riverClient, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{
		Logger: logger,
		PeriodicJobs: []*river.PeriodicJob{
			river.NewPeriodicJob(
				schedule,
				func() (river.JobArgs, *river.InsertOpts) {
					return NothingUsefulArgs{}, &river.InsertOpts{
						UniqueOpts: river.UniqueOpts{
							ByPeriod: 24 * time.Hour,
						},
					}
				},
				nil,
			),
		},
		Queues: map[string]river.QueueConfig{
			river.QueueDefault: {MaxWorkers: 100},
		},
		Workers: workers,
	})
	if err != nil {
		panic(err)
	}

	if err := riverClient.Start(ctx); err != nil {
		panic(err)
	}
	defer func() {
		if err := riverClient.Stop(ctx); err != nil {
			panic(err)
		}
	}()

	logger.Info("main.go: Started")
	time.Sleep(1 * time.Hour)
}
When I add a little more logging to the periodic job enqueuer, running this during the target cron hour does exactly what I'd expect. You can see an initial job inserted and worked, then all subsequent insertions skipped:
$ go run main.go
time=2025-04-02T22:29:39.682-07:00 level=INFO msg="main.go: Started"
time=2025-04-02T22:29:39.682-07:00 level=INFO msg="River client started" client_id=proximl_local_2025_04_03T05_29_39_654361
time=2025-04-02T22:29:52.561-07:00 level=INFO msg="PeriodicJobEnqueuer: Starting"
time=2025-04-02T22:29:53.213-07:00 level=INFO msg="PeriodicJobEnqueuer: Next run in: 6.786561s"
time=2025-04-02T22:30:00.006-07:00 level=INFO msg="PeriodicJobEnqueuer: Inserted 'nothing_useful'; skipped as duplicate? false"
time=2025-04-02T22:30:00.007-07:00 level=INFO msg="PeriodicJobEnqueuer: Next run in: 59.992945s"
working, but doing nothing useful
time=2025-04-02T22:31:00.005-07:00 level=INFO msg="PeriodicJobEnqueuer: Inserted 'nothing_useful'; skipped as duplicate? true"
time=2025-04-02T22:31:00.007-07:00 level=INFO msg="PeriodicJobEnqueuer: Next run in: 59.992932s"
time=2025-04-02T22:32:00.002-07:00 level=INFO msg="PeriodicJobEnqueuer: Inserted 'nothing_useful'; skipped as duplicate? true"
time=2025-04-02T22:32:00.004-07:00 level=INFO msg="PeriodicJobEnqueuer: Next run in: 59.995515s"
Then, after moving the schedule forward to an hour outside the current window, you can see that nothing happens:
$ go run main.go
time=2025-04-02T22:32:21.401-07:00 level=INFO msg="main.go: Started"
time=2025-04-02T22:32:21.401-07:00 level=INFO msg="River client started" client_id=proximl_local_2025_04_03T05_32_21_376051
time=2025-04-02T22:32:22.401-07:00 level=INFO msg="PeriodicJobEnqueuer: Starting"
time=2025-04-02T22:32:22.961-07:00 level=INFO msg="PeriodicJobEnqueuer: Next run in: 27m37.038907s"
We'll need clearer evidence of a problem here before we can take further action. It's just as likely that your binary simply wasn't running during the cron hour on the missing day.
@brandur Thank you for the attempt. I'll do more digging when I'm back at work. I may have to look at our River logging settings. For now, all I can add about our setup is that we run several worker pods on Kubernetes.
Hello @dgunay and @brandur,
My colleagues and I found this thread because we noticed similar behaviour last night.
dailySegmentationSchedule, err := cron.ParseStandard("30 7 * * 1-6") // 7:30am Mon-Sat
if err != nil {
	panic("Cannot parse cron schedule: " + err.Error())
}
return river.NewPeriodicJob(
	dailySegmentationSchedule,
	func() (river.JobArgs, *river.InsertOpts) {
		args := dailysegmentationworkers.StartProcessWorkerArgs{
			DryRun: false,
		}
		return args, &river.InsertOpts{
			Queue:       periodicQueue,
			MaxAttempts: 1,
			UniqueOpts: river.UniqueOpts{
				ByPeriod: 24 * time.Hour,
			},
		}
	},
	&river.PeriodicJobOpts{RunOnStart: true},
)
We have also been looking into what could be causing this.
@alexanderenam In your case, it's most likely &river.PeriodicJobOpts{RunOnStart: true}. The job will be inserted on schedule, but this option also causes an insert attempt every time the program starts up.
Thank you for the follow-up @brandur !
Shouldn't the insert be skipped, though, given that UniqueOpts sets ByPeriod?
@alexanderenam I'm guessing what happened here is that your leader node changed a little after midnight UTC, and because of RunOnStart: true the new leader attempted to insert a periodic job when it started. The UniqueOpts ByPeriod: 24 * time.Hour setting means that duplicate jobs can't exist within a given (UTC) calendar day, which didn't block anything here: no job yet existed on 2025-04-02, so the insert succeeded.
The schedule's next run time isn't factored in when doing a RunOnStart insert. This is necessary to ensure that a leadership change right at the moment of your schedule doesn't result in a skipped or missed job, since the next scheduled time might be the following day if you just barely missed your scheduled insert. Unfortunately, it produces a confusing outcome that isn't what you intended here, where the goal is once-daily execution at a specific time of day rather than at the midnight boundary where the unique period rolls over.
I think we'll need to think about the best way to solve this.
EDIT: reading comprehension fail. If I'm reading correctly, the periodic job enqueuer doesn't emit these logs in the production build.
@brandur I'm not seeing any info logs prefixed by PeriodicJobEnqueuer: coming from our pods. Log level is set to LevelInfo. The periodic job scheduler is clearly doing something.
riverLogger := baseLogger.With().Str("logger.name", "river").Logger()
// ...
Logger: slog.New(slogzerolog.Option{
	Level:  slog.LevelInfo,
	Logger: &riverLogger,
}.NewZerologHandler()),
I don't believe there's a remaining issue here so I'm going to close this one out. If you want to ensure consistent once-daily execution times, you have a few options:
1. Use a cron-based schedule so the job runs at a consistent time every day regardless of how long a leader has been running.
2. Option (1) is prone to missing the job if a leader switch happens right at the time of execution, so you can combine it with RunOnStart and UniqueOpts{ByPeriod: 24 * time.Hour} to make it always attempt insertion when the leader changes, but also dedupe by day so it never gets enqueued more than once in a day.
3. Use River Pro's Durable Periodic Jobs feature, which leverages a database table to ensure consistent, reliable, durable periodic job timing across all scenarios.
In the end my team fixed the issue by running a k8s cronjob that ran a small program to insert the desired job.