
optimize performance of ingest path

Open ankitsheoran1 opened this issue 2 months ago • 6 comments

Fixes #452

Description

Blocking I/O is moved to tokio::spawn_blocking so it does not block the worker thread, and the memtable write is taken off the write path.

The goal is to improve the performance of the ingest flow.

There are other possible solutions we could discuss, for example introducing separate locks for the memtable and the disk so there is minimal contention, or introducing a channel so that instead of writing directly to disk we first push to the channel and a consumer writes it to disk. In this change we just move both the memtable and disk writes to spawn_blocking so the worker thread is not blocked, and take the memtable write out of the core ingest path. Note: nothing is changed in the memtable concats for now; that needs more exploration.

The major change is in src/parseable/streams.rs; all other changes just support it. Both the disk write and the memtable write are now moved to tokio::spawn_blocking, and we no longer wait for the memtable write.
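To make the intent concrete, here is a minimal sketch of the pattern under simplified assumptions: DiskWriter, MemTable, the `&self` receiver, and the locking shown here are illustrative stand-ins, not the actual code in src/parseable/streams.rs (which, per the summary below, uses an Arc<Self> receiver).

```rust
use std::sync::{Arc, Mutex};

// Illustrative stand-ins, not the real types in src/parseable/streams.rs.
struct DiskWriter;
struct MemTable;

impl DiskWriter {
    fn write(&self, _batch: &[u8]) -> std::io::Result<()> {
        // Synchronous file I/O would live here.
        Ok(())
    }
}

impl MemTable {
    fn push(&mut self, _batch: Vec<u8>) {}
}

struct Stream {
    disk: Arc<DiskWriter>,
    memtable: Arc<Mutex<MemTable>>,
}

impl Stream {
    async fn push(&self, batch: Vec<u8>) -> std::io::Result<()> {
        // 1. Disk write: blocking I/O runs on tokio's blocking pool and is awaited,
        //    so durability is preserved without stalling the async worker thread.
        let disk = Arc::clone(&self.disk);
        let disk_batch = batch.clone();
        tokio::task::spawn_blocking(move || disk.write(&disk_batch))
            .await
            .expect("disk write task panicked")?;

        // 2. Memtable push: fire-and-forget on the blocking pool; the ingest path
        //    no longer waits for the in-memory update.
        let memtable = Arc::clone(&self.memtable);
        tokio::task::spawn_blocking(move || {
            if let Ok(mut mt) = memtable.lock() {
                mt.push(batch);
            }
        });

        Ok(())
    }
}
```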


This PR has:

  • [ ] been tested to ensure log ingestion and log query works.
  • [ ] added comments explaining the "why" and the intent of the code wherever would not be obvious for an unfamiliar reader.
  • [ ] added documentation for new or modified features or behaviors.

Summary by CodeRabbit

  • Refactor

    • Event processing and stream push flows converted to async so processing yields correctly; disk persistence is awaited for durability while in-memory updates run in background tasks.
    • Internal call sites updated to await new async flows.
  • Tests

    • I/O and push-related tests updated to match async behavior.
  • Chores

    • Public method signatures changed to async—callers may need to update call sites to await these methods.


ankitsheoran1 · Nov 23 '25 04:11

Walkthrough

Converts several synchronous processing and push paths to async: Event::process/process_unchecked become async; Stream::push becomes async with an Arc<Self> receiver; multiple call sites updated to .await?; disk writes moved to blocking tasks and memtable pushes run asynchronously.

Changes

  • Event processing methods (src/event/mod.rs):
    pub fn process(self) → pub async fn process(self) and pub fn process_unchecked(&self) → pub async fn process_unchecked(&self); call sites updated to await processing; push behavior preserved.
  • Stream storage layer (src/parseable/streams.rs):
    pub fn push(...) → pub async fn push(self: &Arc<Self>, ...); disk writes dispatched to the blocking thread pool and awaited; memtable pushes run fire-and-forget on the blocking pool; tests adjusted to the async runtime.
  • HTTP handlers / ingest utils (src/handlers/http/ingest.rs, src/handlers/http/modal/utils/ingest_utils.rs):
    Internal calls updated from .process()? / .process_unchecked()? to .process().await? / .process_unchecked().await?.
  • Connectors & storage call sites (src/connectors/kafka/processor.rs, src/storage/field_stats.rs):
    Replaced synchronous ...process()? chains with ...process().await?, adding awaits into async control flow.
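For illustration, a minimal sketch of what the call-site change amounts to; the Event and handler shown here are simplified, hypothetical stand-ins rather than the actual types in src/event/mod.rs and src/handlers/http/ingest.rs.

```rust
// Hypothetical, simplified shapes; the real Event carries record batches, schema, etc.
struct Event;

impl Event {
    // Signature changed from a blocking `pub fn process(self) -> Result<...>`
    // to `pub async fn process(self) -> Result<...>`.
    async fn process(self) -> Result<(), Box<dyn std::error::Error>> {
        // The body ultimately awaits Stream::push, which offloads the blocking disk write.
        Ok(())
    }
}

// An already-async HTTP handler simply awaits the new future.
async fn ingest(event: Event) -> Result<(), Box<dyn std::error::Error>> {
    // Before: event.process()?;   (ran synchronously on the worker thread)
    // After:
    event.process().await?;
    Ok(())
}
```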

Sequence Diagram(s)

sequenceDiagram
    autonumber
    participant Client
    participant Handler
    participant Event
    participant Stream
    participant BlockPool as BlockingPool
    participant Disk

    Client->>Handler: ingest request
    Handler->>Event: Event::process().await?
    Event->>Stream: Stream::push(Arc<Self>, ...).await
    Stream->>BlockPool: spawn_blocking -> disk write (await)
    BlockPool->>Disk: write data
    Disk-->>BlockPool: write result
    BlockPool-->>Stream: disk write complete
    Stream->>BlockPool: spawn_blocking -> memtable push (fire-and-forget)
    Note right of BlockPool: memtable push continues in background (no await)
    Stream-->>Event: push/durable result
    Event-->>Handler: processing complete
    Handler-->>Client: response

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

  • Pay extra attention to:
    • src/parseable/streams.rs: receiver changed to Arc<Self>, spawn_blocking usage, DiskWriter lifecycle, lock poisoning handling, and error propagation (a sketch of these failure modes follows this list).
    • src/event/mod.rs and all call sites: API signature changes to async—ensure no remaining sync callers and that ordering/durability expectations are preserved.
    • Tests updated for async runtime: verify determinism and proper awaiting of background tasks.
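For concreteness, a minimal sketch, under hypothetical names and not the PR's actual code, of how a reviewer might expect lock poisoning and blocking-task failures to be handled once writes run under spawn_blocking:

```rust
use std::sync::{Arc, Mutex};

// Two failure modes to trace when writes move onto spawn_blocking:
//  1. the blocking task itself may panic or be cancelled (JoinError),
//  2. a Mutex used inside the task may be poisoned by an earlier panic.
async fn write_on_blocking_pool(
    writers: Arc<Mutex<Vec<String>>>,
    record: String,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    let result = tokio::task::spawn_blocking(move || {
        // Recover from poisoning instead of panicking the whole ingest path;
        // whether that is acceptable depends on the writer's invariants.
        let mut guard = writers
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner());
        guard.push(record);
        Ok::<_, std::io::Error>(())
    })
    .await; // JoinError if the blocking task panicked or was cancelled

    match result {
        Ok(io_result) => io_result.map_err(Into::into), // propagate the I/O error
        Err(join_err) => Err(Box::new(join_err)),       // propagate the panic/cancel
    }
}
```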

Possibly related PRs

  • parseablehq/parseable#1180 — touches Event::process and related call sites; likely directly related to coordinating async/sync changes.

Suggested labels

for next release

Suggested reviewers

  • de-sh
  • nikhilsinhaparseable
  • parmesant

Poem

🐇 I hopped through bytes with eager paws,
Awaited writes and mended flaws.
Disk hummed low in blocking light,
Memtable danced off into night,
A rabbit cheers the async cause!

Pre-merge checks and finishing touches

❌ Failed checks (1 warning)
  • Docstring Coverage (⚠️ Warning): Docstring coverage is 33.33%, which is below the required threshold of 80.00%. Resolution: run @coderabbitai generate docstrings to improve docstring coverage.
✅ Passed checks (4 passed)
  • Title check (✅ Passed): The title 'optimize performance of ingest path' is concise and directly aligns with the main objective of moving blocking I/O to tokio::spawn_blocking to improve ingest flow performance.
  • Description check (✅ Passed): The PR description identifies the core changes (issue #452, spawn_blocking usage) and acknowledges the ingest path is the focus. However, the checklist items for testing, comments, and documentation remain unchecked.
  • Linked Issues check (✅ Passed): The PR addresses issue #452 requirements: disk.push is now async, memtable.push is deferred via spawn_blocking, and the ingest path no longer waits for memtable writes. Memtable concat behavior remains unchanged as noted.
  • Out of Scope Changes check (✅ Passed): All changes are scoped to supporting the spawn_blocking refactoring: Stream::push conversion to async, updates to call sites in processors and handlers, and test updates. No unrelated modifications detected.
✨ Finishing touches
  • [ ] 📝 Generate docstrings
🧪 Generate unit tests (beta)
  • [ ] Create PR with unit tests
  • [ ] Post copyable unit tests in a comment


coderabbitai[bot] · Nov 23 '25 04:11

CLA Assistant Lite bot All contributors have signed the CLA ✍️ ✅

github-actions[bot] · Nov 23 '25 04:11

I have read the CLA Document and I hereby sign the CLA

ankitsheoran1 · Nov 23 '25 12:11

@ankitsheoran1 have you run any sort of tests to verify if the ingestion is faster with this change?

nitisht · Nov 25 '25 20:11

I tried inserting 1M records, in batches of 1000, with each record around 600 bytes, on my local machine (storage also local) with the following specs: M4, 24 GB, 14 cores. Without the changes, over multiple iterations I was able to achieve about 85k rps; with these changes I was able to reach around 86k to 86.5k rps, so not much improvement. Obviously this is not a rigorous way to analyze performance.

A few more findings: I added multiple metrics and found that the major bottleneck is JSON event creation (one batch takes around 11 ms, and around 9 ms of that is this step alone). I am working on it. I also found a few more optimizations we can do in the ingest flow.

Do we have an established performance testing flow?

ankitsheoran1 · Nov 28 '25 04:11

Thanks for the updates. Happy to chat more on Slack if you can join https://logg.ing/community. Right now there is no automated ingestion benchmark flow, but we test real-world data ingestion scenarios every few weeks on our internal infrastructure.

nitisht · Nov 28 '25 18:11