
Feat: Fix thread contention during subgraph syncing


This PR implements a sharded trigger processor to solve thread contention issues when running large numbers (2500+) of continuously syncing subgraphs. The solution distributes subgraphs across multiple worker pools while maintaining 100% backward compatibility.

Problem

In master, trigger processing has no concurrency limits, which can lead to:

  • Tokio thread exhaustion
  • Unfair resource allocation between subgraphs
  • No backpressure mechanism for overloaded systems

Solution

This PR introduces an opt-in sharded trigger processor that addresses these issues. The key changes are:

  • Sharded Trigger Processor: The SubgraphTriggerProcessor now manages a configurable number of shards, each with its own thread pool and its own tokio::sync::Semaphore for concurrency control (a minimal sketch follows this list).
  • Consistent Hashing: Each subgraph is assigned to a shard based on a hash of its deployment ID, so a deployment always lands on the same pool, which improves cache locality.
  • Backpressure: An exponential-backoff mechanism pauses processing for subgraphs whose trigger queues grow too long, preventing any single subgraph from overwhelming the system.
  • Configuration: The sharded processor can be configured via environment variables, allowing operators to tune the number of shards and workers.
  • Metrics: The new implementation includes detailed, per-shard metrics for monitoring the health and performance of the trigger processor, including load-balancing tracking.
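
As a minimal sketch of the routing-plus-semaphore idea: the names below (ShardedTriggerProcessor's fields, shard_for, process_triggers) are illustrative stand-ins, not the PR's actual types.

    use std::hash::{Hash, Hasher};
    use std::sync::Arc;
    use tokio::sync::Semaphore;

    // Illustrative sketch only - names and structure are assumptions,
    // not the PR's actual code.
    struct ShardedTriggerProcessor {
        shards: Vec<Arc<Semaphore>>, // one semaphore per shard
    }

    impl ShardedTriggerProcessor {
        fn new(num_shards: usize, workers_per_shard: usize) -> Self {
            let shards = (0..num_shards)
                .map(|_| Arc::new(Semaphore::new(workers_per_shard)))
                .collect();
            Self { shards }
        }

        // Consistent routing: the same deployment ID always maps to the
        // same shard, keeping a subgraph's work on one pool.
        fn shard_for(&self, deployment_id: &str) -> usize {
            let mut hasher = std::collections::hash_map::DefaultHasher::new();
            deployment_id.hash(&mut hasher);
            (hasher.finish() as usize) % self.shards.len()
        }

        async fn process_triggers(&self, deployment_id: &str) {
            let shard = self.shard_for(deployment_id);
            // Admission control: at most workers_per_shard triggers from
            // this shard are in flight at once.
            let _permit = self.shards[shard].acquire().await.expect("semaphore closed");
            // ... actual trigger processing would run here ...
        }
    }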

Performance Impact

Before (master):

  • Concurrency is bounded only by Tokio's blocking-thread limit (MAX_BLOCKING_THREADS).
  • Potential thread exhaustion with 2500+ subgraphs, which can only be mitigated by vertically scaling CPU and RAM.
  • No backpressure mechanism.

After (Sharded Mode):

  • With an example configuration of 32 shards, the system supports 1024 concurrent workers (32 shards × 32 workers per shard), with ~78 subgraphs per shard (2500 ÷ 32) instead of all of them competing globally. For best performance, the number of shards should be close to the number of available CPU cores.
  • Bounded memory usage with configurable limits.
  • Fair scheduling across all subgraphs.

Backward Compatibility

  • Default behavior is unchanged - sharding is disabled by default.
  • Opt-in activation - only enabled with GRAPH_SUBGRAPH_RUNTIME_PROCESSING_SHARDS > 1.
  • Zero breaking changes - all existing deployments continue working identically.
  • Test compatibility - all tests pass with legacy mode.

Configuration

Legacy mode (default - no changes for existing indexers)

  • Default behavior - no configuration needed
  • GRAPH_SUBGRAPH_RUNTIME_PROCESSING_SHARDS=1 # or unset

Sharded mode (recommended for 2500+ subgraphs)

   export GRAPH_SUBGRAPH_RUNTIME_PROCESSING_SHARDS=32  # Enable sharding (set to number of CPU cores)
   export GRAPH_SUBGRAPH_RUNTIME_WORKERS_PER_SHARD=32  # Workers per shard
   export GRAPH_SUBGRAPH_MAX_QUEUE_PER_SUBGRAPH=100    # Backpressure threshold
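
As a rough illustration, the variables above could be read at startup along these lines. This is a hedged sketch - the helper names and the workers/queue defaults are assumptions; the only documented default is that shards = 1 (or unset) means legacy mode.

    use std::env;

    // Hedged sketch: parses the variables documented above.
    fn env_usize(name: &str, default: usize) -> usize {
        env::var(name).ok().and_then(|v| v.parse().ok()).unwrap_or(default)
    }

    fn load_shard_config() -> (usize, usize, usize) {
        let shards = env_usize("GRAPH_SUBGRAPH_RUNTIME_PROCESSING_SHARDS", 1);
        let workers = env_usize("GRAPH_SUBGRAPH_RUNTIME_WORKERS_PER_SHARD", 32);
        let max_queue = env_usize("GRAPH_SUBGRAPH_MAX_QUEUE_PER_SUBGRAPH", 100);
        (shards, workers, max_queue) // shards > 1 enables sharded mode
    }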

This change enables graph-node to efficiently handle thousands of continuously syncing subgraphs while preserving the existing behavior for smaller deployments.

DaMandal0rian avatar Sep 14 '25 12:09 DaMandal0rian

I've stared at this for a bit and am still not quite sure what this is trying to achieve. There's no semaphore in our code that I can find that is being sharded - the PR just introduces a sharded semaphore.

I also don't understand the rationale in the intro - how is it getting from 32 workers to 1024 workers? In general, parallelism is constrained by the number of CPUs; tokio takes care of scheduling the much larger amount of work that is usually ready across those. This PR seems to introduce another layer of scheduling on top of tokio's. I am very confused by all this.

lutter avatar Sep 15 '25 18:09 lutter

@lutter

> I've stared at this for a bit and am still not quite sure what this is trying to achieve. There's no semaphore in our code that I can find that is being sharded - the PR just introduces a sharded semaphore.
>
> I also don't understand the rationale in the intro - how is it getting from 32 workers to 1024 workers? In general, parallelism is constrained by the number of CPUs; tokio takes care of scheduling the much larger amount of work that is usually ready across those. This PR seems to introduce another layer of scheduling on top of tokio's. I am very confused by all this.

Thanks for reviewing! I realize I need to better explain the problem and solution.

The Current Bottleneck

The bottleneck isn't in the code you see on the master branch - it's in my earlier commits on this branch. In the first two commits, I introduced a single global semaphore to limit concurrent trigger processing; that was my first attempted solution.

With 2500 continuously syncing subgraphs all competing for 32 permits from a single semaphore, contention was massive - at any given moment, roughly 98% of subgraphs were blocked waiting.

The Solution: Sharding

This PR replaces that single bottleneck semaphore with multiple sharded semaphores.

Each subgraph is consistently assigned to one shard, so instead of 2500 subgraphs competing for 32 permits or tokio threads, we have ~78 subgraphs per shard competing for 32 permits.

About CPU Constraints

You're right that we're ultimately constrained by CPUs. The semaphores aren't about creating more parallelism than we have cores - they're about:

  1. Preventing thread (and potentially memory) exhaustion: Without limits, 2500 subgraphs could queue up unbounded work
  2. Fair scheduling: Ensuring each subgraph gets processing time
  3. Backpressure: Slowing down subgraphs that queue too much work (a sketch follows below)

Tokio still schedules these across available CPU cores.
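
A hedged sketch of the backpressure idea - the helper name and the delay constants are assumptions, not the PR's code:

    use std::time::Duration;

    // If a subgraph's trigger queue exceeds the configured threshold
    // (GRAPH_SUBGRAPH_MAX_QUEUE_PER_SUBGRAPH), pause that subgraph with
    // exponentially growing delays. Names and constants are illustrative.
    async fn wait_for_queue_capacity(queue_len: impl Fn() -> usize, max_queue: usize) {
        let mut delay = Duration::from_millis(50);
        while queue_len() > max_queue {
            tokio::time::sleep(delay).await;
            delay = (delay * 2).min(Duration::from_secs(5)); // cap the backoff
        }
    }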


About "Another Layer of Scheduling"

My earlier commit (0c820f563): Added a single global semaphore for trigger processing:
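
The commit's snippet isn't reproduced here; an illustrative reconstruction (not the actual code) looks like:

    // Global - all 2500 subgraphs compete for a single semaphore
    // (illustrative reconstruction, not the actual commit code)
    let _permit = self.global_semaphore.acquire().await;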

This PR: Shards that single bottleneck into multiple semaphores:

    // Distributed - ~78 subgraphs compete per shard instead of 2500
    let shard_id = get_shard_for(subgraph);
    let _permit = self.semaphores[shard_id].acquire().await;

The semaphores do admission control, not scheduling:

  • Tokio's job: Schedule ready tasks across CPU cores
  • Semaphore's job: Limit how many tasks can be in-flight (prevent contention and memory exhaustion)

Without admission control, 2500 continuously syncing subgraphs would spawn an unbounded number of futures.

This PR doesn't introduce a new scheduling layer. It uses semaphores plus sharded worker pools to reduce contention. I had earlier tried a global semaphore approach but realised it wouldn't scale. This is more of a WIP and an idea for better scaling the workers/threads and making graph-node manage tokio threads better as the number of subgraphs per machine grows. I have also updated the description to avoid the confusion around semaphores.

DaMandal0rian avatar Sep 15 '25 20:09 DaMandal0rian

I am closing this for now; I don't think this is needed, and with the async store changes we are already reducing the number of blocking threads we need, with some more reductions to come.

lutter avatar Dec 17 '25 23:12 lutter