[TRI-1081] Feature idea: Batch Event Trigger
It would be useful to have a "batch" event trigger, where you could set how frequently the batched events are flushed:
batchEventTrigger({ name: "batched.event", schema: z.object({ id: z.string() }), flushEvery: 60 })
Then whenever you send an event with an ID, instead of immediately triggering a run with the payload, it gets added to a batch. If more events with the same ID are sent within 60 seconds of the first one, those payloads get appended.
Then, when the run is triggered the payload in the run function becomes an array of all the event payloads.
Could also maybe do it with a cron expression
The more I think about it the more I feel like maybe this can be done without a new trigger type and just using the eventTrigger and sendEvent.
Ideally we could just add options to sendEvent like so:
await client.sendEvent({ id: "event123", name: "new.message", payload: { message: "Hey there!" }}, { batchKey: "user123", deliverAfter: 60 })
await client.sendEvent({ id: "event456", name: "new.message", payload: { message: "It's me!" }}, { batchKey: "user123", deliverAfter: 60 })
await client.sendEvent({ id: "event789", name: "new.message", payload: { message: "Where are you?" }}, { batchKey: "user123", deliverAfter: 60 })
If all three of those events were sent within 60 seconds of the first one, then they would all trigger a single run of an eventTrigger with the batch option set to true.
client.defineJob({
  id: "batched-event-example",
  name: "Batched Event Example",
  version: "1.0.0",
  trigger: eventTrigger({
    name: "new.message",
    schema: z.object({ message: z.string() }),
    batch: true,
  }),
  run: async (payload, io, ctx) => {
    // payload is typed as Array<{ message: string }>
  },
});
One other thing we'd need to consider is the TriggerContext["event"] type might need to change because a single run can now be caused by more than 1 event:
export interface TriggerContext {
  /** Job metadata */
  job: { id: string; version: string };
  /** Environment metadata */
  environment: { slug: string; id: string; type: RuntimeEnvironmentType };
  /** Organization metadata */
  organization: { slug: string; id: string; title: string };
  /** Run metadata */
  run: { id: string; isTest: boolean; startedAt: Date; isRetry: boolean };
  /** Event metadata */
  event: { id: string; name: string; context: any; timestamp: Date };
  /** Source metadata */
  source?: { id: string; metadata?: any };
  /** Account metadata */
  account?: { id: string; metadata?: any };
}
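One possible shape for that change, purely as a sketch and not a decision: keep `event` as the first event for backwards compatibility and expose the full batch alongside it. The `EventMetadata` and `events` names below are illustrative assumptions:

```typescript
// Sketch of one possible shape for TriggerContext's event metadata after
// batching. Names (EventMetadata, events) are illustrative, not decided API.
interface EventMetadata {
  id: string;
  name: string;
  context: any;
  timestamp: Date;
}

interface BatchAwareTriggerContext {
  // Keep `event` as the first event in the batch for backwards compatibility...
  event: EventMetadata;
  // ...and expose the full batch (length 1 for non-batched runs) separately.
  events: EventMetadata[];
}
```

This avoids a union type, so existing code reading `ctx.event.id` keeps compiling.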
If you wanted to flush the events at a specific time, you would just use deliverAt instead of deliverAfter:
await client.sendEvent({ id: "event123", name: "new.message", payload: { message: "Hey there!" }}, { batchKey: "user123", deliverAt: new Date(2023, 11, 25, 10, 30, 0) })
await client.sendEvent({ id: "event456", name: "new.message", payload: { message: "It's me!" }}, { batchKey: "user123", deliverAt: new Date(2023, 11, 25, 10, 30, 0) })
await client.sendEvent({ id: "event789", name: "new.message", payload: { message: "Where are you?" }}, { batchKey: "user123", deliverAt: new Date(2023, 11, 25, 10, 30, 0) })
Hey @ericallam this makes a lot of sense. I'd love to work on this but I have a few questions.

- How would `client.cancelEvent()` work now, since I think we would be creating a new public function in the ZodWorker class for this?
- Also, I'd love to know how you would go about this: what's your own approach to implementing it?
My approach:

- When a `batchKey` is present in a sent event, I'll create the event with corresponding `batchKey` and `expiresAt` columns, so I'll add new DB columns to the EventRecords table.
- Create a new public function in the ZodWorker class, `enqueueBatchedEvents`. It'll have the `eventId` as an identifier, and the payload would contain the batch key. At the `deliveredAt` time I'll run a query to get all the events with the same `batchKey` and `expiresAt` and deliver them together.
- Then I'll update the `TriggerContext["event"]` interface to reflect the new changes. It would be something like this:

event: Array<{ id: string; name: string; context: any; timestamp: Date }> | { id: string; name: string; context: any; timestamp: Date };

I'd really love to know your thoughts on this. Thanks.
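The "query everything with the same batchKey at delivery time" step can be sketched as a plain filter. In practice it would be a SQL query against the proposed new EventRecords columns; the record shape here is an assumption for illustration:

```typescript
// Sketch of the delivery-time query as an in-memory filter. In practice this
// would be a DB query on EventRecords using the proposed batchKey / expiresAt
// columns; this record shape is an assumption.
interface EventRecord {
  id: string;
  name: string;
  payload: unknown;
  batchKey?: string;
  expiresAt?: Date;
}

function collectBatch(records: EventRecord[], batchKey: string, now: Date): EventRecord[] {
  // All events sharing the batch key whose window has closed get
  // delivered together as a single run.
  return records.filter(
    (r) => r.batchKey === batchKey && r.expiresAt !== undefined && r.expiresAt <= now
  );
}
```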
I'll write up something shortly 👍
Have you seen how Inngest handles this?
https://www.inngest.com/docs/guides/batching
@DaleWebb we have a policy not to copy our competitors so we won't be clicking on that link haha. We want to hear from users and build stuff that they want (and we're users as well so we get to build the features that we want too).
Here's how I think it could be implemented on the backend
- Add an `enqueueBatched` public method to ZodWorker that would have a very similar interface to `enqueue`, but it would require a `jobKey` param
- Add a `batched` option to `ZodTasks` which would take a boolean. If `batched` is set to true, then the handler payload type will be `Array<z.infer<TConsumerSchema[K]>>` instead of just `z.infer<TConsumerSchema[K]>`
- In IngestSendEvent, if the `batchKey` option is set, then instead of enqueueing `deliverEvent`, we should `enqueueBatched` to a new task called `deliverBatchedEvent`. The `jobKey` option should be `${environment.id}:${batchKey}`. The `jobKeyMode` should be set to `preserve_run_at`, and the `runAt` option should be the `event.deliverAt` like it is now.
- The `deliverBatchedEvent` task should call a service called DeliverBatchedEvent
There's more obviously but this is the basic architecture design for handling this.
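The `jobKey` / `preserve_run_at` behaviour described in the steps above could be modelled like this. This is only an in-memory sketch of the queue semantics; the real implementation would live in ZodWorker on top of graphile-worker, and `BatchedQueue` and `takeDue` are names invented for the sketch:

```typescript
// In-memory sketch of the proposed enqueueBatched semantics: jobs with the
// same jobKey collapse into one job, and preserve_run_at keeps the runAt of
// the FIRST enqueue so the batch flushes relative to the first event.
interface BatchedJob {
  jobKey: string;
  runAt: Date;
  payloads: unknown[];
}

class BatchedQueue {
  private jobs = new Map<string, BatchedJob>();

  enqueueBatched(jobKey: string, payload: unknown, runAt: Date) {
    const existing = this.jobs.get(jobKey);
    if (existing) {
      // jobKeyMode "preserve_run_at": keep existing.runAt, just append.
      existing.payloads.push(payload);
    } else {
      this.jobs.set(jobKey, { jobKey, runAt, payloads: [payload] });
    }
  }

  // At runAt, the handler receives the whole payload array (batched: true).
  takeDue(now: Date): BatchedJob[] {
    const due: BatchedJob[] = [];
    for (const [key, job] of this.jobs) {
      if (job.runAt <= now) {
        due.push(job);
        this.jobs.delete(key);
      }
    }
    return due;
  }
}
```

With a `jobKey` of `${environment.id}:${batchKey}`, all events for one user in one environment collapse into a single run at the original `deliverAt`.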
@ericallam Thanks.
Another thing I didn't mention was there would probably need to be changes to ZodWorker.#handleMessage to be able to handle batched jobs
This feature will need to be done after we do https://github.com/triggerdotdev/trigger.dev/issues/396, as it requires the "batch jobs" feature in graphile worker.
Some database changes:
- Add payload and context to the JobRun
- Have many EventRecords associated with a JobRun (currently it's just one)
Anything that wants the payload data would use the JobRun directly, not the associated EventRecords.
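Those database changes could look roughly like this, with TypeScript shapes standing in for the DB models; the field names are assumptions for illustration:

```typescript
// Sketch of the proposed schema change, with TS shapes standing in for DB
// models. Field names here are illustrative assumptions.
interface EventRecord {
  id: string;
  payload: unknown;
  jobRunId?: string; // many events can now point at one run
}

interface JobRun {
  id: string;
  // Payload and context are copied onto the run itself, so consumers read
  // the run directly instead of reaching into the associated EventRecords.
  payload: unknown[]; // array, because a run may be caused by many events
  context: unknown;
  events: EventRecord[];
}

// Build a run from a flushed batch of events.
function createJobRun(id: string, events: EventRecord[]): JobRun {
  return {
    id,
    payload: events.map((e) => e.payload),
    context: {},
    events: events.map((e) => ({ ...e, jobRunId: id })),
  };
}
```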