Add support for logical replication protocol.
I looked at the docs and the code and didn't find anything similar.
Assuming you are referring to the new logical replication commands in the upcoming PostgreSQL 10, there's no need for a special API in asyncpg. You can issue these commands like any other SQL through Connection.execute.
Actually, I meant https://www.postgresql.org/docs/9.6/static/logicaldecoding.html
In that case also, conn.fetch('SELECT * FROM pg_logical_slot_get_changes(...)') would work. What are you looking for from asyncpg in this case specifically?
I need an asynchronous driver to consume messages from PostgreSQL and put them on a queue. A message is issued for every change. I think that if there are a lot of changes, a synchronous variant will be slow.
AFAIK, Postgres does not send protocol-level notifications on the arrival of replicated data. It's a stream you need to poll yourself. Just create an asyncio task on a separate connection that runs the above query continuously. Something like this:
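import asyncio
import asyncpg

async def poll_replication_stream(conn):
    while True:
        # fetch() returns a list of records, so await it and iterate normally.
        for r in await conn.fetch('SELECT * FROM pg_logical_slot_get_changes(...)'):
            await put_to_queue(r)
        # Pause between polls so an idle slot does not cause a busy loop.
        await asyncio.sleep(polling_interval)
...
conn = await asyncpg.connect(...)
loop.create_task(poll_replication_stream(conn))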
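A slightly fuller sketch of the same polling approach, in case it helps. It is only an illustration, not an asyncpg API: `poll_slot`, the `stop` event, and the `interval` default are hypothetical names and choices, and the slot is assumed to already exist with a text output plugin such as `test_decoding`. The `lsn` and `xid` columns are cast to text so the example does not depend on how the driver decodes those types. `conn` can be any object with an asyncpg-style `fetch(query, *args)` coroutine.

```python
import asyncio

# Assumption: the slot already exists and uses a text output plugin
# (for example test_decoding). NULL, NULL means "no LSN or row-count limit".
GET_CHANGES = (
    "SELECT lsn::text AS lsn, xid::text AS xid, data "
    "FROM pg_logical_slot_get_changes($1, NULL, NULL)"
)


async def poll_slot(conn, slot_name, queue, stop, interval=1.0):
    """Drain the slot in a loop and push every change onto `queue`.

    `conn` is expected to behave like an asyncpg connection.
    `stop` is an asyncio.Event used to end the loop cleanly.
    """
    while not stop.is_set():
        rows = await conn.fetch(GET_CHANGES, slot_name)
        for row in rows:
            # A bounded queue applies backpressure to the poller here.
            await queue.put(row)
        if not rows:
            # Nothing new: wait for `interval` seconds or until stopped.
            try:
                await asyncio.wait_for(stop.wait(), timeout=interval)
            except asyncio.TimeoutError:
                pass
```

With a real connection this would be started with something like `asyncio.create_task(poll_slot(conn, 'my_slot', asyncio.Queue(maxsize=1000), stop))`, where `'my_slot'` is a placeholder slot name. Note that `pg_logical_slot_get_changes` consumes the changes it returns, so a crash between `fetch` and `queue.put` loses them; `pg_logical_slot_peek_changes` is the non-consuming variant.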
Just to clarify, there IS an entirely separate replication protocol: https://www.postgresql.org/docs/9.6/static/protocol-replication.html. However, that is very low level: you essentially deal with the raw WAL. It's not clear whether there would be any benefit to implementing it in asyncpg.
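To give a sense of how low level that is, here is a rough sketch of decoding the two server messages that the linked protocol page describes as arriving inside CopyData frames once streaming starts: XLogData (`w`) and the primary keepalive (`k`). This is not asyncpg code, and asyncpg does not expose these frames; `parse_copy_data` and `format_lsn` are hypothetical helper names, and the sketch only covers the payload layout, not the connection handshake or the standby status replies a client must send back.

```python
import struct
from datetime import datetime, timedelta, timezone

# Server clock values are microseconds since midnight, 2000-01-01.
PG_EPOCH = datetime(2000, 1, 1, tzinfo=timezone.utc)


def format_lsn(lsn):
    """Render a 64-bit WAL position in the usual 'X/X' hex form."""
    return f"{lsn >> 32:X}/{lsn & 0xFFFFFFFF:X}"


def parse_copy_data(payload):
    """Decode the payload of one CopyData frame sent by the server."""
    kind = payload[:1]
    if kind == b"w":
        # XLogData: WAL start, current WAL end, server clock, then the data.
        start, end, clock = struct.unpack_from("!QQq", payload, 1)
        return {
            "type": "xlogdata",
            "wal_start": format_lsn(start),
            "wal_end": format_lsn(end),
            "sent_at": PG_EPOCH + timedelta(microseconds=clock),
            "data": payload[25:],
        }
    if kind == b"k":
        # Primary keepalive: current WAL end, server clock, reply-requested flag.
        end, clock, reply = struct.unpack_from("!QqB", payload, 1)
        return {
            "type": "keepalive",
            "wal_end": format_lsn(end),
            "sent_at": PG_EPOCH + timedelta(microseconds=clock),
            "reply_requested": bool(reply),
        }
    raise ValueError(f"unexpected replication message type: {kind!r}")
```

On top of this, a client still has to acknowledge positions periodically and interpret `data` itself, which is a fair amount of machinery compared with polling the SQL function.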
Thanks for the advice. The code that implements logical replication in psycopg2 uses the replication protocol. I thought about using it too, but I'm not sure yet. (I mean the replication protocol; psycopg2 doesn't support it in async mode.)
BTW, the PostgreSQL docs say that after a successful connection the server streams data to the client ("On success, server responds with a CopyBothResponse message, and then starts to stream WAL to the frontend"), which is why I wanted an asynchronous variant. The server will send a lot of messages to my client this way.
Maybe your suggested variant is simpler and more suitable, but might it use more resources because it works at a higher level? Oh, I didn't notice the polling interval.
I'll keep this open. We may potentially look into implementing support for the replication protocol.
Any thoughts on implementing this? I was hoping it was built into asyncpg.
+1 this would be a good feature to have