trigger.dev icon indicating copy to clipboard operation
trigger.dev copied to clipboard

[TRI-1081] Feature idea: Batch Event Trigger

Open ericallam opened this issue 2 years ago • 12 comments

It would be useful to have a "batch" event trigger, where you could set the frequency that the batched events would be flushed:

batchEventTrigger({ name: "batched.event", schema: z.object({ id: z.string() }), flushEvery: 60 })

Then whenever you sent an event with an ID, instead of immediately triggering runs with the payload it will get added to a batch. If more events with the same ID are sent within the 60 seconds of the first one, those payloads will get appended.

Then, when the run is triggered the payload in the run function becomes an array of all the event payloads.

TRI-1081

ericallam avatar Aug 17 '23 10:08 ericallam

Could also maybe do it with a cron expression

ericallam avatar Aug 17 '23 11:08 ericallam

The more I think about it the more I feel like maybe this can be done without a new trigger type and just using the eventTrigger and sendEvent.

Ideally we could just add options to sendEvent like so:

await client.sendEvent({ id: "event123", name: "new.message", payload: { message: "Hey there!" }}, { batchKey: "user123", deliverAfter: 60 })
await client.sendEvent({ id: "event456", name: "new.message", payload: { message: "It's me!" }}, { batchKey: "user123", deliverAfter: 60 })
await client.sendEvent({ id: "event789", name: "new.message", payload: { message: "Where are you?" }}, { batchKey: "user123", deliverAfter: 60 })

If all three of those events were sent within 60 seconds of the first one, then they would all trigger a single run of an eventTrigger with the batch option set to true.

client.defineJob({
  id: "batched-event-example",
  name: "Batched Event Example",
  version: "1.0.0",
  trigger: eventTrigger({
    name: "new.message",
    schema: z.object({ message: z.string() }),
    batch: true,
  }),
  run: async (payload, io, ctx) => {
    // payload is typed as Array<{ message: string }>
  },
});

One other thing we'd need to consider is the TriggerContext["event"] type might need to change because a single run can now be caused by more than 1 event:

export interface TriggerContext {
  /** Job metadata */
  job: { id: string; version: string };
  /** Environment metadata */
  environment: { slug: string; id: string; type: RuntimeEnvironmentType };
  /** Organization metadata */
  organization: { slug: string; id: string; title: string };
  /** Run metadata */
  run: { id: string; isTest: boolean; startedAt: Date; isRetry: boolean };
  /** Event metadata */
  event: { id: string; name: string; context: any; timestamp: Date };
  /** Source metadata */
  source?: { id: string; metadata?: any };
  /** Account metadata */
  account?: { id: string; metadata?: any };
}

ericallam avatar Aug 21 '23 21:08 ericallam

If you wanted to flush the events at a specific time, you would just use deliverAt instead of deliverAfter:

await client.sendEvent({ id: "event123", name: "new.message", payload: { message: "Hey there!" }}, { batchKey: "user123", deliverAt: new Date(2023, 11, 25, 10, 30, 0) })
await client.sendEvent({ id: "event456", name: "new.message", payload: { message: "It's me!" }}, { batchKey: "user123", deliverAt: new Date(2023, 11, 25, 10, 30, 0) })
await client.sendEvent({ id: "event789", name: "new.message", payload: { message: "Where are you?" }}, { batchKey: "user123", deliverAt: new Date(2023, 11, 25, 10, 30, 0) })

ericallam avatar Aug 22 '23 15:08 ericallam

Hey @ericallam this makes a lot of sense. I'll love to work on this but I have a few questions.

  1. How would client.cancelEvent() work now, since I think we would be creating a new public function in the zodWorker Class for this?
  2. Also, I'll love to know how you will go about this, your own approach implementing this?

My approach.

  • when the batchKey is present in a sent event I'll create the event with a corresponding batchKey and expiresAt column. So I'll add new db columns for the EventRecords Table.

  • Create a new public function in the zodWorker class enqueueBatchedEvents It'll have the eventId as an identifier and then the payload would have the batch key. At the deliveredAt time I'll do a query to get all the events with the same batchKey and expiredAt column and deliver them together.

  • Then I'll update the triggerContext["event"] interface to reflect the new changes. it would be something like this : event: Array<{ id: string; name: string; context: any; timestamp: Date }> | { id: string; name: string; context: any; timestamp: Date; };

    I'll really love to know your thoughts on this. Thanks.

Chigala avatar Aug 22 '23 16:08 Chigala

I'll write up something shortly 👍

ericallam avatar Aug 22 '23 17:08 ericallam

Have you seen how Inngest handle this?

https://www.inngest.com/docs/guides/batching

DaleWebb avatar Aug 22 '23 21:08 DaleWebb

@DaleWebb we have a policy not to copy our competitors so we won't be clicking on that link haha. We want to hear from users and build stuff that they want (and we're users as well so we get to build the features that we want too).

ericallam avatar Aug 23 '23 05:08 ericallam

Here's how I think it could be implemented on the backend

  • Add a enqueueBatched public method to ZodWorker that would have a very similar interface to enqueue but it would require a jobKey param
  • Add an batched option to ZodTasks which would take a boolean. If batched is set to true, then the handler payload type will be Array<z.infer<TConsumerSchema[K]>> instead of just z.infer<TConsumerSchema[K]>
  • In IngestSendEvent, if the batchKey option is set, then instead of enqueueing deliverEvent, we should enqueueBatched to a new task called deliverBatchedEvent. The jobKey option should be ${environment.id}:${batchKey} . The jobKeyMode should be set to preserve_run_at, and the runAt option should be the event.deliverAt like it is now.
  • The deliverBatchedEvent task should call a service called DeliverBatchedEvent

There's more obviously but this is the basic architecture design for handling this.

ericallam avatar Aug 23 '23 09:08 ericallam

@ericallam Thanks.

Chigala avatar Aug 23 '23 09:08 Chigala

Another thing I didn't mention was there would probably need to be changes to ZodWorker.#handleMessage to be able to handle batched jobs

ericallam avatar Aug 23 '23 09:08 ericallam

This feature will need to be done after we do https://github.com/triggerdotdev/trigger.dev/issues/396, as it requires the "batch jobs" feature in graphile worker.

ericallam avatar Aug 25 '23 08:08 ericallam

Some database changes:

  • Add payload and context to the JobRun
  • Have many EventRecords associated with a JobRun (currently it's just one)

Anything that wants the payload data would use the JobRun directly, not associated EventRecords.

matt-aitken avatar Aug 25 '23 13:08 matt-aitken