
Server panics close response streams without an error (unlike unary responses)

Open • Jonathan-Landeed opened this issue 2 years ago • 1 comment

Bug Report

Version

0.9.2 (sorry it's a little old, I'm waiting for prost-wkt)

├── tonic v0.9.2
└── tonic-build v0.9.2

Platform

Darwin Js-MacBook-Pro.local 22.5.0 Darwin Kernel Version 22.5.0: Thu Jun 8 22:22:23 PDT 2023; root:xnu-8796.121.3~7/RELEASE_ARM64_T6020 arm64

Description

A server panic in a unary response gives the client an error, but a panic in a response stream just closes the stream, indistinguishable from a normal termination. I double-checked with Wireshark: both a normal termination and a panic send a HEADERS frame with "End Stream: True".

Regular panic:

    async fn paniccheck(&self, _request: Request<Empty>) -> Result<Response<Empty>, Status> {
        panic!("test");
    }

Response headers received:
(empty)

Response trailers received:
(empty)
Sent 1 request and received 0 responses
ERROR:
  Code: Canceled
  Message: stream terminated by RST_STREAM with error code: CANCEL

Empty stream:

    type StreamemptycheckStream =
        Pin<Box<dyn Stream<Item = Result<Empty, Status>> + Send + Sync + 'static>>;
    async fn streamemptycheck(
        &self,
        _request: Request<Empty>,
    ) -> Result<Response<Self::StreamemptycheckStream>, Status> {
        let (_tx, rx) = mpsc::channel(1);
        tokio::spawn(async move {});
        let result_stream = ReceiverStream::new(rx);
        Ok(Response::new(Box::pin(result_stream)))
    }

Response headers received:
content-type: application/grpc
date: Sat, 07 Oct 2023 03:45:14 GMT

Response trailers received:
(empty)
Sent 1 request and received 0 responses

Panic stream:

    type StreampaniccheckStream =
        Pin<Box<dyn Stream<Item = Result<Empty, Status>> + Send + Sync + 'static>>;
    async fn streampaniccheck(
        &self,
        _request: Request<Empty>,
    ) -> Result<Response<Self::StreampaniccheckStream>, Status> {
        let (_, rx) = mpsc::channel(1);
        tokio::spawn(async move {
            panic!("test");
        });
        let result_stream = ReceiverStream::new(rx);
        Ok(Response::new(Box::pin(result_stream)))
    }

Response headers received:
content-type: application/grpc
date: Sat, 07 Oct 2023 03:47:51 GMT

Response trailers received:
(empty)
Sent 1 request and received 0 responses

Could we make the stream behavior more like unary responses? I'll try catching the panic and sending an error myself, but it would be nice if it were built in.
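For reference, the "catch the panic and send an error" idea can be sketched with only the standard library. This is a rough analog, not tonic code: `spawn_producer`, `collect`, and `Item` are hypothetical names, a `String` stands in for `tonic::Status`, and an OS thread stands in for the spawned task. In a tokio setting the equivalent signal would presumably come from awaiting the `JoinHandle` and checking `JoinError::is_panic()`.

```rust
use std::panic::{self, AssertUnwindSafe};
use std::sync::mpsc;
use std::thread;

/// Stream item: a value, or an error message standing in for `tonic::Status`.
pub type Item = Result<u32, String>;

/// Runs `produce` on a worker thread (hypothetical helper). If `produce`
/// panics, the panic is caught and an `Err` is sent as the last item, so the
/// consumer can tell a panic apart from a normal end of stream.
pub fn spawn_producer<F>(produce: F) -> mpsc::Receiver<Item>
where
    F: FnOnce(&mpsc::Sender<Item>) + Send + 'static,
{
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        // `tx` is only borrowed here, so it is still usable after a panic.
        let outcome = panic::catch_unwind(AssertUnwindSafe(|| produce(&tx)));
        if outcome.is_err() {
            // The receiver may already be gone; ignore a failed send.
            let _ = tx.send(Err("internal: producer panicked".to_string()));
        }
        // `tx` is dropped here, which ends the stream on the receiving side.
    });
    rx
}

/// Drains the receiver until every sender has been dropped.
pub fn collect(rx: mpsc::Receiver<Item>) -> Vec<Item> {
    rx.iter().collect()
}
```

With this shape, a producer that returns normally yields only its `Ok` items, while one that panics yields its items followed by a final `Err`.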

Jonathan-Landeed, Oct 07 '23 05:10

Oh, it looks like an mpsc receiver can't tell whether the sender was dropped because it went out of scope or because of a panic. This seems to work though.

use futures::stream::Stream;
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll};
use tokio::sync::mpsc;

#[derive(Clone)]
pub struct PanickingSender<T> {
    sender: mpsc::Sender<T>,
    panicked: Arc<AtomicBool>,
}
impl<T> Drop for PanickingSender<T> {
    fn drop(&mut self) {
        if std::thread::panicking() {
            self.panicked.store(true, Ordering::Relaxed);
        }
        //std::mem::forget(self.sender.clone())
    }
}
impl<T> PanickingSender<T> {
    pub async fn send(&self, value: T) -> Result<(), mpsc::error::SendError<T>> {
        self.sender.send(value).await
    }
}

pub struct PanickingReceiver<T> {
    receiver: mpsc::Receiver<T>,
    panicked: Arc<AtomicBool>,
}

impl<T> PanickingReceiver<T> {
    pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
        match self.receiver.poll_recv(cx) {
            Poll::Ready(None) => {
                if self.panicked.load(Ordering::Relaxed) {
                    panic!("One of the senders panicked");
                }
                Poll::Ready(None)
            }
            ret => ret,
        }
    }
}
impl<T> Stream for PanickingReceiver<T> {
    type Item = T;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        self.poll_recv(cx)
    }
}

pub fn channel<T>(buffer: usize) -> (PanickingSender<T>, PanickingReceiver<T>) {
    let (sender, receiver) = mpsc::channel(buffer);
    let panicked = Arc::new(AtomicBool::new(false));
    (
        PanickingSender {
            sender,
            panicked: panicked.clone(),
        },
        PanickingReceiver { receiver, panicked },
    )
}

Though you have to be careful to actually move the sender when testing:

    type StreampaniccheckStream =
        Pin<Box<dyn Stream<Item = Result<Empty, Status>> + Send + Sync + 'static>>;
    async fn streampaniccheck(
        &self,
        _request: Request<Empty>,
    ) -> Result<Response<Self::StreampaniccheckStream>, Status> {
        let (tx, rx) = channel(1);
        tokio::spawn(async move {
            //let _ = tx; // https://github.com/rust-lang/rust/issues/48852#issuecomment-589197747
            let _tx = tx;
            panic!("test");
        });
        Ok(Response::new(Box::pin(rx)))
    }
    
Response headers received:
content-type: application/grpc
date: Sun, 08 Oct 2023 21:21:04 GMT

Response trailers received:
(empty)
Sent 1 request and received 0 responses
ERROR:
  Code: Canceled
  Message: stream terminated by RST_STREAM with error code: CANCEL
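
The mechanism the wrapper relies on, `std::thread::panicking()` returning `true` inside `Drop` while the owning thread unwinds, can be checked in isolation. The following is a minimal std-only sketch, assuming plain OS threads rather than tokio tasks; `PanicFlag` and `dropped_by_panic` are hypothetical names used only for illustration.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;

/// Hypothetical guard: records whether it was dropped during a panic unwind.
pub struct PanicFlag(pub Arc<AtomicBool>);

impl Drop for PanicFlag {
    fn drop(&mut self) {
        // True only while the current thread is unwinding from a panic.
        if thread::panicking() {
            self.0.store(true, Ordering::SeqCst);
        }
    }
}

/// Runs `body` on a new thread that owns a `PanicFlag`, and reports whether
/// the guard was dropped because of a panic.
pub fn dropped_by_panic<F>(body: F) -> bool
where
    F: FnOnce() + Send + 'static,
{
    let flag = Arc::new(AtomicBool::new(false));
    let guard = PanicFlag(Arc::clone(&flag));
    let handle = thread::spawn(move || {
        // Bind to a named variable so the guard lives until the closure ends.
        let _guard = guard;
        body();
    });
    // `join` returns Err if the thread panicked; only the flag is inspected.
    let _ = handle.join();
    flag.load(Ordering::SeqCst)
}
```

Calling `dropped_by_panic` with a body that returns normally leaves the flag unset, while a body that panics sets it, which is the distinction a plain mpsc receiver cannot make on its own.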

Jonathan-Landeed, Oct 08 '23 21:10