zerocode
Kafka - Consume RAW and/or JSON records
Given a Topic
When I read from the topic
Then I have a choice of reading a RAW record and/or a JSON record
- Explanation
  - JSON is useful for Avro messages read via Schema-Registry
  - RAW is useful in general for reading text-type messages
  - Both (for the same READ) are very rarely used together, but this is supported for when the user wants to look at the metadata of the consumed records.
See examples
- Reading an Avro message via Schema-Registry: the user can assert on RAW and/or JSON records, but JSON is more relevant here for e2e testing. Schema info and partition info are relevant when a clustered Kafka server/broker is used and the tester cares about where the READ was done from.
{
  "scenarioName": "consume a AVRO msg message",
  "steps": [
    {
      "name": "consume_avro_msg_as_json",
      "url": "kafka-topic:bar",
      "operation": "unload",
      "request": {
        "consumerLocalConfigs": {
          "recordType": "JSON",
          "commitSync": false,
          "maxNoOfRetryPollsOrTimeouts": 3
        }
      },
      "assertions": {
        "size": 1,
        "jsonRecords": [
          {
            "key": null,
            "jsonKey": null,
            "value": {
              "f1": "value1"
            }
          }
        ]
      }
    },
    {
      "name": "consume_avro_msg_as_raw",
      "url": "kafka-topic:bar",
      "operation": "unload",
      "request": {
        "consumerLocalConfigs": {
          "recordType": "RAW",
          "commitSync": false,
          "showConsumedRecords": true,
          "maxNoOfRetryPollsOrTimeouts": 3
        }
      },
      "assertions": {
        "size": 1,
        "records": [
          {
            "topic": "bar",
            "partition": 0,
            "offset": 0,
            "value": {
              "schema": {
                "fields": [
                  {
                    "name": "f1",
                    "position": 0,
                    "schema": {
                      "type": "STRING"
                    }
                  }
                ],
                "name": {
                  "name": "myrecord",
                  "full": "myrecord"
                }
              },
              "values": [
                {
                  "string": "value1"
                }
              ]
            }
          }
        ]
      }
    }
  ]
}
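The two assertions above describe the same Avro record in two shapes: the RAW view carries the schema fields (name and position) next to a positional list of values, while the JSON view is the plain field-to-value object. The sketch below only illustrates that relationship. It is not Zerocode's implementation; the class `RawToJsonView` and its nested `Field` type are hypothetical names introduced for this example, and it assumes values are ordered by field position, as they are in an Avro generic record.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class RawToJsonView {

    /** Hypothetical stand-in for one entry of "schema.fields" in the RAW record. */
    public static final class Field {
        final String name;
        final int position;

        public Field(String name, int position) {
            this.name = name;
            this.position = position;
        }
    }

    /**
     * Pairs each schema field with the value stored at its position,
     * producing the flat field-to-value shape seen in "jsonRecords[].value".
     */
    public static Map<String, Object> toJsonView(List<Field> fields, List<Object> values) {
        Map<String, Object> view = new LinkedHashMap<>();
        for (Field field : fields) {
            view.put(field.name, values.get(field.position));
        }
        return view;
    }

    public static void main(String[] args) {
        // Field "f1" at position 0 and the value "value1", as in the RAW assertion above.
        List<Field> fields = List.of(new Field("f1", 0));
        List<Object> values = List.of("value1");
        System.out.println(toJsonView(fields, values)); // prints {f1=value1}
    }
}
```

This is why asserting on `jsonRecords` is usually enough for an e2e test: the RAW view adds schema, topic, partition and offset details, but the payload is the same.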
Note -
// Do not put msgs directly to the topic without the Avro registry, i.e. the schema registry.
// When you put via Avro, it updates the message with a schema ID (call it an Avro ID or
// reference) from the registry.
// If you put directly (bypassing Avro), the message cannot be consumed via the Avro deserializer,
// because it was not updated with an Avro ID and hence is not understood by the Avro deserializer.
// Error -
// {"error_code":50002,"message":"Kafka error: Error deserializing Avro message for id -1"}
// Full error -
// 2019-01-04 09:46:29,371 [main] ERROR org.jsmart.zerocode.core.kafka.client.BasicKafkaClient -
// Exception during operation:unload, topicName:bar, error:Error deserializing key/value for partition bar-0 at offset 1.
// If needed, please seek past the record to continue consumption.
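To make the note above concrete: a message produced through the Confluent Avro serializer is framed with a one-byte magic value (0) followed by a 4-byte big-endian schema ID, and only then the Avro-encoded payload. A message written directly to the topic lacks that header, so the deserializer has no schema ID to look up. The sketch below is a minimal check of that framing, assuming the Confluent wire format; the class `ConfluentWireFormat` and the method `schemaIdOf` are hypothetical names for this example and are not part of Zerocode or the Confluent client.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.OptionalInt;

public class ConfluentWireFormat {

    private static final byte MAGIC_BYTE = 0x0; // first byte of a registry-framed message
    private static final int HEADER_LENGTH = 5; // 1 magic byte + 4-byte schema ID

    /**
     * Returns the schema ID embedded in the message header, or empty if the
     * payload does not start with the expected magic byte and ID.
     */
    public static OptionalInt schemaIdOf(byte[] payload) {
        if (payload == null || payload.length < HEADER_LENGTH || payload[0] != MAGIC_BYTE) {
            return OptionalInt.empty();
        }
        // ByteBuffer reads big-endian by default, matching the header layout.
        return OptionalInt.of(ByteBuffer.wrap(payload, 1, 4).getInt());
    }

    public static void main(String[] args) {
        // Header (magic 0, schema ID 42) followed by the Avro encoding of the string "value1".
        byte[] viaRegistry = {0, 0, 0, 0, 42, 12, 118, 97, 108, 117, 101, 49};
        // Plain JSON text written straight to the topic, with no header.
        byte[] direct = "{\"f1\":\"value1\"}".getBytes(StandardCharsets.UTF_8);

        System.out.println(schemaIdOf(viaRegistry)); // prints OptionalInt[42]
        System.out.println(schemaIdOf(direct));      // prints OptionalInt.empty
    }
}
```

A check like this can help when triaging the error above: if the first bytes of the stored message are not a valid header, the message was most likely produced without going through the schema registry.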
Implementation, Dev and Test - Done.
- TODO is
  - HelloWorld
  - Wiki page on How To