[Feature Request] Provide utilities to extract activities from a class and a module
Is your feature request related to a problem? Please describe.
Creating activities and decorating them with activity.defn is easy in Python. Making sure that they are all included in worker launch is more difficult.
Describe the solution you'd like
It would be helpful to have utilities for:
- extracting activities from a class
- extracting activities from a class instance
- extracting activities from a module
Below is a sample implementation of extraction from classes and class instances. It uses `ast` and `inspect`, and it assumes the activities are async methods.
It looks like one could find the decorated methods by checking for `fn.__temporal_activity_definition`, but that is a private attribute name and is not exposed by temporalio's `activity.py` module. Code that uses `__temporal_activity_definition` would be simpler, and avoiding `ast.parse(inspect.getsource(cls))` is preferable.
```python
import ast
import inspect
import textwrap
import typing


def _decorator_name(node: ast.expr) -> str:
    # Turn `@activity.defn`, `@activity.defn(...)` or `@defn` into a dotted
    # name such as "activity.defn" so both decorator forms are matched.
    if isinstance(node, ast.Call):
        node = node.func
    if isinstance(node, ast.Attribute):
        return _decorator_name(node.value) + "." + node.attr
    if isinstance(node, ast.Name):
        return node.id
    return ""


class _MyNodeVisitor(ast.NodeVisitor):
    def __init__(self):
        self.fn_name_to_decorators: dict[str, set[str]] = {}

    def visit_AsyncFunctionDef(self, node: ast.AsyncFunctionDef):
        # Record the decorators of each async method in the class body.
        self.fn_name_to_decorators[node.name] = {
            _decorator_name(decorator) for decorator in node.decorator_list
        }

    def get_fn_name_to_decorators(self) -> dict[str, set[str]]:
        return self.fn_name_to_decorators


class ActivitiesListProvider:
    @classmethod
    def __get_activities(
        cls,
        instance: typing.Union[
            type["ActivitiesListProvider"], "ActivitiesListProvider"
        ],
    ) -> list[typing.Callable]:
        visitor = _MyNodeVisitor()
        # Parse the source of the (sub)class this was called on; dedent so that
        # classes nested inside functions parse too.
        visitor.visit(ast.parse(textwrap.dedent(inspect.getsource(cls))))
        activities = []
        for fn_name, decorators in visitor.get_fn_name_to_decorators().items():
            if "activity.defn" in decorators:
                # On an instance getattr returns bound methods; on the class, functions.
                activities.append(getattr(instance, fn_name))
        return activities

    def get_activities_from_instance(self) -> list[typing.Callable]:
        return self.__get_activities(self)

    @classmethod
    def get_activities_from_cls(cls) -> list[typing.Callable]:
        return cls.__get_activities(cls)
```
And some tests:
```python
from workflow_metrics.temporal_tools import activities_class
from temporalio import activity


class SomeActivities(activities_class.ActivitiesListProvider):
    @activity.defn
    async def instance_method_activity(self):
        pass

    @activity.defn
    async def class_method_activity(self):
        pass

    @staticmethod
    @activity.defn
    async def static_method_activity():
        pass


def test_get_activities_from_cls():
    assert SomeActivities.get_activities_from_cls() == [
        SomeActivities.instance_method_activity,
        SomeActivities.class_method_activity,
        SomeActivities.static_method_activity,
    ]


class ActivitiesClassThatNeedsInstance(activities_class.ActivitiesListProvider):
    @activity.defn
    async def instance_method_activity(self):
        pass

    @activity.defn
    async def class_method_activity(self):
        pass

    @staticmethod
    @activity.defn
    async def static_method_activity():
        pass


def test_get_activities_from_instance():
    inst = ActivitiesClassThatNeedsInstance()
    assert inst.get_activities_from_instance() == [
        inst.instance_method_activity,
        inst.class_method_activity,
        inst.static_method_activity,
    ]
```
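For comparison, the attribute check mentioned above could replace the `ast` parsing entirely. This is a minimal sketch, assuming `@activity.defn` keeps storing its definition under the private attribute name `__temporal_activity_definition` (an implementation detail, not a public API, so it may change); `get_activities` is a hypothetical helper name. Because it walks the class MRO, it also finds inherited activities, which `inspect.getsource(cls)` misses since it only returns the subclass body:

```python
import typing

# Private attribute that temporalio's @activity.defn sets on the decorated
# function (an implementation detail, not a public API; it may change).
_DEFN_ATTR = "__temporal_activity_definition"


def get_activities(source: typing.Any) -> list[typing.Callable]:
    """Return the activity-decorated attributes of a class or an instance.

    With a class, instance methods come back as plain (unbound) functions;
    with an instance they come back bound, which is what a worker needs.
    """
    cls = source if isinstance(source, type) else type(source)
    activities: list[typing.Callable] = []
    seen: set[str] = set()
    # Walk the MRO so inherited activities are found; the first class that
    # defines a name wins, which respects subclass overrides.
    for klass in cls.__mro__:
        for name, raw in vars(klass).items():
            if name in seen:
                continue
            seen.add(name)
            # staticmethod/classmethod keep the decorated function in __func__.
            func = getattr(raw, "__func__", raw)
            if getattr(func, _DEFN_ATTR, None) is not None:
                # getattr binds instance methods when `source` is an instance.
                activities.append(getattr(source, name))
    return activities
```

Reading raw class attributes through `vars()` rather than `dir()`/`getattr()` also avoids evaluating properties while scanning.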
Additional context
In the code I am working on, activities are mainly defined as functions in one module and as methods in classes.
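For the module case, a minimal sketch could scan a module's top-level namespace. Assumptions: `get_activities_from_module` is a hypothetical name, detection relies on the same private `__temporal_activity_definition` attribute, and activities that were merely imported into the module are skipped by default so they are not registered twice:

```python
import types
import typing

# Private attribute set by temporalio's @activity.defn (not a public API).
_DEFN_ATTR = "__temporal_activity_definition"


def get_activities_from_module(
    module: types.ModuleType, include_imported: bool = False
) -> list[typing.Callable]:
    """Return the activity functions found at the top level of `module`."""
    activities = []
    # vars() preserves definition order, so results follow the source order.
    for obj in vars(module).values():
        if not callable(obj) or getattr(obj, _DEFN_ATTR, None) is None:
            continue
        # Skip activities only imported into this module, unless asked.
        if not include_imported and getattr(obj, "__module__", None) != module.__name__:
            continue
        activities.append(obj)
    return activities
```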
Sorry, I just noticed this feature request. We are discussing its viability on the PR at https://github.com/temporalio/sdk-python/pull/759#discussion_r1939572982 and are also circulating it with the team.
Hi @spacether, thanks for the input here. Before getting into implementation, can you expand on the problem that you want to see solved and what you see as the requirements for possible solutions? I.e., expand on this:
> Making sure that they are all included in worker launch is more difficult.
E.g. what is it that you don't like currently, and, without speculating about specific implementations, what sorts of behaviors / semantics regarding activities would you like users to be able to express when starting a worker? It might help if you sketch some Worker launch code featuring imaginary APIs / function calls that would give you the semantics that you want.
Right now it is entirely up to the developer to manually build an explicit list of activities in Python. Our activities are already segregated into Python modules and activity classes by the kind of work that they do. So when we need to use them in a worker, one has to list every activity-decorated function in that module and in one or two classes, even though we already know that the activities we need come from these three sources: one module and two classes. Adding these utilities would let me vend activities from those sources easily.
I would like:
```python
worker = Worker(
    activities=[*ActivitiesClass.get_activities(), *get_activities(activities_module)]
)
```
or
```python
worker = Worker(
    activities_classes=(ActivitiesClass,),
    activities_modules=(activities_module,),
)
```
or
```python
worker = Worker(
    activities=[ActivitiesClass, activities_module, ...]
)
```
One could make `activities` accept a list of callables, modules, or classes that contain activity methods, where all of the activities from each class or module would be loaded into the worker.
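The mixed-list option could be prototyped as a helper that flattens the list before it reaches `Worker`, without changing the `Worker` constructor. A sketch, assuming the hypothetical name `expand_activities` and the same private `__temporal_activity_definition` check. Classes and instances are scanned with `inspect.getmembers`, so their activities come back sorted by name, and passing an instance yields bound methods:

```python
import inspect
import types
import typing

# Private attribute set by temporalio's @activity.defn (not a public API).
_DEFN_ATTR = "__temporal_activity_definition"


def _is_activity(obj: object) -> bool:
    return callable(obj) and getattr(obj, _DEFN_ATTR, None) is not None


def expand_activities(items: typing.Iterable[object]) -> list[typing.Callable]:
    """Flatten activity callables, modules, classes and instances into one list."""
    result: list[typing.Callable] = []
    for item in items:
        if _is_activity(item):
            # Already an activity function or bound method: keep it as-is.
            result.append(item)
        elif isinstance(item, types.ModuleType):
            # Module: keep top-level activities in definition order.
            result.extend(obj for obj in vars(item).values() if _is_activity(obj))
        else:
            # Class or instance: getmembers returns (name, value) pairs sorted
            # by name; values are bound methods when `item` is an instance.
            result.extend(value for _, value in inspect.getmembers(item, _is_activity))
    # Drop duplicates, e.g. an activity listed both explicitly and via its module.
    unique: list[typing.Callable] = []
    for fn in result:
        if fn not in unique:
            unique.append(fn)
    return unique
```

Usage would then look like `Worker(client, task_queue=..., activities=expand_activities([ActivitiesClass(), activities_module]))`.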
I recently came to the same conclusion as @spacether and made a similar function for collecting activities as part of a utility library for enforcing best practices at test time to avoid runtime failures.
My developers use a collection function in a worker.py file so that they don't forget to add an activity method to the worker every time they write a new one.
I also have a function that is intended to run in a test: you import your project's temporal directory module, and it collects every @activity.defn method in that module and its submodules so that we can validate everything automatically.
The repo is still very much a work in progress, and there is not much documentation yet for the function-based validation (as opposed to inheriting from a special validator class), but the collection methods are here:
https://github.com/noxasaxon/temporal_utils_python/blob/main/src/temporal_utils/collectors.py
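Collecting from a package and all of its submodules can be sketched with the standard library's `pkgutil.walk_packages` and `importlib.import_module`. This is an illustrative sketch, not the linked repo's implementation; `collect_package_activities` is a hypothetical name, and detection again uses the private `__temporal_activity_definition` attribute. Walking a package imports every submodule, so any import-time side effects will run:

```python
import importlib
import pkgutil
import types
import typing

# Private attribute set by temporalio's @activity.defn (not a public API).
_DEFN_ATTR = "__temporal_activity_definition"


def collect_package_activities(package: types.ModuleType) -> list[typing.Callable]:
    """Import `package` and all its submodules, returning module-level activities.

    Only activities whose __module__ is the module being scanned are kept, so an
    activity imported into several modules is reported once.
    """
    modules = [package]
    # __path__ only exists on packages; a plain module has no submodules.
    if hasattr(package, "__path__"):
        prefix = package.__name__ + "."
        for info in pkgutil.walk_packages(package.__path__, prefix=prefix):
            modules.append(importlib.import_module(info.name))
    activities = []
    for module in modules:
        for obj in vars(module).values():
            if (
                callable(obj)
                and getattr(obj, _DEFN_ATTR, None) is not None
                and getattr(obj, "__module__", None) == module.__name__
            ):
                activities.append(obj)
    return activities
```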
I've resorted more than once to wrapping Temporal's activity and workflow decorators in my own to make activity and workflow collection easy.
Typically speaking, I have two needs:
- be able to import activities easily from a file or folder
- be able to put some filters in place, e.g. to allow certain test-oriented activities to be co-located with standard activities and avoid a fragmented codebase.
Manual import is messy because:
- it is error-prone, especially in the case of things like feature flags
- it makes for poor separation of concerns, since the worker needs to be explicitly aware of every piece of functionality it serves.
Personally I'd be just as happy with a method that provides for easy collection, e.g.
```python
from temporalio import worker
import my_activities
from other_activities import ActivityClass

w = worker.Worker(
    ...,
    activities=worker.collect_activities(
        "path/to/activity/folder",
        my_activities,
        ActivityClass,
    ),
    ...
)
```
together with some simple filters. One that would be useful is a skip directive in the activity decorator, which causes collection to be skipped (but doesn't prevent it from being registered explicitly with a worker):
```python
@activity.defn(skip=True)
async def my_activity():
    ...
```
This would facilitate e.g. test cases, dev/prod feature flags, and the like.
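Until something like `skip=True` exists (it is a proposal here, not a current `activity.defn` parameter), the same effect could be approximated with a separate marker decorator stacked next to `@activity.defn`, assuming both decorators return the same function object. A minimal sketch; `skip_collection`, `without_skipped` and the `__collect_skip__` attribute are hypothetical names:

```python
import typing

F = typing.TypeVar("F", bound=typing.Callable[..., typing.Any])

# Hypothetical marker attribute; temporalio does not define it.
_SKIP_ATTR = "__collect_skip__"


def skip_collection(fn: F) -> F:
    """Mark an activity so that automatic collection ignores it."""
    setattr(fn, _SKIP_ATTR, True)
    return fn


def without_skipped(activities: typing.Iterable[typing.Callable]) -> list[typing.Callable]:
    """Filter step for a collector: drop activities marked with skip_collection.

    Explicit registration is unaffected, since only this filter reads the marker.
    """
    return [fn for fn in activities if not getattr(fn, _SKIP_ATTR, False)]
```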
More flexible and comprehensive would be the ability to configure tags (this could easily be used in lieu of skip):
```python
@activity.defn(tags=["dev", "e2e"])
async def prototyped_task():
    ...
```
together with filtering directives in the collect_activities function:
```python
activities = worker.collect_activities(module, tags=["prod"])
activities = worker.collect_activities(module, exclude_tags=["test"])
```
Finally, some kind of name pattern filter might be nice (though once again this could generally be handled by tags):
```python
activities = worker.collect_activities(module, regex=r".*_v[45]|database_.*")
```
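Tag filtering and the name pattern can likewise be prototyped outside the SDK. In this sketch, `tagged`, `filter_activities` and the `__collect_tags__` attribute are hypothetical stand-ins for the proposed `activity.defn(tags=...)` and `collect_activities(...)` arguments. Assumed semantics: an activity passes `tags` if it carries at least one of them, and `regex` must match the whole Python function name (not a custom activity name):

```python
import re
import typing

# Hypothetical marker attribute; temporalio does not define it.
_TAGS_ATTR = "__collect_tags__"


def tagged(*names: str) -> typing.Callable[[typing.Callable], typing.Callable]:
    """Attach collection tags to an activity (stand-in for defn(tags=...))."""

    def decorator(fn: typing.Callable) -> typing.Callable:
        setattr(fn, _TAGS_ATTR, frozenset(names))
        return fn

    return decorator


def filter_activities(
    activities: typing.Iterable[typing.Callable],
    tags: typing.Optional[typing.Iterable[str]] = None,
    exclude_tags: typing.Optional[typing.Iterable[str]] = None,
    regex: typing.Optional[str] = None,
) -> list[typing.Callable]:
    wanted = frozenset(tags or ())
    unwanted = frozenset(exclude_tags or ())
    pattern = re.compile(regex) if regex is not None else None
    result = []
    for fn in activities:
        fn_tags = getattr(fn, _TAGS_ATTR, frozenset())
        if wanted and not (wanted & fn_tags):
            continue  # lacks every requested tag
        if unwanted & fn_tags:
            continue  # carries an excluded tag
        if pattern is not None and not pattern.fullmatch(fn.__name__):
            continue  # name does not match the pattern
        result.append(fn)
    return result
```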
My activities are usually bare functions so I don't have a real opinion on the best approach for activities that are owned by a class or class instance.
As a side note, one important issue to address is being able to deal effectively with mocks for tests. With my current wrappers I actually just collect the activities at definition time (optionally grouped by worker name), but also provide facilities to override them inside tests:
```python
@flows.activity
async def real_activity():
    ...


@flows.activity(worker="other_worker")
async def other_activity():
    ...


async def real_activity_mock():
    ...


async def other_activity_mock():
    ...


@flows.testing.case
@flows.testing.provide(activities={"real_activity": real_activity_mock})
@pytest.mark.asyncio
async def test_with_mock_real_activity(flow_runner):
    await flow_runner.execute_workflow(my_workflow, ...)


# actually mainly used to mock activities handled by remote workers,
# but can be used with other workers in the same process
@flows.testing.case
@flows.testing.provide("other_worker", activities={"other_activity": other_activity_mock})
@pytest.mark.asyncio
async def test_with_mock_other_activity(flow_runner):
    await flow_runner.execute_workflow(multi_worker_workflow, ...)
```
With function-based collection as above, configuring concise declarative tests might be more challenging. An override directive could help with this, e.g.:
```python
worker.collect_activities(module, overrides={"my_activity": my_activity_mock})
```
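An override directive could be applied as a final step after collection. A minimal sketch, assuming the hypothetical name `apply_overrides` and that overrides are keyed by Python function name. In temporalio, a replacement would typically also be declared with `@activity.defn(name=...)` using the original activity name, so that the workflow's calls resolve to the mock:

```python
import typing


def apply_overrides(
    activities: typing.Iterable[typing.Callable],
    overrides: typing.Mapping[str, typing.Callable],
) -> list[typing.Callable]:
    """Swap collected activities for replacements, matched by function name."""
    remaining = dict(overrides)
    # Keep the original order; replace an entry when its name has an override.
    result = [remaining.pop(fn.__name__, fn) for fn in activities]
    if remaining:
        # Fail loudly so a typo in a test's override map is not silently ignored.
        raise KeyError(f"no collected activity named: {sorted(remaining)}")
    return result
```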