fix(models): memory fixes, provider code typing, cost calculation cleanup

Open · icecrasher321 opened this pull request 1 month ago • 3 comments

Summary

  • Memory + streaming fixes: the streaming response is wrapped so the assistant message is persisted once accumulation completes (see the sketch after this list)
  • Memory is no longer block-scoped; it is managed via conversation ID
  • Move cost calculation to the same level as token-count calculation; this fixes bugs where client-side execution did not track cost for chat
  • Type providers using official SDK types and remove fallbacks that were hard to notice when triggered
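
A minimal sketch of that streaming wrapper, assuming a web-standard ReadableStream; the function and callback names are illustrative, not the PR's actual identifiers:

```typescript
// Pass chunks through untouched, accumulate the decoded text, and persist the
// assistant message only after the stream fully drains.
function wrapStreamForPersistence(
  stream: ReadableStream<Uint8Array>,
  persist: (content: string) => Promise<void> // hypothetical memory writer
): ReadableStream<Uint8Array> {
  const decoder = new TextDecoder()
  let accumulated = ''

  return stream.pipeThrough(
    new TransformStream<Uint8Array, Uint8Array>({
      transform(chunk, controller) {
        accumulated += decoder.decode(chunk, { stream: true })
        controller.enqueue(chunk)
      },
      async flush() {
        accumulated += decoder.decode() // flush any buffered bytes
        await persist(accumulated)
      },
    })
  )
}
```

Because flush() only runs when the stream closes normally, an aborted stream never persists a partial assistant message.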

Type of Change

  • [x] Bug fix
  • [x] New feature

Testing

Tested manually, with and without streaming, against OpenAI, Anthropic, and Gemini.

Checklist

  • [x] Code follows project style guidelines
  • [x] Self-reviewed my changes
  • [x] Tests added/updated and passing
  • [x] No new warnings introduced
  • [x] I confirm that I have read and agree to the terms outlined in the Contributor License Agreement (CLA)

icecrasher321 avatar Dec 22 '25 06:12 icecrasher321

Greptile Summary

This PR refactors memory management, improves provider type safety, and consolidates cost calculation logic. The changes address three key areas:

Memory Management Improvements

  • Changed memory scoping from workflowId to workspaceId for conversation-level persistence
  • Wrapped streaming responses to persist assistant messages after stream completion
  • Used atomic PostgreSQL array concatenation (||) for race-safe appends (see the sketch after this list)
  • Migration backfills workspaceId from workflows and deletes orphaned records
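
A hedged sketch of the race-safe append, assuming a Drizzle-style setup; the real schema lives in packages/db/schema.ts, and the table shape, names, and connection here are illustrative only:

```typescript
import { Pool } from 'pg'
import { and, eq, sql } from 'drizzle-orm'
import { drizzle } from 'drizzle-orm/node-postgres'
import { jsonb, pgTable, text } from 'drizzle-orm/pg-core'

// Illustrative table shape only, mirroring the described workspace+key scoping.
const memory = pgTable('memory', {
  workspaceId: text('workspace_id').notNull(),
  key: text('key').notNull(),
  data: jsonb('data').notNull(),
})

const db = drizzle(new Pool({ connectionString: process.env.DATABASE_URL }))

// PostgreSQL's `||` concatenates jsonb arrays inside a single UPDATE, so
// concurrent writers append atomically instead of read-modify-writing the
// whole message array.
async function appendToMemory(workspaceId: string, key: string, message: object) {
  await db
    .update(memory)
    .set({ data: sql`${memory.data} || ${JSON.stringify([message])}::jsonb` })
    .where(and(eq(memory.workspaceId, workspaceId), eq(memory.key, key)))
}
```

The sliding-window caveat raised later in the review is precisely that those modes bypass this operator and fall back to fetch-modify-write.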

Provider Type Safety

  • Added explicit TypeScript types from official SDKs (ChatCompletionCreateParamsStreaming, RawMessageStreamEvent, etc.); a sketch follows this list
  • Removed fallback code for legacy response formats (Google's content.function_call)
  • Enhanced streaming utilities to track token usage from provider events
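
A hedged sketch of the OpenAI side, showing how the named SDK type replaces loosely typed params; the model string and logging are placeholders, not the project's code:

```typescript
import OpenAI from 'openai'
import type { ChatCompletionCreateParamsStreaming } from 'openai/resources/chat/completions'

// With the params typed as ChatCompletionCreateParamsStreaming, the compiler
// enforces `stream: true` and the call returns an async iterable of typed
// ChatCompletionChunk values, so there is no `any` fallback to hide mistakes.
async function streamCompletion(client: OpenAI) {
  const params: ChatCompletionCreateParamsStreaming = {
    model: 'gpt-4o', // placeholder model
    messages: [{ role: 'user', content: 'Hello' }],
    stream: true,
    stream_options: { include_usage: true }, // final chunk carries token usage
  }

  const stream = await client.chat.completions.create(params)
  for await (const chunk of stream) {
    if (chunk.usage) {
      console.log('tokens:', chunk.usage.prompt_tokens, chunk.usage.completion_tokens)
    }
  }
}
```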

Cost Calculation Consolidation

  • Moved cost calculation from the logger to the provider layer for consistency
  • Cost is now calculated in real time during streaming via onComplete callbacks (sketched after this list)
  • Streaming and non-streaming paths now compute costs at the same level
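
A minimal sketch of that consolidation, with illustrative pricing numbers and names (not the project's actual tables):

```typescript
interface TokenUsage {
  promptTokens: number
  completionTokens: number
}

// USD per 1M tokens; example rates only, not authoritative.
const PRICING_PER_MILLION: Record<string, { input: number; output: number }> = {
  'gpt-4o': { input: 2.5, output: 10 },
}

function calculateCost(model: string, usage: TokenUsage): number {
  const rates = PRICING_PER_MILLION[model]
  if (!rates) return 0
  return (
    (usage.promptTokens / 1_000_000) * rates.input +
    (usage.completionTokens / 1_000_000) * rates.output
  )
}

// Called from both the streaming onComplete callback and the non-streaming
// response handler, so both paths compute cost at the same layer.
```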

Issues Found

  • Memory seeding in agent-handler.ts:714 can race when concurrent requests use the same conversationId (one possible mitigation is sketched after this list)
  • Sliding window memory modes fetch-modify-write, which could lose messages during concurrent appends
  • Cost tracking happens in the streaming callback before the response completes, so interrupted streams may record costs for incomplete responses
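
One way to close the seeding race, leaning on the new workspace+key unique constraint so the database picks a single winner among concurrent first runs; this reuses the illustrative db and memory from the append sketch above and is an assumption, not the PR's fix:

```typescript
// INSERT ... ON CONFLICT DO NOTHING: if two requests seed the same
// conversation simultaneously, exactly one row wins and the other is a no-op.
async function seedMemory(workspaceId: string, key: string, messages: object[]) {
  await db
    .insert(memory)
    .values({ workspaceId, key, data: messages })
    .onConflictDoNothing({ target: [memory.workspaceId, memory.key] })
}
```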

Confidence Score: 4/5

  • Safe to merge with minor race condition risks in high-concurrency scenarios
  • The PR implements solid architectural improvements with proper type safety and atomic database operations. However, memory seeding and sliding window modes have race condition vulnerabilities when multiple requests use the same conversationId concurrently. These are edge cases that may not surface in typical usage but could cause message loss or duplication under load. The provider type safety improvements and cost calculation consolidation are well-executed.
  • Key risk areas: apps/sim/executor/handlers/agent/agent-handler.ts (seeding race condition) and apps/sim/executor/handlers/agent/memory.ts (sliding window race condition)

Important Files Changed

| Filename | Overview |
| --- | --- |
| apps/sim/executor/handlers/agent/memory.ts | Refactored memory from workflow-scoped to workspace-scoped with atomic PostgreSQL operations; sliding window modes still carry race condition risk |
| apps/sim/executor/handlers/agent/agent-handler.ts | Wrapped streaming responses for memory persistence and moved memory seeding logic; seeding can still race under concurrent requests |
| apps/sim/providers/openai/index.ts | Added proper TypeScript types from the OpenAI SDK and moved cost calculation into the streaming callback for real-time tracking |
| apps/sim/providers/anthropic/utils.ts | Enhanced streaming to track token usage from Anthropic SDK events and invoke the callback with accumulated content and usage metrics (see the sketch below this table) |
| packages/db/schema.ts | Changed the memory table from workflowId to workspaceId scoping, with a unique constraint on workspace+key |
| packages/db/migrations/0130_bored_master_chief.sql | Backfills workspaceId from workflows and deletes orphaned memory records that could not be resolved |
| apps/sim/providers/google/utils.ts | Removed the legacy function_call fallback; only functionCall in parts is now supported |
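
For apps/sim/providers/anthropic/utils.ts, a hedged sketch of how text and token usage can be accumulated from the SDK's raw stream events (simplified; not the file's actual code):

```typescript
import type { RawMessageStreamEvent } from '@anthropic-ai/sdk/resources/messages'

// Accumulate text deltas and usage counters from Anthropic's raw stream
// events, then hand both to a completion callback.
async function consumeAnthropicStream(
  events: AsyncIterable<RawMessageStreamEvent>,
  onComplete: (content: string, usage: { input: number; output: number }) => void
) {
  let content = ''
  let input = 0
  let output = 0

  for await (const event of events) {
    if (event.type === 'message_start') {
      input = event.message.usage.input_tokens // prompt tokens, reported up front
    } else if (event.type === 'content_block_delta' && event.delta.type === 'text_delta') {
      content += event.delta.text
    } else if (event.type === 'message_delta') {
      output = event.usage.output_tokens // cumulative completion tokens
    }
  }

  onComplete(content, { input, output })
}
```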

Sequence Diagram

```mermaid
sequenceDiagram
    participant Client
    participant AgentHandler
    participant Provider
    participant MemoryService
    participant Database

    Client->>AgentHandler: execute(block, inputs)
    AgentHandler->>AgentHandler: buildMessages()
    
    alt Memory Enabled
        AgentHandler->>MemoryService: fetchMemoryMessages()
        MemoryService->>Database: SELECT data FROM memory
        Database-->>MemoryService: existing messages
        
        alt First Run (no existing)
            AgentHandler->>MemoryService: seedMemory(conversationMessages)
            MemoryService->>Database: INSERT memory record
        else Existing Memory
            alt New User Message
                AgentHandler->>MemoryService: appendToMemory(userMessage)
                MemoryService->>Database: UPDATE with array concat
            end
        end
    end

    AgentHandler->>Provider: executeProviderRequest()
    
    alt Streaming Response
        Provider-->>AgentHandler: StreamingExecution
        
        alt Memory Enabled
            AgentHandler->>MemoryService: wrapStreamForPersistence()
            MemoryService-->>AgentHandler: wrapped stream
        end
        
        AgentHandler-->>Client: stream chunks
        
        Note over MemoryService: Stream accumulates content
        
        MemoryService->>MemoryService: flush() callback
        MemoryService->>Database: UPDATE memory (append assistant message)
        MemoryService->>Provider: calculate cost from usage
        
    else Non-Streaming Response
        Provider-->>AgentHandler: BlockOutput
        Provider->>Provider: calculate cost from tokens
        
        alt Memory Enabled
            AgentHandler->>MemoryService: persistResponseToMemory()
            MemoryService->>Database: UPDATE memory (append assistant)
        end
        
        AgentHandler-->>Client: complete response
    end
```

greptile-apps[bot] avatar Dec 22 '25 06:12 greptile-apps[bot]

@greptile

icecrasher321 avatar Dec 22 '25 07:12 icecrasher321

@greptile

icecrasher321 avatar Dec 22 '25 20:12 icecrasher321

@greptile

icecrasher321 avatar Dec 22 '25 20:12 icecrasher321

@greptile

waleedlatif1 avatar Dec 22 '25 22:12 waleedlatif1