
KAFKA-14405: Log a warning when users attempt to set a config controlled by Streams

Open: ashmeet13 opened this issue 3 years ago • 21 comments

Streams hard-codes a few configurations; the documentation currently refers to five such configs.

These are the four listed under "Parameters controlled by Kafka Streams", plus enable.auto.commit. Three of the four listed under "Parameters controlled by Kafka Streams" also appear under "Default Values", which lists configurations that users can set but that receive a default value if left unset.

This PR warns the user when a configuration they set is being overridden. Because the documentation overlaps, I went through the code and separated the configs into a few categories:

  • Non-configurable consumer configs when EOS is disabled:
      enable.auto.commit
      allow.auto.create.topics
  • Non-configurable consumer configs when EOS is enabled:
      isolation.level
  • Non-configurable producer configs when EOS is enabled:
      enable.idempotence
      max.in.flight.requests.per.connection
      transactional.id
  • Default consumer configs that can be configured:
      auto.offset.reset
      max.poll.records
  • Default producer configs that can be configured:
      linger.ms
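As a rough illustration, the categories above could be held as plain sets of config names. This is a hypothetical sketch: the class StreamsConfigCategories and its methods are illustrative names, not part of StreamsConfig, and it assumes the "EOS disabled" consumer configs stay non-configurable when EOS is enabled as well.

```java
import java.util.Set;

// Hypothetical grouping of the categories listed above; not Kafka's actual API.
final class StreamsConfigCategories {
    // Consumer configs Streams controls (assumed to apply with and without EOS)
    static final Set<String> NON_CONFIGURABLE_CONSUMER =
            Set.of("enable.auto.commit", "allow.auto.create.topics");
    // Consumer configs Streams additionally controls when EOS is enabled
    static final Set<String> NON_CONFIGURABLE_CONSUMER_EOS = Set.of("isolation.level");
    // Producer configs Streams controls when EOS is enabled
    static final Set<String> NON_CONFIGURABLE_PRODUCER_EOS =
            Set.of("enable.idempotence", "max.in.flight.requests.per.connection", "transactional.id");
    // Configs with a Streams default that the user may still change
    static final Set<String> CONFIGURABLE_CONSUMER_DEFAULTS = Set.of("auto.offset.reset", "max.poll.records");
    static final Set<String> CONFIGURABLE_PRODUCER_DEFAULTS = Set.of("linger.ms");

    // True if a user-set consumer config would be overridden (and should trigger a warning).
    static boolean isNonConfigurableConsumerConfig(String name, boolean eosEnabled) {
        return NON_CONFIGURABLE_CONSUMER.contains(name)
                || (eosEnabled && NON_CONFIGURABLE_CONSUMER_EOS.contains(name));
    }

    // True if a user-set producer config would be overridden (and should trigger a warning).
    static boolean isNonConfigurableProducerConfig(String name, boolean eosEnabled) {
        return eosEnabled && NON_CONFIGURABLE_PRODUCER_EOS.contains(name);
    }
}
```

Keeping the "configurable default" sets separate from the "non-configurable" ones means only the latter ever trigger a warning.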

StreamsConfig already had code to log a warning when a non-configurable property was set, but it did not log one when allow.auto.create.topics was configured. I have added this warning and removed the code that was setting allow.auto.create.topics within getMainConsumerConfigs.

Now allow.auto.create.topics follows the existing code path, which:

  1. Fills in missing default configurations
  2. Logs a warning for, and overrides, any non-configurable property set by the user
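A minimal sketch of that path, assuming the user props, the set of non-configurable names, and a map of Streams' own values (custom defaults plus controlled values, similar in spirit to CONSUMER_DEFAULT_OVERRIDES) are passed in. The class and method names are hypothetical, and warnings are collected in a list rather than logged so the behaviour is easy to inspect.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of "warn and drop, then fill in defaults"; not the real StreamsConfig code.
final class NonConfigurableCheck {
    static Map<String, Object> resolve(Map<String, Object> userProps,
                                       Set<String> nonConfigurable,
                                       Map<String, Object> streamsValues,
                                       List<String> warnings) {
        Map<String, Object> cleaned = new HashMap<>(userProps);
        for (String name : nonConfigurable) {
            Object userValue = cleaned.get(name);
            Object streamsValue = streamsValues.get(name);
            // Warn and drop the user value only when it differs from the value Streams uses
            if (userValue != null && !userValue.equals(streamsValue)) {
                warnings.add("Unexpected user-specified config " + name + "=" + userValue
                        + " will be ignored; Streams uses " + streamsValue);
                cleaned.remove(name);
            }
        }
        // Streams' values first (this fills in missing defaults), then the cleaned user values on top
        Map<String, Object> props = new HashMap<>(streamsValues);
        props.putAll(cleaned);
        return props;
    }
}
```

Comparing against Streams' value before warning avoids noise for users who explicitly set, say, enable.auto.commit=false. Note that in this sketch a String "false" would not equal Boolean false and would still trigger a warning.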

Permalink to checkIfUnexpectedUserSpecifiedConsumerConfig

Committer Checklist (excluded from commit message)

  • [ ] Verify design and implementation
  • [ ] Verify test coverage and CI build status
  • [ ] Verify documentation (including upgrade notes)

ashmeet13, Dec 14 '22

Since I am fairly new to the codebase, I will spend some more time looking for any other configs I might have missed that fall into one of these buckets. @ableegoldman, could you please have a look at this PR?

ashmeet13, Dec 14 '22

Hi @ableegoldman, thank you for the review! I went through the producer configs and could not find any config named default.partitioner. Were you referring to the partitioner.class config? If not, could you please point me to where default.partitioner is defined in the code?

ashmeet13, Dec 28 '22

Ah, yeah, sorry -- it's partitioner.class

ableegoldman, Dec 30 '22

What's the status of this PR?

mjsax, Feb 14 '23

This PR is being marked as stale since it has not had any activity in 90 days. If you would like to keep this PR alive, please ask a committer for review. If the PR has merge conflicts, please update it with the latest from trunk (or appropriate release branch)

If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed.

github-actions[bot], Jun 23 '23

I will be picking this back up; sorry for dropping it midway. I will update soon. Issue: https://issues.apache.org/jira/browse/KAFKA-14405

ashmeet13, Jul 22 '23

Thanks! Great to hear.

mjsax, Jul 25 '23

Hi @mjsax, a small question: from what I understand, there are three different types of consumer configs: global, main, and restore.

If a user sets the property consumer.allow.auto.create.topics to true, we override it to false because Streams controls this property's value. But this can be bypassed by setting main.consumer.allow.auto.create.topics = true instead, in which case the property ends up set to true.

Is this expected behaviour?

ashmeet13, Aug 16 '23

Sorry for the late reply; I was OOO. Well, if it can be bypassed, that sounds like a bug.

Because there are multiple consumers, we use the consumer. prefix to let users change a config for all consumers. If you want to change a config for a specific consumer (or configure two consumers differently), you would use main.consumer. et al. If both prefixes (generic and specific) are used, the consumer-specific prefix takes precedence over the general consumer. prefix. Does this make sense? It's basically a "config hierarchy" (flexible and powerful, but maybe a little hard to understand at first...)

But no matter which prefix is used, we should not allow users to bypass what Streams tries to overwrite. (Btw: just setting allow.auto.create.topics = true, i.e., without the consumer. prefix, is technically also valid, and we would pass configs without a prefix into the consumer, too, so this case must also be covered.)
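The hierarchy can be sketched as successive map overlays, least specific first. This hypothetical sketch covers only the main consumer and only precedence; as the comment says, any Streams-controlled values would still need to be applied on top afterwards so that no prefix can bypass them. The consumerConfigNames parameter stands in for the set of known consumer config names, and all class and method names are illustrative.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of the main consumer's config precedence; not StreamsConfig's actual code.
final class ConsumerConfigHierarchy {
    // Entries whose key starts with prefix, with the prefix stripped off.
    static Map<String, Object> withPrefix(Map<String, Object> originals, String prefix) {
        Map<String, Object> result = new HashMap<>();
        for (Map.Entry<String, Object> e : originals.entrySet()) {
            if (e.getKey().startsWith(prefix)) {
                result.put(e.getKey().substring(prefix.length()), e.getValue());
            }
        }
        return result;
    }

    static Map<String, Object> mainConsumerView(Map<String, Object> originals,
                                                Set<String> consumerConfigNames) {
        Map<String, Object> props = new HashMap<>();
        // Lowest precedence: unprefixed keys that are known consumer configs
        for (String name : consumerConfigNames) {
            if (originals.containsKey(name)) {
                props.put(name, originals.get(name));
            }
        }
        props.putAll(withPrefix(originals, "consumer."));      // applies to every consumer
        props.putAll(withPrefix(originals, "main.consumer.")); // most specific, so it wins
        return props;
    }
}
```

For example, with max.poll.records, consumer.max.poll.records, and main.consumer.max.poll.records all set, the main consumer sees the main.consumer. value, while a restore.consumer. key is ignored entirely.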

mjsax, Sep 08 '23

Got it, @mjsax. Here is the code that seems to be causing this bypass. Currently, to fetch any consumer config (main, restore, or global), we use a common function, getCommonConsumerConfigs.

It's within getCommonConsumerConfigs that we check for, and override, the configs Streams wants to control:

    private Map<String, Object> getCommonConsumerConfigs() {
        // Fetch all consumer props starting with "consumer."
        final Map<String, Object> clientProvidedProps =
            getClientPropsWithPrefix(CONSUMER_PREFIX, ConsumerConfig.configNames());

        // Clean out any properties that were set but need to be controlled by Streams
        checkIfUnexpectedUserSpecifiedConsumerConfig(clientProvidedProps, NON_CONFIGURABLE_CONSUMER_DEFAULT_CONFIGS);
        checkIfUnexpectedUserSpecifiedConsumerConfig(clientProvidedProps, NON_CONFIGURABLE_CONSUMER_EOS_CONFIGS);

        // Create a config map of the preferred props and merge the cleaned props from above into it
        final Map<String, Object> consumerProps =
            new HashMap<>(eosEnabled ? CONSUMER_EOS_OVERRIDES : CONSUMER_DEFAULT_OVERRIDES);
        consumerProps.putAll(clientProvidedProps);
        return consumerProps;
    }

And the logic within getMainConsumerConfigs is:

    public Map<String, Object> getMainConsumerConfigs(...) {
        // Fetch the props starting with "consumer." after cleaning
        // any props that needed to be overwritten
        final Map<String, Object> consumerProps = getCommonConsumerConfigs();

        // Get the main consumer override props, i.e. the ones
        // starting with "main.consumer.", and merge the two maps.
        // No check runs after this merge, so a controlled config set here
        // (e.g. main.consumer.allow.auto.create.topics) is kept as-is.
        final Map<String, Object> mainConsumerProps = originalsWithPrefix(MAIN_CONSUMER_PREFIX);
        for (final Map.Entry<String, Object> entry : mainConsumerProps.entrySet()) {
            consumerProps.put(entry.getKey(), entry.getValue());
        }

        // Continue processing and filling in other required configs
        // ...
        return consumerProps;
    }
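To make the ordering problem concrete, here is a stripped-down, hypothetical reproduction using allow.auto.create.topics. It mimics only the order of operations in the two methods above, not their real signatures; BypassRepro and its methods are illustrative names.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical reproduction of the ordering only; real method signatures differ.
final class BypassRepro {
    static final String KEY = "allow.auto.create.topics";

    static Map<String, Object> withPrefix(Map<String, Object> originals, String prefix) {
        Map<String, Object> result = new HashMap<>();
        for (Map.Entry<String, Object> e : originals.entrySet()) {
            if (e.getKey().startsWith(prefix)) {
                result.put(e.getKey().substring(prefix.length()), e.getValue());
            }
        }
        return result;
    }

    static Map<String, Object> mainConsumerConfigs(Map<String, Object> originals) {
        // Like getCommonConsumerConfigs(): drop the controlled key from the "consumer." props...
        Map<String, Object> clientProvidedProps = withPrefix(originals, "consumer.");
        clientProvidedProps.remove(KEY);
        // ...and lay the cleaned props over the Streams value
        Map<String, Object> consumerProps = new HashMap<>();
        consumerProps.put(KEY, false);
        consumerProps.putAll(clientProvidedProps);
        // Like getMainConsumerConfigs(): "main.consumer." props are merged afterwards, unchecked
        consumerProps.putAll(withPrefix(originals, "main.consumer."));
        return consumerProps;
    }
}
```

Passing consumer.allow.auto.create.topics=true yields false, but passing main.consumer.allow.auto.create.topics=true yields true: the check runs before the main.consumer. merge instead of after it.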

Have I understood this piece correctly? If so, should the fix go into this PR itself?

ashmeet13, Oct 08 '23

Thanks for digging into this -- I think you are spot on. It seems we should extract a method that sets the KS-controlled configs, and refactor getMainConsumerConfigs to first call getCommonConsumerConfigs(), then apply the main.consumer. configs, and as a last step call the new method to set the KS-controlled configs.

I assume we need to do something similar for the restore and global consumers? -- To be fair, I was actually aware that something was off, and I still have an (old and stale) local branch adding corresponding tests to StreamsConfigTest to verify that the overwrite hierarchy works as expected... It would be great if you could also look into this test...

mjsax, Oct 13 '23

Got it! I'll make this change. In the meantime, I have gone through the code and the following two references and compiled a list of configs that are in some way "controlled" by KS. I'm sharing the producer configs here for now; the consumer configs will follow soon.

Producer Configs with EoS Disabled

1. [Editable] [CustomDefault] linger.ms = 100
2. [Fixed] partitioner.class = StreamsPartitioner

Producer Configs with EoS Enabled

1. [Editable] [CustomDefault] linger.ms = 100
2. [Fixed] partitioner.class = StreamsPartitioner
3. [Fixed] enable.idempotence = true
4. [Validate] max.in.flight.requests.per.connection <= 5
5. [Fixed] [NoDefault] transactional.id = <appId>-<generatedSuffix>
6. [Editable] [CustomDefault] delivery.timeout.ms = Integer.MAX_VALUE
7. [Editable] [CustomDefault] transaction.timeout.ms = 10000
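A hypothetical sketch of how the three markers might translate into code for the EoS-enabled producer: [Editable] values are laid down first so the user can change them, the [Validate] value is checked, and [Fixed] values are written last. partitioner.class is left out, the class name is illustrative, and IllegalArgumentException stands in for whatever exception the real validation throws.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch for the EoS-enabled producer; not StreamsConfig's actual code.
final class EosProducerConfigSketch {
    static Map<String, Object> eosProducerConfigs(Map<String, Object> userProps, String transactionalId) {
        Map<String, Object> props = new HashMap<>();
        // [Editable] [CustomDefault]: Streams defaults the user may override
        props.put("linger.ms", 100);
        props.put("delivery.timeout.ms", Integer.MAX_VALUE);
        props.put("transaction.timeout.ms", 10000);
        props.putAll(userProps);
        // [Validate]: the user may set it, but not above 5
        validateMaxInFlight(props.get("max.in.flight.requests.per.connection"));
        // [Fixed]: always written last, replacing any user value
        props.put("enable.idempotence", true);
        props.put("transactional.id", transactionalId);
        return props;
    }

    static void validateMaxInFlight(Object value) {
        if (value == null) {
            return; // not set, so the producer's own default applies
        }
        final int parsed;
        if (value instanceof Integer) {
            parsed = (Integer) value;
        } else if (value instanceof String) {
            parsed = Integer.parseInt(((String) value).trim()); // throws NumberFormatException if not numeric
        } else {
            throw new IllegalArgumentException("max.in.flight.requests.per.connection must be an integer, got "
                    + value.getClass().getName());
        }
        if (parsed > 5) {
            throw new IllegalArgumentException(
                    "max.in.flight.requests.per.connection can't exceed 5 when exactly-once is enabled: " + parsed);
        }
    }
}
```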

ashmeet13, Oct 13 '23

@ashmeet13 -- Any update on this PR? We are coming up on the 3.7 release code-freeze deadline. It would be nice to finish this in time.

mjsax, Nov 29 '23

Hi @mjsax, apologies for the delay. I will push this soon.

ashmeet13, Dec 06 '23

@ashmeet13 -- are you still interested in finishing this PR?

mjsax, Feb 17 '24

@ashmeet13 -- any updates on this PR?

mjsax, Apr 30 '24

Hi @mjsax, apologies for being absent on this PR for so long. I have gone ahead and implemented the changes. The tests are still pending, and I am working on them now. The implementation is detailed below.

There are two kinds of configs that KS controls:

  1. Custom defaults: configs for which KS uses a default value different from the client's own default. The user can still overwrite these values.
  2. Controlled configs: configs that KS controls and that the user cannot overwrite. (If the user sets one, we want to warn them that the value is being overwritten.)

Previous Implementation:

  1. We used to have the following data structures:

    String[] NON_CONFIGURABLE_CONSUMER_DEFAULT_CONFIGS // KS-controlled consumer configs
    String[] NON_CONFIGURABLE_CONSUMER_EOS_CONFIGS     // KS-controlled consumer configs when EoS is enabled

    String[] NON_CONFIGURABLE_PRODUCER_EOS_CONFIGS     // KS-controlled producer configs when EoS is enabled

    Map<String, Object> PRODUCER_DEFAULT_OVERRIDES     // Producer custom defaults + controlled config values
    Map<String, Object> PRODUCER_EOS_OVERRIDES         // Producer custom defaults + controlled config values with EoS

    Map<String, Object> CONSUMER_DEFAULT_OVERRIDES     // Consumer custom defaults + controlled config values
    Map<String, Object> CONSUMER_EOS_OVERRIDES         // Consumer custom defaults + controlled config values with EoS

  2. Broadly, the steps to return the required config were:

    1. Get client configs: gather client configurations with either the consumer. or the producer. prefix and put them in clientProvidedProps.
    2. Clean clientProvidedProps: use checkIfUnexpectedUserSpecifiedConsumerConfig to warn about, and remove, user-set controlled configs in clientProvidedProps.
    3. Create props: initialize props from either <>_DEFAULT_OVERRIDES or <>_EOS_OVERRIDES.
    4. Overwrite props: overwrite entries in props with the cleaned clientProvidedProps.
    5. Fetch additional configs (consumer props only): fetch configurations set using main.consumer., global.consumer., or restore.consumer. and add them to props.
  3. After this initial setup, we make some consumer- or producer-specific tweaks and then return props.

Current Implementation:

  1. Do away with the old data structures and define the following new ones:

    Map<String, Object> KS_DEFAULT_PRODUCER_CONFIGS                  // KS custom defaults for the producer
    Map<String, Object> KS_DEFAULT_PRODUCER_CONFIGS_EOS_ENABLED      // KS custom defaults for the producer with EoS
    Map<String, Object> KS_CONTROLLED_PRODUCER_CONFIGS               // KS-controlled configs for the producer
    Map<String, Object> KS_CONTROLLED_PRODUCER_CONFIGS_EOS_ENABLED   // KS-controlled configs for the producer with EoS

    Map<String, Object> KS_DEFAULT_CONSUMER_CONFIGS                  // KS custom defaults for the consumer
    Map<String, Object> KS_CONTROLLED_CONSUMER_CONFIGS               // KS-controlled configs for the consumer
    Map<String, Object> KS_CONTROLLED_CONSUMER_CONFIGS_EOS_ENABLED   // KS-controlled configs for the consumer with EoS

  2. The steps to return the required config are now:

    1. Get client configs: obtain client configurations with either the consumer. or the producer. prefix and place them in clientProvidedProps.
    2. Create props: initialize props from either KS_DEFAULT_<>_CONFIGS or KS_DEFAULT_<>_CONFIGS_EOS_ENABLED.
    3. Overwrite props: overwrite entries in props with clientProvidedProps.
    4. Fetch additional configs (consumer props only): fetch configurations set using main.consumer., global.consumer., or restore.consumer. and add them to props.
    5. Validate props: check props against the KS_CONTROLLED_<>_CONFIGS or KS_CONTROLLED_<>_CONFIGS_EOS_ENABLED map. For each controlled config already set in props, log a warning and overwrite the value; for each one not yet set, add it to props.
  3. After this initial setup, we make some consumer- or producer-specific tweaks and then return props.
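The five steps above can be sketched as a single method. This is a hypothetical outline, not the PR's code: the class, method, and parameter names are illustrative, the client-specific map would be empty for producers, and the warning is recorded in a list rather than logged. As step 5 describes, it warns whenever a controlled config is already present, even if the user set the same value KS would.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical outline of the five steps; names are illustrative, not the PR's code.
final class KsConfigBuilder {
    static Map<String, Object> build(Map<String, Object> clientProvidedProps, // step 1: consumer./producer. props
                                     Map<String, Object> ksDefaults,          // KS_DEFAULT_<>_CONFIGS[_EOS_ENABLED]
                                     Map<String, Object> clientSpecificProps, // main./global./restore.consumer. props
                                     Map<String, Object> ksControlled,        // KS_CONTROLLED_<>_CONFIGS[_EOS_ENABLED]
                                     List<String> warnings) {
        Map<String, Object> props = new HashMap<>(ksDefaults); // step 2: start from KS custom defaults
        props.putAll(clientProvidedProps);                     // step 3: user values replace defaults
        props.putAll(clientSpecificProps);                     // step 4: client-specific prefixes win next
        // step 5: controlled configs are applied last, so no prefix can bypass them
        for (Map.Entry<String, Object> e : ksControlled.entrySet()) {
            if (props.containsKey(e.getKey())) {
                warnings.add("Config " + e.getKey() + "=" + props.get(e.getKey())
                        + " is controlled by Kafka Streams and will be overwritten with " + e.getValue());
            }
            props.put(e.getKey(), e.getValue());
        }
        return props;
    }
}
```

Applying the controlled map as the very last step is what closes the main.consumer. bypass: whatever the prefix, the controlled value is written after every user-supplied layer.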

Below, I'll share the configurations organized into custom defaults and controlled configs for both consumers and producers.

ashmeet13, May 05 '24

Below are the configs for each client type, marked by whether each one:

  1. Is a KS custom default that the user can edit, or
  2. Is a fixed value controlled by KS

Producer Configs

EoS Disabled
1. [Editable] [CustomDefault] linger.ms = 100

2. [Fixed] partitioner.class = StreamsPartitioner

EoS Enabled
1. [Editable] [CustomDefault] linger.ms = 100
2. [Editable] [CustomDefault] delivery.timeout.ms = Integer.MAX_VALUE
3. [Editable] [CustomDefault] transaction.timeout.ms = 10000

4. [Fixed] partitioner.class = StreamsPartitioner
5. [Fixed] enable.idempotence = true
6. [Fixed] transactional.id = <appId>-<generatedSuffix>

7. [Validate] max.in.flight.requests.per.connection <= 5

Main Consumer Configs

EoS Disabled
1. [Editable] [CustomDefault] auto.offset.reset = earliest
2. [Editable] [CustomDefault] max.poll.records = 1000

3. [Fixed] allow.auto.create.topics = false
4. [Fixed] enable.auto.commit = false
5. [Fixed] group.id = <appId>

EoS Enabled
1. [Editable] [CustomDefault] auto.offset.reset = earliest
2. [Editable] [CustomDefault] max.poll.records = 1000

3. [Fixed] allow.auto.create.topics = false
4. [Fixed] enable.auto.commit = false
5. [Fixed] group.id = <appId>

Global Consumer Configs

EoS Disabled
1. [Editable] [CustomDefault] max.poll.records = 1000

2. [Fixed] auto.offset.reset = none
3. [Fixed] allow.auto.create.topics = false
4. [Fixed] enable.auto.commit = false
5. [Fixed] group.id = (not set)

EoS Enabled
1. [Editable] [CustomDefault] max.poll.records = 1000

2. [Fixed] auto.offset.reset = none
3. [Fixed] allow.auto.create.topics = false
4. [Fixed] enable.auto.commit = false
5. [Fixed] group.id = (not set)

Restore Consumer Configs

EoS Disabled
1. [Editable] [CustomDefault] max.poll.records = 1000

2. [Fixed] auto.offset.reset = none
3. [Fixed] allow.auto.create.topics = false
4. [Fixed] enable.auto.commit = false
5. [Fixed] group.id = (not set)

EoS Enabled
1. [Editable] [CustomDefault] max.poll.records = 1000

2. [Fixed] auto.offset.reset = none
3. [Fixed] allow.auto.create.topics = false
4. [Fixed] enable.auto.commit = false
5. [Fixed] group.id = (not set)
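For the global and restore consumers, the [Fixed] rows above could be applied after the prefixed overrides, as in this hypothetical sketch. The base map stands in for the common consumer configs (assumed to already carry custom defaults such as max.poll.records), the group.id row (no value) is modelled by removing the key, and the class and method names are illustrative.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch for the global and restore consumers; not StreamsConfig's actual code.
final class AuxConsumerConfigSketch {
    static Map<String, Object> globalOrRestoreConsumerConfigs(Map<String, Object> baseConsumerProps,
                                                              Map<String, Object> prefixedOverrides) {
        Map<String, Object> props = new HashMap<>(baseConsumerProps); // assumed to hold the custom defaults
        props.putAll(prefixedOverrides); // values from "global.consumer." or "restore.consumer."
        // [Fixed] rows from the tables above, applied after the overrides
        props.remove("group.id");
        props.put("auto.offset.reset", "none");
        props.put("allow.auto.create.topics", false);
        props.put("enable.auto.commit", false);
        return props;
    }
}
```

An override such as restore.consumer.max.poll.records=200 survives, while restore.consumer.enable.auto.commit=true does not.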

There are a few more configs that are set ad hoc elsewhere in the code, which I haven't included; handling them seemed like a broader change to StreamsConfig.

ashmeet13, May 05 '24

Fixing the failing build

ashmeet13, May 06 '24

Hi @mjsax, requesting a review on this PR.

There is still one open case that I am not sure how we should handle: the check @ableegoldman mentioned in her review for the ProducerConfig.PARTITIONER_CLASS_CONFIG (partitioner.class) config.

ashmeet13, Jun 02 '24

@mjsax, I have added handling for the partitioner.class config based on the following logic:

  1. If the config is set to a class that implements StreamPartitioner, we let it pass.
  2. If it isn't, we remove the key from the properties and log a warning.
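A hypothetical sketch of the two rules above. To stay self-contained it uses a stand-in interface instead of the real org.apache.kafka.streams.processor.StreamPartitioner, accepts either a Class object or a class name, and treats a name that cannot be loaded as not implementing the interface. All type and method names here are illustrative.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for org.apache.kafka.streams.processor.StreamPartitioner,
// so that the sketch compiles without Kafka on the classpath.
interface StreamPartitionerStandIn {}

// Hypothetical example implementation, used only to exercise the check.
final class ExampleStreamPartitioner implements StreamPartitionerStandIn {}

final class PartitionerCheck {
    static final String KEY = "partitioner.class";

    // Keeps partitioner.class only if it refers to a class implementing the stand-in interface;
    // otherwise removes the key and returns a warning.
    static List<String> checkPartitioner(Map<String, Object> props) {
        List<String> warnings = new ArrayList<>();
        Object value = props.get(KEY);
        if (value == null) {
            return warnings;
        }
        Class<?> cls = null;
        if (value instanceof Class) {
            cls = (Class<?>) value;
        } else if (value instanceof String) {
            try {
                cls = Class.forName(((String) value).trim());
            } catch (ClassNotFoundException e) {
                cls = null; // treated like a class that does not implement the interface
            }
        }
        if (cls == null || !StreamPartitionerStandIn.class.isAssignableFrom(cls)) {
            props.remove(KEY);
            warnings.add("Ignoring " + KEY + "=" + value + " because it does not implement StreamPartitioner");
        }
        return warnings;
    }
}
```

Using isAssignableFrom accepts subclasses and indirect implementations, not just classes that name the interface directly.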

Adding test cases now

ashmeet13, Jun 03 '24