How to use ClusterMetaData Class to get some info about the kafka cluster
Hello,
I was trying to use ClusterMetaData as below , to see if any of its methods return anything useful, but i am getting most return values as None or empty sets. The kafka server is working and healthy as i can create producers/consumers and get correct topics and other info
def kafka_with_cluster_metadata_method():
print("in kafka_with_cluster_metadata_method")
try:
ko = cluster.ClusterMetadata(bootstrap_servers=settings.kafka_server,
metadata_max_age_ms=100000,
retry_backoff_ms=60)
print(f"ko obj is {ko}")
print(f"ko.config {ko.config}")
print("broker info")
print(f"ko.brokers : {ko.brokers}")
print(f"ko.broker_metadata(0) {ko.broker_metadata(0)}")
print("topic info")
print(f"ko.topics: {ko.topics}")
print(f"ko.need_all_topic_metadata {ko.need_all_topic_metadata}")
print("partition info")
print(f"ko.partitions_for_topic('some_existing_topic') {ko.partitions_for_topic('some_existing_topic')}")
except Exception as e:
raise e
Output of the above print messages are:
in kafka_with_cluster_metadata_method
ko obj is ClusterMetadata(brokers: 0, topics: 0, groups: 0)
ko.config {'retry_backoff_ms': 60, 'metadata_max_age_ms': 100000, 'bootstrap_servers': '********'}
broker info
ko.brokers : <bound method ClusterMetadata.brokers of <kafka.cluster.ClusterMetadata object at 0x7f54362c7e50>>
ko.broker_metadata(0) None
topic info
ko.topics: <bound method ClusterMetadata.topics of <kafka.cluster.ClusterMetadata object at 0x7f54362c7e50>>
ko.need_all_topic_metadata False
partition info
ko.partitions_for_topic('some_existing_topic') None
Following this documentation https://kafka-python.readthedocs.io/en/master/apidoc/ClusterMetadata.html
I tried initializing KafkaClient and then calling KafkaClient.cluster to see if this some how initializes the clusetermetadata class, but i am not seeing any difference here either.
Can someone help out here?
Hello, there is a hidden method in API "KafkaAdminClient" called "_get_cluster_metadata": Code of KafkaAdminClient, check line 491.
So what you can do is to connect to kafka broker using two APIs:
kafka_client = kafka.KafkaClient(**yourConf)
admin_client = kafka.KafkaAdminClient(**yourConf)
metadataResponse = admin_client._get_cluster_metadata()
clusterMetadata = kafka_client.cluster
clusterMetadata.update_metadata(metadataResponse)
And then you can have your clusterMetadata up to date.
Hope this can help
Hello, there is a hidden method in API "KafkaAdminClient" called "_get_cluster_metadata": Code of KafkaAdminClient, check line 491.
So what you can do is to connect to kafka broker using two APIs:
kafka_client = kafka.KafkaClient(**yourConf) admin_client = kafka.KafkaAdminClient(**yourConf) metadataResponse = admin_client._get_cluster_metadata() clusterMetadata = kafka_client.cluster clusterMetadata.update_metadata(metadataResponse)And then you can have your clusterMetadata up to date.
Hope this can help
For me this throws an exception on 2.0.2:
>>> cluster_metadata.update_metadata(metadata_response)
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "./venv/lib/python3.11/site-packages/kafka/cluster.py", line 280, in update_metadata
for p_error, partition, leader, replicas, isr in partitions:
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
ValueError: too many values to unpack (expected 5)
Hello, this is a problem caused by the upgrade of python. Since there is no new releases on this project for almost 4 years, you need to change "./venv/lib/python3.11/site-packages/kafka/cluster.py" manually and unpack it with the right number of variables.
Hello, there is a hidden method in API "KafkaAdminClient" called "_get_cluster_metadata": Code of KafkaAdminClient, check line 491. So what you can do is to connect to kafka broker using two APIs:
kafka_client = kafka.KafkaClient(**yourConf) admin_client = kafka.KafkaAdminClient(**yourConf) metadataResponse = admin_client._get_cluster_metadata() clusterMetadata = kafka_client.cluster clusterMetadata.update_metadata(metadataResponse)And then you can have your clusterMetadata up to date. Hope this can help
For me this throws an exception on 2.0.2:
>>> cluster_metadata.update_metadata(metadata_response) Traceback (most recent call last): File "<stdin>", line 1, in <module> File "./venv/lib/python3.11/site-packages/kafka/cluster.py", line 280, in update_metadata for p_error, partition, leader, replicas, isr in partitions: ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ ValueError: too many values to unpack (expected 5)