LLM Streaming Integration with Tooling Support and E2E Integration Tests
Overview
This PR introduces LLM streaming capabilities with tooling integration: reactive streaming APIs that work with @Tool-annotated methods.
User Code Usage
Basic Streaming with Tools
```kotlin
// Extension function approach (recommended)
val runner = ai.withLlmByRole("fastest")
val results = runner.asStreaming()
    .withPrompt("Test integration streaming")
    .createObjectListWithThinking(SimpleItem::class.java)

// Test state captured by the callbacks below
val receivedEvents = mutableListOf<String>()
var errorOccurred: Throwable? = null
var completionCalled = false

results
    .doOnNext { event ->
        when {
            event.isThinking() -> {
                val content = event.getThinking()!!
                receivedEvents.add("THINKING: $content")
                logger.info("Integration test received thinking: {}", content)
            }
            event.isObject() -> {
                val obj = event.getObject()!!
                receivedEvents.add("OBJECT: ${obj.name}")
                logger.info("Integration test received object: {}", obj.name)
            }
        }
    }
    .doOnError { error ->
        errorOccurred = error
        logger.error("Integration test stream error: {}", error.message)
    }
    .doOnComplete {
        completionCalled = true
        logger.info("Integration test stream completed successfully")
    }
    .subscribe() // the Flux is cold; nothing runs until subscription
```
Extension Functions API
```kotlin
// Pure casting (fast)
runner.asStreaming()

// Safe conversion with validation
runner.asStreamingWithValidation()
```
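As a rough illustration, the two styles could be implemented along the following lines. This is a sketch that assumes the runner type is PromptRunner and that streaming runners implement StreamingPromptRunnerOperations; the actual signatures may differ.

```kotlin
// Sketch only; assumed names: PromptRunner, StreamingPromptRunnerOperations.
fun PromptRunner.asStreaming(): StreamingPromptRunnerOperations =
    this as StreamingPromptRunnerOperations // pure cast: fails fast if unsupported

fun PromptRunner.asStreamingWithValidation(): StreamingPromptRunnerOperations =
    (this as? StreamingPromptRunnerOperations)
        ?: error("${this::class.simpleName} does not support streaming")
```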
Code Organization
New Components
- StreamingPromptRunnerOperations - Core streaming operations interface (sketched after this list)
- StreamingPromptRunnerOperationsImpl - Implementation bridging API to SPI
- StreamingChatClientOperations - Spring AI integration layer
- Extension Functions - Clean alternatives to manual casting
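Inferred from the usage example earlier, the core interface presumably exposes something like the following. This is a hypothetical shape, not the committed API, and it reuses the sketched StreamingEvent type from above.

```kotlin
import reactor.core.publisher.Flux

// Hypothetical shape of StreamingPromptRunnerOperations, inferred from the
// usage example; actual signatures in the PR may differ.
interface StreamingPromptRunnerOperations {
    fun withPrompt(prompt: String): StreamingPromptRunnerOperations
    fun <T> createObjectListWithThinking(type: Class<T>): Flux<StreamingEvent<T>>
}
```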
Architecture Flow
```
LLM Raw Chunks → Line Buffering → Content Classification → Event Generation   → User Stream
 (Flux<String>)   (LineBuffer)     (Thinking vs JSON)       (StreamingEvent<T>)  (Subscription)
```
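A hedged sketch of that pipeline using Reactor, reusing the hypothetical StreamingEvent shape from above. The classification rule here (a line starting with "{" is JSON) is an assumption; the real logic lives in StreamingChatClientOperations.

```kotlin
import reactor.core.publisher.Flux

// Buffer raw chunks into complete lines, then classify each line as a JSON
// object (ObjectEvent) or free-form thinking text (Thinking).
fun <T> toEvents(chunks: Flux<String>, parse: (String) -> T): Flux<StreamingEvent<T>> {
    val buffer = StringBuilder() // safe here: concatMap serializes access
    return chunks.concatMap { chunk ->
        buffer.append(chunk)
        val lines = mutableListOf<String>()
        var newline = buffer.indexOf("\n")
        while (newline >= 0) { // extract every complete line
            lines += buffer.substring(0, newline)
            buffer.delete(0, newline + 1)
            newline = buffer.indexOf("\n")
        }
        Flux.fromIterable(
            lines.filter { it.isNotBlank() }.map { line ->
                if (line.trimStart().startsWith("{")) // assumed classification rule
                    StreamingEvent.ObjectEvent<T>(parse(line))
                else
                    StreamingEvent.Thinking<T>(line)
            }
        )
    }
}
```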
OperationContextPromptRunner serves as the bridge between existing code and streaming features, enabling seamless transition from traditional blocking operations to reactive streaming without requiring changes to existing business logic or tool definitions.
Challenge: Minimal Changes to Existing Artifacts
StreamingJacksonOutputConverter
- JSONL Processing - Converts streaming LLM responses to typed objects
- Real-time Parsing - Objects emitted as they arrive, not after completion
- Thinking Support - Preserves LLM reasoning alongside structured output
- See PR https://github.com/embabel/embabel-common/pull/89
Workflow
1. LLM streams JSONL → {"name": "item1"}\n{"name": "item2"}
2. Converter processes → real-time object creation (see the parsing sketch below)
3. Objects emitted → Flux<T> for reactive consumption (possible enhancement: conceal the Flux)
4. Tools invoked
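Illustratively, steps 1–2 come down to per-line Jackson parsing. This is a sketch only; the actual converter lives in embabel-common (see the PR linked above).

```kotlin
import com.fasterxml.jackson.databind.ObjectMapper

data class SimpleItem(val name: String)

// findAndRegisterModules() picks up jackson-module-kotlin when it is on the
// classpath, which Kotlin data classes need for constructor binding.
val mapper = ObjectMapper().findAndRegisterModules()

fun <T> parseJsonlLine(line: String, type: Class<T>): T = mapper.readValue(line, type)

// {"name": "item1"} → SimpleItem(name = "item1")
val item = parseJsonlLine("""{"name": "item1"}""", SimpleItem::class.java)
```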
Testing Coverage
- Unit Tests - Streaming capability detection and operations (see the StepVerifier pattern after this list)
- Integration Tests - Tool registration and streaming workflow
- API Tests - Both traditional and extension function approaches
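For instance, a streaming assertion can be written with reactor-test's StepVerifier; this is an illustrative pattern, not a test lifted from this PR.

```kotlin
import reactor.test.StepVerifier

// Assert the stream yields a thinking event, then an object event, using
// the `results` Flux from the usage example above.
StepVerifier.create(results)
    .expectNextMatches { it.isThinking() }
    .expectNextMatches { it.isObject() }
    .thenCancel()
    .verify()
```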
Remaining Major Tasks
- Combine the WithExampleConverter format with StreamingJacksonOutputConverter to support few-shot examples in streaming
- Create a common PromptFormatBuilder utility to reduce code duplication between JSON and JSONL format instructions
- Add tests for mixed content, error cases, and edge conditions
- Incorporate StreamingUtility into the streaming converter
- Explore streaming metadata (from the model DB?)
- Discuss classification as {Object | Thinking | String}; content currently not classified as {Object | Thinking} is silently swallowed
- Implement backpressure strategies for high-volume streaming (see the sketch after this list)
- Add retry / reconnect logic for transient OpenAI failures (also covered in the sketch below)
- Clean up APIs
- Additional documentation
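For the backpressure and retry tasks above, one possible direction, sketched with placeholder parameters rather than a chosen design:

```kotlin
import java.time.Duration
import reactor.core.publisher.BufferOverflowStrategy
import reactor.util.retry.Retry

// Bound the event buffer, dropping the oldest events under pressure, and
// retry transient upstream failures with exponential backoff. Capacity and
// retry budget are placeholders. Note: retryWhen resubscribes from scratch,
// so real reconnect logic would need to de-duplicate already-seen objects.
val hardened = results
    .onBackpressureBuffer(
        256,
        { dropped -> logger.warn("Dropped streaming event: {}", dropped) },
        BufferOverflowStrategy.DROP_OLDEST,
    )
    .retryWhen(Retry.backoff(3, Duration.ofSeconds(1)))
```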
Please see the write-up on the introduction of the Streaming Capability interface: streaming-capability.md
Quality Gate passed
Issues
- 0 New issues
- 0 Accepted issues
Measures
- 0 Security Hotspots
- 81.2% Coverage on New Code
- 0.0% Duplication on New Code