optimize performance of ingest path
Fixes #452
Description
Blocking I/O is moved into tokio's spawn_blocking so it no longer blocks the worker thread, and the memtable write is taken off the write path, to improve the performance of the ingest flow.
There are other possible solutions we could discuss, for example introducing a separate lock for the memtable and for disk so there is minimal contention, or introducing a channel so that rather than writing directly to disk we first push to the channel and a consumer writes it to disk. In this change we simply move both the memtable and disk writes into spawn_blocking so the worker thread is not blocked, and take the memtable write out of the core ingest path. Note: nothing has changed in memtable concats for now; that needs more exploration.
The major change is in src/parseable/streams.rs; all other changes just support it. The disk write and the memtable write are now moved to tokio::spawn_blocking, and we no longer wait for the memtable write.
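As a rough illustration of the approach (not the actual implementation; the DiskWriter/MemTable types, fields, and error handling below are simplified stand-ins for the real code in src/parseable/streams.rs):

```rust
use std::sync::{Arc, Mutex};

// Hypothetical stand-ins for the real types in src/parseable/streams.rs.
struct DiskWriter;
struct MemTable;

impl DiskWriter {
    fn write(&self, _record: &[u8]) -> std::io::Result<()> {
        Ok(())
    }
}

impl MemTable {
    fn push(&mut self, _record: Vec<u8>) {}
}

struct Stream {
    disk: Arc<DiskWriter>,
    memtable: Arc<Mutex<MemTable>>,
}

impl Stream {
    // push becomes async; the caller only awaits the durable disk write.
    async fn push(self: &Arc<Self>, record: Vec<u8>) -> std::io::Result<()> {
        // Disk write runs on the blocking pool and is awaited for durability,
        // so the async worker thread is never blocked on file I/O.
        let disk = Arc::clone(&self.disk);
        let disk_record = record.clone();
        tokio::task::spawn_blocking(move || disk.write(&disk_record))
            .await
            .expect("blocking task panicked")?;

        // Memtable push is fire-and-forget: the JoinHandle is dropped and the
        // ingest path does not wait for the in-memory update.
        let memtable = Arc::clone(&self.memtable);
        let _ = tokio::task::spawn_blocking(move || {
            memtable
                .lock()
                .expect("memtable lock poisoned")
                .push(record);
        });

        Ok(())
    }
}
```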
This PR has:
- [ ] been tested to ensure log ingestion and log query works.
- [ ] added comments explaining the "why" and the intent of the code wherever would not be obvious for an unfamiliar reader.
- [ ] added documentation for new or modified features or behaviors.
Summary by CodeRabbit
- Refactor
  - Event processing and stream push flows converted to async so processing yields correctly; disk persistence is awaited for durability while in-memory updates run in background tasks.
  - Internal call sites updated to await the new async flows.
- Tests
  - I/O and push-related tests updated to match async behavior.
- Chores
  - Public method signatures changed to async; callers may need to update call sites to await these methods.
Walkthrough
Converts several synchronous processing and push paths to async: Event::process/process_unchecked become async; Stream::push becomes async with an Arc<Self> receiver; multiple call sites updated to .await?; disk writes moved to blocking tasks and memtable pushes run asynchronously.
Changes
| Cohort / File(s) | Summary |
|---|---|
| Event processing methods: src/event/mod.rs | pub fn process(self) → pub async fn process(self) and pub fn process_unchecked(&self) → pub async fn process_unchecked(&self); call sites updated to await processing, push behavior preserved. |
| Stream storage layer: src/parseable/streams.rs | pub fn push(...) → pub async fn push(self: &Arc<Self>, ...); disk writes dispatched to the blocking thread pool and awaited; memtable pushes run fire-and-forget on the blocking pool; tests adjusted to the async runtime. |
| HTTP handlers / ingest utils: src/handlers/http/ingest.rs, src/handlers/http/modal/utils/ingest_utils.rs | Internal calls updated from .process()? / .process_unchecked()? to .process().await? / .process_unchecked().await?. |
| Connectors & storage call sites: src/connectors/kafka/processor.rs, src/storage/field_stats.rs | Replaced synchronous ...process()? chains with ...process().await?, adding awaits into async control flow. |
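A minimal sketch of the call-site change described above (the Event type and error type here are simplified stand-ins, not the actual definitions in src/event/mod.rs):

```rust
// Simplified stand-in for src/event/mod.rs after the change.
struct Event {
    payload: Vec<u8>,
}

#[derive(Debug)]
struct EventError;

impl Event {
    // Previously: pub fn process(self) -> Result<(), EventError>
    pub async fn process(self) -> Result<(), EventError> {
        // ... push into the stream, awaiting the durable disk write ...
        let _ = self.payload;
        Ok(())
    }
}

// Call sites such as the HTTP ingest handlers now await the future.
async fn handle_ingest(event: Event) -> Result<(), EventError> {
    // before: event.process()?;
    event.process().await?;
    Ok(())
}
```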
Sequence Diagram(s)
sequenceDiagram
autonumber
participant Client
participant Handler
participant Event
participant Stream
participant BlockPool as BlockingPool
participant Disk
Client->>Handler: ingest request
Handler->>Event: Event::process().await?
Event->>Stream: Stream::push(Arc<Self>, ...).await
Stream->>BlockPool: spawn_blocking -> disk write (await)
BlockPool->>Disk: write data
Disk-->>BlockPool: write result
BlockPool-->>Stream: disk write complete
Stream->>BlockPool: spawn_blocking -> memtable push (fire-and-forget)
Note right of BlockPool: memtable push continues in background (no await)
Stream-->>Event: push/durable result
Event-->>Handler: processing complete
Handler-->>Client: response
Estimated code review effort
🎯 4 (Complex) | ⏱️ ~60 minutes
- Pay extra attention to:
  - src/parseable/streams.rs: receiver changed to Arc<Self>, spawn_blocking usage, DiskWriter lifecycle, lock poisoning handling, and error propagation (see the sketch after this list).
  - src/event/mod.rs and all call sites: API signature changes to async; ensure there are no remaining sync callers and that ordering/durability expectations are preserved.
  - Tests updated for the async runtime: verify determinism and proper awaiting of background tasks.
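On the lock-poisoning point in particular, one pattern to look for is recovering the guard instead of panicking inside the blocking task. A hedged sketch with a placeholder memtable type, not the actual code:

```rust
use std::sync::{Arc, Mutex};

// Placeholder memtable type for illustration only.
type MemTable = Vec<Vec<u8>>;

// If a previous holder panicked, lock() returns a PoisonError; recovering the
// guard keeps ingestion running instead of propagating the panic.
fn push_to_memtable(memtable: &Arc<Mutex<MemTable>>, record: Vec<u8>) {
    let mut guard = match memtable.lock() {
        Ok(guard) => guard,
        Err(poisoned) => poisoned.into_inner(),
    };
    guard.push(record);
}
```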
Possibly related PRs
- parseablehq/parseable#1180: touches Event::process and related call sites; likely directly related to coordinating async/sync changes.
Suggested labels
for next release
Suggested reviewers
- de-sh
- nikhilsinhaparseable
- parmesant
Poem
🐇 I hopped through bytes with eager paws,
Awaited writes and mended flaws.
Disk hummed low in blocking light,
Memtable danced off into night,
A rabbit cheers the async cause!
Pre-merge checks and finishing touches
❌ Failed checks (1 warning)
| Check name | Status | Explanation | Resolution |
|---|---|---|---|
| Docstring Coverage | ⚠️ Warning | Docstring coverage is 33.33% which is insufficient. The required threshold is 80.00%. | You can run @coderabbitai generate docstrings to improve docstring coverage. |
✅ Passed checks (4 passed)
| Check name | Status | Explanation |
|---|---|---|
| Title check | ✅ Passed | The title 'optimize performance of ingest path' is concise and directly aligns with the main objective of moving blocking I/O to tokio::spawn_blocking to improve ingest flow performance. |
| Description check | ✅ Passed | The PR description identifies the core changes (issue #452, spawn_blocking usage) and acknowledges the ingest path is the focus. However, the checklist items for testing, comments, and documentation remain unchecked. |
| Linked Issues check | ✅ Passed | The PR addresses issue #452 requirements: disk.push is now async, memtable.push is deferred via spawn_blocking, and the ingest path no longer waits for memtable writes. Memtable concat behavior remains unchanged as noted. |
| Out of Scope Changes check | ✅ Passed | All changes are scoped to supporting the spawn_blocking refactoring: Stream::push conversion to async, updates to call sites in processors and handlers, and test updates. No unrelated modifications detected. |
@ankitsheoran1 have you run any sort of tests to verify if the ingestion is faster with this change?
I tried inserting 1M records in batches of 1,000, with each record around 600 bytes, on my local machine (storage also local; specs: M4, 24 GB, 14 cores). Without the changes, across multiple iterations I was able to achieve about 85k RPS; with the changes I reached around 86k-86.5k RPS, so not much improvement. Obviously this is not a rigorous way to analyze performance.
A few more findings: I added multiple metrics and found that the major bottleneck is JSON event creation (one batch takes around 11 ms, and around 9 ms of that is spent in this step alone). I am working on it, and I also found a few more optimizations we can make in the ingest flow.
Do we have a performance testing flow?
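For context, the timing metrics mentioned above could be as simple as wrapping the event-construction step with Instant, along these lines (the function name and the JSON parsing step are illustrative placeholders, not the actual ingest code):

```rust
use std::time::Instant;

// Illustrative only: measure how much of one batch's ingest time is spent
// building the JSON events versus the rest of the path.
fn ingest_batch(raw_lines: &[String]) -> serde_json::Result<()> {
    let total = Instant::now();

    let build = Instant::now();
    let events: Vec<serde_json::Value> = raw_lines
        .iter()
        .map(|line| serde_json::from_str(line))
        .collect::<Result<_, _>>()?;
    let build_time = build.elapsed();

    // ... schema handling, memtable push, and disk write would follow here ...
    let _ = events;

    println!(
        "batch: event build {build_time:?} / total {:?}",
        total.elapsed()
    );
    Ok(())
}
```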
Thanks for the updates. Happy to chat more on slack if you can join https://logg.ing/community. Right now there is no automated ingestion flow, but we test on real world data ingestion scenarios every few weeks on our internal infrastructure.