Add Body::poll_progress

Open sfackler opened this issue 2 years ago • 46 comments

As described in https://github.com/hyperium/hyper/issues/3121, this allows server implementations to abort body writes even if the client is not reading data.

sfackler avatar Mar 11 '23 02:03 sfackler

@sfackler would it be possible to include a timeout example with this? Or maybe add that example in the doc comment? I am curious to see how this would be used.

LucioFranco avatar Mar 14 '23 15:03 LucioFranco

We could add a TimeoutBody wrapper to http-body-util, though that would require making tokio an optional dependency. I can update the PR tonight.

sfackler avatar Mar 14 '23 15:03 sfackler

Maybe even at least an example so tokio doesn't need to be a public dep.

LucioFranco avatar Mar 14 '23 18:03 LucioFranco

Here's a TimeoutBody implementation (untested):

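use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Duration;

use http_body::{Body, Frame, SizeHint};
use pin_project::pin_project;
use tokio::time::{Instant, Sleep};

#[pin_project]
pub struct TimeoutBody<B> {
    #[pin]
    inner: B,
    #[pin]
    timer: Sleep,
    timeout: Duration,
    // True while a timeout is armed for the current wait.
    waiting: bool,
}

impl<B> Body for TimeoutBody<B>
where
    B: Body,
{
    type Data = B::Data;
    type Error = TimeoutError<B::Error>;

    fn poll_frame(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
        let this = self.as_mut().project();
        if let Poll::Ready(o) = this.inner.poll_frame(cx) {
            // The inner body produced something, so the next wait starts a fresh timeout.
            *this.waiting = false;
            return Poll::Ready(o.map(|r| r.map_err(TimeoutError::Inner)));
        }

        // No frame yet: surface a timeout error if the timer has already fired.
        self.is_healthy(cx)?;
        Poll::Pending
    }

    // `is_healthy` is the method proposed in this PR.
    fn is_healthy(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Result<(), Self::Error> {
        let mut this = self.project();

        // Arm the timer the first time we are polled after making progress.
        if !*this.waiting {
            this.timer.as_mut().reset(Instant::now() + *this.timeout);
            *this.waiting = true;
        }

        if this.timer.poll(cx).is_ready() {
            return Err(TimeoutError::TimedOut);
        }

        Ok(())
    }

    fn is_end_stream(&self) -> bool {
        self.inner.is_end_stream()
    }

    fn size_hint(&self) -> SizeHint {
        self.inner.size_hint()
    }
}

#[derive(Debug)]
pub enum TimeoutError<E> {
    // The wrapped body returned an error.
    Inner(E),
    // No frame was produced before the timeout elapsed.
    TimedOut,
}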

sfackler avatar Mar 14 '23 19:03 sfackler

@seanmonstar thoughts on this?

sfackler avatar Aug 16 '23 18:08 sfackler

I've updated this to move from poll_healthy to a more general poll_progress, like what boats described for the Stream trait. In addition to the error reporting case I opened this to handle, the same motivations apply here as well.

sfackler avatar Jan 23 '24 00:01 sfackler

We (Linkerd) are interested in moving this proposal forward.

I've been testing this with patched versions of http-body 0.3 and Hyper 0.14 and the results are promising.

I'm using this to implement a middleware that enforces a progress timeout to cancel stuck streams.

What does the process look like for finalizing this proposal? Is there anything I can do to help?

olix0r avatar Apr 18 '24 01:04 olix0r

Thanks for trying it out, @olix0r! I'm glad to have at least 2 use cases for something so fundamental. We can move this forward.

In my prep for doing so, I went back and read the previous conversations, and also the poll_progress post. I think the problem and solution that withoutboats describes is similar, but different enough that perhaps we shouldn't use the same name/mechanism. That's because this feature isn't about making progress on the body separately from producing a frame. Rather, it's about propagating cancelation while waiting for backpressure to clear up.

It feels closer to oneshot::Sender::poll_closed(). Should we change the name here to poll_closed() or poll_canceled()?

At the same time, I'm writing up a longer blog post about this feature. I'll share a draft with you soon.

seanmonstar avatar May 06 '24 14:05 seanmonstar

That seems like a reasonable-enough name to me, though it might be a bit strange to have poll_closed return Result<(), Error>? That's why the original name in the PR was poll_healthy.

sfackler avatar May 06 '24 14:05 sfackler

Hm, thanks for pointing that out. It made me think through the problem a bit more. I at first figured we could just make it poll_closed() -> Poll<()>, like the oneshot sender. But that has some things to work out:

  • A default implementation of the method needs to work, and can't check any state.
    • Well, actually, could it check Self::is_end_stream()? Maybe that could work like Sender::poll_closed() and is_closed().
    • But even the default implementation of is_end_stream() will always be false, so something waiting on poll_closed() then would wait forever.
  • Should it indicate a bad condition? Or just closure?

seanmonstar avatar May 06 '24 14:05 seanmonstar

A default implementation that just returns Poll::Ready(Ok(())) is correct, and equivalent to the current state of the world.

I don't think it would indicate closure, just drive any background IO (like poll_progress does). For convenience, it allows errors to be returned directly, but an error out of poll_healthy or whatever we call it would be equivalent to that error being returned from poll_frame.
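A minimal sketch of that default, to make the "equivalent to the current state of the world" point concrete. The `Body` trait here is a hypothetical, reduced stand-in for `http_body::Body` (frames are plain `Vec<u8>`), and `Once` and `noop_waker` are illustrative helpers, not part of any crate:

```rust
use std::convert::Infallible;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Hypothetical, reduced stand-in for `http_body::Body`.
pub trait Body {
    type Error;

    fn poll_frame(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Result<Vec<u8>, Self::Error>>>;

    // Proposed method: drive background work without producing a frame.
    // The default reports "nothing left to do", so bodies that do not
    // override it behave exactly as they do today.
    fn poll_progress(
        self: Pin<&mut Self>,
        _cx: &mut Context<'_>,
    ) -> Poll<Result<(), Self::Error>> {
        Poll::Ready(Ok(()))
    }
}

// A body that yields a single frame and relies on the default `poll_progress`.
pub struct Once(pub Option<Vec<u8>>);

impl Body for Once {
    type Error = Infallible;

    fn poll_frame(
        mut self: Pin<&mut Self>,
        _cx: &mut Context<'_>,
    ) -> Poll<Option<Result<Vec<u8>, Self::Error>>> {
        // Hand out the frame once, then report end of stream.
        Poll::Ready(self.0.take().map(Ok))
    }
}

// Waker that does nothing, so the methods can be polled by hand.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

pub fn noop_waker() -> Waker {
    Waker::from(Arc::new(NoopWake))
}
```

Polling `Once` through `poll_progress` returns `Poll::Ready(Ok(()))` immediately and leaves the frame untouched, so a caller that starts polling the new method sees no change for existing bodies.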

sfackler avatar May 06 '24 15:05 sfackler

In that sense, it is really pretty similar to Boats's poll_progress, just fallible.

We could make it infallible and force the error to come out of poll_frame but that just seems like it'd make the implementation more annoying for no real benefit.

sfackler avatar May 06 '24 15:05 sfackler

So you find that it was needed to make background progress, too? I didn't think that was a goal of the method. Just to detect closure while a frame isn't needed, such as by polling a Sleep.

seanmonstar avatar May 06 '24 15:05 seanmonstar

Polling a sleep is background progress IMO! :)

My initial use case was purely around detecting disconnects/timeouts and things like that, but Boats's post on poll_progress made me feel like there's no reason you couldn't have some other body implementation that wanted to do some background IO. For example, if you're proxying you may want to internally pull data from the upstream connection while the downstream connection is idle.
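As a rough illustration of the proxying case, here is a sketch of a body that prefetches from its upstream inside a progress-style method. Everything in it is an assumption made for the example: the `Upstream` trait, the `Prefetch` and `Chunks` types, the buffer `limit`, and the choice to keep the method infallible (`Poll<()>`) to stay short.

```rust
use std::collections::VecDeque;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Hypothetical upstream connection that produces chunks.
pub trait Upstream {
    fn poll_chunk(&mut self, cx: &mut Context<'_>) -> Poll<Option<Vec<u8>>>;
}

// Hypothetical body that buffers up to `limit` chunks ahead of the reader.
pub struct Prefetch<U> {
    upstream: U,
    buffered: VecDeque<Vec<u8>>,
    limit: usize,
    upstream_done: bool,
}

impl<U: Upstream + Unpin> Prefetch<U> {
    pub fn new(upstream: U, limit: usize) -> Self {
        Self { upstream, buffered: VecDeque::new(), limit, upstream_done: false }
    }

    pub fn buffered_len(&self) -> usize {
        self.buffered.len()
    }

    // Modelled on the proposed `poll_progress`: pull from upstream while the
    // downstream side is idle. Ready means there is no background work left.
    pub fn poll_progress(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        let this = &mut *self;
        while !this.upstream_done && this.buffered.len() < this.limit {
            match this.upstream.poll_chunk(cx) {
                Poll::Ready(Some(chunk)) => this.buffered.push_back(chunk),
                Poll::Ready(None) => this.upstream_done = true,
                Poll::Pending => return Poll::Pending,
            }
        }
        // A full buffer is not "done": more work appears once frames are taken.
        if this.upstream_done { Poll::Ready(()) } else { Poll::Pending }
    }

    // Serve buffered chunks first, then fall back to the upstream directly.
    pub fn poll_frame(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Vec<u8>>> {
        let this = &mut *self;
        if let Some(chunk) = this.buffered.pop_front() {
            return Poll::Ready(Some(chunk));
        }
        if this.upstream_done {
            return Poll::Ready(None);
        }
        match this.upstream.poll_chunk(cx) {
            Poll::Ready(None) => {
                this.upstream_done = true;
                Poll::Ready(None)
            }
            other => other,
        }
    }
}

// In-memory upstream used to exercise the sketch; it is always ready.
pub struct Chunks(pub VecDeque<Vec<u8>>);

impl Upstream for Chunks {
    fn poll_chunk(&mut self, _cx: &mut Context<'_>) -> Poll<Option<Vec<u8>>> {
        Poll::Ready(self.0.pop_front())
    }
}

struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

pub fn noop_waker() -> Waker {
    Waker::from(Arc::new(NoopWake))
}
```

One design question the sketch leaves open is what to return when the buffer is full: it returns `Pending` without registering a waker, on the assumption that the caller polls again after taking a frame.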

sfackler avatar May 06 '24 15:05 sfackler

That's a fair point. Perhaps the naming is fine then. Or at least, worth considering if poll_progress is better than the alternatives.

Now, I think one more remaining question is about the return type. I do think a poll method should return Poll<T>. But what does each value mean? Specifically, what's the difference between Ready(Ok(())) and Pending? It does feel like they're different. The default will just return Ok(()). And if it were checking a Sleep, I assume it'd return Pending. What should the caller (such as inside hyper) do with that information? Whatever we determine for that should end up documented on the method.

seanmonstar avatar May 06 '24 20:05 seanmonstar

I think that Ready(Ok(())) would mean "I'm done making progress in the background". If we don't require poll_progress to be fused, the caller would need to remember that and not call it again, which seems pretty annoying TBH. Pending means that there's more background progress work to be done later, the same as any other future.

In practice, the only thing that callers would actually look for is the Ready(Err) case (see https://github.com/hyperium/hyper/pull/3169).

EDIT: Actually, I think we have to require that it's fused to be able to use it properly.
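To illustrate the bookkeeping a caller would need if poll_progress were not fused, here is a sketch of a caller-side wrapper that remembers a Ready result and never polls again. The reduced `Progress` trait and the `ProgressDriver` and `FailsAfter` types are hypothetical names for this example only; this is not how hyper implements it.

```rust
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Hypothetical, reduced trait: only the proposed method is modelled.
pub trait Progress {
    type Error;

    fn poll_progress(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>;
}

// Caller-side wrapper that fuses `poll_progress` on behalf of the body.
pub struct ProgressDriver<B> {
    body: B,
    done: bool,
}

impl<B: Progress + Unpin> ProgressDriver<B> {
    pub fn new(body: B) -> Self {
        Self { body, done: false }
    }

    pub fn body(&self) -> &B {
        &self.body
    }

    // Called while the connection cannot accept more data.
    // Only `Ready(Err)` matters to the caller: it means "abort the write".
    pub fn drive(&mut self, cx: &mut Context<'_>) -> Result<(), B::Error> {
        if self.done {
            // A previous call returned Ready; do not poll the body again.
            return Ok(());
        }
        match Pin::new(&mut self.body).poll_progress(cx) {
            Poll::Ready(result) => {
                self.done = true;
                result
            }
            // More background work remains; keep waiting.
            Poll::Pending => Ok(()),
        }
    }
}

// Example body: stays pending for a few polls, then reports a failure.
// A real body would register the waker before returning Pending.
pub struct FailsAfter {
    remaining: u32,
    polls: u32,
}

impl FailsAfter {
    pub fn new(remaining: u32) -> Self {
        Self { remaining, polls: 0 }
    }

    pub fn polls(&self) -> u32 {
        self.polls
    }
}

impl Progress for FailsAfter {
    type Error = &'static str;

    fn poll_progress(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        self.polls += 1;
        if self.remaining == 0 {
            return Poll::Ready(Err("timed out"));
        }
        self.remaining -= 1;
        Poll::Pending
    }
}

struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

pub fn noop_waker() -> Waker {
    Waker::from(Arc::new(NoopWake))
}
```

If the trait instead requires implementations to be fused, the `done` flag disappears and the caller can poll unconditionally.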

sfackler avatar May 06 '24 20:05 sfackler

Added a few bits of docs on return values, and poll_progress implementations to http-body-util combinators.

sfackler avatar May 07 '24 13:05 sfackler

@seanmonstar Are you comfortable with this PR? Is there anything else to consider?

olix0r avatar Jul 29 '24 20:07 olix0r

I believe generally yes. I got partway through a write-up to publish as a blog post, to explore the area more and get more feedback, since I think it's a sufficiently interesting change to some fundamental crates. I still plan to finish that up; I just had to pause as I dealt with some other contracting work.

seanmonstar avatar Aug 02 '24 20:08 seanmonstar