
SSE support for FastAPI server generators

Open mdickey-method-security opened this issue 1 year ago • 1 comments

How important is this?

P1 - High (Strongly needed)

What's the feature?

Currently, server-side generators such as the FastAPI generator do not support streaming/SSE. This is strongly needed so that an endpoint which supports SSE is guaranteed to match what the client-side generators expect. Fern already supports SSE in its client-side generators today.

An example fern definition could be:

sendMessageStream:
  method: POST
  path: /{chatId}/messages/stream
  path-parameters:
    chatId: string
  response-stream:
    type: SendMessageResponse
    format: sse
  audiences:
    - public

This works great for our client-side generators, as mentioned above. On the server side, however, we have to exclude this endpoint from generation (we use audiences to do this) and define it manually, along the lines of:

async_chat_router.py

from typing import Annotated

from fastapi import APIRouter, Depends
from fastapi.responses import StreamingResponse
from fastapi.security import HTTPBearer

from myproject.auth import validate_token
from myproject.api.chat.chat_service import ChatService
from myproject.fern.backend.src.fern_fastapi_starter.api.generated.resources.chat.types import (
    SendMessageRequest,
)
from myproject.fern.backend.src.fern_fastapi_starter.api.generated.security import (
    ApiAuth,
    BearerToken,
    FernAuth,
)

async_chat_router = APIRouter(prefix="/chat", tags=["chat"], dependencies=[Depends(validate_token)])


def get_chat_service() -> ChatService:
    return ChatService()


@async_chat_router.post("/{chat_id}/messages/stream", response_class=StreamingResponse)
async def stream_message(
    chat_id: str,
    body: SendMessageRequest,
    bearer_token: Annotated[BearerToken, Depends(HTTPBearer)],
    auth: Annotated[ApiAuth, Depends(FernAuth)],
    chat_service: Annotated[ChatService, Depends(get_chat_service)],
):
    return await chat_service.stream_message(
        chat_id=chat_id,
        body=body,
        auth=auth,
    )

chat_service.stream_message looks like:

    async def stream_message(
        self, *, chat_id: str, body: SendMessageRequest, auth: ApiAuth
    ) -> StreamingResponse:
        """Stream chat messages using Server-Sent Events (SSE)"""
        validate_fern_auth(auth=auth)
        return StreamingResponse(
            self._generate_message_stream(chat_id, body, auth),
            media_type="text/event-stream",
            headers={"Cache-Control": "no-cache", "Connection": "keep-alive"},
        )

self._generate_message_stream just returns an AsyncGenerator[str, None], since FastAPI's StreamingResponse accepts an async generator of strings. The valuable thing fern could do is verify that the strings generated as SSE events match the expected response class in the fern definition.
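To make that concrete, here is a rough sketch of one way a generated server could enforce this: the handler yields typed response objects, and a thin adapter turns them into SSE strings. This is not fern's actual output. The fields on SendMessageResponse and the names to_sse_event, typed_sse_stream, and fake_model_output are hypothetical, and a stdlib dataclass stands in for the generated model. The isinstance check only confirms the class, not the individual field types.

```python
import asyncio
import json
from dataclasses import asdict, dataclass
from typing import AsyncGenerator, AsyncIterator


@dataclass
class SendMessageResponse:
    # Hypothetical fields; the real type would come from the fern definition.
    message_id: str
    text: str


def to_sse_event(item: SendMessageResponse) -> str:
    """Serialize one typed response as a single SSE `data:` event."""
    if not isinstance(item, SendMessageResponse):
        raise TypeError(f"expected SendMessageResponse, got {type(item).__name__}")
    # An SSE event is terminated by a blank line, hence the double newline.
    return f"data: {json.dumps(asdict(item))}\n\n"


async def typed_sse_stream(
    items: AsyncIterator[SendMessageResponse],
) -> AsyncGenerator[str, None]:
    """Adapt a stream of typed responses into the str stream that
    StreamingResponse can consume."""
    async for item in items:
        yield to_sse_event(item)


async def fake_model_output() -> AsyncGenerator[SendMessageResponse, None]:
    # Stand-in for the real message source inside the chat service.
    for i, word in enumerate(["hello", "world"]):
        yield SendMessageResponse(message_id=str(i), text=word)


async def collect() -> list:
    # Drain the adapted stream so the emitted strings can be inspected.
    return [event async for event in typed_sse_stream(fake_model_output())]


events = asyncio.run(collect())
```

With something like this, _generate_message_stream would only need to yield SendMessageResponse objects, and the result of typed_sse_stream(...) is what would be passed to StreamingResponse.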

Then, in app.py, I include the router so that it's registered alongside all the other fern-generated endpoints:

app.include_router(async_chat_router, prefix=f"{context.getConfig().server.base_path}/api")

Any alternatives?

The workaround is to define the streaming FastAPI endpoint manually instead of having fern generate it with the rest of the endpoints for this service.

mdickey-method-security, Mar 10 '25 20:03

+1. Our team has also been impacted by this.

aryehklein, Mar 10 '25 23:03