reflex icon indicating copy to clipboard operation
reflex copied to clipboard

Feature Request -- Send multiple deltas

Open paul-sx opened this issue 3 years ago β€’ 4 comments

Although support for async events exist, state updates are only sent once the function is awaited and completes. This prevents us from sending multiple updates to the front end for a long running event task.

For example, with an on_click event, I'd like to set a widget to update to show that a process has started, then show the current status of the process, until finally when the long-running process finishes, show the result. Currently, the UI appears to "freeze" until the long running process finishes and only the final results are shown. With the use of the websocket, we should be able to push multiple state updates over the course of one event.

paul-sx avatar Jan 05 '23 17:01 paul-sx

Got suggestion well brainstorm how to implement this

Alek99 avatar Jan 06 '23 00:01 Alek99

Maybe we can use Python generators, something like

class State(pc.State):
    progress: float

    def do_thing(self):
        self.progress = 0
        for i in range(100):
            self.progress += 0.1
            yield

Then every time it yields it will send a delta back to update the frontend.

picklelo avatar Jan 06 '23 00:01 picklelo

I was thinking something where the event function passed the websocket to process

def event(app: App):
    """Websocket endpoint for events.
    Args:
        app: The app to add the endpoint to.
    Returns:
        The websocket endpoint.
    """

    async def ws(websocket: WebSocket):
        """Create websocket endpoint.
        Args:
            websocket: The websocket sending events.
        """
        app.websocket = websocket
        # Accept the connection.
        await websocket.accept()

        # Process events until the connection is closed.
        while True:
            # Get the event.
            try:
                event = Event.parse_raw(await websocket.receive_text())
            except WebSocketDisconnect:
                # Close the connection.
                return

            # Process the event.
            update = await process(app, event, websocket)

            # Send the update.
            await websocket.send_text(update.json())

    return ws

With the change at update = await process(app, event, websocket)

Then flow that in process to the event

async def process(app: App, event: Event, websocket: Websocket) -> StateUpdate:
    """Process an event.
    Args:
        app: The app to process the event for.
        event: The event to process.
    Returns:
        The state update after processing the event.
    """
    # Get the state for the session.
    state = app.state_manager.get_state(event.token)

    # Preprocess the event.
    pre = app.preprocess(state, event, websocket)
    if pre is not None:
        return StateUpdate(delta=pre)

    # Apply the event to the state.
    update = await state.process(event, websocket)
    app.state_manager.set_state(event.token, state)

    # Postprocess the event.
    post = app.postprocess(state, event, websocket, update.delta)
    if post is not None:
        return StateUpdate(delta=post)

    # Return the update.
    return update

Then within state, have a callable that can be used within the state

class State(Base, ABC):
    """The state of the app."""
    
    send_deltas: callable = None

And set it when an event method is called

    async def process(self, event: Event, websocket: Websocket) -> StateUpdate:
        """Process an event.
        Args:
            event: The event to process.
        Returns:
            The state update after processing the event.
        """
        
        def create_update(event: Event, websocket: Websocket):
            def update(self):
                delta = self.get_delta()
                self.clean()
                update = StateUpdate(delta=delta, events=[])
                websocket.send_text(update.json())
            return update
        self.send_deltas = create_update(event, websocket)
  
        # Get the event handler.
        path = event.name.split(".")
        path, name = path[:-1], path[-1]
        substate = self.get_substate(path)
        handler = getattr(substate, name)

        # Process the event.
        fn = functools.partial(handler.fn, substate)
        try:
            if asyncio.iscoroutinefunction(fn.func):
                events = await fn(**event.payload)
            else:
                events = fn(**event.payload)
        except:
            error = traceback.format_exc()
            print(error)
            return StateUpdate(
                events=[window_alert("An error occurred. See logs for details.")]
            )
        finally:
            self.send_deltas = None

        # Fix the returned events.
        events = utils.fix_events(events, event.token)

        # Get the delta after processing the event.
        delta = self.get_delta()

        # Reset the dirty vars.
        self.clean()

        # Return the state update.
        return StateUpdate(delta=delta, events=events)

Then when you want to send a state update from a state function, you'd just call self.send_deltas()

Or at least was what I was thinking of when I was reporting

paul-sx avatar Jan 06 '23 03:01 paul-sx

Got it, that makes a lot of sense, thanks for the explanation! We definitely need something like this - will add it in the following release.

picklelo avatar Jan 07 '23 21:01 picklelo