aws-sdk-java-v2

BedrockRuntimeAsyncClient.invokeModelWithResponseStream swallows exceptions thrown by the Visitor

Open nico-arianto opened this issue 1 year ago • 2 comments

Describe the bug

Issue: an exception thrown inside the Visitor is swallowed by BedrockRuntimeAsyncClient.invokeModelWithResponseStream.

Expected Behavior

The exception should propagate through the returned CompletableFuture<Void>, wrapped in a CompletionException.
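For reference, this is the plain JDK behavior the expectation is based on. The sketch below uses no SDK calls at all; the class name FutureFailureDemo and the helpers failingFuture and joinAndDescribe are illustrative names, not part of the SDK or of the reproduction code.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

class FutureFailureDemo {

    // Hypothetical stand-in for an async operation whose callback throws.
    static CompletableFuture<Void> failingFuture() {
        return CompletableFuture.runAsync(() -> {
            throw new RuntimeException("TEST");
        });
    }

    // Joins the future. Returns a description of the cause if join() throws
    // a CompletionException, or null if the future completed normally.
    static String joinAndDescribe(CompletableFuture<Void> future) {
        try {
            future.join();
            return null;
        } catch (CompletionException e) {
            Throwable cause = e.getCause();
            return cause.getClass().getSimpleName() + ": " + cause.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(joinAndDescribe(failingFuture()));
    }
}
```

With this behavior, the catch block in the reproduction code would receive a CompletionException whose cause is the RuntimeException thrown in the callback, instead of join() returning normally.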

Current Behavior

The reply from the base model is cut off, yet the main process exits with code 0, which is misleading for our use case.

Reproduction Steps

Code Snippet:

internal object BedrockRuntime {
    ...
    fun <T> invokeModelWithResponseStream(modelId: String, body: Any, type: Class<T>, consumer: Consumer<T>) {
        val request = InvokeModelWithResponseStreamRequest.builder()
            .modelId(modelId)
            .body(SdkBytes.fromUtf8String(objectMapper.writeValueAsString(body)))
            .contentType(JSON_MEDIA_TYPE)
            .accept(JSON_MEDIA_TYPE)
            .build()
        val visitor = InvokeModelWithResponseStreamResponseHandler.Visitor.builder()
            .onChunk {
                val payload = it
                    .bytes()
                    .asUtf8String()
                val response = objectMapper.readValue(payload, type)
                consumer.accept(response)
            }
            .build()
        val handler = InvokeModelWithResponseStreamResponseHandler.builder()
            .subscriber(visitor)
            .build()
        try {
            asyncClient.invokeModelWithResponseStream(request, handler)
                .join()
        } catch (exception: Exception) {
            exception.printStackTrace()
        }
    }
    ...
}
...
object Application {
    @JvmStatic
    fun main(args: Array<String>) {
        ...
        var counter = 0
        BedrockRuntime.invokeModelWithResponseStream(CLAUDE_MODEL_ID, request, ClaudeBodyResponse::class.java) {
            counter++
            print(it.completion)
            if (counter > 10) {
                throw RuntimeException("TEST")
            }
        }
        ...
    }
}

Output:

 Berikut adalah beberapa tips unt
Process finished with exit code 0

Possible Solution

No response

Additional Information/Context

Kotlin Version: 1.9.22

AWS Java SDK version used

2.24.1

JDK version used

openjdk version "21.0.2" 2024-01-16 LTS
OpenJDK Runtime Environment Temurin-21.0.2+13 (build 21.0.2+13-LTS)
OpenJDK 64-Bit Server VM Temurin-21.0.2+13 (build 21.0.2+13-LTS, mixed mode)

Operating System and version

macOS Sonoma Version 14.3.1

nico-arianto • Feb 14 '24 11:02

Facing the same issue. Below is Java code to reproduce:


import software.amazon.awssdk.auth.credentials.*;
import software.amazon.awssdk.regions.*;
import software.amazon.awssdk.services.bedrockruntime.*;
import software.amazon.awssdk.services.bedrockruntime.model.*;

import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;

import static java.lang.System.*;
import static software.amazon.awssdk.services.bedrockruntime.model.ContentBlock.*;
import static software.amazon.awssdk.services.bedrockruntime.model.ConversationRole.*;

public class BedrockConverseStreamTest {

    public static void main(String[] args) throws Throwable {

        var msgCounter = new AtomicInteger(0);

        var client = BedrockRuntimeAsyncClient.builder().credentialsProvider(DefaultCredentialsProvider.create()).region(Region.US_WEST_2).build();

        var request = ConverseStreamRequest.builder()
                .modelId("anthropic.claude-3-5-sonnet-20240620-v1:0")
                .messages(List.of(Message.builder()
                        .role(USER)
                        .content(List.of(fromText("Say: Hello from Bedrock streaming")))
                        .build()))
                .build();

        CompletableFuture<Void> future = client.converseStream(
                request,
                ConverseStreamResponseHandler.builder()
                        .onResponse(resp -> out.println("📨 onResponse called::" + resp.toString()))
                        .onError(err -> err.printStackTrace())
                        .onComplete(() -> out.println("onComplete called"))
                        .subscriber(new ConverseStreamResponseHandler.Visitor() {
                            @Override
                            public void visitDefault(ConverseStreamOutput e) {
                                out.println(msgCounter + "::On Visit::count:" + e.toString());

                                // Inject a poison pill on an arbitrary visit call, here the 13th.
                                if (msgCounter.incrementAndGet() == 13) throw new RuntimeException("auuch");
                            }
                        })
                        .build());

        future.get(150, TimeUnit.SECONDS); // block until stream done (tune as needed)
    }
}

Expected: onError is called when the visitor throws.
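Until that happens, one possible workaround is to catch the exception inside the callback yourself and rethrow it after the future completes. This is only a sketch using plain JDK types: CapturingConsumer and rethrowIfFailed are hypothetical names, not SDK API, and the wrapper does not cancel the stream, it only ignores events after the first failure.

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

// Hypothetical helper: wraps a per-event callback and remembers the first
// exception it throws, so the caller can surface it after the stream ends.
class CapturingConsumer<T> implements Consumer<T> {

    private final Consumer<T> delegate;
    private final AtomicReference<RuntimeException> failure = new AtomicReference<>();

    CapturingConsumer(Consumer<T> delegate) {
        this.delegate = delegate;
    }

    @Override
    public void accept(T event) {
        if (failure.get() != null) {
            return; // a previous event already failed; skip the rest
        }
        try {
            delegate.accept(event);
        } catch (RuntimeException e) {
            failure.compareAndSet(null, e); // keep only the first failure
        }
    }

    // Call after the future has completed to surface a captured failure.
    void rethrowIfFailed() {
        RuntimeException e = failure.get();
        if (e != null) {
            throw new IllegalStateException("Stream callback failed", e);
        }
    }

    public static void main(String[] args) {
        CapturingConsumer<Integer> consumer = new CapturingConsumer<>(i -> {
            if (i == 3) throw new RuntimeException("auuch");
            System.out.println("event " + i);
        });
        for (int i = 1; i <= 5; i++) {
            consumer.accept(i); // stands in for the visitor being invoked
        }
        consumer.rethrowIfFailed(); // throws IllegalStateException caused by "auuch"
    }
}
```

In the reproduction above, the body of visitDefault would delegate to such a wrapper, and rethrowIfFailed() would be called right after future.get(...), so the failure at least reaches the calling thread.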

bhuvangu • Sep 07 '25 08:09

This issue has been open for a long time.

bhuvangu • Sep 09 '25 19:09