How to add my own custom kafka commands into this?
Hi Shawn, first of all, great work! I'm working on adding extra Kafka commands to this shell. Any pointers? I was able to add a custom command to the menu, but executing it with options throws an error. I thought I'd reach out to you for pointers so that I can add some commands and take it forward. Thanks
Hi there, thank you!
The steps look like this:
- Add the new command and its options to the completer.json file in the kafkashell/data directory.
- Implement new logic in the kafkashell/executor.py class. This class takes the user input when it is submitted. It then matches specific methods based on the input and executes them.
For example, take the kafka-topics command. If a user enters kafka-topics --list in kafka-shell, it will be sent to the executor class. It will match this block on line 59:
elif command.startswith(valid_command_prefixes):
    self.execute_valid_command(command)
Then, it will match the following block on line 63:
if command.startswith(constants.COMMAND_KAFKA_TOPICS):
    final_command = self.handle_kafka_topics_command(command)
We then handle the kafka-topics command. This is where we automatically add flags such as --bootstrap-server. This starts on line 166.
def handle_kafka_topics_command(self, command):
    command += self.handle_bootstrap_or_zookeeper_flag(command)
    command += self.handle_admin_client_settings(command)
    return command
Lastly, we end up with a final_command that gets executed by os.system(final_command) on line 126.
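To make the flow above concrete, here is a minimal, self-contained sketch of how a new command could be wired in next to kafka-topics. It is not the actual kafkashell/executor.py: the class name SketchExecutor, the method build_command, the helper handle_bootstrap_flag, and the kafka-configs handler are all hypothetical stand-ins, and the sketch only builds the final command string instead of passing it to os.system.

```python
# Simplified, hypothetical stand-in for the executor flow described above.
# Only the overall shape (prefix match -> handler -> final command) follows
# the thread; every name below is an assumption made for illustration.

COMMAND_KAFKA_TOPICS = "kafka-topics"
COMMAND_KAFKA_CONFIGS = "kafka-configs"  # hypothetical newly added command


class SketchExecutor:
    def __init__(self, bootstrap_server="localhost:9092"):
        self.bootstrap_server = bootstrap_server

    def build_command(self, command):
        # Match the command prefix and delegate to the matching handler.
        if command.startswith(COMMAND_KAFKA_TOPICS):
            return self.handle_kafka_topics_command(command)
        if command.startswith(COMMAND_KAFKA_CONFIGS):
            return self.handle_kafka_configs_command(command)
        raise ValueError("unsupported command: " + command)

    def handle_bootstrap_flag(self, command):
        # Append the flag only when the user did not supply it already.
        if "--bootstrap-server" in command:
            return ""
        return " --bootstrap-server " + self.bootstrap_server

    def handle_kafka_topics_command(self, command):
        command += self.handle_bootstrap_flag(command)
        return command

    def handle_kafka_configs_command(self, command):
        # The new command reuses the same flag handling as kafka-topics.
        command += self.handle_bootstrap_flag(command)
        return command


executor = SketchExecutor()
print(executor.build_command("kafka-topics --list"))
print(executor.build_command("kafka-configs --describe --entity-type topics"))
```

In the real executor, the resulting string would be the final_command handed to os.system; the sketch stops one step earlier so it can run without a Kafka installation.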
If you want to implement non-Kafka based commands, look at the exit or save methods in the executor for examples.
I hope that helps!
Thanks for the info, Shawn. I have a couple of questions.
- How do you discover the Kafka install location? Is it via the port number?
- Does this work in a secure environment?
- I notice a decorator @staticmethod being used. Where is that coming from?
Happy to help, @xargs-Pratix!
- It depends on how you've installed Kafka. Most Kafka installations run on port 9092. If you've installed it on a local machine and have started a Kafka cluster, your bootstrap.servers are most likely localhost:9092.
- Yes, this can work in a secure environment. If you're talking about a secure Kafka cluster, you can pass client/admin config files with the tool (defined in the kafka-shell config file).
- It's built into Python; see this post for more information.
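As a quick illustration of the last point, staticmethod is a Python built-in, so no import is needed. The Greeter class below is a made-up example, not code from kafka-shell; it only shows that a static method takes no self and can be called on the class or from an instance method.

```python
class Greeter:
    @staticmethod
    def shout(text):
        # No self parameter: the method does not touch instance state.
        return text.upper() + "!"

    def greet(self, name):
        # A static method can still be reached through self.
        return self.shout("hello " + name)


print(Greeter.shout("hi"))       # called on the class itself
print(Greeter().greet("kafka"))  # called via an instance method
```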
Hope that helps!
I think I got it. You mean that I should add the path to the cert config file in completer.json, like this: "kafka-topics --secure-config path", as in the screenshot?
