[SUPPORT] Missing records when using Kafka Hudi sink to write to S3.

Open haripriyarhp opened this issue 3 years ago • 6 comments

Tips before filing an issue

  • Have you gone through our FAQs?

  • Join the mailing list to engage in conversations and get faster support at [email protected].

  • If you have triaged this as a bug, then file an issue directly.

Describe the problem you faced

I am using the Kafka Hudi sink to write to S3. The number of messages in a topic does not match the number of records showing up in Athena, for both MoR and CoW tables. For MoR, some records are still missing even after running compaction.

To Reproduce

Steps to reproduce the behavior:

  1. Initially, I sent 100 messages to a topic. They were reflected in Athena after compaction.
  2. Later sent 100 more new messages + some updates + some duplicates of previous 100. Record count was not correct.
  3. Later still, I sent about 1000 messages, and the record count was still not correct after compaction.
  4. The config file properties are:

{
    "name": "hudi-sink",
    "config": {
                "bootstrap.servers": "localhost:9092",
                "connector.class": "org.apache.hudi.connect.HoodieSinkConnector",
                "tasks.max": "4",
                "control.topic.name": "hudi-control-topic-mor",
                "topics": "sensor",
                "hoodie.table.name": "sensor-mor",
                "hoodie.table.type": "MERGE_ON_READ",
                "key.converter": "org.apache.kafka.connect.storage.StringConverter",
                "value.converter": "org.apache.kafka.connect.storage.StringConverter",
                "hoodie.base.path": "s3a://path/sensor_mor",
                "hoodie.datasource.write.recordkey.field": "oid,styp,sname,ts",
                "hoodie.datasource.write.partitionpath.field": "gid,datatype,origin,oid",
                "hoodie.datasource.write.keygenerator.type": "COMPLEX",
                "hoodie.datasource.write.hive_style_partitioning": "true",
                "hoodie.compact.inline.max.delta.commits": 2,
                "fs.s3a.fast.upload": "true",
                "fs.s3a.access.key": "myaccesskey",
                "fs.s3a.secret.key": "secretkey",
                "hoodie.schemaprovider.class": "org.apache.hudi.schema.SchemaRegistryProvider",
                "hoodie.deltastreamer.schemaprovider.registry.url": "http://localhost:8081/subjects/sensor/versions/latest",
                "hoodie.kafka.commit.interval.secs": 60
      }
}

Expected behavior

Regardless of whether the messages sent to the topic are new messages, duplicates or updates, the connector should append all of them to the table.

Environment Description

  • Hudi version : 0.11.0

  • Spark version : 3.1.3

  • Hive version :

  • Hadoop version : 3.2

  • Storage (HDFS/S3/GCS..) : S3

  • Running on Docker? (yes/no) : No

haripriyarhp avatar Jul 21 '22 07:07 haripriyarhp

@haripriyarhp the current kafka connector only supports inserts and not updates. Could you clarify the below comment:

Later sent 100 more new messages + some updates + some duplicates of previous 100. Record count was not correct.

In this case, do you see more records in the Hudi table than the kafka records? Or vice versa.

rmahindra123 avatar Jul 21 '22 19:07 rmahindra123

Hi @rmahindra123, thanks for your response. Okay, a little clarification. First, I sent 100 messages. It was fine; Athena also showed 100 records. Next, I sent 100 new messages + 25 updates + 25 duplicates of the previous 100 messages. In total there are 250 messages in Kafka, but Athena showed only 247. Irrespective of inserts, duplicates or updates, I am assuming that the connector should append the messages. Later on, I continued sending several rounds of messages and found that the count did not match. A few records were missing (somewhere between 20 and 50) for around 500-600 messages sent to Kafka. I ran this test several times, and each time there were some missing records. I tested with CoW too; it also had missing records. The number of records in Athena was always less than the number of messages in Kafka.
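
For reference, the count to expect in the scenario above depends on whether the sink appends every message or merges messages that share a record key. A minimal Python sketch of that arithmetic, assuming each update and each duplicate reuses a key from the first batch (the key names and the `expected_counts` helper are made up for illustration):

```python
def expected_counts(batches):
    """Return (append_count, upsert_count) for a list of batches of record keys."""
    total = 0
    distinct = set()
    for batch in batches:
        total += len(batch)      # append semantics: every message becomes a row
        distinct.update(batch)   # upsert semantics: one row per distinct key
    return total, len(distinct)

# Hypothetical keys standing in for the real composite record keys.
first = [f"key-{i}" for i in range(100)]
new = [f"key-{i}" for i in range(100, 200)]
updates = first[:25]        # 25 messages that update existing keys
duplicates = first[25:50]   # 25 exact re-sends of existing keys

append_count, upsert_count = expected_counts([first, new + updates + duplicates])
print(append_count, upsert_count)
```

Under these assumptions a pure append gives 250 rows and a full upsert gives 200. The reported 247 falls between the two figures.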

haripriyarhp avatar Jul 21 '22 20:07 haripriyarhp

@haripriyarhp Can you verify that all .log files were compacted to parquet files after you ran compaction? Want to make sure records are not in the log files when querying
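
One way to check this is to list the files of a partition and look for log files that sit on top of the latest base file of their file group. The sketch below is only an illustration: it assumes base files are named `<fileId>_<writeToken>_<instantTime>.parquet` and log files `.<fileId>_<baseInstantTime>.log.<version>_<writeToken>`, and that instant times have a fixed width so they compare correctly as strings. Both assumptions should be verified against the Hudi version in use; `uncompacted_logs` is a hypothetical helper, not part of Hudi.

```python
import re
from collections import defaultdict

# Assumed file naming layout (verify against your Hudi version):
#   base file: <fileId>_<writeToken>_<instantTime>.parquet
#   log file:  .<fileId>_<baseInstantTime>.log.<version>_<writeToken>
BASE_RE = re.compile(r"^(?P<file_id>[^_]+)_(?P<token>[^_]+)_(?P<instant>\d+)\.parquet$")
LOG_RE = re.compile(r"^\.(?P<file_id>[^_]+)_(?P<instant>\d+)\.log\.(?P<version>\d+)(?:_.+)?$")

def uncompacted_logs(file_names):
    """Return log files not yet folded into a newer base file of their file group."""
    latest_base = {}
    logs = defaultdict(list)
    for name in file_names:
        m = BASE_RE.match(name)
        if m:
            fid = m["file_id"]
            latest_base[fid] = max(latest_base.get(fid, ""), m["instant"])
            continue
        m = LOG_RE.match(name)
        if m:
            logs[m["file_id"]].append((m["instant"], name))
    pending = []
    for fid, entries in logs.items():
        base = latest_base.get(fid, "")  # "" means the group has no base file yet
        pending.extend(name for instant, name in entries if instant >= base)
    return sorted(pending)

# Hypothetical listing of one partition directory.
files = [
    "fg1-0_1-0-1_20220726070000.parquet",
    ".fg1-0_20220726060000.log.1_1-0-1",  # older file slice, already compacted
    ".fg1-0_20220726070000.log.1_1-0-1",  # written on top of the latest base file
    ".fg2-0_20220726065000.log.1_1-0-1",  # file group with no base file at all
]
pending = uncompacted_logs(files)
print(pending)
```

An empty result for every partition would suggest that all log records have been compacted into parquet. Older log files can remain on storage until they are cleaned, so the mere presence of `.log` files is not conclusive by itself.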

rmahindra123 avatar Jul 26 '22 07:07 rmahindra123

Also, could you run a parquet query to see if all records are in hudi? It would be helpful if you could share your .hoodie folder here. Thanks

rmahindra123 avatar Jul 26 '22 07:07 rmahindra123

@rmahindra123 : Yes, all the files were compacted. There are no more open compaction.requested files. All the compaction was completed and only then I queried the Athena table.

Also, could you run a parquet query to see if all records are in hudi? It would be helpful if you could share your .hoodie folder here. Thanks

Is this some special query? Because I query the Athena table.

haripriyarhp avatar Jul 28 '22 10:07 haripriyarhp

@rmahindra123 : Unfortunately, I am not able to share the .hoodie folder. Just to add, yesterday I tried it out again. I sent messages to a topic in batches. Below are the steps I followed:

  1. Sent a batch of 100 records to Kafka. Ran compaction. The number of messages in Kafka and the number of records in Athena matched.
  2. Sent a batch of another 100 records to Kafka -> compaction -> number of messages in Kafka = number of records in Athena.
  3. Sent a batch of another 100 records (here there were some duplicates) -> compaction -> number of messages in Kafka = number of records in Athena.
  4. Sent another batch of 98 records (some were duplicates) -> compaction -> number of messages in Kafka != number of records in Athena. There were no more files to be compacted. About 24 records were missing.
  5. Sent another 100 records -> compaction -> record count did not match. The same 24 records were missing.

More or less, I followed the above steps several times before I raised the issue here. Each time, after a few runs the record count does not match, even after running compaction.
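
To narrow down which records are missing in a run like the one above, it can help to compare record keys rather than totals. A minimal Python sketch, assuming the keys have already been exported from Kafka and from Athena as tuples of the record key fields (oid, styp, sname, ts); the export step is not shown, and `diff_keys` and the sample values are hypothetical:

```python
from collections import Counter

def diff_keys(kafka_keys, table_keys):
    """Return (missing, repeated): keys sent more often than stored, and keys sent more than once."""
    sent = Counter(kafka_keys)
    stored = Counter(table_keys)
    missing = sent - stored  # Counter subtraction keeps only positive counts
    repeated = {key: n for key, n in sent.items() if n > 1}
    return missing, repeated

# Hypothetical composite keys: (oid, styp, sname, ts)
kafka_keys = [
    ("o1", "s", "n", 1),
    ("o1", "s", "n", 2),
    ("o1", "s", "n", 1),  # duplicate of the first message
    ("o2", "s", "n", 1),
]
table_keys = [
    ("o1", "s", "n", 1),
    ("o1", "s", "n", 2),
    ("o2", "s", "n", 1),
]

missing, repeated = diff_keys(kafka_keys, table_keys)
print(missing)
print(all(key in repeated for key in missing))
```

If every missing key is also a repeated key, the shortfall may come from messages with the same record key being merged rather than from records being dropped.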

haripriyarhp avatar Jul 29 '22 12:07 haripriyarhp

As per the latest master code, we do upsert for MOR tables. I was able to reproduce a data duplication issue similar to what @haripriyarhp reported.

Setup Used as per demo - https://github.com/apache/hudi/blob/master/hudi-kafka-connect/README.md

# Publish first 100 records
bash setupKafka.sh -n 100 -k test1
    Before Compaction - 100
    After Compaction - 100

# Publish next  100 records all with new keys
bash setupKafka.sh -n 100 -t -o 100 -k test1
    Before Compaction - 200
    After Compaction - 200

# Publish next 100 records: 50 upserts and 50 new keys
bash setupKafka.sh -n 100 -t -o 150 -k test1
    Before Compaction - 285 [Distinct count of ("_hoodie_partition_path","volume") = 250]
    After Compaction - 285
    So, dupes appear after this step.


bash setupKafka.sh -n 100 -t -o 180 -k test1
    Before Compaction - 369 Distinct count = 280
    After Compaction - 369 Distinct count = 280
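
The distinct counts above can be cross-checked with a short Python sketch. It assumes `-n` is the number of records published and `-o` is the starting record key offset, so each run covers keys `offset` to `offset + n - 1`. This reading is an assumption, although it matches the reported distinct counts. The `simulate` helper is hypothetical.

```python
def simulate(runs):
    """runs: list of (offset, count). Returns [(total_sent, distinct_keys), ...] after each run."""
    seen = set()
    total = 0
    results = []
    for offset, count in runs:
        total += count
        seen.update(range(offset, offset + count))
        results.append((total, len(seen)))
    return results

runs = [(0, 100), (100, 100), (150, 100), (180, 100)]
observed = [100, 200, 285, 369]  # row counts reported after each run

for (total, distinct), rows in zip(simulate(runs), observed):
    print(f"sent={total} distinct={distinct} observed={rows} extra={rows - distinct}")
```

With a full upsert the table would hold 250 and then 280 rows after the last two runs, so the observed 285 and 369 correspond to 35 and 89 extra rows respectively.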

connect properties used

{
    "name": "test1",
    "config": {
                "bootstrap.servers": "kafkabroker:9092",
                "connector.class": "org.apache.hudi.connect.HoodieSinkConnector",
                "tasks.max": "4",
                "key.converter": "org.apache.kafka.connect.storage.StringConverter",
                "value.converter": "org.apache.kafka.connect.storage.StringConverter",
                "value.converter.schemas.enable": "false",
                "topics": "test1",
                "hoodie.table.name": "test1",
                "hoodie.table.type": "MERGE_ON_READ",
                "hoodie.base.path": "file:///tmp/hoodie/test1",
                "hoodie.datasource.write.recordkey.field": "volume",
                "hoodie.datasource.write.partitionpath.field": "date",
                "hoodie.schemaprovider.class": "org.apache.hudi.schema.SchemaRegistryProvider",
                "hoodie.deltastreamer.schemaprovider.registry.url": "http://localhost:8082/subjects/test1/versions/latest",
                "hoodie.kafka.commit.interval.secs": 60,
                "hoodie.compact.schedule.inline":"true",
                "hoodie.compact.inline.max.delta.commits":1
      }
}

Also, I noticed data loss with async clustering enabled.

ad1happy2go avatar May 04 '23 11:05 ad1happy2go