asyncpg icon indicating copy to clipboard operation
asyncpg copied to clipboard

Add support for logical replication protocol.

Open GregEremeev opened this issue 8 years ago • 10 comments

I looked at docs and code. I didn't find anything similar.

GregEremeev avatar Mar 21 '17 13:03 GregEremeev

Assuming you are referring to the new logical replication commands in the upcoming PostgreSQL 10, there's no need for a special API in asyncpg. You can issue these commands like any other SQL through Connection.execute.

elprans avatar Mar 21 '17 13:03 elprans

Actually, I meant https://www.postgresql.org/docs/9.6/static/logicaldecoding.html

GregEremeev avatar Mar 21 '17 13:03 GregEremeev

In that case also, conn.fetch('SELECT * pg_logical_slot_get_changes(...)') would work. What are you looking from asyncpg in this case specifically?

elprans avatar Mar 21 '17 13:03 elprans

I need asynchronous driver to consume messages from PostgreSQL and put them to queue. Every message issues when was change. I think if there are a lot of changes then synchronous variant will be slow

GregEremeev avatar Mar 21 '17 13:03 GregEremeev

AFAIK, Postgres does not send protocol-level notifications on the arrival of replicated data. It's a stream you need to poll yourself. Just create an asyncio task on a separate connection that runs the above query continuously. Something like that:


async def poll_replication_stream(conn):
    while True:
        async for r in conn.fetch('SELECT * FROM pg_logical_slot_get_changes(...)'):
            await put_to_queue(r)
        await asyncio.sleep(polling_interval)

...

conn = await asyncpg.connect(...)
loop.create_task(poll_replication_stream(conn))

elprans avatar Mar 21 '17 13:03 elprans

Just to clarify, there IS an entirely separate replication protocol: https://www.postgresql.org/docs/9.6/static/protocol-replication.html. However, that is very low level: you essentially deal with the raw WAL log. It's not clear whether there's any benefit of implementing it in asyncpg.

elprans avatar Mar 21 '17 14:03 elprans

Thanks for advice. Code which implements logical replication in psycopg2 uses protocal replication. I thought use it too but I don't know yet.(I mean protocal replication, psycopg2 doesn't support protocol replication in async mode)

BTW, postgresql doc tells that after success connection, postgresql server will send stream to client (On success, server responds with a CopyBothResponse message, and then starts to stream WAL to the frontend) that's why I wanted to use asynchronous variant. Server will send a lot of messages to my client in this way.

Maybe, your suggested variant more simple and suitable, but maybe it will spend more resources because of higher level? Oh, I didn't see polling interval

GregEremeev avatar Mar 21 '17 14:03 GregEremeev

I'll keep this open. We may potentially look into implementing support for the replication protocol.

elprans avatar Mar 21 '17 14:03 elprans

Any thoughts on implementing this? Was hopping it was built in asyncpg

oldani avatar Sep 12 '21 19:09 oldani

+1 this would be a good feature to have

karol-gruszczyk avatar Aug 17 '22 19:08 karol-gruszczyk