Feat: Fix thread contention during subgraph syncing
This PR implements a sharded trigger processor to solve thread contention issues when running large numbers (2500+) of continuously syncing subgraphs. The solution distributes subgraphs across multiple worker pools while maintaining 100% backward compatibility.
Problem
In master, trigger processing has no concurrency limits, which can lead to:
- Tokio thread exhaustion
- Unfair resource allocation between subgraphs
- No backpressure mechanism for overloaded systems
Solution
This PR introduces an opt-in sharded trigger processor that addresses these issues. The key changes are:
- Sharded Trigger Processor: The SubgraphTriggerProcessor now manages a configurable number of shards, each with its own thread pool, using a tokio::sync::Semaphore for concurrency control.
- Consistent Hashing: Subgraphs are consistently assigned to shards based on a hash of their deployment ID, so the same deployment always routes to the same shard, which preserves cache locality (see the sketch after this list).
- Backpressure: A backpressure mechanism with exponential backoff prevents individual subgraphs from overwhelming the system by pausing processing for subgraphs with excessively long trigger queues.
- Configuration: The sharded processor can be configured via environment variables, allowing operators to tune the number of shards and workers.
- Metrics: The new implementation includes detailed, per-shard metrics for monitoring the health and performance of the trigger processor, including load-balancing tracking.
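For illustration, the routing could look roughly like the sketch below. This is a minimal example, not the PR's actual code: the function name and the example deployment ID are made up, and only the hash-then-modulo idea comes from the description above.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Illustrative only: map a deployment ID to a shard index so that the
/// same subgraph always lands on the same shard.
fn shard_for_deployment(deployment_id: &str, num_shards: usize) -> usize {
    let mut hasher = DefaultHasher::new();
    deployment_id.hash(&mut hasher);
    (hasher.finish() as usize) % num_shards
}

fn main() {
    // The same deployment hash always routes to the same shard (cache locality).
    let shard = shard_for_deployment("QmExampleDeploymentId", 32);
    println!("assigned to shard {shard}");
}
```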
Performance Impact
Before (master):
- Concurrency is bounded only by the Tokio blocking-thread limit set with MAX_BLOCKING_THREADS.
- Potential thread exhaustion with 2500+ subgraphs, which can only be mitigated by vertical scaling (more CPU and RAM).
- No backpressure mechanism.
After (Sharded Mode):
- With an example configuration of 32 shards and 32 workers per shard, the system can support up to 1024 concurrent workers (32 × 32), and 2500 subgraphs spread out to roughly 78 per shard (2500 / 32) instead of all competing globally. For optimal performance, the number of shards should be close to the number of available CPU cores.
- Bounded memory usage with configurable limits.
- Fair scheduling across all subgraphs.
Backward Compatibility
- Default behavior is unchanged - sharding is disabled by default.
- Opt-in activation - only enabled with GRAPH_SUBGRAPH_RUNTIME_PROCESSING_SHARDS > 1.
- Zero breaking changes - all existing deployments continue working identically.
- Test compatibility - all tests pass with legacy mode.
Configuration
Legacy mode (default - no changes for existing indexers)
- Default behavior - no configuration needed
- GRAPH_SUBGRAPH_RUNTIME_PROCESSING_SHARDS=1 # or unset
Sharded mode (recommended for 2500+ subgraphs)
```
export GRAPH_SUBGRAPH_RUNTIME_PROCESSING_SHARDS=32   # Enable sharding (set to the number of CPU cores)
export GRAPH_SUBGRAPH_RUNTIME_WORKERS_PER_SHARD=32   # Workers per shard
export GRAPH_SUBGRAPH_MAX_QUEUE_PER_SUBGRAPH=100     # Backpressure threshold
```
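For reference, a hypothetical sketch of how these variables might be read at startup. The helper name is made up, and the defaults for workers and queue size are assumptions; only the shards default of 1 (legacy mode) comes from the description above.

```rust
use std::env;

// Hypothetical helper, not the PR's actual code: read the sharding settings
// from the environment. Defaults for workers/queue are assumptions.
fn read_sharding_config() -> (usize, usize, usize) {
    let parse = |name: &str, default: usize| {
        env::var(name)
            .ok()
            .and_then(|v| v.parse::<usize>().ok())
            .unwrap_or(default)
    };
    let shards = parse("GRAPH_SUBGRAPH_RUNTIME_PROCESSING_SHARDS", 1);
    let workers_per_shard = parse("GRAPH_SUBGRAPH_RUNTIME_WORKERS_PER_SHARD", 32);
    let max_queue_per_subgraph = parse("GRAPH_SUBGRAPH_MAX_QUEUE_PER_SUBGRAPH", 100);
    // Sharding is only active when more than one shard is configured.
    (shards, workers_per_shard, max_queue_per_subgraph)
}
```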
This change enables graph-node to efficiently handle large installations with thousands of continuously syncing subgraphs while preserving the existing behavior for smaller deployments.
I've stared at this for a bit and am still not quite sure what this is trying to achieve. There's no semaphore in our code that I can find that is being sharded - the PR just introduces a sharded semaphore.
I also don't understand the rationale in the intro - how is it getting from 32 workers to 1024 workers? In general, parallelism is constrained by the number of CPUs; tokio takes care of scheduling the much larger amount of work that is usually ready across those. This PR seems to introduce another layer of scheduling on top of tokio's. I am very confused by all this.
@lutter
> I've stared at this for a bit and am still not quite sure what this is trying to achieve. There's no semaphore in our code that I can find that is being sharded - the PR just introduces a sharded semaphore.
> I also don't understand the rationale in the intro - how is it getting from 32 workers to 1024 workers? In general, parallelism is constrained by the number of CPUs; tokio takes care of scheduling the much larger amount of work that is usually ready across those. This PR seems to introduce another layer of scheduling on top of tokio's. I am very confused by all this.
Thanks for reviewing! I realize I need to better explain the problem and solution.
The Current Bottleneck
The bottleneck isn't in the code you see on the master branch - it's in my earlier commits on this branch. In the first two commits, I introduced a single global semaphore to limit concurrent trigger processing (this was my first attempted solution):
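Roughly, that first attempt looked like the following. This is a simplified sketch rather than the actual commit; the type and method names are made up.

```rust
use std::sync::Arc;
use tokio::sync::Semaphore;

// Rough sketch of the first attempt (names simplified, not the actual code):
// one global semaphore that every subgraph's trigger processing must pass through.
struct GlobalTriggerLimiter {
    permits: Arc<Semaphore>,
}

impl GlobalTriggerLimiter {
    fn new(max_concurrent: usize) -> Self {
        Self {
            permits: Arc::new(Semaphore::new(max_concurrent)),
        }
    }

    async fn process_triggers(&self) {
        // All 2500+ subgraphs contend for the same small pool of permits here.
        let _permit = self.permits.acquire().await.expect("semaphore not closed");
        // ... run the trigger handlers while holding the permit ...
    }
}
```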
With 2500 continuously syncing subgraphs all competing for 32 permits from a single semaphore, this was a regression that created massive contention: roughly 98% of subgraphs were blocked waiting at any given time.
The Solution: Sharding
This PR replaces that single bottleneck semaphore with multiple sharded semaphores:
Each subgraph is consistently assigned to one shard, so instead of 2500 subgraphs competing for 32 permits or tokio threads, we have ~78 subgraphs per shard competing for 32 permits.
About CPU Constraints
You're right that we're ultimately constrained by CPUs. The semaphores aren't about creating more parallelism than we have cores - they're about:
- Preventing thread and possibly memory exhaustion: Without limits, 2500 subgraphs could queue up unbounded work
- Fair scheduling: Ensuring each subgraph gets processing time
- Backpressure: Slowing down subgraphs that are queuing too much work
Tokio still schedules these across available CPU cores.
About "Another Layer of Scheduling"
My earlier commit (0c820f563) added a single global semaphore for trigger processing.
This PR shards that single bottleneck into multiple semaphores:

```rust
// Distributed - ~78 subgraphs compete per shard instead of 2500
let shard_id = get_shard_for(subgraph);
let _permit = self.semaphores[shard_id].acquire().await;
```
The semaphores do admission control, not scheduling:
- Tokio's job: Schedule ready tasks across CPU cores
- Semaphore's job: Limit how many tasks can be in-flight (prevent contention and memory exhaustion)
Without admission control, 2500 continuously syncing subgraphs would spawn unbounded Futures.
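As a rough sketch of the backpressure side of this (the names and the specific backoff numbers are illustrative, not the PR's actual code):

```rust
use std::time::Duration;
use tokio::time::sleep;

// Hypothetical sketch of the backpressure described above: once a subgraph's
// trigger queue exceeds the configured threshold, pause it with an
// exponentially growing (but capped) delay before accepting more work.
async fn apply_backpressure(queue_len: usize, max_queue: usize, consecutive_overflows: u32) {
    if queue_len <= max_queue {
        return;
    }
    // 100ms, 200ms, 400ms, ... capped at 5s so a hot subgraph is never parked forever.
    let backoff_ms = (100u64 << consecutive_overflows.min(6)).min(5_000);
    sleep(Duration::from_millis(backoff_ms)).await;
}
```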
This PR doesn't introduce a new scheduling layer. It uses semaphores plus sharded connection pools to reduce contention. I had earlier tried a global semaphore approach but realised it wouldn't scale. It's more of a WIP and an idea on how to better scale the workers/threads and help graph-node manage tokio threads as the number of subgraphs per machine grows. I have also updated the description to avoid the confusion around semaphores.
I am closing this for now; I don't think this is needed and with the async store changes we are already reducing the number of blocking threads we need, with some more reductions to come.