
Websocket connection error with long tool call responses

Open unique1o1 opened this issue 10 months ago • 6 comments

Reproduction Steps

  1. Set up a project using Cloudflare agents-starter template
  2. Implement tool calls using Vercel AI SDK
  3. Send a prompt that triggers a tool call which returns a large response
  4. Observe that while the tool response reaches the client, the LLM response handling fails with websocket errors

Environment

  • Vercel AI SDK
  • Cloudflare Workers/Agents

Expected Behavior

The LLM should be able to continue the conversation after receiving the tool call response, regardless of the response size.

Current Behavior

The websocket connection errors out with "internal error" when the tool call response is too large, breaking the conversation flow.

Additional Context

This may be related to websocket payload size limits, or to how the stream is handled between the Vercel AI SDK and Cloudflare Workers.

Error Message

✘ [ERROR] Error on websocket connection: 9fd6aaee-f47d-4fe8-ba99-920e42767134 [Error: internal error; reference = 80ec7ttqtjg2b3geaqgppojl]

✘ [ERROR] Override onError(connection, error) to handle websocket connection errors

✘ [ERROR] Uncaught (in promise) Error: internal error; reference = 80ec7ttqtjg2b3geaqgppojl

✘ [ERROR] Uncaught (async) Error: internal error; reference = 80ec7ttqtjg2b3geaqgppojl
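The second log line suggests a way to at least contain the uncaught rejection: overriding onError on the agent class. A minimal sketch, assuming the Agent and Connection exports from the agents package; the override shape is taken from the log message itself, not verified against the SDK:

// Sketch: handle websocket errors instead of leaving the rejection uncaught
import { Agent, type Connection } from "agents";

export class Chat extends Agent<Env> {
  onError(connection: Connection, error: unknown) {
    console.error("WebSocket error on connection", connection.id, error);
  }
}

This only keeps the error from crashing the conversation loop; the underlying oversized-message problem remains.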

unique1o1 · Mar 26 '25

Got a repro? Happy to look into it

threepointone · Mar 26 '25

Sorry, I don't have a repro, but this is the extra code I've added on top of the agents-starter template repo.

PS: I think this issue is happening because I've crossed the context window limit.

// durable_objects.ts
import { DurableObject } from "cloudflare:workers";
import type { Env } from "./env";

export class Neo4jCacheDO extends DurableObject {
  private cache: Map<string, { data: any; timestamp: number }> = new Map();
  private CACHE_TTL = 60 * 60 * 1000; // 1 hour cache TTL
  private state: DurableObjectState;

  constructor(state: DurableObjectState, env: Env) {
    super(state, env);
    this.state = state;
    this.state.blockConcurrencyWhile(async () => {
      const stored = await this.state.storage.get<
        Record<string, { data: any; timestamp: number }>
      >("relationships");
      if (stored) {
        this.cache = new Map(Object.entries(stored));
      }
    });
  }

  // Get cached relationships or return null if not found/expired
  async getRelationships(): Promise<any | null> {
    const cacheKey = "neo4j-relationships";
    const cachedData = this.cache.get(cacheKey);

    if (cachedData && Date.now() - cachedData.timestamp < this.CACHE_TTL) {
      return cachedData.data;
    }

    return null;
  }

  // Store relationships in cache
  async setRelationships(data: any): Promise<void> {
    const cacheKey = "neo4j-relationships";
    const cacheEntry = { data, timestamp: Date.now() };
    this.cache.set(cacheKey, cacheEntry);

    // Persist to storage
    await this.state.storage.put(
      "relationships",
      Object.fromEntries(this.cache)
    );
    return;
  }
}
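
For reference, a minimal sketch of the ./env module imported above, inferred from how the tool code below uses it; only the NEO4J_CACHE binding actually appears in these snippets, everything else is assumed:

// env.ts (assumed shape; NEO4J_CACHE matches the binding used in tool.ts)
export interface Env {
  NEO4J_CACHE: DurableObjectNamespace;
}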

// tool.ts
const getNeo4jRelationship = tool({
  description: "Returns relationship types and their counts from Neo4j",
  parameters: z.object({}),
  execute: async () => {
    console.log("Getting relationships from Neo4j");
    const cacheId = global_env.NEO4J_CACHE.idFromName("neo4j-relationships");
    const stub = global_env.NEO4J_CACHE.get(cacheId);

    // Try to get data from the cache via the Durable Object stub
    const cachedData = await stub.getRelationships();
    if (cachedData) {
      console.log(
        "Using cached Neo4j relationship data",
        cachedData.slice(0, 5)
      );
      return JSON.parse(cachedData);
    }
    try {
      const result = await session.run(`
        MATCH ()-[r]->()
        RETURN type(r) AS RelationshipType, count(r) AS Count
        ORDER BY Count DESC
      `);

      const records = result.records;
      await stub.setRelationships(JSON.stringify(records));
      return records;
    } catch (error) {
      return `Error running query: ${error}`;
    }
  },
});
const getNeo4jData = tool({
  description: "Runs a Cypher query against Neo4j and returns the records",
  parameters: z.object({
    query: z.string({
      description:
        "The query to execute in Neo4j (should use LOWER() for string comparisons)",
    }),
  }),
  execute: async ({ query }) => {
    console.log(`Getting data from Neo4j with query: ${query}`);
    try {
      const result = await session.run(query);
      return result.records;
    } catch (error) {
      return `Error running query: ${error}`;
    }
  },
});

unique1o1 · Mar 26 '25

Ok, I see you're returning a lot of data from a tool call, and that's probably what's producing the oversized websocket message. I'll look into it, thanks!
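
In the meantime, one stopgap, sketched under the assumption that truncated results are acceptable to the model, is to cap what execute() returns before it ever hits the websocket (MAX_TOOL_RESULT_CHARS is an arbitrary choice here, not an SDK constant):

// Hypothetical guard: cap a tool result so the streamed message stays small
const MAX_TOOL_RESULT_CHARS = 32 * 1024;

function capToolResult(records: unknown): string {
  const text = JSON.stringify(records);
  return text.length > MAX_TOOL_RESULT_CHARS
    ? text.slice(0, MAX_TOOL_RESULT_CHARS) + " ... [truncated]"
    : text;
}

Returning capToolResult(records) instead of records keeps each tool message bounded, at the cost of the model seeing partial data.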

threepointone · Mar 26 '25

Thanks! A proper error message would be helpful.

unique1o1 · Mar 26 '25

Hey, thank you for this project. It's exciting to contribute to cutting-edge technology.

I can reliably trigger this websocket error by sending too much data from the app.tsx client to the server.

I have recently run into this issue and can provide some information on a potential fix. I built a prototype with this system that tool-calls an Elasticsearch endpoint, which dumps 50-150 KB of documents to the LLM. The raw tool call results get streamed to the client, and when the client responds, all messages are streamed back to the LLM. So as the chat progresses, the websocket payload keeps growing; after 5-6 Elasticsearch calls it reaches around 1 MB and crashes.

As a workaround, I limit the payload size by looping over the agentMessages array in reverse and keeping only as many items as fit under roughly 900 KB. The code below sketches this and is called from onSubmit.

const MAX_SIZE = 900 * 1024; // stay safely under the ~1 MB limit
const limitedMessages = [];
let totalBytes = 0;
// fill from the end to keep the most recent messages
for (let i = agentMessages.length - 1; i >= 0; i--) {
  const msg = agentMessages[i];
  const msgBytes = JSON.stringify(msg).length; // rough size estimate
  if (totalBytes + msgBytes > MAX_SIZE) break;
  limitedMessages.unshift(msg);
  totalBytes += msgBytes;
}

setMessages(limitedMessages);
// this seems to work, but I'm not sure if this is the canonical thing to do.

If there were a way to accommodate this directly via a flag in the useAgentChat module, that would be nice. I'm not sure I've seen anything in the Vercel AI SDK either.

Ideally, at least in my situation, the results of the tool call shouldn't be sent to the client at all, but that means maintaining some server-side context that gets sent to the LLM for continued conversations; a rough sketch of that idea follows.
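
A minimal sketch of that direction, reusing the Durable Object pattern from earlier in this thread; storeAndSummarize and the preview length are hypothetical, not part of any SDK:

// Hypothetical: persist the full payload server-side, return only a preview
async function storeAndSummarize(
  stub: { setRelationships(data: string): Promise<void> },
  fullResult: unknown
): Promise<string> {
  const text = JSON.stringify(fullResult);
  await stub.setRelationships(text); // full data stays on the server
  return `Stored ${text.length} chars; preview: ${text.slice(0, 500)}`;
}

The LLM then works from the preview, and a follow-up tool call can pull the full data back out of storage when needed.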

Hope this helps someone.

kbhalerao · Jun 05 '25

@threepointone does agent-sdk automatically discard the old chat history and keep only the most recent messages once it exceeds the context window?

unique1o1 · Jun 22 '25