Consumer hangs with broker-side zstd compression
When setting compression.type=zstd on the broker, the consumer fails to read any messages, reporting WARNING:kafka.consumer.fetcher:Unknown error fetching data for topic-partition TopicPartition(topic='test', partition=0). Switching to compression.type=gzip, or to producer-side compression (even with zstd!), fixes the issue, including for historical messages.
I had issues consuming messages with zstd compression, and fixed it by installing zstandard. There isn't any reference to zstd in the documentation, but support for it was added in https://github.com/dpkp/kafka-python/pull/2021.
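For anyone hitting this path, a quick way to confirm up front whether the zstandard package that kafka-python relies on for zstd decompression is importable (a minimal sketch; the helper name is my own, not part of kafka-python):

```python
def zstd_codec_available():
    """Return True if the `zstandard` package used by kafka-python
    for zstd decompression can be imported."""
    try:
        import zstandard  # noqa: F401  -- installed via: pip install zstandard
        return True
    except ImportError:
        return False

# Example: fail fast with a clear message instead of hitting
# UnsupportedCodecError deep inside the fetcher.
if not zstd_codec_available():
    print("zstd codec missing; run `pip install zstandard`")
```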
@RuiLoureiro This is a separate issue, related to reading the headers appropriately from the broker. @he-la even mentions that zstandard is installed and working when producer-side compression is used with zstd.
The failure/error here is when a broker has configuration compression.type=zstd.
I logged this as https://github.com/dpkp/kafka-python/issues/858 which was closed without fixing.
A proposed workaround in #858 is to install confluent-kafka via python3 -m pip install confluent-kafka but I have not verified if this works.
@Green-Angry-Bird confluent-kafka is a separate library that wraps librdkafka, a kafka client implemented in native code. I have verified that librdkafka works with zstd compression, though not through its python wrapper.
For people looking simply to consume Kafka messages with zstd compression, I would suggest switching to confluent-kafka until this issue is resolved.
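An untested sketch of that workaround using the confluent-kafka Consumer API; the bootstrap address, group id, and topic are placeholders:

```python
def build_config(bootstrap_servers, group_id):
    # librdkafka bundles zstd support natively, so no extra Python
    # compression package is required on the consumer side.
    return {
        "bootstrap.servers": bootstrap_servers,
        "group.id": group_id,
        "auto.offset.reset": "earliest",
    }

def consume(topic, config):
    from confluent_kafka import Consumer  # pip install confluent-kafka
    consumer = Consumer(config)
    consumer.subscribe([topic])
    try:
        while True:
            msg = consumer.poll(1.0)  # wait up to 1s for a message
            if msg is None:
                continue
            if msg.error():
                print("consumer error:", msg.error())
                continue
            print(msg.key(), msg.value())
    except KeyboardInterrupt:
        pass
    finally:
        consumer.close()
```

Consuming requires a running broker, so only the config helper is exercised here; the poll loop follows the standard confluent-kafka pattern of checking `msg is None` and `msg.error()` before touching the payload.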
Sorry all, I logged this issue with https://github.com/confluentinc/confluent-kafka-python/issues/858
I am getting the error below when trying to consume a zstd-compressed record from Kafka:

kafka.errors.UnsupportedCodecError: UnsupportedCodecError: Libraries for zstd compression codec not found

Full traceback:
Traceback (most recent call last):
  File "/home/PycharmProjects/myPythonLearning/kafka/consumer/consumer.py", line 25, in <module>
    consume_from_topic(topic_to_consume)
  File "/home/PycharmProjects/myPythonLearning/kafka/consumer/consumer.py", line 14, in consume_from_topic
    for message in consumer:
  File "/home/PycharmProjects/myPythonLearning/venv/lib/python3.8/site-packages/kafka/consumer/group.py", line 1193, in __next__
    return self.next_v2()
  File "/home/PycharmProjects/myPythonLearning/venv/lib/python3.8/site-packages/kafka/consumer/group.py", line 1201, in next_v2
    return next(self._iterator)
  File "/home/PycharmProjects/myPythonLearning/venv/lib/python3.8/site-packages/kafka/consumer/group.py", line 1116, in _message_generator_v2
    record_map = self.poll(timeout_ms=timeout_ms, update_offsets=False)
  File "/home/PycharmProjects/myPythonLearning/venv/lib/python3.8/site-packages/kafka/consumer/group.py", line 655, in poll
    records = self._poll_once(remaining, max_records, update_offsets=update_offsets)
  File "/home/PycharmProjects/myPythonLearning/venv/lib/python3.8/site-packages/kafka/consumer/group.py", line 708, in _poll_once
    records, _ = self._fetcher.fetched_records(max_records, update_offsets=update_offsets)
  File "/home/PycharmProjects/myPythonLearning/venv/lib/python3.8/site-packages/kafka/consumer/fetcher.py", line 344, in fetched_records
    self._next_partition_records = self._parse_fetched_data(completion)
  File "/home/PycharmProjects/myPythonLearning/venv/lib/python3.8/site-packages/kafka/consumer/fetcher.py", line 818, in _parse_fetched_data
    unpacked = list(self._unpack_message_set(tp, records))
  File "/home/PycharmProjects/myPythonLearning/venv/lib/python3.8/site-packages/kafka/consumer/fetcher.py", line 467, in _unpack_message_set
    for record in batch:
  File "/home/PycharmProjects/myPythonLearning/venv/lib/python3.8/site-packages/kafka/record/default_records.py", line 276, in __iter__
    self._maybe_uncompress()
  File "/home/PycharmProjects/myPythonLearning/venv/lib/python3.8/site-packages/kafka/record/default_records.py", line 183, in _maybe_uncompress
    self._assert_has_codec(compression_type)
  File "/home/PycharmProjects/myPythonLearning/venv/lib/python3.8/site-packages/kafka/record/default_records.py", line 118, in _assert_has_codec
    raise UnsupportedCodecError(
kafka.errors.UnsupportedCodecError: UnsupportedCodecError: Libraries for zstd compression codec not found
Process finished with exit code 1
My code for consuming a Kafka topic:
from kafka import KafkaConsumer


def consume_from_topic(topic):
    consumer = KafkaConsumer(
        topic,
        bootstrap_servers='localhost:9092',
        group_id='zstd-11-consumer-group',
        auto_offset_reset='earliest',
        enable_auto_commit=True
    )
    try:
        for message in consumer:
            v = message.value
            k = message.key.decode("utf-8")
            log = "key={}, offset={}, partition={}, value={}".format(
                k, message.offset, message.partition, v)
            print(log)
    except KeyboardInterrupt:
        consumer.close()


if __name__ == "__main__":
    topic_to_consume = "Integrate-Package-Zstd-ESP.info"
    consume_from_topic(topic_to_consume)