
Implemented StreamedOperator

Open dabla opened this issue 1 year ago • 6 comments

As proposed on the devlist, I've implemented a new operator called the "StreamedOperator". I don't know whether this is a good name, but I'll explain what I wanted to achieve within Airflow.

At our company we have some DAGs that have to process a lot of paged results, which are returned as indexed XComs. If there aren't too many of those indexed XComs, it's easy to process them using the MappedOperator with the partial and expand functionality. But once you have more than about 1k indexed XComs, processing them becomes very hard (unless you have a beefy Postgres database behind it, due to the high number of dynamic tasks being created).

We are using the KubernetesExecutor, which means each task instance runs on a dedicated worker. You can of course use parallelism to process multiple tasks at once, but when one operator produces too many dynamic tasks, this becomes more of a problem than a solution: processing is actually slower, and the UI also has trouble monitoring all those dynamic tasks (see the screenshot below, which shows the difference in execution time between expand and streamed). As you can see, the performance difference between the expand and stream solutions is huge.

[screenshot: execution time comparison between the expand and streamed approaches]
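For reference, the expand variant used in that comparison looks roughly like the fragment below. It is illustrative only: it reuses the tasks from the full example DAG at the end of this description, with stream simply swapped for expand.

# Illustrative fragment only: the classic dynamic task mapping variant used in
# the comparison above. It assumes the same distinct_users_ids_task as in the
# full example DAG further down; only .stream(...) is replaced by .expand(...).
user_registered_devices_task = MSGraphAsyncOperator.partial(
    task_id="user_registered_devices",
    conn_id="msgraph_api",
    url="users/{userId}/registeredDevices",
    retry_delay=60,
).expand(path_parameters=distinct_users_ids_task.output.map(lambda u: {"userId": u[0]}))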

So to bypass those issues, I've thought of an operator which, instead of expanding all mapped arguments, "streams" them. The reason I call it stream (and implemented it as a StreamedOperator) is that the solution was inspired by the Java 8 Stream API, which lets you process a list or an iterable lazily and, if desired, apply parallelism to it. It's not exactly the same here, but the idea behind it is. The advantage for Airflow is that all mapped arguments, which would otherwise be translated into multiple task instances, are processed as one within a single operator, namely the StreamedOperator. You could see this solution as a kind of concurrent for loop over the mapped parameters within one operator.

But as opposed to the MappedOperator, the StreamedOperator executes the partial operator within the same operator and task instance, using asynchronous code and a semaphore, the latter being used to limit the number of threads used simultaneously. The limit can be set with the 'max_active_tis_per_dag' parameter; if it isn't specified, the operator uses the number of CPUs available on the worker. If you don't want parallelism, you can set it to 1 so that each task is executed sequentially, which can be handy if you don't want to "DDoS" a REST endpoint and risk being throttled. If a task fails, it is retried the same way as with dynamic task mapping, until its number of retries is exceeded. The retry_delay works the same way as well; of course only failed tasks are retried. You will notice that most of the code is actually re-used Airflow code; only the async execution part and the eager evaluation of all values in the ExpandInput instance are new.
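To make that concurrency model concrete, here is a minimal sketch of the idea, not the actual PR code: mapped inputs are consumed as an iterable and executed under an asyncio semaphore whose size comes from max_active_tis_per_dag, falling back to the worker's CPU count. The helper name stream_inputs is made up for illustration.

# Minimal sketch (not the PR implementation) of a "concurrent for loop" over
# mapped inputs, bounded by a semaphore as described above.
import asyncio
import os


async def stream_inputs(mapped_inputs, execute_one, max_active_tis_per_dag=None):
    # Fall back to the number of CPUs available on the worker when no limit is given.
    limit = max_active_tis_per_dag or os.cpu_count() or 1
    semaphore = asyncio.Semaphore(limit)

    async def run(value):
        async with semaphore:
            # Offload the synchronous execute call to a thread so multiple
            # mapped inputs can make progress concurrently.
            return await asyncio.to_thread(execute_one, value)

    return await asyncio.gather(*(run(value) for value in mapped_inputs))


# With max_active_tis_per_dag=1 the inputs are processed sequentially, which
# avoids hammering a rate-limited REST endpoint.
results = asyncio.run(stream_inputs(range(10), lambda i: i * 2, max_active_tis_per_dag=4))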

Async operators, which use a trigger, are also executed that way, so all processing happens asynchronously within the same operator, and thus the same task instance and worker. This may be perceived as somewhat hackish code to achieve this way of working within Airflow, but it allowed me to easily patch our own Airflow installation and "easily" add this functionality. It could also be implemented as an alternative strategy within the expand method, using a parameter to decide which one to use, but I personally found a dedicated stream method more elegant.
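As a hedged illustration of what "executing async operators within the same task" could look like, here is a small sketch based on the standard Airflow deferral contract (TaskDeferred carries the trigger and the name of the resume method). It is not the actual PR code and glosses over repeated deferrals.

# Sketch only: drive a deferrable operator to completion in-process instead of
# handing the trigger over to the triggerer. Assumes the usual TaskDeferred /
# TriggerEvent contract; the real StreamedOperator code may differ.
from airflow.exceptions import TaskDeferred


async def execute_inline(operator, context):
    try:
        return operator.execute(context)
    except TaskDeferred as deferred:
        # Run the trigger in-process and resume the operator with the first
        # event it yields, mimicking what the triggerer would normally do.
        async for event in deferred.trigger.run():
            resume = getattr(operator, deferred.method_name)
            return resume(context=context, event=event.payload)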

Of course the code could still use some refactoring, but I tried to implement it as cleanly as possible. This is still a draft, so I still need to add some unit tests, which shouldn't be too big a challenge. It would also be a solution to the question asked here without the need for custom code.

Here is a simple example of a DAG using the stream functionality:

# Import paths assume the common.sql and microsoft.azure providers are installed;
# DEFAULT_ARGS is assumed to be defined elsewhere in the DAG file.
from datetime import timedelta

from airflow import DAG
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
from airflow.providers.microsoft.azure.operators.msgraph import MSGraphAsyncOperator

with DAG(
    "streamed_operator_performance_test",
    default_args=DEFAULT_ARGS,
    schedule_interval=timedelta(hours=24),
    max_active_runs=5,
    concurrency=5,
    catchup=False,
) as dag:
    # Fetch the list of user ids to process; each row becomes one mapped input.
    distinct_users_ids_task = SQLExecuteQueryOperator(
        task_id="distinct_users_ids",
        conn_id="odbc_dev",
        sql="SELECT TOP 1000 ID FROM USERS",
        dag=dag,
    )

    # Stream the mapped path_parameters through a single task instance instead
    # of expanding them into one dynamic task per user.
    user_registered_devices_task = MSGraphAsyncOperator.partial(
        task_id="user_registered_devices",
        conn_id="msgraph_api",
        url="users/{userId}/registeredDevices",
        retry_delay=60,
        dag=dag,
    ).stream(path_parameters=distinct_users_ids_task.output.map(lambda u: {"userId": u[0]}))


dabla • Sep 29 '24 17:09

Airflow is not a processing tool (stream or batch).

But if you still want to run and control batching manually in Airflow, then:

after your first task, add an intermediary task that creates N batches, and make your show_user_id task accept a batch of values to loop over sequentially

raphaelauv • Sep 29 '24 21:09

> Airflow is not a processing tool (stream or batch). But if you still want to run and control batching manually in Airflow, then: after your first task, add an intermediary task that creates N batches, and make your show_user_id task accept a batch of values to loop over sequentially

What if the second operator is also a native (async) operator? Then your proposal won't work.

dabla • Sep 30 '24 06:09

I don't understand.

The second operator I'm talking about is just a PythonOperator splitting a list of "work" into N sublists

raphaelauv • Sep 30 '24 07:09

> I don't understand.
>
> The second operator I'm talking about is just a PythonOperator splitting a list of "work" into N sublists

I've updated the example DAG in the PR description to use the MSGraphAsyncOperator instead of the PythonOperator; this is how we use it now at our company to bypass the issues with expand. Now you'll understand that the MSGraphAsyncOperator won't accept a batch of values, nor will any other operator except the PythonOperator (or a decorated @task method), because that one involves custom Python code, which you cannot do with any other operator; you have to use it as it is. That is why expand was invented, and why I introduced iterate to achieve the same thing in another, more efficient way when there are too many inputs to iterate over.

dabla • Sep 30 '24 07:09

> after your first task add an intermediary task that creates N batches

# Import paths assume the common.sql and microsoft.azure providers are installed;
# DEFAULT_ARGS is assumed to be defined elsewhere.
from datetime import timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
from airflow.providers.microsoft.azure.operators.msgraph import MSGraphAsyncOperator

with DAG(
        "a",
        default_args=DEFAULT_ARGS,
        schedule_interval=timedelta(hours=24),
):
    distinct_users_ids_task = SQLExecuteQueryOperator(
        task_id="distinct_users_ids",
        conn_id="odbc_dev",
        sql="SELECT TOP 1000 ID FROM USERS",
    )

    def split_fn(work_to_split, nb_chunks):
        # Split the work into nb_chunks roughly equal sublists.
        chunk_size = -(-len(work_to_split) // nb_chunks)  # ceiling division
        return [work_to_split[i:i + chunk_size] for i in range(0, len(work_to_split), chunk_size)]

    split_work_task = PythonOperator(
        task_id="split_work",
        python_callable=split_fn,
        op_kwargs={"work_to_split": distinct_users_ids_task.output, "nb_chunks": 5},  # 5 could be a dynamic value
    )

    user_registered_devices_task = MSGraphAsyncOperator.partial(
        task_id="user_registered_devices",
        conn_id="msgraph_api",
        url="users/{userId}/registeredDevices",
        retry_delay=60,
    ).expand(path_parameters=split_work_task.output)

You can do this if the splitting logic is simple, but don't reinvent the wheel (use a specialized data processing tool/framework for more advanced cases).

raphaelauv • Sep 30 '24 09:09

> after your first task add an intermediary task that creates N batches
>
> [example DAG with the split_work PythonOperator, quoted verbatim from the previous comment]
>
> you can do this if the splitting logic is simple, but don't reinvent the wheel (use a specialized data processing tool/framework for more advanced cases)

Agreed on the specialized framework, but how do you intend to use the Airflow operator there? You won't be able to. Also, why introduce custom Python code to work around the problem with expand if Airflow could allow you to do it natively? The example above is how we load our data into our bronze layer; once that's done, we use a specialized tool to process those JSON files. But if we wanted to redo all of this in the specialized tool, it would involve a lot more custom Python code, which means more maintenance and thus more potential bugs. Used this way in Airflow, you have a standard, non-custom solution purely using operators: no custom code needed, and no workaround fiddling with chunks, which at some point will again start to fail if there is too much to process.

You could make the same argument about the HttpOperator or even the HttpHook. Why are they in Airflow? You can do the same with a PythonOperator using the requests or httpx library... I would answer: ease of use and integration, no custom code required, Airflow handles it for you in a convenient way.

dabla • Sep 30 '24 09:09

I'll go back to the list with this feedback, but this implementation is tantamount to a parallel scheduler, executor and triggerer implementation, so it is very unlikely to be accepted into core.

ashb • Dec 03 '24 10:12

> Airflow is not a processing tool (stream or batch)

I very strongly disagree with this statement.

ashb • Dec 03 '24 10:12

> I'll go back to the list with this feedback, but this implementation is tantamount to a parallel scheduler, executor and triggerer implementation, so it is very unlikely to be accepted into core.

Looking forward to the discussion :).

I am not strongly for or against this kind of operator, but I see a class of use cases which might be very interesting in the near future, i.e. running independent "airflow" tasks on the same machine, using the fact that those independent tasks could keep the data they are working on in memory, and leveraging things like Apache Arrow to enable zero-data-copy optimizations and the fact that many existing tools and libraries already support it.

And it seems to fit very nicely with the case where multiple tasks might be using different libraries to run complex, and sometimes parallel but on the same machine, workflows over data loaded in CPU and GPU memory. This has been mentioned multiple times in the past in various forms ("task affinity" is the one that seems to fit the need best), and I think the streamed operator as defined now does not really implement it in the best way, but I would not exclude that we will get something there sooner or later.

IMHO, we are on the verge of users looking at very aggressive optimizations in this space, and while it could all be done by writing taskflow tasks, there is value in being able to see, observe, set dependencies on and parallelise those tasks via Airflow mechanisms. I am not sure if the "streamed operator" is the right abstraction for it, and maybe we will decide this is not interesting for the community, but I think we should definitely discuss it and see if we can do something there beyond Airflow 3.

> Airflow is not a processing tool (stream or batch)
>
> I very strongly disagree with this statement.

Me too.

potiuk • Dec 03 '24 10:12

> I'll go back to the list with this feedback, but this implementation is tantamount to a parallel scheduler, executor and triggerer implementation, so it is very unlikely to be accepted into core.

Yes, that's because you simply can't execute multiple deferrable operators within a loop by calling their execute method. Maybe we could think of some kind of interface/extension mechanism for the PartialOperator that you could enhance with additional functionality, like the iterate method here, which returns an IterableOperator instead of a MappedOperator. We could then define this functionality in a provider, so it wouldn't have to be part of core, if that turned out to be the ultimate issue. Anyway, I've started drafting an AIP with @Joffreybvn in which we describe what we want to solve here and try to summarize everything we've discussed so far here, on Slack and on the devlists.

dabla • Dec 04 '24 18:12

This PR will be closed in favor of AIP-88

dabla • Jul 03 '25 15:07