faktory_worker_python icon indicating copy to clipboard operation
faktory_worker_python copied to clipboard

BrokenProcessPool error

Open JoaoPedroAssis opened this issue 2 years ago • 4 comments

Hey guys! I've been using this faktory adaptor for quite some time and have had great success with it!

My application consists of a faktory worker inside of a kubernetes pod, which runs a image processing algorithm. The system works really well, but when I checked the logs recently I encoutered this error:

                    ╭──── Traceback (most recent call last) ─────╮              
                    │ /app/venv/lib/python3.8/site-packages/fakt │              
                    │ ory/worker.py:248 in                       │              
                    │ send_status_to_faktory                     │              
                    │                                            │              
                    │   245 │   │   │   if future.done():        │              
                    │   246 │   │   │   │   self._pending.remove │              
                    │   247 │   │   │   │   try:                 │              
                    │ ❱ 248 │   │   │   │   │   future.result(ti │              
                    │   249 │   │   │   │   │   self._ack(future │              
                    │   250 │   │   │   │   except KeyboardInter │              
                    │   251 │   │   │   │   │   self._fail(futur │              
                    │                                            │              
                    │ /usr/local/lib/python3.8/concurrent/future │              
                    │ s/_base.py:437 in result                   │              
                    │                                            │              
                    │   434 │   │   │   │   if self._state in [C │              
                    │   435 │   │   │   │   │   raise CancelledE │              
                    │   436 │   │   │   │   elif self._state ==  │              
                    │ ❱ 437 │   │   │   │   │   return self.__ge │              
                    │   438 │   │   │   │                        │              
                    │   439 │   │   │   │   self._condition.wait │              
                    │   440                                      │              
                    │                                            │              
                    │ /usr/local/lib/python3.8/concurrent/future │              
                    │ s/_base.py:389 in __get_result             │              
                    │                                            │              
                    │   386 │   def __get_result(self):          │              
                    │   387 │   │   if self._exception:          │              
                    │   388 │   │   │   try:                     │              
                    │ ❱ 389 │   │   │   │   raise self._exceptio │              
                    │   390 │   │   │   finally:                 │              
                    │   391 │   │   │   │   # Break a reference  │              
                    │   392 │   │   │   │   self = None          │              
                    ╰────────────────────────────────────────────╯              
                    BrokenProcessPool: A process in the process                 
                    pool was terminated abruptly while the future               
                    was running or pending.

I saw that this error apparently was resolved in #42 , but my pods are runnin faktory 1.0.0, so the fix should be availiable. This does not kill the pod or make it restart, but I would like to get to the bottom of this. Any tips?

Thanks in advance!

JoaoPedroAssis avatar Jun 29 '23 13:06 JoaoPedroAssis

#42 is just about recovering from a BrokenProcessPool. It does nothing to prevent the exception in the first place. The exception is raised when the Python process hosting the job exits unexpectedly.

The most likely cause your job is hitting a memory limit and the process is killed.

cdrx avatar Jun 29 '23 13:06 cdrx

#42 is just about recovering from a BrokenProcessPool. It does nothing to prevent the exception in the first place. The exception is raised when the Python process hosting the job exits unexpectedly.

The most likely cause your job is hitting a memory limit and the process is killed.

I see. The worker is set with a concurrency of 1, so that means that all the memory available for the k8s pod is also available to that single process (currently 600Mi). But if that limit is exceeded, the pod should be restarted instead of throwing this error. Is there some other memory configurations regarding the worker that I should pay attention? Increasing the number of processes can help with this? (I can always spawn more pods to compensate for the single process in the worker, but more processes per pod seems interesting)

Thanks for the reply! @cdrx

JoaoPedroAssis avatar Jul 03 '23 14:07 JoaoPedroAssis

that means that all the memory available for the k8s pod is also available to that single process (currently 600Mi).

There are two processes. The master process, which starts a second process - the child (worker) that actually runs your job. Linux will kill the child process if is using more memory than the k8s allows. K8s will only restart the pod if the master process dies.

If you have a concurrency of 1, you can try to use threads in the worker instead. This is done by passing use_threads=True at Worker creation.

It will run only a single process, so k8s will notice if it is killed for ignoring a memory limit.

cdrx avatar Jul 03 '23 14:07 cdrx

Thanks! I tried this today and the error has not appeared, but I'll have to wait until some real users try the service and see if the error appears again. In the meantime, do you kwnow some form of measuring each process memory consumption? it will help in the future so I dont need 1 pod per worker and can up the concurrency a bit

JoaoPedroAssis avatar Jul 03 '23 21:07 JoaoPedroAssis