tonic icon indicating copy to clipboard operation
tonic copied to clipboard

make codec::decode::Streaming a trait for easier testability

Open stefanhoelzl opened this issue 5 years ago • 7 comments

Feature Request

make codec::decode::Streaming a trait. This will allow easier mocking needed for testing of grpc calls using client streaming.

Crates

tonic tonic-build

Motivation

currently when I have a grpc call with client streaming like ByteStream::Write I get a rust function with this signature:

async fn write(&self, request: Request<Streaming<WriteRequest>>) -> Result<Response<WriteResponse>, Status>

If I want to test this function I would need to create a Streaming object which is quite complicated. It would be much easier if the generated function write would be generic accepting a argument implementing the trait Stream and Streaming would implement this.

Proposal

Adding a new trait

trait Stream {
    type Item;
    async fn message<'_>(&'_ mut self) -> Result<Option<Self::Item>, Status>;
    async fn trailers<'_>(&'_ mut self) -> Result<Option<MetadataMap>, Status>;
}

and generate function definitions for ByteStream::write with such a signature

async fn write<S: Stream<Item = WriteRequest >>(&self, request: Request<S>);

This would allow to create simple Mocks which implement Stream for testing. And otherwise Streaming is still used, which must then implement Stream.

Alternatives

Please let me know if you know alternative how testing can be simplified.

stefanhoelzl avatar Sep 24 '20 21:09 stefanhoelzl

Not exactly what you are after but maybe you could delegate complex logic to a function similar to the one you propose, and write most of your tests against that:

impl BytestreamService for Foo {
    async fn write(
        &self,
        request: Request<Streaming<WriteRequest>>,
    ) -> Result<Response<WriteResponse>, Status> {
        do_write(request).await // if you need the request
        do_write2(request.into_inner()).await // or just the stream
    }
}

async fn do_write<S>(request: Request<S>) -> Result<Response<WriteResponse>, Status>
where
    S: Stream<Item = Result<WriteRequest, Status>> + Unpin,
{
    //..
}

async fn do_write2<S>(stream: S) -> Result<Response<WriteResponse>, Status>
where
    S: Stream<Item = Result<WriteRequest, Status>> + Unpin,
{
    //..
}

alce avatar Sep 25 '20 03:09 alce

@alce yes, this workaround I am currently using. But I would prefer to fix tonic and not needing this workaround :)

stefanhoelzl avatar Sep 25 '20 18:09 stefanhoelzl

Put up a Draft-PR how this could work!

There are two tests I may need some guidance how to solve this (see PR).

This will also break backwards-compatibility, but maybe it can be put behind a feature flag?

stefanhoelzl avatar Sep 25 '20 19:09 stefanhoelzl

I don't know if moving stream to a trait like this is the correct solution. I think adding a way to pass in complete messages to the stream struct would be better.

LucioFranco avatar Sep 29 '20 15:09 LucioFranco

FWIW, I ran into this recently when writing a bazel remote execution server in rust. Everything was going so smooth until I started pulling my hair out trying to find the easiest way to wire up mpsc (or similar) stream to test my code.

allada avatar Dec 28 '20 07:12 allada

After lots of document & code reading I finally was able to work around this issue. The key came from an undocumented public function. Probably not good for long-term, but works for now at least and allows me to write my tests before writing my code to a satisfactory amount.

Here's the code for anyone that would like a work-around:

use std::convert::TryFrom;

use bytestream_server::ByteStreamServer;
use prost::{bytes::Bytes, Message};
use tonic::{
    codec::Codec, // Needed for .decoder().
    codec::ProstCodec,
    transport::Body,
    Request,
    Streaming,
};

use proto::google::bytestream::{
    byte_stream_server::ByteStream, // Needed to call .write().
    WriteRequest,
};

#[cfg(test)]
pub mod write_tests {
    use super::*;

    // Utility to encode our proto into GRPC stream format.
    fn encode<T: Message>(proto: &T) -> Result<Bytes, Box<dyn std::error::Error>> {
        use bytes::{BufMut, BytesMut};
        let mut buf = BytesMut::new();
        // See below comment on spec.
        use std::mem::size_of;
        const PREFIX_BYTES: usize = size_of::<u8>() + size_of::<u32>();
        for _ in 0..PREFIX_BYTES {
            // Advance our buffer first.
            // We will backfill it once we know the size of the message.
            buf.put_u8(0);
        }
        proto.encode(&mut buf)?;
        let len = buf.len() - PREFIX_BYTES;
        {
            let mut buf = &mut buf[0..PREFIX_BYTES];
            // See: https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#:~:text=Compressed-Flag
            // for more details on spec.
            // Compressed-Flag -> 0 / 1 # encoded as 1 byte unsigned integer.
            buf.put_u8(0);
            // Message-Length -> {length of Message} # encoded as 4 byte unsigned integer (big endian).
            buf.put_u32(len as u32);
            // Message -> *{binary octet}.
        }

        Ok(buf.freeze())
    }

    #[tokio::test]
    pub async fn chunked_stream_receives_all_data() -> Result<(), Box<dyn std::error::Error>> {
        let bs_server = YourByteStreamServer::default();

        // Setup stream.
        let (mut tx, join_handle) = {
            let (tx, body) = Body::channel();
            let mut codec = ProstCodec::<WriteRequest, WriteRequest>::default();
            // Note: This is an undocumented function.
            let stream = Streaming::new_request(codec.decoder(), body);

            let join_handle = tokio::spawn(async move {
                let response_future = bs_server.write(Request::new(stream));
                response_future.await
            });
            (tx, join_handle)
        };

        // Send data.
        let data_size = {
            let raw_data = "1234".as_bytes();
            let data_size = raw_data.len();
            // Chunk our data into two chunks to simulate something a client
            // might do.
            const BYTE_SPLIT_OFFSET: usize = 2;

            tx.send_data(encode(&WriteRequest {
                resource_name: "foobar".to_string(),
                write_offset: 0,
                finish_write: false,
                data: raw_data[..BYTE_SPLIT_OFFSET].to_vec(),
            })?)
            .await?;

            tx.send_data(encode(&WriteRequest {
                resource_name: "foobar".to_string(),
                write_offset: 0,
                finish_write: true,
                data: raw_data[..BYTE_SPLIT_OFFSET].to_vec(),
            })?)
            .await?;
            let _ = tx; // Emulate sender-side stream hangup.
            data_size
        };
        // Check results of server.
        {
            // One for spawn() future and one for result.
            let server_result = join_handle.await??;
            let committed_size = usize::try_from(server_result.into_inner().committed_size)
                .or(Err("Cant convert i64 to usize"))?;
            assert_eq!(committed_size as usize, data_size);
        }
        Ok(())
    }
}

allada avatar Dec 28 '20 22:12 allada

Yeah, I agree there should be an easier way. Let me take a think here.

LucioFranco avatar Jan 04 '21 16:01 LucioFranco

I stumbled across this issue as well, when trying to mock out a Response<Streaming<T>>. Is there any known good workaround for this? The only thing I could think of is sending the request to a separate test server running in the background and passing the value back to the caller.

Will-Low avatar Feb 03 '23 23:02 Will-Low

There is a mock example you can use that uses an in-memory connection.

LucioFranco avatar Feb 13 '23 15:02 LucioFranco

Based on @allada's example, the following is a bit more concise and good enough for testing:

    fn setup_stream<T: prost::Message + Default + 'static>(
        s: impl Stream<Item = T> + Send + 'static,
    ) -> Request<Streaming<T>> {
        let body = Body::wrap_stream(s.map(|m| Ok::<Bytes, EncodeError>(m.encode_to_vec().into())));
        let mut codec = ProstCodec::<T, T>::default();
        Request::new(Streaming::new_request(codec.decoder(), body, None))
    }

wngr avatar Apr 18 '23 11:04 wngr