
Added a default groupId in Kafka consumer

Open · tanmesh opened this issue 3 years ago · 14 comments

Description: This PR adds a default group ID to the Kafka consumer.

This PR fixes issue #9183.

Testing: Added KafkaStreamLevelStreamConfigTest.java to verify that a default value is assigned to groupId.
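A minimal sketch of the defaulting idea, assuming the stream configs arrive as a `Map<String, String>`. The class name, the property key `stream.kafka.group.id`, and the caller-supplied fallback are hypothetical placeholders for illustration; the actual change lives in `KafkaStreamLevelStreamConfig.java` and may use a different key and default.

```java
import java.util.Map;

// Hypothetical sketch: resolve the consumer group ID from stream configs and
// fall back to a default when the key is missing or blank.
final class StreamLevelGroupIdResolver {
  // Assumed key for illustration only; not necessarily Pinot's real property name.
  static final String GROUP_ID_KEY = "stream.kafka.group.id";

  private StreamLevelGroupIdResolver() {
  }

  static String resolveGroupId(Map<String, String> streamConfigs, String defaultGroupId) {
    String configured = streamConfigs.get(GROUP_ID_KEY);
    // Treat "missing" and "blank" the same way so the consumer never receives
    // a null or empty group ID.
    if (configured == null || configured.trim().isEmpty()) {
      return defaultGroupId;
    }
    return configured.trim();
  }
}
```

A test in the spirit of the one described would pass an empty map and assert that the default comes back, then pass an explicit value and assert that it wins.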

tanmesh, Aug 21 '22 18:08

Codecov Report

Merging #9260 (77d0cc7) into master (7197461) will increase coverage by 32.65%. The diff coverage is 64.28%.

@@              Coverage Diff              @@
##             master    #9260       +/-   ##
=============================================
+ Coverage     37.14%   69.79%   +32.65%     
- Complexity      193     4788     +4595     
=============================================
  Files          1838     1889       +51     
  Lines         97516   100851     +3335     
  Branches      14711    15328      +617     
=============================================
+ Hits          36222    70392    +34170     
+ Misses        58360    25486    -32874     
- Partials       2934     4973     +2039     
Flag          Coverage Δ
integration1  25.94% <50.00%> (-0.38%) ↓
integration2  24.78% <50.00%> (+0.01%) ↑
unittests1    67.08% <ø> (?)
unittests2    15.36% <35.71%> (+0.09%) ↑

Flags with carried-forward coverage won't be shown.

Impacted Files                                         Coverage Δ
...n/stream/kafka20/KafkaStreamLevelStreamConfig.java  70.83% <61.53%> (+2.08%) ↑
...tream/kafka20/KafkaStreamLevelConsumerManager.java  61.62% <100.00%> (ø)
...a/org/apache/pinot/query/parser/QueryRewriter.java  0.00% <0.00%> (-100.00%) ↓
...apache/pinot/broker/api/HttpRequesterIdentity.java  28.57% <0.00%> (-57.15%) ↓
...org/apache/pinot/broker/api/RequesterIdentity.java  50.00% <0.00%> (-50.00%) ↓
...re/query/reduce/SelectionOnlyStreamingReducer.java  52.94% <0.00%> (-28.02%) ↓
...r/requesthandler/BrokerRequestHandlerDelegate.java  61.29% <0.00%> (-19.67%) ↓
...tream/kafka20/server/KafkaDataServerStartable.java  79.16% <0.00%> (-17.71%) ↓
...pinot/broker/api/resources/PinotClientRequest.java  43.28% <0.00%> (-15.05%) ↓
...requesthandler/MultiStageBrokerRequestHandler.java  59.15% <0.00%> (-14.88%) ↓
... and 1211 more


codecov-commenter, Aug 24 '22 00:08

Please reformat the changes with Pinot Style.

@navina Can you please help review the PR? To keep the existing behavior, should we use an empty string as the group ID when it is missing?

I agree that an empty string would match the existing behavior, but it doesn't provide any value to the application. If you have more than one Kafka table, they will all share the same group ID, which in my opinion is a bug or an incorrect implementation. I think we can use the table name as the group ID and prefix it with stream-level or partition-level based on the consumption model used. Existing applications that rely on an empty-string group ID can always add a table config to override the group.id Kafka consumer property. Wdyt @Jackie-Jiang?
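The proposal above (table name as the group ID, prefixed by the consumption model, with a table-config override) can be sketched as follows. The class, enum, and the exact `stream-level-<table>` / `partition-level-<table>` format are hypothetical illustrations of the suggestion, not an existing Pinot API.

```java
import java.util.Locale;

// Hypothetical sketch of the proposed default: "<model-prefix>-<tableNameWithType>",
// unless the table config explicitly overrides group.id.
final class TableGroupIdDefaults {
  enum ConsumptionModel { STREAM_LEVEL, PARTITION_LEVEL }

  private TableGroupIdDefaults() {
  }

  static String defaultGroupId(ConsumptionModel model, String tableNameWithType) {
    // STREAM_LEVEL -> "stream-level", PARTITION_LEVEL -> "partition-level"
    String prefix = model.name().toLowerCase(Locale.ROOT).replace('_', '-');
    return prefix + "-" + tableNameWithType;
  }

  // overrideFromTableConfig is null when the table config does not set group.id.
  static String effectiveGroupId(String overrideFromTableConfig, ConsumptionModel model,
      String tableNameWithType) {
    if (overrideFromTableConfig != null) {
      // An explicit override always wins, including "" for users who rely on the old behavior.
      return overrideFromTableConfig;
    }
    return defaultGroupId(model, tableNameWithType);
  }
}
```

With this scheme, two Kafka tables no longer collide on a shared group ID, while a user who needs the old empty-string value can still set it explicitly.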

navina, Aug 24 '22 13:08


Does groupId apply to the LLC consumer? I checked the usages of StreamConfig.getGroupId(), and I think we already handle the case of a missing group ID (in PinotTableIdealStateBuilder.getGroupIdFromRealtimeDataTable()). @navina In which scenario do we get a null group ID?

Jackie-Jiang, Sep 14 '22 22:09

Thanks for contributing this feature. Can you please extend it to the partition-level consumer as well?

Hey @navina, I was looking into the partition-level consumer, but I was unable to find where the group ID is set. Can you please point me to the code?

tanmesh, Sep 15 '22 16:09


I was not aware of the PinotTableIdealStateBuilder.getGroupIdFromRealtimeDataTable() usage. But when the Kafka consumer client config doesn't contain group.id, the Kafka consumer that gets created ends up with a null group ID. Previously, it defaulted to an empty string. This is breaking monitoring tools for existing users. Ideally, the LLC consumer shouldn't care about the group ID; however, it looks like some users rely on having a non-null group.id for monitoring. See https://apache-pinot.slack.com/archives/C011C9JHN7R/p1657815069074379?thread_ts=1657740174.861419&cid=C011C9JHN7R for more context.

navina, Sep 15 '22 17:09


As of today, we don't set groupId explicitly in KafkaPartitionLevelStreamConfig.java, so the partition-level consumer ends up with the Kafka client's default: an empty string in older Kafka versions and null in newer ones. You can set group.id explicitly in KafkaPartitionLevelStreamConfig.java.
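Setting `group.id` explicitly means building the consumer properties so the key is always present, instead of relying on the client default (which changed from `""` to null in Kafka 2.2 via KIP-289). A minimal sketch, assuming the configured value comes from the table's stream config (null when absent) and that Pinot picks some non-null fallback; the class and parameter names are hypothetical, while `group.id` and `bootstrap.servers` are the standard Kafka consumer property names.

```java
import java.util.Objects;
import java.util.Properties;

// Hypothetical sketch: consumer properties for a partition-level consumer with
// group.id always set explicitly, so the Kafka client default never applies.
final class PartitionLevelConsumerProps {
  // Standard Kafka consumer property names.
  static final String GROUP_ID = "group.id";
  static final String BOOTSTRAP_SERVERS = "bootstrap.servers";

  private PartitionLevelConsumerProps() {
  }

  // configuredGroupId: value from the table's stream config, or null if absent.
  // fallbackGroupId: hypothetical Pinot-chosen default; must not be null.
  static Properties build(String bootstrapServers, String configuredGroupId, String fallbackGroupId) {
    Objects.requireNonNull(fallbackGroupId, "fallbackGroupId");
    Properties props = new Properties();
    props.setProperty(BOOTSTRAP_SERVERS, bootstrapServers);
    // An explicitly configured value (even "") wins; otherwise use the fallback.
    props.setProperty(GROUP_ID, configuredGroupId != null ? configuredGroupId : fallbackGroupId);
    return props;
  }
}
```

The resulting `Properties` would then be passed to the Kafka consumer constructor; because `group.id` is always set, monitoring tools that key on it see a stable, non-null value.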

navina, Sep 15 '22 18:09

Hey @navina, I have added the changes in a new commit. Can you please review it when you are free?

tanmesh, Sep 20 '22 00:09


Also, I don't fully understand the comment about setting group.id explicitly in KafkaPartitionLevelStreamConfig.java. Should group.id be set similarly to what is done here? If so, can you please point me to documentation that explains how Pinot's Kafka integration works?

tanmesh, Sep 20 '22 00:09

Error: Tests run: 13, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 247.869 s <<< FAILURE! - in org.apache.pinot.integration.tests.LLCRealtimeClusterIntegrationTest
Error: org.apache.pinot.integration.tests.LLCRealtimeClusterIntegrationTest.testAddHLCTableShouldFail Time elapsed: 0.022 s
Error: org.apache.pinot.integration.tests.LLCRealtimeClusterIntegrationTest.testConsumerDirectoryExists Time elapsed: 0.001 s
Error: org.apache.pinot.integration.tests.LLCRealtimeClusterIntegrationTest.testHardcodedServerPartitionedSqlQueries Time elapsed: 1.119 s
Error: org.apache.pinot.integration.tests.LLCRealtimeClusterIntegrationTest.testInvertedIndexTriggering Time elapsed: 3.953 s
Error: org.apache.pinot.integration.tests.LLCRealtimeClusterIntegrationTest.testReload Time elapsed: 6.708 s
Error: org.apache.pinot.integration.tests.LLCRealtimeClusterIntegrationTest.testReset Time elapsed: 5.528 s
Error: org.apache.pinot.integration.tests.LLCRealtimeClusterIntegrationTest.testSegmentFlushSize Time elapsed: 0.005 s
Error: org.apache.pinot.integration.tests.LLCRealtimeClusterIntegrationTest.testDictionaryBasedQueries Time elapsed: 0.133 s <<< FAILURE!

The integration test fails with the error above; the error message is:

kafka.common.StateChangeFailedException: Failed to elect leader for partition LLCRealtimeClusterIntegrationTest-1 under strategy OfflinePartitionLeaderElectionStrategy(false).

The error is raised here. Any suggestions on how to resolve this?

tanmesh, Sep 20 '22 22:09

Also, when testing locally, the test is unable to connect to ZooKeeper even though ZooKeeper is running. I followed this doc for the local setup. I am getting:

java.lang.InterruptedException: null

WARN [ZKUtil] [main] Invalid cluster setup for cluster: zookeeper, missing znode path: /zookeeper/IDEALSTATES

WARN [ConfigAccessor] [main] No config found at /RealtimeClusterIntegrationTest/CONFIGS/RESOURCE/leadControllerResource.

Am I missing any configuration?

tanmesh, Sep 20 '22 22:09

@navina @Jackie-Jiang Could I get some eyes on this, please?

tanmesh, Sep 22 '22 23:09

I think this change will break the current topic partition assignment and replication behaviour by falling back to the Kafka client's implementation instead of Pinot's implementation.

The Kafka client assigns partitions across consumer instances and prevents a partition from being consumed by two or more processes that share the same group ID.
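To make the concern concrete, here is a simplified model of how a consumer group splits a topic's partitions among members that share a group ID, loosely following the range-assignment idea (contiguous blocks, with the first `partitions % consumers` members getting one extra partition). It assumes consumers join via group subscription and are listed in sorted order; it is an illustration, not Kafka's actual `RangeAssignor` code.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified range-style assignment: every partition goes to exactly one
// consumer in the group, so no consumer sees the whole topic once n > 1.
final class GroupAssignmentSketch {
  private GroupAssignmentSketch() {
  }

  static Map<String, List<Integer>> rangeAssign(List<String> consumers, int numPartitions) {
    Map<String, List<Integer>> assignment = new LinkedHashMap<>();
    int n = consumers.size();
    if (n == 0) {
      return assignment;  // no members, nothing to assign
    }
    int perConsumer = numPartitions / n;
    int extra = numPartitions % n;
    int next = 0;
    for (int i = 0; i < n; i++) {
      int count = perConsumer + (i < extra ? 1 : 0);
      List<Integer> parts = new ArrayList<>();
      for (int j = 0; j < count; j++) {
        parts.add(next++);
      }
      assignment.put(consumers.get(i), parts);
    }
    return assignment;
  }
}
```

For example, two replicas sharing one group on a 4-partition topic would each get only two partitions ([0, 1] and [2, 3]), whereas each Pinot replica is expected to consume the full set of partitions it owns.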

Please test a multi-partition topic with replication enabled.

If the issue here is monitoring lag, I believe there is a REST endpoint in Swagger that returns the consumer offsets, and the query results also include the last consuming timestamp.

pjpringle, Oct 16 '22 02:10

@tanmesh Sorry about the delay. I was able to verify your patch locally, and the integration tests seem to pass now. I think it was a transient failure.

I think your PR looks good, but it's not clear why this change would work with the high-level consumer implementation. If there is more than one replica (say, three), wouldn't we need a different group ID for each one? That is what happens in the Pinot controller's PinotTableIdealStateBuilder.getGroupIdFromRealtimeDataTable(), which attaches the replica ID to the group ID. So I am not convinced that this patch will fix the monitoring issue. Thoughts?
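The per-replica idea can be sketched as deriving one group ID per replica from a base ID. The `<base>_<replicaIndex>` format and the class name here are assumptions for illustration; the exact format produced by `getGroupIdFromRealtimeDataTable()` may differ.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: one distinct group ID per replica, built by attaching the
// replica index to a base group ID (format assumed, not taken from Pinot).
final class ReplicaGroupIds {
  private ReplicaGroupIds() {
  }

  static List<String> perReplicaGroupIds(String baseGroupId, int numReplicas) {
    if (numReplicas < 1) {
      throw new IllegalArgumentException("numReplicas must be >= 1");
    }
    List<String> ids = new ArrayList<>(numReplicas);
    for (int replica = 0; replica < numReplicas; replica++) {
      ids.add(baseGroupId + "_" + replica);
    }
    return ids;
  }
}
```

With three replicas and base `myTable_REALTIME`, this yields `myTable_REALTIME_0`, `myTable_REALTIME_1`, and `myTable_REALTIME_2`, so the replicas are never members of one shared consumer group; a single table-wide group.id cannot express that distinction.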

navina, Oct 17 '22 08:10


Yeah, this patch will break with replication. What I am unsure about is how this worked for the monitoring tools before. Whether group.id is null or empty, as long as it doesn't include the replica ID, it will not be accurate.


Pinot doesn't have a good abstraction for input partitioning/partition grouping. Once we have that, I think we can customize the connector to use Pinot's grouping model and not the connector's model. Thoughts?

navina, Oct 17 '22 08:10