Middleware request parsing hangs forever
I'm trying to integrate some metrics tooling in a middleware that needs to inspect the request body, but doing so causes the request to hang forever. Here is how to reproduce it:
```python
from starlette.applications import Starlette
from starlette.middleware.base import BaseHTTPMiddleware, RequestResponseEndpoint
from starlette.requests import Request
from starlette.responses import Response
from starlette.types import Receive, Scope, Send

app = Starlette()

class ParseRequestMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request: Request, call_next: RequestResponseEndpoint):
        # payload = await request.json()  # This will cause test() to hang
        response = await call_next(request)
        payload = await request.json()  # Hangs forever
        return response

async def test(scope: Scope, receive: Receive, send: Send):
    request = Request(scope=scope, receive=receive)
    data = await request.json()
    response = Response(status_code=405)
    await response(scope, receive, send)

app.mount("/test", test)
app.add_middleware(ParseRequestMiddleware)
```
I'm still pretty new to Starlette, but it seems the issue is that the middleware and the ASGI app use different Request objects and therefore can't share the `Request._body` cache. The first read exhausts the stream, and the second read doesn't detect that the stream is exhausted, so it gets stuck in that `while True` loop. I haven't dug any deeper than the Request object, and I'm not sure where the code lives for the receive function that gets passed into the ASGI app.
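To make the failure mode concrete, here is a minimal self-contained sketch (the fake `receive` is illustrative, not real Starlette or server code) of why the second uncached read hangs: after the single `http.request` message is consumed, the server-side `receive()` has nothing left to deliver, so a second drain loop blocks on it forever.

```python
import asyncio

def make_receive():
    # Fake ASGI receive: delivers the whole body in one message, then blocks
    # forever, like a server with nothing more to send.
    sent = False
    async def receive():
        nonlocal sent
        if not sent:
            sent = True
            return {"type": "http.request", "body": b'{"x": 1}', "more_body": False}
        await asyncio.Event().wait()  # never set: simulates the hang
    return receive

async def read_body(receive):
    # Same shape as the drain loop inside Request.stream().
    body = b""
    while True:
        message = await receive()
        if message["type"] == "http.request":
            body += message.get("body", b"")
            if not message.get("more_body", False):
                break
    return body

async def main():
    receive = make_receive()
    first = await read_body(receive)  # first read succeeds
    try:
        await asyncio.wait_for(read_body(receive), timeout=0.1)
    except asyncio.TimeoutError:
        return first, "second read timed out"

print(asyncio.run(main()))
```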
So I confirmed that using the same Request object works. I added a middleware that stores the request object in a ContextVar; if I use that instead of the receive and send passed into the ASGI app, the request no longer hangs. This might be enough of a workaround for me right now, but the more apps I add (especially third-party ones), the bigger a problem this becomes.
Update: running more tests today, I had hoped to reduce the number of places where I have to ignore the send/receive args, but it turns out BaseHTTPMiddleware also creates a new Request object. So basically anyone who needs to share the request body data should get the request object from the ContextVar, to ensure the stream is only read once and doesn't get exhausted.
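The ContextVar workaround described above can be sketched in isolation. This is a hedged, self-contained illustration (names like `request_var` and `FakeRequest` are made up, and `FakeRequest` is a stand-in for Starlette's `Request`): the middleware stores its request object in a ContextVar, and a downstream handler in the same task retrieves the same instance, so the one-shot body cache is shared.

```python
import asyncio
import contextvars

request_var = contextvars.ContextVar("request")

class FakeRequest:
    """Stand-in for starlette.requests.Request: body() caches after one read."""
    def __init__(self, chunks):
        self._chunks = iter(chunks)  # one-shot stream, like a real request

    async def body(self):
        if not hasattr(self, "_body"):
            self._body = b"".join(self._chunks)  # drains the stream once
        return self._body

async def middleware_dispatch():
    request = FakeRequest([b'{"a":', b' 1}'])
    request_var.set(request)        # stash the one shared Request
    await request.body()            # first read drains the stream
    return await endpoint()

async def endpoint():
    request = request_var.get()     # same instance, so the body is cached
    return await request.body()

print(asyncio.run(middleware_dispatch()))
```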
I'm still unclear whether this is all by design and the developer is expected to manage a cache of the data streams. At minimum, it seems it shouldn't hang; it should raise an error instead of getting stuck in an infinite loop. I would greatly appreciate some insight here whenever you have a moment, @tomchristie.
Ok I think I found a solution here. See PR #848
For those looking for a workaround that doesn't use ContextVars: I basically took the new receive function from #848 and created a CachedRequest that I use in my middleware instead.
```python
from urllib.parse import urlencode

from starlette.middleware.base import BaseHTTPMiddleware, RequestResponseEndpoint
from starlette.requests import Request
from starlette.types import Message, Receive

class CachedRequest(Request):
    """
    TODO: Remove usage of CachedRequest when https://github.com/encode/starlette/pull/848 is released
    """

    @property
    def receive(self) -> Receive:
        body = None
        if hasattr(self, "_body"):
            body = self._body
        elif hasattr(self, "_form"):
            body = bytes(urlencode(dict(self._form)), "utf-8")
        if body is not None:
            async def cached_receive() -> Message:
                return dict(type="http.request", body=body)
            return cached_receive
        return self._receive

class ParseRequestMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request: Request, call_next: RequestResponseEndpoint):
        cached_request = CachedRequest(request.scope, request.receive, request._send)
        payload = await cached_request.json()
        return await call_next(cached_request)  # No longer hangs forever
```
When the middleware executor finally calls the app, it passes the `cached_request.receive` function to the app, which returns the `cached_receive` function instead of the original `self._receive`.
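The cached-receive idea can be shown on its own. This is a tiny self-contained sketch (the helper name `make_cached_receive` is illustrative): once a body has been cached, `receive` can be replaced by a closure that replays a single `http.request` message, so a downstream reader sees the same body again.

```python
import asyncio

def make_cached_receive(body: bytes):
    # Replace the real receive with a closure that replays the cached body
    # as one complete http.request message.
    async def cached_receive():
        return {"type": "http.request", "body": body, "more_body": False}
    return cached_receive

async def demo():
    receive = make_cached_receive(b'{"ok": true}')
    message = await receive()   # a downstream drain loop gets the full body
    return message["body"]

print(asyncio.run(demo()))
```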
Ok, my previous workaround no longer works after v0.13.3: StreamingResponse now listens for a disconnect, which breaks because of the cached receive function. After learning more about ASGI and Starlette, I realize the cached_receive function wasn't the way to go. I have a new PR that fixes this issue by using the scope to store the cached content instead of the Request instance. Because the scope is shared across all the ASGI apps, every app (including middleware) now has access to the cached request body. The Request class now looks for the cache in the scope instead of in instance fields. See #944
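The scope-based caching idea can be sketched in a few lines. This is an illustrative simplification (the helper name `read_body_cached` is made up, not code from #944): because the same mutable `scope` dict flows through every ASGI app in the chain, a body drained once and stored there is visible to all of them, whichever Request wrapper they construct.

```python
import asyncio

async def read_body_cached(scope, receive):
    # Drain the stream only on the first call; afterwards serve the body
    # from the shared scope dict.
    if "body" not in scope:
        chunks = []
        while True:
            message = await receive()
            chunks.append(message.get("body", b""))
            if not message.get("more_body", False):
                break
        scope["body"] = b"".join(chunks)
    return scope["body"]

async def demo():
    scope = {"type": "http"}
    messages = [{"type": "http.request", "body": b'{"n": 2}', "more_body": False}]

    async def receive():
        return messages.pop(0)  # would raise IndexError if drained twice

    first = await read_body_cached(scope, receive)   # drains the stream
    second = await read_body_cached(scope, receive)  # served from the cache
    return first, second

print(asyncio.run(demo()))
```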
I haven't come up with a way to turn this concept into a workaround until the PR gets merged. Creating a CachedRequest again won't work for third-party ASGI apps or for Route-mounted apps, because they use the built-in Request instead of CachedRequest. I think the only workaround is to fork Starlette if you can't wait for this PR to be merged. Could really use some attention on this one, @tomchristie.
Noted yup, it's on my backlog for sure. Also looking at httpx 1.0 and a release of Django REST framework, but spending some time with Starlette again is next after that, and the request parsing is def. an issue that needs some attention.
Awesome, thanks Tom! I'm willing to help wherever I can. If you think my approach in #944 isn't the way to go I am willing to take another stab at implementing how you think this should be fixed. Just let me know whenever you get some more cycles.
Any news about it? I'm having the same issue when using a middleware to catch all errors. I'd like to have access to the request.body so I can log it to a file.
This is my workaround right now. You basically have to replace everywhere that instantiates a Request with CachedRequest, which means extending any third-party middleware or ASGI apps and replacing where they instantiate the Request. Below you can see how I did it with Route; I can use this new Route class in my app now. Scaling this is incredibly painful, but it works.
```python
import asyncio
import inspect
import json
import typing

from starlette.concurrency import run_in_threadpool
from starlette.datastructures import FormData
from starlette.formparsers import FormParser, MultiPartParser
from starlette.requests import ClientDisconnect, Request
from starlette.routing import Route as _Route
from starlette.types import ASGIApp, Receive, Scope, Send

try:
    from multipart.multipart import parse_options_header
except ImportError:  # pragma: no cover
    parse_options_header = None

class CachedRequest(Request):
    _stream_consumed: bool

    async def stream(self) -> typing.AsyncGenerator[bytes, None]:
        # Serve the cached body from the shared scope if it's already there.
        if "body" in self.scope:
            yield self.scope["body"]
            yield b""
            return
        if self._stream_consumed:
            raise RuntimeError("Stream consumed")
        self._stream_consumed = True
        while True:
            message = await self._receive()
            if message["type"] == "http.request":
                body = message.get("body", b"")
                if body:
                    yield body
                if not message.get("more_body", False):
                    break
            elif message["type"] == "http.disconnect":
                self._is_disconnected = True
                raise ClientDisconnect()
        yield b""

    async def body(self) -> bytes:
        if "body" not in self.scope:
            self.scope["body"] = b"".join([chunk async for chunk in self.stream()])
        return self.scope["body"]

    async def json(self) -> typing.Any:
        if "json" not in self.scope:
            body = await self.body()
            self.scope["json"] = json.loads(body)
        return self.scope["json"]

    async def form(self) -> FormData:
        if "form" not in self.scope:
            assert (
                parse_options_header is not None
            ), "The `python-multipart` library must be installed to use form parsing."
            content_type_header = self.headers.get("Content-Type")
            content_type, options = parse_options_header(content_type_header)
            if content_type == b"multipart/form-data":
                multipart_parser = MultiPartParser(self.headers, self.stream())
                self.scope["form"] = await multipart_parser.parse()
            elif content_type == b"application/x-www-form-urlencoded":
                form_parser = FormParser(self.headers, self.stream())
                self.scope["form"] = await form_parser.parse()
            else:
                self.scope["form"] = FormData()
        return self.scope["form"]

    async def close(self) -> None:
        if "form" in self.scope:
            await self.scope["form"].close()

def cached_request_response(func: typing.Callable) -> ASGIApp:
    """
    Takes a function or coroutine `func(request) -> response`,
    and returns an ASGI application.
    """
    is_coroutine = asyncio.iscoroutinefunction(func)

    async def app(scope: Scope, receive: Receive, send: Send) -> None:
        request = CachedRequest(scope, receive=receive, send=send)
        if is_coroutine:
            response = await func(request)
        else:
            response = await run_in_threadpool(func, request)
        await response(scope, receive, send)

    return app

class Route(_Route):
    def __init__(
        self,
        path: str,
        endpoint: typing.Callable,
        *args,
        methods: typing.List[str] = None,
        name: str = None,
        include_in_schema: bool = True,
    ) -> None:
        super().__init__(
            path,
            endpoint,
            *args,
            methods=methods,
            name=name,
            include_in_schema=include_in_schema,
        )
        if inspect.isfunction(endpoint) or inspect.ismethod(endpoint):
            # Endpoint is a function or method. Treat it as `func(request) -> response`.
            self.app = cached_request_response(endpoint)
        else:
            # Endpoint is a class. Treat it as ASGI.
            self.app = endpoint
```
@tomchristie I just wanted to check in on your schedule for this issue. I know you've been busy, but I thought I'd see whether you've had time to review the PR for this issue and provide any feedback.
@tomchristie have you gotten a chance to look at #944? I'm still chugging along with my CachedRequest workaround, but I'd like to reduce this point of failure for future middleware I add to my apps.
Any info on this? ContextMiddleware is unusable for me until this is in.
We use Starlette in combination with FastAPI. We want to log full request bodies if there are certain exceptions.
Because we are not able to access the request body in exception handlers or middleware directly, we have to work with ugly workarounds.
It would be great to get some feedback on what needs to be done to get this functionality fixed, or what would need to change in #944 for it to get merged.
@tomchristie I started a new job and am no longer working on Starlette projects. I am happy to help with anything needed to get this pushed across the finish line, given the interest from others, but someone else is going to need to champion this fix. #944 is a simple fix that just replaces the Request object's instance cache with the scope. I hope y'all will be able to get this fix in, as managing the workaround is cumbersome and prone to errors as new ASGI apps get added.
cc: @timthesinner
If someone is experiencing the same issue and cannot wait until a solution is found, my current fix is simply to patch the Request class. For example (the code is untested, but you can see the idea):
```python
from unittest.mock import patch

from starlette.applications import Starlette
from starlette.middleware.base import BaseHTTPMiddleware, RequestResponseEndpoint
from starlette.requests import Request
from starlette.responses import Response
from starlette.types import Receive, Scope, Send

app = Starlette()

class ParseRequestMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request: Request, call_next: RequestResponseEndpoint):
        payload = await request.json()  # Caches the body on this Request
        with patch.object(Request, "body", request.body):
            # Any Request created downstream now reuses the cached body.
            response = await call_next(request)
        payload = await request.json()  # No longer hangs
        return response

async def test(scope: Scope, receive: Receive, send: Send):
    request = Request(scope=scope, receive=receive)
    data = await request.json()
    response = Response(status_code=405)
    await response(scope, receive, send)

app.mount("/test", test)
app.add_middleware(ParseRequestMiddleware)
```
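The mechanism behind the patch trick can be shown with a self-contained stand-in class (`FakeRequest` is illustrative, not Starlette's Request): `patch.object` temporarily replaces the class-level `body()` with the *bound* method of the outer instance, so any new Request constructed downstream returns the outer request's cached body instead of re-reading an exhausted stream.

```python
import asyncio
from unittest.mock import patch

class FakeRequest:
    def __init__(self, body: bytes = b""):
        self._raw = body

    async def body(self) -> bytes:
        return self._raw

async def demo():
    outer = FakeRequest(b'{"a": 1}')
    with patch.object(FakeRequest, "body", outer.body):
        inner = FakeRequest()        # a downstream app's fresh Request
        return await inner.body()    # resolves to outer's bound body()

print(asyncio.run(demo()))
```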
I'll just refer to this guy's post about using Starlette with FastAPI and wanting to use middleware to log a request body: https://github.com/encode/starlette/issues/847#issuecomment-788887820
Would really like to see a clean solution to this.