Performance problem with server-side cursor and fetchmany()
I'm reading a very large result set from MySQL and interleaving fetchmany() chunks from several connections' streams using asyncio. This works fine with the default cursor settings, but with a server-side cursor it appears to spend about 10x as much time handling IO. I'm fairly confident the problem isn't the server-side cursor implementation itself, since the performance difference between client-side and server-side cursors is negligible in an equivalent synchronous test case.
When I profile the async test with the default client-side cursor, most of the execution time is spent handling IO (select.kqueue.control()), as expected. With the server-side cursor, however, the number of kqueue.control() calls roughly doubles (1435 -> 2573) while wall-clock time increases about 10x (3149ms -> 33631ms).
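For reference, this is roughly how I collected the numbers above (the harness itself is incidental; any cProfile wrapper should show the same hot spot):

import asyncio
import cProfile
import pstats
import time

def profile_async(coro_factory):
    # Profile an async entry point and print the hottest calls.
    profiler = cProfile.Profile()
    t0 = time.perf_counter()
    profiler.enable()
    asyncio.run(coro_factory())
    profiler.disable()
    print(f"wall clock: {(time.perf_counter() - t0) * 1000:.0f}ms")
    pstats.Stats(profiler).sort_stats("cumulative").print_stats(15)

# profile_async(main)  # toggle server_side_cursors on the engine to compare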
I'm using the mysqlclient driver on macOS. Is there any reason this might be expected, or is it a bug? Please let me know if there is anything I can do to help isolate the problem. The test case below should reproduce the issue (it doesn't set up the DB, though).
import asyncio

from aiostream import stream
from sqlalchemy import Column, Integer, MetaData, Table, create_engine, select
from sqlalchemy_aio import ASYNCIO_STRATEGY

url = "mysql+mysqldb://root:@localhost/mydb"
engine = create_engine(url, strategy=ASYNCIO_STRATEGY, server_side_cursors=True)
engine_sync = create_engine(url, server_side_cursors=True)

QUERY_ROWS = 1_000_000
BATCH_SIZE = 10_000

# The test assumes mytable already exists; a stand-in definition so the
# module at least imports (the column types here are guesses):
metadata = MetaData()
mytable = Table(
    "mytable", metadata,
    Column("col1", Integer),
    Column("col2", Integer),
    Column("col3", Integer),
)

query = select([
    mytable.c.col1,
    mytable.c.col2,
    mytable.c.col3,
]).limit(QUERY_ROWS)

def get_results_sync(q, batch_size):
    # Stream the result set in fixed-size chunks over one connection.
    with engine_sync.connect() as conn:
        result = conn.execute(q)
        while True:
            chunk = result.fetchmany(batch_size)
            if not chunk:
                break
            yield len(chunk)

async def get_results(q, batch_size):
    # Async equivalent: each await yields control while a chunk is fetched.
    async with engine.connect() as conn:
        result = await conn.execute(q)
        while True:
            chunk = await result.fetchmany(batch_size)
            if not chunk:
                break
            yield len(chunk)

async def main():
    # Merge five concurrent streams so their fetchmany() calls interleave.
    gens = (get_results(query, BATCH_SIZE) for _ in range(5))
    xs = stream.merge(*gens)
    return sum(await stream.list(xs))

def main_sync():
    gens = (get_results_sync(query, BATCH_SIZE) for _ in range(5))
    return sum(count for gen in gens for count in gen)
Run asyncio.run(main()) and toggle server_side_cursors on engine to reproduce the performance problem. To see server-side cursors working as expected, run main_sync().
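Concretely, I invoke both paths like this (the timing wrapper is just for illustration):

import time

if __name__ == "__main__":
    t0 = time.perf_counter()
    rows = asyncio.run(main())  # ~10x slower with server_side_cursors=True
    print(f"async: {rows} rows in {(time.perf_counter() - t0) * 1000:.0f}ms")

    t0 = time.perf_counter()
    rows = main_sync()  # server-side cursor performs fine here
    print(f"sync:  {rows} rows in {(time.perf_counter() - t0) * 1000:.0f}ms")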