Add a default group id in the Kafka consumer
Description: This PR adds a default group id to the Kafka consumer.
This PR solves the following issue: #9183
Testing: Added KafkaStreamLevelStreamConfigTest.java to ensure that the default value is applied to `groupId`.
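A minimal sketch of the idea behind this PR, assuming hypothetical method and property names (the actual change lives in Pinot's `KafkaStreamLevelStreamConfig`; this is illustrative, not the real implementation):

```java
import java.util.HashMap;
import java.util.Map;

// Sketch only: fall back to a table-derived default when no group id is
// configured, instead of letting the Kafka client see null. The property
// key and fallback format here are assumptions.
public class DefaultGroupIdSketch {
    static String resolveGroupId(Map<String, String> streamProps, String tableName) {
        String groupId = streamProps.get("group.id");
        // Newer Kafka clients treat a missing group.id as null, which breaks
        // monitoring tools that expect a non-null group; default to the table name.
        return (groupId == null || groupId.isEmpty()) ? tableName : groupId;
    }

    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        System.out.println(resolveGroupId(props, "myTable_REALTIME"));
        props.put("group.id", "custom-group");
        System.out.println(resolveGroupId(props, "myTable_REALTIME"));
    }
}
```

An explicitly configured `group.id` always wins; the default only fills the gap.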
Codecov Report
Merging #9260 (77d0cc7) into master (7197461) will increase coverage by 32.65%. The diff coverage is 64.28%.

```
@@              Coverage Diff              @@
##             master    #9260       +/-   ##
=============================================
+ Coverage     37.14%   69.79%   +32.65%
- Complexity      193     4788     +4595
=============================================
  Files          1838     1889       +51
  Lines         97516   100851     +3335
  Branches      14711    15328      +617
=============================================
+ Hits          36222    70392    +34170
+ Misses        58360    25486    -32874
- Partials       2934     4973     +2039
=============================================
```
| Flag | Coverage Δ | |
|---|---|---|
| integration1 | 25.94% <50.00%> (-0.38%) | :arrow_down: |
| integration2 | 24.78% <50.00%> (+0.01%) | :arrow_up: |
| unittests1 | 67.08% <ø> (?) | |
| unittests2 | 15.36% <35.71%> (+0.09%) | :arrow_up: |

Flags with carried forward coverage won't be shown.
| Impacted Files | Coverage Δ | |
|---|---|---|
| ...n/stream/kafka20/KafkaStreamLevelStreamConfig.java | 70.83% <61.53%> (+2.08%) | :arrow_up: |
| ...tream/kafka20/KafkaStreamLevelConsumerManager.java | 61.62% <100.00%> (ø) | |
| ...a/org/apache/pinot/query/parser/QueryRewriter.java | 0.00% <0.00%> (-100.00%) | :arrow_down: |
| ...apache/pinot/broker/api/HttpRequesterIdentity.java | 28.57% <0.00%> (-57.15%) | :arrow_down: |
| ...org/apache/pinot/broker/api/RequesterIdentity.java | 50.00% <0.00%> (-50.00%) | :arrow_down: |
| ...re/query/reduce/SelectionOnlyStreamingReducer.java | 52.94% <0.00%> (-28.02%) | :arrow_down: |
| ...r/requesthandler/BrokerRequestHandlerDelegate.java | 61.29% <0.00%> (-19.67%) | :arrow_down: |
| ...tream/kafka20/server/KafkaDataServerStartable.java | 79.16% <0.00%> (-17.71%) | :arrow_down: |
| ...pinot/broker/api/resources/PinotClientRequest.java | 43.28% <0.00%> (-15.05%) | :arrow_down: |
| ...requesthandler/MultiStageBrokerRequestHandler.java | 59.15% <0.00%> (-14.88%) | :arrow_down: |
| ... and 1211 more | | |
Please reformat the changes with Pinot Style
@navina Can you please help review the PR? To keep the existing behavior, should we put an empty string as the group id if it is missing?
I agree that the empty string will match the existing behavior, but it doesn't provide any value to the application. If you have more than one Kafka table, they will all share the same group id, which, in my opinion, is a bug or an incorrect implementation.
I think we can use the table name as the group id and prefix it with `stream-level` or `partition-level` based on the consumption model used. For existing applications that may rely on an empty-string group id, they can always add a table config to override the `group.id` Kafka consumer property. Wdyt @Jackie-Jiang ?
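The proposal above could be sketched as follows. The prefix strings, enum, and method name are hypothetical, for illustration only:

```java
// Sketch of the proposed scheme: derive group.id from the table name,
// prefixed by the consumption model. Not the actual Pinot API.
public class GroupIdProposalSketch {
    enum ConsumptionModel { STREAM_LEVEL, PARTITION_LEVEL }

    static String deriveGroupId(String tableName, ConsumptionModel model) {
        String prefix = (model == ConsumptionModel.STREAM_LEVEL)
            ? "stream-level" : "partition-level";
        return prefix + "-" + tableName;
    }

    public static void main(String[] args) {
        System.out.println(deriveGroupId("myTable_REALTIME", ConsumptionModel.STREAM_LEVEL));
        System.out.println(deriveGroupId("myTable_REALTIME", ConsumptionModel.PARTITION_LEVEL));
    }
}
```

This gives each table (and each consumption model) a distinct default while still allowing an explicit table-config override.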
Does groupId apply to LLC consumer? I checked the usage of StreamConfig.getGroupId() and I think we already handled the case of missing group id (in PinotTableIdealStateBuilder.getGroupIdFromRealtimeDataTable()). @navina In which scenario are we getting the null group id?
Thanks for contributing this feature. Can you please extend it to the partition-level consumer as well?
Hey @navina , I was looking into the partition-level consumer but I was unable to find the group id. Can you please point me to the code?
> Does `groupId` apply to LLC consumer? I checked the usage of `StreamConfig.getGroupId()` and I think we already handled the case of missing group id (in `PinotTableIdealStateBuilder.getGroupIdFromRealtimeDataTable()`). @navina In which scenario are we getting the `null` group id?
I was not aware of the PinotTableIdealStateBuilder.getGroupIdFromRealtimeDataTable() usage. But when the Kafka consumer client config doesn't contain group.id, the Kafka consumer that gets created ends up using null as the group id. Previously, it would default to an empty string. This is breaking monitoring tools for existing users.
Ideally, the LLC consumer shouldn't care about the group id. However, it looks like some users are relying on having a non-null group.id for the sake of monitoring. See https://apache-pinot.slack.com/archives/C011C9JHN7R/p1657815069074379?thread_ts=1657740174.861419&cid=C011C9JHN7R for more context.
> Hey @navina , I was looking into the partition-level consumer but I was unable to find the group id. Can you please point me to the code?
As of today, we don't set any groupId explicitly in KafkaPartitionLevelStreamConfig.java. So, the default in the partition-level consumer ends up being the empty string, and with newer Kafka versions, it is null. You can set group.id explicitly in KafkaPartitionLevelStreamConfig.java.
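One way to guarantee a non-null group id, sketched below with only stdlib `Properties`. The `"group.id"` key is the standard Kafka consumer config key (`ConsumerConfig.GROUP_ID_CONFIG` in kafka-clients); the helper name and default value are assumptions, not Pinot's actual wiring:

```java
import java.util.Properties;

// Sketch: ensure group.id is always present in the properties handed to
// the Kafka consumer, so it never falls through to the client's null default.
public class ExplicitGroupIdSketch {
    static Properties withGroupId(Properties consumerProps, String defaultGroupId) {
        if (consumerProps.getProperty("group.id") == null) {
            consumerProps.setProperty("group.id", defaultGroupId);
        }
        return consumerProps;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        System.out.println(withGroupId(props, "pinot-myTable").getProperty("group.id"));
    }
}
```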
@navina Hey, I have added the changes in the new commit. Can you please review it when you are free?
> As of today, we don't set any groupId explicitly in KafkaPartitionLevelStreamConfig.java. So, the default in the partition-level consumer ends up being the empty string, and with newer Kafka versions, it is null. You can set group.id explicitly in KafkaPartitionLevelStreamConfig.java.
Also, I am unable to understand the above comment. Will group.id be set similar to what is done here? If yes, can you please point me to the documentation for a better understanding of how Pinot's Kafka integration works?
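For context, a table's `streamConfigs` map is where a per-table consumer property override would live. The exact property key below is an assumption for illustration; check the Pinot docs for the key your connector version actually reads:

```java
import java.util.Map;

// Illustrative only: a streamConfigs map overriding the consumer group id.
// The "stream.kafka.consumer.prop.group.id" key is hypothetical.
public class TableConfigOverrideSketch {
    public static void main(String[] args) {
        Map<String, String> streamConfigs = Map.of(
            "streamType", "kafka",
            "stream.kafka.topic.name", "myTopic",
            // hypothetical key: explicit group id override for this table
            "stream.kafka.consumer.prop.group.id", "my-custom-group");
        System.out.println(streamConfigs.get("stream.kafka.consumer.prop.group.id"));
    }
}
```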
```
Error: Tests run: 13, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 247.869 s <<< FAILURE! - in org.apache.pinot.integration.tests.LLCRealtimeClusterIntegrationTest
Error: org.apache.pinot.integration.tests.LLCRealtimeClusterIntegrationTest.testAddHLCTableShouldFail  Time elapsed: 0.022 s
Error: org.apache.pinot.integration.tests.LLCRealtimeClusterIntegrationTest.testConsumerDirectoryExists  Time elapsed: 0.001 s
Error: org.apache.pinot.integration.tests.LLCRealtimeClusterIntegrationTest.testHardcodedServerPartitionedSqlQueries  Time elapsed: 1.119 s
Error: org.apache.pinot.integration.tests.LLCRealtimeClusterIntegrationTest.testInvertedIndexTriggering  Time elapsed: 3.953 s
Error: org.apache.pinot.integration.tests.LLCRealtimeClusterIntegrationTest.testReload  Time elapsed: 6.708 s
Error: org.apache.pinot.integration.tests.LLCRealtimeClusterIntegrationTest.testReset  Time elapsed: 5.528 s
Error: org.apache.pinot.integration.tests.LLCRealtimeClusterIntegrationTest.testSegmentFlushSize  Time elapsed: 0.005 s
Error: org.apache.pinot.integration.tests.LLCRealtimeClusterIntegrationTest.testDictionaryBasedQueries  Time elapsed: 0.133 s <<< FAILURE!
```
The integration test is failing due to this error ^ with the error message:

```
kafka.common.StateChangeFailedException: Failed to elect leader for partition LLCRealtimeClusterIntegrationTest-1 under strategy OfflinePartitionLeaderElectionStrategy(false).
```

Pointer to the error is here. Any suggestions to resolve this?
Also, while testing locally, it is unable to connect to ZooKeeper even though ZooKeeper is running. I have followed this doc for the local setup. I am getting:
```
java.lang.InterruptedException: null
WARN [ZKUtil] [main] Invalid cluster setup for cluster: zookeeper, missing znode path: /zookeeper/IDEALSTATES
WARN [ConfigAccessor] [main] No config found at /RealtimeClusterIntegrationTest/CONFIGS/RESOURCE/leadControllerResource.
```
Am I missing any configuration?
@navina @Jackie-Jiang May I get some eyes on this, please?
I think this change will break the current topic partition assignment and replication behaviour by defaulting back to the Kafka client's implementation instead of Pinot's.
The Kafka client assigns partitions across consumer instances and prevents a partition from being consumed by two or more processes when the group id is the same.
Please test a multi partition topic with replication enabled.
If the issue here is monitoring lag, I believe there is a REST endpoint in Swagger that gives the consumer offsets, as well as the last consuming timestamp in the query results.
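The replication concern above can be illustrated with a toy model: with a shared group.id, the Kafka client divides partitions among group members, so no two replicas see the same partition, which is the opposite of what Pinot replication needs. This is a plain round-robin simulation, not the real Kafka assignor:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy round-robin assignment: members of the SAME consumer group split
// the partitions between them instead of each consuming all of them.
public class SharedGroupAssignmentSketch {
    static Map<String, List<Integer>> assign(List<String> members, int numPartitions) {
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        for (String m : members) {
            assignment.put(m, new ArrayList<>());
        }
        for (int p = 0; p < numPartitions; p++) {
            assignment.get(members.get(p % members.size())).add(p);
        }
        return assignment;
    }

    public static void main(String[] args) {
        // Two Pinot replicas joining the same consumer group: each gets
        // only half the partitions, so neither holds a full copy.
        System.out.println(assign(List.of("replica-0", "replica-1"), 4));
    }
}
```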
@tanmesh sorry about the delay. I was able to verify your patch locally and the integration tests seem to pass now. I think it was some transient failure.
I think your PR looks good. But it's not clear why this change would work with the high-level consumer implementation. If there is more than 1 replica, wouldn't we need 3 different group ids? That is what happens in the Pinot controller's PinotTableIdealStateBuilder.getGroupIdFromRealtimeDataTable(), which attaches the replica id to the group id. So, I am not convinced that this patch will fix the issue with monitoring. Thoughts?
> Please test a multi partition topic with replication enabled.
Yeah, this patch will break with replication. But what I am unsure about is how this was working before for the monitoring tools. Whether the group.id is null or empty, as long as it doesn't include the replica id, it will not be accurate.
> If the issue here is monitoring lags I believe there is a rest endpoint in swagger to give the consumer offsets as well as the last consuming time stamp in the query results.
Pinot doesn't have a good abstraction for input partitioning/partition grouping. Once we have that, I think we can customize the connector to use Pinot's grouping model and not the connector's model. Thoughts?
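For reference, the replica-aware scheme mentioned earlier in the thread (attaching the replica id to the group id, as PinotTableIdealStateBuilder.getGroupIdFromRealtimeDataTable() reportedly does) can be sketched like this. The format string and method name are assumptions:

```java
// Sketch: give each replica its own group id so that every replica joins
// a distinct consumer group and independently consumes all partitions.
public class ReplicaGroupIdSketch {
    static String groupIdForReplica(String baseGroupId, int replicaId) {
        return baseGroupId + "_" + replicaId;
    }

    public static void main(String[] args) {
        for (int replica = 0; replica < 3; replica++) {
            System.out.println(groupIdForReplica("myTable_REALTIME", replica));
        }
    }
}
```

With distinct group ids per replica, Kafka never splits partitions between replicas, and lag can be monitored per replica.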