[FLINK-34027] Introduces AsyncScalarFunction as a new UDF type
What is the purpose of the change
This introduces AsyncScalarFunction, a new UDF type that allows asynchronous responses. The functionality is covered in FLIP-400: https://cwiki.apache.org/confluence/display/FLINK/FLIP-400%3A+AsyncScalarFunction+for+asynchronous+scalar+function+support
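For readers unfamiliar with the pattern, here is a minimal standalone sketch of the asynchronous contract: eval receives a CompletableFuture as its first argument, returns immediately, and completes the future later from another thread. It mirrors the eval signature used in the test function discussed later in this thread, but it is plain Java and does not extend the real Flink class; the names AsyncEvalSketch, DelayedAdd, and callAndWait are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class AsyncEvalSketch {

    // Hypothetical stand-in: a real function would extend Flink's AsyncScalarFunction.
    public static class DelayedAdd {
        private final ScheduledExecutorService executor;

        public DelayedAdd(ScheduledExecutorService executor) {
            this.executor = executor;
        }

        // Returns right away; the result arrives later through the future.
        public void eval(CompletableFuture<Long> future, Integer param) {
            executor.schedule(() -> future.complete(10L + param), 10, TimeUnit.MILLISECONDS);
        }
    }

    // Plays the role of the caller: invokes eval and waits for the completed value.
    public static long callAndWait(int param) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        try {
            CompletableFuture<Long> future = new CompletableFuture<>();
            new DelayedAdd(executor).eval(future, param);
            return future.join();
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(callAndWait(5)); // prints 15
    }
}
```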
Brief change log
(for example:)
- The TaskInfo is stored in the blob store on job creation time as a persistent artifact
- Deployments RPC transmits only the blob storage reference
- TaskManagers retrieve the TaskInfo from the blob cache
Verifying this change
Please make sure both new and modified tests in this PR follow the conventions defined in our code quality guide: https://flink.apache.org/contributing/code-style-and-quality-common.html#testing
(Please pick either of the following options)
This change is a trivial rework / code cleanup without any test coverage.
(or)
This change is already covered by existing tests, such as (please describe tests).
(or)
This change added tests and can be verified as follows:
(example:)
- Added integration tests for end-to-end deployment with large payloads (100MB)
- Extended integration test for recovery after master (JobManager) failure
- Added test that validates that TaskInfo is transferred only once across recoveries
- Manually verified the change by running a 4 node cluster with 2 JobManagers and 4 TaskManagers, a stateful streaming program, and killing one JobManager and two TaskManagers during the execution, verifying that recovery happens correctly.
Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): (yes / no)
- The public API, i.e., is any changed class annotated with @Public(Evolving): (yes / no)
- The serializers: (yes / no / don't know)
- The runtime per-record code paths (performance sensitive): (yes / no / don't know) It changes them only for new uses of AsyncScalarFunction, not for existing code paths.
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know)
- The S3 file system connector: (yes / no / don't know)
Documentation
- Does this pull request introduce a new feature? (yes / no)
- If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)
CI report:
- b3e0ada74e5f9168907c30a88991b69952130b18 Azure: FAILURE
- 61da4086c217b69531be6599233108654ea61282 Azure: PENDING
Bot commands
The @flinkbot bot supports the following commands:
- @flinkbot run azure: re-run the last Azure build
We should add appropriate content to the UDF docs as part of implementing this FLIP.
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/functions/udfs/ is the page where that goes. I think a new section right after the section on Scalar Functions would be the right place for that. Most of the text needed can be copied out of the FLIP, along with the example.
@twalthr Thanks Timo for the in-depth review! I think I was able to respond to everything you mentioned, no real straggling issues. Please take another look.
I did notice that if you have some fairly complex generic hierarchy:
public abstract static class AsyncFuncGeneric<T> extends AsyncFuncBase {
    private static final long serialVersionUID = 3L;

    abstract T[] newT(int param);

    public void eval(CompletableFuture<T[]> future, Integer param) {
        executor.schedule(() -> future.complete(newT(param)), 10, TimeUnit.MILLISECONDS);
    }
}

/** Test function. */
public static class LongAsyncFuncGeneric extends AsyncFuncGeneric<Long> {
    @Override
    Long[] newT(int param) {
        Long[] result = new Long[1];
        result[0] = 10L + param;
        return result;
    }
}
It will fail to resolve the type here. I actually have a stash where I have gone much further in implementing resolution for the type system, so that it can resolve not only a TypeVariable but other types as well. If we really want to handle this sort of case, I can push that stash, but I was unsure whether it was over the top for this purpose.
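To make the failing case concrete: the argument of CompletableFuture<T[]> is a generic array of a type variable, so resolving it means binding T through the superclass chain and then rebuilding the array type. The following is a minimal sketch of that idea using only java.lang.reflect. It is not the code in this PR or in the stash; the class and method names (FutureTypeResolution, GenericFunc, LongFunc, collectBindings, resolve, resolveFutureResultType) are hypothetical, and it only handles type variables and generic arrays bound via superclasses.

```java
import java.lang.reflect.Array;
import java.lang.reflect.GenericArrayType;
import java.lang.reflect.Method;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.lang.reflect.TypeVariable;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

public class FutureTypeResolution {

    // Hypothetical stand-ins for the hierarchy above, without any Flink types.
    public abstract static class GenericFunc<T> {
        public void eval(CompletableFuture<T[]> future, Integer param) {}
    }

    public static class LongFunc extends GenericFunc<Long> {}

    /** Collects type variable bindings by walking up the superclass chain. */
    public static Map<TypeVariable<?>, Type> collectBindings(Class<?> leaf) {
        Map<TypeVariable<?>, Type> bindings = new HashMap<>();
        for (Class<?> c = leaf; c != null; c = c.getSuperclass()) {
            Type parent = c.getGenericSuperclass();
            if (parent instanceof ParameterizedType) {
                ParameterizedType p = (ParameterizedType) parent;
                TypeVariable<?>[] vars = ((Class<?>) p.getRawType()).getTypeParameters();
                Type[] args = p.getActualTypeArguments();
                for (int i = 0; i < vars.length; i++) {
                    bindings.put(vars[i], args[i]);
                }
            }
        }
        return bindings;
    }

    /** Resolves type variables and generic arrays; other types are returned unchanged. */
    public static Type resolve(Type type, Map<TypeVariable<?>, Type> bindings) {
        if (type instanceof TypeVariable) {
            Type bound = bindings.get(type);
            return bound == null ? type : resolve(bound, bindings);
        }
        if (type instanceof GenericArrayType) {
            Type component =
                    resolve(((GenericArrayType) type).getGenericComponentType(), bindings);
            if (component instanceof Class) {
                // Rebuild a concrete array class such as Long[] from its component class.
                return Array.newInstance((Class<?>) component, 0).getClass();
            }
        }
        return type;
    }

    /** Resolves the type argument of the CompletableFuture passed to eval. */
    public static Type resolveFutureResultType(Class<?> function) {
        for (Method m : function.getMethods()) {
            if (m.getName().equals("eval")) {
                ParameterizedType future = (ParameterizedType) m.getGenericParameterTypes()[0];
                return resolve(future.getActualTypeArguments()[0], collectBindings(function));
            }
        }
        throw new IllegalArgumentException("No eval method found on " + function.getName());
    }

    public static void main(String[] args) {
        System.out.println(resolveFutureResultType(LongFunc.class).getTypeName());
        // prints java.lang.Long[]
    }
}
```

Handling only TypeVariable would stop at T and never reach Long[], which is why the array case needs its own branch; parameterized and wildcard types would need further branches along the same lines.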
I pushed my commit, which extends the ability to resolve types in this manner. I haven't yet had a chance to check all of the cases where ScalarFunction works or doesn't, but this is pretty powerful now. Take a look. @twalthr
The only remaining issue is getting a green build. Is the Python error related to your changes?
I'm pretty sure it's not. I can't make any sense of it at all anyhow. Will rebase and push again and hopefully that will do it.
I have seen other PR jobs failing with identical Python errors, so it seems unrelated to this PR.