cppkafka

How to send serialized Avro data in a buffered producer?

Open · steinio opened this issue 6 years ago · 2 comments

I am trying to create an application that will serialize some data using Avro, and send it to a consumer client using Kafka. I am able to send a simple string, as in the examples provided, but I am not able to pass serialized data from Avro.

I am trying to use this Avro example from their site as a starting point:

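    #include <memory>
    #include <avro/Encoder.hh>
    #include <avro/Stream.hh>
    #include "cpx.hh"  // generated by avrogencpp from cpx.json (namespace c)

    // Encode one c::cpx record into an in-memory Avro binary stream
    std::unique_ptr<avro::OutputStream> out = avro::memoryOutputStream();
    avro::EncoderPtr e = avro::binaryEncoder();
    e->init(*out);
    c::cpx c1;
    c1.re = 1.0;
    c1.im = 2.13;
    avro::encode(*e, c1);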

But I don't know how to go from here to send this encoded data.

This is the first time I am using both Kafka and Avro, and I am new to serialization in general, so maybe there is something obvious I am missing here?

steinio, Jul 11 '19 14:07

I think I figured it out; at least I can serialize and deserialize the data by adding this below the above code:

    // Flush first: until the encoder is flushed, byteCount() also counts
    // the unused tail of the stream's current buffer chunk.
    e->flush();

    std::vector<uint8_t> bytes;
    size_t len = out->byteCount();
    std::unique_ptr<avro::InputStream> in = avro::memoryInputStream(*out);
    avro::StreamReader reader(*in);  // on the stack, so nothing leaks

    bytes.reserve(len);

    // Copy the encoded bytes out of the Avro stream one at a time
    while (reader.hasMore()) {
        bytes.push_back(reader.read());
    }

    builder.payload(bytes);
    producer.produce(builder);
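For reference, Avro binary encoding puts no schema or length header in front of a record: a record is its fields in declaration order, and each `double` is 8 bytes, little-endian. Assuming `cpx.json` is the schema from the Avro C++ examples (two `double` fields, `re` and `im`), the payload should therefore be exactly 16 bytes, and a much larger payload suggests the encoder was not flushed. The hand-written encoder below only illustrates those bytes; it is not a replacement for `avro::encode`, and `Cpx` and `encode_cpx` are hypothetical names.

```cpp
#include <cstdint>
#include <cstring>
#include <vector>

// Hypothetical stand-in for the avrogencpp-generated c::cpx type.
struct Cpx {
    double re;
    double im;
};

// Append one Avro "double": 8 bytes of IEEE 754, little-endian.
static void append_avro_double(std::vector<uint8_t>& out, double value) {
    uint64_t bits;
    std::memcpy(&bits, &value, sizeof bits);  // reinterpret without UB
    for (int i = 0; i < 8; ++i) {
        out.push_back(static_cast<uint8_t>(bits >> (8 * i)));  // low byte first
    }
}

// A record is just its fields in schema order, so cpx is always 16 bytes.
std::vector<uint8_t> encode_cpx(const Cpx& c) {
    std::vector<uint8_t> out;
    out.reserve(16);
    append_avro_double(out, c.re);
    append_avro_double(out, c.im);
    return out;
}
```

Comparing `bytes.size()` from the producer code against 16 is a quick sanity check before sending.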

And I do something similar when decoding:

    // Decode straight from the message's payload buffer; no copy needed
    const cppkafka::Buffer& payload = msg.get_payload();

    /* Binary input stream over the payload bytes */
    std::unique_ptr<avro::InputStream> in =
        avro::memoryInputStream(payload.get_data(), payload.get_size());

    avro::DecoderPtr d = avro::binaryDecoder();
    d->init(*in);

    c::cpx c1;
    avro::decode(*d, c1);
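The decoding side mirrors the encoding. Because `avro::decode` reads only as many bytes as the schema needs, a payload with extra trailing bytes still decodes in C++, so a round trip between two C++ clients can hide padding. A minimal hand-written decoder, again assuming the two-`double` `cpx` schema from the Avro C++ examples; `Cpx` and `decode_cpx` are hypothetical, and in real code `avro::decode` does this work.

```cpp
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <stdexcept>

// Hypothetical stand-in for the avrogencpp-generated c::cpx type.
struct Cpx {
    double re;
    double im;
};

// Read one Avro "double" (8 bytes, little-endian) starting at p.
static double read_avro_double(const uint8_t* p) {
    uint64_t bits = 0;
    for (int i = 0; i < 8; ++i) {
        bits |= static_cast<uint64_t>(p[i]) << (8 * i);
    }
    double value;
    std::memcpy(&value, &bits, sizeof value);
    return value;
}

// Decode a cpx record from a raw payload (data pointer plus size).
// Trailing bytes past the 16 the schema needs are ignored.
Cpx decode_cpx(const uint8_t* data, std::size_t len) {
    if (len < 16) {
        throw std::runtime_error("payload too short for a cpx record");
    }
    return Cpx{read_avro_double(data), read_avro_double(data + 8)};
}
```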

This gives me back, in my consumer, the same data I sent. I'm not sure this is the best way to go about it, but it works. A remaining problem is that I cannot parse this data in an external consumer written in Java by someone else, even though it uses the same JSON schema. It seems I am missing some schema metadata that the Java side expects to be framed around the data.
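One likely cause, assuming the Java consumer uses Confluent's `KafkaAvroDeserializer` with a Schema Registry (the thread does not say so): that deserializer expects the Confluent wire format, which is one magic byte `0x00`, then the writer schema's 4-byte big-endian Schema Registry ID, then the Avro binary body. A minimal sketch of adding and stripping that header follows; the schema ID has to come from registering the schema with the registry, and `frame_confluent` / `unframe_confluent` are hypothetical helper names.

```cpp
#include <cstddef>
#include <cstdint>
#include <stdexcept>
#include <vector>

// Assumed framing: Confluent Schema Registry wire format.
// [0x00 magic][4-byte big-endian schema id][Avro binary body]
constexpr uint8_t kMagicByte = 0x00;
constexpr std::size_t kHeaderSize = 5;

std::vector<uint8_t> frame_confluent(uint32_t schema_id,
                                     const std::vector<uint8_t>& avro_body) {
    std::vector<uint8_t> framed;
    framed.reserve(kHeaderSize + avro_body.size());
    framed.push_back(kMagicByte);
    for (int shift = 24; shift >= 0; shift -= 8) {
        framed.push_back(static_cast<uint8_t>(schema_id >> shift));  // big-endian
    }
    framed.insert(framed.end(), avro_body.begin(), avro_body.end());
    return framed;
}

// Inverse: validate the header, return the schema id, and report where the
// Avro body starts so it can be passed to avro::memoryInputStream.
uint32_t unframe_confluent(const uint8_t* data, std::size_t len,
                           const uint8_t** body, std::size_t* body_len) {
    if (len < kHeaderSize || data[0] != kMagicByte) {
        throw std::runtime_error("not in Confluent wire format");
    }
    uint32_t schema_id = 0;
    for (std::size_t i = 1; i < kHeaderSize; ++i) {
        schema_id = (schema_id << 8) | data[i];
    }
    *body = data + kHeaderSize;
    *body_len = len - kHeaderSize;
    return schema_id;
}
```

On the producer side, the framed vector would be what goes into `builder.payload(...)`; on the consumer side, `*body` and `*body_len` replace the raw payload pointer and size when building the Avro input stream.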

steinio, Jul 11 '19 17:07

This also works:

    std::unique_ptr<avro::OutputStream> bin_os = avro::memoryOutputStream();
    avroencoder->init(*bin_os);
    . . .
    avroencoder->flush();  // make the stream hold only the encoded bytes
    std::shared_ptr<std::vector<uint8_t>> v = avro::snapshot(*bin_os);
    std::vector<uint8_t> out(v->begin(), v->end());

    kafkabuilder.payload(out);
    kafkaproducer.produce(kafkabuilder);

marsattacks20, Jan 19 '22 16:01