improve examples
One of the next patterns I'm trying to implement is a notification system. I'm not familiar with async programming as a paradigm so it would be great to have some example of how this might be done with SSE.
I'd be happy to build out an example but I am still adjusting. This is my first attempt:
class Listener:
def __init__(self):
self.messages = []
def send(self, message):
self.messages.append(message)
async def receive(self):
while len(self.messages) == 0:
await asyncio.sleep(0.1)
return self.messages.pop()
class Notifier:
def __init__(self):
self.listeners = []
async def listen(self):
listener = Listener()
self.listeners.append(listener)
while True:
notification = await listener.receive()
yield (b'notify', notification)
def notify(self, message):
for l in self.listeners:
l.send(message)
notifier = Notifier()
@app.route('/notifications$')
async def notify(sse=None, **server):
if sse:
await sse.send(b'listening', event="notify")
async for ev, data in notifier.listen():
await sse.send(data, event=ev)
await sse.close()
@app.route('/ping$')
async def ping():
notifier.notify(b'ping!')
I'm opening an EventSource at /notifications and successfully pinging all listening clients, but the event sends multiple times and I generally feel unhappy with how the implementation feels. I feel like there should be some way to call send() and have that resolve the awaiting receive() in Listener without using a message queue and sleep loop, but I'm in unfamiliar territory here.
Suggestions?
Classes are very confusing at the beginning.
Let's start with the basics, is it your idea to do something like this?
import asyncio
listeners = set()
@app.route('/notifications$')
async def notify(sse=None, **server):
if sse:
await sse.send(b'listening', event='notify')
queue = asyncio.Queue(maxsize=1)
listeners.add(queue)
try:
while True:
ev, data = await queue.get()
await sse.send(data, event=ev)
finally: # the client has gone, clean up
listeners.discard(queue)
await sse.close()
@app.route('/ping$')
async def ping():
for queue in listeners:
queue.put_nowait(['notify', b'ping!'])
return f'ping to {len(listeners)} listeners'
Alternatively without Queue:
import asyncio
listeners = set()
@app.route('/notifications$')
async def notify(sse=None, **server):
if sse:
await sse.send(b'listening', event='notify')
listeners.add(sse)
try:
await asyncio.get_running_loop().create_future()
finally: # the client has gone, clean up
listeners.discard(sse)
await sse.close()
@app.route('/ping$')
async def ping():
for sse in listeners:
try:
await sse.send(b'ping!', event='notify')
except RuntimeError: # the client has gone
pass
return f'ping to {len(listeners)} listeners'
I'm not opposed to the queue, I just figured there would be a better way than what I implemented.
I think that futures are the solution that I was looking for, but they read less intuitively than the first example. I'll put some work into it from here, thanks.
Glad if it helps. For the record, regex routing without ^ can go wrong, as it can happily accept /another/path/notifications.
would be great to have this as part of an example: https://github.com/nggit/tremolo/issues/295
this is also good example material I think: https://github.com/nggit/tremolo/issues/288