Low-level pull mode API

PROS

- Low level; user code has full flow control, `spawn`-free at the hyper level
- How/when to use `spawn` can be fully decided by user-level code
  - Connection level
  - Stream level
  - Stream with upgrading
  - Request level
- Easy to implement a fully controllable graceful shutdown
  - Listener level
  - Connection level
  - Stream level
- Easy to implement priority control in user code
- Easy to implement memory/resource/number-of-tasks control in user code
- Possible to let user-level code know that a response has been sent

CONS

- Maybe too low level
Example

```rust
async fn main() -> Result<()> {
    let mut listener = TcpListener::bind(...).await?;
    while let Ok((conn, _)) = listener.accept().await {
        // wrap the low-level connection into an http connection
        let mut conn = HttpProtocol::new(conn).await?;
        // the stream could be h1/h2 (or h3 someday); for h1, each conn yields only one stream
        while let Some(http_stream) = conn.next().await {
            handle_stream(http_stream?).await.ok();
        }
    }
    Ok(())
}
```
```rust
async fn handle_stream(stream: HttpStream) -> Result<()> {
    let (mut input, mut output) = stream.split();
    // pull a new request;
    // we know that requests come one-by-one inside a stream
    while let Some(item) = input.next().await {
        // the key can be used to ensure request/response are matched (e.g. no re-ordering);
        // the key is cheap to Clone, maybe Copy
        let (key, req) = item?;
        // handle upgrade if upgradable
        if key.is_upgradable() {
            // consumes input & output, turning them into a raw stream
            let stream = output.upgrade(input, key, ...).await?;
            return handle_upgraded_stream(stream).await;
        }
        // handle the request
        let res = handle_request(req).await;
        // now we have the opportunity to know that the reply has been sent
        let now = Instant::now();
        output.send((key, res)).await?;
        eprintln!("reply time: {:?}", now.elapsed());
    }
    Ok(())
}
```
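The "graceful shutdown" PRO above could be sketched against the same imagined API. This is not runnable code; `HttpProtocol` and `handle_stream` are the hypothetical types from the example, and the oneshot shutdown channel is an assumption:

```rust
// Sketch only, against the hypothetical pull API above: stop accepting at
// the listener level on a shutdown signal, then drain in-flight streams.
async fn serve(
    mut listener: TcpListener,
    mut shutdown: tokio::sync::oneshot::Receiver<()>,
) -> Result<()> {
    loop {
        tokio::select! {
            // listener level: stop accepting new connections
            _ = &mut shutdown => break,
            accepted = listener.accept() => {
                let (conn, _) = accepted?;
                let mut conn = HttpProtocol::new(conn).await?;
                // connection/stream level: user code decides how far to drain
                while let Some(http_stream) = conn.next().await {
                    handle_stream(http_stream?).await.ok();
                }
            }
        }
    }
    Ok(())
}
```

Because the user owns the accept loop, the same pattern works at the connection or stream level just by moving where the `select!` sits.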
Hey, so, this is something I've thought a lot about, and would love to make this style the actual low-level API exposed in hyper. However, I keep running into the following problems. If we could find satisfactory answers to them, I'd be thrilled.
Problems

- Using a `reply` handle makes it easier for a user to "forget" to send a response in some branches. Realistically, we need to add a `Drop` check to see if they sent a response, and if not, use the right behavior for the HTTP version.
- Requires a channel-like thing: either an allocation for something like a oneshot channel, or an internal `Arc<Mutex>`. This is especially required if we allow the user to spawn a new task to handle the request and then eventually reply.
- The example code has a couple of bugs. That's because the pull-based API makes it significantly harder to use correctly.
- An "accept" method that isn't called because the user isn't ready, such as while awaiting things to send a response, would mean a user forgets they need to constantly `poll_without_accepting`/`poll_progress`.
  - We could solve this one by splitting the connection type in two: a control type and a background driver. But that introduces another channel-like thing into the mix.
  - (We have seen this occur when people try to use the `h2` crate by itself.)
- If the connection is HTTP/2 or 3, then the multiplexing of requests means that awaiting one can pause all the others. So the best practice would be to spawn the tasks.
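The `Drop` check mentioned in the first problem can be sketched with std only. Everything here is illustrative, not hyper's actual API: a handle that remembers whether a response was sent and, if dropped without one, emits a fallback (a real implementation would pick behavior appropriate to the HTTP version):

```rust
use std::sync::mpsc::Sender;

// Hypothetical reply handle: holding the Sender means "not yet replied".
struct ReplyHandle {
    tx: Option<Sender<&'static str>>,
}

impl ReplyHandle {
    // Consuming `self` and taking the sender marks the response as sent,
    // so the Drop impl below becomes a no-op.
    fn send(mut self, response: &'static str) {
        self.tx.take().unwrap().send(response).ok();
    }
}

impl Drop for ReplyHandle {
    fn drop(&mut self) {
        // The user never called `send`: fall back to a default response
        // (here a plain 500 stand-in) instead of silently hanging the peer.
        if let Some(tx) = self.tx.take() {
            tx.send("500 Internal Server Error").ok();
        }
    }
}
```

Note the cost this sketch makes visible: the handle needs the channel-like thing from the second problem, which is exactly the performance concern raised above.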
In my mind, it's a battle between being more versatile and being too easy to use incorrectly.
Thank you for your comment, @seanmonstar
I am working on an http router project (AKA http sidecar & wasm plugins). It has a strong requirement for full control.
I think there are 3 levels to an HTTP stack:

- High level: Provide a ready-to-use http server framework; users just provide their own `request_handler`, and it works. E.g. rocket
- Middle level: Users need to provide a Connector, an Acceptor, and then it works. (Hyper works fine at this level today)
- Low level: Users can control almost everything. (That is why I need a pull-based API)
Yes, the pull-based low-level API is hard to use correctly. But it brings the opportunity for full control, and leaves the correctness guarantees to the middle-level API.
For example, we can dynamically control how many streams are allowed inside a connection with the following code:
```rust
loop {
    let tok = some_token_issuer.take(|amt| amt < quota.max_stream_number).await?;
    let stream = conn.next().await?;
    tokio::spawn(handle_stream(stream, tok));
}
```
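One way the hypothetical `some_token_issuer` above could work is as a counting quota where dropping the token releases its slot. A minimal synchronous sketch with std only (a real async version would more likely build on something like `tokio::sync::Semaphore`):

```rust
use std::sync::{Arc, Mutex};

// Hypothetical token issuer: at most `max` tokens outstanding at once.
struct TokenIssuer {
    active: Arc<Mutex<usize>>,
    max: usize,
}

// Holding a Token occupies one stream slot; dropping it frees the slot.
struct Token {
    active: Arc<Mutex<usize>>,
}

impl TokenIssuer {
    fn new(max: usize) -> Self {
        TokenIssuer { active: Arc::new(Mutex::new(0)), max }
    }

    // Non-blocking stand-in for the `take` above: None when the quota
    // is exhausted (an async version would wait instead).
    fn try_take(&self) -> Option<Token> {
        let mut n = self.active.lock().unwrap();
        if *n < self.max {
            *n += 1;
            Some(Token { active: self.active.clone() })
        } else {
            None
        }
    }
}

impl Drop for Token {
    fn drop(&mut self) {
        *self.active.lock().unwrap() -= 1;
    }
}
```

Passing the token into `handle_stream` ties the quota to the stream's lifetime: when the handler task finishes and drops the token, a new stream can be accepted.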
Today, hyper does have some quota-related configuration, but it is very limited.
And for buffered requests (AKA pipelined processing):

```rust
// input: Stream<(Key, Request)>
// output: &mut Sink<(Key, Response)>
// handle_request maps a (Key, Request) to a future of (Key, Response)
// assume we only need *concurrency* at the stream level, not *parallelism*
let mut responses = input.map(handle_request).buffered(10);
while let Some((key, res)) = responses.next().await {
    output.send((key, res)).await?;
}
```
Hyper's current API has an unnecessary restriction: out of fear that requests might be handled in parallel and re-ordered, it does not let the user choose.
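The ordering guarantee the pipeline above relies on (`buffered` runs up to 10 handlers concurrently but yields results in input order, so key and response stay matched) can be mimicked with std threads. `handle_request` here is a hypothetical stand-in for real request handling:

```rust
use std::thread;

// Hypothetical request handler: doubles the "request" payload.
fn handle_request(req: u32) -> u32 {
    req * 2
}

// Run all handlers concurrently, but return (key, response) pairs in
// input order, mirroring what `buffered` guarantees for a stream.
fn handle_buffered(requests: Vec<(u32, u32)>) -> Vec<(u32, u32)> {
    // Spawn every handler up front (the concurrent phase).
    let handles: Vec<_> = requests
        .into_iter()
        .map(|(key, req)| (key, thread::spawn(move || handle_request(req))))
        .collect();
    // Join in input order, so completion order cannot re-order replies.
    handles
        .into_iter()
        .map(|(key, h)| (key, h.join().unwrap()))
        .collect()
}
```

This is why pipelined HTTP/1.1 responses stay correct even with concurrent handling: concurrency happens between `map` and the ordered join, never on the wire.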
> - Low level: Users can control almost everything. (That is why I need a pull-based API)
>
> Yes, the pull-based low-level API is hard to use correctly. But it brings the opportunity for full control, and leaves the correctness guarantees to the middle-level API.
I agree with your assessment. What we settled on for hyper in its VISION is that Correct is a higher priority than Flexible. Because of those priorities, hyper provides a connection API that makes it hard to do HTTP incorrectly. The goal was that individual pieces, version-specific codecs, would be separate crates that a user could combine for their own very custom needs.
That said, many have asked for a pull-based API, and so I am interested in whether there's a way to do both: provide that flexibility while also making it hard to mess up, and also find a way around the supposed performance penalties I mentioned that come from making things channels.
> For example, we can dynamically control how many streams are allowed inside a connection with the following code:
>
> ```rust
> loop {
>     let tok = some_token_issuer.take(|amt| amt < quota.max_stream_number).await?;
>     let stream = conn.next().await?;
>     tokio::spawn(handle_stream(stream, tok));
> }
> ```
The current API, which has you implement a Service, can do this with its `Service::poll_ready` method. If your Service checks tokens and says it's not ready to handle another stream, hyper will not peel another stream off the connection. Natural backpressure will be applied on the connection.
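As a sketch of that backpressure path, a tower-style middleware could gate readiness on the token quota. This is not runnable as-is: `TokenIssuer`/`Request` are illustrative, and the trait shape follows tower's `Service`:

```rust
use std::task::{Context, Poll};

// Hypothetical middleware: poll_ready consults a token quota, so hyper
// stops accepting new streams while the quota is spent.
struct QuotaService<S> {
    inner: S,
    issuer: TokenIssuer, // hypothetical quota source
}

impl<S: Service<Request>> Service<Request> for QuotaService<S> {
    type Response = S::Response;
    type Error = S::Error;
    type Future = S::Future;

    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        // Not ready until a token is available; hyper will not peel
        // another stream off the connection until then.
        if self.issuer.try_take().is_none() {
            // (a real impl must also register cx's waker to be re-polled)
            return Poll::Pending;
        }
        self.inner.poll_ready(cx)
    }

    fn call(&mut self, req: Request) -> Self::Future {
        self.inner.call(req)
    }
}
```

The trade-off versus the pull API is where the decision lives: here the quota check is pushed into the Service, rather than sitting in a user-owned accept loop.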
I have an additional use case: I wanted to build a proxy with hyper where incoming and outgoing connections are 1-to-1. Because of the lack of lifetimes on the future returned by the Service trait, this requires a mutex, even though this is HTTP/1.1 and there will never be contention.
I closed that issue because it is very similar to this one, although I don't personally need the flexibility to abstract over H2 and other protocols.