Split on Celery tasks
Hi, I'm wondering if you have any guides for using Split within Celery tasks. I've noticed that when I evaluate a feature flag inside a task running on a worker with a prefork pool, the flag is sometimes out of sync: its value is whatever was set on Split when the worker was spawned. The same doesn't happen with workers using a gevent pool. I suspect we need to do something along the lines of what is recommended for using Split with uwsgi, but thought I'd ask in case you have any suggestions.
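For context, the uwsgi recommendation I'm referring to boils down to roughly this (my paraphrase of the preforked-client setup from the Split docs, untested with Celery so far):

from splitio import get_factory

# Parent process: create the factory in preforked mode so it doesn't start syncing yet.
factory = get_factory("YOUR_SDK_KEY", config={"preforkedInitialization": True})

# Post-fork hook (per worker): resume the factory, wait for it to be ready, then get a client.
factory.resume()
factory.block_until_ready(5)
client = factory.client()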
Hi @edufschmidt, I apologize for not being able to get back to you earlier on this. We don't have any specific guidelines for working with Celery tasks. Please let me know if this is still an issue for you so we can look into it with our team, or if you have found a workaround.
We're experiencing the same issue: the Split client does not receive flag updates until the pre-forking Celery worker is restarted. I've tried implementing the pre-fork workaround as documented in the SDK guide, but this didn't seem to help. I'm going to experiment with the configuration parameters today and will circle back if anything works.
Circling back: for us it indeed turned out to be a bug with our pre-forking setup. Here's some pseudo-code that should help narrow down the problem. When you set stack_info=True on the logger call, it shows you what is calling the client property. Depending on how your log lines are formatted, you should also be able to see the name of the process that logged the line; you want it to show something like ForkPoolWorker rather than MainProcess when the client property is accessed.
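As an aside, surfacing the process name is just a matter of including the standard %(processName)s attribute in the log format; a minimal sketch (not our exact config) would be:

import logging

# %(processName)s renders as e.g. "MainProcess" or "ForkPoolWorker-1", which is what
# tells you which process first touched the client property.
logging.basicConfig(
    level=logging.INFO,
    format="%(processName)s %(name)s %(levelname)s: %(message)s",
)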
# Split module
import logging
import sys

from splitio import get_factory
from splitio.client.client import Client as SplitClient  # import path may vary by SDK version
from splitio.exceptions import TimeoutException

logger = logging.getLogger(__name__)


class SplitWrapper:
    def __init__(self, split_api_key) -> None:
        self.split_api_key = split_api_key
        config = {
            # https://help.split.io/hc/en-us/articles/360020359652-Python-SDK#preforked-client-setup
            "preforkedInitialization": True,
        }
        # The factory is created in the parent process in preforked mode; it stays paused until resume().
        self._factory = get_factory(self.split_api_key, config=config)
        logger.info(
            "SplitFactory status in init_app is: %s", self._factory._status, stack_info=True
        )
        self._client = None

    @property
    def client(self) -> SplitClient:
        """Lazily instantiates the client after any preforking is done."""
        if not self._client:
            try:
                logger.info(
                    "SplitFactory status in client property before resume is: %s",
                    self._factory._status,
                    stack_info=True,
                )
                self._factory.resume()
                logger.info(
                    "SplitFactory status in client property after resume is: %s",
                    self._factory._status,
                )
                self._factory.block_until_ready(5)
                logger.info(
                    "SplitFactory status in client property after block_until_ready is: %s",
                    self._factory._status,
                )
                self._client = self._factory.client()
            except TimeoutException:
                sys.exit()
        return self._client
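
# Assumption, not spelled out in the original post: the wrapper is instantiated once at module
# import time, i.e. in the parent process before Celery forks its workers, so every worker
# inherits the same paused factory and resumes it lazily through the property above.
split_wrapper = SplitWrapper("YOUR_SDK_KEY")  # placeholder API key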
# Celery module
import logging

from celery.signals import worker_process_init

local_logger = logging.getLogger(__name__)

@worker_process_init.connect
def split_post_fork(*args, **kwargs):
    """Confirm split client."""
    # `split_wrapper` is the module-level SplitWrapper instance from the Split module above.
    # This should be the first thing that accesses the client property, causing its debug logs to fire.
    local_logger.info("Touching split client: %s.", split_wrapper.client, stack_info=True)
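For completeness, inside a task the wrapper is then used like any other Split client; the task, user key, and flag name below are made up:

# Celery task module (illustrative)
from celery import shared_task

from myapp.split import split_wrapper  # hypothetical import path for the wrapper above

@shared_task
def send_welcome_email(user_id):
    # get_treatment is the Split SDK's standard evaluation call; the flag name is invented.
    treatment = split_wrapper.client.get_treatment(str(user_id), "new_welcome_email")
    if treatment == "on":
        pass  # new behaviour
    else:
        pass  # control / default behaviour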
Hi @johnjameswhitman, I am curious: does the code you suggested work with Celery?
Thanks, Bilal
Yep, that Celery signal is the crux of the fix; however, wiring it all together affected import order throughout our repo because we had to rearrange some things. If you end up having to do the same, you might run into issues with things like decorators and other import side effects.
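If import order does become a problem, one option (not necessarily how we ended up doing it) is to defer the import of the wrapper until the signal actually fires, e.g.:

from celery.signals import worker_process_init

@worker_process_init.connect
def split_post_fork(*args, **kwargs):
    # Importing here keeps the Split module (and whatever it pulls in) out of Celery's
    # import path at startup; "myapp.split" is a hypothetical module path.
    from myapp.split import split_wrapper

    split_wrapper.client  # touching the property resumes the factory post-fork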
Great, thanks.