Cannot perform operation: another operation is in progress

Open devsarvesh92 opened this issue 2 years ago • 5 comments

Description

I am executing a data migration process and encountering an error while calling asyncio.gather with a list of contracts to persist. The error message I am receiving is Cannot perform operation: another operation is in progress.

Code Snippet

async def save(contract):
    try:
        await Contract(
            ...
        ).save()
    except Exception as ex:
        print(f"Unable to save {contract=} due to {ex}")


async def persist_contracts():
    ...
    asyncio.gather(
        *[save(contract=contract) for _, contract in contracts.iterrows()]
    )

Steps to Reproduce

  • Execute the data migration process.
  • Call the persist_contracts function.
  • Observe the error message Cannot perform operation: another operation is in progress.

Expected Result

The contracts should be successfully persisted.

Actual Result

The error message Cannot perform operation: another operation is in progress is raised.

Environment

Python version: 3.11.1
Piccolo version: 0.105.0

devsarvesh92 avatar Feb 01 '23 12:02 devsarvesh92

Any update on this?

sarvesh-deserve avatar Feb 03 '23 11:02 sarvesh-deserve

I haven't had time to properly investigate this yet.

When I've seen similar problems in the past, it's usually something to do with connection pools.

I'll have to write a unit test and see what's going on.

dantownsend avatar Feb 03 '23 17:02 dantownsend

Any update on this?

devsarvesh92 avatar Feb 17 '23 05:02 devsarvesh92

I've done a quick test, and asyncio.gather seems to work OK in general:

import asyncio

from piccolo.engine.postgres import PostgresEngine
from piccolo.table import Table
from piccolo.columns.column_types import Varchar


DB = PostgresEngine(config={
    'database': 'piccolo_asyncio_gather_demo'
})


class Band(Table, db=DB):
    name = Varchar()


async def main():
    await DB.start_connection_pool()

    await Band.create_table(if_not_exists=True)

    responses = await asyncio.gather(
        Band.select(),
        Band.select(),
        Band.select()
    )
    print(responses)

    await DB.close_connection_pool()

if __name__ == '__main__':
    asyncio.run(main())

I'll see if there's something with Piccolo's save method which doesn't work.

dantownsend avatar Feb 21 '23 13:02 dantownsend

I tried it with the save method too, and it seems OK.

import asyncio

from piccolo.engine.postgres import PostgresEngine
from piccolo.table import Table
from piccolo.columns.column_types import Varchar


DB = PostgresEngine(config={
    'database': 'piccolo_asyncio_gather_demo'
})


class Band(Table, db=DB):
    name = Varchar()


async def save():
    band = Band(name="test")
    await band.save()


async def main():
    await DB.start_connection_pool()

    await Band.create_table(if_not_exists=True)

    await asyncio.gather(
        save(),
        save(),
        save()
    )

    await DB.close_connection_pool()

if __name__ == '__main__':
    asyncio.run(main())

So it's a bit of a head scratcher - I'm not sure what's going on.

dantownsend avatar Feb 21 '23 13:02 dantownsend
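
The thread does not establish a root cause. For context, database drivers such as asyncpg typically report "another operation is in progress" when one connection is asked to run a second query before the first has finished. The sketch below reproduces that failure mode with plain asyncio and no database. FakeConnection, run_shared and run_serialized are hypothetical names made up for illustration; they are not part of Piccolo or asyncpg, and whether the reporter's code shared a connection in this way is an assumption.

```python
import asyncio


class FakeConnection:
    """Hypothetical stand-in for a connection that allows one query at a time."""

    def __init__(self):
        self._busy = False

    async def execute(self, query):
        # Reject a query that arrives while another one is still running.
        if self._busy:
            raise RuntimeError(
                "cannot perform operation: another operation is in progress"
            )
        self._busy = True
        try:
            await asyncio.sleep(0)  # yield to the event loop, as real I/O would
            return f"ok: {query}"
        finally:
            self._busy = False


async def run_shared(n):
    """Run n queries concurrently on one shared connection."""
    conn = FakeConnection()
    return await asyncio.gather(
        *[conn.execute(f"insert {i}") for i in range(n)],
        return_exceptions=True,  # collect errors instead of raising the first
    )


async def run_serialized(n):
    """Same workload, but a lock lets only one query use the connection at a time."""
    conn = FakeConnection()
    lock = asyncio.Lock()

    async def guarded(i):
        async with lock:
            return await conn.execute(f"insert {i}")

    return await asyncio.gather(*[guarded(i) for i in range(n)])


if __name__ == "__main__":
    print(asyncio.run(run_shared(3)))
    print(asyncio.run(run_serialized(3)))
```

In run_shared only the first query succeeds and the others fail with the simulated error; in run_serialized all of them succeed because the lock removes the overlap. If a real migration hits the same pattern, awaiting the saves one after another, or making sure each concurrent task gets its own pooled connection, would be the analogous things to try.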