Transfer_to_agent does not actually transfer?
Describe the bug
I'm trying to build a CustomAgent by extending the BaseAgent class and adding a few subagents. In particular, I want this custom agent to orchestrate manually between the different agents that I have. The EventActions parameter transfer_to_agent does not work; it does nothing.
To Reproduce
The only problem is that doing something like this:
import logging
from typing import Any, AsyncGenerator

from google.adk.agents import BaseAgent, LlmAgent
from google.adk.agents.invocation_context import InvocationContext
from google.adk.events import Event, EventActions

logger = logging.getLogger(__name__)


class Orchestrator(BaseAgent):
    # Define fields with type hints
    name: str
    description: str

    def __init__(
        self,
        name: str | None = None,
        description: str | None = None,
        report_planner: LlmAgent | None = None,
        report_executor: LlmAgent | None = None,
        report_aggregator: LlmAgent | None = None,
        # Add **kwargs to capture any other arguments BaseAgent might need
        **kwargs: Any
    ):
        # Determine the final values for initialization
        # Use passed values or defaults (including class-level defaults for name/description)
        final_name = name if name is not None else "RecursiveOrchestrator"
        final_description = description if description is not None else "Manages task planning, execution, decomposition, and aggregation."
        # Ensure sub-agents are instantiated here if not passed in
        # (PlannerAgent is defined elsewhere in the reporter's project)
        report_planner = report_planner if report_planner is not None else PlannerAgent()
        final_sub_agents_list = [report_planner]
        super().__init__(
            name=final_name,
            description=final_description,
            # Pass the list of top-level sub-agents this orchestrator uses
            sub_agents=final_sub_agents_list,
            **kwargs
        )

    async def _run_async_impl(self, ctx: InvocationContext) -> AsyncGenerator[Event, None]:
        """
        Main entry point for the recursive orchestrator.
        Handles initial setup and recursive task processing.
        """
        initial_task = ctx.session.events[0].content.parts[0].text
        logger.info(f"Initial task: {initial_task}")
        # Create a response event
        response_text = f"Processing task: {initial_task}"
        yield Event(
            author=self.name,
            actions=EventActions(transfer_to_agent=self.find_sub_agent("PlannerAgent").name)
        )
This just yields an event carrying an EventActions object, but it does not actually transfer to the agent. How can I fix this? Or am I doing something wrong?
Expected behavior
It should hand off to another agent, and that agent should take over, but it does not.
@koverholt to help provide a tutorial.
I'm having this problem with a simple sub-agent setup.
Following the quickstart instructions.
Instead of transferring to the agent, in some cases it simply prints: transfer_to_agent(agent_name='soporte_tecnico').
I can't recreate it consistently. And it happens to me on some deployments that work fine. The deployment is on Google Cloud Run with ADK deploy.
Any ideas? Thanks.
Same issue here. The root agent simply responds directly, like this:
User Request: User ask about ticket info ABB-222 . transfer_to_agent(agent_name='agent_name')
Any solutions? Thanks.
I switched to a smarter model (from gemini-2.0-flash to gemini-2.5-pro), and it now works correctly more often than with the old model. But I think the library's transfer-to-sub-agent flow should work correctly without depending on the intelligence of the model.
Any updates?
After an agent transfer, only the last LlmAgent will take the next user query. See: https://github.com/google/adk-python/blob/bab3be2cf31dc9afd00bcce70103bdaa5460f1a3/src/google/adk/runners.py#L430
@hmzo do you still see this issue? Could you provide your agent code and a sample user query to reproduce it?
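The selection behaviour described in the comment above can be pictured roughly as follows. This is a library-free sketch, not ADK's actual code: `FakeEvent`, `pick_agent_for_next_turn`, and the flat `llm_agents` set are hypothetical stand-ins, and the real runner may apply additional checks.

```python
from dataclasses import dataclass


@dataclass
class FakeEvent:
    """Hypothetical stand-in for a session event; only the author matters here."""
    author: str


def pick_agent_for_next_turn(events, llm_agents, root_name):
    """Return the name of the agent that should answer the next user message.

    Walks the history backwards and picks the most recent author that is a
    known LLM agent. User turns and custom (non-LLM) agents are skipped.
    Falls back to the root agent when nothing matches.
    """
    for event in reversed(events):
        if event.author == "user":
            continue
        if event.author in llm_agents:
            return event.author
    return root_name


history = [
    FakeEvent("user"),
    FakeEvent("root_agent"),
    FakeEvent("step1_agent"),
    FakeEvent("custom_orchestrator"),  # custom agent: not eligible in this sketch
]
next_agent = pick_agent_for_next_turn(
    history, llm_agents={"root_agent", "step1_agent"}, root_name="root_agent"
)
print(next_agent)
```

Under these assumptions, a custom agent that spoke last does not keep control; the next query goes to the most recent LLM agent in the history.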
@seanzhou1023 I wrote a simple test script:
# google-adk 1.6.1
class Step1Agent(LlmAgent):
    def __init__(self):
        super().__init__(
            name="step1_agent",
            description="a agent that can process step1",
            model=GEMINI_2_5_FLASH,
            instruction="you are a helpful agent, you can only process step1"
        )

    async def _run_async_impl(self, parent_context: InvocationContext) -> AsyncGenerator[Event, None]:
        last_event = None
        async for event in super()._run_async_impl(parent_context):
            last_event = event
            if event.partial:
                yield event
        last_event.actions = EventActions(state_delta={"step1_result": "step1_result"}, transfer_to_agent="step2_agent")
        yield last_event


class Step2Agent(LlmAgent):
    def __init__(self):
        super().__init__(
            name="step2_agent",
            description="a agent that can process step2",
            model=GEMINI_2_5_FLASH,
            instruction="you are a helpful agent, you can only process step2"
        )

    async def _run_async_impl(self, parent_context: InvocationContext) -> AsyncGenerator[Event, None]:
        last_event = None
        async for event in super()._run_async_impl(parent_context):
            last_event = event
            if event.partial:
                yield event
        last_event.actions = EventActions(state_delta={"step2_result": "step2_result"}, transfer_to_agent="root_agent")
        yield last_event


step1_agent = Step1Agent()
step2_agent = Step2Agent()
root_agent = LlmAgent(
    name="root_agent",
    description="a root agent",
    sub_agents=[step1_agent, step2_agent],
    model=GEMINI_2_5_FLASH,
    instruction="you are a helpful agent, you can only process step1 and step2"
)
Input: I want to process step 1
Behaviour: step1_agent can't transfer to step2_agent automatically.
transfer_to_agent does not actually transfer when it is only set on an Event that your own agent produces. Instead, we need to find the agent instance using the following helper and call its run_async. You should still produce this event so that adk web displays it for the user, but the actual transfer happens through the following code.
def _get_agent_to_run(invocation_context: InvocationContext, agent_name: str) -> BaseAgent:
    root_agent = invocation_context.agent.root_agent
    agent_to_run = root_agent.find_agent(agent_name)
    if not agent_to_run:
        raise ValueError(f'Agent {agent_name} not found in the agent tree.')
    return agent_to_run
So instead of relying on transfer_to_agent, we should do something like this:
async for event in _get_agent_to_run(ctx, "agent_name").run_async(ctx):
    yield event
Again, this is what I learned after a deep dive into the ADK architecture. It works for our use case, but I'm not sure whether there is an alternative, more correct approach to achieve this functionality.
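For readers who want to see the delegation pattern above in isolation, here is a minimal runnable sketch that does not depend on ADK. `StubAgent`, `StubOrchestrator`, and `collect` are hypothetical stand-ins, and events are plain strings rather than ADK Event objects; only the shape of the control flow (look up the target in the tree, then iterate its `run_async`) mirrors the approach described above.

```python
import asyncio
from typing import AsyncGenerator, Dict, List, Optional


class StubAgent:
    """Hypothetical stand-in for an agent; yields plain strings as events."""

    def __init__(self, name: str, sub_agents: Optional[List["StubAgent"]] = None):
        self.name = name
        self.sub_agents = list(sub_agents or [])

    def find_agent(self, name: str) -> Optional["StubAgent"]:
        """Depth-first search for an agent by name in this subtree."""
        if self.name == name:
            return self
        for child in self.sub_agents:
            found = child.find_agent(name)
            if found is not None:
                return found
        return None

    async def run_async(self, ctx: Dict[str, str]) -> AsyncGenerator[str, None]:
        yield f"{self.name} handled: {ctx['task']}"


class StubOrchestrator(StubAgent):
    async def run_async(self, ctx: Dict[str, str]) -> AsyncGenerator[str, None]:
        # Announce the handoff, then run the target directly and forward
        # every event it produces.
        yield f"{self.name} delegating to planner"
        target = self.find_agent("planner")
        if target is None:
            raise ValueError("Agent planner not found in the agent tree.")
        async for event in target.run_async(ctx):
            yield event


async def collect(agent: StubAgent, ctx: Dict[str, str]) -> List[str]:
    return [event async for event in agent.run_async(ctx)]


root = StubOrchestrator("orchestrator", sub_agents=[StubAgent("planner")])
events = asyncio.run(collect(root, {"task": "write report"}))
print(events)
```

The orchestrator stays the caller for the whole turn, which is why this pattern covers a single invocation only and says nothing about who receives the next user message.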
@bipulkarnani However, I noticed that when escalate=True is set in the actions, the current agent can hand control over to the higher-level agent. From my perspective, escalate=True and transfer_to_agent do very similar things.
cc @seanzhou1023
@hmzo escalate=True will also not work if it is a custom agent. escalate=True and transfer_to_agent work for an LLMResponse, which has a separate parsing model. But if the agent is a custom agent, these values are ignored.
IMO this behaviour is also correct, because when you already know which agent you want to transfer to, why would you need to call the transfer_to_agent function?
I believe ADK should improve its documentation around such things.
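If a custom parent does want to honour transfer requests emitted by its children, one option is for the parent to read the request from each event and run the named agent itself. The following is only a sketch under stated assumptions: events are plain dicts, and `follow_transfers`, `FakeEvent`, and the `max_hops` guard are hypothetical names, not part of ADK.

```python
import asyncio
from typing import AsyncGenerator, Callable, Dict, List, Optional

FakeEvent = Dict[str, Optional[str]]
AgentFn = Callable[[], AsyncGenerator[FakeEvent, None]]


async def step1_agent() -> AsyncGenerator[FakeEvent, None]:
    yield {"author": "step1_agent", "transfer_to_agent": "step2_agent"}


async def step2_agent() -> AsyncGenerator[FakeEvent, None]:
    yield {"author": "step2_agent", "transfer_to_agent": None}


async def follow_transfers(
    agents: Dict[str, AgentFn], start: str, max_hops: int = 5
) -> AsyncGenerator[FakeEvent, None]:
    """Run `start`, then keep running whichever agent the last run requested."""
    current: Optional[str] = start
    hops = 0
    while current is not None:
        if current not in agents:
            raise ValueError(f"Agent {current} not found.")
        if hops >= max_hops:
            raise RuntimeError("Too many transfers; possible ping-pong.")
        requested = None
        async for event in agents[current]():
            yield event
            if event.get("transfer_to_agent"):
                requested = event["transfer_to_agent"]
        current = requested
        hops += 1


async def run_all() -> List[Optional[str]]:
    agents = {"step1_agent": step1_agent, "step2_agent": step2_agent}
    return [e["author"] async for e in follow_transfers(agents, "step1_agent")]


authors = asyncio.run(run_all())
print(authors)
```

The hop limit is there because two agents that keep requesting each other would otherwise loop forever.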
transfer_to_agent passes control of the current and subsequent user requests to the subagent. When doing
async for event in self.subagent.run_async(ctx):
    yield event
this only invokes the subagent once, and on every subsequent user request the root agent needs to pass control to the subagent again.
It makes sense that a custom agent may need to pass control to a subagent persistently instead of rerouting every request.
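One way a custom root agent could approximate a persistent handoff is to remember the active subagent in session state and consult it at the start of every turn. This is a sketch only: the plain dict stands in for session state, and `hand_off`, `release`, `route`, and the `active_agent` key are hypothetical names.

```python
from typing import Dict


def hand_off(state: Dict[str, str], agent_name: str) -> None:
    """Record which subagent should receive subsequent user requests."""
    state["active_agent"] = agent_name


def release(state: Dict[str, str]) -> None:
    """Give control back to the root agent."""
    state.pop("active_agent", None)


def route(state: Dict[str, str], default_agent: str = "root_agent") -> str:
    """Return the agent that should handle the current turn."""
    return state.get("active_agent", default_agent)


state: Dict[str, str] = {}
first = route(state)           # nothing recorded yet
hand_off(state, "planner")
second = route(state)          # sticky: the planner keeps control
release(state)
third = route(state)           # back to the root agent
print(first, second, third)
```

The root agent would call `route` at the top of its run method and then iterate the chosen subagent's `run_async`, so the rerouting happens on every request but without involving the model.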
I made a custom function to handle the handoff; hopefully it helps someone. It hands off to other agents correctly. I still need to work on the markdown report and display. I will follow the thread for updates:
# apps/agent/assembly.py
# -*- coding: utf-8 -*-
"""
ADK multi-agent assembly with robust handoff and output-curation shims.
Key points:
- Custom handoff tool (no name clash with ADK internals)
- Router stub to avoid parameterless-tool schema crashes
- Display wrappers live in tools_es (fmt tools)
- "done" tool + finalizer converts tool output into the final user reply
and stops further model text (prevents "extra narration after done").
"""
import os
from typing import Any, Dict, Optional, Tuple
from google.adk.agents import Agent
#from google.adk.models.lite_llm import LiteLlm
from google.adk.tools.function_tool import FunctionTool
# Import only tool FUNCTIONS to avoid circular imports
from .tools_threat import plan_and_hunt, get_hunt_rows
from .tools_es import (
    list_indices, get_mappings, get_shards, search,
    get_data_rows, suggest_indices_from_query, esql, esql_or_message,
    list_indices_fmt, list_shards_fmt, get_mappings_fmt, search_fmt, esql_fmt,  # <-- added esql_fmt
)
# --- Runtime env / LiteLLM knobs -------------------------------------------------
#os.environ.setdefault("OPENAI_API_BASE", "http://172.16.10.69:3051/v1")
#os.environ.setdefault("OPENAI_API_KEY", "dummy")
#os.environ.setdefault("LITELLM_DISABLE_COST_CALCULATOR", "1")
if not os.getenv("GOOGLE_API_KEY") and os.getenv("GEMINI_API_KEY"):
    os.environ["GOOGLE_API_KEY"] = os.environ["GEMINI_API_KEY"]
#ROUTER_MODEL = os.getenv("ROUTER_MODEL", "openai/llama-3.3-70b-instruct")
#THREAT_MODEL = os.getenv("THREAT_MODEL", "openai/llama-3.3-70b-instruct")
#ESDATA_MODEL = os.getenv("ESDATA_MODEL", "openai/llama-3.3-70b-instruct")
#ROUTER_MODEL = os.getenv("ROUTER_MODEL", "gemini-2.5-pro")
#THREAT_MODEL = os.getenv("THREAT_MODEL", "gemini-2.5-pro")
#ESDATA_MODEL = os.getenv("ESDATA_MODEL", "gemini-2.5-pro")
ROUTER_MODEL = os.getenv("ROUTER_MODEL", "gemini-1.5-flash")
THREAT_MODEL = os.getenv("THREAT_MODEL", "gemini-1.5-flash")
ESDATA_MODEL = os.getenv("ESDATA_MODEL", "gemini-1.5-flash")
# --- System prompts -------------------------------------------------------------
ROUTER_INSTRUCTION = """
Route each user message to exactly one sub-agent. Do NOT answer yourself.
Send to ThreatAnalystAgent if the user is hunting / investigating incidents/IOCs/attacks/exfil/beacons/lateral movement,
or asks things like “failed logins”, “who downloaded the most data”, “top sources by egress/ingress”, “beaconing hosts”, etc.
Send to ESDataAgent if the user wants Elasticsearch metadata (indices/mappings, shards, health, cat APIs) OR if the user supplies explicit ES|QL
(message starts with FROM/SHOW/DESCRIBE/EXPLAIN or contains a pipe `|`).
Send to ThreatAnalystAgent if the user asks for downloads/ingress, egress/exfiltration, failed logins/brute force, beaconing, or lateral movement, even if the message uses natural language and not ES|QL.
Examples:
- “computers with most downloads last 2 hours” → ThreatAnalystAgent
- “list indices matching logs-*” → ESDataAgent
- “FROM logs-* | STATS …” → ESDataAgent
If uncertain, prefer ThreatAnalystAgent.
To hand off, always call the tool `handoff_to_agent` with the `agent` argument set to the exact sub-agent name.
NEVER call `transfer_to_router` yourself.
"""
THREAT_INSTRUCTION = """
You are ThreatAnalystAgent. Perform threat hunting end-to-end.
HARD ROUTING RULE (read carefully):
• If the message mentions ANY of these hunt intents — downloads/ingress/egress/exfiltration,
beaconing, lateral movement, brute force/failed logins, “top/most downloads” — DO NOT
hand off to ESDataAgent. Stay here and use plan_and_hunt.
• Only hand off to ESDataAgent for Elasticsearch metadata (indices/mappings/shards/cat APIs)
or explicit ES|QL requests (message starts with FROM/SHOW/DESCRIBE/EXPLAIN or contains '|').
Use tools:
- plan_and_hunt to execute hunts and summarize
- get_hunt_rows to page cached results
- handoff_to_agent to hand off when appropriate.
Output rules:
• If a tool returns report_markdown, your ENTIRE reply must be exactly that markdown.
• Otherwise, write a concise SOC-style summary with tables.
• When paging with get_hunt_rows, render a markdown table (≤10 rows) and include the cache_id + paging hint.
Never call tools you don't have. Never call ES|QL tools yourself.
After calling any *_fmt tool, immediately call **done** with the exact `report_markdown` you received as a PLAIN STRING. Do not wrap markdown inside JSON. Do not add extra text.
"""
DATA_INSTRUCTION = """
You are ESDataAgent. You handle Elasticsearch metadata/structure and ES|QL execution.
HARD ROUTING RULE (applies BEFORE any data tool):
• If the message is a threat-hunting ask (downloads/ingress/egress/exfiltration, failed logins/brute force,
beaconing, lateral movement, C2, “top/most downloads”), your FIRST and ONLY action must be to call
**handoff_to_agent** with {"agent":"ThreatAnalystAgent"}.
Examples:
- “computers with most downloads last 2 hours” → call handoff_to_agent({"agent":"ThreatAnalystAgent"})
- “list 10 indices” → call list_indices_fmt(limit=10) → then **done(markdown=<report_markdown>)**
- “list 8 shards for logs-*” → call list_shards_fmt("logs-*", limit=8) → then **done(...)**
- “show 7 fields/mappings for logs-*” → call get_mappings_fmt("logs-*", limit=7) → then **done(...)**
- “search processes error show 6*” → call search_fmt(index_pattern, "process.name:error*", limit=6) → then **done(...)**
- “FROM logs-* | ...” (explicit ES|QL) → call **esql_fmt(query, limit=100)** → then **done(...)**
Metadata tools: list_indices_fmt, list_shards_fmt, get_mappings_fmt, search_fmt, esql_fmt
Raw tools (use for follow-up paging or programmatic flows only): list_indices, get_mappings, get_shards, search, esql_or_message, esql, get_data_rows, suggest_indices_from_query
ES|QL:
- If the user gives natural language, you may call esql_or_message then run **esql_fmt** (EXCEPT for threat-hunting asks).
- If the user gives explicit ES|QL (FROM/SHOW/DESCRIBE/EXPLAIN or contains '|'), run **esql_fmt** directly.
Output rules:
• Markdown only, never HTML.
• If a tool returns report_markdown, your ENTIRE reply must be exactly that markdown.
• If the user specifies N (e.g., “list 10 indices”), render exactly N rows (or all if fewer, max 50). Otherwise, default to 10 rows for list_indices_fmt and 5 rows for get_mappings_fmt, list_shards_fmt, and search_fmt.
• Then print exactly:
cache_id: <value>
Use: get_data_rows(cache_id, 1, 10) to page more.
After calling any *_fmt tool, immediately call **done** with the exact `report_markdown` as a PLAIN STRING (do not embed JSON). Do not add extra text.
NEVER call `transfer_to_router`.
"""
# --- Handoff tools --------------------------------------------------------------
ALLOWED_AGENTS = {"ThreatAnalystAgent", "ESDataAgent"}
def handoff_to_agent(agent: str, tool_context=None) -> dict:
    if tool_context is None:
        return {"ok": False, "error": "no_tool_context"}
    wanted = (agent or "").strip()
    agent_norm = next((a for a in ALLOWED_AGENTS if a.lower() == wanted.lower()), wanted)
    if agent_norm not in ALLOWED_AGENTS:
        return {"ok": False, "error": f"unknown agent '{agent}'", "known": sorted(ALLOWED_AGENTS)}
    inv = getattr(tool_context, "invocation_context", None)
    cur_agent = getattr(inv, "agent", None)
    current = getattr(cur_agent, "name", getattr(tool_context, "agent_name", "RouterAgent"))
    state = getattr(tool_context, "state", None)
    last_from = getattr(state, "last_transfer_from", None) if state else None
    last_to = getattr(state, "last_transfer_to", None) if state else None
    if last_from == agent_norm and last_to == current:
        return {"ok": False, "error": "bounce_detected"}
    default_budget = int(os.getenv("HOP_BUDGET", "12"))
    budget = getattr(state, "hops_left", default_budget) if state else default_budget
    if budget <= 0:
        return {"ok": False, "error": "hop_budget_exhausted"}
    if state is not None:
        setattr(state, "last_transfer_from", current)
        setattr(state, "last_transfer_to", agent_norm)
        setattr(state, "hops_left", budget - 1)
        try:
            delta = getattr(tool_context.actions, "state_delta", {}) or {}
            delta.update({
                "last_transfer_from": current,
                "last_transfer_to": agent_norm,
                "hops_left": budget - 1,
            })
            tool_context.actions.state_delta = delta
        except Exception:
            pass
    tool_context.actions.transfer_to_agent = agent_norm
    print(f"[handoff_to_agent] {current} -> {agent_norm} (hops_left={budget-1})")
    return {"ok": True, "from": current, "transferred_to": agent_norm, "hops_left": budget - 1}


def done(markdown: str = "", report_markdown: str = "", tool_context=None, **kwargs) -> dict:
    md = report_markdown or markdown or ""
    # If model shoved JSON into markdown, unwrap it
    if md.strip().startswith("{"):
        try:
            import json
            j = json.loads(md)
            md = j.get("report_markdown") or j.get("markdown") or ""
        except Exception:
            pass
    if not md and kwargs:
        md = kwargs.get("report_markdown") or kwargs.get("markdown") or ""
    md = md or ""
    _finalize_reply(tool_context, md)
    return {"ok": True, "report_markdown": md}


def transfer_to_router(dummy: Optional[bool] = False, tool_context=None) -> dict:
    if tool_context is None:
        return {"ok": False, "error": "no_tool_context"}
    inv = getattr(tool_context, "invocation_context", None)
    cur_agent = getattr(inv, "agent", None)
    current = getattr(cur_agent, "name", getattr(tool_context, "agent_name", "RouterAgent"))
    if current != "RouterAgent":
        tool_context.actions.transfer_to_agent = "RouterAgent"
        print(f"[transfer_to_router] {current} -> RouterAgent")
        return {"ok": True, "from": current, "transferred_to": "RouterAgent"}
    print("[transfer_to_router] already at RouterAgent; no-op")
    return {"ok": True, "note": "already at RouterAgent"}

# --- Callback signature normalizer(s) ------------------------------------------
def _normalize_after_tool(*args, **kwargs) -> Tuple[Any, Optional[str], Any, Any]:
    tool = kwargs.get("tool")
    tool_context = kwargs.get("tool_context")
    tool_response = kwargs.get("tool_response")
    tool_name = kwargs.get("tool_name")
    # Old signature: (ctx, tool_name, tool_result)
    if len(args) == 3 and tool_context is None and tool_response is None:
        tool_context = args[0]
        if tool_name is None:
            tool_name = args[1]
        tool_response = args[2]
    # New signature: (tool, args, tool_context, tool_response)
    if len(args) >= 1 and tool is None and hasattr(args[0], "name"):
        tool = args[0]
    if len(args) >= 3 and tool_context is None and hasattr(args[2], "__dict__"):
        tool_context = args[2]
    if len(args) >= 4 and tool_response is None:
        tool_response = args[3]
    if tool_name is None and tool is not None:
        tool_name = getattr(tool, "name", None)
    return tool, tool_name, tool_context, tool_response


def _router_after_tool_callback(*args, **kwargs) -> Optional[Dict[str, Any]]:
    _tool, tool_name, _tool_context, tool_response = _normalize_after_tool(*args, **kwargs)
    if (tool_name or "") in {"handoff_to_agent", "transfer_to_agent"}:
        print("[router] transfer result:", tool_response)
    return None


def _finalize_reply(tool_context, markdown: str):
    if not tool_context or not markdown:
        return
    a = getattr(tool_context, "actions", None)
    if not a:
        return
    # Put the final text in multiple likely-consumed fields
    for attr in ("assistant_response", "final_response", "response_text",
                 "reply_markdown", "output_markdown"):
        try:
            setattr(a, attr, markdown)
        except Exception:
            pass
    # Hard stop knobs different runtimes look for
    for flag in ("stop_generation", "end_turn", "should_end", "return_immediately",
                 "suppress_model_output", "tool_result_is_final"):
        try:
            setattr(a, flag, True)
        except Exception:
            pass


def _specialist_after_tool(*args, **kwargs) -> Optional[Dict[str, Any]]:
    _tool, tool_name, tool_context, tool_response = _normalize_after_tool(*args, **kwargs)
    if tool_context is None:
        return None
    # Auto-finalize on any tool that returns report_markdown
    try:
        md = None
        if isinstance(tool_response, dict):
            md = tool_response.get("report_markdown") or tool_response.get("markdown")
        if md and (tool_name or "").lower() in {
            "list_indices_fmt", "list_shards_fmt", "get_mappings_fmt", "search_fmt",
            "esql_fmt",
            "plan_and_hunt",  # Threat report
            "done"
        }:
            _finalize_reply(tool_context, md)
    except Exception:
        pass
    # Clear bounce markers after any non-transfer tool
    if (tool_name or "") not in {"handoff_to_agent", "transfer_to_agent"}:
        state = getattr(tool_context, "state", None)
        if state is not None:
            for attr in ("last_transfer_from", "last_transfer_to"):
                if hasattr(state, attr):
                    try:
                        delattr(state, attr)
                    except Exception:
                        setattr(state, attr, None)
            try:
                delta = getattr(tool_context.actions, "state_delta", {}) or {}
                delta.update({"last_transfer_from": None, "last_transfer_to": None})
                tool_context.actions.state_delta = delta
            except Exception:
                pass
    return None

# --- Build the agent tree -------------------------------------------------------
def _fix_tree(root: Agent) -> None:
    def dfs(a: Agent):
        try:
            a.root_agent = root
        except Exception:
            pass
        subs = getattr(a, "sub_agents", None) or []
        for s in subs:
            try:
                s.parent_agent = a
            except Exception:
                pass
            dfs(s)
    dfs(root)


def build_root_agent() -> Agent:
    router = Agent(
        name="RouterAgent",
        model=ROUTER_MODEL,
        instruction=ROUTER_INSTRUCTION,
        sub_agents=[],
        tools=[],  # populated below
    )
    # Shared tools
    handoff_tool = FunctionTool(handoff_to_agent)
    to_router_tool = FunctionTool(transfer_to_router)
    done_tool = FunctionTool(done)
    threat = Agent(
        name="ThreatAnalystAgent",
        model=THREAT_MODEL,
        instruction=THREAT_INSTRUCTION,
        tools=[
            FunctionTool(plan_and_hunt),
            FunctionTool(get_hunt_rows),
            handoff_tool,
            to_router_tool,
            done_tool,
        ],
    )
    data = Agent(
        name="ESDataAgent",
        model=ESDATA_MODEL,
        instruction=DATA_INSTRUCTION,
        tools=[
            handoff_tool,
            to_router_tool,  # present but we instruct NOT to call it
            # Display wrappers
            FunctionTool(list_indices_fmt),
            FunctionTool(list_shards_fmt),
            FunctionTool(get_mappings_fmt),
            FunctionTool(search_fmt),
            FunctionTool(esql_fmt),  # <-- Added table-building ES|QL wrapper
            # Raw tools (kept for paging/analysis, not for direct user-facing output)
            FunctionTool(esql_or_message),
            FunctionTool(esql),
            FunctionTool(list_indices),
            FunctionTool(get_mappings),
            FunctionTool(get_shards),
            FunctionTool(search),
            FunctionTool(get_data_rows),
            FunctionTool(suggest_indices_from_query),
            done_tool,
        ],
    )
    # Wire up
    router.sub_agents = [threat, data]
    router.tools = [handoff_tool, to_router_tool]  # router routes by calling our handoff tool
    _fix_tree(router)
    # Callbacks
    threat.after_tool_callback = [_specialist_after_tool]
    data.after_tool_callback = [_specialist_after_tool]
    router.after_tool_callback = [_router_after_tool_callback]
    return router
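The two guards inside handoff_to_agent above (bounce detection and the hop budget) are easier to reason about in isolation. The sketch below reproduces just that logic against a plain dict; `try_handoff` and the dict-based state are hypothetical simplifications, and the real tool reads these values from the tool context instead.

```python
from typing import Dict, Set

ALLOWED = {"ThreatAnalystAgent", "ESDataAgent"}


def try_handoff(state: Dict, current: str, target: str,
                allowed: Set[str] = ALLOWED, budget: int = 12) -> Dict:
    """Decide whether `current` may hand off to `target`, updating `state`."""
    if target not in allowed:
        return {"ok": False, "error": "unknown_agent"}
    # Bounce: the target just handed off to us, so sending it straight back
    # would start a ping-pong loop.
    if state.get("last_transfer_from") == target and state.get("last_transfer_to") == current:
        return {"ok": False, "error": "bounce_detected"}
    hops_left = state.get("hops_left", budget)
    if hops_left <= 0:
        return {"ok": False, "error": "hop_budget_exhausted"}
    state.update(
        last_transfer_from=current,
        last_transfer_to=target,
        hops_left=hops_left - 1,
    )
    return {"ok": True, "hops_left": hops_left - 1}


state: Dict = {}
r1 = try_handoff(state, "RouterAgent", "ThreatAnalystAgent")
r2 = try_handoff(state, "ThreatAnalystAgent", "ESDataAgent")
r3 = try_handoff(state, "ESDataAgent", "ThreatAnalystAgent")  # immediate bounce
print(r1, r2, r3)
```

Clearing the two `last_transfer_*` markers after any non-transfer tool call, as the after-tool callback above does, is what allows a legitimate later handoff back to the same agent.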