fastapi-listing icon indicating copy to clipboard operation
fastapi-listing copied to clipboard

Async Session

Open elektracodes opened this issue 1 year ago • 3 comments

Hello!

Can this work with AsyncSession? I am trying to work with the examples, but I cannot seem to override the sorter.

I get this error

fastapi_listing/sorter/page_sorter.py
File "/opt/pysetup/.venv/lib/python3.11/site-packages/fastapi_listing/sorter/page_sorter.py", line 20, in sort_dsc_util
  query = query.order_by(inst_field.desc())
AttributeError: 'coroutine' object has no attribute 'order_by'

I have overridden get_default_read to work with async, but I cannot see how to do the same in the page_sorter.

class VideoDao(GenericDao):
    # DAO for the Videos model, as posted by the issue reporter. This is the
    # snippet that leads to the AttributeError quoted above.
    def __init__(self, db: AsyncSession) -> None:
        super().__init__(db)

    # Identifier and model for this DAO.
    # NOTE(review): presumably consumed by GenericDao / the listing machinery
    # -- confirm against the library.
    name = "videoresponse"
    model = Videos

    # NOTE(review): because this override is `async def`, calling it yields a
    # coroutine rather than a query. Presumably the library's sorter calls it
    # without awaiting, which would explain the reported
    # "'coroutine' object has no attribute 'order_by'" error -- confirm.
    async def get_default_read(self, fields_to_read: Optional[list]):
        query = await self.db.query(Videos)
        return query

elektracodes avatar Mar 12 '24 23:03 elektracodes

Allow me to ask you a couple of questions first. I don't think AsyncSession works with the query API (correct me if I'm wrong); you should have faced an error like AttributeError: 'async_sessionmaker' object has no attribute 'query', but your code was able to call page_sorter. It doesn't support AsyncSession natively for now, but there is a way around it: you could use a select statement at the DAO level, and in page_builder you could pass async_session as extra_context, where you can do session.execute(stmt) and call the all or fetchall method on your result. Do you want me to add a working example for this workaround? Or I think we can work on adding a support layer for AsyncSession.

danielhasan1 avatar Mar 13 '24 11:03 danielhasan1

Hello. Async queries in SQLAlchemy work in a different way. I suggest making a session dependency and using the repository pattern for queries, instead of combining the DAO with queries to the db.

Async Session Maker

from types import TracebackType

from sqlalchemy.ext.asyncio import (
    AsyncSession,
    async_sessionmaker,
    create_async_engine,
)

from src.somewhere.settings import settings  # my pydantic-settings, use yours instead.


class AsyncSessionMaker:
    """Async context manager that hands out one ``AsyncSession`` per instance.

    The engine and the session factory are class attributes: they are built
    once, when the class body executes, and are shared by every instance.
    """

    # Shared engine, configured from the project's settings object.
    _engine = create_async_engine(
        url=settings.POSTGRES_URL,
        pool_pre_ping=True,
        future=True,
    )
    # Shared factory bound to the engine above.
    _sessionmaker = async_sessionmaker(
        bind=_engine,
        autoflush=False,
        autocommit=False,
    )

    def __init__(self) -> None:
        # Each instance opens its own session from the shared factory.
        self._session = self._sessionmaker()

    @property
    def session(self) -> AsyncSession:
        """The session owned by this instance."""
        return self._session

    async def __aenter__(self) -> AsyncSession:
        """Enter the wrapped session's context and return what it yields."""
        entered = await self._session.__aenter__()
        return entered

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc_value: BaseException | None,
        traceback: TracebackType | None,
    ) -> None:
        """Delegate context exit (and any in-flight exception) to the session."""
        outcome = await self._session.__aexit__(exc_type, exc_value, traceback)
        return outcome

Session Dependency

from collections.abc import AsyncGenerator
from typing import Annotated

from fastapi import Depends
from sqlalchemy.ext.asyncio import AsyncSession

from src.somewhere.db import AsyncSessionMaker  # import previously made session maker

__all__ = ("AsyncSessionDep", "get_async_session")


async def get_async_session() -> AsyncGenerator[AsyncSession, None]:
    """Yield an ``AsyncSession`` for use as a FastAPI dependency.

    The session is committed when the consumer finishes without raising and
    rolled back when it raises; the exception is then propagated unchanged.

    Closing is left to the ``async with`` block: ``AsyncSessionMaker``
    delegates ``__aexit__`` to the underlying session, so an additional
    explicit ``close()`` here would close the session a second time.
    """
    async with AsyncSessionMaker() as session:
        try:
            yield session
        except Exception:
            # Discard pending changes explicitly before the session is closed.
            await session.rollback()
            raise
        else:
            await session.commit()


AsyncSessionDep = Annotated[AsyncSession, Depends(get_async_session)]

Repository Base-class and Example for Model Video

BaseRepository

from collections.abc import Sequence
from typing import Generic, TypeVar
from uuid import UUID  # id for those examples would be as UUID

from sqlalchemy import ColumnExpressionArgument, Subquery, func, inspect, select
from sqlalchemy.ext.asyncio import AsyncSession

from src.somewhere.db import SQLAlchemyBaseModel  # your base-class for sqlalchemy models

_MT = TypeVar("_MT", bound=SQLAlchemyBaseModel)


class BaseRepository(Generic[_MT]):
    """Generic async repository over a single SQLAlchemy model.

    Subclasses set ``_model`` to the mapped class they manage; every method
    runs against the ``AsyncSession`` supplied at construction time.
    """

    _model: type[_MT]
    _session: AsyncSession

    def __init__(self, session: AsyncSession) -> None:
        self._session = session

    async def get_by_id(self, id_: UUID) -> _MT:
        """
        Fetch the instance whose ``id`` equals ``id_``.

        The result is read with ``scalar_one()``, so a missing row surfaces
        as SQLAlchemy's ``NoResultFound``.
        """
        stmt = select(self._model).where(self._model.id == id_)
        return (await self._session.execute(stmt)).scalar_one()

    async def get_all(self) -> Sequence[_MT]:
        """
        Fetch every instance of the model
        """
        rows = await self._session.execute(select(self._model))
        return rows.scalars().all()

    async def save(self, instance: _MT) -> _MT:
        """
        Persist ``instance`` (insert or update) and return it.

        An instance that is already persistent and has no pending changes is
        returned untouched, without hitting the database.
        """
        state = inspect(instance)
        if state.modified or not state.has_identity:
            self._session.add(instance)
            # Flush to emit the SQL, then refresh to reload the row's values.
            await self._session.flush()
            await self._session.refresh(instance)

        return instance

    async def delete(self, instance: _MT) -> None:
        """
        Remove ``instance`` from the database
        """
        session = self._session
        await session.delete(instance)
        await session.flush()

    async def count(
        self, from_query: ColumnExpressionArgument[bool] | None = None, sub_query: Subquery | None = None
    ) -> int:
        """
        Count rows.

        ``from_query`` (a filter on the model) takes precedence over
        ``sub_query``; with neither given, all rows of the model are counted.
        """
        if from_query is not None:
            source = select(self._model).where(from_query).subquery()
        elif sub_query is not None:
            source = sub_query
        else:
            source = self._model

        outcome = await self._session.execute(select(func.count()).select_from(source))
        return outcome.scalar_one()

VideoRepository

from src.somewhere.videos.models import Video
from src.somewhere.db.repositories import BaseRepository


class VideoRepository(BaseRepository[Video]):
    """Repository bound to the ``Video`` model; it defines no methods of its own."""

    _model = Video

Usage

import uuid

from fastapi import APIRouter, status
from sqlalchemy.exc import NoResultFound

from src.somewhere.dependencies.db import AsyncSessionDep  # made above for you
from src.somewhere.videos.dao import VideoDao
from src.somewhere.videos.models import Video
from src.somewhere.videos.repositories import VideoRepository

router = APIRouter(prefix="/{video_id}")


@router.get("/", response_model=VideoDao)
async def get_video_by_id(
    video_id: uuid.UUID,
    session: AsyncSessionDep,
) -> Video:
    """Return the video identified by ``video_id``.

    A missing row (``NoResultFound`` from the repository) is re-raised as
    ``YourOwnException``, a placeholder for an application-specific error.
    """
    repository = VideoRepository(session)
    try:
        video = await repository.get_by_id(id_=video_id)
    except NoResultFound as err:
        raise YourOwnException("or do whatever you want") from err
    return video

ruslan-korneev avatar Jun 10 '24 15:06 ruslan-korneev

@ruslan-korneev thanks for the suggestion, and apologies for the delayed response. I understand you wanting to separate sync data operations from async, and I agree with you on that, but adding another design pattern just for this seems a little overwhelming, especially when a repository is a higher-level form of a DAO (strictly speaking, for data-related stuff). We could work on adding a DAO layer for async db sessions, which is almost the same except for the name and initialisation stuff. What do you say, are you up for it? Do you want to open a PR for it?

danielhasan1 avatar Jul 10 '24 09:07 danielhasan1