quick-xml icon indicating copy to clipboard operation
quick-xml copied to clipboard

EndEventMismatch error when asynchronously reading to end in loop

Open giorgi-o opened this issue 2 years ago • 2 comments

I've been getting an unusual EndEventMismatch error when reading from a TCP (TLS) socket in a loop.

The error message is "Expecting </<tag> found </tag>" EndEventMismatch { expected: "<tag", found: "tag" }.

I managed to write a small(ish) self-contained reproducible example:

main.rs
use std::time::Duration;

use quick_xml::events::Event as XmlEvent;
use tokio::io::{AsyncRead, BufReader};

struct XmlReader {
    next_message_ready: bool,
}

impl XmlReader {
    fn new() -> Self {
        Self {
            next_message_ready: false,
        }
    }
}

impl AsyncRead for XmlReader {
    fn poll_read(
        mut self: std::pin::Pin<&mut Self>,
        _cx: &mut std::task::Context<'_>,
        buf: &mut tokio::io::ReadBuf<'_>,
    ) -> std::task::Poll<std::io::Result<()>> {
        if !self.next_message_ready {
            return std::task::Poll::Pending;
        }

        let response = "<tag></tag>";
        buf.put_slice(response.as_bytes());
        self.next_message_ready = false;
        std::task::Poll::Ready(Ok(()))
    }
}

struct XmlConsumer {
    reader: quick_xml::Reader<BufReader<XmlReader>>,
}

impl XmlConsumer {
    pub fn new() -> XmlConsumer {
        let reader = XmlReader::new();
        let reader = BufReader::new(reader);
        let reader = quick_xml::Reader::from_reader(reader);

        Self { reader }
    }

    pub fn set_next_message_ready(&mut self) {
        self.reader.get_mut().get_mut().next_message_ready = true;
    }

    pub async fn recv_xml(&mut self) {
        // read start event
        let mut buf = Vec::new();
        let start_event = self.reader.read_event_into_async(&mut buf).await.unwrap();
        println!(
            "\n\nstart_event={:?} buffer position={}",
            start_event,
            self.reader.buffer_position()
        );

        let bytes_start = match start_event {
            XmlEvent::Start(e) => e,
            _ => panic!("Expected Start event"),
        };

        // read until corresponding end event
        let mut buf = Vec::new();
        let span = self
            .reader
            .read_to_end_into_async(bytes_start.name(), &mut buf)
            .await
            .unwrap(); // <- panics here
        println!(
            "read to end, span={:?} buffer position={}",
            span,
            self.reader.buffer_position()
        );
    }
}

#[tokio::main]
async fn main() {
    let mut xml_consumer = XmlConsumer::new();

    loop {
        xml_consumer.set_next_message_ready();

        loop {
            let xml_event_fut = xml_consumer.recv_xml();
            let xml_event_fut = tokio::time::timeout(Duration::from_millis(1), xml_event_fut);

            let r = xml_event_fut.await;
            if r.is_err() {
                println!("Timeout");
                break;
            }
        }
    }
}

This script sets up a mock object called XmlReader that implements AsyncRead. It is supposed to represent a TCP echo server of sorts, that responds with "<tag></tag>" whenever next_message_ready is set to true. It is then wrapped in a BufReader and fed to quick_xml.

The read_event_into_async() and read_to_end_into_async() methods are called one after the other, in a loop. On the second iteration on the loop, the quick_xml reader seems to read <<tag> instead of <tag> somehow, causing an error when it tries to read the matching closing tag.

Here are the logs generated by the code above:

start_event=Start(BytesStart { buf: Borrowed("tag"), name_len: 3 }) buffer position=5
read to end, span=5..5 buffer position=11
Timeout


start_event=Start(BytesStart { buf: Borrowed("<tag"), name_len: 4 }) buffer position=16
thread 'main' panicked at 'called `Result::unwrap()` on an `Err` value: EndEventMismatch { expected: "<tag", found: "tag" }', src\main.rs:73:14

giorgi-o avatar Jul 22 '23 18:07 giorgi-o

After a few more days of debugging, my guess as to why this occurs is that it might be due to the fact that in my code above, the reader's poll_read() function can return Poll::Pending, with no intention of using the ctx to ever wake the executor, causing the future to never resolve. It instead uses tokio's timeout to cancel the future after a while.

Assuming the future returned by read_event_into_async() isn't cancel-safe would explain it: since we are interrupting the future mid-execution, it leaves the XML parser in an invalid state, which causes problems when the method is called again.

I suppose the blame partly comes down to my (ab)use of rust futures, although it still was a pain to debug. If I could suggest something to prevent this from happening to others in the future, it would be to either make read_event_into_async() cancel-safe, or document it as not being so. My preference lies on the former, as in Rust most asynchronous recv() functions are expected to be cancel-safe, so that they can be used in tokio::select!() or a timeout.

giorgi-o avatar Jul 24 '23 23:07 giorgi-o

@giorgi-o, I would be happy if you could convert your example into #[tokio::test] routine with assert!s instead of printing, so it can be included in quick_xml testcases. I tried to do that, but it ran over 5 minutes without any result or consuming CPU.

Mingun avatar Nov 12 '23 09:11 Mingun