Document if Sender is fork-safe.
Consider a scenario where:
- master process attaches a fluent Sender or logging Handler
- master process forks off a bunch of worker processes
- workers use the handler
Will the socket get cloned or reopened? Does the fluent logger expect a response from fluentd? If it does, can the response be read by the wrong sender process?
It depends on Python's socket implementation. If CPython, PyPy, and other implementations are safe, we can write "This is fork-safe."
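For concreteness, here is a minimal sketch of the scenario from the list above, using the sync `fluent.sender.FluentSender`. It assumes a fluentd instance listening on `localhost:24224`; without one, `emit()` simply fails and buffers, but the fd-sharing behaviour is the same:

```python
import os
from fluent import sender

fluent_sender = sender.FluentSender('app', host='localhost', port=24224)

# The first emit in the parent opens the TCP socket.
fluent_sender.emit('before_fork', {'pid': os.getpid()})

pid = os.fork()
if pid == 0:
    # Child: it inherits the *same* connected socket fd, not a fresh
    # connection. Writes from child and parent are interleaved on one
    # connection, and any response fluentd sends could be read by
    # either process.
    fluent_sender.emit('after_fork', {'pid': os.getpid()})
    os._exit(0)
else:
    fluent_sender.emit('after_fork', {'pid': os.getpid()})
    os.waitpid(pid, 0)
    fluent_sender.close()
```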
I'm interested in this as well, having run into issues getting the async handler to send to a remote fluentd host in a basic gunicorn + Django deployment.
~~I don't think~~ It's definitely not fork-safe. For my example above, I needed to make sure workers weren't preloaded to get `asynchandler.FluentHandler` working. This is because gunicorn forks its workers while the logging initialization happens pre-fork, at which point a daemon thread is created by the async sender.
https://github.com/benoitc/gunicorn/issues/1045#issuecomment-351822283
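For anyone hitting the same thing, this is roughly the workaround, as a `gunicorn.conf.py` sketch. `preload_app` and `post_fork` are real gunicorn settings/hooks; the `configure_fluent_logging` helper is hypothetical and stands in for whatever attaches the FluentHandler in your app:

```python
# gunicorn.conf.py

# Don't import the app (and run its logging setup) in the master process.
preload_app = False

def post_fork(server, worker):
    # Runs in each worker after fork, so the async sender's daemon
    # thread is created in the process that will actually use it.
    from myapp.logging_setup import configure_fluent_logging  # hypothetical
    configure_fluent_logging()
```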
Generally speaking, NO threaded application is fork-safe. Forks and threads do not mix: per POSIX, only the thread that calls fork() exists in the child process, so every other thread silently disappears. The sync sender should be safe.
If you want the fd to survive, you will need to make sure the worker inherits it at the system level. This can be done similarly to what the logging handler does.
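A quick demo of that POSIX behaviour in CPython: the `Thread` object is copied into the child, but no thread is actually running there, which is exactly the dead-sender-thread failure mode described below:

```python
import os
import threading
import time

t = threading.Thread(target=time.sleep, args=(60,), daemon=True)
t.start()

pid = os.fork()
if pid == 0:
    # Child: the Thread object exists, but the thread itself does not.
    print('child :', t.is_alive(), threading.active_count())   # False 1
    os._exit(0)
else:
    os.waitpid(pid, 0)
    print('parent:', t.is_alive(), threading.active_count())   # True 2
```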
This turned out to be the cause of a hang I was seeing in the following setup:
- celery using the prefork pool
- logging via `fluent.asynchandler.FluentHandler`
If the celery worker parent process has done any logging before it forks the child workers, then the handler in the parent process will already have constructed its async sender (which starts its thread). Every future celery worker child then inherits a handler with a broken async sender (no thread), which causes the following symptoms:
- log messages from the celery worker children do not get sent to fluentd
- eventually the async sender's queue fills up, which with the defaults (`queue_maxsize=100`, `queue_circular=False`) causes the worker to hang when attempting to log its 101st message; it gets stuck in a stack trace like this:
```
File "/python/lib/python3.11/threading.py", line 320 in wait
File "/python/lib/python3.11/queue.py", line 140 in put
File "site-packages/fluent/asyncsender.py", line 116 in _send
File "site-packages/fluent/sender.py", line 97 in emit_with_time
File "site-packages/fluent/handler.py", line 239 in emit
File "/python/lib/python3.11/logging/__init__.py", line 978 in handle
File "/python/lib/python3.11/logging/__init__.py", line 1706 in callHandlers
File "/python/lib/python3.11/logging/__init__.py", line 1644 in handle
File "/python/lib/python3.11/logging/__init__.py", line 1634 in _log
File "/python/lib/python3.11/logging/__init__.py", line 1477 in debug
...
```
Initially this only happened when I had certain log levels configured (resulting in early logging in the celery parent). But in fact, if anything is ever logged in the parent, then from that point on, whenever a child worker dies (e.g. killed by the OOM killer) or is recycled (by celery's worker_max_tasks_per_child / --max-tasks-per-child, for example), the celery parent starts a new worker child to replace it. Since the parent has by then logged at least one message (setting up the sender), the new child hits this same problem.
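For what it's worth, the queue options mentioned above can at least turn the hang into dropped messages. A sketch, using the `queue_maxsize`/`queue_circular` arguments named above and assuming the handler forwards extra kwargs to the underlying `asyncsender.FluentSender`:

```python
from fluent import asynchandler

# With queue_circular=True, a full queue drops the oldest record instead
# of blocking on put(), so a dead sender thread "only" loses logs rather
# than hanging the worker at message 101.
handler = asynchandler.FluentHandler(
    'app.celery',
    host='fluentd.example.com',   # hypothetical host
    port=24224,
    queue_maxsize=100,
    queue_circular=True,
)
```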
I think it would be worthwhile to document the risk of using this with anything that forks, and perhaps to explicitly mention celery, given it could be a common use case.
Also I wonder if `fluent.asyncsender.FluentSender._send` could help flag this issue by printing a warning if the thread is dead? e.g. after the `if self._closed` check:
```python
if not self._send_thread.is_alive():
    print(f'WARNING async FluentSender thread in pid={os.getpid()} is not alive', file=sys.stderr)
```
or possibly even automatically start the thread again (which would avoid the bug)?
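Roughly what I mean by the auto-restart variant, inside `_send` after the `self._closed` check. `_send_thread` is a real attribute; the `_send_loop` target name is my assumption about the internals and would need checking against `__init__`:

```python
if not self._send_thread.is_alive():
    print(f'WARNING async FluentSender thread in pid={os.getpid()} '
          f'is not alive; restarting it', file=sys.stderr)
    # Hypothetical restart: recreate the thread the same way __init__
    # presumably does, so the forked child gets a working sender again.
    self._send_thread = threading.Thread(target=self._send_loop)
    self._send_thread.daemon = True
    self._send_thread.start()
```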
Or another option would be to use `os.register_at_fork` to either log a warning in the parent that the forked child is going to hit this issue, or to try and fix the problem in the child - e.g. by finding all async `FluentHandler` instances and removing their sender so they automatically create a new one. (I'll admit this option feels pretty hacky, but it came to mind as I'm considering how to work around this problem, given we are currently using fluent-logger with celery and hitting this issue.)
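A sketch of that workaround. `os.register_at_fork` is a real stdlib API (Python 3.7+), and `FluentHandler.sender` / the sender's `tag`/`host`/`port` attributes exist; whether the old sender can be discarded this cleanly (and whether it should be closed first) is an assumption that would need checking:

```python
import logging
import os
from fluent import asynchandler, asyncsender

def _fix_fluent_handlers_in_child():
    # Runs in the freshly forked child, before it logs anything.
    for handler in logging.getLogger().handlers:
        if isinstance(handler, asynchandler.FluentHandler):
            # Hypothetical reset: replace the sender whose thread died in
            # the fork with a fresh one built from the same details.
            old = handler.sender
            handler.sender = asyncsender.FluentSender(
                old.tag, host=old.host, port=old.port
            )

os.register_at_fork(after_in_child=_fix_fluent_handlers_in_child)
```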
I'm happy to open a PR for any of the above suggestions if you think they are acceptable changes.