Websocket connection error with long tool call responses
Reproduction Steps
- Set up a project using Cloudflare agents-starter template
- Implement tool calls using Vercel AI SDK
- Send a prompt that triggers a tool call which returns a large response
- Observe that while the tool response reaches the client, the LLM response handling fails with websocket errors
Environment
- Vercel AI SDK
- Cloudflare Workers/Agents
Expected Behavior
The LLM should be able to continue the conversation after receiving the tool call response, regardless of the response size.
Current Behavior
The websocket connection errors out with "internal error" when the tool call response is too large, breaking the conversation flow.
Additional Context
This may be related to websocket payload size limitations or how the stream is being handled between Vercel AI SDK and Cloudflare Workers.
Error Message
✘ [ERROR] Error on websocket connection: 9fd6aaee-f47d-4fe8-ba99-920e42767134 [Error: internal error; reference = 80ec7ttqtjg2b3geaqgppojl]
✘ [ERROR] Override onError(connection, error) to handle websocket connection errors
✘ [ERROR] Uncaught (in promise) Error: internal error; reference = 80ec7ttqtjg2b3geaqgppojl
✘ [ERROR] Uncaught (async) Error: internal error; reference = 80ec7ttqtjg2b3geaqgppojl
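Side note: the second log line suggests overriding onError. A minimal sketch of what that could look like in the starter template's Chat agent (imports and signature assumed from the agents package; this surfaces the error rather than fixing it):

// server.ts (sketch): override onError, as the log suggests, to surface the failure.
// AIChatAgent/Connection come from the agents package used by the starter template;
// exact types may differ in your version.
import { AIChatAgent } from "agents/ai-chat-agent";
import type { Connection } from "agents";

export class Chat extends AIChatAgent<Env> {
  override onError(connection: Connection, error: unknown) {
    // Log instead of letting the rejection go uncaught
    console.error("WebSocket error on connection", connection.id, error);
  }
  // ... onChatMessage and the rest of the Chat class as in the starter template
}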
Got a repro? Happy to look into it
Sorry, I don't have a repro, but this is the extra code I've added to the agents-starter template repo.
PS: I think this issue is because I have crossed the context window limit.
// durable_objects.ts
import { DurableObject } from "cloudflare:workers";
import type { Env } from "./env";

type CacheEntry = { data: any; timestamp: number };

export class Neo4jCacheDO extends DurableObject {
  private cache: Map<string, CacheEntry> = new Map();
  private readonly CACHE_TTL = 60 * 60 * 1000; // 1 hour cache TTL
  private state: DurableObjectState;

  constructor(state: DurableObjectState, env: Env) {
    super(state, env);
    this.state = state;
    // Restore the persisted cache before handling any requests
    this.state.blockConcurrencyWhile(async () => {
      const stored =
        await this.state.storage.get<Record<string, CacheEntry>>("relationships");
      if (stored) {
        this.cache = new Map(Object.entries(stored));
      }
    });
  }

  // Get cached relationships, or null if not found or expired
  async getRelationships(): Promise<any | null> {
    const cachedData = this.cache.get("neo4j-relationships");
    if (cachedData && Date.now() - cachedData.timestamp < this.CACHE_TTL) {
      return cachedData.data;
    }
    return null;
  }

  // Store relationships in the in-memory cache and persist them to storage
  async setRelationships(data: any): Promise<void> {
    this.cache.set("neo4j-relationships", { data, timestamp: Date.now() });
    await this.state.storage.put("relationships", Object.fromEntries(this.cache));
  }
}
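For reference, the ./env import above only needs the Durable Object namespace binding used by the tools below (a sketch; your actual Env likely has more bindings):

// env.ts (sketch): only the binding used by the code in this comment is shown
export interface Env {
  NEO4J_CACHE: DurableObjectNamespace;
}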
// tool.ts
const getNeo4jRelationship = tool({
  description: "Get relationship type counts from Neo4j",
  parameters: z.object({}),
  execute: async () => {
    console.log("Getting relationships from Neo4j");
    const cacheId = global_env.NEO4J_CACHE.idFromName("neo4j-relationships");
    const stub = global_env.NEO4J_CACHE.get(cacheId);
    // Try the Durable Object cache first (stored as a JSON string)
    const cachedData = await stub.getRelationships();
    if (cachedData) {
      console.log("Using cached Neo4j relationship data", cachedData.slice(0, 5));
      return JSON.parse(cachedData);
    }
    try {
      const result = await session.run(`
        MATCH ()-[r]->()
        RETURN type(r) AS RelationshipType, count(r) AS Count
        ORDER BY Count DESC
      `);
      const records = result.records;
      await stub.setRelationships(JSON.stringify(records));
      return records;
    } catch (error) {
      return `Error running query: ${error}`;
    }
  },
});
const getNeo4jData = tool({
  description: "Run a Cypher query against Neo4j and return the matching records",
  parameters: z.object({
    query: z.string({
      description:
        "The query to execute in Neo4j (should use LOWER() for string comparisons)",
    }),
  }),
  execute: async ({ query }) => {
    console.log(`Getting data from Neo4j with query: ${query}`);
    try {
      const result = await session.run(query);
      return result.records;
    } catch (error) {
      return `Error running query: ${error}`;
    }
  },
});
Ok, I see you're returning a lot of data from a tool call, and that's probably what's making the big websocket message. I'll look into it, thanks!
Thanks! A proper error message would be helpful.
Hey, thank you for this project. It's exciting to contribute to a cutting-edge technology.
I can reliably trigger this websocket error by sending too much data from the app.tsx client to the server.
I have recently run into this issue and can provide some information on a potential fix. I built a prototype with this system that tool-calls an Elasticsearch site, which dumps 50–150 KB of documents to the LLM. The raw tool-call results get streamed to the client, and when the client responds, all of the messages are streamed back to the LLM. So as the chat progresses, the websocket payload grows; after 5–6 Elasticsearch calls, at around 1 MB, it crashes.
As a workaround, I limit the payload size by looping over the agentMessages array in reverse and keeping only as many items as fit under 950 KB. The code looks roughly like this and gets called in onSubmit:
const MAX_SIZE = 950 * 1024; // stay below the websocket limit with some headroom
const limitedMessages: typeof agentMessages = [];
let totalBytes = 0;
// Fill from the end to keep the most recent messages
for (let i = agentMessages.length - 1; i >= 0; i--) {
  const msgBytes = new TextEncoder().encode(JSON.stringify(agentMessages[i])).length;
  if (totalBytes + msgBytes > MAX_SIZE) break;
  limitedMessages.unshift(agentMessages[i]);
  totalBytes += msgBytes;
}
setMessages(limitedMessages);
// This seems to work, but I'm not sure it's the canonical thing to do.
If there were a way to accommodate this directly via a flag in the useAgentChat module, that would be nice. I'm not sure I've seen anything for this in the Vercel AI SDK either.
Ideally, at least in my situation, the results of the tool call shouldn't be sent to the client at all, but that means maintaining some server-side context that gets sent to the LLM for continued conversations; a rough sketch of that idea is below.
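A sketch of that server-side approach, reusing the Neo4jCacheDO from earlier in this thread (the summary shape and the tool name are my own illustration, not an SDK feature):

// Sketch only: keep the full payload server-side, return a compact summary.
// Assumes `session`, `global_env`, and Neo4jCacheDO from earlier in the thread.
const getNeo4jDataSummarized = tool({
  description:
    "Run a Cypher query; full results stay server-side, only a summary is returned",
  parameters: z.object({ query: z.string() }),
  execute: async ({ query }) => {
    const result = await session.run(query);
    // Persist the full result in the Durable Object instead of streaming it
    const cacheId = global_env.NEO4J_CACHE.idFromName("neo4j-relationships");
    const stub = global_env.NEO4J_CACHE.get(cacheId);
    await stub.setRelationships(JSON.stringify(result.records));
    // Only this small object travels over the websocket / back to the LLM
    return {
      rowCount: result.records.length,
      sample: result.records.slice(0, 3).map((r) => r.toObject()),
    };
  },
});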
Hope this helps someone.
@threepointone does agent-sdk automatically discard the old chat history and only use the most recent messages if it exceeds the context window?