feat: add client disconnect handling to stop generation
When a client disconnects (e.g., user clicks stop in SillyTavern), the generation now stops instead of continuing to run.
Flow:
- API catches CancelledError and sends TaskCancelled command
- Master creates TaskCancellationRequested event
- Worker receives event and sends CancelGeneration task to runner
- Runner polls for cancellation during generation loop
- Generation breaks early when cancelled
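The last two steps of the flow can be sketched as a generation loop that checks a callback once per token. This is a minimal illustration, not the PR's code: `generate_tokens` and the toy token list are hypothetical, and only the `is_cancelled` callback name comes from the description above.

```python
from collections.abc import Callable, Iterable, Iterator
from typing import Optional


def generate_tokens(
    token_source: Iterable[str],
    is_cancelled: Optional[Callable[[], bool]] = None,
) -> Iterator[str]:
    """Yield tokens until the source is exhausted or cancellation is observed."""
    for token in token_source:
        # The check runs once per token, so cancellation takes effect at the
        # next token boundary rather than instantly.
        if is_cancelled is not None and is_cancelled():
            break
        yield token


emitted: list[str] = []


def cancelled_after_three() -> bool:
    # Hypothetical stand-in for "a CancelGeneration task has arrived".
    return len(emitted) >= 3


for tok in generate_tokens(["a", "b", "c", "d", "e"], cancelled_after_three):
    emitted.append(tok)
```

Keeping the callback optional means callers that never cancel can omit it and the loop behaves as before.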
New types:
- TaskCancelled command
- TaskCancellationRequested event
- CancelGeneration task
Modified files:
- master/api.py: Send TaskCancelled on disconnect
- master/main.py: Handle TaskCancelled command
- shared/apply.py: Pass through TaskCancellationRequested
- worker/main.py: Handle cancellation events
- worker/runner/runner.py: Poll for cancellation during generation
- worker/runner/runner_supervisor.py: Add send_task_nowait()
- worker/engines/mlx/generator/generate.py: Add is_cancelled callback
I reviewed, tested and edited the code, but some of the code was written by Claude Opus. I hope that's ok.
Motivation
Previously, when the user clicked stop in SillyTavern (or any interface that uses the OpenAI-compatible endpoint), exo would continue processing. With this change, exo cancels and stops generation so that it can respond to new requests immediately.
Changes
See above.
Why It Works
Ultimately, it just detects a disconnect, and sends a command that breaks out of the generation loop.
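A rough sketch of the detection side, assuming plain `asyncio` for illustration (the PR's API code may use a different async library). `send_command` and `stream_response` are hypothetical names; the point is only that the cancellation notice is sent under a shield before the `CancelledError` is re-raised.

```python
import asyncio

sent_commands: list[str] = []


async def send_command(name: str) -> None:
    # Hypothetical stand-in for forwarding a command to the master.
    await asyncio.sleep(0)
    sent_commands.append(name)


async def stream_response(command_id: str) -> None:
    try:
        while True:
            await asyncio.sleep(0.01)  # pretend to stream one chunk
    except asyncio.CancelledError:
        # Shield the notification so a further cancellation of this task
        # cannot interrupt it half-way, then propagate the cancellation.
        await asyncio.shield(send_command(f"TaskCancelled:{command_id}"))
        raise


async def main() -> None:
    task = asyncio.create_task(stream_response("abc"))
    await asyncio.sleep(0.03)
    task.cancel()  # simulates the client disconnecting
    try:
        await task
    except asyncio.CancelledError:
        pass


asyncio.run(main())
```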
Test Plan
Manual Testing
I have two Mac Studio M3 Ultras, each with 512 GB of RAM, connected via Thunderbolt 5. I ran Kimi K2 Thinking with MLX Ring and Tensor Split, tested it via SillyTavern, and it worked correctly.
Automated Testing
No changes to automated testing.
Changes Summary
This PR implements client disconnect handling to stop generation when users disconnect (e.g., clicking stop in SillyTavern). The system introduces a three-component cancellation flow: API detects disconnection and sends TaskCancelled command, Master creates TaskCancellationRequested event, and Worker receives the event to stop generation via polling in the generation loop.
Type: feature
Components Affected: API layer (master/api.py), Master event handling (master/main.py), Worker cancellation handling (worker/main.py), Runner generation loop (worker/runner/runner.py), MLX generator (worker/engines/mlx/generator/generate.py), Type system (commands, events, tasks)
Files Changed
| File | Summary | Change | Impact |
|---|---|---|---|
| /tmp/workspace/src/exo/shared/types/commands.py | Added TaskCancelled command type to communicate client disconnection from API to Master | ✏️ | 🟢 |
| /tmp/workspace/src/exo/shared/types/events.py | Added TaskCancellationRequested event type for Master to signal cancellation to Worker | ✏️ | 🟢 |
| /tmp/workspace/src/exo/shared/types/tasks.py | Added CancelGeneration task type sent from Master to Runner to interrupt generation | ✏️ | 🟢 |
| /tmp/workspace/src/exo/master/api.py | Added exception handling for client disconnection (CancelledError), sending TaskCancelled command while shielded from cancellation, and graceful cleanup without sending duplicate TaskFinished | ✏️ | 🔴 |
| /tmp/workspace/src/exo/master/main.py | Added handler for TaskCancelled command that creates TaskCancellationRequested event and TaskDeleted event to propagate cancellation to workers | ✏️ | 🟡 |
| /tmp/workspace/src/exo/worker/main.py | Added TaskCancellationRequested event handling with _handle_cancellation_request() method that sends CancelGeneration task to all runners | ✏️ | 🟡 |
| /tmp/workspace/src/exo/worker/runner/runner.py | Added check_cancelled() callback for polling CancelGeneration during generation loop, passes callback to mlx_generate(), and handles CancelGeneration task case | ✏️ | 🔴 |
| ...pace/src/exo/worker/runner/runner_supervisor.py | Added send_task_nowait() method for non-blocking task sending to interrupt running generation with cancellation requests | ✏️ | 🟢 |
| ...rc/exo/worker/engines/mlx/generator/generate.py | Added optional is_cancelled callback parameter to mlx_generate() that checks during generation loop to break early when client disconnects | ✏️ | 🟡 |
| /tmp/workspace/src/exo/shared/apply.py | Added TaskCancellationRequested to pass-through events that don't modify state (handled separately by worker) | ✏️ | 🟢 |
Architecture Impact
- New Patterns: Polling-based cancellation pattern in generation loop, Command -> Event -> Task propagation pattern for cross-component signaling, Non-blocking task sending for interrupt-like behavior
- Dependencies: added: BrokenResourceError import in master/api.py for handling queue closure, added: WouldBlock import in worker/runner/runner.py for non-blocking receive
- Coupling: Master and Worker now have explicit coupling through cancellation signaling. Worker must understand CancelGeneration task and poll for it during generation. API must understand TaskCancelled command flow.
Risk Areas:
- Race condition: TaskFinished and TaskCancelled both sent - mitigated by shielding and checking queue existence in finally block
- Late chunk delivery: Chunks sent after disconnect are caught by BrokenResourceError handling - new exception handling added
- Non-blocking poll loop: CancelGeneration polling in generation loop is non-blocking but only checks once per token generation - could have latency
- Generation callback coupling: MLX generator now has hard dependency on is_cancelled callback signature - could break if generator changes
- Unbounded queue consumption: check_cancelled() receives but discards all non-CancelGeneration tasks - could lose other task types if sent during generation
Suggestions
- Add integration tests for disconnect scenarios to validate the full cancellation flow across API->Master->Worker
- Consider documenting the non-blocking poll latency: cancellation takes effect only after the next token is generated, not immediately
- The check_cancelled() function discards non-CancelGeneration tasks received during generation - consider either rejecting them earlier or queueing them for later processing
- Add metrics/logging for cancellation success rate and timing to monitor real-world disconnect handling
- Consider explicit per-runner task routing instead of broadcasting CancelGeneration to all runners to reduce unnecessary message sends
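The suggestion about discarded tasks could look roughly like the sketch below, which defers unrelated tasks instead of dropping them. It assumes a standard-library `queue.Queue` as the task inbox; `CancellationPoller`, `OtherTask`, and the `deferred` deque are hypothetical names, and the real runner's receive API may differ.

```python
import queue
from collections import deque
from dataclasses import dataclass


@dataclass(frozen=True)
class CancelGeneration:
    command_id: str


@dataclass(frozen=True)
class OtherTask:
    name: str


class CancellationPoller:
    """Non-blocking poll that keeps unrelated tasks for later processing."""

    def __init__(self, inbox: queue.Queue, command_id: str) -> None:
        self.inbox = inbox
        self.command_id = command_id
        self.deferred: deque = deque()
        self.cancelled = False

    def __call__(self) -> bool:
        # Drain whatever is available right now without blocking.
        while not self.cancelled:
            try:
                task = self.inbox.get_nowait()
            except queue.Empty:
                break
            if isinstance(task, CancelGeneration) and task.command_id == self.command_id:
                self.cancelled = True
            else:
                self.deferred.append(task)  # keep it instead of discarding
        return self.cancelled


inbox: queue.Queue = queue.Queue()
inbox.put(OtherTask("warmup"))
inbox.put(CancelGeneration("cmd-1"))
inbox.put(OtherTask("later"))

poll = CancellationPoller(inbox, "cmd-1")
was_cancelled = poll()
```

After generation ends, the caller would process `poll.deferred` before reading from the inbox again, so ordering is preserved.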
Review Summary
Validated 52 issues: 30 kept, 22 filtered
Issues Found: 30
💬 See 15 individual line comment(s) for details.
16 unique issue type(s) across 30 location(s)
Full issue list
🔴 CRITICAL - KeyError when CancelGeneration receives TaskAcknowledged (3 occurrences)
Agent: bugs
Category: bug
| File | Description | Suggestion | Confidence |
|---|---|---|---|
| src/exo/worker/runner/runner_supervisor.py:133 | CancelGeneration tasks sent via send_task_nowait() are not added to self.pending, but runner.py:85 s... | Use pop() with a default value: self.pending.pop(event.task_id, None)?.set() or add a check before p... | 95% |
| src/exo/master/main.py:187-198 | Lines 181-183 access self.command_task_mapping[command.finished_command_id] unconditionally, but the... | Check if the key exists BEFORE accessing it, or use dict.get() with default value | 95% |
| src/exo/worker/runner/runner_supervisor.py:131-133 | In start_task(), if _task_sender.send() raises ClosedResourceError at line 136, the pending entry ad... | Use try/finally to ensure cleanup, or explicitly delete pending entry before returning on ClosedReso... | 90% |
Rule: bug_empty_catch
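Python has no `?.` operator, so the first suggestion above cannot be used as written. A minimal sketch of the same idea, assuming `pending` maps task ids to event objects (`threading.Event` is used here as a stand-in, and `acknowledge` is a hypothetical helper name):

```python
import threading

# Hypothetical registry of tasks awaiting acknowledgement.
pending: dict[str, threading.Event] = {}


def acknowledge(task_id: str) -> bool:
    """Mark a task as acknowledged; tolerate ids that were never registered."""
    event = pending.pop(task_id, None)  # no KeyError for unknown ids
    if event is None:
        return False
    event.set()
    return True


registered = threading.Event()
pending["t1"] = registered
known_result = acknowledge("t1")
unknown_result = acknowledge("cancel-task-not-in-pending")
```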
🔴 CRITICAL - State mutation after shallow copy in apply_node_timed_out
Agent: Delegated (performance, python)
Category: bug
File: src/exo/shared/apply.py:195-210
Description: Line 196 creates shallow copy of state.topology, but line 197 calls state.topology.remove_node() which modifies the ORIGINAL, not the copy. The returned topology is incorrect.
Suggestion: Change line 197 to: topology.remove_node(event.node_id) to modify the copy instead of the original
Confidence: 95%
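The bug class described here can be illustrated with a small sketch. `Topology` and `State` below are hypothetical stand-ins, not exo's real types. Because this toy `Topology` holds a mutable set, the sketch uses `copy.deepcopy`; whether a shallow copy suffices in the real code depends on how the real topology stores its nodes.

```python
import copy
from dataclasses import dataclass, field, replace


@dataclass
class Topology:
    nodes: set[str] = field(default_factory=set)

    def remove_node(self, node_id: str) -> None:
        self.nodes.discard(node_id)


@dataclass(frozen=True)
class State:
    topology: Topology


def apply_node_timed_out(state: State, node_id: str) -> State:
    topology = copy.deepcopy(state.topology)
    # Mutate the copy, not state.topology, so the previous state is untouched.
    topology.remove_node(node_id)
    return replace(state, topology=topology)


old_state = State(Topology({"a", "b"}))
new_state = apply_node_timed_out(old_state, "a")
```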
🔴 CRITICAL - Remote Code Execution via Debug Prompt Injection (2 occurrences)
Agent: security
Category: security
| File | Description | Suggestion | Confidence |
|---|---|---|---|
| src/exo/worker/runner/runner.py:267-290 | The _check_for_debug_prompts function contains hardcoded magic strings that are checked in user-supp... | Remove all debug/testing functionality from production code. If debug hooks are needed, gate them be... | 98% |
| src/exo/shared/types/api.py:116-136 | ChatCompletionTaskParams lacks validation constraints on critical fields. No bounds on message count... | Add Pydantic field validators: enforce maximum message count and content length, add max_tokens uppe... | 78% |
Rule: sec_missing_request_validation
🟠 HIGH - Quadratic loop: nested iteration over instances and tasks (3 occurrences)
Agent: performance
Category: performance
| File | Description | Suggestion | Confidence |
|---|---|---|---|
| src/exo/master/main.py:111-123 | For each matching instance, code iterates through ALL tasks to count. O(N*M) complexity on critical ... | Build a task index by instance_id once: task_counts_by_instance = defaultdict(int); for task in self... | 85% |
| src/exo/master/main.py:236-240 | datetime.now(tz=timezone.utc) is called inside the for loop for every node. Should be called once be... | Call datetime.now() once before the loop: now = datetime.now(tz=timezone.utc); then use now in the l... | 70% |
| src/exo/master/api.py:234-251 | list_nodes() is called and converted to list at line 234 and again at line 248 inside nested loops. ... | Cache the node count before the loops: num_nodes = len(list(self.state.topology.list_nodes())); then... | 85% |
Rule: perf_quadratic_loops
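The first suggestion (build the per-instance count once) might be sketched as follows. The `Task` shape is an assumption for illustration, and `collections.Counter` is used in place of the `defaultdict(int)` mentioned in the table; both give a single O(M) pass over the tasks.

```python
from collections import Counter
from collections.abc import Iterable
from dataclasses import dataclass


@dataclass(frozen=True)
class Task:
    task_id: str
    instance_id: str


def count_tasks_by_instance(tasks: Iterable[Task]) -> Counter:
    """Single pass over tasks; lookups per instance are then O(1)."""
    return Counter(task.instance_id for task in tasks)


tasks = [Task("t1", "i1"), Task("t2", "i1"), Task("t3", "i2")]
counts = count_tasks_by_instance(tasks)
```

A `Counter` returns 0 for instances with no tasks, so callers do not need a separate membership check.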
🟠 HIGH - Unnecessary 'nonlocal self' declaration in nested function
Agent: python
Category: quality
File: src/exo/worker/main.py:381-382
Description: The 'nonlocal self' declaration is unnecessary. In Python, 'self' is implicitly accessible from enclosing method scope. Only variables that are reassigned need nonlocal.
Suggestion: Remove 'nonlocal self' from line 381. Keep only 'nonlocal last_progress_time' since it's actually reassigned at line 410.
Confidence: 95%
Rule: py_keep_docstrings_consistent_with_signature
🟠 HIGH - Full command object logged without masking sensitive parameters (10 occurrences)
Agent: compliance
Category: security
| File | Description | Suggestion | Confidence |
|---|---|---|---|
| src/exo/master/main.py:103 | The entire forwarder_command object is logged at line 103, which may contain sensitive ChatCompletio... | Log only the command type and command_id, not the entire command object: logger.info(f"Executing com... | 85% |
| src/exo/master/main.py:256 | Events are logged with string truncation to 100 characters. Various event types may contain sensitiv... | Implement structured logging filter that redacts sensitive event types. Log only event type and even... | 75% |
| src/exo/worker/main.py:425-426 | Worker publishes events with limited truncation (100 chars). Events may contain sensitive task data ... | Log only structured metadata without event content: logger.debug(f"Worker published event {self.loca... | 75% |
| src/exo/worker/runner/runner.py:160 | The entire task object is logged with only 500-char truncation. ChatCompletion tasks contain ChatCom... | Log only task metadata without user content: logger.info(f"received chat request for model: {task_pa... | 88% |
| src/exo/worker/engines/mlx/generator/generate.py:96 | The entire ChatCompletionTaskParams object is logged, which contains the full user messages and prom... | Log only non-sensitive task metadata: logger.info(f"task_params: model={task.model}, num_messages={l... | 92% |
| src/exo/worker/engines/mlx/generator/generate.py:121 | Generated text output (out.text) is logged directly. This may contain sensitive information if the m... | Disable logging of generated text content in production. Log only token count and status: logger.inf... | 82% |
| src/exo/worker/runner/runner_supervisor.py:125 | The entire Task object is logged when sending tasks. For ChatCompletion tasks, this includes ChatCom... | Log only task type and task_id: logger.info(f"Sending task (nowait) {task.__class__.__name__} (id: {... | 85% |
| src/exo/worker/runner/runner_supervisor.py:129 | When a task is dropped due to closed communication, the entire Task object is logged in a warning me... | Log only task type and ID: logger.warning(f"Task {task.__class__.__name__} (id: {task.task_id}) drop... | 82% |
| src/exo/worker/runner/runner_supervisor.py:132 | The entire Task object is logged when starting tasks, exposing sensitive ChatCompletionTaskParams wi... | Log only task type and metadata: logger.info(f"Starting task {task.__class__.__name__} (id: {task.ta... | 85% |
| src/exo/worker/runner/runner_supervisor.py:133 | When start_task fails to send a task, the entire Task object is logged in a warning message, exposin... | Log only task type and ID: logger.warning(f"Task {task.__class__.__name__} (id: {task.task_id}) drop... | 82% |
Rule: soc2_mask_pii_in_logs
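One way to apply these suggestions consistently is a small helper that formats only metadata. This is a sketch under assumptions: `ChatTask` and its fields are hypothetical and do not mirror exo's real task types.

```python
import logging
from dataclasses import dataclass, field

logger = logging.getLogger("exo.example")


@dataclass
class ChatTask:
    task_id: str
    model: str
    messages: list[str] = field(default_factory=list)


def describe_task(task: ChatTask) -> str:
    """Return a log-safe summary: type, id, and sizes, never message content."""
    return (
        f"{type(task).__name__}(id={task.task_id}, model={task.model}, "
        f"num_messages={len(task.messages)})"
    )


task = ChatTask("t-1", "kimi-k2", ["my secret prompt"])
summary = describe_task(task)
logger.info("Starting task %s", summary)
```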
🟠 HIGH - Docstring class name mismatch in _create_supervisor
Agent: documentation
Category: docs
File: src/exo/worker/main.py:347-348
Description: The docstring says the method 'Creates and stores a new AssignedRunner' but the function signature shows it returns RunnerSupervisor and the implementation creates RunnerSupervisor.create(). The class name 'AssignedRunner' appears to be outdated or incorrect.
Suggestion: Update docstring to: 'Creates and stores a new RunnerSupervisor with initial downloading status.'
Confidence: 95%
Rule: py_docstring_description_mismatch
🟠 HIGH - Expensive get_args() call inside token generation loop
Agent: performance
Category: performance
File: src/exo/worker/engines/mlx/generator/generate.py:122-123
Description: The get_args(FinishReason) call is executed on every iteration of the token generation loop. Since this is a hot loop that runs once per generated token (potentially hundreds of times), this expensive reflection operation should be computed once before the loop.
Suggestion: Move get_args(FinishReason) outside the loop and cache the result before iterating. Add a module-level constant: VALID_FINISH_REASONS = get_args(FinishReason) and use that in the condition.
Confidence: 90%
Rule: perf_expensive_in_loop
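A sketch of the suggested change, assuming `FinishReason` is a `typing.Literal` alias. The literal values below are placeholders chosen for illustration, not exo's actual definition.

```python
from typing import Literal, Optional, get_args

# Assumed definition; the real alias may list different values.
FinishReason = Literal["stop", "length", "content_filter"]

# Computed once at import time instead of on every generated token.
VALID_FINISH_REASONS = frozenset(get_args(FinishReason))


def is_finish_reason(value: Optional[str]) -> bool:
    return value in VALID_FINISH_REASONS


finished = [is_finish_reason(v) for v in ("stop", None, "length", "other")]
```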
🟠 HIGH - No Input Validation on LLM Prompt Content
Agent: security
Category: security
File: src/exo/worker/runner/runner.py:161
Description: Prompt content passed to _check_for_debug_prompts is not validated or sanitized. User-supplied prompts from task_params.messages[0].content are processed without content validation, enabling the debug prompt injection vulnerability.
Suggestion: Implement strict input validation on ChatCompletionTaskParams at the API boundary using Pydantic validators. Remove debug prompt checking entirely from production code.
Confidence: 92%
Rule: py_add_input_validation_for_critical_parame
🟡 MEDIUM - Overly broad exception handling for signal.strsignal()
Agent: python
Category: quality
File: src/exo/worker/runner/runner_supervisor.py:176-179
Description: Catches bare 'Exception' instead of specific exception types. signal.strsignal() can raise ValueError for unknown signals.
Suggestion: Replace 'except Exception:' with 'except (ValueError, OSError):' to be more specific
Confidence: 75%
Rule: py_add_specific_exception_handling
🟡 MEDIUM - Callback closures with nonlocal mutable state
Agent: architecture
Category: quality
File: src/exo/worker/main.py:377-412
Description: The download_progress_callback function captures mutable state (last_progress_time) and Worker instance state. Creates implicit dependencies making callback difficult to test independently.
Suggestion: Create a proper callback handler class that accepts necessary dependencies as constructor parameters for explicit state ownership
Confidence: 60%
Rule: py_separate_business_logic_from_framework
🟡 MEDIUM - Index access without bounds check using assert
Agent: python
Category: bug
File: src/exo/worker/runner/runner.py:160-161
Description: Line 160 uses assert task_params.messages[0].content is not None which accesses index 0 without first checking if the list is non-empty. Using assert for validation is dangerous as asserts can be disabled with -O flag.
Suggestion: Add explicit bounds check: if not task_params.messages or task_params.messages[0].content is None: raise ValueError('Empty or invalid messages')
Confidence: 78%
Rule: bug_array_bounds_python
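The suggested check can be written as a small function that raises instead of asserting, so it still runs under `python -O`. `Message` and `first_message_content` are hypothetical names used only for this sketch.

```python
from dataclasses import dataclass
from typing import Optional, Sequence


@dataclass
class Message:
    content: Optional[str]


def first_message_content(messages: Sequence[Message]) -> str:
    """Validate explicitly; unlike assert, this is not stripped by -O."""
    if not messages:
        raise ValueError("messages must not be empty")
    content = messages[0].content
    if content is None:
        raise ValueError("first message has no content")
    return content


ok = first_message_content([Message("hi")])
```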
🟡 MEDIUM - Hardcoded port 52415 in multiaddr construction
Agent: general
Category: quality
File: src/exo/worker/main.py:443-446
Description: Port 52415 is hardcoded when constructing multiaddr connection strings in the topology polling loop. This makes it difficult to change the port for testing or alternative deployments.
Suggestion: Extract to a module constant or environment variable: COORDINATOR_PORT = int(os.getenv('EXO_COORDINATOR_PORT', '52415'))
Confidence: 65%
Rule: general_hardcoded_config
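A slightly more defensive variant of that suggestion, as a sketch: `EXO_COORDINATOR_PORT` is the variable name proposed by the review, not an existing exo setting, and `coordinator_port` is a hypothetical helper.

```python
import os

DEFAULT_COORDINATOR_PORT = 52415


def coordinator_port() -> int:
    """Read the port from the environment, falling back to the current default."""
    raw = os.getenv("EXO_COORDINATOR_PORT")
    if raw is None:
        return DEFAULT_COORDINATOR_PORT
    port = int(raw)  # raises ValueError for non-numeric input
    if not 0 < port < 65536:
        raise ValueError(f"port out of range: {port}")
    return port
```

Reading the variable inside a function rather than at import time makes it easier to override in tests.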
🟡 MEDIUM - Loop variable shadows Python built-in
Agent: python
Category: style
File: src/exo/master/main.py:236
Description: The loop variable 'time' shadows the Python built-in 'time' module name. While the time module is not currently imported, this practice can cause confusion and issues if the code is modified.
Suggestion: Rename the loop variable to 'last_seen_time' or 'timestamp': for node_id, last_seen_time in self.state.last_seen.items():
Confidence: 72%
Rule: qual_semantic_naming_python
🟡 MEDIUM - CancelGeneration case logic flow unclear
Agent: refactoring
Category: quality
File: src/exo/worker/runner/runner.py:216-222
Description: The CancelGeneration match case (lines 216-221) only logs a message. While status updates DO happen via the common path after the match statement (lines 234-243), the comment 'already complete' suggests this case handles late cancellation, which may need explicit handling.
Suggestion: Consider adding a comment explaining the control flow, or add explicit early return/continue if the common status update path is not appropriate for late cancellations.
Confidence: 62%
Rule: quality_unused_variable
🔵 LOW - Closure callback mutating outer scope variable
Agent: python
Category: quality
File: src/exo/worker/runner/runner.py:166-182
Description: The check_cancelled() callback uses 'nonlocal generation_cancelled' to track state. While this works and is correctly scoped to the ChatCompletion case, using a mutable container could make the pattern clearer.
Suggestion: Consider using a mutable container like state = {'cancelled': False} to make the mutation more explicit, or document the pattern with a comment.
Confidence: 60%
Rule: py_avoid_modifying_input_parameters
ℹ️ 15 issue(s) outside PR diff
These issues were found in lines not modified in this PR. Each is described in the full issue list above:
- CRITICAL - State mutation after shallow copy in apply_node_timed_out
- CRITICAL - Remote Code Execution via Debug Prompt Injection (2 occurrences)
- HIGH - Quadratic loop: nested iteration over instances and tasks (3 occurrences)
- HIGH - Unnecessary 'nonlocal self' declaration in nested function
- HIGH - Full command object logged without masking sensitive parameters (3 of the 10 occurrences: src/exo/master/main.py:103, src/exo/master/main.py:256, src/exo/worker/main.py:425-426)
- HIGH - Docstring class name mismatch in _create_supervisor
- MEDIUM - Overly broad exception handling for signal.strsignal()
- MEDIUM - Callback closures with nonlocal mutable state
- MEDIUM - Hardcoded port 52415 in multiaddr construction
- MEDIUM - Loop variable shadows Python built-in
Review ID: 49c01428-f68e-4f41-9dfc-7aae951c07cc
Just to follow up on the previous comment: has this been tested on an RDMA setup? If not, we can test it internally as well. I felt that we might need an mx_barrier somewhere here, but perhaps not.
I haven't tested it on RDMA (I'm on vacation until 5th so I don't have physical access to my Mac Studios to enable RDMA).
Finally tested on RDMA with two Mac Studio M3 Ultras (512 GB each) using Kimi K2 Thinking. Works as expected.
[ 06:49:22.9022AM | INFO ] Client disconnected, cancelling command 5531badb-63cb-40d8-a477-ac73e309c2b3
[ 06:49:22.9033AM | INFO ] Executing command: TaskCancelled(command_id='7cab0fff-7b58-4314-b082-d4425b28fd7a' cancelled_command_id='5531badb-63cb-40d8-a477-ac73e309c2b3')
[ 06:49:22.9040AM | INFO ] Sending task (nowait) CancelGeneration(task_id='74b9a7a8-c6b3-4be9-b705-4a850a9c30a4' task_status=<TaskStatus.Pending: 'Pending'> instance_id='7ec9bda8-b1b0-48b0-80fd-497b9a21e3f3' command_id='5531badb-63cb-40d8-a477-ac73e309c2b3')
[ 06:49:22.9043AM | INFO ] Sent CancelGeneration for command 5531badb-63cb-40d8-a477-ac73e309c2b3 to runner
[ 06:49:22.9186AM | INFO ] (
[ 06:49:22.9188AM | INFO ] Generation cancelled mid-stream by client disconnect
[ 06:49:22.9233AM | INFO ] runner ready
Hey, thanks for the contribution. It took a while but we have a complete implementation of this now with task cancellation.