
ClientDisconnect immediately after adding a background task in which I access the request body as a stream

Open danielsteman opened this issue 3 years ago • 2 comments

First Check

  • [X] I added a very descriptive title to this issue.
  • [X] I used the GitHub search to find a similar issue and didn't find it.
  • [X] I searched the FastAPI documentation, with the integrated search.
  • [X] I already searched in Google "How to X in FastAPI" and didn't find any information.
  • [X] I already read and followed all the tutorial in the docs and didn't find an answer.
  • [X] I already checked if it is not related to FastAPI but to Pydantic.
  • [X] I already checked if it is not related to FastAPI but to Swagger UI.
  • [X] I already checked if it is not related to FastAPI but to ReDoc.

Commit to Help

  • [X] I commit to help with one of those options 👆

Example Code

import aioredis
from fastapi import BackgroundTasks, FastAPI, HTTPException, Request, status
from pydantic import BaseModel, Field
from streaming_form_data import StreamingFormDataParser
from streaming_form_data.targets import FileTarget, ValueTarget
from streaming_form_data.validators import MaxSizeValidator
from uuid import UUID, uuid4
import os


MAX_FILE_SIZE = 1024 * 1024 * 1024 * 4  # = 4GB
MAX_REQUEST_BODY_SIZE = MAX_FILE_SIZE + 1024

app = FastAPI()
redis = aioredis.from_url("redis://localhost:6379", decode_responses=True)


class UploadResult(BaseModel):
    file_size: int
    result: str


class Job(BaseModel):
    uid: UUID = Field(default_factory=uuid4)
    status: str = "in_progress"
    progress: int = 0


class MaxBodySizeException(Exception):
    def __init__(self, body_len: int):
        self.body_len = body_len


class MaxBodySizeValidator:
    def __init__(self, max_size: int):
        self.body_len = 0
        self.max_size = max_size

    def __call__(self, chunk: bytes):
        self.body_len += len(chunk)
        if self.body_len > self.max_size:
            raise MaxBodySizeException(body_len=self.body_len)


@app.get("/api/v1/jobs")
async def refresh_job_progress(job: Job) -> Job:
    if not job:
        raise HTTPException(
            status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
            detail="Could not find job object to refresh",
        )
    progress = await redis.get(str(job.uid))
    job.progress = progress
    return job


async def process_file(request: Request):
    body_validator = MaxBodySizeValidator(MAX_REQUEST_BODY_SIZE)
    filename = "test.dsv"
    new_job = Job()

    filepath = os.path.join("./", os.path.basename(filename))
    file_ = FileTarget(filepath, validator=MaxSizeValidator(MAX_FILE_SIZE))
    source = ValueTarget()
    parser = StreamingFormDataParser(headers=request.headers)
    parser.register("file", file_)
    parser.register("source", source)

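    # Consume the request body in chunks: feed each chunk to the parser
    # and track upload progress in Redis under the job's UUID.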
    async for chunk in request.stream():
        body_validator(chunk)
        parser.data_received(chunk)
        await redis.incr(str(new_job.uid), len(chunk))


@app.post("/api/v1/upload")
async def upload(request: Request, background_tasks: BackgroundTasks):
    background_tasks.add_task(process_file, request)
    return {"task": "started"}

Description

I have an endpoint that receives multipart/form-data requests and streams large files to a local file (FileTarget) in chunks, using the Python package streaming_form_data.

This works as expected when the code that is now in the process_file function lives directly in the upload function instead of being called as a background task. The reason I want to move the streaming part to a background job is that I want to keep track of progress in a Redis db and expose that progress through another endpoint, /api/v1/jobs, with the function refresh_job_progress.

The problem I'm facing with this solution is that the client is disconnected immediately upon calling /api/v1/upload. The traceback shows that a ClientDisconnect exception is raised when iterating over stream() on the fastapi.Request:

async for chunk in request.stream():
  File "venv/lib/python3.10/site-packages/starlette/requests.py", line 228, in stream
    raise ClientDisconnect()
starlette.requests.ClientDisconnect

Is it possible to maintain a connection such that I can stream the request in a background job?

Operating System

Linux

Operating System Details

Distributor ID: Ubuntu
Description: Ubuntu 22.04.1 LTS
Release: 22.04
Codename: jammy

FastAPI Version

0.88.0

Python Version

Python 3.10.6

Additional Context

No response

danielsteman · Dec 29 '22 16:12

I may be wrong, but I highly doubt you can do this. The file bytes are streamed as chunks only while the request is open, and a background task runs after the response has been sent, so passing the request on to it wouldn't work, I guess. I would rather suggest you write the file in an asynchronous manner to a temporary location, and then schedule a background task to process it further, something like the sketch below.
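A minimal sketch of that pattern, assuming aiofiles is available for the async file writes; the temp-file handling and the empty process_file body are illustrative:

import os
import tempfile

import aiofiles
from fastapi import BackgroundTasks, FastAPI, Request

app = FastAPI()


async def process_file(filepath: str):
    # Runs after the response has been sent; the upload is already on
    # disk, so the (now closed) request stream is no longer needed.
    ...


@app.post("/api/v1/upload")
async def upload(request: Request, background_tasks: BackgroundTasks):
    fd, filepath = tempfile.mkstemp()
    os.close(fd)  # aiofiles reopens the file by path
    async with aiofiles.open(filepath, "wb") as f:
        # Consume the body while the request is still open.
        async for chunk in request.stream():
            await f.write(chunk)
    background_tasks.add_task(process_file, filepath)
    return {"task": "started"}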

iudeen · Jan 01 '23 11:01

Yes, that actually works; it was my solution before adding this background task. My goal now is to return a response with a task ID immediately upon calling /upload, so that the client can start polling that task to show a progress percentage. Maybe I can also do this without a background task, but I'm not sure.
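One way to keep the progress tracking without a background task might be to have the client obtain a job ID first and send it along with the upload; the stream is then consumed inside the endpoint (so the request stays open) while a second connection polls the progress. This is just a sketch under those assumptions; the extra POST /api/v1/jobs endpoint and the X-Job-Id header are made-up conventions:

import aioredis
from uuid import uuid4
from fastapi import FastAPI, Header, Request
from streaming_form_data import StreamingFormDataParser
from streaming_form_data.targets import FileTarget

app = FastAPI()
redis = aioredis.from_url("redis://localhost:6379", decode_responses=True)


@app.post("/api/v1/jobs")
async def create_job():
    # The client calls this first and keeps the ID to poll with while
    # the upload is in flight on another connection.
    uid = str(uuid4())
    await redis.set(uid, 0)
    return {"uid": uid}


@app.post("/api/v1/upload")
async def upload(request: Request, x_job_id: str = Header(...)):
    parser = StreamingFormDataParser(headers=request.headers)
    parser.register("file", FileTarget("test.dsv"))
    # Consuming the stream here keeps the request open; progress lands
    # in Redis, where concurrent polling requests can read it.
    async for chunk in request.stream():
        parser.data_received(chunk)
        await redis.incr(x_job_id, len(chunk))
    return {"task": "done"}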

danielsteman · Jan 01 '23 21:01