tremolo icon indicating copy to clipboard operation
tremolo copied to clipboard

improve examples

Open ZenenTreadwell opened this issue 11 months ago • 5 comments

One of the next patterns I'm trying to implement is a notification system. I'm not familiar with async programming as a paradigm so it would be great to have some example of how this might be done with SSE.

I'd be happy to build out an example but I am still adjusting. This is my first attempt:

class Listener:
    def __init__(self):
        self.messages = []

    def send(self, message):
        self.messages.append(message)

    async def receive(self):
        while len(self.messages) == 0:
            await asyncio.sleep(0.1)

        return self.messages.pop()


class Notifier:
    def __init__(self):
        self.listeners = []

    async def listen(self):
        listener = Listener()
        self.listeners.append(listener)

        while True:
            notification = await listener.receive()
            yield (b'notify', notification)

    def notify(self, message):
        for l in self.listeners:
            l.send(message)

notifier = Notifier()

@app.route('/notifications$')
async def notify(sse=None, **server):
    if sse:
        await sse.send(b'listening', event="notify")

        async for ev, data in notifier.listen():
            await sse.send(data, event=ev)
        await sse.close()


@app.route('/ping$')
async def ping():
        notifier.notify(b'ping!')

I'm opening an EventSource at /notifications and successfully pinging all listening clients, but the event sends multiple times and I generally feel unhappy with how the implementation feels. I feel like there should be some way to call send() and have that resolve the awaiting receive() in Listener without using a message queue and sleep loop, but I'm in unfamiliar territory here.

Suggestions?

ZenenTreadwell avatar May 15 '25 15:05 ZenenTreadwell

Classes are very confusing at the beginning.

Let's start with the basics, is it your idea to do something like this?

import asyncio

listeners = set()


@app.route('/notifications$')
async def notify(sse=None, **server):
    if sse:
        await sse.send(b'listening', event='notify')

        queue = asyncio.Queue(maxsize=1)
        listeners.add(queue)

        try:
            while True:
                ev, data = await queue.get()
                await sse.send(data, event=ev)
        finally:  # the client has gone, clean up
            listeners.discard(queue)
            await sse.close()


@app.route('/ping$')
async def ping():
    for queue in listeners:
        queue.put_nowait(['notify', b'ping!'])

    return f'ping to {len(listeners)} listeners'

Alternatively without Queue:

import asyncio

listeners = set()


@app.route('/notifications$')
async def notify(sse=None, **server):
    if sse:
        await sse.send(b'listening', event='notify')
        listeners.add(sse)

        try:
            await asyncio.get_running_loop().create_future()
        finally:  # the client has gone, clean up
            listeners.discard(sse)
            await sse.close()


@app.route('/ping$')
async def ping():
    for sse in listeners:
        try:
            await sse.send(b'ping!', event='notify')
        except RuntimeError:  # the client has gone
            pass

    return f'ping to {len(listeners)} listeners'

nggit avatar May 15 '25 23:05 nggit

I'm not opposed to the queue, I just figured there would be a better way than what I implemented.

I think that futures are the solution that I was looking for, but they read less intuitively than the first example. I'll put some work into it from here, thanks.

ZenenTreadwell avatar May 16 '25 01:05 ZenenTreadwell

Glad if it helps. For the record, regex routing without ^ can go wrong, as it can happily accept /another/path/notifications.

nggit avatar May 16 '25 05:05 nggit

would be great to have this as part of an example: https://github.com/nggit/tremolo/issues/295

ZenenTreadwell avatar Jul 07 '25 14:07 ZenenTreadwell

this is also good example material I think: https://github.com/nggit/tremolo/issues/288

ZenenTreadwell avatar Jul 07 '25 15:07 ZenenTreadwell