Faust *sometimes* crashes during rebalance when Aerospike is used as the storage engine
Steps to reproduce
There is probably a race condition during rebalancing when Aerospike is used. Occasionally, a TypeError is raised and the consumer crashes.
Expected behavior
No crashes on rebalance.
Actual behavior
During metrics collection, float() receives None instead of a number: the highwater mark passed to the Prometheus sensor's track_tp_end_offset is None. The log output is attached below.
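The failing call reduces to float(None). Below is a minimal stdlib-only reproduction of the same TypeError, independent of Faust and Aerospike. The helper names are hypothetical and only mimic the float() conversion that prometheus_client performs in Gauge.set(). The exact message wording differs slightly between Python versions, so the sketch only relies on the exception type.

```python
def to_metric_value(value):
    """Mimic the float() conversion a Prometheus gauge applies in set()."""
    return float(value)


def reproduce() -> str:
    """Convert a missing (None) highwater mark and return the error message."""
    try:
        to_metric_value(None)  # stands in for a highwater mark that is None
    except TypeError as exc:
        return str(exc)
    return "no error"


message = reproduce()
print(message)
```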
Full traceback
I [dev-v0-wd] aiokafka.consumer.group_coordinator: Setting newly assigned partitions {TopicPartition(topic='dev-v0-wd-recalculate_account_positions', partition=55), TopicPartition(topic='dev-v0-wd-accounts_patches', partition=55), TopicPartition(topic='dev-v0-wd-accounts_complete_data', partition=55), TopicPartition(topic='dev-v0-wd-in_positions', partition=55), TopicPartition(topic='dev-v0-wd-delay_tasks', partition=55), TopicPartition(topic='dev-v0-wd-account_positions-changelog', partition=55), TopicPartition(topic='dev-v0-wd-accounts-changelog', partition=55), TopicPartition(topic='account_updates_watchdog', partition=55), TopicPartition(topic='dev-v0-wd-account_recovery', partition=55), TopicPartition(topic='dev-v0-wd-account_realized_profits_updates', partition=55), TopicPartition(topic='dev-v0-wd-in_accounts', partition=55)} for group dev-v0-wd
I [dev-v0-wd] faust.app.base: Executing _on_partitions_assigned
I [dev-v0-wd] faust.tables.recovery: generation id 9071 app consumers id 9071
I [dev-v0-wd] faust.tables.recovery: [^---Recovery]: Resuming flow...
I [dev-v0-wd] faust.tables.recovery: [^---Recovery]: Seek stream partitions to committed offsets.
E [dev-v0-wd] faust.transport.drivers.aiokafka: [^--Consumer]: Drain messages raised: TypeError("float() argument must be a string or a number, not 'NoneType'")
Traceback (most recent call last):
  File "/opt/app/venv/xxx/lib/python3.9/site-packages/faust/transport/consumer.py", line 1104, in _drain_messages
    async for tp, message in ait:
  File "/opt/app/venv/xxx/lib/python3.9/site-packages/faust/transport/consumer.py", line 715, in getmany
    self.app.monitor.track_tp_end_offset(tp, highwater_mark)
  File "/opt/app/venv/xxx/lib/python3.9/site-packages/faust/sensors/prometheus.py", line 496, in track_tp_end_offset
    self._metrics.topic_partition_end_offset.labels(
  File "/opt/app/venv/xxx/lib/python3.9/site-packages/prometheus_client/metrics.py", line 364, in set
    self._value.set(float(value))
TypeError: float() argument must be a string or a number, not 'NoneType'
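Until the root cause is fixed, one possible workaround is to skip end-offset samples while the highwater mark is still unknown. The sketch below is a hypothetical mixin (SkipNoneEndOffsetMixin is not part of Faust). It assumes the monitor exposes track_tp_end_offset(tp, offset), as the traceback suggests, and uses a stand-in RecordingMonitor instead of Faust's real Prometheus monitor so that it runs without Faust installed.

```python
from typing import Any, List, Optional, Tuple


class RecordingMonitor:
    """Stand-in for a Faust monitor: records every end offset it receives."""

    def __init__(self) -> None:
        self.calls: List[Tuple[Any, float]] = []

    def track_tp_end_offset(self, tp: Any, offset: int) -> None:
        # Like prometheus_client's Gauge.set(), convert with float(),
        # so passing None raises TypeError here.
        self.calls.append((tp, float(offset)))


class SkipNoneEndOffsetMixin:
    """Hypothetical guard: ignore end-offset updates with no known highwater mark."""

    def track_tp_end_offset(self, tp: Any, offset: Optional[int]) -> None:
        if offset is None:
            # Highwater mark not known yet (e.g. right after a rebalance): skip.
            return
        super().track_tp_end_offset(tp, offset)


class SafeMonitor(SkipNoneEndOffsetMixin, RecordingMonitor):
    """The mixin comes first in the MRO so it can intercept None offsets."""


monitor = SafeMonitor()
monitor.track_tp_end_offset(("dev-v0-wd-in_accounts", 55), None)  # skipped
monitor.track_tp_end_offset(("dev-v0-wd-in_accounts", 55), 1200)
print(monitor.calls)
```

In a real app the mixin would be combined with Faust's Prometheus monitor class instead of the stand-in. The exact class name and how it is wired into the app depend on the Faust version, so check them against its documentation. This only hides the symptom in metrics collection; the underlying rebalance race would still need an upstream fix.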
Versions
Python version: 3.9.7
Faust version: 0.6.10
Operating system: macOS/Linux (applies to both)
Kafka version: 3.0.0
Aerospike version: ce-5.7.0.7
I don't think this is related to Aerospike; based on the stack trace, it can happen even with no state stores.