Middleware request parsing hangs forever

Open nikordaris opened this issue 5 years ago • 14 comments

I'm currently trying to integrate some metrics tooling in a middleware where I need to inspect the request body. The problem is that reading the body in the middleware causes the request to hang forever. The following is how you can reproduce it.

from starlette.applications import Starlette
from starlette.middleware.base import BaseHTTPMiddleware, RequestResponseEndpoint
from starlette.requests import Request
from starlette.responses import Response
from starlette.types import Receive, Scope, Send

app = Starlette()

class ParseRequestMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request: Request, call_next: RequestResponseEndpoint):
        # payload = await request.json()  # This will cause test() to hang
        response = await call_next(request)
        payload = await request.json()  # Hangs forever
        return response


async def test(scope: Scope, receive: Receive, send: Send):
    request = Request(scope=scope, receive=receive)
    data = await request.json()
    response = Response(status_code=405)
    await response(scope, receive, send)


app.mount("/test", test)
app.add_middleware(ParseRequestMiddleware)

I'm still pretty new to Starlette, but it seems the issue is that the middleware and the ASGI app use different Request objects and therefore can't share the Request._body cache. That means the first read exhausts the stream, and the second read doesn't detect that the stream is exhausted, so it gets stuck in that while True loop. I'm not sure, though; I haven't dug deeper than the Request object, and I haven't found where the receive function that gets passed into the ASGI app is defined.
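
To illustrate the failure mode, here is a rough sketch of the kind of body-reading loop involved (not the actual Starlette source): once an earlier reader has consumed every "http.request" message, a second reader never sees a message with more_body=False and simply blocks on receive().

from starlette.types import Receive


async def read_body(receive: Receive) -> bytes:
    # Rough illustration of the loop a Request uses to assemble the body.
    chunks = []
    while True:
        # After the stream is exhausted by an earlier reader, this await
        # never returns another "http.request" message, so we hang here.
        message = await receive()
        if message["type"] == "http.request":
            chunks.append(message.get("body", b""))
            if not message.get("more_body", False):
                break
    return b"".join(chunks)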

nikordaris avatar Feb 27 '20 04:02 nikordaris

So I confirmed that if I use the same Request object it works. I added a middleware that stores the request object in a ContextVar, and if I use that instead of the receive and send passed into the ASGI app, the request no longer hangs. This might be enough of a workaround for me right now, but the more apps I add (especially third-party ones) the more of a problem this will become.

Update: Running more tests today, I had hoped to reduce the number of places where I have to ignore the send/receive args, but it looks like BaseHTTPMiddleware also creates a new Request object. So basically anyone who needs to share the request body data should get the request object from the ContextVar to ensure the stream is only read once and doesn't get exhausted.
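
For illustration, a minimal sketch of this ContextVar approach could look like the following (the middleware and variable names are placeholders, not code from this project):

from contextvars import ContextVar

from starlette.middleware.base import BaseHTTPMiddleware, RequestResponseEndpoint
from starlette.requests import Request
from starlette.responses import Response
from starlette.types import Receive, Scope, Send

# Placeholder name for the shared context variable.
current_request: ContextVar[Request] = ContextVar("current_request")


class StoreRequestMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request: Request, call_next: RequestResponseEndpoint):
        # Make this Request (and its _body cache) visible to downstream code.
        current_request.set(request)
        return await call_next(request)


async def test(scope: Scope, receive: Receive, send: Send):
    # Ignore the receive passed in and reuse the shared Request instead,
    # so the body is only ever read from the stream once.
    request = current_request.get()
    data = await request.json()
    await Response(status_code=405)(scope, receive, send)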

I'm still unclear whether this is all by design and the developer is expected to manage a cache of the data streams. It seems like at minimum it shouldn't hang and it should be raising an error instead of getting stuck in an infinite loop. I would greatly appreciate some insights here whenever you have a moment @tomchristie

nikordaris avatar Feb 27 '20 04:02 nikordaris

Ok I think I found a solution here. See PR #848

nikordaris avatar Feb 27 '20 23:02 nikordaris

For those looking for a workaround that doesn't use ContextVars: I basically just took the new receive function from #848 and created a CachedRequest that I use in my middleware instead.

from urllib.parse import urlencode

from starlette.middleware.base import BaseHTTPMiddleware, RequestResponseEndpoint
from starlette.requests import Request
from starlette.types import Message, Receive


class CachedRequest(Request):
    """
    TODO Remove usage of CachedRequest when https://github.com/encode/starlette/pull/848 is released
    """
    @property
    def receive(self) -> Receive:
        # If the body (or form) has already been read, replay it from the cache
        # instead of handing out the exhausted server receive callable.
        body = None
        if hasattr(self, "_body"):
            body = self._body
        elif hasattr(self, "_form"):
            body = bytes(urlencode(dict(self._form)), "utf-8")
        if body is not None:
            async def cached_receive() -> Message:
                return dict(type="http.request", body=body)
            return cached_receive
        return self._receive


class ParseRequestMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request: Request, call_next: RequestResponseEndpoint):
        cached_request = CachedRequest(request.scope, request.receive, request._send)
        payload = await cached_request.json()
        return await call_next(cached_request)  # No longer hangs forever

When the middleware executor finally calls the app, it passes the cached_request.receive function to the app, which returns the cached_receive function instead of the original self._receive.
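
As a concrete (hypothetical) illustration of what that means downstream: any ASGI app that builds its own Request on the forwarded receive callable now gets the replayed body instead of blocking on the exhausted stream.

from starlette.requests import Request
from starlette.responses import Response
from starlette.types import Receive, Scope, Send


async def inner_app(scope: Scope, receive: Receive, send: Send) -> None:
    # `receive` here is CachedRequest.receive, i.e. cached_receive,
    # so this json() call is served from the cached body.
    request = Request(scope, receive=receive)
    payload = await request.json()
    await Response(status_code=200)(scope, receive, send)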

nikordaris avatar Mar 20 '20 23:03 nikordaris

OK, my previous workaround no longer works after v0.13.3. StreamingResponse now listens for a disconnect, which breaks because of the cached receive function. After learning more about ASGI and Starlette, I realize that the cached_receive function wasn't the way to go. I have a new PR that fixes this issue by using the scope, rather than the Request instance, to store the cached content. Because the scope is shared across all the ASGI apps, every app (including middleware) now has access to the cached request body. The Request class now looks for the cache in the scope instead of the instance fields. See #944.
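
Conceptually (this is only a sketch of the idea, not the actual diff in #944, and ScopeCachedRequest is a placeholder name), caching on the scope looks roughly like this:

from starlette.requests import Request


class ScopeCachedRequest(Request):
    async def body(self) -> bytes:
        # The scope dict is the one object shared by every middleware and
        # mounted app for this request, so caching the body there lets any
        # Request instance return the same bytes.
        if "body" not in self.scope:
            self.scope["body"] = b"".join([chunk async for chunk in self.stream()])
        return self.scope["body"]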

I haven't come up with a way to turn this concept into a workaround until the PR gets merged. Creating a CachedRequest again won't work for third-party ASGI apps or for Route-mounted apps, because they use the built-in Request instead of the CachedRequest. I think the only option is to fork Starlette if you can't wait for this PR to get merged. Could really use some attention on this one @tomchristie.

nikordaris avatar May 12 '20 19:05 nikordaris

Noted, yup, it's on my backlog for sure. I'm also looking at httpx 1.0 and a release of Django REST framework, but spending some time with Starlette again is next after that, and the request parsing is definitely an issue that needs some attention.

lovelydinosaur avatar May 13 '20 10:05 lovelydinosaur

Awesome, thanks Tom! I'm willing to help wherever I can. If you think my approach in #944 isn't the way to go, I'm happy to take another stab at implementing it however you think it should be fixed. Just let me know whenever you get some more cycles.

nikordaris avatar May 13 '20 14:05 nikordaris

Any news on this? I'm having the same issue when using a middleware to catch all errors. I'd like to have access to request.body so I can log it to a file.

Feijo avatar Jun 17 '20 11:06 Feijo

This is my workaround right now. You basically have to replace every place that instantiates a Request with CachedRequest, which means you need to extend any third-party middleware or ASGI apps and replace where they instantiate the Request. Below you can see how I did it with Route; I can use this new Route class in my app now. It's incredibly painful to scale, but it is working.

import asyncio
import inspect
import json
import typing

from starlette.concurrency import run_in_threadpool
from starlette.datastructures import FormData
from starlette.formparsers import FormParser, MultiPartParser
from starlette.requests import ClientDisconnect, Request
from starlette.routing import Route as _Route
from starlette.types import ASGIApp, Receive, Scope, Send

try:
    from multipart.multipart import parse_options_header
except ImportError:
    parse_options_header = None

class CachedRequest(Request):
    _stream_consumed: bool

    async def stream(self) -> typing.AsyncGenerator[bytes, None]:
        # If the body has already been cached on the shared scope, replay it
        # instead of reading from the (possibly exhausted) server stream.
        if "body" in self.scope:
            yield self.scope["body"]
            yield b""
            return

        if self._stream_consumed:
            raise RuntimeError("Stream consumed")

        self._stream_consumed = True
        while True:
            message = await self._receive()
            if message["type"] == "http.request":
                body = message.get("body", b"")
                if body:
                    yield body
                if not message.get("more_body", False):
                    break
            elif message["type"] == "http.disconnect":
                self._is_disconnected = True
                raise ClientDisconnect()
        yield b""

    async def body(self) -> bytes:
        if "body" not in self.scope:
            self.scope["body"] = b"".join([chunk async for chunk in self.stream()])
        return self.scope["body"]

    async def json(self) -> typing.Any:
        if "json" not in self.scope:
            body = await self.body()
            self.scope["json"] = json.loads(body)
        return self.scope["json"]

    async def form(self) -> FormData:
        if "form" not in self.scope:
            assert (
                parse_options_header is not None
            ), "The `python-multipart` library must be installed to use form parsing."
            content_type_header = self.headers.get("Content-Type")
            content_type, options = parse_options_header(content_type_header)
            if content_type == b"multipart/form-data":
                multipart_parser = MultiPartParser(self.headers, self.stream())
                self.scope["form"] = await multipart_parser.parse()
            elif content_type == b"application/x-www-form-urlencoded":
                form_parser = FormParser(self.headers, self.stream())
                self.scope["form"] = await form_parser.parse()
            else:
                self.scope["form"] = FormData()
        return self.scope["form"]

    async def close(self) -> None:
        if "form" in self.scope:
            await self.scope["form"].close()


def cached_request_response(func: typing.Callable) -> ASGIApp:
    """
    Takes a function or coroutine `func(request) -> response`,
    and returns an ASGI application.
    """
    is_coroutine = asyncio.iscoroutinefunction(func)

    async def app(scope: Scope, receive: Receive, send: Send) -> None:
        request = CachedRequest(scope, receive=receive, send=send)
        if is_coroutine:
            response = await func(request)
        else:
            response = await run_in_threadpool(func, request)
        await response(scope, receive, send)

    return app


class Route(_Route):
    def __init__(
        self,
        path: str,
        endpoint: typing.Callable,
        *args,
        methods: typing.List[str] = None,
        name: str = None,
        include_in_schema: bool = True,
    ) -> None:
        super(Route, self).__init__(
            path,
            endpoint,
            *args,
            methods=methods,
            name=name,
            include_in_schema=include_in_schema,
        )
        if inspect.isfunction(endpoint) or inspect.ismethod(endpoint):
            # Endpoint is function or method. Treat it as `func(request) -> response`.
            self.app = cached_request_response(endpoint)
        else:
            # Endpoint is a class. Treat it as ASGI.
            self.app = endpoint

nikordaris avatar Jun 19 '20 03:06 nikordaris

@tomchristie I just wanted to check in on your schedule for this issue. I know you've been busy, but I thought I'd see if you've had time to review the PR and provide any feedback.

nikordaris avatar Sep 28 '20 12:09 nikordaris

@tomchristie have you gotten a chance to look at #944? I'm still chugging along with my CachedRequest workaround, but I'd like to eliminate this point of failure for future middleware I add to my apps.

nikordaris avatar Feb 19 '21 21:02 nikordaris

Any info on this? ContextMiddleware is unusable for me until this is in.

ZebraCat avatar Feb 23 '21 11:02 ZebraCat

We use Starlette in combination with FastAPI. We want to log full request bodies if there are certain exceptions.

Since we are not able to access the request body directly in exception handlers or middleware, we have to work with ugly workarounds.

It would be great to get some feedback on what needs to be done to fix this, or on what would need to change in #944 for it to get merged.

lwinkler-cloudflight avatar Mar 02 '21 12:03 lwinkler-cloudflight

@tomchristie I started a new job and am no longer working on Starlette projects. I'm happy to help out with anything needed to get this pushed across the finish line, given the interest from others, but someone else is going to need to champion this fix. #944 is a simple fix that just replaces the Request object's instance cache with the scope. I hope y'all will be able to get this fix in, as managing the workaround is cumbersome and error-prone as new ASGI apps get added.

cc: @timthesinner

nikordaris avatar May 07 '21 18:05 nikordaris

If someone is experiencing the same issue and cannot wait until a solution is found, my current fix is simply to patch the Request class. For example (the code is not tested, but you can see the idea):

from unittest.mock import patch

from starlette.applications import Starlette
from starlette.middleware.base import BaseHTTPMiddleware, RequestResponseEndpoint
from starlette.requests import Request
from starlette.responses import Response
from starlette.types import Receive, Scope, Send

app = Starlette()

class ParseRequestMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request: Request, call_next: RequestResponseEndpoint):
        payload = await request.json()  # Read (and cache) the body here
        with patch.object(Request, "body", request.body):
            response = await call_next(request)
            payload = await request.json()  # No longer hangs
            return response


async def test(scope: Scope, receive: Receive, send: Send):
    request = Request(scope=scope, receive=receive)
    data = await request.json()
    response = Response(status_code=405)
    await response(scope, receive, send)


app.mount("/test", test)
app.add_middleware(ParseRequestMiddleware)
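
(Why this can work, at least in principle: patch.object replaces the body attribute on the Request class with the bound body method of the request whose bytes the middleware has already read and cached, so any Request instance constructed further down the stack returns that cached body instead of trying to re-read the exhausted stream.)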

loikki avatar Mar 18 '22 13:03 loikki

I'll just refer to this guy's post about using Starlette with FastAPI and wanting to use middleware to log a request body: https://github.com/encode/starlette/issues/847#issuecomment-788887820

I would really like to see a clean solution to this.

granowski-bamboo avatar Oct 03 '22 13:10 granowski-bamboo