Return function calling results to the client

Open hoangtran98 opened this issue 1 year ago • 5 comments

Hi team,

I have been tweaking the event loop and event emitter to send function calling results back to the client on the "function_calls_finished" event, but haven't been able to make it work. Since assistant.on() only takes a synchronous callback, I am trying to use the event loop to create a task, but it often conflicts with the TTS segment forwarder, causing that task to be destroyed. I am wondering if there is any way I could work around this.

Thanks

Hoang

hoangtran98 avatar Oct 01 '24 13:10 hoangtran98

What we do is just send them over the data channel to the participant on a topic for agent callbacks.
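For instance, a sketch of that approach in Python (assuming the SDK's publish_data() as used later in this thread; the helper name and the "agent_callbacks" topic are illustrative, not part of any API):

import json

# Sketch: push a tool result to the client over the data channel on a
# dedicated topic. "agent_callbacks" is an arbitrary topic name.
async def send_tool_result(room, tool_name: str, result: str) -> None:
    await room.local_participant.publish_data(
        json.dumps({"tool": tool_name, "result": result}),
        topic="agent_callbacks",
    )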

jezell avatar Oct 02 '24 15:10 jezell

It looks to me like assistant.on() only takes synchronous callbacks, while publishing data is asynchronous. Is there any way I can work around this?

hoangtran98 avatar Oct 03 '24 15:10 hoangtran98

The tool calls themselves can be async, so that's where we do it.

jezell avatar Oct 03 '24 20:10 jezell

Yes, agreed. I can do it manually through chat.on("message_received"). What I am asking about is how to do so within VoicePipelineAgent (VoiceAssistant).

I tried:

import logging
import aiohttp
from typing import Annotated
from livekit.rtc import LocalParticipant
from livekit.agents import llm

logger = logging.getLogger(__name__)

class AssistantFnc(llm.FunctionContext):

    def __init__(self, agent: LocalParticipant):
        super().__init__()
        self.agent = agent

    @llm.ai_callable()
    async def get_weather(
        self,
        location: Annotated[
            str, llm.TypeInfo(description="The location to get the weather for")
        ],
    ):
        """Called when the user asks about the weather. This function will return the weather for the given location."""
        logger.info(f"getting weather for {location}")
        url = f"https://wttr.in/{location}?format=%C+%t"
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as response:
                if response.status == 200:
                    weather_data = await response.text()
                    # response from the function call is returned to the LLM
                    await self.agent.publish_data(weather_data, topic="weather_data")
                    return f"The weather in {location} is {weather_data}."
                else:
                    # note: raising a bare string is invalid Python; raise an exception
                    raise Exception(f"Failed to get weather data, status code: {response.status}")

async def entrypoint(ctx: JobContext):
    await ctx.connect(auto_subscribe=AutoSubscribe.AUDIO_ONLY)
    fnc_ctx = AssistantFnc(ctx.agent)  # create our fnc ctx instance
    initial_chat_ctx = llm.ChatContext().append(
        text=(
            "You are a weather assistant created by LiveKit. Your interface with users will be voice. "
            "You will provide weather information for a given location."
        ),
        role="system",
    )
    participant = await ctx.wait_for_participant()
    agent = VoicePipelineAgent(
        vad=ctx.proc.userdata["vad"],
        stt=deepgram.STT(),
        llm=openai.LLM(),
        tts=openai.TTS(),
        fnc_ctx=fnc_ctx,
        chat_ctx=initial_chat_ctx,
    )
    agent.start(ctx.room, participant)
    await agent.say(
        "Hello from the weather station. Would you like to know the weather? If so, tell me your location."
    )

it gives me the following error:

{"message": "Task was destroyed but it is pending!\ntask: <Task pending name='Task-69' coro=<TTSSegmentsForwarder._main_task() running at /Users/hoangtran/livekit/server/.venv/lib/python3.12/site-packages/livekit/agents/utils/log.py:16> wait_for=<Future pending cb=[Task.task_wakeup()]>>", "level": "ERROR", "name": "asyncio", "pid": 12459, "job_id": "AJ_Yxdyf6qEM9H5", "timestamp": "2024-10-06T12:09:55.407125+00:00"} {"message": "Task was destroyed but it is pending!\ntask: <Task pending name='Task-71' coro=<TTSSegmentsForwarder._main_task.<locals>._forward_task() running at /Users/hoangtran/livekit/server/.venv/lib/python3.12/site-packages/livekit/agents/utils/log.py:16> wait_for=<Future pending cb=[Task.task_wakeup()]>>", "level": "ERROR", "name": "asyncio", "pid": 12459, "job_id": "AJ_Yxdyf6qEM9H5", "timestamp": "2024-10-06T12:09:55.407236+00:00"}

My use case is to send search results back to the client before sending them to the LLM to generate answers. Would you have any ideas on how I should go about it?

Thanks,

Hoang

hoangtran98 avatar Oct 06 '24 12:10 hoangtran98

@hoangtran98 for small packets, what you can do is:

  1. Keep a dict[int, asyncio.Future] that holds pending callbacks.
  2. Every time you send a message to the client, increment a message id, place a future in the dict for that message id, publish your request over the data channel, and await the future.
  3. When the client receives the message, have it publish a message back on the data channel with the result (you can listen for data events on the client side).
  4. When the message is received by the server, call set_result on the future and delete the key from the dict (you will need to register a listener for data events).

It gets a bit more complicated if you have large amounts of data to transfer, but this should work fine for small payloads of a few kilobytes or less.
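A minimal agent-side sketch of this pattern (assuming the Python SDK's publish_data() and "data_received" event; the _pending dict, call_client helper, and "agent_callbacks" topic are illustrative names, not part of any API):

import asyncio
import itertools
import json

_pending: dict[int, asyncio.Future] = {}  # step 1: message id -> pending future
_next_id = itertools.count()

async def call_client(room, payload: dict, timeout: float = 10.0) -> dict:
    # step 2: register a future, publish the request, and await the reply
    msg_id = next(_next_id)
    fut = asyncio.get_running_loop().create_future()
    _pending[msg_id] = fut
    await room.local_participant.publish_data(
        json.dumps({"id": msg_id, **payload}), topic="agent_callbacks"
    )
    try:
        return await asyncio.wait_for(fut, timeout)
    finally:
        _pending.pop(msg_id, None)

def on_data_received(packet):
    # step 4: resolve the pending future when the client's reply arrives
    if packet.topic != "agent_callbacks":
        return
    reply = json.loads(packet.data)
    fut = _pending.get(reply["id"])
    if fut is not None and not fut.done():
        fut.set_result(reply)

# register the listener, e.g. room.on("data_received", on_data_received);
# step 3 is the mirror image of this on the client side.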

jezell avatar Oct 08 '24 05:10 jezell

Thanks it works

hoangtran98 avatar Oct 27 '24 15:10 hoangtran98

I'd love to see your full example @hoangtran98. Would you mind sharing? Thanks!

TomTom101 avatar Nov 15 '24 11:11 TomTom101

@TomTom101 it's pretty straightforward to use RPCs: https://docs.livekit.io/home/client/data/rpc/

the frontend should register an RPC handler, and the agent would make a call into that RPC when the function is called
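For example, a sketch of the agent side (assuming the Python SDK's perform_rpc(); the "greet" method name matches the client snippet below, and the helper is illustrative):

# Sketch: call an RPC method that the frontend registered.
async def call_frontend(room, identity: str, payload: str) -> str:
    response = await room.local_participant.perform_rpc(
        destination_identity=identity,  # the client participant's identity
        method="greet",                 # must match registerRpcMethod on the client
        payload=payload,
    )
    return response  # whatever string the client's handler returned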

davidzhao avatar Nov 15 '24 19:11 davidzhao

the frontend should register an RPC handler, and the agent would make a call into that RPC when the function is called

Can you hint me towards which hook supplies the room in

room.localParticipant?.registerRpcMethod(
  'greet',
  async (data: RpcInvocationData) => {
    console.log(`Received greeting from ${data.callerIdentity}: ${data.payload}`);
    return `Hello, ${data.callerIdentity}!`;
  }
);

I tried useLocalParticipant directly, but it has no registerRpcMethod. Matter of fact, I don't find registerRpcMethod at all in the @livekit node module.

  "dependencies": {
    "@livekit/components-react": "^2.6.4",
    "@livekit/components-styles": "^1.1.3",
    "@livekit/krisp-noise-filter": "^0.2.12",
    "framer-motion": "^11.9.0",
    "livekit-client": "^2.5.5",
    "livekit-server-sdk": "^2.6.1",
    "next": "^14.2.15",
    "react": "^18",
    "react-dom": "^18"
  }

Any further help much appreciated!

TomTom101 avatar Nov 16 '24 10:11 TomTom101

@davidzhao most web browsers still lack complete APIs for low-level control over gRPC, so I think it would take some time to implement that in the frontend.

@TomTom101 for now, my workaround is to publish data in functions called by an agent.

...
# get search results from database
await self.agent.publish_data(res, topic="search_results")
await asyncio.sleep(0.001)  # yield control back to the event loop
return res
...

I've played around with the event loop from the agent for some time and haven't found a nicer way to enqueue a task or control the agent's event loop.

Hope it works for you.

hoangtran98 avatar Nov 16 '24 10:11 hoangtran98

@TomTom101 you'd want to update your client SDKs to the latest version; this is a new feature.

@hoangtran98 this isn't gRPC, but LiveKit's own RPC protocol. It works across all browsers and will be supported by all LiveKit SDKs; the docs are linked above.

davidzhao avatar Nov 16 '24 16:11 davidzhao

@davidzhao thanks for that. I will give it a try.

hoangtran98 avatar Nov 16 '24 16:11 hoangtran98

@davidzhao Thanks, getting there! It seems VS Code just fooled me; it's all there. When trying a dummy send from the server agent, I get RPC call failed: RPC not supported by server. Which "server" is this message referring to, and what can I do about it?

Thanks a bunch!

TomTom101 avatar Nov 16 '24 20:11 TomTom101

Are you using LiveKit Cloud? If not, the error indicates that your LiveKit server version is incompatible (too old).

davidzhao avatar Nov 17 '24 00:11 davidzhao

I just realized the stupidity of my question. :) My local livekit server is now updated to 1.8.0 and it works like a charm! I totally forgot it was running in the background the whole time … Thank you so much!

TomTom101 avatar Nov 17 '24 10:11 TomTom101