Return function calling results to the client
Hi team,
I have been experimenting with the event loop and event emitter to send function calling results back to the client on the "function_calls_finished" event, but haven't been able to make it work. Since assistant.on() takes a synchronous callback, I am trying to use the event loop to create a task. This often conflicts with the TTS segment forwarder, causing that task to be destroyed. I am wondering if there is any way I could work around this.
Thanks
Hoang
What we do is just send them over the data channel to the participant on a topic for agent callbacks.
It looks to me like assistant.on() only takes synchronous callbacks, while publishing data is asynchronous. Is there any way I can work around this?
the tool calls themselves can be async so that's where we do it
Yes, agreed. I can do it manually through chat.on("message_received"). What I am asking about is how to do so within VoicePipelineAgent (VoiceAssistant).
I tried:
import logging
from typing import Annotated

import aiohttp
from livekit.rtc import LocalParticipant
from livekit.agents import llm

logger = logging.getLogger(__name__)


class AssistantFnc(llm.FunctionContext):
    def __init__(self, agent: LocalParticipant):
        super().__init__()
        self.agent = agent

    @llm.ai_callable()
    async def get_weather(
        self,
        location: Annotated[
            str, llm.TypeInfo(description="The location to get the weather for")
        ],
    ):
        """Called when the user asks about the weather. This function will return the weather for the given location."""
        logger.info(f"getting weather for {location}")
        url = f"https://wttr.in/{location}?format=%C+%t"
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as response:
                if response.status == 200:
                    weather_data = await response.text()
                    # response from the function call is returned to the LLM
                    await self.agent.publish_data(weather_data, topic="weather_data")
                    return f"The weather in {location} is {weather_data}."
                else:
                    # raising a bare f-string is a TypeError; raise an Exception instead
                    raise Exception(
                        f"Failed to get weather data, status code: {response.status}"
                    )
async def entrypoint(ctx: JobContext):
    await ctx.connect(auto_subscribe=AutoSubscribe.AUDIO_ONLY)
    fnc_ctx = AssistantFnc(ctx.agent)  # create our fnc ctx instance
    initial_chat_ctx = llm.ChatContext().append(
        text=(
            "You are a weather assistant created by LiveKit. Your interface with users will be voice. "
            "You will provide weather information for a given location."
        ),
        role="system",
    )
    participant = await ctx.wait_for_participant()
    agent = VoicePipelineAgent(
        vad=ctx.proc.userdata["vad"],
        stt=deepgram.STT(),
        llm=openai.LLM(),
        tts=openai.TTS(),
        fnc_ctx=fnc_ctx,
        chat_ctx=initial_chat_ctx,
    )
    agent.start(ctx.room, participant)
    await agent.say(
        "Hello from the weather station. Would you like to know the weather? If so, tell me your location."
    )
it gives me the following error:
{"message": "Task was destroyed but it is pending!\ntask: <Task pending name='Task-69' coro=<TTSSegmentsForwarder._main_task() running at /Users/hoangtran/livekit/server/.venv/lib/python3.12/site-packages/livekit/agents/utils/log.py:16> wait_for=<Future pending cb=[Task.task_wakeup()]>>", "level": "ERROR", "name": "asyncio", "pid": 12459, "job_id": "AJ_Yxdyf6qEM9H5", "timestamp": "2024-10-06T12:09:55.407125+00:00"} {"message": "Task was destroyed but it is pending!\ntask: <Task pending name='Task-71' coro=<TTSSegmentsForwarder._main_task.<locals>._forward_task() running at /Users/hoangtran/livekit/server/.venv/lib/python3.12/site-packages/livekit/agents/utils/log.py:16> wait_for=<Future pending cb=[Task.task_wakeup()]>>", "level": "ERROR", "name": "asyncio", "pid": 12459, "job_id": "AJ_Yxdyf6qEM9H5", "timestamp": "2024-10-06T12:09:55.407236+00:00"}
My use case is to send back search results to the client before sending to LLM to generate answers. Would you have any ideas how I should go about it?
Thanks,
Hoang
@hoangtran98 for small packets, what you can do is:
- Have a dict[int, asyncio.Future] which holds callbacks
- every time you send a message to the client, increment a message id, place a future in the dict for that message id, publish your request over the data channel, and await the future
- when the client receives the message, publish a message back on the data channel with the result (you can listen for data events on the client side)
- when the message is received by the server, use set_result on the future and delete the key from the dict (you will need to register a listener for data events).
It gets a bit more complicated if you have large amounts of data you need to transfer, but this should work ok for small amounts of information that's a few kilobytes or less.
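The steps above can be sketched in plain asyncio, with the LiveKit data channel stubbed out by a callable. This is a minimal illustration, not LiveKit API: the `DataChannelRequests` class, `fake_publish`, and `demo` are all hypothetical names, and in a real agent `publish` would call `publish_data` on the local participant while `on_response` would be invoked from your data-received event listener.

```python
import asyncio
import itertools


class DataChannelRequests:
    """Request/response over a one-way send primitive, keyed by message id."""

    def __init__(self, publish):
        # publish(msg_id, payload) is an async callable that sends the
        # request to the client (e.g. over the LiveKit data channel)
        self._publish = publish
        self._pending: dict[int, asyncio.Future] = {}
        self._ids = itertools.count()

    async def request(self, payload: str) -> str:
        # increment the message id, park a future for it, send, then await
        msg_id = next(self._ids)
        fut = asyncio.get_running_loop().create_future()
        self._pending[msg_id] = fut
        await self._publish(msg_id, payload)
        try:
            return await fut
        finally:
            del self._pending[msg_id]  # clean up the key once resolved

    def on_response(self, msg_id: int, result: str) -> None:
        # call this from your data-received event listener on the server
        fut = self._pending.get(msg_id)
        if fut is not None and not fut.done():
            fut.set_result(result)


async def demo() -> str:
    async def fake_publish(msg_id: int, payload: str) -> None:
        # simulate the client answering on the data channel: echo upper-cased
        asyncio.get_running_loop().call_soon(
            requests.on_response, msg_id, payload.upper()
        )

    requests = DataChannelRequests(fake_publish)
    return await requests.request("search: weather in paris")


# asyncio.run(demo()) resolves to "SEARCH: WEATHER IN PARIS"
```

Awaiting the future inside the tool call is what makes this fit the async function-calling flow: the agent's event loop keeps running while the round trip is in flight.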
Thanks it works
I'd love to see your full example @hoangtran98. Would you mind sharing? Thanks!
@TomTom101 it's pretty straightforward to use RPCs: https://docs.livekit.io/home/client/data/rpc/
the frontend should register an RPC handler, and the agent would make a call into that RPC when the function is called
Can you point me to which hook supplies the room in
room.localParticipant?.registerRpcMethod(
  'greet',
  async (data: RpcInvocationData) => {
    console.log(`Received greeting from ${data.callerIdentity}: ${data.payload}`);
    return `Hello, ${data.callerIdentity}!`;
  }
);
I tried useLocalParticipant directly, but it has no registerRpcMethod.
As a matter of fact, I don't find registerRpcMethod at all in the @livekit node modules.
"dependencies": {
"@livekit/components-react": "^2.6.4",
"@livekit/components-styles": "^1.1.3",
"@livekit/krisp-noise-filter": "^0.2.12",
"framer-motion": "^11.9.0",
"livekit-client": "^2.5.5",
"livekit-server-sdk": "^2.6.1",
"next": "^14.2.15",
"react": "^18",
"react-dom": "^18"
}
Any further help much appreciated!
@davidzhao most web browsers still lack complete APIs for low-level control over gRPC, so I think it would take some time to implement that in the frontend.
@TomTom101 for now, my workaround is to publish data in the functions called by the agent:
...
# get search results from the database
await self.agent.publish_data(res, topic="search_results")
await asyncio.sleep(0.001)  # yield control back to the event loop
return res
...
I've played around with the agent's event loop for some time and haven't found a nicer way to enqueue a task or control the agent's event loop.
Hope it works for you.
@TomTom101 you'd want to update your client SDKs to the latest version. this is a new feature.
@hoangtran98 this isn't gRPC, but LiveKit's own RPC protocol. it works across all browsers and will be supported by all LiveKit SDKs. docs are here
@davidzhao thanks for that. I will give it a try.
@davidzhao Thanks, getting there! Seems VS Code just fooled me; it's all there. When trying a dummy send from the server agent, I get "RPC call failed: RPC not supported by server".
Which "server" is this message referring to, and what can I do about it?
Thanks a bunch!
are you using LiveKit Cloud? If not, this error is stating that your livekit server version is incompatible
I just realized the stupidity of my question. :) My local livekit server is now updated to 1.8.0 and works like a charm! I totally forgot it was running in the background all the time… Thank you so much!