KAFKA-14405: Log a warning when users attempt to set a config controlled by Streams
Streams hard-codes a few configurations. The documentation currently refers to five such configs: the four listed under "Parameters controlled by Kafka Streams", plus enable.auto.commit.
Three of those four also appear under "Default Values", which lists configurations that users can set but that receive a Streams-specific default if left unset.
This PR makes changes to warn the user when a configuration set by them is being overridden. Due to the overlapping documentation I have gone through the code and have separated the configs in a few categories -
- Non configurable consumer configs when EOS is disabled -
enable.auto.commit
allow.auto.create.topics
- Non configurable consumer configs when EOS is enabled -
isolation.level
- Non configurable producer configs when EOS is enabled -
enable.idempotence
max.in.flight.requests.per.connection
transactional.id
- Default consumer configs which can be configured -
auto.offset.reset
max.poll.records
- Default producer configs which can be configured -
linger.ms
StreamsConfig already had code to log warnings when a non-configurable property was being set.
It was missing logging warnings when allow.auto.create.topics was configured. I have added this change and removed the code that was setting allow.auto.create.topics within getMainConsumerConfigs.
Now it follows the existing code execution path to check for -
- Missing default configurations
- Logging warnings and overriding settings for non-configurable properties.
Permalink to checkIfUnexpectedUserSpecifiedConsumerConfig
Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
Since I am fairly new to the code base - I will be spending some more time identifying any other configs that I might have missed that can fall under any of these buckets. @ableegoldman could you please have a look at this PR.
Hi @ableegoldman thank you for the review!
I went through the producer configs and was not able to find any config with the name default.partitioner. Were you referring to the partitioner.class config?
In case this is not the config could you please help me by pointing where I can find default.partitioner within code?
Ah, yeah, sorry -- it's partitioner.class
What's the status of this PR?
This PR is being marked as stale since it has not had any activity in 90 days. If you would like to keep this PR alive, please ask a committer for review. If the PR has merge conflicts, please update it with the latest from trunk (or appropriate release branch)
If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed.
Will be re-picking this. My bad for dropping this in the middle. Will update soon. Issue - https://issues.apache.org/jira/browse/KAFKA-14405
Thanks! Great to hear.
Hi @mjsax, a small doubt -
From what I understand there are three different types of consumer configs - global, main and restore
If a user has set the property consumer.allow.auto.create.topics to True, we override it to False because Streams controls this property value.
But this can be bypassed by setting the property as main.consumer.allow.auto.create.topics = True and this property would be set as True.
Is this expected behaviour?
Sorry for late reply. I was OOO. Well, if it can be by-passed, it sounds like a bug.
Because there are multiple consumers, we use consumer. to allow users to change a config for all consumers. If you want to change a config for a specific consumer (or if you want to configure two consumers differently), you would use main.consumer. et al. If both prefixes (generic and specific) are used, the consumer-specific prefix takes precedence over the general consumer. prefix. Does this make sense? It's basically a "config hierarchy" (flexible and powerful, but maybe a little hard to understand on first encounter...)
But no matter what prefix is used, we should not allow users to by-pass what Streams tries to overwrite. (Btw: just setting allow.auto.create.topics = true, ie, without the consumer. prefix, is technically also valid, and we would pass a config without prefix into the consumer, too, so this case must also be covered.)
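To make the hierarchy concrete, here is a minimal sketch of the precedence order described above, using plain maps rather than StreamsConfig. The class and method names (PrefixPrecedenceSketch, withPrefix, resolve) are hypothetical and only illustrate the idea: an unprefixed value is overridden by a consumer. value, which is in turn overridden by a consumer-specific value such as main.consumer.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration of the prefix hierarchy; not code from StreamsConfig.
final class PrefixPrecedenceSketch {

    // Collect the entries whose key starts with the prefix, with the prefix stripped.
    static Map<String, Object> withPrefix(final Map<String, Object> all, final String prefix) {
        final Map<String, Object> out = new HashMap<>();
        for (final Map.Entry<String, Object> entry : all.entrySet()) {
            if (entry.getKey().startsWith(prefix)) {
                out.put(entry.getKey().substring(prefix.length()), entry.getValue());
            }
        }
        return out;
    }

    // Resolve one setting: unprefixed < "consumer." < consumer-specific prefix.
    static Object resolve(final Map<String, Object> userProps,
                          final String specificPrefix,
                          final String key) {
        Object value = userProps.get(key);
        final Map<String, Object> generic = withPrefix(userProps, "consumer.");
        if (generic.containsKey(key)) {
            value = generic.get(key);
        }
        final Map<String, Object> specific = withPrefix(userProps, specificPrefix);
        if (specific.containsKey(key)) {
            value = specific.get(key);
        }
        return value;
    }

    public static void main(final String[] args) {
        final Map<String, Object> userProps = new HashMap<>();
        userProps.put("max.poll.records", 100);
        userProps.put("consumer.max.poll.records", 200);
        userProps.put("main.consumer.max.poll.records", 300);

        // The main consumer sees the most specific value.
        System.out.println(resolve(userProps, "main.consumer.", "max.poll.records"));    // 300
        // The restore consumer has no specific override, so "consumer." wins.
        System.out.println(resolve(userProps, "restore.consumer.", "max.poll.records")); // 200
    }
}
```

The sketch deliberately leaves out the Streams-controlled configs; those would have to be applied after this resolution so that no prefix can bypass them.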
Got it @mjsax -
Sharing the code that seems to be causing this bypass. Currently to fetch any consumer config i.e. main, restore or global we use a common function getCommonConsumerConfigs
It's within the getCommonConsumerConfigs function where we check and override the configs preferred by streams -
private Map<String, Object> getCommonConsumerConfigs() {
    // Fetch all consumer props starting with "consumer."
    final Map<String, Object> clientProvidedProps = getClientPropsWithPrefix(CONSUMER_PREFIX, ConsumerConfig.configNames());
    // Clean out any properties that were set but need to be controlled by Streams
    checkIfUnexpectedUserSpecifiedConsumerConfig(clientProvidedProps, NON_CONFIGURABLE_CONSUMER_DEFAULT_CONFIGS);
    checkIfUnexpectedUserSpecifiedConsumerConfig(clientProvidedProps, NON_CONFIGURABLE_CONSUMER_EOS_CONFIGS);
    // Create a config map of the preferred props and merge it with the cleaned props from above
    final Map<String, Object> consumerProps = new HashMap<>(eosEnabled ? CONSUMER_EOS_OVERRIDES : CONSUMER_DEFAULT_OVERRIDES);
    consumerProps.putAll(clientProvidedProps);
    return consumerProps;
}
And the logic within getMainConsumerConfigs is -
public Map<String, Object> getMainConsumerConfigs(...) {
    // Fetch the props starting with "consumer." after cleaning
    // any props that needed to be overwritten
    final Map<String, Object> consumerProps = getCommonConsumerConfigs();
    // Get main consumer override props i.e. the ones
    // starting with "main.consumer." and merge the two maps.
    // These entries are merged after the clean-up above, so they are never checked.
    final Map<String, Object> mainConsumerProps = originalsWithPrefix(MAIN_CONSUMER_PREFIX);
    for (final Map.Entry<String, Object> entry : mainConsumerProps.entrySet()) {
        consumerProps.put(entry.getKey(), entry.getValue());
    }
    // Continue processing and filling in other required configs
}
Do you think I've understood this piece correctly? If so, should the fix for this go into this PR itself?
Thanks for digging into this -- I think you are spot on -- it seems we should extract a method that sets the KS-controlled configs, and refactor getMainConsumerConfigs to first call getCommonConsumerConfigs(), then apply the main.consumer configs, and in a last step call the new method to set the KS-controlled configs.
I assume we need to do something similar for the restore and global consumers? -- To be fair, I was actually aware that something is off and still have an (old and stale) local branch adding corresponding testing to StreamsConfigTest to verify that the overwrite hierarchy works as expected... Would be great if you could also look into this test...
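A rough sketch of the ordering proposed above, assuming plain maps stand in for the real config objects. ControlledLastSketch, mainConsumerConfigs, and the CONTROLLED map are hypothetical names for illustration; the only point is that the Streams-controlled values are written last, so a main.consumer. override can no longer win.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of "apply controlled configs last"; not the StreamsConfig code.
final class ControlledLastSketch {

    // Assumed set of consumer configs that Streams controls (taken from the PR description).
    static final Map<String, Object> CONTROLLED = Map.of(
        "allow.auto.create.topics", false,
        "enable.auto.commit", false
    );

    static Map<String, Object> mainConsumerConfigs(final Map<String, Object> commonProps,
                                                   final Map<String, Object> mainPrefixedProps) {
        // Step 1: start from the common ("consumer." and unprefixed) props.
        final Map<String, Object> props = new HashMap<>(commonProps);
        // Step 2: apply the consumer-specific ("main.consumer.") overrides.
        props.putAll(mainPrefixedProps);
        // Step 3: apply the controlled configs last so nothing above can bypass them.
        props.putAll(CONTROLLED);
        return props;
    }

    public static void main(final String[] args) {
        final Map<String, Object> common = new HashMap<>();
        common.put("max.poll.records", 1000);

        final Map<String, Object> mainOverrides = new HashMap<>();
        mainOverrides.put("allow.auto.create.topics", true); // the attempted bypass

        final Map<String, Object> result = mainConsumerConfigs(common, mainOverrides);
        System.out.println(result.get("allow.auto.create.topics")); // false
    }
}
```

The same three-step shape would presumably apply to the restore and global consumers, each with its own prefix.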
Got it! I'll make this change - for now I have gone through the code and the following two references and compiled a list of configs that are somehow "controlled" by KS. For now sharing the Producer Configs here and soon Consumer Configs too
Producer Configs with EoS Disabled
1. [Editable] [CustomDefault] linger.ms = 100
2. [Fixed] partitioner.class = StreamsPartitioner
Producer Configs with EoS Enabled
1. [Editable] [CustomDefault] linger.ms = 100
2. [Fixed] partitioner.class = StreamsPartitioner
3. [Fixed] enable.idempotence = true
4. [Validate] max.in.flight.requests.per.connection <= 5
5. [Fixed] [NoDefault] transactional.id = <appId>-<generatedSuffix>
6. [Editable] [CustomDefault] delivery.timeout.ms = Integer.MAX_VALUE
7. [Editable] [CustomDefault] transaction.timeout.ms = 10000
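The [Validate] entry differs from the [Fixed] ones in that the user value is kept as long as it satisfies a bound. A minimal sketch of such a check follows; MaxInFlightCheckSketch and validate are hypothetical names, and throwing IllegalArgumentException on violation is an assumption made for the example, not a statement about what StreamsConfig does.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of a bound check for a [Validate] config.
final class MaxInFlightCheckSketch {

    static final String MAX_IN_FLIGHT = "max.in.flight.requests.per.connection";
    static final int MAX_ALLOWED_WITH_EOS = 5;

    // Leaves the user value untouched if it is within the bound, otherwise fails.
    static void validate(final Map<String, Object> producerProps) {
        final Object value = producerProps.get(MAX_IN_FLIGHT);
        if (value == null) {
            return; // not set by the user, nothing to validate
        }
        final int maxInFlight = Integer.parseInt(value.toString().trim());
        if (maxInFlight > MAX_ALLOWED_WITH_EOS) {
            throw new IllegalArgumentException(
                MAX_IN_FLIGHT + " must be at most " + MAX_ALLOWED_WITH_EOS
                    + " when EoS is enabled, but was " + maxInFlight);
        }
    }

    public static void main(final String[] args) {
        final Map<String, Object> props = new HashMap<>();
        props.put(MAX_IN_FLIGHT, 3);
        validate(props); // passes, value stays 3
        System.out.println(props.get(MAX_IN_FLIGHT));
    }
}
```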
@ashmeet13 -- Any update on this PR? We are coming up to 3.7 release code freeze deadline. Might be nice to finish this on time?
Hi @mjsax apologies for the delay. Pushing this soon.
@ashmeet13 -- do you still have interest to finish this PR?
@ashmeet13 -- any updates on this PR?
Hi @mjsax, apologies for being so absent on this PR. I have gone ahead and implemented the changes. The tests are pending and I am currently working on them. The implementation is detailed below.
There are two pieces that KS controls -
- Custom Default - Configs that have custom default values for KS compared to the actual defaults. These values can also be overwritten by the user.
- Controlled Configs - Configs that are controlled by KS and cannot be overwritten by the user (We want to warn the user that this value is being overwritten if set by the user)
Previous Implementation -
- We used to have the following data structures
String[] NON_CONFIGURABLE_CONSUMER_DEFAULT_CONFIGS // Controlled KS Consumer Configs
String[] NON_CONFIGURABLE_CONSUMER_EOS_CONFIGS // Controlled KS Consumer Configs when EoS enabled
String[] NON_CONFIGURABLE_PRODUCER_EOS_CONFIGS // Controlled KS Producer Config when EoS enabled
Map<String, Object> PRODUCER_DEFAULT_OVERRIDES // Producer Custom Default + Controlled Config Values
Map<String, Object> PRODUCER_EOS_OVERRIDES // Producer Custom Default + Controlled Config Values with EoS
Map<String, Object> CONSUMER_DEFAULT_OVERRIDES // Consumer Custom Default + Controlled Config Values
Map<String, Object> CONSUMER_EOS_OVERRIDES // Consumer Custom Default + Controlled Config Values with EoS
- The steps to return the required config broadly were:
1. Get client configs: Gather client configurations with prefixes either consumer. or producer. and put them in clientProvidedProps.
2. Clean clientProvidedProps: Use the method checkIfUnexpectedUserSpecifiedConsumerConfig to tidy up clientProvidedProps.
3. Create props: Generate props using either <>_DEFAULT_OVERRIDES or <>_EOS_OVERRIDES.
4. Overwrite props: Replace props with the cleaned clientProvidedProps.
5. Fetch additional configs (only for consumer props): If it's consumer props, fetch configurations set using main.consumer., global.consumer., or restore.consumer. and add them to the props map.
- After the initial setup, we make some tweaks based on whether it's for a consumer or producer, and then we hand back the props.
Current Implementation -
- Do away with the old data structures and define the following new ones -
Map<String, Object> KS_DEFAULT_PRODUCER_CONFIGS // KS Custom Defaults for Producer
Map<String, Object> KS_DEFAULT_PRODUCER_CONFIGS_EOS_ENABLED // KS Custom Defaults for Producer with EoS
Map<String, Object> KS_CONTROLLED_PRODUCER_CONFIGS // KS Controlled Configs for Producer
Map<String, Object> KS_CONTROLLED_PRODUCER_CONFIGS_EOS_ENABLED // KS Controlled Configs for Producer with EoS
Map<String, Object> KS_DEFAULT_CONSUMER_CONFIGS // KS Custom Defaults for Consumer
Map<String, Object> KS_CONTROLLED_CONSUMER_CONFIGS // KS Controlled Configs for Consumer
Map<String, Object> KS_CONTROLLED_CONSUMER_CONFIGS_EOS_ENABLED // KS Controlled Configs for Consumer with EoS
- The steps to return the required config now are:
1. Get client configs: Obtain client configurations with prefixes either consumer. or producer. and place them in clientProvidedProps.
2. Create props: Generate props using either KS_DEFAULT_<>_CONFIGS or KS_DEFAULT_<>_CONFIGS_EOS_ENABLED.
3. Overwrite props: Overwrite the entries in props with clientProvidedProps.
4. Fetch additional configs (only for consumer props): If it's consumer props, fetch configurations set using main.consumer., global.consumer., or restore.consumer. and add them to the props map.
5. Run validation check over props: Perform a validation check on props. This check uses the KS_CONTROLLED_<>_CONFIGS or KS_CONTROLLED_<>_CONFIGS_EOS_ENABLED maps to see if the values are already set in props. If they are, log a warning and overwrite them. If not, add them to props.
- After the initial setup, we make some tweaks based on whether it's for a consumer or producer, and then we hand back the props.
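The five steps above can be sketched roughly as follows. This is an illustration with plain maps, not the code in the PR: ControlledConfigCheckSketch, build, and enforceControlled are hypothetical names, and warnings are collected in a list instead of going through a logger so the example stays self-contained.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the new flow: defaults, user props, specific props, then the check.
final class ControlledConfigCheckSketch {

    // Step 5: warn about and overwrite any controlled config that is already set,
    // and add the controlled configs that are missing.
    static List<String> enforceControlled(final Map<String, Object> props,
                                          final Map<String, Object> controlled) {
        final List<String> warnings = new ArrayList<>();
        for (final Map.Entry<String, Object> entry : controlled.entrySet()) {
            final String key = entry.getKey();
            if (props.containsKey(key)) {
                warnings.add(String.format(
                    "Unexpected user-specified config %s=%s; overriding with %s",
                    key, props.get(key), entry.getValue()));
            }
            props.put(key, entry.getValue());
        }
        return warnings;
    }

    static Map<String, Object> build(final Map<String, Object> ksDefaults,
                                     final Map<String, Object> clientProvidedProps,
                                     final Map<String, Object> specificPrefixProps,
                                     final Map<String, Object> ksControlled,
                                     final List<String> warningsOut) {
        final Map<String, Object> props = new HashMap<>(ksDefaults); // step 2: custom defaults
        props.putAll(clientProvidedProps);                            // step 3: user values win over defaults
        props.putAll(specificPrefixProps);                            // step 4: e.g. "main.consumer." values
        warningsOut.addAll(enforceControlled(props, ksControlled));   // step 5: controlled configs win last
        return props;
    }

    public static void main(final String[] args) {
        final Map<String, Object> defaults = Map.of("max.poll.records", 1000);
        final Map<String, Object> client = Map.of("max.poll.records", 500, "enable.auto.commit", true);
        final Map<String, Object> controlled = Map.of("enable.auto.commit", false);
        final List<String> warnings = new ArrayList<>();

        final Map<String, Object> props = build(defaults, client, Map.of(), controlled, warnings);
        System.out.println(props.get("max.poll.records"));   // 500
        System.out.println(props.get("enable.auto.commit")); // false
        System.out.println(warnings.size());                  // 1
    }
}
```

Following the description, the sketch warns whenever a controlled key is present, even if the user happened to set the same value Streams would use; whether that case deserves a warning is a design choice left open here.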
Below, I'll share the configurations organized into custom defaults and controlled configs for both consumers and producers.
For each config, the listing notes whether -
- It is a custom default for KS that the user can still edit
- Or, it is a fixed value controlled by KS
Producer Configs
EoS Disabled
1. [Editable] [CustomDefault] linger.ms = 100
2. [Fixed] partitioner.class = StreamsPartitioner
EoS Enabled
1. [Editable] [CustomDefault] linger.ms = 100
2. [Editable] [CustomDefault] delivery.timeout.ms = Integer.MAX_VALUE
3. [Editable] [CustomDefault] transaction.timeout.ms = 10000
4. [Fixed] partitioner.class = StreamsPartitioner
5. [Fixed] enable.idempotence = true
6. [Fixed] transactional.id = <appId>-<generatedSuffix>
7. [Validate] max.in.flight.requests.per.connection <= 5
Main Consumer Configs
EoS Disabled
1. [Editable] [CustomDefault] auto.offset.reset = earliest
2. [Editable] [CustomDefault] max.poll.records = 1000
3. [Fixed] allow.auto.create.topics = false
4. [Fixed] enable.auto.commit = false
5. [Fixed] group.id = <appId>
EoS Enabled
1. [Editable] [CustomDefault] auto.offset.reset = earliest
2. [Editable] [CustomDefault] max.poll.records = 1000
3. [Fixed] allow.auto.create.topics = false
4. [Fixed] enable.auto.commit = false
5. [Fixed] group.id = <appId>
Global Consumer Configs
EoS Disabled
1. [Editable] [CustomDefault] max.poll.records = 1000
2. [Fixed] auto.offset.reset = None
3. [Fixed] allow.auto.create.topics = false
4. [Fixed] enable.auto.commit = false
5. [Fixed] group.id = None
EoS Enabled
1. [Editable] [CustomDefault] max.poll.records = 1000
2. [Fixed] auto.offset.reset = None
3. [Fixed] allow.auto.create.topics = false
4. [Fixed] enable.auto.commit = false
5. [Fixed] group.id = None
Restore Consumer Configs
EoS Disabled
1. [Editable] [CustomDefault] max.poll.records = 1000
2. [Fixed] auto.offset.reset = None
3. [Fixed] allow.auto.create.topics = false
4. [Fixed] enable.auto.commit = false
5. [Fixed] group.id = None
EoS Enabled
1. [Editable] [CustomDefault] max.poll.records = 1000
2. [Fixed] auto.offset.reset = None
3. [Fixed] allow.auto.create.topics = false
4. [Fixed] enable.auto.commit = false
5. [Fixed] group.id = None
There are a few more configs that are handled ad hoc within the code and that I haven't included here. Covering them seemed like a broader change to StreamsConfig.
Fixing the failing build
Hi @mjsax requesting a review on this PR.
There is one open case still that I am not sure how we should handle - it's the check @ableegoldman mentioned in her review here to handle the ProducerConfig.PARTITIONER_CLASS_CONFIG (partitioner.class) config.
@mjsax I have added the handling for partitioner.class config on the basis of this logic -
- If the config set is of a class that implements StreamPartitioner, we let it pass
- If it doesn't, we remove the key from the properties and log a warning
Adding test cases now
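For reference, the partitioner.class rule described above could look roughly like this. The sketch is self-contained, so it uses a stand-in interface instead of the real StreamPartitioner; PartitionerCheckSketch, StreamPartitionerStandIn, and dropIfNotStreamPartitioner are hypothetical names, and the warning is printed to stderr rather than logged.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the partitioner.class check; not the code in the PR.
final class PartitionerCheckSketch {

    // Stand-in for the real StreamPartitioner interface so the example compiles on its own.
    interface StreamPartitionerStandIn { }

    static final class GoodPartitioner implements StreamPartitionerStandIn { }

    static final class OtherPartitioner { }

    static final String PARTITIONER_CLASS = "partitioner.class";

    // The config value may be a Class object or a class name; returns null if it cannot be loaded.
    static Class<?> toClass(final Object value) {
        if (value instanceof Class) {
            return (Class<?>) value;
        }
        try {
            return Class.forName(String.valueOf(value));
        } catch (final ClassNotFoundException e) {
            return null;
        }
    }

    // Returns true if the key was removed because the class does not implement the interface.
    static boolean dropIfNotStreamPartitioner(final Map<String, Object> props) {
        if (!props.containsKey(PARTITIONER_CLASS)) {
            return false;
        }
        final Class<?> clazz = toClass(props.get(PARTITIONER_CLASS));
        if (clazz != null && StreamPartitionerStandIn.class.isAssignableFrom(clazz)) {
            return false; // implements the interface, let it pass
        }
        System.err.println("Ignoring " + PARTITIONER_CLASS + "=" + props.get(PARTITIONER_CLASS)
            + " because it does not implement the expected partitioner interface");
        props.remove(PARTITIONER_CLASS);
        return true;
    }

    public static void main(final String[] args) {
        final Map<String, Object> props = new HashMap<>();
        props.put(PARTITIONER_CLASS, OtherPartitioner.class);
        System.out.println(dropIfNotStreamPartitioner(props)); // true
        System.out.println(props.containsKey(PARTITIONER_CLASS)); // false
    }
}
```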