embabel-agent

LLM Streaming Functionality with E2E Integration Tests

Open igordayen opened this issue 2 months ago • 3 comments

LLM Streaming Integration with Tooling Support

Overview

This PR introduces comprehensive LLM streaming capabilities with full tooling integration, providing reactive streaming APIs that work seamlessly with @Tool-annotated methods.

User Code Usage

Basic Streaming with Tools

// Extension function approach (recommended)
// Collected for assertions; logger is assumed to be an SLF4J logger in the test class
val receivedEvents = mutableListOf<String>()
var errorOccurred: Throwable? = null
var completionCalled = false

val runner = ai.withLlmByRole("fastest")

val results = runner.asStreaming()
    .withPrompt("Test integration streaming")
    .createObjectListWithThinking(SimpleItem::class.java)

results
    .doOnNext { event ->
        when {
            event.isThinking() -> {
                val content = event.getThinking()!!
                receivedEvents.add("THINKING: $content")
                logger.info("Integration test received thinking: {}", content)
            }
            event.isObject() -> {
                val obj = event.getObject()!!
                receivedEvents.add("OBJECT: ${obj.name}")
                logger.info("Integration test received object: {}", obj.name)
            }
        }
    }
    .doOnError { error ->
        errorOccurred = error
        logger.error("Integration test stream error: {}", error.message)
    }
    .doOnComplete {
        completionCalled = true
        logger.info("Integration test stream completed successfully")
    }
    // doOn* operators only register callbacks; nothing flows until the Flux is
    // subscribed (a test may use blockLast() or StepVerifier to wait for completion)
    .subscribe()

Extension Functions API

// Pure casting (fast)
runner.asStreaming()

// Safe conversion with validation
runner.asStreamingWithValidation()
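The two extension functions differ in how they treat a runner that cannot stream. A minimal sketch of that difference, assuming validation amounts to a capability check before the cast; DemoRunner and DemoStreamingRunner are hypothetical stand-ins for the runner types, and the real asStreamingWithValidation() may check more (for example, whether the selected model supports streaming):

```kotlin
// Hypothetical stand-ins for the runner types; not Embabel's actual interfaces.
interface DemoRunner {
    val name: String
}

interface DemoStreamingRunner : DemoRunner {
    fun streamLines(prompt: String): Sequence<String>
}

// "Pure casting": a single cast with no extra work, so a runner that
// cannot stream surfaces as a bare ClassCastException.
fun DemoRunner.asStreaming(): DemoStreamingRunner = this as DemoStreamingRunner

// "Safe conversion": test the capability first and fail with an error that
// names the offending runner.
fun DemoRunner.asStreamingWithValidation(): DemoStreamingRunner =
    this as? DemoStreamingRunner
        ?: throw UnsupportedOperationException("Runner '$name' does not support streaming")
```

In this sketch the cost difference is negligible; the benefit of the validated form is a clearer failure, and a real validation step could also check capabilities the type system cannot express.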

Code Organization

New Components

  • StreamingPromptRunnerOperations - Core streaming operations interface
  • StreamingPromptRunnerOperationsImpl - Implementation bridging API to SPI
  • StreamingChatClientOperations - Spring AI integration layer
  • Extension Functions - Clean alternatives to manual casting
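The layering of the first three components can be sketched as an API that user code calls, an SPI that a chat-client integration implements, and an implementation that bridges the two. The names ObjectStreamApi, ChatStreamSpi and ObjectStreamApiImpl are hypothetical stand-ins for StreamingPromptRunnerOperations, StreamingChatClientOperations and StreamingPromptRunnerOperationsImpl, every signature is an assumption, and a Kotlin Sequence replaces Flux so the sketch runs without Reactor:

```kotlin
// SPI (stands in for StreamingChatClientOperations): yields raw text lines from
// whatever chat client sits underneath, e.g. a Spring AI adapter.
fun interface ChatStreamSpi {
    fun streamLines(prompt: String): Sequence<String>
}

// API (stands in for StreamingPromptRunnerOperations): what user code calls.
interface ObjectStreamApi {
    fun <T> createObjectStream(prompt: String, parse: (String) -> T): Sequence<T>
}

// Bridge (stands in for StreamingPromptRunnerOperationsImpl): user code depends
// only on ObjectStreamApi, so the SPI implementation can be swapped freely.
class ObjectStreamApiImpl(private val spi: ChatStreamSpi) : ObjectStreamApi {
    override fun <T> createObjectStream(prompt: String, parse: (String) -> T): Sequence<T> =
        spi.streamLines(prompt)
            .filter { it.isNotBlank() } // skip empty lines between records
            .map(parse)
}
```

Keeping the SPI this narrow means a test can supply a lambda as a fake chat client, which is the kind of substitution the unit tests listed under Testing Coverage would need.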

Architecture Flow

  LLM Raw Chunks → Line Buffering → Content Classification → Event Generation → User Stream
      Flux<String>     LineBuffer       Thinking vs JSON      StreamingEvent<T>    Subscription
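The Line Buffering stage is needed because chunk boundaries need not coincide with line boundaries: a single JSONL record can arrive split across several chunks. A minimal sketch of such a buffer, using a Kotlin Sequence in place of Flux<String> so it runs without Reactor; the class name LineBuffer comes from the diagram, but its accept/flush methods and the toLines helper are hypothetical:

```kotlin
// Hypothetical line buffer: holds the trailing partial line until a newline
// (or the end of the stream) completes it.
class LineBuffer {
    private val pending = StringBuilder()

    // Appends a chunk and returns every line it completed (without the '\n').
    fun accept(chunk: String): List<String> {
        pending.append(chunk)
        val lines = mutableListOf<String>()
        var newline = pending.indexOf("\n")
        while (newline >= 0) {
            lines += pending.substring(0, newline)
            pending.delete(0, newline + 1)
            newline = pending.indexOf("\n")
        }
        return lines
    }

    // Returns the final unterminated line, if any, and clears the buffer.
    fun flush(): String? {
        if (pending.isEmpty()) return null
        val rest = pending.toString()
        pending.setLength(0)
        return rest
    }
}

// Turns a lazy stream of raw chunks into a lazy stream of complete lines.
fun Sequence<String>.toLines(): Sequence<String> = sequence {
    val buffer = LineBuffer()
    for (chunk in this@toLines) yieldAll(buffer.accept(chunk))
    buffer.flush()?.let { yield(it) }
}
```

For example, the chunks {"name": "it, em1"}\n{"na and me": "item2"} come out as the two complete records {"name": "item1"} and {"name": "item2"}.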

OperationContextPromptRunner serves as the bridge between existing code and the streaming features, enabling a seamless transition from traditional blocking operations to reactive streaming without requiring changes to existing business logic or tool definitions.

Challenge: Minimal Changes to Existing Artifacts

StreamingJacksonOutputConverter

  • JSONL Processing - Converts streaming LLM responses to typed objects
  • Real-time Parsing - Objects emitted as they arrive, not after completion
  • Thinking Support - Preserves LLM reasoning alongside structured output
  • See PR https://github.com/embabel/embabel-common/pull/89
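The classification step of such a converter can be sketched as follows, assuming (hypothetically) that reasoning arrives on lines wrapped in <think>...</think> and that every line starting with { is one JSONL record. The parse parameter stands in for Jackson deserialization, and ConvertedEvent, thinkLine and toEvents are hypothetical names for this sketch only:

```kotlin
// Hypothetical event type for this sketch only.
sealed interface ConvertedEvent<out T> {
    data class Thinking(val text: String) : ConvertedEvent<Nothing>
    data class Item<T>(val value: T) : ConvertedEvent<T>
}

// Assumed thinking marker; the real converter's format may differ.
val thinkLine = Regex("""\s*<think>(.*)</think>\s*""")

// Classifies each complete line lazily: an event is produced as soon as its
// line is pulled, not after the whole response has arrived.
fun <T> Sequence<String>.toEvents(parse: (String) -> T): Sequence<ConvertedEvent<T>> =
    mapNotNull { line ->
        val thinking = thinkLine.matchEntire(line)
        when {
            thinking != null -> ConvertedEvent.Thinking(thinking.groupValues[1].trim())
            line.trimStart().startsWith("{") -> ConvertedEvent.Item(parse(line))
            else -> null // neither thinking nor JSON: dropped
        }
    }
```

Lines matching neither rule are dropped, which mirrors the swallowing of unclassified content noted under Remaining Major Tasks.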

Workflow

  1. LLM streams JSONL → {"name": "item1"}\n{"name": "item2"}
  2. Converter processes → Real-time object creation
  3. Objects emitted → Flux<T> for reactive consumption (possible enhancement: conceal Flux)
  4. Tools invoked
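Steps 1 to 3 can be illustrated with a simulated LLM that logs when it sends each chunk: because the pipeline is lazy, the first object reaches the consumer before the second chunk is produced. This sketch uses a pull-based Kotlin Sequence rather than a push-based Flux and assumes each chunk ends on a line boundary (the line-buffering stage handles the general case); simulatedLlm and consume are hypothetical helpers:

```kotlin
// Hypothetical simulated LLM: logs each chunk as it is sent. Each chunk here
// ends on a line boundary, which a real stream does not guarantee.
fun simulatedLlm(log: MutableList<String>): Sequence<String> = sequence {
    log += "LLM sent chunk 1"
    yield("{\"name\": \"item1\"}\n")
    log += "LLM sent chunk 2"
    yield("{\"name\": \"item2\"}\n")
    log += "LLM finished"
}

// Steps 1-3: split each chunk into JSONL lines and hand every record to the
// consumer as soon as it is available, without waiting for the stream to end.
fun consume(log: MutableList<String>) {
    simulatedLlm(log)
        .flatMap { chunk -> chunk.lineSequence().filter { it.isNotBlank() } }
        .forEach { record -> log += "consumer got $record" }
}
```

After consume(log), the log interleaves producer and consumer: chunk 1, item1, chunk 2, item2, then the finish marker.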

Testing Coverage

  • Unit Tests - Streaming capability detection and operations
  • Integration Tests - Tool registration and streaming workflow
  • API Tests - Both traditional and extension function approaches

Remaining Major Tasks:

  • Implement combination of WithExampleConverter format with StreamingJacksonOutputConverter for few-shot examples in streaming
  • Create common PromptFormatBuilder utility to reduce code duplication between JSON and JSONL format instructions
  • Add tests for mixed content, error cases, and edge conditions
  • Incorporate StreamingUtility into the streaming converter
  • Explore Streaming Metadata (from model DB?)
  • Discuss classification as {Object | Thinking | String}; currently, content not classified as Object or Thinking is swallowed
  • Implement backpressure strategies for high-volume streaming
  • Add retry / reconnect logic for transient OpenAI failures
  • Clean up APIs

igordayen, Nov 29 '25 04:11

Additional documentation: streaming-design

igordayen, Nov 29 '25 21:11

Please see the write-up on the introduction of the Streaming Capability interface: streaming-capability.md

igordayen, Dec 01 '25 22:12