pinot icon indicating copy to clipboard operation
pinot copied to clipboard

Pauseless Consumption

Open sajjad-moradi opened this issue 3 years ago • 16 comments

Background

Currently when commit protocol starts for a partition, the realtime server stops consumption and till the new consuming->online helix message for the next consuming segment arrives, messages are not read from the stream. Depending on the segment size, the period in which there's no consumption can take something from few seconds to a several minutes (and in some cases tens of minutes because of hitting concurrent segment build threshold on the servers). This leads to data freshness issues specially for high ingesting use cases.

Proposal

A simple solution to prevent these consumption pauses is to spin off a temporary consuming segment as soon as the segment commit starts. This temporary segment is a supplement to the main segment that is committing. There won't be any changes in Ideal State and External View.

Query execution

To make the ingested data in the temporary segment available for query execution, Server Query Executor sends the incoming query to the main segment as well as the temporary segment if there is one. Both segments execute the query and return the results.

Next Offline to Consuming Transition

After the main segment is committed and the next consuming segment is created during handling of offline->consuming helix transition message, the data in the temporary segment can be used to bootstrap the new consuming segment. Once the mutable segment is created for the next consuming segment, and before the consume loop starts, the data in the temporary segment can be read and inserted into the new mutable segment. After the data is copied over, the temporary segment can be deleted, and the consume loop in the new consuming segment starts from the offset of the last copied message.

Configurations

  • We can add a new boolean flag to the Stream Config properties to enable/disable pause-less consumption. We can use something like pauseless.consumption.enabled.
  • For the size of the temporary segment, we can go with a default value of 20% of the main segment size. This ratio can be configured as a stream config property, something like pauseless.consumption.temporary.segment.size.ratio.
  • We also need to set a timeout for temporary consumption in case something is wrong and the helix transition message for the next consuming segment doesn't arrive to the server. We can have a default 10min value and also a new property in stream config to override that, something like pauseless.consumption.timeout.

sajjad-moradi avatar Jan 19 '23 04:01 sajjad-moradi

CC: @mcvsubbu @npawar @Jackie-Jiang

sajjad-moradi avatar Jan 19 '23 04:01 sajjad-moradi

Have you considered using the regular state transition to achieve this? Essentially we can create the new consuming segment in IdealState when the commit starts. Nothing needs to be changed in the routing or state transition handling. If the commit end didn't arrive or failed, we can remove the new consuming segment from the IdealState. I feel this approach is more natural and easier to implement

Jackie-Jiang avatar Jan 20 '23 00:01 Jackie-Jiang

How many rows would the new consuming segment be setup to consume? (Currently that is decided when the prev segment is committed).

mcvsubbu avatar Jan 20 '23 01:01 mcvsubbu

Thanks for starting this @sajjad-moradi ! Looking forward to this. Couple of questions:

  1. Server Query Executor sends the incoming query to the main segment as well as the temporary segment if there is one - Could you elaborate on this? How will we send the query to temporary segment, if there's no record of it in ideal state/external view, given the EV is what drives all the routing logic? I'm wondering if that would end up looking like a lot of if-else in the query execution at consuming segment level, as we won't have any info about this segment in the planner/executor etc.
  2. What Jackie suggested, was also what came to my mind instantly when I read the problem statement. It does seema. lot more natural and a clean extension, and will reduce the number of special cases we'd have to think about. We can still set it up to consume 20% of the main segment size if that's what is deemed best, and update it when we have that definitive number. Or we go with the previous segment's size, and make rows adjustments with 1 segment delay.

npawar avatar Jan 26 '23 03:01 npawar

+1 on this! It's awesome to see this feature is getting tackled 👍

@sajjad-moradi What's the reasoning behind of not touching external view / idealstate for this temp segment? I also think that having hidden temp segments may require us to handle a lot of edge case scenarios. Would you elaborate the concerns/challenges for using the externalview/idealstate state transitions?

snleee avatar Jan 26 '23 09:01 snleee

  1. Server Query Executor sends the incoming query to the main segment as well as the temporary segment if there is one - Could you elaborate on this? How will we send the query to temporary segment, if there's no record of it in ideal state/external view, given the EV is what drives all the routing logic? I'm wondering if that would end up looking like a lot of if-else in the query execution at consuming segment level, as we won't have any info about this segment in the planner/executor etc.

I was thinking of using a PauselessConsumptionManager that knows which consuming segments have kicked off their corresponding temporary segments. Server Query Executor simply passed in the list of segmentsToQuery to the pauseless consumption manager and that returns the list of temporary segments to be added to segmentsToQuery list. So there's not a lot of if-elsees.

That being said, I agree that Jackie's suggestion is 1) more explicit and 2) it also has the benefit that if the next consuming segment is going to be assigned to a different server, the data in temporary segment of the current server doesn't need to be thrown away.

If we want to take that route, we need to think of how to address the followings:

  1. final number of rows for the new segment
  2. segment commit failure

For the first one, we can start the next segment (les't call it S2) with the same size of the existing one (S1). When S1 commits successfully, we can calculate the proper size for S2 and if the new size is within a threshold (something like 5%) of the size of S1, we can assume S2 size is good enough so we leave it as is. If the new size is not within the 5% threshold, then we send a helix transition message from Controller to the servers hosting S2 to create a new mutable segment with the new size and copy over the data.

For the second problem, I think we need to change the commit protocol a bit, because currently we assume if there's commit failure, segments which have been on hold will simply start over. With pauseless consumption, since the new segment (S2) already has started consumption with a specific start offset, we need to make sure the commit protocol for S1 ends with the same offset.

sajjad-moradi avatar Jan 27 '23 00:01 sajjad-moradi

Changing the idealstate is a much more complex solution, and will have the number of rows to consume as an unresolved point. Today, we compute the new number of rows based on the old number of rows consumed, at the time the commit happens. Further, the computation is done on the controller (for a good reason, and it should be there) and not on the server.

Our proposal for pauseless consumption is a much simpler one, no fundamental changes, and we can fine tune it as needed. Completely local to the server.

We can keep the "daughter" segment inside the LLRealtimeSegmentDataManager object. Whenever queries come in, the other segment is also consulted to return the results. So, the view from rest of pinot will be that a segment in the CONSUMING state has not paused consumption during segment creation.

mcvsubbu avatar Jan 27 '23 01:01 mcvsubbu

Reviving this.

I think we should go with optimistic approach as Jackie suggested. One simplification would be to enable pauseless consumption feature only in case of commit happening because of reaching row threshold. This will always ensure that the ending offset is the same on all replicas. There is no harm in going ahead with next segment immediately in this case.

The edge case happens when the commit is decided based on time because each replica can commit at different offset. To avoid all edge cases, we can simply fallback to current behavior in this case.

Thoughts?

kishoreg avatar Jun 21 '24 05:06 kishoreg

IMO adding a new segment on the same server is very localized change and easy to implement. I strongly suggest we enter the feature with that approach, and then branch over to more complicated approaches as needed.

mcvsubbu avatar Jun 22 '24 17:06 mcvsubbu

local decisions are very hard to reason about when things go wrong. Maybe we should all sync up and decide the approach. We can post the summary here.

My vote would be pick something that is right vs easy.

kishoreg avatar Jun 22 '24 20:06 kishoreg

Yes, syncing up sounds good.

mcvsubbu avatar Jun 26 '24 03:06 mcvsubbu

We sync'd on this offline, and here are the some conclusions:

  • When segment commit happens (end offset of the consuming segment is determined), controller can create the ZKMetadata for the next consuming segment, and use commit protocol to ask servers to start consuming the next segment.
    • Controller should try to assign the next consuming segment, and only ask servers to start consuming when it is assigned to the same servers. This can avoid duplicate consuming segment during rebalance.
    • Note that controller won't change the ideal state at this moment because we want to avoid having 2 CONSUMING segments on the same partition. If we allow that, it can cause problem when server crashed restarted because it could skip one CONSUMING segment.
  • Controller updates ideal state the same way as current code when commit end (change consuming segment to ONLINE and add new consuming segment as CONSUMING)
  • Server creates the new consuming segment when informed by controller via the commit protocol, and associate it with the last consuming segment for query purpose
  • When server gets the state transition from OFFLINE -> CONSUMING on the new consuming segment, it moves the new consuming segment as the regular consuming segment. It can remove the association of new consuming segment and last consuming segment, and broker should be able to directly query the new consuming segment as optional (introduced in #11978).
  • RealtimeSegmentValidationManager should be modified to handle cleaning up the orphan ZKMetadata when segment commit failed

Jackie-Jiang avatar Jun 28 '24 23:06 Jackie-Jiang

How will broker query the new segment if it's not part of IS/EV?

kishoreg avatar Jun 29 '24 03:06 kishoreg

That part remains as what was originally proposed in the issue description. Basically Server Query Executor sends the incoming query to the main segment as well as the temporary segment. Both segments execute the query and return the results.

sajjad-moradi avatar Jun 30 '24 21:06 sajjad-moradi

since we are already updating the zkmetadata, why not change the idealstate of current consuming segment to ONLINE and create a new segment in consuming? The server code currently handles consuming -> online transition anyway rt

kishoreg avatar Jun 30 '24 22:06 kishoreg

Consuming segment cannot be changed to online unless one replica builds the segment. Using the proposed approach, controller creates the new segment zk metadata before replicas start building the segments. That way the next consuming segment starts on servers right before they start building the previous one.

sajjad-moradi avatar Jul 01 '24 04:07 sajjad-moradi

Hi everyone,

I’ve put together a design document for addressing (https://github.com/apache/pinot/issues/10147), which outlines the problem, proposed solution, and expected impact on the system. I’d appreciate any comments or feedback on the approach.

You can find the document here: https://docs.google.com/document/d/1d-xttk7sXFIOqfyZvYw5W_KeGS6Ztmi8eYevBcCrT_c/edit?usp=sharing

Thanks in advance for your input!.

cc: @mcvsubbu @sajjad-moradi @Jackie-Jiang @npawar @swaminathanmanish @KKcorps

9aman avatar Nov 08 '24 11:11 9aman

@9aman please refer to the comment https://github.com/apache/pinot/issues/10147#issuecomment-2197768243 in this issue. We should not be updating the idealstate before the segment has completed build and is present in the deep store.

mcvsubbu avatar Nov 11 '24 19:11 mcvsubbu

@mcvsubbu

  1. In ZooKeeper, segment metadata will use COMMITTING as the status for segments being built, distinguishing them from DONE segments that are already in deep store. Segment ZK metadata will act as the source of truth.
  2. @Jackie-Jiang supports the new approach of marking the IdealState as ONLINE. 2.1. As before, the external view will help us know the current state of the system and the segment will still be CONSUMING in EV

9aman avatar Nov 12 '24 13:11 9aman

@mcvsubbu

1. In ZooKeeper, segment metadata will use **COMMITTING** as the status for segments being built, distinguishing them from **DONE** segments that are already in deep store. Segment ZK metadata will act as the source of truth.

2. @Jackie-Jiang supports the new approach of marking the IdealState as ONLINE.
   2.1. As before,  the external view will help us know the current state of the system and the segment will still be CONSUMING in EV

Consider the scenario when there are two consuming segments on the same stream partition, and a pinot server is restarted. It will get state transitions for two segments (in any order). I believe we have protection against starting two consumers on the same partition, so only one of them can start to consume. Even if that protection is not there, the server may not have capacity to handle two consumers for one partition (and this may happen for several partitions on the consumer, since partitions generally complete around the same time).

Worse, if the server that is restarted is the one that is committing (server-2 in your diagram), I am not sure what the recovery path is.

Can you draw ladder diagrams for these scenarios so that we can understand the proposal better? Thanks

mcvsubbu avatar Nov 12 '24 17:11 mcvsubbu

A couple of weeks ago, I implemented most of the required changes for this feature based on the design outlined in this issue. A few months ago, Jackie, Subbu, and I had an in-person meeting and reached an agreement on this design. Please look at the Draft PR, and let me know what you think.

sajjad-moradi avatar Nov 13 '24 17:11 sajjad-moradi

@mcvsubbu @sajjad-moradi it would be better if we synced up offline on this. Can you let us know your availability in the OSS slack #feat-pausless-consumption channel

KKcorps avatar Nov 14 '24 03:11 KKcorps

Attaching the relevant docs and PR's for the issue:

Doc: https://docs.google.com/document/d/1d-xttk7sXFIOqfyZvYw5W_KeGS6Ztmi8eYevBcCrT_c/edit?tab=t.0 https://docs.google.com/document/d/1xq5wTGiDDM9FHnyEmOqJbWEBNmVNPRV72Q65oYvHI08/edit?usp=sharing

PR:

https://github.com/apache/pinot/pull/14741 https://github.com/apache/pinot/pull/14798 https://github.com/apache/pinot/pull/14920

9aman avatar Mar 19 '25 04:03 9aman