Postgres: Cannot perform operation: another operation is in progress

Open kellen opened this issue 5 years ago • 6 comments

I'm using broadcaster with FastAPI and the Postgres backend, and I see an exception when publishing over websockets.

Relevant parts:

    await self._conn.execute("SELECT pg_notify($1, $2);", channel, message)
    # ...
    'cannot perform operation: another operation is in progress')

Full trace:

INFO:     ('127.0.0.1', 52654) - "WebSocket /viewers" [accepted]
ERROR:    Exception in ASGI application
Traceback (most recent call last):
  File ".../site-packages/uvicorn/protocols/websockets/websockets_impl.py", line 154, in run_asgi
    result = await self.app(self.scope, self.asgi_receive, self.asgi_send)
  File ".../site-packages/uvicorn/middleware/proxy_headers.py", line 45, in __call__
    return await self.app(scope, receive, send)
  File ".../site-packages/fastapi/applications.py", line 179, in __call__
    await super().__call__(scope, receive, send)
  File ".../site-packages/starlette/applications.py", line 111, in __call__
    await self.middleware_stack(scope, receive, send)
  File ".../site-packages/starlette/middleware/errors.py", line 146, in __call__
    await self.app(scope, receive, send)
  File ".../site-packages/starlette/exceptions.py", line 58, in __call__
    await self.app(scope, receive, send)
  File ".../site-packages/starlette/routing.py", line 566, in __call__
    await route.handle(scope, receive, send)
  File ".../site-packages/starlette/routing.py", line 283, in handle
    await self.app(scope, receive, send)
  File ".../site-packages/starlette/routing.py", line 57, in app
    await func(session)
  File ".../site-packages/fastapi/routing.py", line 228, in app
    await dependant.call(**values)
  File "./main.py", line 198, in events_ws
    (viewers_ws_sender, {"websocket": websocket}),
  File ".../site-packages/starlette/concurrency.py", line 18, in run_until_first_complete
    [task.result() for task in done]
  File ".../site-packages/starlette/concurrency.py", line 18, in <listcomp>
    [task.result() for task in done]
  File "./main.py", line 204, in viewers_ws_receiver
    await broadcast.publish(channel="viewers", message=message)
  File ".../site-packages/broadcaster/_base.py", line 72, in publish
    await self._backend.publish(channel, message)
  File ".../site-packages/broadcaster/_backends/postgres.py", line 25, in publish
    await self._conn.execute("SELECT pg_notify($1, $2);", channel, message)
  File ".../site-packages/asyncpg/connection.py", line 297, in execute
    _, status, _ = await self._execute(query, args, 0, timeout, True)
  File ".../site-packages/asyncpg/connection.py", line 1444, in _execute
    with self._stmt_exclusive_section:
  File ".../site-packages/asyncpg/connection.py", line 1891, in __enter__
    'cannot perform operation: another operation is in progress')
asyncpg.exceptions._base.InterfaceError: cannot perform operation: another operation is in progress
INFO:     ('127.0.0.1', 52655) - "WebSocket /events" [accepted]

The code looks roughly like this. I suspect the lock is just slowing things down and exposing the contention.

@app.websocket("/viewers", name="viewers_ws")
async def events_ws(websocket: WebSocket):
    await websocket.accept()
    await run_until_first_complete(
        (viewers_ws_receiver, {"websocket": websocket}),
        (viewers_ws_sender, {"websocket": websocket}),
    )

my_lock = threading.Lock()

async def viewers_ws_receiver(websocket: WebSocket):
    async for message in websocket.iter_text():
        await broadcast.publish(channel="viewers", message=message)

async def viewers_ws_sender(websocket: WebSocket):
    async with broadcast.subscribe(channel="viewers") as subscriber:
        async for event in subscriber:
            counter = 0
            with my_lock:
                # do something with event.message
                counter = ...
            await websocket.send_json({"viewers": counter})

Update: I refactored this down to a single websocket with no lock at all and saw the exception again. :shrug:

kellen avatar Sep 18 '20 21:09 kellen

@kellen did you find a solution for this?

AntonOfTheWoods avatar Oct 07 '21 03:10 AntonOfTheWoods

@AntonOfTheWoods no, I added redis as the backend instead.

kellen avatar Oct 22 '21 18:10 kellen

@AntonOfTheWoods A small delay before the receive loop works around the problem:

@app.websocket("/viewers", name="viewers_ws")
async def events_ws(websocket: WebSocket):
    await websocket.accept()
    await run_until_first_complete(
        (viewers_ws_receiver, {"websocket": websocket}),
        (viewers_ws_sender, {"websocket": websocket}),
    )

my_lock = threading.Lock()

async def viewers_ws_receiver(websocket: WebSocket):
    await asyncio.sleep(0.01)  # <-- short delay; with this the error no longer appears
    async for message in websocket.iter_text():
        await broadcast.publish(channel="viewers", message=message)

async def viewers_ws_sender(websocket: WebSocket):
    async with broadcast.subscribe(channel="viewers") as subscriber:
        async for event in subscriber:
            counter = 0
            with my_lock:
                # do something with event.message
                counter = ...
            await websocket.send_json({"viewers": counter})

uralbash avatar Nov 21 '23 13:11 uralbash
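
For anyone landing here: the traceback shows the error raised from asyncpg's exclusive section when a second `execute` starts on a connection that is still busy, so the delay above probably only makes the overlap less likely rather than removing it. Below is a minimal sketch of the alternative, serializing access with an `asyncio.Lock`. It does not touch a real database: `FakeConnection` and `SerializedPublisher` are hypothetical names made up for illustration, and the fake raises `RuntimeError` where asyncpg raises `InterfaceError`. It is not broadcaster's actual code or fix.

```python
import asyncio


class FakeConnection:
    """Hypothetical stand-in for a single database connection that, like the
    one in the traceback, refuses to start an operation while another is
    still in flight."""

    def __init__(self):
        self._busy = False

    async def execute(self, query, *args):
        if self._busy:
            raise RuntimeError(
                "cannot perform operation: another operation is in progress"
            )
        self._busy = True
        try:
            await asyncio.sleep(0.01)  # simulate the network round trip
            return "OK"
        finally:
            self._busy = False


class SerializedPublisher:
    """Hypothetical wrapper: every publish waits for the previous one."""

    def __init__(self, conn):
        self._conn = conn
        self._lock = asyncio.Lock()

    async def publish(self, channel, message):
        # Only one coroutine at a time may use the shared connection.
        async with self._lock:
            return await self._conn.execute(
                "SELECT pg_notify($1, $2);", channel, message
            )


async def demo():
    conn = FakeConnection()

    # Two concurrent calls on the bare connection: the second one fails.
    unguarded = await asyncio.gather(
        conn.execute("SELECT pg_notify($1, $2);", "viewers", "a"),
        conn.execute("SELECT pg_notify($1, $2);", "viewers", "b"),
        return_exceptions=True,
    )

    # The same two calls through the lock: both succeed, one after the other.
    publisher = SerializedPublisher(conn)
    guarded = await asyncio.gather(
        publisher.publish("viewers", "a"),
        publisher.publish("viewers", "b"),
        return_exceptions=True,
    )
    return unguarded, guarded


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The lock only helps if every code path that shares the connection goes through it. If subscribing or listening uses the same connection as publishing, those calls would need the same lock, or publishing would need a connection of its own.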