
Low-level pull mode API

Open hh9527 opened this issue 3 years ago • 4 comments

PROS

  • Low level: user code has full flow control, and hyper itself stays spawn-free
  • How and when to spawn can be decided entirely by user-level code
    • Connection Level
    • Stream Level
    • Stream with upgrading
    • Request Level
  • Easy to implement a fully controllable graceful shutdown
    • Listener Level
    • Connection level
    • Stream Level
  • Easy to implement priority control for user code
  • Easy to implement memory/resource/number-of-tasks control for user code
  • Possible for user-level code to know that a response has been sent.

CONS

  • Maybe too low-level

Example

#[tokio::main]
async fn main() -> Result<()> {
    let listener = TcpListener::bind(...).await?;
    while let Ok((conn, _)) = listener.accept().await {
        // wrap the low-level connection into an HTTP connection
        let mut conn = HttpProtocol::new(conn).await?;

        // a stream could be h1/h2 (or h3 someday); for h1, each conn yields exactly one stream
        while let Some(http_stream) = conn.next().await {
            handle_stream(http_stream?).await.ok();
        }
    }
    Ok(())
}

async fn handle_stream(stream: HttpStream) -> Result<()> {
    let (mut input, mut output) = stream.split();

    // pull a new request
    // we know that request is one-by-one inside a stream
    while let Some(item) = input.next().await {
        // the key can be used to ensure request/response pairs stay matched (e.g. no reordering)
        // the key is cheap to Clone, maybe even Copy
        let (key, req) = item?;

        // handle upgrade if upgradable
        if key.is_upgradable() {
            let stream = output.upgrade(input, key, ...).await?; // consume input & output, into stream
            return handle_upgraded_stream(stream).await;
        }

        // handle request
        let res = handle_request(req).await;

        // now, we have the opportunity to know that reply has been sent
        let now = Instant::now();
        output.send((key, res)).await?;
        eprintln!("reply time: {:?}", now.elapsed());
    }

    Ok(())
}

hh9527 avatar Jun 22 '22 12:06 hh9527

Hey, so, this is something I've thought a lot about, and would love to make this style the actual low-level API exposed in hyper. However, I keep running into the following problems. If we could find satisfactory answers to them, I'd be thrilled.

Problems

  • Using a reply handle makes it easier for a user to "forget" to send a response in some branches. Realistically, we need to add a Drop check to see if they sent a response, and if not, use the right behavior for the HTTP version.
  • Requires a channel-like thing. Either an allocation for something like a oneshot channel, or have an internal Arc<Mutex>. This is especially required if we allow the user to spawn a new task to handle the request, and then eventually reply.
  • The example code has a couple of bugs, which illustrates that the pull-based API is significantly harder to use correctly.
    • If the "accept" method isn't called because the user isn't ready, such as while awaiting something in order to send a response, the user can forget that they need to constantly call poll_without_accepting/poll_progress.
      • We could solve this one by splitting the connection type in two: a control type, and a background driver. But that introduces another channel-like thing into the mix.
      • (We have seen this occur when people try to use the h2 crate by itself.)
    • If the connection is HTTP/2 or 3, then the multiplexing of requests means that awaiting one can pause all the others. So the best practice would be to spawn the tasks.
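The first problem above (a forgotten response) could plausibly be caught with a Drop check, as suggested. A minimal std-only sketch of that idea, using hypothetical names (`ReplyHandle`, `sent`) that are not part of hyper's API:

```rust
// Hypothetical sketch: a reply handle that detects a forgotten response on Drop.
// None of these names come from hyper; they only illustrate the Drop-check idea.

struct ReplyHandle {
    sent: bool,
}

impl ReplyHandle {
    fn new() -> Self {
        ReplyHandle { sent: false }
    }

    fn send(&mut self, _response: &str) {
        // A real implementation would write the response to the stream here.
        self.sent = true;
    }
}

impl Drop for ReplyHandle {
    fn drop(&mut self) {
        if !self.sent {
            // This is where the version-specific fallback would go:
            // e.g. a 500 for HTTP/1.1, or a stream reset for HTTP/2.
            eprintln!("response never sent; applying version-specific fallback");
        }
    }
}

fn main() {
    let mut handled = ReplyHandle::new();
    handled.send("200 OK");
    // dropped with sent == true: no fallback fires

    let _forgotten = ReplyHandle::new();
    // dropped with sent == false: the fallback in Drop fires
}
```

In an async setting the Drop impl could not itself await, so it would likely have to notify the connection driver (another channel-like thing) rather than write directly.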

In my mind, it's a battle between being more versatile, and being too easy to use incorrectly.

seanmonstar avatar Jun 23 '22 21:06 seanmonstar

Thank you for your comment, @seanmonstar

I am working on an HTTP router project (i.e. an HTTP sidecar with wasm plugins). It has a strong requirement for full control.

I think there are 3 levels to an HTTP stack.

  1. High level: provide a ready-to-use HTTP server framework; users just supply their own request_handler and it works. E.g. rocket
  2. Middle level: users need to provide a Connector and an Acceptor, and then it works. (Hyper works fine at this level today.)
  3. Low level: users can control almost everything. (That is why I need a pull-based API.)

Yes, the pull-based low-level API is hard to use correctly. But it brings the opportunity for full control, and leaves the correctness guarantees to a middle-level API.

For example, we could dynamically control how many streams are allowed inside a connection with the following code:

loop {
    let tok = some_token_issuer.take(|amt| amt < quota.max_stream_number).await?;
    let stream = conn.next().await?;
    tokio::spawn(handle_stream(stream, tok));
}

Hyper today does have some quota-related configuration, but it is very limited.

And for buffered requests (i.e. pipelined processing):

// input: Stream<Item = (Key, Request)>
// output: &mut Sink<(Key, Response)>

// assume we only need *concurrency* at the stream level, not *parallelism*
let mut responses = input
    .map(|(key, req)| async move { (key, handle_request(req).await) })
    .buffered(10);

while let Some((key, res)) = responses.next().await {
    output.send((key, res)).await?;
}

Hyper's current API has an unnecessary restriction: it is afraid that requests might run in parallel, and does not let the user choose.

hh9527 avatar Jun 27 '22 06:06 hh9527

  1. Low level: users can control almost everything. (That is why I need a pull-based API.)

Yes, the pull-based low-level API is hard to use correctly. But it brings the opportunity for full control, and leaves the correctness guarantees to a middle-level API.

I agree with your assessment. What we settled on for hyper in its VISION is that Correct is a higher priority than Flexible. Because of those priorities, hyper provides a connection API that makes it hard to do HTTP incorrectly. The goal was that individual pieces, version-specific codecs, would be separate crates that a user could combine for their own very custom needs.

That said, many have asked for a pull-based API, and so I am interested in whether there's a way to do both: provide that flexibility while also making it hard to mess up, and also find a way around the supposed performance penalties I mentioned that come from making things channels.


For example, we could dynamically control how many streams are allowed inside a connection with the following code:

loop {
    let tok = some_token_issuer.take(|amt| amt < quota.max_stream_number).await?;
    let stream = conn.next().await?;
    tokio::spawn(handle_stream(stream, tok));
}

The current API, which has you implement a Service, can do this with its Service::poll_ready method. If your Service checks tokens and says it's not ready to handle another stream, hyper will not peel another stream off the connection. Natural backpressure will be applied on the connection.
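The mechanics behind that backpressure can be sketched without any async machinery. This is a std-only illustration of the idea, not hyper or tower code; `TokenPool`, `in_flight`, and `max_streams` are made-up names:

```rust
// Std-only sketch of the backpressure idea behind Service::poll_ready:
// a new stream is only accepted while the quota has headroom.

struct TokenPool {
    in_flight: usize,
    max_streams: usize,
}

impl TokenPool {
    fn new(max_streams: usize) -> Self {
        TokenPool { in_flight: 0, max_streams }
    }

    /// Analogue of `Service::poll_ready`: ready only while under quota.
    fn poll_ready(&self) -> bool {
        self.in_flight < self.max_streams
    }

    /// Take a token for a new stream, if one is available.
    fn acquire(&mut self) -> bool {
        if self.poll_ready() {
            self.in_flight += 1;
            true
        } else {
            false
        }
    }

    /// Return the token when the stream finishes.
    fn release(&mut self) {
        self.in_flight -= 1;
    }
}

fn main() {
    let mut pool = TokenPool::new(2);
    assert!(pool.acquire());
    assert!(pool.acquire());
    // Quota exhausted: here hyper would stop pulling streams off the
    // connection, so backpressure propagates to the peer naturally.
    assert!(!pool.acquire());
    pool.release();
    assert!(pool.acquire());
    println!("backpressure demo ok");
}
```

The difference from the pull-based sketch above is who drives the loop: here the library polls readiness and withholds streams, rather than the user awaiting a token before calling next().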

seanmonstar avatar Jun 29 '22 19:06 seanmonstar

I have an additional use case where I wanted to build a proxy with hyper where incoming and outgoing connections are 1-to-1. Because the future returned by the Service trait lacks lifetimes, this requires a mutex, even though this is HTTP/1.1 and there will never be contention.

I closed that issue because it is very similar to this one, although I don't personally need the flexibility to abstract over H2 and other protocols.
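The 'static constraint can be shown in miniature without hyper at all. In this sketch, `UpstreamClient` and `make_handler` are invented names: because the returned handler must own its state (like a Service future), it cannot borrow the client mutably and is forced into `Arc<Mutex<_>>` even though only one request is ever in flight:

```rust
use std::sync::{Arc, Mutex};

// Hypothetical upstream client for the 1-to-1 proxy; not a real hyper type.
struct UpstreamClient {
    requests_sent: usize,
}

impl UpstreamClient {
    fn send(&mut self, _req: &str) -> String {
        self.requests_sent += 1;
        format!("response #{}", self.requests_sent)
    }
}

// Analogue of constructing a Service: the handler must be 'static, so it
// cannot capture `&mut UpstreamClient` and must take shared ownership
// plus a lock, even though calls are strictly sequential here.
fn make_handler(client: Arc<Mutex<UpstreamClient>>) -> impl Fn(&str) -> String + 'static {
    move |req| client.lock().unwrap().send(req)
}

fn main() {
    let client = Arc::new(Mutex::new(UpstreamClient { requests_sent: 0 }));
    let handler = make_handler(client.clone());
    assert_eq!(handler("GET /"), "response #1");
    assert_eq!(handler("GET /again"), "response #2");
    println!("proxy sketch ok");
}
```

A pull-based API would sidestep this: the loop that owns the client could simply hold `&mut` across the await, since nothing else needs the client concurrently.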

kjvalencik avatar Jun 20 '23 21:06 kjvalencik