ValueError: task_done() called too many times

Open y26s4824k264 opened this issue 4 months ago • 3 comments

What happened?

future: [lroa] 2025/09/10 - 10:01:15 - ERROR - /Users/puzzle/PycharmProjects/ai-manage/.venv/lib/python3.10/site-packages/uvloop/__init__.py:82:run - Task exception was never retrieved
future: <Task finished name='Task-2502' coro=<SingleThreadedAgentRuntime._process_response() done, defined at /Users/puzzle/PycharmProjects/ai-manage/.venv/lib/python3.10/site-packages/autogen_core/_single_threaded_agent_runtime.py:632> exception=ValueError('task_done() called too many times')>
Traceback (most recent call last):
  File "/Users/puzzle/PycharmProjects/ai-manage/.venv/lib/python3.10/site-packages/autogen_core/_single_threaded_agent_runtime.py", line 662, in _process_response
    self._message_queue.task_done()
  File "/Users/puzzle/PycharmProjects/ai-manage/.venv/lib/python3.10/site-packages/autogen_core/_queue.py", line 222, in task_done
    raise ValueError("task_done() called too many times")
ValueError: task_done() called too many times

Which packages was the bug in?

Python Core (autogen-core)

AutoGen library version.

Python 0.7.4

Other library version.

No response

Model used

No response

Model provider

None

Other model provider

No response

Python version

3.10

.NET version

None

Operating system

MacOS

y26s4824k264, Sep 10 '25 02:09

The error occurred by chance and does not seem to affect anything for now.

y26s4824k264, Sep 10 '25 02:09

Would be good to have a script for reproduction

ekzhu, Sep 16 '25 08:09

Reproduction for task_done() Race Condition

This error occurs when KeyboardInterrupt interrupts message processing between queue.get() and queue.task_done().

Why KeyboardInterrupt is Necessary

Without raising KeyboardInterrupt in the signal handler, the async for loop remains blocked waiting for the next message from the LLM (which can take seconds or minutes). The signal handler runs, but execution then resumes at the same blocked await, so Ctrl+C appears to do nothing.

Behavior without KeyboardInterrupt:

def signal_handler(signum, frame):
    pass  # Just returns

async for event in team.run_stream(...):  
    # Loop stays BLOCKED waiting for next event
    # User presses Ctrl+C → signal handler returns → still blocked
    # Application appears frozen

Behavior with KeyboardInterrupt:

def signal_handler(signum, frame):
    raise KeyboardInterrupt()  # Breaks blocked operation

async for event in team.run_stream(...):  
    # KeyboardInterrupt immediately breaks the loop
    # Application exits (~30ms response time)
    # BUT triggers this queue race condition bug

The Race Condition

When KeyboardInterrupt is raised during message processing:

  1. message = await self._message_queue.get() succeeds (the runtime now owes the queue one task_done() for that message)
  2. KeyboardInterrupt raised during _process_response()
  3. The finally block containing task_done() may not execute
  4. Queue counter is now out of sync
  5. During cleanup, task_done() is called when counter is already 0
  6. Result: ValueError: task_done() called too many times
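
The final step can be seen in isolation with the standard library's asyncio.Queue, which raises the same message when task_done() is called more often than items were put. This is only a minimal sketch: it assumes autogen_core's internal queue does its accounting the same way as asyncio.Queue, which is not verified here, and demo_extra_task_done is a hypothetical helper name.

```python
import asyncio


async def demo_extra_task_done() -> str:
    """Call task_done() once too often on a stdlib asyncio.Queue."""
    queue = asyncio.Queue()
    await queue.put("message")  # put() records one unfinished task
    await queue.get()           # the consumer takes the item
    queue.task_done()           # matching call: no unfinished tasks remain
    try:
        queue.task_done()       # second call for the same item
    except ValueError as exc:
        return str(exc)
    return "no error"


if __name__ == "__main__":
    print(asyncio.run(demo_extra_task_done()))
```

In asyncio.Queue the unfinished-task count goes up on put() and down on task_done(), so any code path that calls task_done() twice for one message, or calls it after the count has been reset, ends in this ValueError.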

Reproduction Script

import asyncio
import signal
from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.teams import RoundRobinGroupChat
from autogen_agentchat.conditions import MaxMessageTermination
from autogen_ext.models.openai import OpenAIChatCompletionClient

async def main():
    model = OpenAIChatCompletionClient(model="gpt-4o-mini")
    agent = AssistantAgent("assistant", model_client=model)
    team = RoundRobinGroupChat([agent], termination_condition=MaxMessageTermination(5))
    
    def signal_handler(signum, frame):
        print("\n[Ctrl+C] Raising KeyboardInterrupt - triggers queue race condition")
        # KeyboardInterrupt is necessary to break blocked async operations
        # but exposes this queue management bug
        raise KeyboardInterrupt()
    
    signal.signal(signal.SIGINT, signal_handler)
    
    print("Starting workflow - Press Ctrl+C during execution")
    
    try:
        async for event in team.run_stream(task="Count from 1 to 10"):
            print(f"Event: {type(event).__name__}")
    except KeyboardInterrupt:
        print("[Exiting] KeyboardInterrupt caught")
        # Error appears during cleanup here
    
    print("[Done] Check for 'task_done() called too many times' error")

asyncio.run(main())

To reproduce:

  1. Set OPENAI_API_KEY environment variable
  2. Run the script
  3. Press Ctrl+C during workflow execution (after first message)
  4. Observe ValueError: task_done() called too many times during cleanup

Suggested Fix

Ensure task_done() is called even when exceptions occur:

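async def _process_next(self):
    message_envelope = None
    try:
        message_envelope = await self._message_queue.get()
        # ... process message ...
    except BaseException:
        # BaseException also covers KeyboardInterrupt and asyncio.CancelledError,
        # neither of which is a subclass of Exception on Python 3.8+.
        if message_envelope is not None:
            try:
                self._message_queue.task_done()
            except ValueError:
                pass  # task_done() was already called for this message
        raise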
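
The same idea can also be written with try/finally, so that the success path and the error path share a single task_done() call. The sketch below uses the standard library's asyncio.Queue rather than AutoGen's runtime; process_one, failing_handler and demo are hypothetical names, and this is not the actual _process_next implementation.

```python
import asyncio
from typing import Awaitable, Callable


async def process_one(
    queue: asyncio.Queue,
    handler: Callable[[object], Awaitable[None]],
) -> None:
    """Take one item and mark it done exactly once, even if the handler raises."""
    item = await queue.get()  # if get() is interrupted, there is nothing to mark
    try:
        await handler(item)
    finally:
        # Runs whether the handler returns, raises, or is cancelled.
        queue.task_done()


async def failing_handler(item: object) -> None:
    raise RuntimeError(f"cannot handle {item!r}")


async def demo() -> bool:
    queue = asyncio.Queue()
    await queue.put("message")
    try:
        await process_one(queue, failing_handler)
    except RuntimeError:
        pass  # the handler error still propagates to the caller
    # join() returns only once every put() has a matching task_done().
    await asyncio.wait_for(queue.join(), timeout=1)
    return True


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Placing get() outside the try block means task_done() is only reachable after an item was actually received, which avoids the need to swallow ValueError.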

Related Issue

This issue is closely related to #7100, which shows that run_stream() doesn't properly handle cancellation. Both issues stem from incomplete exception handling in AutoGen's async message processing.

alexey-pelykh, Oct 29 '25 19:10