Add usage example for Celery
I recently wanted to collect custom application metrics from a rather large Celery app. I tried several approaches with varying degrees of awkwardness, until finally settling on having a "sidecar" HTTP server process for all Celery nodes.
This works pretty well, but took me a while to get right. To spare others from wasting as much time as I did, I would like to propose adding a section on getting Celery metrics set up with the Prometheus client. I wrote up the solution on StackOverflow: https://stackoverflow.com/a/75799358/2532203
If you're interested, I can properly summarise it and open a PR?
Also, if you look at the code, I needed to copy large swaths of the library because the built-in Prometheus WSGI server is hard-coded to daemonise its thread; this causes trouble when running in a daemonised process. Maybe this could be made optional with another keyword argument that defaults to True?
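For reference, a minimal sketch of what such a keyword argument could look like. This is not the library's actual API: `start_wsgi_server`, its `daemon` parameter, and the use of `wsgiref` here are all illustrative.

```python
import threading
from wsgiref.simple_server import make_server


def start_wsgi_server(app, port=8000, daemon=True):
    """Sketch: expose the daemon flag instead of hard-coding it to True."""
    httpd = make_server("", port, app)
    thread = threading.Thread(target=httpd.serve_forever)
    # Hypothetical knob: callers running inside an already-daemonised
    # process could pass daemon=False to keep the thread joinable.
    thread.daemon = daemon
    thread.start()
    return httpd, thread
```

Defaulting to True would keep the current behaviour for existing callers while letting embedded setups opt out.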
Thanks a lot for this, it is exactly what I was looking for.
Should it work with Celery in Django as well?
@NicoCaldo That's exactly my setup. It works fine because the metrics HTTP server doesn't rely on Django, so there's nothing additional to configure.
I ran into this myself. While your solution looks correct, I found this to also work:
from celery import signals
from prometheus_client import CollectorRegistry, multiprocess, start_http_server

# Whatever hook you want, you can do this outside of a hook too.
@signals.celeryd_init.connect
def name_this_what_you_want(sender=None, conf=None, **kwargs):
    registry = CollectorRegistry()
    multiprocess.MultiProcessCollector(registry)
    start_http_server(8000, registry=registry)
This way the processes in the celery pool can report custom metrics (as in, non-host metrics) back to the one and only http_server. This avoids using a pushgateway.
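For anyone trying this out, the multiprocess mechanics can be exercised without Celery at all. A minimal sketch follows; the metric name and the temp directory are made up, and in a real deployment PROMETHEUS_MULTIPROC_DIR would point at a directory shared by the worker processes:

```python
import os
import tempfile

# Multiprocess mode must be configured before prometheus_client is imported.
os.environ["PROMETHEUS_MULTIPROC_DIR"] = tempfile.mkdtemp()

from prometheus_client import CollectorRegistry, Counter, generate_latest, multiprocess

# In a Celery app this Counter would be incremented inside tasks running in
# the pool processes; each process writes to its own file in the shared dir.
TASKS_HANDLED = Counter("demo_tasks_handled_total", "Tasks handled by the pool")
TASKS_HANDLED.inc()

# The single HTTP server process aggregates all per-process files on scrape:
registry = CollectorRegistry()
multiprocess.MultiProcessCollector(registry)
print(generate_latest(registry).decode())
```

Because aggregation happens at scrape time, the pool processes never talk to the HTTP server directly, which is what makes the pushgateway unnecessary.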
You wouldn't need a pushgateway with my solution either, but your approach uses way less code, so that definitely is an improvement!
Edit: looking back, I actually tried it this way and ran into multiprocessing issues, although I'm no longer quite sure which. Have you used this code in production, with multiple workers and multiple nodes?
Not quite extensively, though so far I haven't ran into issues. You could be right, I will write back if I ever run into any.
Reporting back for those who are interested: the code I provided has not caused any issues so far. Metric collection works fine, and 32 concurrent cores writing to Prometheus through my setup have not crashed. I should note that we also do multithreading within a worker, and that works too.
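On the multithreading point: the client's metric objects can be updated concurrently from several threads without losing increments, which a quick sketch illustrates (the metric name and thread counts here are arbitrary):

```python
import threading

from prometheus_client import CollectorRegistry, Counter

registry = CollectorRegistry()
OPS = Counter("demo_ops_total", "Concurrent increments", registry=registry)


def work():
    # Each of the 8 threads bumps the counter 1000 times.
    for _ in range(1000):
        OPS.inc()


threads = [threading.Thread(target=work) for _ in range(8)]
for t in threads:
    t.start()
for t in threads:
    t.join()

# All 8000 increments are accounted for despite the concurrent writers.
print(registry.get_sample_value("demo_ops_total"))
```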
@Radiergummi I highly recommend adding an example that shows your methodology and mine. It would have saved me a day of headaches. Feel free to @ me.
This error occurred for me when Celery received a new task and started to process it, not sure why:
objc[7678]: +[NSCharacterSet initialize] may have been in progress in another thread when fork() was called. We cannot safely call it or ignore it in the fork() child process. Crashing instead. Set a breakpoint on objc_initializeAfterForkError to debug.
[2024-02-22 16:02:08,420: ERROR/MainProcess] Process 'ForkPoolWorker-8' pid:7678 exited with 'signal 6 (SIGABRT)'
[2024-02-22 16:02:08,434: ERROR/MainProcess] Task handler raised error: WorkerLostError('Worker exited prematurely: signal 6 (SIGABRT) Job: 0.')
Traceback (most recent call last):
File "/Users/folyd/UTA/chatbot/chatbot/.venv/lib/python3.11/site-packages/billiard/pool.py", line 1264, in mark_as_worker_lost
raise WorkerLostError(
billiard.einfo.ExceptionWithTraceback:
"""
Traceback (most recent call last):
File "/Users/folyd/UTA/chatbot/chatbot/.venv/lib/python3.11/site-packages/billiard/pool.py", line 1264, in mark_as_worker_lost
raise WorkerLostError(
billiard.exceptions.WorkerLostError: Worker exited prematurely: signal 6 (SIGABRT) Job: 0.
@Folyd That looks unrelated to the metrics code, at least at first glance. Have you tried my original code from StackOverflow, too? The fork issue looks vaguely similar to my issue back then.