
Window expiration doesn't work with RocksDB after recovery

Open · novotl opened this issue 3 years ago · 2 comments

Checklist

  • [x] I have included information about relevant versions
  • [x] I have verified that the issue persists when using the master branch of Faust.

Steps to reproduce

Unfortunately I cannot post the entire pipeline, but at its core it's very simple: count the number of certain events in a tumbling 10-minute window and keep the history for 7 days. The table is stored using RocksDB.

import faust
from datetime import timedelta

# Minimal Event model, inferred from the usage below (assumed shape; the
# real pipeline is not shown in full).
class Event(faust.Record):
    event_id: str
    ts: float

app = faust.App(
    store='rocksdb://',
    ...
)

input_topic = app.topic('views_topic', value_type=Event)
views = (
    app.Table('views', default=int).tumbling(
        size=timedelta(minutes=10),
        expires=timedelta(days=7),
        key_index=True,
    )
    .relative_to_field(Event.ts)
)

...

@app.agent(input_topic)
async def process_stream(stream: faust.Stream[Event]) -> None:
    async for event in stream.events():
        views[event.value.event_id] += 1
        ...

Expected behavior

I would expect windows to expire after 7 days, so for each key there should be at most 1,008 non-expired windows:

minutes in a week: 60 * 24 * 7 = 10,080
10-minute tumbling windows in a week: 10,080 / 10 = 1,008

Actual behavior

Windows are not expiring and the RocksDB database keeps growing. I checked the contents of the table with this snippet:

from collections import Counter
import json

from faust.stores.rocksdb import RocksDBOptions

rocksdb_options = RocksDBOptions()

# Open one table partition read-only, straight from the worker's data directory.
partition_path = "/rocksdb/pipeline/v1/tables/views-0.db"
db = rocksdb_options.open(partition_path, read_only=True)

it = db.iterkeys()
it.seek_to_first()

counter = Counter()

for key in it:
    # Windowed table keys are JSON tuples of (key, (window_start, window_end)).
    event_id, (start_epoch, end_epoch) = json.loads(key.decode('utf-8'))
    counter[event_id] += 1

print(counter.most_common(3))
# [('x', 13711), ('y', 12541), ('z', 11822)]

and found that for some keys there are more than 10,000 active windows, roughly ten times the expected 1,008.
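For what it's worth, the same iteration can also show how far past their expiry the surviving windows are. A sketch building on the snippet above (it reuses db and json from that snippet, and assumes the window timestamps are epoch seconds):

import time

now = time.time()
oldest_end = {}

it = db.iterkeys()
it.seek_to_first()
for raw in it:
    event_id, (start_epoch, end_epoch) = json.loads(raw.decode('utf-8'))
    oldest_end[event_id] = min(oldest_end.get(event_id, end_epoch), end_epoch)

# Any window that ended more than 7 days ago should already have expired.
days_past_expiry = {k: (now - ts) / 86400 - 7 for k, ts in oldest_end.items()
                    if now - ts > 7 * 86400}
print(sorted(days_past_expiry.items(), key=lambda kv: -kv[1])[:3])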

Versions

  • Python version: 3.9.7
  • Faust version: faust-streaming==0.8.8
  • Operating system: Debian GNU/Linux 11 (bullseye)
  • Kafka version:
  • RocksDB version: faust-streaming-rocksdb==0.8.0

novotl commented on Oct 13 '22

I think this is because the expiration logic

https://github.com/faust-streaming/faust/blob/63af9f58dfb5159bea6e5c8a0312bc41627d9538/faust/tables/base.py#L374-L390

depends on self._partition_timestamp_keys, which is populated only in _maybe_set_key_ttl https://github.com/faust-streaming/faust/blob/63af9f58dfb5159bea6e5c8a0312bc41627d9538/faust/tables/base.py#L400

which in turn is triggered only by the on_key_set callback: https://github.com/faust-streaming/faust/blob/96ce8f0b2f213b296bab2b4c2c0feea37a412ed7/faust/tables/table.py#L75-L83

I suspect that self._partition_timestamp_keys is kept only in memory. When a worker restarts and loads its state back from the persistent RocksDB storage, the restored keys do not re-trigger the on_key_set callback, so they are never expired. Because our expiration time is 7 days (longer than the average lifetime of our workers), this happens quite a lot.
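If this diagnosis is right, the TTL index could in principle be rebuilt from the keys already persisted in RocksDB after recovery. A rough sketch in plain Python, not Faust internals: the shapes of _partition_timestamp_keys and _partition_timestamps are inferred from base.py, and bucketing by window end is an assumption here.

from bisect import insort
from collections import defaultdict

def rebuild_ttl_index(persisted_keys, partition):
    # persisted_keys: iterable of (key, (win_start, win_end)) tuples, i.e.
    # the decoded windowed-table keys from the inspection snippet above.
    # Returns structures mirroring what _maybe_set_key_ttl maintains, so
    # the cleanup loop in _del_old_keys would see the restored windows again.
    partition_timestamp_keys = defaultdict(set)
    partition_timestamps = defaultdict(list)
    for key, (win_start, win_end) in persisted_keys:
        partition_timestamp_keys[(partition, win_end)].add(
            (key, (win_start, win_end)))
        # Keep timestamps sorted so cleanup can pop expired buckets in order.
        insort(partition_timestamps[partition], win_end)
    return partition_timestamp_keys, partition_timestamps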

novotl commented on Oct 13 '22

> I suspect that self._partition_timestamp_keys is kept only in memory. When a worker restarts and loads its state back from the persistent RocksDB storage, the restored keys do not re-trigger the on_key_set callback, so they are never expired. Because our expiration time is 7 days (longer than the average lifetime of our workers), this happens quite a lot.

Given that the default value for table_cleanup_interval is 30 seconds, I guess the original developer(s) assumed self._partition_timestamp_keys wouldn't need to be persisted to storage. We could try embedding the partition timestamps in the RocksDB storage itself.
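A rough sketch of what that could look like (all names here are hypothetical, not the actual Faust API): store the TTL index under a reserved key prefix in the same RocksDB database, so a restarted worker can reload it instead of starting with an empty index.

import json

# Hypothetical reserved prefix; JSON-encoded table keys start with b'[',
# so this cannot collide with them.
TTL_INDEX_PREFIX = b'__faust_ttl_index__'

def persist_ttl_index(db, partition, timestamps):
    # Write the sorted window timestamps for one partition to RocksDB.
    db.put(TTL_INDEX_PREFIX + str(partition).encode(),
           json.dumps(timestamps).encode())

def load_ttl_index(db, partition):
    # Reload the timestamps on startup; empty if never persisted.
    raw = db.get(TTL_INDEX_PREFIX + str(partition).encode())
    return json.loads(raw) if raw is not None else []

The index would need to be flushed at least as often as the cleanup timer fires, and table iteration would have to skip the reserved prefix.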

wbarnha commented on Oct 13 '22