OneAgent-SDK-Python-AutoInstrumentation
OneAgent-SDK-Python-AutoInstrumentation copied to clipboard
Support for Dramatiq
Is your feature request related to a problem? Please describe: Please consider adding Dramatiq to the supported technologies. It currently has around 4.1k stars on GitHub, so it is clearly a popular and widely used library.
Describe the solution you'd like: Add support for Dramatiq.
Describe alternatives you've considered
Additional context
I created this Middleware:
from autodynatrace.sdk import sdk
from dramatiq import Broker
from dramatiq import Message
from dramatiq.middleware import Middleware
from oneagent.common import ChannelType
from oneagent.common import MessagingDestinationType
from oneagent.sdk import Channel
from oneagent.sdk.tracers import Tracer
class DynatraceMiddleware(Middleware):
    """Dramatiq middleware that traces enqueueing and processing of messages.

    A OneAgent SDK tracer is started in each ``before_*`` hook and stored in
    ``self.tracers`` under the message's ``message_id``; the matching
    ``after_*`` hook looks it up again and ends it.

    NOTE(review): the outgoing and incoming tracers are not linked through a
    Dynatrace tag here, so the producer and consumer sides are presumably
    reported as separate traces -- confirm whether tag propagation is wanted.
    """

    def __init__(self):
        super().__init__()
        # Started tracers waiting for their ``after_*`` hook, keyed by
        # ``message.message_id``.
        self.tracers = {}

    @staticmethod
    def _create_msi_handle(message: Message):
        """Create the messaging-system-info handle for ``message``.

        The actor name is used as the destination name, with the vendor
        string ``'Dramatiq'`` and an ``OTHER`` channel.
        """
        return sdk.create_messaging_system_info(
            'Dramatiq',
            message.actor_name,
            MessagingDestinationType.QUEUE,
            Channel(ChannelType.OTHER, 'dramatiq'),
        )

    def _start_tracer(self, message: Message, tracer: Tracer):
        """Start ``tracer`` and remember it under the message's id."""
        tracer.start()
        self.tracers[message.message_id] = tracer

    def _end_tracer(self, message: Message, exception=None):
        """End and forget the tracer registered for ``message``, if any.

        Args:
            message: The message whose tracer should be finished.
            exception: Optional exception raised while handling the message.
                When given, the tracer is marked as failed before it ends.
        """
        # Remove the entry first so it cannot linger in ``self.tracers`` if
        # marking or ending the tracer raises.
        tracer: Tracer | None = self.tracers.pop(message.message_id, None)
        if tracer is None:
            return
        try:
            if exception is not None:
                tracer.mark_failed_exc(exception)
        finally:
            tracer.end()

    def before_enqueue(self, broker: Broker, message: Message, delay: int):
        """Start an outgoing-message tracer before the message is enqueued."""
        msi_handle = self._create_msi_handle(message=message)
        # NOTE(review): indentation was lost in the pasted snippet; starting
        # the tracer inside the handle's ``with`` block is assumed.
        with msi_handle:
            tracer = sdk.trace_outgoing_message(msi_handle)
            self._start_tracer(message=message, tracer=tracer)

    def before_process_message(self, broker: Broker, message: Message):
        """Start an incoming-message process tracer before the actor runs."""
        msi_handle = self._create_msi_handle(message=message)
        # NOTE(review): indentation was lost in the pasted snippet; the
        # process tracer is assumed to start inside the receive tracer.
        with msi_handle:
            with sdk.trace_incoming_message_receive(msi_handle):
                tracer = sdk.trace_incoming_message_process(msi_handle)
                self._start_tracer(message=message, tracer=tracer)

    def after_enqueue(self, broker: Broker, message: Message, delay: int):
        """End the outgoing-message tracer once the message is enqueued."""
        self._end_tracer(message=message)

    def after_process_message(
        self, broker: Broker, message: Message, *, result=None, exception=None
    ):
        """End the process tracer, marking it failed if the actor raised."""
        self._end_tracer(message=message, exception=exception)

    def after_skip_message(self, broker: Broker, message: Message):
        """End the process tracer for a message that was skipped."""
        self._end_tracer(message=message)
How would it need to be modified to work as a wrapper?