EndEventMismatch error when asynchronously reading to end in loop
I've been getting an unusual EndEventMismatch error when reading from a TCP (TLS) socket in a loop.
The error message is "Expecting </<tag> found </tag>", reported as EndEventMismatch { expected: "<tag", found: "tag" }.
I managed to write a small(ish) self-contained reproducible example:
main.rs
use std::time::Duration;

use quick_xml::events::Event as XmlEvent;
use tokio::io::{AsyncRead, BufReader};

struct XmlReader {
    next_message_ready: bool,
}

impl XmlReader {
    fn new() -> Self {
        Self {
            next_message_ready: false,
        }
    }
}

impl AsyncRead for XmlReader {
    fn poll_read(
        mut self: std::pin::Pin<&mut Self>,
        _cx: &mut std::task::Context<'_>,
        buf: &mut tokio::io::ReadBuf<'_>,
    ) -> std::task::Poll<std::io::Result<()>> {
        if !self.next_message_ready {
            return std::task::Poll::Pending;
        }

        let response = "<tag></tag>";
        buf.put_slice(response.as_bytes());
        self.next_message_ready = false;

        std::task::Poll::Ready(Ok(()))
    }
}

struct XmlConsumer {
    reader: quick_xml::Reader<BufReader<XmlReader>>,
}

impl XmlConsumer {
    pub fn new() -> XmlConsumer {
        let reader = XmlReader::new();
        let reader = BufReader::new(reader);
        let reader = quick_xml::Reader::from_reader(reader);
        Self { reader }
    }

    pub fn set_next_message_ready(&mut self) {
        self.reader.get_mut().get_mut().next_message_ready = true;
    }

    pub async fn recv_xml(&mut self) {
        // read start event
        let mut buf = Vec::new();
        let start_event = self.reader.read_event_into_async(&mut buf).await.unwrap();
        println!(
            "\n\nstart_event={:?} buffer position={}",
            start_event,
            self.reader.buffer_position()
        );

        let bytes_start = match start_event {
            XmlEvent::Start(e) => e,
            _ => panic!("Expected Start event"),
        };

        // read until corresponding end event
        let mut buf = Vec::new();
        let span = self
            .reader
            .read_to_end_into_async(bytes_start.name(), &mut buf)
            .await
            .unwrap(); // <- panics here
        println!(
            "read to end, span={:?} buffer position={}",
            span,
            self.reader.buffer_position()
        );
    }
}

#[tokio::main]
async fn main() {
    let mut xml_consumer = XmlConsumer::new();

    loop {
        xml_consumer.set_next_message_ready();

        loop {
            let xml_event_fut = xml_consumer.recv_xml();
            let xml_event_fut = tokio::time::timeout(Duration::from_millis(1), xml_event_fut);
            let r = xml_event_fut.await;
            if r.is_err() {
                println!("Timeout");
                break;
            }
        }
    }
}
This program sets up a mock object called XmlReader that implements AsyncRead. It stands in for a TCP echo server of sorts: it responds with "<tag></tag>" whenever next_message_ready is set to true, and returns Poll::Pending otherwise. It is then wrapped in a BufReader and passed to quick_xml.
The read_event_into_async() and read_to_end_into_async() methods are called one after the other, in a loop. On the second iteration of the outer loop (right after the first timeout), the quick_xml reader somehow seems to read <<tag> instead of <tag>, which causes an error when it tries to read the matching closing tag.
Here are the logs generated by the code above:
start_event=Start(BytesStart { buf: Borrowed("tag"), name_len: 3 }) buffer position=5
read to end, span=5..5 buffer position=11
Timeout
start_event=Start(BytesStart { buf: Borrowed("<tag"), name_len: 4 }) buffer position=16
thread 'main' panicked at 'called `Result::unwrap()` on an `Err` value: EndEventMismatch { expected: "<tag", found: "tag" }', src\main.rs:73:14
After a few more days of debugging, my guess is that this happens because, in the code above, the reader's poll_read() function can return Poll::Pending without ever using the Context (cx) to schedule a wake-up, so the future never resolves on its own. The code instead relies on tokio's timeout to cancel the future after a while.
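For context, the usual expectation is that a poll function returning Poll::Pending arranges a wake-up through the Context it was given. Below is a minimal std-only sketch of that pattern. The names Shared, WaitReady, set_ready, CountingWake and demo_wakeup are made up for illustration and are not part of tokio or quick_xml.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical state shared between a producer and the waiting reader.
#[derive(Default)]
struct Shared {
    ready: bool,
    waker: Option<Waker>,
}

/// Hypothetical future that resolves once `set_ready` has been called.
struct WaitReady(Arc<Mutex<Shared>>);

impl Future for WaitReady {
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        let mut shared = self.0.lock().unwrap();
        if shared.ready {
            shared.ready = false;
            return Poll::Ready(());
        }
        // Remember whom to wake *before* reporting Pending.
        shared.waker = Some(cx.waker().clone());
        Poll::Pending
    }
}

/// Producer side: mark the state ready and wake the stored waker, if any.
fn set_ready(shared: &Arc<Mutex<Shared>>) {
    let waker = {
        let mut s = shared.lock().unwrap();
        s.ready = true;
        s.waker.take()
    };
    if let Some(w) = waker {
        w.wake();
    }
}

/// Waker that only counts how often it was invoked (stands in for an executor).
struct CountingWake(AtomicUsize);

impl Wake for CountingWake {
    fn wake(self: Arc<Self>) {
        self.0.fetch_add(1, Ordering::SeqCst);
    }
}

/// Returns (first poll pending?, number of wake-ups, second poll ready?).
fn demo_wakeup() -> (bool, usize, bool) {
    let shared = Arc::new(Mutex::new(Shared::default()));
    let counter = Arc::new(CountingWake(AtomicUsize::new(0)));
    let waker = Waker::from(counter.clone());
    let mut cx = Context::from_waker(&waker);

    let mut fut = Box::pin(WaitReady(shared.clone()));
    let first_pending = fut.as_mut().poll(&mut cx).is_pending();
    set_ready(&shared);
    let wakes = counter.0.load(Ordering::SeqCst);
    let second_ready = fut.as_mut().poll(&mut cx).is_ready();
    (first_pending, wakes, second_ready)
}
```

Here demo_wakeup() polls once, marks the state ready, and polls again. The waker stored during the first poll is invoked by set_ready, which is the step the XmlReader mock deliberately leaves out.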
If the future returned by read_event_into_async() is not cancel-safe, that would explain the error: interrupting the future mid-execution leaves the XML parser in an invalid state, which causes problems when the method is called again.
I suppose the blame partly comes down to my (ab)use of Rust futures, although it was still a pain to debug. If I could suggest something to prevent this from happening to others in the future, it would be to either make read_event_into_async() cancel-safe, or document that it is not. My preference is the former, as in Rust most asynchronous recv() functions are expected to be cancel-safe, so that they can be used in tokio::select! or with a timeout.
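To illustrate what "not cancel-safe" means in general, here is a toy std-only sketch. It is not quick_xml's implementation and makes no claim about where exactly its state gets out of sync. ToyParser, Source, NextByte, poll_once and the two run_* helpers are hypothetical names. One method keeps its partial result in a local variable (lost when the future is dropped), the other keeps it in the parser itself.

```rust
use std::collections::VecDeque;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical byte source; a `None` entry models "no data yet".
struct Source {
    bytes: VecDeque<Option<u8>>,
}

/// Future that yields the next byte, or stays pending on a gap.
struct NextByte<'a> {
    src: &'a mut Source,
}

impl Future for NextByte<'_> {
    type Output = u8;

    fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<u8> {
        match self.src.bytes.pop_front() {
            Some(Some(byte)) => Poll::Ready(byte),
            // Gap or end of data: Pending without a wake-up, like the XmlReader mock.
            _ => Poll::Pending,
        }
    }
}

struct ToyParser {
    src: Source,
    inside_tag: bool,
    partial: String,
}

impl ToyParser {
    /// Builds a source that delivers `first`, then a gap, then `second`.
    fn new(first: &[u8], second: &[u8]) -> Self {
        let mut bytes: VecDeque<Option<u8>> = first.iter().copied().map(Some).collect();
        bytes.push_back(None);
        bytes.extend(second.iter().copied().map(Some));
        Self { src: Source { bytes }, inside_tag: false, partial: String::new() }
    }

    /// Not cancel-safe: the partial name lives only inside the future.
    async fn read_name_lossy(&mut self) -> String {
        let mut name = String::new();
        loop {
            let byte = NextByte { src: &mut self.src }.await;
            match byte {
                b'<' if !self.inside_tag => self.inside_tag = true,
                b'>' => {
                    self.inside_tag = false;
                    return name;
                }
                other => name.push(other as char),
            }
        }
    }

    /// Cancel-safe variant: progress is kept in `self.partial` between calls.
    async fn read_name_resumable(&mut self) -> String {
        loop {
            let byte = NextByte { src: &mut self.src }.await;
            match byte {
                b'<' if !self.inside_tag => self.inside_tag = true,
                b'>' => {
                    self.inside_tag = false;
                    return std::mem::take(&mut self.partial);
                }
                other => self.partial.push(other as char),
            }
        }
    }
}

struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Polls `fut` exactly once; `None` means it was still pending.
fn poll_once<F: Future>(fut: Pin<&mut F>) -> Option<F::Output> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    match fut.poll(&mut cx) {
        Poll::Ready(out) => Some(out),
        Poll::Pending => None,
    }
}

fn run_lossy() -> Option<String> {
    let mut parser = ToyParser::new(b"<t", b"ag>");
    {
        // Stalls after "<t"; dropping the future here mimics a timeout firing.
        let mut fut = Box::pin(parser.read_name_lossy());
        assert!(poll_once(fut.as_mut()).is_none());
    }
    let mut retry = Box::pin(parser.read_name_lossy());
    let name = poll_once(retry.as_mut());
    name
}

fn run_resumable() -> Option<String> {
    let mut parser = ToyParser::new(b"<t", b"ag>");
    {
        let mut fut = Box::pin(parser.read_name_resumable());
        assert!(poll_once(fut.as_mut()).is_none());
    }
    let mut retry = Box::pin(parser.read_name_resumable());
    let name = poll_once(retry.as_mut());
    name
}
```

In this toy, run_lossy() comes back with "ag" because the "t" consumed by the cancelled future is gone, while run_resumable() returns the full "tag". Keeping all progress in the reader rather than in the future is one common way to get cancel safety. Whether that fits quick_xml's internals is a separate question.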
@giorgi-o, I would be happy if you could convert your example into a #[tokio::test] routine with assert!s instead of printing, so it can be included in the quick_xml test cases. I tried to do that, but it ran for over 5 minutes without producing any result or consuming CPU.