reflex icon indicating copy to clipboard operation
reflex copied to clipboard

Cloud: Otel + Prometheus support

Open Kastier1 opened this issue 10 months ago • 2 comments

Target Period: Q3 2025 (Stretch) Description: Internally we use OpenTelemetry (Otel) and Prometheus (metrics) to help us monitor our sites. We believe these tools help all developers better understand how their app is working and how to improve it, so we want to provide this service to our users. The goal is to provide an endpoint you can export your traces to and a UI to view them in Cloud, as well as the ability to enable your site to be scraped by our blackbox exporter so that you can view your metrics over time and create custom dashboards.

This is a lofty goal of ours, but we believe it should help provide increased visibility into all our users' apps!

Kastier1 avatar Mar 07 '25 17:03 Kastier1

Would be awesome to natively add OTEL instrumentations to the framework. If an exporter is defined, traces will be exported to the desired monitoring platform (e.g. Prometheus, Azure Application Insights, Datadog, ...).

As of today, I monkey patched the framework to intercept state changes and push traces with context. But it would be easier for everyone to provide it natively! ❤️

# app/app.py

# Init Reflex
app = rx.App(
    ...
)

# Import for monkey-patching
# The module is imported only for its side effect: at import time it replaces
# `reflex.app.process` with a traced wrapper. The imported name itself is unused.
from app.helpers import reflex_patch_otel
# app/helpers/reflex_patch_otel.py

from collections.abc import AsyncIterator, Generator
from contextlib import contextmanager
from urllib.parse import urlparse

import reflex.app
from opentelemetry.semconv._incubating.attributes import (
    deployment_attributes,
    session_attributes,
    user_attributes,
)
from opentelemetry.semconv.attributes import (
    client_attributes,
    service_attributes,
    url_attributes,
)
from reflex.app import App
from reflex.event import Event
from reflex.state import StateUpdate

from app.helpers.logging import SERVICE_NAME, VERSION, log_context, logger
from app.helpers.monitoring import ENV_NAME, tracer
from app.helpers.state import AppState
from app.models.user import UserModel


@contextmanager
def _otel_event(
    client_ip: str,
    last_path: str,
    name: str,
    path: str,
    session_id: str,
    user: UserModel | None,
) -> Generator[None]:
    """
    Context manager to start an OpenTelemetry span for an event.

    Span name is derived from the event name, with the prefix `state.`.

    Args:
        client_ip: Address of the client, recorded as the client address attribute.
        last_path: URL of the previous state; provides the scheme, query and fragment.
        name: Event name, used as the span name suffix.
        path: Current path, substituted into the URL built from `last_path`.
        session_id: Session identifier recorded on the span.
        user: Current user, if any; adds user attributes to the span when set.

    Yields:
        None. The span is current for the duration of the `with` body.
    """
    # Build URL: start from the previous state URL and swap in the current path.
    # `ParseResult` is an immutable named tuple, so `_replace` returns a NEW
    # object; the result must be kept or the span reports the previous path.
    # NOTE(review): if `path` can carry a query string or fragment, they end up
    # inside the path component — confirm against the caller.
    url = urlparse(last_path)._replace(path=path)

    attributes: dict[str, str] = {
        # Service attributes
        service_attributes.SERVICE_NAME: SERVICE_NAME,
        service_attributes.SERVICE_VERSION: VERSION,
        # Session attributes
        session_attributes.SESSION_ID: session_id,
        # URL attributes
        url_attributes.URL_FRAGMENT: url.fragment,
        url_attributes.URL_FULL: url.geturl(),
        url_attributes.URL_PATH: url.path,
        url_attributes.URL_QUERY: url.query,
        url_attributes.URL_SCHEME: url.scheme,
        # Client attributes
        client_attributes.CLIENT_ADDRESS: client_ip,
    }

    # Deployment attributes (only when an environment name is configured)
    if ENV_NAME:
        attributes[deployment_attributes.DEPLOYMENT_ENVIRONMENT_NAME] = ENV_NAME

    # Set user attributes
    if user:
        attributes.update(
            {
                user_attributes.USER_HASH: user.hash,
                user_attributes.USER_ID: user.id,
                user_attributes.USER_NAME: user.email,
            }
        )
        # Full name is optional on the user model; skip it when empty.
        if user.name:
            attributes[user_attributes.USER_FULL_NAME] = user.name

    # Start span; it stays current until the caller's `with` body finishes.
    with tracer.start_as_current_span(
        attributes=attributes,
        name=f"state.{name}",
    ):
        yield


async def process_event(
    app: App,
    event: Event,
    sid: str,
    headers: dict,
    client_ip: str,
) -> AsyncIterator[StateUpdate]:
    """
    Wrap Reflex event processing in an OpenTelemetry span and a log context.

    Installed in place of `reflex.app.process`; every update produced by the
    original implementation (`super_process`) is re-yielded unchanged.
    """
    # Resolve the state for the event token, then the `AppState` substate.
    root_state = await app.state_manager.get_state(event.substate_token)
    app_state: AppState = await root_state.get_state(AppState)  # pyright: ignore[reportAssignmentType]
    current_user = app_state.user

    # Anonymous sessions have no user hash to attach to the logs.
    log_user = None
    if current_user:
        log_user = current_user.small_hash

    # Enrich logs
    with log_context(session=sid, user=log_user):
        # Keep only the last two dotted components of the event name.
        short_name = ".".join(event.name.rsplit(".", 2)[-2:])
        span = _otel_event(
            client_ip=client_ip,
            last_path=app_state.router.page.full_raw_path,
            name=short_name,
            path=event.router_data["asPath"],
            session_id=sid,
            user=current_user,
        )
        with span:
            logger.debug("Processing event: %s", short_name)
            # Delegate to the original processor and stream its updates through.
            upstream = super_process(
                app=app,
                client_ip=client_ip,
                event=event,
                headers=headers,
                sid=sid,
            )
            async for state_update in upstream:
                yield state_update


# Monkey-patch the process method
# Order matters: keep a reference to the original implementation first, since
# `process_event` delegates to it through `super_process`; only then swap in the
# traced wrapper.
super_process = reflex.app.process
reflex.app.process = process_event

clemlesne avatar Apr 08 '25 12:04 clemlesne