How to send serialized Avro data in a buffered producer?
I am trying to create an application that will serialize some data using Avro, and send it to a consumer client using Kafka. I am able to send a simple string, as in the examples provided, but I am not able to pass serialized data from Avro.
I am trying to use this Avro example from their site as a starting point:
std::unique_ptr<avro::OutputStream> out = avro::memoryOutputStream();
avro::EncoderPtr e = avro::binaryEncoder();
e->init(*out);
c::cpx c1;
c1.re = 1.0;
c1.im = 2.13;
avro::encode(*e, c1);
But I don't know how to go from here to send this encoded data.
This is the first time I am using both Kafka and Avro, and I am new to serialization in general, so maybe there is something obvious I am missing here?
I think I figured it out; at least I can serialize and deserialize the data by adding this below the above code:
// Flush the encoder so the stream holds exactly the encoded bytes.
e->flush();
std::vector<uint8_t> bytes;
size_t len = out->byteCount();
std::unique_ptr<avro::InputStream> in = avro::memoryInputStream(*out);
// Stack-allocate the reader; the earlier heap allocation was never freed.
avro::StreamReader reader(*in);
bytes.reserve(len);
// Copy the encoded bytes out of the stream one at a time.
while (reader.hasMore()) {
    bytes.push_back(reader.read());
}
builder.payload(bytes);
producer.produce(builder);
And I do something similar when decoding:
std::vector<uint8_t> payload = msg.get_payload();
const uint8_t* pl = payload.data();
size_t len = payload.size();
/* Binary input stream */
std::unique_ptr<avro::InputStream> in = avro::memoryInputStream((const uint8_t*)pl, len);
avro::DecoderPtr d = avro::binaryDecoder();
d->init(*in);
c::cpx c1;
avro::decode(*d, c1);
And this gives me back the same data I sent, in my consumer. I'm not sure this is the best way of going about it, but it works. Another issue now is that I am unable to parse this data in an external consumer written in Java by someone else, using the same JSON msg. It seems that I am missing some schema metadata that is added as a frame to the data somehow.
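One possible explanation for the missing frame, offered as a guess: if the Java consumer uses Confluent's Schema Registry deserializer, it expects each message to start with a 5-byte header (a zero magic byte followed by the 4-byte schema ID in big-endian order) before the Avro binary data. Whether that consumer actually uses Schema Registry is an assumption, and `frame_confluent` and the schema ID passed to it are hypothetical names for illustration; they are not part of Avro or cppkafka. A minimal sketch of adding that header:

```cpp
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical helper (not an Avro or cppkafka API): prepend the Confluent
// wire-format header to an already-encoded Avro binary payload.
// Layout: [0x00 magic byte][schema ID, 4 bytes, big-endian][Avro bytes].
std::vector<uint8_t> frame_confluent(uint32_t schema_id,
                                     const std::vector<uint8_t>& avro_bytes) {
    std::vector<uint8_t> framed;
    framed.reserve(5 + avro_bytes.size());

    framed.push_back(0x00);  // magic byte

    // Schema ID, most significant byte first.
    framed.push_back(static_cast<uint8_t>((schema_id >> 24) & 0xFF));
    framed.push_back(static_cast<uint8_t>((schema_id >> 16) & 0xFF));
    framed.push_back(static_cast<uint8_t>((schema_id >> 8) & 0xFF));
    framed.push_back(static_cast<uint8_t>(schema_id & 0xFF));

    // The Avro binary payload follows the header unchanged.
    framed.insert(framed.end(), avro_bytes.begin(), avro_bytes.end());
    return framed;
}
```

The result would be passed to `builder.payload(...)` in place of `bytes`. The schema ID has to be the one the registry assigned to the schema, so it needs to come from whoever runs the Java side.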
This also works:
std::unique_ptr<avro::OutputStream> bin_os = avro::memoryOutputStream();
avroencoder->init(*bin_os.get());
. . .
std::vector<uint8_t> out;
std::shared_ptr<std::vector<uint8_t> > v = avro::snapshot(*bin_os.get());
out.insert(out.end(), v->begin(), v->end());
kafkabuilder.payload(out);
kafkaproducer.produce(kafkabuilder);
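When a consumer written in another language cannot parse the messages, it can help to check the raw payload by hand before blaming either side. Avro's binary encoding writes a record as its fields in schema order, and a double as 8 bytes in little-endian IEEE 754 form, so a `cpx` record should be exactly 16 bytes. The sketch below assumes the schema declares `re` then `im`, both as doubles, and that the platform's `double` is a 64-bit IEEE 754 value; `Complex`, `read_le_double` and `decode_cpx` are hypothetical names and use no Avro calls.

```cpp
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <stdexcept>
#include <vector>

// Hypothetical stand-in for the generated c::cpx type.
struct Complex {
    double re;
    double im;
};

// Read one Avro double: 8 bytes, little-endian IEEE 754.
double read_le_double(const uint8_t* p) {
    uint64_t bits = 0;
    for (int i = 7; i >= 0; --i) {
        bits = (bits << 8) | p[i];  // highest-index byte is most significant
    }
    double value;
    std::memcpy(&value, &bits, sizeof value);
    return value;
}

// Decode a payload assumed to hold exactly one record of two doubles.
Complex decode_cpx(const std::vector<uint8_t>& payload) {
    if (payload.size() != 16) {
        throw std::invalid_argument("expected a 16-byte payload");
    }
    Complex c;
    c.re = read_le_double(payload.data());
    c.im = read_le_double(payload.data() + 8);
    return c;
}
```

If the payload taken from the Kafka message is not 16 bytes long, the producer is sending something other than the bare record (extra buffered bytes or a header, for instance), which narrows down where the mismatch is.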