[BUG] otel sources should show a clearer exception when receiving data that cannot be processed with the configured compression type
Describe the bug
When sending data to an otel source, the otel-collector and the pipeline source must agree on the compression used. In the sources, compression defaults to none.
When compressed data is sent to a source configured with compression none, an error like the one below is currently logged:
2024-01-24T17:04:23.769 [armeria-common-worker-epoll-3-1] ERROR org.opensearch.dataprepper.GrpcRequestExceptionHandler - Unexpected exception handling gRPC request
io.grpc.StatusRuntimeException: INTERNAL: Invalid protobuf byte sequence
at io.grpc.Status.asRuntimeException(Status.java:529) ~[grpc-api-1.58.0.jar:1.58.0]
at com.linecorp.armeria.internal.common.grpc.GrpcMessageMarshaller.deserializeProto(GrpcMessageMarshaller.java:253) ~[armeria-grpc-1.26.4.jar:?]
at com.linecorp.armeria.internal.common.grpc.GrpcMessageMarshaller.deserializeRequest(GrpcMessageMarshaller.java:118) ~[armeria-grpc-1.26.4.jar:?]
at com.linecorp.armeria.internal.server.grpc.AbstractServerCall.onRequestMessage(AbstractServerCall.java:343) ~[armeria-grpc-1.26.4.jar:?]
at com.linecorp.armeria.server.grpc.UnaryServerCall.lambda$startDeframing$0(UnaryServerCall.java:107) ~[armeria-grpc-1.26.4.jar:?]
at java.base/java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:930) ~[?:?]
at java.base/java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:907) ~[?:?]
at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[?:?]
at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2079) ~[?:?]
at com.linecorp.armeria.internal.common.stream.FixedStreamMessage.collect(FixedStreamMessage.java:235) ~[armeria-1.26.4.jar:?]
at com.linecorp.armeria.internal.common.stream.FixedStreamMessage.lambda$collect$2(FixedStreamMessage.java:203) ~[armeria-1.26.4.jar:?]
at com.linecorp.armeria.common.DefaultContextAwareRunnable.run(DefaultContextAwareRunnable.java:45) ~[armeria-1.26.4.jar:?]
at io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:173) ~[netty-common-4.1.100.Final.jar:4.1.100.Final]
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:166) ~[netty-common-4.1.100.Final.jar:4.1.100.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:470) ~[netty-common-4.1.100.Final.jar:4.1.100.Final]
at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:413) ~[netty-transport-classes-epoll-4.1.100.Final.jar:4.1.100.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997) ~[netty-common-4.1.100.Final.jar:4.1.100.Final]
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.100.Final.jar:4.1.100.Final]
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.100.Final.jar:4.1.100.Final]
at java.base/java.lang.Thread.run(Thread.java:829) [?:?]
Caused by: com.google.protobuf.InvalidProtocolBufferException$InvalidWireTypeException: Protocol message tag had invalid wire type.
at com.google.protobuf.InvalidProtocolBufferException.invalidWireType(InvalidProtocolBufferException.java:142) ~[protobuf-java-3.24.3.jar:?]
at com.google.protobuf.UnknownFieldSet$Builder.mergeFieldFrom(UnknownFieldSet.java:526) ~[protobuf-java-3.24.3.jar:?]
at com.google.protobuf.GeneratedMessageV3.parseUnknownField(GeneratedMessageV3.java:332) ~[protobuf-java-3.24.3.jar:?]
at io.opentelemetry.proto.collector.logs.v1.ExportLogsServiceRequest.<init>(ExportLogsServiceRequest.java:63) ~[opentelemetry-proto-0.16.0-alpha.jar:0.16.0]
at io.opentelemetry.proto.collector.logs.v1.ExportLogsServiceRequest.<init>(ExportLogsServiceRequest.java:9) ~[opentelemetry-proto-0.16.0-alpha.jar:0.16.0]
at io.opentelemetry.proto.collector.logs.v1.ExportLogsServiceRequest$1.parsePartialFrom(ExportLogsServiceRequest.java:935) ~[opentelemetry-proto-0.16.0-alpha.jar:0.16.0]
at io.opentelemetry.proto.collector.logs.v1.ExportLogsServiceRequest$1.parsePartialFrom(ExportLogsServiceRequest.java:929) ~[opentelemetry-proto-0.16.0-alpha.jar:0.16.0]
at com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:86) ~[protobuf-java-3.24.3.jar:?]
at com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:91) ~[protobuf-java-3.24.3.jar:?]
at com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:48) ~[protobuf-java-3.24.3.jar:?]
at com.linecorp.armeria.internal.common.grpc.GrpcMessageMarshaller.deserializeProto(GrpcMessageMarshaller.java:243) ~[armeria-grpc-1.26.4.jar:?]
... 18 more
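The `InvalidWireTypeException` in the trace is consistent with a compressed payload being parsed as raw protobuf. Here is a minimal sketch, assuming the collector sent gzip (the issue does not say which codec was used). A gzip stream starts with the byte 0x1f, which as a protobuf tag decodes to field number 3 with wire type 7, and protobuf defines no wire type 7. The helper name `decode_tag` and the stand-in payload are hypothetical.

```python
import gzip


def decode_tag(first_byte: int) -> tuple[int, int]:
    """Split a single-byte protobuf tag into (field_number, wire_type)."""
    return first_byte >> 3, first_byte & 0x07


# Stand-in bytes; a real request would be a serialized ExportLogsServiceRequest.
payload = gzip.compress(b"stand-in for a serialized request")

field_number, wire_type = decode_tag(payload[0])
# The first byte of any gzip stream is 0x1f, so this prints: 0x1f 3 7
print(hex(payload[0]), field_number, wire_type)
```

Field 3 is not a field the parser handles directly, so it is passed to the unknown-field handling, which matches the `UnknownFieldSet$Builder.mergeFieldFrom` frame above.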
Expected behavior
Return a 400 Bad Request with a message indicating that the failure may be related to a compression mismatch.
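One way the requested message could be produced is sketched below. This is an illustration only, not Data Prepper code: the function name `describe_parse_failure`, its parameters, and the message wording are all hypothetical, and it only recognizes gzip (by its two-byte magic number).

```python
GZIP_MAGIC = b"\x1f\x8b"  # first two bytes of every gzip stream


def describe_parse_failure(body: bytes, configured_compression: str) -> tuple[int, str]:
    """Return (http_status, message) for a request body that failed to parse."""
    if configured_compression == "none" and body.startswith(GZIP_MAGIC):
        # The body looks compressed although the source expects raw data.
        return 400, (
            "Bad Request: payload appears to be gzip-compressed, but the source "
            "is configured with compression 'none'."
        )
    # Generic fallback that still points the user at the likely cause.
    return 400, (
        "Bad Request: payload could not be parsed; check that the sender's "
        "compression matches the source's configured compression."
    )
```

For example, `describe_parse_failure(b"\x1f\x8b...", "none")` returns status 400 with the gzip-specific hint, while any other unparseable body gets the generic message.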
Additional context
It may be possible to support compression dynamically, based on what otel tries to send: otel asks the pipeline source whether it supports a given compression, the pipeline responds yes or no, and the pipeline then handles the data according to that compression type if it supports it; otherwise it returns a 400 Bad Request.
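The dynamic handling described above could look roughly like the following sketch. It assumes the sender declares its encoding alongside each request (the `encoding` argument here), which the issue does not specify; the `DECODERS` table and the `decode_body` function are hypothetical names.

```python
import gzip
import zlib

# Hypothetical table of encodings the source is willing to accept.
DECODERS = {
    "identity": lambda body: body,  # no compression
    "gzip": gzip.decompress,
}


def decode_body(encoding: str, body: bytes) -> tuple[int, bytes]:
    """Decode a request body by its declared encoding; return (status, data)."""
    decoder = DECODERS.get(encoding)
    if decoder is None:
        # Unsupported codec: reject instead of failing later in the parser.
        return 400, f"Bad Request: unsupported compression '{encoding}'".encode()
    try:
        return 200, decoder(body)
    except (OSError, EOFError, zlib.error):
        # Declared encoding does not match the bytes actually sent.
        return 400, f"Bad Request: body is not valid '{encoding}' data".encode()
```

With this shape, a mismatch is rejected before the bytes reach the protobuf parser, so the caller sees a compression-specific message rather than `Invalid protobuf byte sequence`.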