[Bug]: ADK Runner async doesn't show Traces but LLM calls are appearing.

Open gdabas opened this issue 10 months ago • 11 comments

What component(s) are affected?

  • [x] Opik Python SDK
  • [ ] Opik Typescript SDK
  • [ ] Opik Agent Optimizer SDK
  • [ ] Opik UI
  • [ ] Opik Server
  • [ ] Documentation

Opik version

  • Opik version: 1.7.32

Describe the problem

When using the Agent Development Kit (ADK), Opik doesn't show Traces (in the Comet Dashboard Traces tab) but does show all LLM calls.

After debugging for more than an hour, I figured out that when I switch to runner.run instead of runner.run_async, the Traces appear correctly. [Runner here refers to from google.adk.runners import Runner.]

Seems like Opik is not working for async, and Google doesn't recommend using runner.run (sync) in Production. Please help.

Reproduction steps and code snippets

Try both runner.run and runner.run_async: Traces show up for runner.run but not for runner.run_async.

Here is the code I'm using:

    async for event in runner.run_async(
        user_id=payload["user_id"], session_id=payload["session_id"], new_message=message
    ):
        # Check if the event is a response from the agent
        if event.is_final_response():
            if event.content and event.content.parts:
                # Assuming text response in the first part
                final_response_text = event.content.parts[0].text
            elif event.actions and event.actions.escalate:
                final_response_text = f"Agent escalated: {event.error_message or 'No specific message.'}"
            break

    return final_response_text

Error logs or stack trace

No response

Healthcheck results

No response

gdabas avatar Jun 12 '25 23:06 gdabas

ADK stream events aren't showing up in Opik Traces? Please help

gdabas avatar Jun 12 '25 23:06 gdabas

Hi @gdabas!

As an experiment, could you use logic like the following function to get the final response from your agent?

from typing import AsyncIterator, Optional

from google.adk import events as adk_events

async def _async_extract_final_response_text(events: AsyncIterator[adk_events.Event]) -> Optional[str]:
    """
    Exhausts the async iterator of ADK events and returns the response text
    from the last event (presumably the final root agent response).
    """
    collected_events = []
    async for event in events:
        collected_events.append(event)
        
    if len(collected_events) == 0:
        # As the error might occur in the background, we raise an exception here
        raise Exception("Agent failed to execute.")

    last_event: adk_events.Event = collected_events[-1]
    # Don't use only event.is_final_response() because it may be true for nested agents as well!
    assert (
        last_event.is_final_response()
        and last_event.content
        and last_event.content.parts
    )
    return last_event.content.parts[0].text

I don't know the exact architecture of your agent, but I ran into something similar with event streaming. event.is_final_response() may return True for a subagent's response, which causes an early loop break and interrupts the agent flow. As a result, the OpikTracer callback for the root agent is never called and the trace is not submitted.
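To illustrate the failure mode with an early loop break, here is a minimal, self-contained sketch that uses a plain Python async generator instead of ADK. FakeEvent and fake_run_async are hypothetical stand-ins (not ADK or Opik APIs), and the line after the last yield plays the role of the root agent's callback. It is a toy model under these assumptions, not a description of how ADK schedules callbacks internally.

```python
import asyncio
from dataclasses import dataclass
from typing import AsyncIterator, List, Optional


@dataclass
class FakeEvent:
    """Hypothetical stand-in for an ADK event (illustration only)."""
    author: str
    text: str
    final: bool

    def is_final_response(self) -> bool:
        return self.final


async def fake_run_async(log: List[str]) -> AsyncIterator[FakeEvent]:
    """Hypothetical stand-in for runner.run_async (illustration only)."""
    # A nested agent can emit an event that also counts as "final".
    yield FakeEvent("sub_agent", "partial answer", True)
    yield FakeEvent("root_agent", "full answer", True)
    # Assumption: this models the root agent's callback, which only runs
    # when the consumer keeps iterating past the last event.
    log.append("root callback ran")


async def consume_with_early_break(log: List[str]) -> Optional[str]:
    # Stops at the first event that reports is_final_response().
    async for event in fake_run_async(log):
        if event.is_final_response():
            return event.text
    return None


async def consume_fully(log: List[str]) -> Optional[str]:
    # Exhausts the iterator and keeps only the last event.
    last: Optional[FakeEvent] = None
    async for event in fake_run_async(log):
        last = event
    return last.text if last else None


if __name__ == "__main__":
    early_log: List[str] = []
    full_log: List[str] = []
    print(asyncio.run(consume_with_early_break(early_log)), early_log)
    print(asyncio.run(consume_fully(full_log)), full_log)
```

In this sketch the early-break consumer returns the nested agent's text and the simulated callback never runs, while the exhausting consumer returns the root agent's text and the callback does run.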

P.S. I've just run the new tests for run_async and everything seems to be working (with the latest opik and adk), but they use the function I shared above. FYI, here is the PR with the new tests I've just opened: https://github.com/comet-ml/opik/pull/2479 (take a look at the test_adk_async.py module). If the issue persists, please share a ready-to-run reproducing script and we will be able to take a look next week.

alexkuzmik avatar Jun 13 '25 15:06 alexkuzmik

@alexkuzmik I'm facing the same issue. Could you explain it in more detail? I raised this issue two weeks ago.

Issue: https://github.com/comet-ml/opik/issues/2386

AdityaTheDev avatar Jun 17 '25 09:06 AdityaTheDev

Hi @AdityaTheDev! Could you please try the approach I described in my previous comment?

alexkuzmik avatar Jun 17 '25 14:06 alexkuzmik

If you are using adk web to run these, you can't add functions into the runner.

rcleveng avatar Jun 27 '25 00:06 rcleveng

agent.py.txt has an agent that can reproduce it often. It's not consistently reproducible. fetch_msr_agent_v1 is missing an after_agent_callback in the case that fails.

This is possibly because the tool call sets invocation_context.end_invocation to true so that the tool call results are returned verbatim to the next agent.

I also added logging statements to the OpikTracer class; here's a log (opik.log) from it. Notice that the numbers end up one higher in the case that fails, so there is still span data at the final after_agent_callback and the trace is never written in this code:

            if (span_data := self._context_storage.top_span_data()) is not None:
                if span_data.id in self._opik_created_spans:
                    span_data.update(output=output)
                    self._end_current_span()
                    self._opik_created_spans.discard(span_data.id)
            else:
                trace_data = self._context_storage.get_trace_data()
                assert trace_data is not None

                if trace_data.id == self._current_trace_created_by_opik_tracer.get():
                    trace_data.update(output=output)
                    self._end_current_trace()
                    self._current_trace_created_by_opik_tracer.set(None)
                    self._last_model_output = None
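
The bookkeeping problem can be sketched with a toy model. ToyTracer below is hypothetical and much simpler than the real OpikTracer: it only mimics the branch structure of the snippet above (close the top span if one exists, otherwise close the trace), assuming the root agent opens the trace and nested agents open spans.

```python
from typing import List


class ToyTracer:
    """Hypothetical, simplified model of span/trace bookkeeping (not Opik's code)."""

    def __init__(self) -> None:
        self.span_stack: List[str] = []
        self.trace_open = False
        self.trace_written = False

    def before_agent(self, name: str) -> None:
        if not self.trace_open:
            # Assumption: the first (root) agent opens the trace.
            self.trace_open = True
        else:
            # Nested agents open a span inside the trace.
            self.span_stack.append(name)

    def after_agent(self) -> None:
        if self.span_stack:
            # A span is still open: close it and leave the trace alone.
            self.span_stack.pop()
        else:
            # No spans left: this call closes and writes the trace.
            self.trace_open = False
            self.trace_written = True


def balanced_run() -> ToyTracer:
    tracer = ToyTracer()
    tracer.before_agent("root")
    tracer.before_agent("sub")
    tracer.after_agent()  # sub
    tracer.after_agent()  # root: writes the trace
    return tracer


def run_with_missing_callback() -> ToyTracer:
    tracer = ToyTracer()
    tracer.before_agent("root")
    tracer.before_agent("sub")
    # The sub-agent's after_agent callback is skipped here.
    tracer.after_agent()  # root: only pops the leftover sub span
    return tracer
```

In this model, a single skipped after_agent call is enough to leave the trace unwritten: the root agent's final callback finds a leftover span, takes the first branch, and never reaches the branch that ends the trace.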

rcleveng avatar Jun 27 '25 02:06 rcleveng

https://github.com/rcleveng/adk-python/commit/39d9e8e8ebf63e5f94382327425b4d7026aeeeb6 seems to make opik work reliably

I asked on the adk discussion group if the current adk behavior is expected or not.

rcleveng avatar Jun 27 '25 05:06 rcleveng

Hi @gdabas, @AdityaTheDev, @rcleveng, it seems that relying on event.is_final_response() to stop processing events leads to confusing behavior. We opened a ticket upstream to discuss the best way forward: https://github.com/google/adk-python/issues/1695

Lothiraldan avatar Jun 27 '25 14:06 Lothiraldan

I think it's a bit more than that, since I can reproduce this issue randomly using adk web which doesn't have that logic in it.

rcleveng avatar Jun 27 '25 15:06 rcleveng

The problem is, with ADK Runner everything is an event (tool call, tool response, final message), and we need to keep collecting all the events manually until we reach the last event (final response) from the Runner.

Here is the workaround I am using to make it work for me, thanks @alexkuzmik for the help.

import logging
from typing import AsyncIterator, Optional

from google.adk import events as adk_events

# The block below runs inside an async function; the helper is defined at module level.
final_response_text = ""
try:
    events = runner.run_async(
        user_id=payload["user_id"],
        session_id=payload["session_id"],
        new_message=message,
    )

    # We 'await' the helper function. This passes the async iterator to it.
    # The helper function will consume the iterator to get the final result.
    final_response_text = await _async_extract_final_response_text(events)

except Exception as e:
    logging.error(f"Agent execution failed: {e}")
    final_response_text = f"Agent execution failed: {e}"

# Helper function to extract the final response text from ADK events [Temp fix for Bug: https://github.com/comet-ml/opik/issues/2467#issuecomment-2970768612]
async def _async_extract_final_response_text(events: AsyncIterator[adk_events.Event]) -> Optional[str]:
    """
    Exhausts the async iterator of ADK events and returns the response text
    from the last event (presumably the final root agent response).
    """
    collected_events = []

    # This `async for` loop iterates through the `events` iterator.
    async for event in events:
        collected_events.append(event)
        
    if not collected_events:
        raise Exception("Agent failed to execute: No events received.")

    last_event: adk_events.Event = collected_events[-1]
    
    # This code only runs AFTER the `async for` loop above has completed and all events have been collected.
    # The last event should be the final response from the root agent.
    if last_event.is_final_response():
        if last_event.content and last_event.content.parts:
            return last_event.content.parts[0].text
        elif last_event.actions and last_event.actions.escalate:
            return f"Agent escalated: {last_event.error_message or 'No specific message.'}"
    
    logging.error(f"Last event was not a final response. Event: {last_event}")
    raise Exception("Agent did not produce a final response.")

gdabas avatar Jul 10 '25 07:07 gdabas

Thanks @gdabas @alexkuzmik, I was about to raise the same issue today. Looks like everyone is working in the same direction 🚀 🚀

anupmanekar avatar Jul 11 '25 19:07 anupmanekar