BedrockRuntimeAsyncClient.invokeModelWithResponseStream swallows exceptions thrown by the Visitor
Describe the bug
An exception thrown inside the Visitor (for example, in the onChunk callback) is swallowed by BedrockRuntimeAsyncClient.invokeModelWithResponseStream and never reaches the caller.
Expected Behavior
The exception should propagate through the returned CompletableFuture<Void>, wrapped in a CompletionException, so that join() throws it.
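For comparison, this is how a plain CompletableFuture behaves when its task throws. The sketch below uses only the JDK, with no Bedrock client involved; the class and method names are hypothetical. It shows join() reporting the original exception as the cause of a CompletionException, which is the behavior I would expect from the SDK call as well.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

final class ExpectedJoinBehavior {

    // Runs a task that always throws, then returns the cause reported by join().
    static Throwable causeSeenByJoin() {
        CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
            throw new RuntimeException("TEST");
        });
        try {
            future.join();
            return null; // not reached: the task above always throws
        } catch (CompletionException e) {
            // join() wraps the task's exception; the original is the cause.
            return e.getCause();
        }
    }

    public static void main(String[] args) {
        Throwable cause = causeSeenByJoin();
        System.out.println(cause.getClass().getSimpleName() + ": " + cause.getMessage());
    }
}
```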
Current Behavior
The reply from the base model is cut off, and the main process exits with code 0, which is misleading for our use case.
Reproduction Steps
Code Snippet:
internal object BedrockRuntime {
    ...
    fun <T> invokeModelWithResponseStream(modelId: String, body: Any, type: Class<T>, consumer: Consumer<T>) {
        val request = InvokeModelWithResponseStreamRequest.builder()
            .modelId(modelId)
            .body(SdkBytes.fromUtf8String(objectMapper.writeValueAsString(body)))
            .contentType(JSON_MEDIA_TYPE)
            .accept(JSON_MEDIA_TYPE)
            .build()
        val visitor = InvokeModelWithResponseStreamResponseHandler.Visitor.builder()
            .onChunk {
                val payload = it
                    .bytes()
                    .asUtf8String()
                val response = objectMapper.readValue(payload, type)
                consumer.accept(response)
            }
            .build()
        val handler = InvokeModelWithResponseStreamResponseHandler.builder()
            .subscriber(visitor)
            .build()
        try {
            asyncClient.invokeModelWithResponseStream(request, handler)
                .join()
        } catch (exception: Exception) {
            exception.printStackTrace()
        }
    }
    ...
}
...
object Application {
    @JvmStatic
    fun main(args: Array<String>) {
        ...
        var counter = 0
        BedrockRuntime.invokeModelWithResponseStream(CLAUDE_MODEL_ID, request, ClaudeBodyResponse::class.java) {
            counter++
            print(it.completion)
            if (counter > 10) {
                throw RuntimeException("TEST")
            }
        }
        ...
    }
}
Output:
Berikut adalah beberapa tips unt
Process finished with exit code 0
Possible Solution
No response
Additional Information/Context
Kotlin Version: 1.9.22
AWS Java SDK version used
2.24.1
JDK version used
openjdk version "21.0.2" 2024-01-16 LTS
OpenJDK Runtime Environment Temurin-21.0.2+13 (build 21.0.2+13-LTS)
OpenJDK 64-Bit Server VM Temurin-21.0.2+13 (build 21.0.2+13-LTS, mixed mode)
Operating System and version
macOS Sonoma Version 14.3.1
Facing the same issue. Below is Java code to reproduce:
import software.amazon.awssdk.auth.credentials.*;
import software.amazon.awssdk.regions.*;
import software.amazon.awssdk.services.bedrockruntime.*;
import software.amazon.awssdk.services.bedrockruntime.model.*;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
import static java.lang.System.*;
import static software.amazon.awssdk.services.bedrockruntime.model.ContentBlock.*;
import static software.amazon.awssdk.services.bedrockruntime.model.ConversationRole.*;
public class BedrockConverseStreamTest {
    public static void main(String[] args) throws Throwable {
        var msgCounter = new AtomicInteger(0);
        var client = BedrockRuntimeAsyncClient.builder()
                .credentialsProvider(DefaultCredentialsProvider.create())
                .region(Region.US_WEST_2)
                .build();
        var request = ConverseStreamRequest.builder()
                .modelId("anthropic.claude-3-5-sonnet-20240620-v1:0")
                .messages(List.of(Message.builder()
                        .role(USER)
                        .content(List.of(fromText("Say: Hello from Bedrock streaming")))
                        .build()))
                .build();
        CompletableFuture<Void> future = client.converseStream(
                request,
                ConverseStreamResponseHandler.builder()
                        .onResponse(resp -> out.println("📨 onResponse called::" + resp.toString()))
                        .onError(err -> err.printStackTrace())
                        .onComplete(() -> out.println("onComplete called"))
                        .subscriber(new ConverseStreamResponseHandler.Visitor() {
                            @Override
                            public void visitDefault(ConverseStreamOutput e) {
                                out.println(msgCounter + "::On Visit::count:" + e.toString());
                                // Inject a poison pill on an arbitrary visit call, here the 13th.
                                if (msgCounter.incrementAndGet() == 13) throw new RuntimeException("auuch");
                            }
                        })
                        .build());
        future.get(150, TimeUnit.SECONDS); // block until the stream is done (tune as needed)
    }
}
I expected onError to be called when the visitor throws.
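Until the SDK propagates these exceptions, one possible stopgap is to catch them inside the callback and rethrow after the future completes. This is only a sketch using the JDK, not a fix for the SDK: CapturingConsumer, fakeStream and runDemo are hypothetical names, and fakeStream merely stands in for the SDK call by hiding callback exceptions the way this issue describes.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

final class CapturingConsumerDemo {

    // Wraps a callback, remembers the first exception it throws, and ignores later chunks.
    static final class CapturingConsumer<T> implements Consumer<T> {
        private final Consumer<T> delegate;
        private final AtomicReference<Throwable> failure = new AtomicReference<>();

        CapturingConsumer(Consumer<T> delegate) {
            this.delegate = delegate;
        }

        @Override
        public void accept(T value) {
            if (failure.get() != null) {
                return; // a previous chunk already failed; skip the rest
            }
            try {
                delegate.accept(value);
            } catch (Throwable t) {
                failure.compareAndSet(null, t); // keep only the first failure
            }
        }

        // Call after join(): rethrows the captured failure, if any.
        void rethrowIfFailed() {
            Throwable t = failure.get();
            if (t != null) {
                throw new IllegalStateException("Stream callback failed", t);
            }
        }
    }

    // Assumption: a stand-in for the SDK stream that silently drops callback exceptions.
    static CompletableFuture<Void> fakeStream(Consumer<String> onChunk) {
        return CompletableFuture.runAsync(() -> {
            for (int i = 0; i < 20; i++) {
                try {
                    onChunk.accept("chunk-" + i);
                } catch (RuntimeException ignored) {
                    return; // mimics the swallowed exception from this report
                }
            }
        });
    }

    // Returns the callback's exception recovered after join(), or null if none occurred.
    static Throwable runDemo() {
        AtomicInteger counter = new AtomicInteger();
        CapturingConsumer<String> consumer = new CapturingConsumer<>(chunk -> {
            if (counter.incrementAndGet() > 10) {
                throw new RuntimeException("TEST");
            }
        });
        fakeStream(consumer).join();
        try {
            consumer.rethrowIfFailed();
            return null;
        } catch (IllegalStateException e) {
            return e.getCause();
        }
    }

    public static void main(String[] args) {
        System.out.println("Recovered: " + runDemo());
    }
}
```

With the real client, the same wrapper would go around the body of onChunk or visitDefault, and rethrowIfFailed() would be called right after join() or get(), so the process no longer ends as if the stream had succeeded.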
This issue has been open for a long time.