Window expiration doesn't work with RocksDB after recovery
Checklist
- [x] I have included information about relevant versions
- [x] I have verified that the issue persists when using the master branch of Faust.
Steps to reproduce
Unfortunately I cannot post the entire pipeline, but at its core it's very simple: count the number of certain events in tumbling 10-minute windows and keep the history for 7 days. The table is stored using RocksDB.
```python
import faust
from datetime import timedelta

app = faust.App(
    store='rocksdb://',
    ...
)

input_topic = app.topic('views_topic', value_type=Event)

views = (
    app.Table('views', default=int)
    .tumbling(
        size=timedelta(minutes=10),
        expires=timedelta(days=7),
        key_index=True,
    )
    .relative_to_field(Event.ts)
)

...

@app.agent(input_topic)
async def process_stream(stream: faust.Stream[Event]) -> None:
    async for event in stream.events():
        views[event.value.event_id] += 1

...
```
Expected behavior
I would expect windows to expire after 7 days, so for each event ID there should be at most 1,008 non-expired windows:
minutes in a week: 60 * 24 * 7 = 10,080
10-minute tumbling windows in a week: 10,080 / 10 = 1,008
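The same bound can be computed directly from the window parameters used above:

```python
from datetime import timedelta

window_size = timedelta(minutes=10)
retention = timedelta(days=7)

# One tumbling window per 10-minute slot within the 7-day retention period.
print(retention // window_size)  # 1008
```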
Actual behavior
Windows are not expiring and the RocksDB store keeps growing. I've checked the contents of the table with this snippet:
```python
from faust.stores.rocksdb import RocksDBOptions
from collections import Counter
import json

rocksdb_options = RocksDBOptions()
partition_path = "/rocksdb/pipeline/v1/tables/views-0.db"
db = rocksdb_options.open(partition_path, read_only=True)

it = db.iterkeys()
it.seek_to_first()

counter = Counter()
for key in it:
    key = json.loads(key.decode('utf-8'))
    event_id, (start_epoch, end_epoch) = key
    counter[event_id] += 1

print(counter.most_common(3))
```

```
[('x', 13711), ('y', 12541), ('z', 11822)]
```
and found out that for some event IDs there are more than 10,000 active windows, far more than the expected 1,008.
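As a further cross-check under the same assumptions (the key layout of [event_id, [window_start, window_end]] observed above), one can count how many persisted windows ended before the 7-day retention horizon; with working expiration this number should be close to zero:

```python
import json
import time

from faust.stores.rocksdb import RocksDBOptions

partition_path = "/rocksdb/pipeline/v1/tables/views-0.db"
db = RocksDBOptions().open(partition_path, read_only=True)

horizon = time.time() - 7 * 24 * 60 * 60  # retention horizon: 7 days ago

it = db.iterkeys()
it.seek_to_first()

stale = 0
for key in it:
    event_id, (start_epoch, end_epoch) = json.loads(key.decode('utf-8'))
    if end_epoch < horizon:
        stale += 1

print(stale)  # should be (close to) 0 if expiration works
```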
Versions
- Python version: Python 3.9.7
- Faust version: faust-streaming==0.8.8
- Operating system: Debian GNU/Linux 11 (bullseye)
- Kafka version:
- RocksDB version: faust-streaming-rocksdb==0.8.0
I think this is because the expiration logic
https://github.com/faust-streaming/faust/blob/63af9f58dfb5159bea6e5c8a0312bc41627d9538/faust/tables/base.py#L374-L390
depends on self._partition_timestamp_keys, which is populated only in _maybe_set_key_ttl
https://github.com/faust-streaming/faust/blob/63af9f58dfb5159bea6e5c8a0312bc41627d9538/faust/tables/base.py#L400
which is triggered only in the on_key_set callback.
https://github.com/faust-streaming/faust/blob/96ce8f0b2f213b296bab2b4c2c0feea37a412ed7/faust/tables/table.py#L75-L83
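A minimal, self-contained toy model of that dependency (illustrative only; the class below is not Faust code) shows why this matters across restarts:

```python
class ToyWindowedTable:
    """Illustrative model only: 'data' stands in for the RocksDB-backed store,
    'ttl_index' for the in-memory self._partition_timestamp_keys."""

    def __init__(self, expires_s: float) -> None:
        self.expires_s = expires_s
        self.data = {}        # persisted; survives a worker restart
        self.ttl_index = {}   # in-memory only; empty again after a restart

    def set(self, key, value) -> None:
        # Mirrors the on_key_set -> _maybe_set_key_ttl path: the key is registered
        # for expiration only when it is written through this code path.
        self.data[key] = value
        _, (_, window_end) = key
        self.ttl_index.setdefault(window_end, set()).add(key)

    def cleanup(self, now: float) -> None:
        # Mirrors the periodic cleanup: it only consults the in-memory index, so a
        # key present in 'data' but missing from 'ttl_index' is never deleted.
        for window_end in [t for t in self.ttl_index if t <= now - self.expires_s]:
            for key in self.ttl_index.pop(window_end):
                self.data.pop(key, None)
```

After a restart, data is repopulated from RocksDB but ttl_index starts out empty, so cleanup() never sees the restored windows.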
I suspect that self._partition_timestamp_keys is only kept in memory. When a worker restarts and loads its state from the persistent RocksDB storage, the restored values do not re-trigger the on_key_set callback and therefore are never expired. Because our expiration time is 7 days (longer than the average lifetime of our workers), this happens quite a lot.
Based on the fact that the default value for table_cleanup_interval is 30 seconds, I guess the original developer(s) assumed self._partition_timestamp_keys would never need to be persisted to storage. We could try to embed the partition timestamps in the RocksDB storage, as sketched below.
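A hypothetical sketch of that idea (illustrative only, not existing Faust code): persist the in-memory TTL index under a reserved key in the same RocksDB partition, using the same python-rocksdb API as the inspection snippet above, so it can be reloaded after a worker restart. The names TTL_INDEX_KEY, save_ttl_index and load_ttl_index are made up for illustration.

```python
import json

TTL_INDEX_KEY = b'__partition_timestamp_keys__'

def save_ttl_index(db, partition_timestamp_keys) -> None:
    # partition_timestamp_keys: {(partition, window_end): set of serialized table keys}
    payload = {
        f'{partition}:{window_end}': sorted(k.decode('utf-8') for k in keys)
        for (partition, window_end), keys in partition_timestamp_keys.items()
    }
    db.put(TTL_INDEX_KEY, json.dumps(payload).encode('utf-8'))

def load_ttl_index(db):
    raw = db.get(TTL_INDEX_KEY)
    if raw is None:
        return {}
    index = {}
    for composite, keys in json.loads(raw.decode('utf-8')).items():
        partition, window_end = composite.split(':', 1)
        index[(int(partition), float(window_end))] = {k.encode('utf-8') for k in keys}
    return index
```

Whether this would be written on every update, periodically alongside table_cleanup_interval, or only on clean shutdown is a trade-off between write amplification and how much of the index can be recovered after a crash.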