KinesisClient.deregisterStreamConsumer doesn't work.
Describe the bug
Hello,
I'm encountering problems with the use of KinesisClient when trying to deregister a customer.
With the following code :
kinesisClient.deregisterStreamConsumer(builder -> DeregisterStreamConsumerRequest.builder()
.consumerARN(consumer.consumerARN())
.build())
I have the following error :
software.amazon.awssdk.services.kinesis.model.InvalidArgumentException: Both streamARN and consumer name must be provided if consumerARN is not provied.
I don't understand why I have this error as I have a valid consumer ARN.
Replacing the consumerARN by the streamARN and the consumer name causes the same error.
I'm using the last version of the SDK (2.21.37).
Thank you
Expected Behavior
The consumer is expecting to be deregistered when providing the consumer ARN
Current Behavior
AWS doesn't deregister the consumer and the following error is returned :
software.amazon.awssdk.services.kinesis.model.InvalidArgumentException: Both streamARN and consumer name must be provided if consumerARN is not provied.
Reproduction Steps
Register a consumer :
Mono.fromFuture(() -> kinesisClient.registerStreamConsumer(RegisterStreamConsumerRequest.builder()
.streamARN(streamArn)
.consumerName(STREAM_CONSUMER_NAME + shard.shardId())
.build()))
.subscribe()
List the consumers and deregister them :
return rlog.info("Initialisation du streaming vers kinesis. ARN : " + streamArn)
// Suppression des consumers existants
.then(Mono.fromFuture(() -> kinesisClient.listStreamConsumers(ListStreamConsumersRequest.builder()
.streamARN(streamArn)
.build())))
.flatMapMany(response -> rlog.info("Consumer existants : " + response.consumers().stream()
.map(consumer -> consumer.consumerName() + " - ARN : " + consumer.consumerARN())
.collect(Collectors.joining("\n")))
.thenMany(Flux.fromIterable(response.consumers())))
.flatMap(consumer -> rlog.info("Suppression du consumer " + consumer.consumerARN())
.then(Mono.fromFuture(() ->
kinesisClient.deregisterStreamConsumer(builder -> DeregisterStreamConsumerRequest.builder()
.consumerARN(consumer.consumerARN())
.build())
)
)
)
Possible Solution
No response
Additional Information/Context
No response
AWS Java SDK version used
2.21.37
JDK version used
17.0.9
Operating System and version
Debian 12
Hi @guirak,
Using AWS SDK for Java 2.21.37, I am unable to reproduce the issue. I am able to list the Stream Consumers to pull the ConsumerARN and pass it successfully to the deregisterStreamConsumer.
Here's the sample code that I used:
public static String deregConsumer(KinesisClient kinesisClient) {
try {
ListStreamConsumersRequest listCon = ListStreamConsumersRequest.builder()
.streamARN("<<redacted>>")
.build();
ListStreamConsumersResponse response = kinesisClient.listStreamConsumers(listCon);
List<Consumer> list = response.consumers();
String ConsumerARN = "";
for(Consumer item: list) {
ConsumerARN = item.consumerARN();
}
DeregisterStreamConsumerRequest deRegCon = DeregisterStreamConsumerRequest.builder()
.consumerARN(ConsumerARN)
.build();
DeregisterStreamConsumerResponse resp = kinesisClient.deregisterStreamConsumer(deRegCon);
return resp.toString();
} catch (KinesisException e) {
System.err.println(e.getMessage());
System.exit(1);
}
return "";
}
Can you confirm if you are getting the correct consumer.consumerARN() that is supplied to deregisterStreamConsumer?
If you continue to see an issue, can you kindly provide the verbose wire logs for further analysis?
Instructions to enable verbose logging can be found here.
Regards, Chaitanya
Hi,
Thank you for this quick feedback.
The consumerARN value is correct.
I have just seen that I was creating a new builder instead using the one of the supplier :
kinesisClient.deregisterStreamConsumer(builder -> DeregisterStreamConsumerRequest.builder()
.consumerARN(consumer.consumerARN())
.build())
The bug is not present with the following code :
kinesisClient.deregisterStreamConsumer(builder -> builder
.consumerARN(consumer.consumerARN())
.build())
For me it's ok as it's cleaner to use the supplied builder but maybe there is something to look why the first code was not working.