
Buffer with GCP Pub/Sub and ordering keys

Open AyWa opened this issue 3 years ago • 0 comments

Description

This issue is a follow-up to a Slack discussion.

When messages are published to a GCP topic with an ordering key and the GCP subscription has message ordering enabled, a buffer cannot behave correctly, because GCP will not "send" new messages for that ordering key until all previous messages have been ACKed.
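The delivery rule described above can be illustrated with a toy Python model. This is a minimal sketch, not the Pub/Sub client API: OrderedKeyQueue, pull and ack are hypothetical names, and it models a single ordering key where pull returns nothing while an earlier message is still unacked.

```python
from collections import deque


class OrderedKeyQueue:
    """Toy model of ordered delivery for ONE ordering key (hypothetical, not
    the Pub/Sub client API): a message is handed out only after the previously
    delivered message has been acked."""

    def __init__(self):
        self._pending = deque()   # published, not yet delivered
        self._outstanding = None  # delivered, waiting for an ack

    def publish(self, msg):
        self._pending.append(msg)

    def pull(self):
        """Return the next message, or None while an earlier one is unacked."""
        if self._outstanding is not None or not self._pending:
            return None
        self._outstanding = self._pending.popleft()
        return self._outstanding

    def ack(self, msg):
        if self._outstanding is None or msg != self._outstanding:
            raise ValueError("only the outstanding message can be acked")
        self._outstanding = None


q = OrderedKeyQueue()
for i in range(3):
    q.publish(i)
first = q.pull()    # 0
blocked = q.pull()  # None: message 0 is still unacked
q.ack(first)
second = q.pull()   # 1
```

If whatever sits after the input (here, a buffer) holds on to the ack, every later message for that key waits behind it.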

How to reproduce

  • configure a GCP topic with an ordering key and a GCP subscription with ordering enabled
  • start Benthos with a buffer
  • start a code sample that generates 1 message per second for an ordering key
  • the output will be missing some messages:
    • the reason is that after we receive some messages, GCP will not send new messages because it is waiting for the previous ones to be ACKed.
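A sketch of the publishing sample from the reproduction steps, assuming the google-cloud-pubsub Python client, a JSON payload with the device_uid and timestamp fields that the Benthos config reads, timestamps in unix seconds, and the device UID reused as the ordering key. make_message and publish_loop are hypothetical helpers; the client import is deferred so the payload encoding can be used without GCP credentials.

```python
import json
import time


def make_message(device_uid, timestamp):
    """Encode one event as JSON bytes with the fields the config reads:
    device_uid (group key) and timestamp (assumed to be unix seconds)."""
    return json.dumps({"device_uid": device_uid, "timestamp": timestamp}).encode("utf-8")


def publish_loop(project_id, topic_id, device_uid, count=60):
    """Publish `count` messages, one per second, all with the same ordering key.

    Requires the google-cloud-pubsub package and GCP credentials.
    """
    from google.cloud import pubsub_v1  # deferred: only needed when publishing

    publisher = pubsub_v1.PublisherClient(
        publisher_options=pubsub_v1.types.PublisherOptions(enable_message_ordering=True)
    )
    topic_path = publisher.topic_path(project_id, topic_id)
    for _ in range(count):
        future = publisher.publish(
            topic_path,
            make_message(device_uid, int(time.time())),
            ordering_key=device_uid,  # assumption: one ordering key per device
        )
        future.result()  # wait for the publish to succeed before pacing the next one
        time.sleep(1)
```

For example, publish_loop("my-project", "my-topic", "device-1"), where the project and topic IDs are placeholders.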

Example Benthos config

input:
  label: ""
  gcp_pubsub:
    project: XX
    subscription: XXX
    max_outstanding_messages: 10000000

buffer:
  system_window:
    timestamp_mapping: root = this.timestamp.number()
    size: 10s
    allowed_lateness: 9s

pipeline:
  threads: 10
  processors:
    - group_by_value:
        value: '${! json("device_uid") }'
    - bloblang: |
        root = if batch_index() == 0 {
          {
            "device_id": this.device_uid,
            "created_at": this.timestamp,
            "items_processed": batch_size()
          }
        } else { deleted() }

output:
  label: ""
  stdout:
    codec: lines
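Roughly, this pipeline emits one record per device per 10s window, counting the messages in each group. The following plain-Python approximation is a sketch under simplifying assumptions: windows are aligned to multiples of size seconds, allowed_lateness and the system clock are ignored, group order within a window follows arrival order, and window_start and summarize are hypothetical helpers rather than Benthos functions.

```python
from collections import defaultdict


def window_start(timestamp, size=10):
    """Start (in seconds) of the tumbling window that a timestamp falls into."""
    return timestamp - timestamp % size


def summarize(messages, size=10):
    """Approximate the config's output: one record per (window, device_uid),
    mirroring group_by_value plus the bloblang step that keeps batch_index() == 0."""
    windows = defaultdict(lambda: defaultdict(list))
    for msg in messages:
        windows[window_start(msg["timestamp"], size)][msg["device_uid"]].append(msg)

    records = []
    for start in sorted(windows):
        for uid, group in windows[start].items():
            records.append({
                "device_id": uid,
                "created_at": group[0]["timestamp"],  # first message of the group
                "items_processed": len(group),        # batch_size() of the group
            })
    return records


events = [
    {"device_uid": "a", "timestamp": 100},
    {"device_uid": "b", "timestamp": 101},
    {"device_uid": "a", "timestamp": 105},
    {"device_uid": "a", "timestamp": 112},
]
records = summarize(events)  # a: 2 items and b: 1 item in [100, 110), a: 1 item in [110, 120)
```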

Possible fix

Overall, this is not a Benthos problem; it is how GCP is designed. However, Benthos could provide an option such as:

  • option 1: ack_on_read for the buffer (with a warning that this may lose Benthos's at-least-once delivery guarantee)
  • other options?
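The trade-off behind option 1 can be explored with a toy simulation. This is a sketch under loudly stated assumptions, not Benthos or Pub/Sub internals: one ordering key; message i is published at second i with timestamp i; the source delivers the next message only after the previous one is acked; window k covers [k*size, (k+1)*size) and is flushed at second (k+1)*size + lateness; and, in the spirit of a window buffer, a message whose window has already closed is dropped. ack_on_read here is the proposed, hypothetical option, and simulate is a hypothetical helper.

```python
def simulate(n_messages, size=10, lateness=9, ack_on_read=False, crash_at=None):
    """Toy model of one ordering key feeding a tumbling-window buffer.

    Returns (emitted, dropped, lost_on_crash). lost_on_crash counts messages
    that were already acked but still buffered at second crash_at, i.e.
    messages a restart could not get redelivered.
    """
    next_idx = 0          # next published message the source may deliver
    held_window = None    # window holding the single unacked message, if any
    windows = {}          # window index -> buffered message timestamps
    emitted, dropped = [], []
    t = 0
    while next_idx < n_messages or windows:
        if t == crash_at:
            # With ack_on_read every buffered message is already acked, so it
            # is lost; without it, the unacked message would be redelivered.
            lost = sum(len(v) for v in windows.values()) if ack_on_read else 0
            return emitted, dropped, lost
        # 1. Flush windows whose deadline has passed; this releases a held ack.
        for k in sorted(windows):
            if t >= (k + 1) * size + lateness:
                emitted.extend(windows.pop(k))
                if held_window == k:
                    held_window = None
        # 2. Deliver published messages while no ack is outstanding.
        while next_idx < n_messages and next_idx <= t and held_window is None:
            ts = next_idx
            next_idx += 1
            k = ts // size
            if t >= (k + 1) * size + lateness:
                dropped.append(ts)  # its window already closed: dropped as late
            else:
                windows.setdefault(k, []).append(ts)
                if not ack_on_read:
                    held_window = k  # ack deferred until window k flushes
        t += 1
    return emitted, dropped, 0


held = simulate(30)                     # acks held until the window flushes
eager = simulate(30, ack_on_read=True)  # hypothetical ack_on_read behaviour
print(len(held[0]), len(eager[0]))      # 3 30
```

In this model, holding acks lets only one message per window through, while acking on read delivers everything; the price is that a crash while messages sit in the buffer loses them (for example, simulate(30, ack_on_read=True, crash_at=15) reports 15 lost messages).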

AyWa, May 23 '22 09:05