[BUG] `DeleteRecords` returns "Unknown Topic or Partition" for existing partition
Describe the bug
KOP answers DeleteRecords requests with "Unknown Topic or Partition" even when the partition clearly exists.
To Reproduce
When using KOP based on the following Dockerfile:
FROM apachepulsar/pulsar:2.10.0
ENV CURL_FLAGS="--proto =https --tlsv1.2 -sSf" \
KOP_VERSION="2.10.0.3" \
KOP_SHA256="fa93f1fea1a1c3bd2103873a02cb291dd1f4e2239796b2df9856d8530767b094"
RUN env HOME=/pulsar curl ${CURL_FLAGS} -LO "https://github.com/streamnative/kop/releases/download/v${KOP_VERSION}/pulsar-protocol-handler-kafka-${KOP_VERSION}.nar" && \
echo "${KOP_SHA256} pulsar-protocol-handler-kafka-${KOP_VERSION}.nar" | sha256sum -c && \
mkdir protocols && \
mv "pulsar-protocol-handler-kafka-${KOP_VERSION}.nar" protocols && \
cp conf/standalone.conf tmp.conf && \
echo "messagingProtocols=kafka" >> tmp.conf && \
echo "protocolHandlerDirectory=./protocols" >> tmp.conf && \
echo "kafkaListeners=PLAINTEXT://0.0.0.0:9092" >> tmp.conf && \
echo "kafkaAdvertisedListeners=PLAINTEXT://127.0.0.1:9092" >> tmp.conf && \
echo "brokerEntryMetadataInterceptors=org.apache.pulsar.common.intercept.AppendIndexMetadataInterceptor" >> tmp.conf && \
echo "brokerDeleteInactiveTopicsEnabled=false" >> tmp.conf && \
echo "kafkaTransactionCoordinatorEnabled=true" >> tmp.conf
CMD ["bash", "-c", "cp tmp.conf conf/standalone.conf && bin/pulsar standalone"]
and the following preparation:
- create topic with 1 partition
- issue 3 produce requests:
  - single record
  - two records, remember the offset of the first (call it `o`)
  - single record
- issue a `DeleteRecords` request with offset `o`
Then I would expect the response to be OK (at least this works with the "official" Kafka implementation). However, the response is "Unknown Topic or Partition".
Here is a pcap dump of the network traffic: kop-delete-bug.zip
A codified version of that test can be found here:
https://github.com/influxdata/rskafka/blob/5d4245c219b500c414cfdf4dddb76bfd167e5bd2/tests/client.rs#L400-L500
Expected behavior
Deletion should work.
Additional context
The dumps were produced by the rskafka integration tests.
KoP doesn't support this request because the backing Pulsar doesn't support deleting a message either.
That's reasonable. But then why is the feature advertised in ApiVersions? To reference some "prior art": redpanda also does NOT support deletes, but clearly communicates that to the client by excluding this request type from ApiVersions.
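To illustrate why the ApiVersions contents matter, here is a minimal sketch of the check a client can run before sending a request. The `supports_api` helper and the two example tables are hypothetical and are not taken from rskafka, KoP, or redpanda; the only protocol fact used is that DeleteRecords has API key 21.

```python
# API key assigned to DeleteRecords in the Kafka protocol.
DELETE_RECORDS_API_KEY = 21


def supports_api(advertised, api_key, version):
    """Return True if `version` of `api_key` lies in the advertised range.

    `advertised` maps api_key -> (min_version, max_version), i.e. the
    information a broker returns in its ApiVersions response.
    """
    if api_key not in advertised:
        return False
    min_version, max_version = advertised[api_key]
    return min_version <= version <= max_version


# Hypothetical ApiVersions contents (illustrative version ranges only).
broker_with_delete = {0: (0, 9), 1: (0, 12), DELETE_RECORDS_API_KEY: (0, 2)}
broker_without_delete = {0: (0, 9), 1: (0, 12)}

for name, advertised in [("with", broker_with_delete),
                         ("without", broker_without_delete)]:
    if supports_api(advertised, DELETE_RECORDS_API_KEY, 0):
        print(f"broker {name} DeleteRecords: send the request")
    else:
        print(f"broker {name} DeleteRecords: fail early, API not advertised")
```

With the request type left out of ApiVersions, the client can report "unsupported" up front instead of receiving a misleading "Unknown Topic or Partition" at request time.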
Good point. I think we should also adopt the same solution in KoP.
Sorry, I found that the DeleteRecords request is already supported, based on the truncateAsync API, in https://github.com/streamnative/kop/pull/871. I will take a look at this issue later.
The truncateAsync API cannot achieve the same goal as Kafka's deleteRecords method. The implementation in #871 was added only to make Kafka Streams work.
I think we still need to implement this API in the future. For now, maybe we just need a better error response.
As @BewareMyPower pointed out, I had implemented deleteRecords with truncate. But this actually truncates the whole topic.
This is the usecase required by KStreams and KSQLDB and it works quite well.
Currently in Pulsar it is not possible to "delete some records".
@crepererum I found a workaround to remove some data in kop, using topic/namespace level retention of pulsar.
My use case is: I have corrupt data in the middle of a kop topic, and this breaks my kafka consumer client. I am using pulsar and kop 2.10.
Since I am able to consume all the records prior to the corrupt one, I just need to set retention such that the data up to and including the corrupt record is removed from pulsar. My client is then able to consume data from kop again.
pulsar-admin namespaces set-retention public/default
pulsar-admin topics set-retention public/default/mytopic
The catch is:
- It does not delete just a particular record, but deletes all records up to the particular one.
Hope this helps.
> It does not delete just a particular record, but delete all records up to the particular one.
That's the semantics of the DeleteRecords Kafka API. The name is a bit misleading. It's technically a "trim" rather than a delete at a single offset.
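The "trim" semantics can be sketched with a toy in-memory model of a single partition, replaying the steps from the bug report. `PartitionLog` and its methods are hypothetical names for illustration, not KoP, Pulsar, or Kafka code. The model assumes that records with offsets strictly below the requested offset are dropped and that the new log start offset is returned. Treating a request below the current start as a no-op is a simplification chosen for this sketch.

```python
class PartitionLog:
    """Toy in-memory model of one partition (illustration only)."""

    def __init__(self):
        self.records = []          # retained (offset, value) pairs
        self.log_start_offset = 0  # lowest offset still readable
        self.next_offset = 0       # offset the next record will get

    def produce(self, values):
        """Append a batch and return the offset of its first record."""
        base = self.next_offset
        for value in values:
            self.records.append((self.next_offset, value))
            self.next_offset += 1
        return base

    def delete_records(self, offset):
        """Trim all records strictly below `offset`; return the new start."""
        if offset > self.next_offset:
            raise ValueError("offset is beyond the end of the log")
        # Sketch's choice: an offset below the current start changes nothing.
        new_start = max(self.log_start_offset, offset)
        self.records = [(o, v) for o, v in self.records if o >= new_start]
        self.log_start_offset = new_start
        return new_start


log = PartitionLog()
log.produce(["a"])           # single record
o = log.produce(["b", "c"])  # two records, remember the first offset
log.produce(["d"])           # single record
low_watermark = log.delete_records(o)
print(low_watermark, [off for off, _ in log.records])  # prints: 1 [1, 2, 3]
```

The record at offset `o` itself survives; only what comes before it is removed, which is why the request behaves like moving the start of the log rather than deleting one record.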
We have to implement that in Pulsar first; then we can add it to KOP.