
Faust *sometimes* crashes during rebalance when Aerospike is used as the storage engine

Open · xaralis opened this issue 4 years ago · 1 comment

Steps to reproduce

There is probably a race condition during rebalancing when Aerospike is used as the storage engine. Occasionally, a TypeError is raised.

Expected behavior

No crashes on rebalance.

Actual behavior

During metrics collection, `float()` receives `None` instead of a number. The log output is attached below.
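The minimal Python sketch below shows the failure in isolation. The `gauge_set` helper is hypothetical. It only mimics the `float(value)` coercion visible in the `prometheus_client/metrics.py` frame of the traceback, so passing `None` raises the same kind of TypeError.

```python
def gauge_set(value):
    """Hypothetical stand-in for a gauge setter that coerces its input with
    float(), like the prometheus_client frame in the traceback."""
    return float(value)


def reproduce():
    """Call the setter with None and return the resulting TypeError message."""
    try:
        gauge_set(None)
    except TypeError as exc:
        return str(exc)
    return None


message = reproduce()
print(message)
```

On Python 3.9 the printed message should match the one in the traceback. Newer Python versions may word it slightly differently, so nothing here depends on the exact text.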

Full traceback

I [dev-v0-wd] aiokafka.consumer.group_coordinator: Setting newly assigned partitions {TopicPartition(topic='dev-v0-wd-recalculate_account_positions', partition=55), TopicPartition(topic='dev-v0-wd-accounts_patches', partition=55), TopicPartition(topic='dev-v0-wd-accounts_complete_data', partition=55), TopicPartition(topic='dev-v0-wd-in_positions', partition=55), TopicPartition(topic='dev-v0-wd-delay_tasks', partition=55), TopicPartition(topic='dev-v0-wd-account_positions-changelog', partition=55), TopicPartition(topic='dev-v0-wd-accounts-changelog', partition=55), TopicPartition(topic='account_updates_watchdog', partition=55), TopicPartition(topic='dev-v0-wd-account_recovery', partition=55), TopicPartition(topic='dev-v0-wd-account_realized_profits_updates', partition=55), TopicPartition(topic='dev-v0-wd-in_accounts', partition=55)} for group dev-v0-wd
I [dev-v0-wd] faust.app.base: Executing _on_partitions_assigned
I [dev-v0-wd] faust.tables.recovery: generation id 9071 app consumers id 9071
I [dev-v0-wd] faust.tables.recovery: [^---Recovery]: Resuming flow...
I [dev-v0-wd] faust.tables.recovery: [^---Recovery]: Seek stream partitions to committed offsets.
E [dev-v0-wd] faust.transport.drivers.aiokafka: [^--Consumer]: Drain messages raised: TypeError("float() argument must be a string or a number, not 'NoneType'")
Traceback (most recent call last):
  File "/opt/app/venv/xxx/lib/python3.9/site-packages/faust/transport/consumer.py", line 1104, in _drain_messages
    async for tp, message in ait:
  File "/opt/app/venv/xxx/lib/python3.9/site-packages/faust/transport/consumer.py", line 715, in getmany
    self.app.monitor.track_tp_end_offset(tp, highwater_mark)
  File "/opt/app/venv/xxx/lib/python3.9/site-packages/faust/sensors/prometheus.py", line 496, in track_tp_end_offset
    self._metrics.topic_partition_end_offset.labels(
  File "/opt/app/venv/xxx/lib/python3.9/site-packages/prometheus_client/metrics.py", line 364, in set
    self._value.set(float(value))
TypeError: float() argument must be a string or a number, not 'NoneType'
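One possible mitigation is to skip the metric update when the end offset is `None`, rather than letting the exception escape from `getmany`. This is a sketch under assumptions. `EndOffsetGauge` and `make_safe_tracker` are hypothetical stand-ins written with the standard library only. They are not Faust's or prometheus_client's API, and a real fix would belong in Faust's monitor or consumer code.

```python
from typing import Callable, Dict, Optional, Tuple

# Hypothetical (topic, partition) key; Faust uses its own TP type.
TopicPartition = Tuple[str, int]


class EndOffsetGauge:
    """Toy gauge storing one float per topic/partition. Assumption: it
    behaves like a labelled Prometheus gauge whose set() calls float()."""

    def __init__(self) -> None:
        self.values: Dict[TopicPartition, float] = {}

    def set(self, tp: TopicPartition, value: Optional[float]) -> None:
        self.values[tp] = float(value)  # raises TypeError when value is None


def make_safe_tracker(
    gauge: EndOffsetGauge,
) -> Callable[[TopicPartition, Optional[int]], bool]:
    """Wrap the gauge so an unknown end offset (None) is skipped instead of
    crashing the caller. The wrapper returns True if a value was recorded."""

    def track_tp_end_offset(tp: TopicPartition, offset: Optional[int]) -> bool:
        if offset is None:
            # End offset not known yet (e.g. right after a rebalance): skip it.
            return False
        gauge.set(tp, offset)
        return True

    return track_tp_end_offset


gauge = EndOffsetGauge()
track = make_safe_tracker(gauge)
recorded_none = track(("dev-v0-wd-in_accounts", 55), None)
recorded_value = track(("dev-v0-wd-in_accounts", 55), 1200)
print(recorded_none, recorded_value, gauge.values)
```

In this sketch, skipping a `None` sample leaves the partition's gauge at whatever value it last held. Exporting a sentinel such as 0 instead would report a misleading end offset.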

Versions

Python version - 3.9.7
Faust version - 0.6.10
Operating system - macOS/Linux (applies to both)
Kafka version - 3.0.0
Aerospike version - ce-5.7.0.7

xaralis · Nov 03 '21 13:11

I don't think this is related to Aerospike. Based on the stack trace, it can happen even with no state stores.

patkivikram · Nov 04 '21 14:11
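The reasoning in the comment above can be illustrated with a toy model. The model assumes, without confirming it from Faust's source, that the `highwater_mark` passed to `track_tp_end_offset` comes from the Kafka consumer's fetch state. aiokafka's documentation for `AIOKafkaConsumer.highwater()` says the value is only available after messages for the partition have been fetched, and that it is `None` otherwise. `HighwaterCache` and `unknown_end_offsets` are hypothetical names, and nothing in the model involves a table store.

```python
from typing import Dict, Optional, Set, Tuple

# Hypothetical (topic, partition) key used only for this illustration.
TopicPartition = Tuple[str, int]


class HighwaterCache:
    """Toy consumer state that learns a partition's highwater mark only from
    fetch responses (modelled on aiokafka's documented highwater() behaviour,
    which returns None until the partition has been fetched)."""

    def __init__(self) -> None:
        self._highwater: Dict[TopicPartition, int] = {}

    def on_fetch_response(self, tp: TopicPartition, highwater: int) -> None:
        # Record the highwater mark reported by a fetch for this partition.
        self._highwater[tp] = highwater

    def highwater(self, tp: TopicPartition) -> Optional[int]:
        # None means "not fetched yet", not "empty partition".
        return self._highwater.get(tp)


def unknown_end_offsets(
    cache: HighwaterCache, assigned: Set[TopicPartition]
) -> Set[TopicPartition]:
    """Return the assigned partitions whose end offset is currently None."""
    return {tp for tp in assigned if cache.highwater(tp) is None}


cache = HighwaterCache()
cache.on_fetch_response(("dev-v0-wd-in_accounts", 55), 1200)
# A rebalance assigns a partition that has not been fetched yet.
assigned = {("dev-v0-wd-in_accounts", 55), ("account_updates_watchdog", 55)}
print(unknown_end_offsets(cache, assigned))
```

Right after a rebalance, the newly assigned partitions are exactly the ones that have not been fetched yet. That would fit the log above, where the error appears immediately after the new partition assignment.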