OneAgent-SDK-Python-AutoInstrumentation icon indicating copy to clipboard operation
OneAgent-SDK-Python-AutoInstrumentation copied to clipboard

Support for Dramatiq

Open santigandolfo opened this issue 1 year ago • 1 comments

Is your feature request related to a problem? Please describe. Please consider adding dramatiq to the supported technologies. It currently has around 4.1k stars on Github, so it is certainly a popular and widely used library.

Describe the solution you'd like Adding support for Dramatiq.

Describe alternatives you've considered

Additional context

santigandolfo avatar May 20 '24 17:05 santigandolfo

I created this Middleware:

from autodynatrace.sdk import sdk
from dramatiq import Broker
from dramatiq import Message
from dramatiq.middleware import Middleware
from oneagent.common import ChannelType
from oneagent.common import MessagingDestinationType
from oneagent.sdk import Channel
from oneagent.sdk.tracers import Tracer


class DynatraceMiddleware(Middleware):
    def __init__(self):
        self.tracers = {}

    @staticmethod
    def _create_msi_handle(message: Message):
        return sdk.create_messaging_system_info(
            'Dramatiq',
            message.actor_name,
            MessagingDestinationType.QUEUE,
            Channel(ChannelType.OTHER, 'dramatiq'),
        )

    def _start_tracer(self, message: Message, tracer: Tracer):
        tracer.start()
        self.tracers[message.message_id] = tracer

    def _end_tracer(self, message: Message):
        tracer: Tracer | None = self.tracers.get(message.message_id, None)
        if tracer is not None:
            tracer.end()
        self.tracers.pop(message.message_id, None)

    def before_enqueue(self, broker: Broker, message: Message, delay: int):
        msi_handle = self._create_msi_handle(message=message)
        with msi_handle:
            tracer = sdk.trace_outgoing_message(msi_handle)
            self._start_tracer(message=message, tracer=tracer)

    def before_process_message(self, broker: Broker, message: Message):
        msi_handle = self._create_msi_handle(message=message)
        with msi_handle:
            with sdk.trace_incoming_message_receive(msi_handle):
                tracer = sdk.trace_incoming_message_process(msi_handle)
                self._start_tracer(message=message, tracer=tracer)

    def after_enqueue(self, broker: Broker, message: Message, delay: int):
        self._end_tracer(message=message)

    def after_process_message(
        self, broker: Broker, message: Message, *, result=None, exception=None
    ):
        self._end_tracer(message=message)

    def after_skip_message(self, broker: Broker, message: Message):
        self._end_tracer(message=message)

How should it have to be modified to work as a wrapper?

santigandolfo avatar May 22 '24 19:05 santigandolfo