Buffer with GCP Pub/Sub with ordering key
Description
This issue is a follow-up to a Slack discussion.
When a GCP topic is configured with an ordering key and the GCP subscription has message ordering enabled, the buffer cannot behave correctly, because GCP will not deliver a new message for a key until all previous messages for that key have been ACKed.
How to reproduce
- Configure a GCP topic with an ordering key and a GCP subscription with ordering enabled.
- Start Benthos with a buffer.
- Start a code sample that generates 1 message per second for a given ordering key.
- The output will be missing some messages.
- The reason is that after we receive some messages, GCP will not send the next message because it is waiting for the previous ones to be ACKed.
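The interaction above can be sketched with a minimal simulation. This is not the actual Benthos or Pub/Sub code; it just models the two rules described in this issue: the broker withholds the next message for an ordering key until the previous one is ACKed, while a windowed buffer only ACKs once it has collected a full window.

```python
from collections import deque

def simulate(num_messages, window_size):
    """Model ordered delivery (one un-ACKed message per key at a time)
    against a buffer that ACKs only when a whole window is collected."""
    pending = deque(range(num_messages))  # messages waiting at the broker
    in_flight = []                        # delivered but un-ACKed (buffered)
    acked = []

    while pending:
        if in_flight:      # previous message for this key not ACKed yet:
            break          # the broker withholds delivery -> stall
        in_flight.append(pending.popleft())
        # The buffer ACKs only once it has a full window of messages.
        if len(in_flight) >= window_size:
            acked.extend(in_flight)
            in_flight.clear()

    return len(acked), len(pending)

acked, stuck = simulate(num_messages=10, window_size=3)
print(acked, stuck)  # prints: 0 9
```

The window never fills, because filling it requires more deliveries, and more deliveries require an ACK the buffer has not issued yet, so the remaining messages stay stuck at the broker.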
Example Benthos config
```yaml
input:
  label: ""
  gcp_pubsub:
    project: XX
    subscription: XXX
    max_outstanding_messages: 10000000

buffer:
  system_window:
    timestamp_mapping: root = this.timestamp.number()
    size: 10s
    allowed_lateness: 9s

pipeline:
  threads: 10
  processors:
    - group_by_value:
        value: '${! json("device_uid") }'
    - bloblang: |
        root = if batch_index() == 0 {
          {
            "device_id": this.device_uid,
            "created_at": this.timestamp,
            "items_processed": batch_size()
          }
        } else { deleted() }

output:
  label: ""
  stdout:
    codec: lines
```
Possible fix
Overall this is not a problem with Benthos; it is how GCP Pub/Sub is designed. However, Benthos could provide an option such as:
- option 1: an `ack_on_read` setting for the buffer (with a warning that Benthos delivery guarantees may be lost)
- other?
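A sketch of what the proposed option might look like in the config above. Note that `ack_on_read` is not an existing Benthos field; it is the hypothetical option suggested in this issue, which would ACK messages upstream as soon as the buffer reads them rather than when the window is flushed.

```yaml
buffer:
  system_window:
    timestamp_mapping: root = this.timestamp.number()
    size: 10s
    allowed_lateness: 9s
    ack_on_read: true  # hypothetical: ACK upstream on read, trading
                       # delivery guarantees for compatibility with
                       # Pub/Sub ordered delivery
```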