More control of realtime segments - topic partition offsets

Open johighley opened this issue 4 years ago • 7 comments

We had some kind of issue with a Kafka topic such that the most recent offsets changed (to lower values). The realtime table is no longer processing any new messages published to the topic. The consumers seem to recognize the most recent offsets but ignore them, and the following log messages repeat. If I create a new table for the same topic, it processes new messages without issue.

Short of deleting the table and re-creating it, there's no way to tell Pinot to start reading from a different offset. Realtime segments seem excessively coupled to their topic partitions. A catastrophic Kafka failure resulting in a rebuilt Kafka server would require any attached Pinot tables to be deleted and rebuilt as well.

I need some way to bypass the consuming segments' saved commits, offsets, etc., and force them to start reading a topic partition from a specified offset. The user would have to accept risks like duplicate events. Otherwise, a production issue with Kafka could require deleting Pinot tables with years of data and millions of rows.

2021/06/07 15:24:27.918 INFO [Fetcher] [agent_daily__2__2__20210605T0819Z] [Consumer clientId=consumer-71, groupId=] Fetch offset 22 is out of range for partition agent_daily-2, resetting offset
2021/06/07 15:24:27.919 INFO [Fetcher] [agent_daily__2__2__20210605T0819Z] [Consumer clientId=consumer-71, groupId=] Resetting offset for partition agent_daily-2 to offset 5.
2021/06/07 15:24:27.938 INFO [Fetcher] [agent_daily__1__2__20210605T0819Z] [Consumer clientId=consumer-73, groupId=] Fetch offset 20 is out of range for partition agent_daily-1, resetting offset
2021/06/07 15:24:27.939 INFO [Fetcher] [agent_daily__1__2__20210605T0819Z] [Consumer clientId=consumer-73, groupId=] Resetting offset for partition agent_daily-1 to offset 0.
2021/06/07 15:24:27.942 INFO [Fetcher] [agent_daily__0__2__20210605T0819Z] [Consumer clientId=consumer-72, groupId=] Fetch offset 24 is out of range for partition agent_daily-0, resetting offset
2021/06/07 15:24:27.943 INFO [Fetcher] [agent_daily__0__2__20210605T0819Z] [Consumer clientId=consumer-72, groupId=] Resetting offset for partition agent_daily-0 to offset 1.

2021/06/07 15:24:33.018 INFO [Fetcher] [agent_daily__2__2__20210605T0819Z] [Consumer clientId=consumer-71, groupId=] Fetch offset 22 is out of range for partition agent_daily-2, resetting offset
2021/06/07 15:24:33.018 INFO [Fetcher] [agent_daily__2__2__20210605T0819Z] [Consumer clientId=consumer-71, groupId=] Resetting offset for partition agent_daily-2 to offset 5.
2021/06/07 15:24:33.038 INFO [Fetcher] [agent_daily__1__2__20210605T0819Z] [Consumer clientId=consumer-73, groupId=] Fetch offset 20 is out of range for partition agent_daily-1, resetting offset
2021/06/07 15:24:33.039 INFO [Fetcher] [agent_daily__1__2__20210605T0819Z] [Consumer clientId=consumer-73, groupId=] Resetting offset for partition agent_daily-1 to offset 0.
2021/06/07 15:24:33.042 INFO [Fetcher] [agent_daily__0__2__20210605T0819Z] [Consumer clientId=consumer-72, groupId=] Fetch offset 24 is out of range for partition agent_daily-0, resetting offset
2021/06/07 15:24:33.043 INFO [Fetcher] [agent_daily__0__2__20210605T0819Z] [Consumer clientId=consumer-72, groupId=] Resetting offset for partition agent_daily-0 to offset 1.

johighley avatar Jun 09 '21 18:06 johighley

@yupeng9

xiangfu0 avatar Oct 19 '21 18:10 xiangfu0

We also ran into this issue in some scenarios:

  • A topic is recreated and the offset needs to be reset
  • User needs to change the Kafka cluster of the topic, which has different offset mappings

Currently the Kafka consumer in Pinot keeps resetting the offset, because the starting offset is determined by the previous segment. It would be good to have a robust way to adapt to high/low watermark changes on the topic.

yupeng9 avatar Oct 19 '21 21:10 yupeng9

+1 on handling those scenarios in a more robust way.

I feel we first need some tooling so that operators can at least handle these cases manually, e.g. resetting the Kafka offsets for all consuming segments to earliest/latest (see the sketch below).

Then we can think about what to automate, especially questions like: should old data be kept? Should the offset be reset to earliest or latest? For both scenarios you mentioned, I feel the old data should be deleted, but for scenario 1, Pinot should consume from the earliest offset, while for scenario 2, Pinot should consume based on the user's offset reset policy.

For scenario 2, it's actually a table alter command, which can potentially trigger an event.

For scenario 1, since there are no Pinot table changes, it's better to keep a human in the loop. Whatever approach is implemented should also apply to Kinesis and Pulsar.
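
As a rough illustration of such operator tooling, here is a minimal sketch assuming the kafka-python client and placeholder broker/topic names: before resetting anything, a tool would first need the current low/high watermarks per partition.

```python
# Minimal sketch of the first step of a hypothetical offset-reset tool:
# fetch the earliest (low watermark) and latest (high watermark) offsets
# per partition. Broker address and topic name are placeholders.
from kafka import KafkaConsumer, TopicPartition

consumer = KafkaConsumer(bootstrap_servers="localhost:9092")
partitions = [
    TopicPartition("agent_daily", p)
    for p in consumer.partitions_for_topic("agent_daily")
]

earliest = consumer.beginning_offsets(partitions)  # partition -> low watermark
latest = consumer.end_offsets(partitions)          # partition -> high watermark

for tp in partitions:
    print(f"{tp.topic}-{tp.partition}: earliest={earliest[tp]}, latest={latest[tp]}")
```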

xiangfu0 avatar Oct 19 '21 22:10 xiangfu0

Now that we can use pause and resume endpoints, we can:

  1. pause the table
  2. make stream config changes to point to different topic or cluster
  3. resume the table with the option to consume from the earliest/latest offset of the new stream

I believe that should address the requirements mentioned in this ticket. @johighley @yupeng9 @xiangfu0 what do you guys think?
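
For concreteness, here is a minimal sketch of that flow against the controller REST API, assuming a controller at localhost:9000. The pauseConsumption/resumeConsumption endpoints exist in recent Pinot releases, but the exact parameter names and table config layout may vary by version, so treat this as an outline rather than a definitive recipe.

```python
# Sketch of the pause -> reconfigure -> resume flow via the Pinot
# controller REST API. Controller address, table name, and broker
# address are placeholders.
import requests

CONTROLLER = "http://localhost:9000"
TABLE = "agent_daily"  # realtime table name (placeholder)

# 1. Pause consumption so the consuming segments are committed.
requests.post(f"{CONTROLLER}/tables/{TABLE}/pauseConsumption").raise_for_status()

# 2. Point the stream configs at the new topic/cluster. This assumes the
# legacy streamConfigs location under tableIndexConfig; newer configs may
# keep stream settings under ingestionConfig instead.
table_config = requests.get(f"{CONTROLLER}/tables/{TABLE}").json()["REALTIME"]
table_config["tableIndexConfig"]["streamConfigs"]["stream.kafka.broker.list"] = \
    "new-broker:9092"  # placeholder for the new cluster
requests.put(f"{CONTROLLER}/tables/{TABLE}", json=table_config).raise_for_status()

# 3. Resume from the earliest offset of the new stream ("smallest"), or
# use "largest" to skip to the latest offset.
requests.post(
    f"{CONTROLLER}/tables/{TABLE}/resumeConsumption",
    params={"consumeFrom": "smallest"},
).raise_for_status()
```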

sajjad-moradi avatar Aug 30 '22 23:08 sajjad-moradi

Yes, I believe it can address the issue. Thanks for the improvement!

yupeng9 avatar Aug 31 '22 02:08 yupeng9

You closed this 3 hours after tagging me for feedback. While resetting to the earliest or most recent offset is an improvement, setting a specific offset is needed for our original issue.

johighley avatar Aug 31 '22 02:08 johighley

I did not close the issue! Anyway, the ideal scenario for your use case would be to provide a mapping of partition id -> start offset along with the resume request, right? May I ask how often you've encountered this issue?
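
To make that concrete, here is a purely hypothetical sketch of what such a request could look like. The offsets payload below is invented for illustration and is not an existing Pinot API; the offset values happen to match the reset values from the logs above.

```python
# Hypothetical only: resume consumption with an explicit
# partition id -> start offset map. Pinot does not expose this
# payload as of this discussion; the shape is invented to
# illustrate the proposal.
import requests

CONTROLLER = "http://localhost:9000"  # placeholder controller address
TABLE = "agent_daily"

requests.post(
    f"{CONTROLLER}/tables/{TABLE}/resumeConsumption",
    json={"offsets": {"0": 1, "1": 0, "2": 5}},  # per-partition start offsets
).raise_for_status()
```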

sajjad-moradi avatar Aug 31 '22 03:08 sajjad-moradi

Another way may be to provide a utility that sets offsets on a per-partition basis after pausing, and then restarts consumption. This may involve a bit of thinking; otherwise, the resume operation may end up being very long, and there is a risk of missing some partitions.

@johighley what is your ideal UX for the operation you want to perform?

mcvsubbu avatar Aug 31 '22 17:08 mcvsubbu