Long-running Faust app fails with exactly_once semantics
Checklist
- [x] I have included information about relevant versions
- [x] I have verified that the issue persists when using the master branch of Faust.
Steps to reproduce
import faust
import os
import sys
from faust import Record
from faust.serializers.codecs import codecs
from faust.types import ProcessingGuarantee
from mode import CrashingSupervisor

brokers = os.environ["APP_BROKERS"]
broker_list = list(map(lambda x: "kafka://" + str(x), brokers.split(",")))

settings = faust.Settings(
    id="test-faust-config-5",
    processing_guarantee=ProcessingGuarantee.EXACTLY_ONCE,
    agent_supervisor=CrashingSupervisor,
    broker=broker_list,
)

app = faust.App(
    id="test-faust-config-5"
)
app.conf = settings

topic_input = app.topic(
    "first-topic",
    key_serializer=codecs["raw"],
    key_type=bytes,
    value_serializer=codecs["raw"],
    value_type=codecs["raw"],
)

topic_output = app.topic(
    "second-topic",
    key_serializer=codecs["raw"],
    key_type=bytes,
    value_serializer=codecs["raw"],
    value_type=bytes,
)


@app.agent(topic_input)
async def processor(stream: faust.Stream[bytes]):
    async for key, value in stream.items():
        await topic_output.send(key=key, value=value)


sys.argv = ["", "worker", "-l", "debug"]
app.main()
Expected behavior
The Faust app should not crash and should continue to process records. Moreover, it should not create duplicates, because of the exactly_once semantics.
Actual behavior
The Faust app crashes after a couple of minutes of processing records. When the pod is restarted (in this case it is running inside Kubernetes), it should not create duplicates.
Full traceback
[2021-04-19 09:55:37,209] [6] [DEBUG] Timer _thread_keepalive-AIOKafkaConsumerThread woke up - iteration=409 time_spent_sleeping=0.9796151280170307 drift=0.020384871982969344 new_interval=1.0203848719829693 since_epoch=437.60129701299593
[2021-04-19 09:55:37,415] [6] [DEBUG] Timer Monitor.sampler woke up - iteration=393 time_spent_sleeping=0.9025401709950529 drift=0.0974598290049471 new_interval=1.097459829004947 since_epoch=440.3753396840184
[2021-04-19 09:55:37,896] [6] [DEBUG] Timer commit woke up - iteration=147 time_spent_sleeping=2.701949173992034 drift=0.09805082600796577 new_interval=2.8980508260079656 since_epoch=436.7407859310042
[2021-04-19 09:55:38,515] [6] [DEBUG] Timer Monitor.sampler woke up - iteration=394 time_spent_sleeping=1.0996608329878654 drift=-0.09966083298786543 new_interval=0.9003391670121346 since_epoch=441.4750403960061
[2021-04-19 09:55:39,417] [6] [DEBUG] Timer Monitor.sampler woke up - iteration=395 time_spent_sleeping=0.9028530089999549 drift=0.09714699100004509 new_interval=1.097146991000045 since_epoch=442.3779428390262
[2021-04-19 09:55:40,056] [6] [ERROR] [^--Consumer]: Drain messages raised: KeyError(8657)
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/faust/transport/consumer.py", line 1105, in _drain_messages
    async for tp, message in ait:
  File "/usr/local/lib/python3.8/site-packages/faust/transport/consumer.py", line 699, in getmany
    records, active_partitions = await self._wait_next_records(timeout)
  File "/usr/local/lib/python3.8/site-packages/faust/transport/consumer.py", line 739, in _wait_next_records
    records = await self._getmany(
  File "/usr/local/lib/python3.8/site-packages/faust/transport/consumer.py", line 1341, in _getmany
    return await self._thread.getmany(active_partitions, timeout)
  File "/usr/local/lib/python3.8/site-packages/faust/transport/drivers/aiokafka.py", line 957, in getmany
    return await self.call_thread(
  File "/usr/local/lib/python3.8/site-packages/mode/threads.py", line 436, in call_thread
    result = await promise
  File "/usr/local/lib/python3.8/site-packages/mode/threads.py", line 383, in _process_enqueued
    result = await maybe_async(method(*args, **kwargs))
  File "/usr/local/lib/python3.8/site-packages/mode/utils/futures.py", line 134, in maybe_async
    return await res
  File "/usr/local/lib/python3.8/site-packages/faust/transport/drivers/aiokafka.py", line 979, in _fetch_records
    return await fetcher.fetched_records(
  File "/usr/local/lib/python3.8/site-packages/aiokafka/consumer/fetcher.py", line 1067, in fetched_records
    records = res_or_error.getall(max_records)
  File "/usr/local/lib/python3.8/site-packages/aiokafka/consumer/fetcher.py", line 133, in getall
    for msg in self._partition_records:
  File "/usr/local/lib/python3.8/site-packages/aiokafka/consumer/fetcher.py", line 199, in __next__
    return next(self._records_iterator)
  File "/usr/local/lib/python3.8/site-packages/aiokafka/consumer/fetcher.py", line 224, in _unpack_records
    self._aborted_producers.remove(next_batch.producer_id)
KeyError: 8657
[2021-04-19 09:55:40,057] [6] [DEBUG] Adding fetch request for partition TopicPartition(topic='first-topic', partition=5) at offset 7148
[2021-04-19 09:55:40,058] [6] [ERROR] [^---Fetcher]: Crashed reason=KeyError(8657)
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/mode/services.py", line 802, in _execute_task
    await task
  File "/usr/local/lib/python3.8/site-packages/faust/transport/consumer.py", line 176, in _fetcher
    await consumer._drain_messages(self)
  File "/usr/local/lib/python3.8/site-packages/faust/transport/consumer.py", line 1105, in _drain_messages
    async for tp, message in ait:
  File "/usr/local/lib/python3.8/site-packages/faust/transport/consumer.py", line 699, in getmany
    records, active_partitions = await self._wait_next_records(timeout)
  File "/usr/local/lib/python3.8/site-packages/faust/transport/consumer.py", line 739, in _wait_next_records
    records = await self._getmany(
  File "/usr/local/lib/python3.8/site-packages/faust/transport/consumer.py", line 1341, in _getmany
    return await self._thread.getmany(active_partitions, timeout)
  File "/usr/local/lib/python3.8/site-packages/faust/transport/drivers/aiokafka.py", line 957, in getmany
    return await self.call_thread(
  File "/usr/local/lib/python3.8/site-packages/mode/threads.py", line 436, in call_thread
    result = await promise
  File "/usr/local/lib/python3.8/site-packages/mode/threads.py", line 383, in _process_enqueued
    result = await maybe_async(method(*args, **kwargs))
  File "/usr/local/lib/python3.8/site-packages/mode/utils/futures.py", line 134, in maybe_async
    return await res
  File "/usr/local/lib/python3.8/site-packages/faust/transport/drivers/aiokafka.py", line 979, in _fetch_records
    return await fetcher.fetched_records(
  File "/usr/local/lib/python3.8/site-packages/aiokafka/consumer/fetcher.py", line 1067, in fetched_records
    records = res_or_error.getall(max_records)
  File "/usr/local/lib/python3.8/site-packages/aiokafka/consumer/fetcher.py", line 133, in getall
    for msg in self._partition_records:
  File "/usr/local/lib/python3.8/site-packages/aiokafka/consumer/fetcher.py", line 199, in __next__
    return next(self._records_iterator)
  File "/usr/local/lib/python3.8/site-packages/aiokafka/consumer/fetcher.py", line 224, in _unpack_records
    self._aborted_producers.remove(next_batch.producer_id)
KeyError: 8657
[2021-04-19 09:55:40,058] [6] [DEBUG] Adding fetch request for partition TopicPartition(topic='first-topic', partition=69) at offset 7222
[2021-04-19 09:55:40,059] [6] [INFO] [^Worker]: Stopping...
[2021-04-19 09:55:40,059] [6] [DEBUG] Adding fetch request for partition TopicPartition(topic='first-topic', partition=14) at offset 7236
[2021-04-19 09:55:40,059] [6] [INFO] [^-App]: Stopping...
[2021-04-19 09:55:40,059] [6] [DEBUG] Adding fetch request for partition TopicPartition(topic='first-topic', partition=78) at offset 7308
[2021-04-19 09:55:40,060] [6] [INFO] [^---Fetcher]: Stopping...
[2021-04-19 09:55:40,060] [6] [DEBUG] Adding fetch request for partition TopicPartition(topic='first-topic', partition=23) at offset 7303
[2021-04-19 09:55:40,060] [6] [DEBUG] [^---Fetcher]: Shutting down...
[2021-04-19 09:55:40,060] [6] [DEBUG] Adding fetch request for partition TopicPartition(topic='first-topic', partition=87) at offset 7189
[2021-04-19 09:55:40,060] [6] [DEBUG] [^---Fetcher]: -Stopped!
[2021-04-19 09:55:40,061] [6] [DEBUG] Adding fetch request for partition TopicPartition(topic='first-topic', partition=96) at offset 7348
[2021-04-19 09:55:40,061] [6] [INFO] [^-App]: Wait for streams...
[2021-04-19 09:55:40,061] [6] [DEBUG] Adding fetch request for partition TopicPartition(topic='first-topic', partition=53) at offset 7183
[2021-04-19 09:55:40,061] [6] [INFO] [^--TableManager]: Stopping...
[2021-04-19 09:55:40,061] [6] [DEBUG] Adding fetch request for partition TopicPartition(topic='first-topic', partition=7) at offset 7230
[2021-04-19 09:55:40,062] [6] [INFO] [^---Fetcher]: Stopping...
[2021-04-19 09:55:40,062] [6] [DEBUG] Adding fetch request for partition TopicPartition(topic='first-topic', partition=71) at offset 7336
[2021-04-19 09:55:40,062] [6] [DEBUG] [^---Fetcher]: Shutting down...
[2021-04-19 09:55:40,062] [6] [DEBUG] Adding fetch request for partition TopicPartition(topic='first-topic', partition=16) at offset 7362
[2021-04-19 09:55:40,062] [6] [DEBUG] [^---Fetcher]: -Stopped!
[2021-04-19 09:55:40,062] [6] [DEBUG] Adding fetch request for partition TopicPartition(topic='first-topic', partition=80) at offset 7306
[2021-04-19 09:55:40,063] [6] [INFO] [^---Recovery]: Stopping...
[2021-04-19 09:55:40,063] [6] [DEBUG] Adding fetch request for partition TopicPartition(topic='first-topic', partition=25) at offset 9626
[2021-04-19 09:55:40,063] [6] [DEBUG] [^---Recovery]: Shutting down...
[2021-04-19 09:55:40,063] [6] [DEBUG] Adding fetch request for partition TopicPartition(topic='first-topic', partition=89) at offset 7070
[2021-04-19 09:55:40,064] [6] [DEBUG] [^---Recovery]: -Stopped!
[2021-04-19 09:55:40,064] [6] [DEBUG] Adding fetch request for partition TopicPartition(topic='first-topic', partition=46) at offset 7276
[2021-04-19 09:55:40,064] [6] [DEBUG] [^--TableManager]: Shutting down...
[2021-04-19 09:55:40,064] [6] [DEBUG] Adding fetch request for partition TopicPartition(topic='first-topic', partition=55) at offset 7317
[2021-04-19 09:55:40,065] [6] [DEBUG] [^--TableManager]: -Stopped!
[2021-04-19 09:55:40,065] [6] [DEBUG] Adding fetch request for partition TopicPartition(topic='first-topic', partition=0) at offset 7315
[2021-04-19 09:55:40,065] [6] [INFO] [^-App]: Flush producer buffer...
[2021-04-19 09:55:40,065] [6] [DEBUG] Adding fetch request for partition TopicPartition(topic='first-topic', partition=64) at offset 7281
[2021-04-19 09:55:40,066] [6] [INFO] [^---Conductor]: Stopping...
[2021-04-19 09:55:40,066] [6] [DEBUG] Adding fetch request for partition TopicPartition(topic='first-topic', partition=9) at offset 7419
[2021-04-19 09:55:40,066] [6] [DEBUG] [^---Conductor]: Shutting down...
[2021-04-19 09:55:40,066] [6] [DEBUG] Adding fetch request for partition TopicPartition(topic='first-topic', partition=73) at offset 7353
[2021-04-19 09:55:40,066] [6] [DEBUG] [^---Conductor]: -Stopped!
[2021-04-19 09:55:40,066] [6] [DEBUG] Adding fetch request for partition TopicPartition(topic='first-topic', partition=18) at offset 7265
[2021-04-19 09:55:40,067] [6] [INFO] [^--AgentManager]: Stopping...
[2021-04-19 09:55:40,067] [6] [DEBUG] Adding fetch request for partition TopicPartition(topic='first-topic', partition=39) at offset 7171
[2021-04-19 09:55:40,067] [6] [DEBUG] [^----Stream: Topic: first-topic]: Stopping...
[2021-04-19 09:55:40,067] [6] [DEBUG] Adding fetch request for partition TopicPartition(topic='first-topic', partition=48) at offset 7221
[2021-04-19 09:55:40,067] [6] [DEBUG] [^----Stream: Topic: first-topic]: Shutting down...
[2021-04-19 09:55:40,068] [6] [DEBUG] Adding fetch request for partition TopicPartition(topic='first-topic', partition=57) at offset 7202
[2021-04-19 09:55:40,068] [6] [DEBUG] [^----Stream: Topic: first-topic]: -Stopped!
[2021-04-19 09:55:40,068] [6] [DEBUG] Adding fetch request for partition TopicPartition(topic='first-topic', partition=66) at offset 7224
[2021-04-19 09:55:40,068] [6] [INFO] [^---Agent: __main__.processor]: Stopping...
[2021-04-19 09:55:40,068] [6] [DEBUG] Adding fetch request for partition TopicPartition(topic='first-topic', partition=11) at offset 7375
[2021-04-19 09:55:40,068] [6] [INFO] [^----CrashingSupervisor: (1@0x7f61d4103af0)]: Stopping...
[2021-04-19 09:55:40,069] [6] [DEBUG] Adding fetch request for partition TopicPartition(topic='first-topic', partition=75) at offset 7267
[2021-04-19 09:55:40,069] [6] [DEBUG] [^----Agent*: __main__.processor]: Stopping...
[2021-04-19 09:55:40,069] [6] [DEBUG] Adding fetch request for partition TopicPartition(topic='first-topic', partition=41) at offset 7407
[2021-04-19 09:55:40,069] [6] [DEBUG] [^----Agent*: __main__.processor]: Shutting down...
[2021-04-19 09:55:40,069] [6] [DEBUG] Adding fetch request for partition TopicPartition(topic='first-topic', partition=50) at offset 7357
[2021-04-19 09:55:40,069] [6] [DEBUG] [^----Agent*: __main__.processor]: -Stopped!
[2021-04-19 09:55:40,069] [6] [DEBUG] Adding fetch request for partition TopicPartition(topic='first-topic', partition=59) at offset 7231
[2021-04-19 09:55:40,070] [6] [DEBUG] [^----CrashingSupervisor: (1@0x7f61d4103af0)]: Shutting down...
[2021-04-19 09:55:40,070] [6] [DEBUG] Adding fetch request for partition TopicPartition(topic='first-topic', partition=4) at offset 7140
[2021-04-19 09:55:40,070] [6] [DEBUG] [^----CrashingSupervisor: (1@0x7f61d4103af0)]: -Stopped!
[2021-04-19 09:55:40,070] [6] [DEBUG] Adding fetch request for partition TopicPartition(topic='first-topic', partition=68) at offset 7210
[2021-04-19 09:55:40,071] [6] [DEBUG] [^---Agent: __main__.processor]: Shutting down...
[2021-04-19 09:55:40,071] [6] [DEBUG] Adding fetch request for partition TopicPartition(topic='first-topic', partition=34) at offset 7082
[2021-04-19 09:55:40,071] [6] [DEBUG] [^---Agent: __main__.processor]: -Stopped!
[2021-04-19 09:55:40,071] [6] [DEBUG] Adding fetch request for partition TopicPartition(topic='first-topic', partition=98) at offset 7109
[2021-04-19 09:55:40,071] [6] [DEBUG] [^--AgentManager]: Shutting down...
[2021-04-19 09:55:40,072] [6] [DEBUG] Adding fetch request for partition TopicPartition(topic='first-topic', partition=43) at offset 9835
[2021-04-19 09:55:40,072] [6] [DEBUG] [^--AgentManager]: -Stopped!
[2021-04-19 09:55:40,072] [6] [INFO] [^--ReplyConsumer]: Stopping...
[2021-04-19 09:55:40,072] [6] [DEBUG] Adding fetch request for partition TopicPartition(topic='first-topic', partition=61) at offset 7249
[2021-04-19 09:55:40,072] [6] [DEBUG] [^--ReplyConsumer]: Shutting down...
[2021-04-19 09:55:40,072] [6] [DEBUG] Adding fetch request for partition TopicPartition(topic='first-topic', partition=6) at offset 7441
[2021-04-19 09:55:40,073] [6] [DEBUG] [^--ReplyConsumer]: -Stopped!
[2021-04-19 09:55:40,073] [6] [DEBUG] Adding fetch request for partition TopicPartition(topic='first-topic', partition=70) at offset 7393
[2021-04-19 09:55:40,073] [6] [INFO] [^--LeaderAssignor]: Stopping...
[2021-04-19 09:55:40,073] [6] [DEBUG] Adding fetch request for partition TopicPartition(topic='first-topic', partition=27) at offset 7545
[2021-04-19 09:55:40,073] [6] [DEBUG] [^--LeaderAssignor]: Shutting down...
[2021-04-19 09:55:40,073] [6] [DEBUG] Adding fetch request for partition TopicPartition(topic='first-topic', partition=91) at offset 7411
[2021-04-19 09:55:40,074] [6] [DEBUG] [^--LeaderAssignor]: -Stopped!
[2021-04-19 09:55:40,074] [6] [DEBUG] Adding fetch request for partition TopicPartition(topic='first-topic', partition=36) at offset 7382
[2021-04-19 09:55:40,074] [6] [INFO] [^--Consumer]: Stopping...
[2021-04-19 09:55:40,074] [6] [DEBUG] Adding fetch request for partition TopicPartition(topic='first-topic', partition=45) at offset 7269
[2021-04-19 09:55:40,074] [6] [INFO] [^---TransactionManager]: Stopping...
[2021-04-19 09:55:40,075] [6] [DEBUG] Adding fetch request for partition TopicPartition(topic='first-topic', partition=63) at offset 7191
[2021-04-19 09:55:40,075] [6] [DEBUG] [^---TransactionManager]: Shutting down...
[2021-04-19 09:55:40,075] [6] [DEBUG] Adding fetch request for partition TopicPartition(topic='first-topic', partition=20) at offset 7277
[2021-04-19 09:55:40,075] [6] [DEBUG] [^---TransactionManager]: -Stopped!
[2021-04-19 09:55:40,075] [6] [DEBUG] Adding fetch request for partition TopicPartition(topic='first-topic', partition=29) at offset 7332
[2021-04-19 09:55:40,076] [6] [INFO] [^---AIOKafkaConsumerThread]: Stopping...
[2021-04-19 09:55:40,076] [6] [DEBUG] Adding fetch request for partition TopicPartition(topic='first-topic', partition=93) at offset 7355
[2021-04-19 09:55:40,076] [6] [DEBUG] [^---AIOKafkaConsumerThread]: Shutting down...
[2021-04-19 09:55:40,076] [6] [DEBUG] Adding fetch request for partition TopicPartition(topic='first-topic', partition=13) at offset 7377
[2021-04-19 09:55:40,076] [6] [DEBUG] [^---AIOKafkaConsumerThread]: Waiting for shutdown
[2021-04-19 09:55:40,077] [6] [DEBUG] <AIOKafkaConnection host=domain-kafka3 port=9092> Response 239: FetchResponse_v4(throttle_time_ms=0, topics=[(topics='test-faust-config-5-__assignor-__leader', partitions=[(partition=0, error_code=0, highwater_offset=0, last_stable_offset=0, aborted_transactions=[], message_set=b'')])])
[2021-04-19 09:55:40,077] [6] [DEBUG] Heartbeat: test-faust-config-5[91] faust-0.6.3-4608f4ca-ec75-45b1-bc9b-9c6bb188c3d8
[2021-04-19 09:55:40,078] [6] [DEBUG] <AIOKafkaConnection host=domain-kafka4 port=9092> Request 299: HeartbeatRequest_v1(group='test-faust-config-5', generation_id=91, member_id='faust-0.6.3-4608f4ca-ec75-45b1-bc9b-9c6bb188c3d8')
[2021-04-19 09:55:40,078] [6] [DEBUG] <AIOKafkaConnection host=domain-kafka8 port=9092> Request 18: FetchRequest_v4(replica_id=-1, max_wait_time=1500, min_bytes=1, max_bytes=52428800, isolation_level=1, topics=[(topic='first-topic', partitions=[(partition=45, offset=7269, max_bytes=1048576), (partition=5, offset=7148, max_bytes=1048576), (partition=25, offset=9626, max_bytes=1048576), (partition=75, offset=7267, max_bytes=1048576), (partition=55, offset=7317, max_bytes=1048576)])])
[2021-04-19 09:55:40,078] [6] [DEBUG] <AIOKafkaConnection host=domain-kafka1.domain-kafkaheadless.hubble-infrastructure port=9092> Request 17: FetchRequest_v4(replica_id=-1, max_wait_time=1500, min_bytes=1, max_bytes=52428800, isolation_level=1, topics=[(topic='first-topic', partitions=[(partition=9, offset=7419, max_bytes=1048576), (partition=59, offset=7231, max_bytes=1048576), (partition=39, offset=7171, max_bytes=1048576), (partition=29, offset=7332, max_bytes=1048576), (partition=69, offset=7222, max_bytes=1048576), (partition=89, offset=7070, max_bytes=1048576)])])
[2021-04-19 09:55:40,079] [6] [DEBUG] <AIOKafkaConnection host=domain-kafka5 port=9092> Request 27: FetchRequest_v4(replica_id=-1, max_wait_time=1500, min_bytes=1, max_bytes=52428800, isolation_level=1, topics=[(topic='first-topic', partitions=[(partition=4, offset=7140, max_bytes=1048576), (partition=34, offset=7082, max_bytes=1048576), (partition=14, offset=7236, max_bytes=1048576), (partition=64, offset=7281, max_bytes=1048576)])])
[2021-04-19 09:55:40,079] [6] [DEBUG] <AIOKafkaConnection host=domain-kafka4 port=9092> Request 23: FetchRequest_v4(replica_id=-1, max_wait_time=1500, min_bytes=1, max_bytes=52428800, isolation_level=1, topics=[(topic='first-topic', partitions=[(partition=18, offset=7265, max_bytes=1048576), (partition=78, offset=7308, max_bytes=1048576), (partition=98, offset=7109, max_bytes=1048576), (partition=48, offset=7221, max_bytes=1048576), (partition=68, offset=7210, max_bytes=1048576)])])
[2021-04-19 09:55:40,079] [6] [DEBUG] <AIOKafkaConnection host=domain-kafka0 port=9092> Request 18: FetchRequest_v4(replica_id=-1, max_wait_time=1500, min_bytes=1, max_bytes=52428800, isolation_level=1, topics=[(topic='first-topic', partitions=[(partition=73, offset=7353, max_bytes=1048576), (partition=53, offset=7183, max_bytes=1048576), (partition=63, offset=7191, max_bytes=1048576), (partition=43, offset=9835, max_bytes=1048576), (partition=13, offset=7377, max_bytes=1048576), (partition=93, offset=7355, max_bytes=1048576), (partition=23, offset=7303, max_bytes=1048576)])])
[2021-04-19 09:55:40,079] [6] [DEBUG] <AIOKafkaConnection host=domain-kafka9 port=9092> Request 19: FetchRequest_v4(replica_id=-1, max_wait_time=1500, min_bytes=1, max_bytes=52428800, isolation_level=1, topics=[(topic='first-topic', partitions=[(partition=27, offset=7545, max_bytes=1048576), (partition=87, offset=7189, max_bytes=1048576), (partition=57, offset=7202, max_bytes=1048576), (partition=7, offset=7230, max_bytes=1048576)])])
[2021-04-19 09:55:40,080] [6] [DEBUG] <AIOKafkaConnection host=domain-kafka2 port=9092> Request 26: FetchRequest_v4(replica_id=-1, max_wait_time=1500, min_bytes=1, max_bytes=52428800, isolation_level=1, topics=[(topic='first-topic', partitions=[(partition=6, offset=7441, max_bytes=1048576), (partition=96, offset=7348, max_bytes=1048576), (partition=46, offset=7276, max_bytes=1048576), (partition=36, offset=7382, max_bytes=1048576), (partition=16, offset=7362, max_bytes=1048576), (partition=66, offset=7224, max_bytes=1048576)])])
[2021-04-19 09:55:40,080] [6] [DEBUG] <AIOKafkaConnection host=domain-kafka7 port=9092> Request 15: FetchRequest_v4(replica_id=-1, max_wait_time=1500, min_bytes=1, max_bytes=52428800, isolation_level=1, topics=[(topic='first-topic', partitions=[(partition=61, offset=7249, max_bytes=1048576), (partition=11, offset=7375, max_bytes=1048576), (partition=71, offset=7336, max_bytes=1048576), (partition=41, offset=7407, max_bytes=1048576), (partition=91, offset=7411, max_bytes=1048576)])])
[2021-04-19 09:55:40,080] [6] [DEBUG] <AIOKafkaConnection host=domain-kafka6 port=9092> Request 25: FetchRequest_v4(replica_id=-1, max_wait_time=1500, min_bytes=1, max_bytes=52428800, isolation_level=1, topics=[(topic='first-topic', partitions=[(partition=50, offset=7357, max_bytes=1048576), (partition=70, offset=7393, max_bytes=1048576), (partition=20, offset=7277, max_bytes=1048576), (partition=0, offset=7315, max_bytes=1048576), (partition=80, offset=7306, max_bytes=1048576)])])
[2021-04-19 09:55:40,081] [6] [DEBUG] [^----MethodQueue@0x7f61d7b10340]: Stopping...
[2021-04-19 09:55:40,081] [6] [ERROR] [^---AIOKafkaConsumerThread]: Thread keepalive is not responding...
[2021-04-19 09:55:40,081] [6] [DEBUG] <AIOKafkaConnection host=domain-kafka4 port=9092> Response 299: HeartbeatResponse_v1(throttle_time_ms=0, error_code=0)
[2021-04-19 09:55:40,081] [6] [DEBUG] [^-----MethodQueueWorker@0x7f61d4103250 index=1]: Stopping...
[2021-04-19 09:55:40,083] [6] [DEBUG] <AIOKafkaConnection host=domain-kafka3 port=9092> Request 240: FetchRequest_v4(replica_id=-1, max_wait_time=1500, min_bytes=1, max_bytes=52428800, isolation_level=1, topics=[(topic='first-topic', partitions=[(partition=82, offset=7293, max_bytes=1048576), (partition=32, offset=7318, max_bytes=1048576), (partition=62, offset=7183, max_bytes=1048576), (partition=2, offset=7106, max_bytes=1048576), (partition=52, offset=7296, max_bytes=1048576)]), (topic='test-faust-config-5-__assignor-__leader', partitions=[(partition=0, offset=0, max_bytes=1048576)])])
[2021-04-19 09:55:40,083] [6] [DEBUG] Received successful heartbeat response for group test-faust-config-5
[2021-04-19 09:55:40,092] [6] [DEBUG] [^-----MethodQueueWorker@0x7f61d4103250 index=1]: -Stopped!
[2021-04-19 09:55:40,095] [6] [DEBUG] [^-----MethodQueueWorker@0x7f61d6cd0bb0 index=0]: Stopping...
[2021-04-19 09:55:40,095] [6] [DEBUG] [^-----MethodQueueWorker@0x7f61d6cd0bb0 index=0]: Shutting down...
[2021-04-19 09:55:40,101] [6] [DEBUG] [^-----MethodQueueWorker@0x7f61d6cd0bb0 index=0]: -Stopped!
[2021-04-19 09:55:40,102] [6] [DEBUG] [^----MethodQueue@0x7f61d7b10340]: Shutting down...
[2021-04-19 09:55:40,103] [6] [DEBUG] [^----MethodQueue@0x7f61d7b10340]: -Stopped!
[2021-04-19 09:55:40,103] [6] [DEBUG] Closing the KafkaConsumer.
[2021-04-19 09:55:40,104] [6] [DEBUG] Stopping heartbeat task
[2021-04-19 09:55:40,105] [6] [DEBUG] <AIOKafkaConnection host=domain-kafka4 port=9092> Request 300: LeaveGroupRequest_v1(group='test-faust-config-5', member_id='faust-0.6.3-4608f4ca-ec75-45b1-bc9b-9c6bb188c3d8')
[2021-04-19 09:55:40,111] [6] [DEBUG] <AIOKafkaConnection host=domain-kafka4 port=9092> Response 300: LeaveGroupResponse_v1(throttle_time_ms=0, error_code=0)
[2021-04-19 09:55:40,113] [6] [INFO] LeaveGroup request succeeded
[2021-04-19 09:55:40,157] [6] [DEBUG] The KafkaConsumer has closed.
[2021-04-19 09:55:40,158] [6] [DEBUG] [^---AIOKafkaConsumerThread]: Shutting down now
[2021-04-19 09:55:40,158] [6] [DEBUG] [^---AIOKafkaConsumerThread]: -Stopped!
[2021-04-19 09:55:40,158] [6] [DEBUG] [^---MethodQueue@0x7f61d7a63880]: Stopping...
[2021-04-19 09:55:40,158] [6] [DEBUG] [^----MethodQueueWorker@0x7f61d7a77880 index=1]: Stopping...
[2021-04-19 09:55:40,158] [6] [DEBUG] [^----MethodQueueWorker@0x7f61d7a77880 index=1]: Shutting down...
[2021-04-19 09:55:40,159] [6] [DEBUG] [^----MethodQueueWorker@0x7f61d7a77880 index=1]: -Stopped!
[2021-04-19 09:55:40,159] [6] [DEBUG] [^----MethodQueueWorker@0x7f61d7b105e0 index=0]: Stopping...
[2021-04-19 09:55:40,159] [6] [DEBUG] [^----MethodQueueWorker@0x7f61d7b105e0 index=0]: Shutting down...
[2021-04-19 09:55:40,159] [6] [DEBUG] [^----MethodQueueWorker@0x7f61d7b105e0 index=0]: -Stopped!
[2021-04-19 09:55:40,160] [6] [DEBUG] [^---MethodQueue@0x7f61d7a63880]: Shutting down...
[2021-04-19 09:55:40,160] [6] [DEBUG] [^---MethodQueue@0x7f61d7a63880]: -Stopped!
[2021-04-19 09:55:40,160] [6] [DEBUG] [^--Consumer]: Shutting down...
[2021-04-19 09:55:40,160] [6] [DEBUG] [^--Consumer]: -Stopped!
[2021-04-19 09:55:40,160] [6] [INFO] [^--Web]: Stopping...
[2021-04-19 09:55:40,160] [6] [INFO] [^---Server]: Stopping...
[2021-04-19 09:55:40,161] [6] [INFO] [^--Web]: Cleanup
[2021-04-19 09:55:40,161] [6] [DEBUG] [^---Server]: Shutting down...
[2021-04-19 09:55:40,161] [6] [DEBUG] [^---Server]: -Stopped!
[2021-04-19 09:55:40,161] [6] [DEBUG] [^--Web]: Shutting down...
[2021-04-19 09:55:40,161] [6] [DEBUG] [^--Web]: -Stopped!
[2021-04-19 09:55:40,161] [6] [INFO] [^--CacheBackend]: Stopping...
[2021-04-19 09:55:40,161] [6] [DEBUG] [^--CacheBackend]: Shutting down...
[2021-04-19 09:55:40,161] [6] [DEBUG] [^--CacheBackend]: -Stopped!
[2021-04-19 09:55:40,162] [6] [INFO] [^--Producer]: Stopping...
[2021-04-19 09:55:40,162] [6] [DEBUG] Closing connection at domain-kafka1:9092
[2021-04-19 09:55:40,162] [6] [DEBUG] Closing connection at domain-kafka2:9092
[2021-04-19 09:55:40,162] [6] [DEBUG] Closing connection at domain-kafka5:9092
[2021-04-19 09:55:40,162] [6] [DEBUG] Closing connection at domain-kafka3:9092
[2021-04-19 09:55:40,162] [6] [DEBUG] Closing connection at domain-kafka4:9092
[2021-04-19 09:55:40,163] [6] [DEBUG] Closing connection at domain-kafka6:9092
[2021-04-19 09:55:40,163] [6] [DEBUG] The Kafka producer has closed.
[2021-04-19 09:55:40,290] [6] [DEBUG] The Kafka producer has closed.
[2021-04-19 09:55:40,290] [6] [DEBUG] Closing connection at domain-kafka5:9092
[2021-04-19 09:55:40,290] [6] [DEBUG] Closing connection at domain-kafka1:9092
[2021-04-19 09:55:40,290] [6] [DEBUG] Closing connection at domain-kafka1:9092
[2021-04-19 09:55:40,291] [6] [DEBUG] The Kafka producer has closed.
[2021-04-19 09:55:41,107] [6] [INFO] [^---ProducerBuffer]: Stopping...
[2021-04-19 09:55:41,108] [6] [DEBUG] [^---ProducerBuffer]: Shutting down...
[2021-04-19 09:55:41,109] [6] [DEBUG] [^---ProducerBuffer]: -Stopped!
[2021-04-19 09:55:41,109] [6] [DEBUG] [^--Producer]: Shutting down...
[2021-04-19 09:55:41,110] [6] [DEBUG] [^--Producer]: -Stopped!
[2021-04-19 09:55:41,110] [6] [INFO] [^--Monitor]: Stopping...
[2021-04-19 09:55:41,110] [6] [DEBUG] [^--Monitor]: Shutting down...
[2021-04-19 09:55:41,110] [6] [DEBUG] [^--Monitor]: -Stopped!
[2021-04-19 09:55:41,110] [6] [DEBUG] [^-App]: Shutting down...
[2021-04-19 09:55:41,110] [6] [DEBUG] [^-App]: -Stopped!
[2021-04-19 09:55:41,110] [6] [DEBUG] [^Worker]: Shutting down...
[2021-04-19 09:55:41,111] [6] [DEBUG] [^Worker]: -Stopped!
[2021-04-19 09:55:41,111] [6] [INFO] [^Worker]: Gathering service tasks...
[2021-04-19 09:55:41,111] [6] [INFO] [^Worker]: Gathering all futures...
[2021-04-19 09:55:42,111] [6] [INFO] [^Worker]: Closing event loop
[2021-04-19 09:55:42,112] [6] [CRITICAL] [^Worker]: We experienced a crash! Reraising original exception...
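Both tracebacks above end at the same call, `self._aborted_producers.remove(next_batch.producer_id)`, which raises `KeyError: 8657`. The sketch below is a toy model of that failure mode, not aiokafka's code. It assumes `_aborted_producers` behaves like a plain Python `set` (which the `.remove(...)` call and the `KeyError` suggest), and the function names `on_abort_marker_strict` and `on_abort_marker_tolerant` are hypothetical, introduced only for illustration.

```python
# Toy model only: this is NOT aiokafka's implementation.
# Assumption: the tracked producer IDs live in an ordinary Python set.
aborted_producers = set()


def on_abort_marker_strict(producer_id: int) -> None:
    """Mirror the call in the traceback: set.remove raises if the ID is absent."""
    aborted_producers.remove(producer_id)


def on_abort_marker_tolerant(producer_id: int) -> None:
    """Hypothetical variant: set.discard silently ignores an absent ID."""
    aborted_producers.discard(producer_id)


aborted_producers.add(1234)
on_abort_marker_strict(1234)        # fine: 1234 was being tracked

try:
    on_abort_marker_strict(8657)    # 8657 was never added to the set
except KeyError as exc:
    print(f"KeyError: {exc}")       # prints "KeyError: 8657"

on_abort_marker_tolerant(8657)      # no exception raised
```

This only shows why the exception type and value look the way they do in the log. Whether ignoring the missing ID would be a correct fix is a separate question: an unknown producer ID at that point may mean the consumer's view of aborted transactions is out of sync with the broker, and silently discarding it could hide that.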
Versions
- Python version: 3.8.9
- Faust version: 0.6.3
- Operating system: Debian GNU/Linux 10 (buster) --- Running inside a container within a kubernetes cluster
- Kafka version: 2.7
I'm running into this issue too!
Versions
- Python version: 3.8.8
- Faust version: 0.6.9
- Operating system: Debian GNU/Linux 9 (stretch) --- Running inside a container within a kubernetes cluster
- Kafka version: 2.4.1
@bluefatter I did some research and found that this happens when there are many partitions. When you decrease the number of partitions, it doesn't fail.
On the other hand, I am having problems with the exactly_once semantics too. It always creates duplicates when a pod fails or restarts.
@patkivikram have you ever used exactly once in production?