[Bug]: ADK Runner async doesn't show Traces but LLM calls are appearing.
What component(s) are affected?
- [x] Opik Python SDK
- [ ] Opik Typescript SDK
- [ ] Opik Agent Optimizer SDK
- [ ] Opik UI
- [ ] Opik Server
- [ ] Documentation
Opik version
- Opik version: 1.7.32
Describe the problem
When using the Agent Development Kit (ADK), Opik doesn't show Traces (in the Comet Dashboard Traces tab) but does show all LLM calls.
After debugging for more than an hour, I figured out that when I switch to runner.run instead of runner.run_async, the Traces appear correctly. [referring to from google.adk.runners import Runner]
Seems like Opik is not working for async, and Google doesn't recommend using runner.run (sync) in production. Please help.
Reproduction steps and code snippets
Try both runner.run and runner.run_async: Traces show up for runner.run but not for runner.run_async.
Here is the code I'm using:
async for event in runner.run_async(
    user_id=payload["user_id"], session_id=payload["session_id"], new_message=message
):
    # Check if the event is a response from the agent
    if event.is_final_response():
        if event.content and event.content.parts:
            # Assuming text response in the first part
            final_response_text = event.content.parts[0].text
        elif event.actions and event.actions.escalate:
            final_response_text = f"Agent escalated: {event.error_message or 'No specific message.'}"
        break
return final_response_text
Error logs or stack trace
No response
Healthcheck results
No response
ADK stream events aren't showing up in Opik Traces? Please help
Hi @gdabas!
As an experiment, could you use logic like that in the following function to get the final response from your agent?
from typing import AsyncIterator, Optional

from google.adk import events as adk_events


async def _async_extract_final_response_text(events: AsyncIterator[adk_events.Event]) -> Optional[str]:
    """
    Exhausts the async iterator of ADK events and returns the response text
    from the last event (presumably the final root agent response).
    """
    collected_events = []
    async for event in events:
        collected_events.append(event)
    if len(collected_events) == 0:
        # As the error might occur in the background, we raise an exception here
        raise Exception("Agent failed to execute.")
    last_event: adk_events.Event = collected_events[-1]
    # Don't use only event.is_final_response() because it may be true for nested agents as well!
    assert (
        last_event.is_final_response()
        and last_event.content
        and last_event.content.parts
    )
    return last_event.content.parts[0].text
IDK what exactly the architecture of your agent is, but I encountered something similar with events streaming as well.
event.is_final_response() may return True for a subagent response, which causes an early loop break that interrupts the agent flow. As a result, the OpikTracer callback for the root agent is never called and the trace is not submitted.
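To make the early-break effect concrete, here is a minimal sketch that uses only asyncio and no ADK or Opik code. `FakeEvent` and `fake_run_async` are hypothetical stand-ins invented for illustration, and the "trace submitted" log entry only plays the role of the root agent's callback; the real runner and tracer are more involved.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class FakeEvent:
    """Hypothetical stand-in for an ADK event (not the real class)."""
    author: str
    final: bool
    text: str

    def is_final_response(self) -> bool:
        return self.final


async def fake_run_async(log: list):
    """Hypothetical stand-in for runner.run_async()."""
    log.append("trace started")
    # A sub-agent can also emit an event flagged as a final response.
    yield FakeEvent("sub_agent", True, "sub-agent answer")
    yield FakeEvent("root_agent", True, "root answer")
    # Stand-in for the root agent's after-agent callback, which is
    # where the trace would be finalized and submitted.
    log.append("trace submitted")


async def break_early():
    log = []
    text = None
    agen = fake_run_async(log)
    async for event in agen:
        if event.is_final_response():
            text = event.text
            break  # stops at the sub-agent's "final" event
    # Closing explicitly only makes cleanup deterministic; the code
    # after the last yield is skipped either way.
    await agen.aclose()
    return text, log


async def exhaust():
    log = []
    text = None
    async for event in fake_run_async(log):
        if event.is_final_response():
            text = event.text  # keep the latest one, do not break
    return text, log


early_text, early_log = asyncio.run(break_early())
full_text, full_log = asyncio.run(exhaust())
print(early_text, early_log)
print(full_text, full_log)
```

In this sketch the early-break variant returns the sub-agent's text and never reaches the code after the last `yield`, while the exhausting variant returns the root answer and runs the post-yield step.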
P.S. I've just tried running the new tests for run_async and everything seems to be working (relevant for the latest opik and adk), but we're using the function I shared above. FYI, here is the PR with the new tests I've just opened: https://github.com/comet-ml/opik/pull/2479 (you can take a look at the test_adk_async.py module). If the issue is still there, please share a ready-to-run reproducing script and we will be able to take a look next week.
@alexkuzmik This is the same issue I'm facing. Could you explain it in more detail? I raised this issue two weeks ago.
Issue: https://github.com/comet-ml/opik/issues/2386
Hi @AdityaTheDev! Could you please try the approach I described in my previous comment?
If you are using adk web to run these, you can't add functions into the runner.
agent.py.txt has an agent that can reproduce it often. It's not consistently reproducible. fetch_msr_agent_v1 is missing an after_agent_callback in the case that fails.
This is possibly because the tool call sets invocation_context.end_invocation to true so that the tool call results are returned verbatim to the next agent.
I also added logging statements to the OpikTracer class, and here's a log (opik.log) from it. Notice that the numbers get one higher in the case that fails, so there's still span data on the final after_agent_callback and the trace is never written in this code:
if (span_data := self._context_storage.top_span_data()) is not None:
    if span_data.id in self._opik_created_spans:
        span_data.update(output=output)
        self._end_current_span()
        self._opik_created_spans.discard(span_data.id)
else:
    trace_data = self._context_storage.get_trace_data()
    assert trace_data is not None
    if trace_data.id == self._current_trace_created_by_opik_tracer.get():
        trace_data.update(output=output)
        self._end_current_trace()
        self._current_trace_created_by_opik_tracer.set(None)
        self._last_model_output = None
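The failure mode described above can be illustrated with a toy model. This is only a sketch under the assumption that the tracer keeps a stack of open spans and ends the trace only when that stack is empty; `ToyTracer` and `run` are hypothetical names and this is not Opik's implementation.

```python
class ToyTracer:
    """Toy stand-in for a callback-driven tracer (hypothetical)."""

    def __init__(self) -> None:
        self.span_stack = []          # names of spans that are still open
        self.trace_open = False
        self.trace_submitted = False

    def before_agent(self, name: str) -> None:
        if not self.trace_open:
            self.trace_open = True    # the first agent opens the trace
        else:
            self.span_stack.append(name)  # nested agents open spans

    def after_agent(self) -> None:
        if self.span_stack:
            # A span is still open, so this call closes the span only.
            self.span_stack.pop()
        else:
            # No open spans left, so this call ends and submits the trace.
            self.trace_open = False
            self.trace_submitted = True


def run(skip_sub_agent_after_callback: bool) -> ToyTracer:
    tracer = ToyTracer()
    tracer.before_agent("root_agent")
    tracer.before_agent("sub_agent")
    if not skip_sub_agent_after_callback:
        tracer.after_agent()  # sub-agent's after-agent callback
    tracer.after_agent()      # root agent's after-agent callback
    return tracer


healthy = run(skip_sub_agent_after_callback=False)
broken = run(skip_sub_agent_after_callback=True)
print(healthy.trace_submitted, broken.trace_submitted)
```

In the toy model, when the sub-agent's after-agent callback is skipped, the root agent's callback finds a leftover span, closes it, and never reaches the branch that submits the trace.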
https://github.com/rcleveng/adk-python/commit/39d9e8e8ebf63e5f94382327425b4d7026aeeeb6 seems to make opik work reliably
I asked on the adk discussion group if the current adk behavior is expected or not.
Hi @gdabas, @AdityaTheDev, @rcleveng, it seems that relying on event.is_final_response() to stop processing events leads to confusing behavior. We opened a ticket upstream to discuss the best way forward: https://github.com/google/adk-python/issues/1695
I think it's a bit more than that, since I can reproduce this issue randomly using adk web which doesn't have that logic in it.
The problem is, with ADK Runner everything is an event (tool call, tool response, final message), and we need to keep collecting all the events manually until we reach the last event (final response) from the Runner.
Here is the workaround I am using to make it work for me, thanks @alexkuzmik for the help.
final_response_text = ""
try:
    events = runner.run_async(
        user_id=payload["user_id"],
        session_id=payload["session_id"],
        new_message=message,
    )
    # We 'await' the helper function. This passes the async iterator to it.
    # The helper function will consume the iterator to get the final result.
    final_response_text = await _async_extract_final_response_text(events)
except Exception as e:
    logging.error(f"Agent execution failed: {e}")
    final_response_text = f"Agent execution failed: {e}"
# Helper function to extract the final response text from ADK events
# [Temp fix for Bug: https://github.com/comet-ml/opik/issues/2467#issuecomment-2970768612]
import logging
from typing import AsyncIterator, Optional

from google.adk import events as adk_events


async def _async_extract_final_response_text(events: AsyncIterator[adk_events.Event]) -> Optional[str]:
    """
    Exhausts the async iterator of ADK events and returns the response text
    from the last event (presumably the final root agent response).
    """
    collected_events = []
    # This `async for` loop iterates through the `events` iterator.
    async for event in events:
        collected_events.append(event)
    if not collected_events:
        raise Exception("Agent failed to execute: No events received.")
    last_event: adk_events.Event = collected_events[-1]
    # This code only runs AFTER the `async for` loop above has completed and all events have been collected.
    # The last event should be the final response from the root agent.
    if last_event.is_final_response():
        if last_event.content and last_event.content.parts:
            return last_event.content.parts[0].text
        elif last_event.actions and last_event.actions.escalate:
            return f"Agent escalated: {last_event.error_message or 'No specific message.'}"
    logging.error(f"Last event was not a final response. Event: {last_event}")
    raise Exception("Agent did not produce a final response.")
Thanks @gdabas @alexkuzmik, I was about to raise the same issue today. Looks like everyone is working in the same direction 🚀 🚀