aws-sdk-java-v2

KinesisClient.deregisterStreamConsumer doesn't work.

Open guirak opened this issue 2 years ago • 2 comments

Describe the bug

Hello,

I'm running into a problem with KinesisClient when trying to deregister a consumer.

With the following code :

 kinesisClient.deregisterStreamConsumer(builder -> DeregisterStreamConsumerRequest.builder()
                                                .consumerARN(consumer.consumerARN())
                                                .build())

I have the following error :

software.amazon.awssdk.services.kinesis.model.InvalidArgumentException: Both streamARN and consumer name must be provided if consumerARN is not provied.

I don't understand why I get this error, as I have a valid consumer ARN.

Replacing the consumerARN with the streamARN and the consumer name causes the same error.

I'm using the latest version of the SDK (2.21.37).

Thank you

Expected Behavior

The consumer is expected to be deregistered when the consumer ARN is provided.

Current Behavior

AWS doesn't deregister the consumer and the following error is returned :

software.amazon.awssdk.services.kinesis.model.InvalidArgumentException: Both streamARN and consumer name must be provided if consumerARN is not provied.

Reproduction Steps

Register a consumer :


Mono.fromFuture(() -> kinesisClient.registerStreamConsumer(RegisterStreamConsumerRequest.builder()
                        .streamARN(streamArn)
                        .consumerName(STREAM_CONSUMER_NAME + shard.shardId())
                        .build()))
                        .subscribe()

List the consumers and deregister them :

return rlog.info("Initializing streaming to Kinesis. ARN : " + streamArn)

                // Delete the existing consumers
                .then(Mono.fromFuture(() -> kinesisClient.listStreamConsumers(ListStreamConsumersRequest.builder()
                        .streamARN(streamArn)
                        .build())))
                .flatMapMany(response -> rlog.info("Existing consumers : " + response.consumers().stream()
                                .map(consumer -> consumer.consumerName() + " -  ARN : " + consumer.consumerARN())
                                .collect(Collectors.joining("\n")))
                        .thenMany(Flux.fromIterable(response.consumers())))
                .flatMap(consumer -> rlog.info("Deleting consumer " + consumer.consumerARN())
                        .then(Mono.fromFuture(() ->
                                        kinesisClient.deregisterStreamConsumer(builder -> DeregisterStreamConsumerRequest.builder()
                                                .consumerARN(consumer.consumerARN())
                                                .build())
                                )
                        )
                )

Possible Solution

No response

Additional Information/Context

No response

AWS Java SDK version used

2.21.37

JDK version used

17.0.9

Operating System and version

Debian 12

guirak avatar Dec 04 '23 10:12 guirak

Hi @guirak,

Using AWS SDK for Java 2.21.37, I am unable to reproduce the issue. I am able to list the stream consumers, pull the ConsumerARN, and pass it successfully to deregisterStreamConsumer.

Here's the sample code that I used:

 public static String deregConsumer(KinesisClient kinesisClient) {

        try {
            ListStreamConsumersRequest listCon = ListStreamConsumersRequest.builder()
                    .streamARN("<<redacted>>")
                    .build();

            ListStreamConsumersResponse response = kinesisClient.listStreamConsumers(listCon);

            List<Consumer> list = response.consumers();
            String ConsumerARN = "";

            for(Consumer item: list) {
                ConsumerARN = item.consumerARN();
            }

            DeregisterStreamConsumerRequest deRegCon = DeregisterStreamConsumerRequest.builder()
                    .consumerARN(ConsumerARN)
                    .build();

            DeregisterStreamConsumerResponse resp = kinesisClient.deregisterStreamConsumer(deRegCon);
            return resp.toString();

        } catch (KinesisException e) {
            System.err.println(e.getMessage());
            System.exit(1);
        }
        return "";
    }

Can you confirm if you are getting the correct consumer.consumerARN() that is supplied to deregisterStreamConsumer? If you continue to see an issue, can you kindly provide the verbose wire logs for further analysis? Instructions to enable verbose logging can be found here.

Regards, Chaitanya

bhoradc avatar Dec 04 '23 18:12 bhoradc

Hi,

Thank you for this quick feedback.

The consumerARN value is correct.

I have just noticed that I was creating a new builder instead of using the one supplied to the lambda:

kinesisClient.deregisterStreamConsumer(builder -> DeregisterStreamConsumerRequest.builder()
        .consumerARN(consumer.consumerARN())
        .build())

The error does not occur with the following code:

kinesisClient.deregisterStreamConsumer(builder -> builder
                                                .consumerARN(consumer.consumerARN())
                                                .build())

That's fine for me, since using the supplied builder is cleaner, but it may be worth looking into why the first version did not work.

guirak avatar Dec 05 '23 07:12 guirak
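
A likely explanation for the difference between the two snippets above: the lambda overload of deregisterStreamConsumer takes a Consumer<DeregisterStreamConsumerRequest.Builder>, so the SDK creates its own builder, hands it to the lambda, and builds the request from that builder. Whatever the lambda returns is discarded. A request built from a second, freshly created builder never reaches the client, and the SDK sends an empty request, which would match the InvalidArgumentException reported. The sketch below reproduces that shape without the SDK. Request, Builder, send, and the ARN string are hypothetical stand-ins invented for illustration, not SDK classes.

```java
import java.util.function.Consumer;

public class BuilderLambdaDemo {

    // Hypothetical stand-in for DeregisterStreamConsumerRequest (not the SDK class).
    static final class Request {
        final String consumerArn;

        private Request(String consumerArn) {
            this.consumerArn = consumerArn;
        }

        static Builder builder() {
            return new Builder();
        }

        static final class Builder {
            private String consumerArn;

            Builder consumerARN(String arn) {
                this.consumerArn = arn;
                return this;
            }

            Request build() {
                return new Request(consumerArn);
            }
        }
    }

    // Assumed shape of a Consumer<Builder> overload: the method owns the builder,
    // lets the lambda mutate it, then builds. The lambda's return value is ignored.
    static Request send(Consumer<Request.Builder> mutator) {
        Request.Builder builder = Request.builder();
        mutator.accept(builder);
        return builder.build();
    }

    public static void main(String[] args) {
        String arn = "arn:example:consumer"; // placeholder value

        // Builds a second, unrelated request and throws it away; the supplied builder stays empty.
        Request broken = send(b -> Request.builder().consumerARN(arn).build());

        // Mutates the supplied builder, so the ARN ends up in the request that is sent.
        Request fixed = send(b -> b.consumerARN(arn));

        System.out.println("broken: " + broken.consumerArn);
        System.out.println("fixed: " + fixed.consumerArn);
    }
}
```

With a Consumer-style overload there is no need to call build() inside the lambda at all; setting the fields on the supplied builder is enough.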