Feature - Enable Streaming Capabilities For Tool Calls That Send Real-Time Notifications

Open Amrrx opened this issue 5 months ago • 0 comments

MCP Streaming Support

Adds real-time streaming capabilities to MCP (Model Context Protocol) tool integrations with enhanced connection management and SSE integration.

Changes

  • Enhanced MCP client connection management with SSE integration
  • Added streaming support for MCP tool execution with real-time feedback
  • Implemented server-sent event (SSE) streaming for MCP tool responses
  • Added connection lifecycle management with automatic cleanup

Server Capabilities Schema

Server advertises streaming support via capabilities response:

// Server capabilities detection
const capabilities = client.getServerCapabilities()
const hasStreaming =
    capabilities?.notifications?.streaming === true ||
    capabilities?.experimental?.notifications?.streaming === true

Expected server capabilities format:

{
    "capabilities": {
        "notifications": {
            "streaming": true
        }
    }
}

Alternative experimental format:

{
    "capabilities": {
        "experimental": {
            "notifications": {
                "streaming": true
            }
        }
    }
}

Tool Capabilities Schema

MCP tools declare streaming support via annotations:

// Tool definition with streaming annotation
{
    "name": "tool_name",
    "description": "Tool description",
    "annotations": {
        "streaming_enabled": true,
        "notification_types": ["task_completion", "progress", "logging"]
    },
    "inputSchema": {
        "type": "object",
        "properties": { /* tool parameters */ }
    }
}

Tool streaming annotations:

  • streaming_enabled: boolean - Enables streaming for this tool
  • notification_types: string[] - Completion signal types (default: ["task_completion"])
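
A minimal sketch of how a client might read these annotations, assuming the field names listed above. `ToolDefinition`, `StreamingOptions`, and `resolveStreamingOptions` are hypothetical names used for illustration, not part of Flowise or the MCP SDK, and treating an empty `notification_types` array like a missing one is also an assumption.

```typescript
// Hypothetical shape of a tool definition carrying the streaming annotations.
interface ToolDefinition {
    name: string
    description?: string
    annotations?: {
        streaming_enabled?: boolean
        notification_types?: string[]
    }
}

// Hypothetical result type: what the caller needs to know before executing the tool.
interface StreamingOptions {
    enabled: boolean
    notificationTypes: string[]
}

// Documented default for notification_types.
const DEFAULT_NOTIFICATION_TYPES = ['task_completion']

function resolveStreamingOptions(tool: ToolDefinition): StreamingOptions {
    const annotations = tool.annotations ?? {}
    const declared = annotations.notification_types
    return {
        // Only an explicit `true` enables streaming; a missing flag means "off".
        enabled: annotations.streaming_enabled === true,
        // Assumption: an empty or missing list falls back to the default.
        notificationTypes: declared && declared.length > 0 ? [...declared] : [...DEFAULT_NOTIFICATION_TYPES]
    }
}
```

With this sketch, a tool that carries no annotations resolves to streaming disabled with `["task_completion"]` as its only completion signal.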

SSE Streaming Events

Tools with streaming support emit events via IServerSideEventStreamer:

interface IServerSideEventStreamer {
    // Core streaming methods
    streamTokenEvent(chatId: string, data: string): void
    streamToolEvent(chatId: string, data: any): void
    streamCustomEvent(chatId: string, eventType: string, data: any): void

    // MCP connection management
    addMcpConnection?(chatId: string, toolName?: string): void
    removeMcpConnection?(chatId: string, toolName?: string): void
    markMcpConnectionCompleting?(chatId: string, toolName?: string): void
    hasMcpConnections?(chatId: string): boolean
}
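
The four optional connection methods could be backed by a small in-memory registry. The sketch below is an assumption about how that might look, not the implementation from this change: `McpConnectionTracker`, the `'default'` key used when `toolName` is omitted, and the `'active'`/`'completing'` states are all hypothetical.

```typescript
// Hypothetical per-connection state; "completing" means a completion signal was seen
// but cleanup has not happened yet.
type ConnectionState = 'active' | 'completing'

class McpConnectionTracker {
    // chatId -> (toolName -> state)
    private connections = new Map<string, Map<string, ConnectionState>>()

    addMcpConnection(chatId: string, toolName = 'default'): void {
        const tools = this.connections.get(chatId) ?? new Map<string, ConnectionState>()
        tools.set(toolName, 'active')
        this.connections.set(chatId, tools)
    }

    markMcpConnectionCompleting(chatId: string, toolName = 'default'): void {
        const tools = this.connections.get(chatId)
        // Unknown connections are ignored rather than created.
        if (tools?.has(toolName)) tools.set(toolName, 'completing')
    }

    removeMcpConnection(chatId: string, toolName = 'default'): void {
        const tools = this.connections.get(chatId)
        if (!tools) return
        tools.delete(toolName)
        // Drop the chat entry once its last tool connection is gone.
        if (tools.size === 0) this.connections.delete(chatId)
    }

    hasMcpConnections(chatId: string): boolean {
        // A connection that is still completing counts as open in this sketch.
        return (this.connections.get(chatId)?.size ?? 0) > 0
    }
}
```

Keying by chat first and tool second keeps `hasMcpConnections(chatId)` a single lookup, which is the check a streamer would need before closing the SSE response for that chat.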

Notification Handling Schema

MCP notifications follow the LoggingMessageNotificationSchema:

// Notification structure
{
    "method": "logging/message",
    "params": {
        "data": "notification message",
        "logger": "task_completion" // Used for completion detection
    }
}
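
Completion detection from this structure might be sketched as follows, assuming the `method` and `logger` values shown above. `LoggingNotification` and `isCompletionNotification` are hypothetical names, and the type only models the fields that appear in this issue.

```typescript
// Hypothetical type covering only the fields shown in the notification structure.
interface LoggingNotification {
    method: string
    params: {
        data: unknown
        logger?: string
    }
}

// Returns true when the notification's logger matches one of the tool's
// completion signal types (the tool's `notification_types` annotation).
function isCompletionNotification(
    notification: LoggingNotification,
    completionTypes: string[] = ['task_completion']
): boolean {
    // Assumption: only logging messages can signal completion.
    if (notification.method !== 'logging/message') return false
    const logger = notification.params.logger
    return logger !== undefined && completionTypes.includes(logger)
}
```

A notification whose `logger` is `"progress"` is then treated as an intermediate update unless the tool lists `"progress"` among its `notification_types`.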

Configuration

const MCP_STREAMING_CONFIG = {
    DEFAULT_COMPLETION_TIMEOUT: 600000, // 10 minutes fallback
    NOTIFICATION_DELAY: 1000, // 1 second cleanup delay
    SUPPORTED_NOTIFICATION_TYPES: ['logging/message', 'progress'],
    STREAMING_MARKER: '[MCP Streaming]' // UI indicator
}

Technical Implementation

  • Transport Layer: Supports StdioClientTransport, SSEClientTransport, and StreamableHTTPClientTransport
  • Connection Management: Per-tool connection tracking with automatic cleanup
  • Completion Detection: Based on notification logger types from tool annotations
  • Fallback Support: Graceful degradation for non-streaming servers
  • Progress Tokens: Unique identifiers for tracking tool execution progress
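
The fallback behaviour can be illustrated with a small decision function. This is a sketch under stated assumptions rather than the code from this change: it assumes streaming is used only when the server advertises it and the tool opts in, and `ServerCapabilities`, `ToolAnnotations`, `serverSupportsStreaming`, and `selectExecutionMode` are hypothetical names.

```typescript
// Hypothetical types modelling only the capability and annotation fields from this issue.
interface ServerCapabilities {
    notifications?: { streaming?: boolean }
    experimental?: { notifications?: { streaming?: boolean } }
}

interface ToolAnnotations {
    streaming_enabled?: boolean
}

type ExecutionMode = 'streaming' | 'standard'

// Accepts both the regular and the experimental capability format.
function serverSupportsStreaming(capabilities?: ServerCapabilities): boolean {
    return (
        capabilities?.notifications?.streaming === true ||
        capabilities?.experimental?.notifications?.streaming === true
    )
}

// Assumption: streaming requires both sides to opt in; anything else
// degrades to a standard, non-streaming tool call.
function selectExecutionMode(
    capabilities: ServerCapabilities | undefined,
    annotations: ToolAnnotations | undefined
): ExecutionMode {
    const toolOptedIn = annotations?.streaming_enabled === true
    return serverSupportsStreaming(capabilities) && toolOptedIn ? 'streaming' : 'standard'
}
```

Because every missing field resolves to `'standard'`, a server that predates these capabilities takes the existing non-streaming path unchanged.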

Enables real-time user feedback during MCP tool execution while maintaining backward compatibility with existing non-streaming MCP servers.

Amrrx, Jul 29 '25 12:07