
[FLINK-34027] Introduces AsyncScalarFunction as a new UDF type

Open AlanConfluent opened this issue 2 years ago • 6 comments

What is the purpose of the change

This introduces AsyncScalarFunction, a new UDF type that allows asynchronous responses. The functionality is described in FLIP-400: https://cwiki.apache.org/confluence/display/FLINK/FLIP-400%3A+AsyncScalarFunction+for+asynchronous+scalar+function+support
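
The core idea in FLIP-400 is that an eval method receives a CompletableFuture as its first argument and completes it later, instead of returning a value directly. The following is a minimal sketch of that calling convention, assuming a hypothetical lookup service that answers after about 10 ms. The class name AsyncLookupSketch and the "user-<id>" result are made up for illustration. The class deliberately does not extend Flink's AsyncScalarFunction, so it compiles against the JDK alone.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/**
 * Hypothetical function following the FLIP-400 shape: the result is delivered
 * through the CompletableFuture passed as the first eval argument.
 * In a real job this class would extend AsyncScalarFunction (omitted here).
 */
public class AsyncLookupSketch {

    // Daemon scheduler standing in for a remote service (assumption for illustration).
    private static final ScheduledExecutorService EXECUTOR =
            Executors.newSingleThreadScheduledExecutor(runnable -> {
                Thread thread = new Thread(runnable, "async-lookup");
                thread.setDaemon(true);
                return thread;
            });

    public void eval(CompletableFuture<String> result, Integer id) {
        // eval returns immediately; the future is completed ~10 ms later on another thread.
        EXECUTOR.schedule(() -> result.complete("user-" + id), 10, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) {
        CompletableFuture<String> future = new CompletableFuture<>();
        new AsyncLookupSketch().eval(future, 42);
        System.out.println(future.join()); // prints "user-42"
    }
}
```

In a real job the Flink runtime would supply the future and collect the result. Calling eval and join() by hand in main only simulates that.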

Brief change log

Verifying this change

Please make sure both new and modified tests in this PR follow the conventions defined in our code quality guide: https://flink.apache.org/contributing/code-style-and-quality-common.html#testing

(Please pick either of the following options)

This change is a trivial rework / code cleanup without any test coverage.

(or)

This change is already covered by existing tests, such as (please describe tests).

(or)

This change added tests and can be verified as follows:

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (yes / no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (yes / no)
  • The serializers: (yes / no / don't know)
  • The runtime per-record code paths (performance sensitive): (yes / no / don't know) It changes them only for new uses of AsyncScalarFunction, not for existing code paths.
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know)
  • The S3 file system connector: (yes / no / don't know)

Documentation

  • Does this pull request introduce a new feature? (yes / no)
  • If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)

AlanConfluent, Dec 21 '23

CI report:

  • b3e0ada74e5f9168907c30a88991b69952130b18 Azure: FAILURE
  • 61da4086c217b69531be6599233108654ea61282 Azure: PENDING
Bot commands: the @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

flinkbot, Dec 21 '23

We should add appropriate content to the UDF docs as part of implementing this FLIP.

https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/functions/udfs/ is the page where that goes. I think a new section right after the section on Scalar Functions would be the right place for that. Most of the text needed can be copied out of the FLIP, along with the example.

alpinegizmo, Jan 05 '24

@twalthr Thanks, Timo, for the in-depth review! I think I was able to respond to everything you mentioned; there are no real outstanding issues. Please take another look.

AlanConfluent, Jan 09 '24

I did notice a problem with a fairly complex generic hierarchy such as this one:

    /** Test function whose eval signature refers to the type variable T. */
    public abstract static class AsyncFuncGeneric<T> extends AsyncFuncBase {

        private static final long serialVersionUID = 3L;

        abstract T[] newT(int param);

        // executor is assumed to be a ScheduledExecutorService provided by
        // AsyncFuncBase or the enclosing test class (not shown).
        public void eval(CompletableFuture<T[]> future, Integer param) {
            executor.schedule(() -> future.complete(newT(param)), 10, TimeUnit.MILLISECONDS);
        }
    }

    /** Test function. */
    public static class LongAsyncFuncGeneric extends AsyncFuncGeneric<Long> {
        @Override
        Long[] newT(int param) {
            Long[] result = new Long[1];
            result[0] = 10L + param;
            return result;
        }
    }

It fails to resolve the type here. I actually have a stash in which I went much further in implementing type resolution, so that it can resolve not only a TypeVariable but other types as well. If we really want to handle this sort of case, I can push that stash, but I was unsure whether it was over the top for this purpose.
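
The failing case comes down to resolving the type variable T in eval's CompletableFuture<T[]> parameter against the concrete subclass LongAsyncFuncGeneric. Below is a minimal sketch of one way to do that with plain java.lang.reflect, assuming simplified stand-ins for the test classes (no executor, no AsyncFuncBase). It walks the generic superclass chain to record bindings such as T -> Long, then substitutes them, including inside array types like T[]. The helper names bindings, resolve and resolvedFutureType are hypothetical; this is not the resolution code from the PR.

```java
import java.lang.reflect.Array;
import java.lang.reflect.GenericArrayType;
import java.lang.reflect.Method;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.lang.reflect.TypeVariable;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

public class TypeResolutionSketch {

    // Simplified stand-ins for the test functions (executor logic omitted).
    abstract static class AsyncFuncGeneric<T> {
        abstract T[] newT(int param);

        public void eval(CompletableFuture<T[]> future, Integer param) {
            future.complete(newT(param));
        }
    }

    static class LongAsyncFuncGeneric extends AsyncFuncGeneric<Long> {
        @Override
        Long[] newT(int param) {
            return new Long[] {10L + param};
        }
    }

    /** Walks from the subclass upward, recording bindings such as T -> Long. */
    static Map<TypeVariable<?>, Type> bindings(Class<?> clazz) {
        Map<TypeVariable<?>, Type> map = new HashMap<>();
        for (Class<?> c = clazz; c != null && c != Object.class; c = c.getSuperclass()) {
            Type superType = c.getGenericSuperclass();
            if (superType instanceof ParameterizedType) {
                ParameterizedType p = (ParameterizedType) superType;
                TypeVariable<?>[] vars = ((Class<?>) p.getRawType()).getTypeParameters();
                Type[] actual = p.getActualTypeArguments();
                for (int i = 0; i < vars.length; i++) {
                    // An argument may itself be a variable bound one level lower.
                    map.put(vars[i], resolve(actual[i], map));
                }
            }
        }
        return map;
    }

    /** Substitutes bound variables, including the component of arrays such as T[]. */
    static Type resolve(Type type, Map<TypeVariable<?>, Type> map) {
        if (type instanceof TypeVariable) {
            return map.getOrDefault(type, type);
        }
        if (type instanceof GenericArrayType) {
            Type component = resolve(((GenericArrayType) type).getGenericComponentType(), map);
            if (component instanceof Class) {
                // Turn a resolved component (Long) into the concrete array class (Long[]).
                return Array.newInstance((Class<?>) component, 0).getClass();
            }
        }
        return type; // other cases are left unchanged in this sketch
    }

    /** Resolves the type argument of eval's CompletableFuture parameter. */
    static Type resolvedFutureType(Class<?> fnClass) {
        try {
            Method eval = fnClass.getMethod("eval", CompletableFuture.class, Integer.class);
            ParameterizedType future = (ParameterizedType) eval.getGenericParameterTypes()[0];
            return resolve(future.getActualTypeArguments()[0], bindings(fnClass));
        } catch (NoSuchMethodException e) {
            throw new IllegalStateException("no eval(CompletableFuture, Integer) method", e);
        }
    }

    public static void main(String[] args) {
        // prints "java.lang.Long[]"
        System.out.println(resolvedFutureType(LongAsyncFuncGeneric.class).getTypeName());
    }
}
```

Walking from the subclass upward matters. Each superclass's type arguments are expressed in terms of the current class's variables, which were bound one step earlier, so multi-level hierarchies resolve without a second pass.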

AlanConfluent, Jan 10 '24

I pushed the commit that extends type resolution in this manner. I haven't yet had a chance to check all of the cases in which ScalarFunction does or doesn't work, but this is pretty powerful now. Take a look, @twalthr.

AlanConfluent, Jan 10 '24

The only remaining issue is getting a green build. Is the Python error related to your changes?

I'm pretty sure it's not; I can't make any sense of it anyhow. I'll rebase and push again, and hopefully that will fix it.

AlanConfluent, Jan 13 '24

I have seen other PR jobs failing with identical Python errors, so it seems unrelated to this PR.

AlanConfluent, Jan 13 '24