WorkflowClient#execute doesn't pair correctly with workflowStub.getResult()
Actual behavior
https://github.com/Spikhalskiy/java-sdk/commit/40c0d626e0ea4778a956e388d33754e67bde23a8#diff-7cfba47d2337ff3ee746b09a4d916e5e839f9b8bff45a26e588727667437c160R94
@Test
public void executeAndGetResultFromStub() throws InterruptedException, ExecutionException {
  TestNoArgsWorkflowProc stubP =
      testWorkflowRule.newWorkflowStubTimeoutOptions(TestNoArgsWorkflowProc.class);
  WorkflowStub workflowStub = WorkflowStub.fromTyped(stubP);
  CompletableFuture<Void> executeCF = WorkflowClient.execute(stubP::proc);
  // This test hangs (times out), but uncommenting either of these two lines makes it
  // pass, which doesn't make much sense
  // sleep(1000);
  // executeCF.get();
  workflowStub.getResult(Void.class);
}
This unit test, for a trivial workflow that finishes immediately, hangs.
Uncommenting either the sleep or the wait on the CompletableFuture makes it pass.
Replacing execute with start (which returns a WorkflowExecution) also makes this test pass.
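The ordering that the executeCF.get() workaround imposes can be shown without the Temporal SDK. The following is a JDK-only sketch with hypothetical names (AwaitThenRead, executeAsync, awaitThenRead); it does not reproduce the hang or explain its cause. It only shows "wait for the future returned by the asynchronous call, then read through a separate handle".

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class AwaitThenRead {
    // Hypothetical stand-in for a result store; not a Temporal type.
    private final ConcurrentMap<String, String> results = new ConcurrentHashMap<>();

    // Hypothetical stand-in for an asynchronous "execute": the value is
    // published on another thread and the returned future completes afterwards.
    public CompletableFuture<Void> executeAsync(String id, String value) {
        return CompletableFuture.runAsync(() -> results.put(id, value));
    }

    // Waits on the future first (join() plays the role of executeCF.get()),
    // and only then reads the result through the separate handle.
    public String awaitThenRead(String id, String value) {
        CompletableFuture<Void> future = executeAsync(id, value);
        future.join();           // the put has completed once join() returns
        return results.get(id);  // so this read observes the published value
    }

    public static void main(String[] args) {
        System.out.println(new AwaitThenRead().awaitThenRead("wf-1", "done"));
    }
}
```

Without the join(), the read could run before the asynchronous task has published anything. Whether the SDK hang comes from a comparable ordering problem is an assumption, not something this report establishes.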
Expected behavior
The test passes.
I am seeing a similar issue with WorkflowClient.execute(), but I am not sure it is exactly the same situation, because neither calling Thread.sleep() nor waiting on the future fixes it.
The use case looks like this:
class EventConsumer {
  private WorkflowClient workflowClient;

  public EventConsumer(WorkflowClient client) {
    this.workflowClient = client;
  }

  public void accept(Stream<SomeEvent> eventStream) {
    eventStream.forEach(event -> {
      final var workflowOptions = WorkflowOptions.newBuilder()
          .setTaskQueue("someQueue")
          .setWorkflowId(event.getId())
          .setWorkflowIdReusePolicy(WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE_FAILED_ONLY)
          .build();
      final var workflow = workflowClient.newWorkflowStub(MyWorkflow.class, workflowOptions);
      // Fire and forget: the returned future is only used for logging
      WorkflowClient.execute(workflow::processEvent, event)
          .whenComplete((v, throwable) -> log.info("completed"));
    });
  }
}
The test case uses the JUnit 5 extension:
class EventConsumerTest {
  @RegisterExtension
  public static final TestWorkflowExtension testWorkflowExtension = TestWorkflowExtension.newBuilder()
      .setWorkflowClientOptions(WorkflowClientOptions.newBuilder()
          .setNamespace("namespace")
          .build())
      .setWorkflowTypes(MyWorkflow.class)
      .setDoNotStart(true)
      .build();

  @Test
  void startWorkflow(TestWorkflowEnvironment testEnv, Worker worker) {
    MyActivity activity = mock(MyActivity.class, withSettings().withoutAnnotations());
    worker.registerActivitiesImplementations(activity);
    testEnv.start();
    EventConsumer consumer = new EventConsumer(testEnv.getWorkflowClient());
    // assertJ assertion
    assertThatCode(() -> consumer.accept(Stream.of(event))).doesNotThrowAnyException();
    // mockito verification
    then(activity).should(timeout(5000)).doSomething();
  }
}
The test times out after 5 s, as specified in the Mockito verification, and the (anonymized) log trace looks like this:
16:11:05.761 [main] INFO io.temporal.serviceclient.WorkflowServiceStubsImpl - Created GRPC client for channel: ManagedChannelOrphanWrapper{delegate=ManagedChannelImpl{logId=1, target=directaddress:///3c1827fc-4a62-4d7d-a725-efe16cfef991}}
16:11:05.777 [main] INFO io.temporal.serviceclient.WorkflowServiceStubsImpl - Created GRPC client for channel: ManagedChannelOrphanWrapper{delegate=ManagedChannelImpl{logId=5, target=directaddress:///06e8951f-ef30-489d-80fb-ec06add9150e}}
16:11:05.881 [main] INFO io.temporal.internal.worker.Poller - start: Poller{name=Workflow Poller taskQueue="WorkflowTest-startWorkflow(TestWorkflowEnvironment, Worker)-[engine:junit-jupiter]/[class:com.xyz.EventConsumerTest]/[method:startWorkflow(io.temporal.testing.TestWorkflowEnvironment, io.temporal.worker.Worker)]", namespace="namespace", identity=37008@myhostname}
16:11:05.885 [main] INFO io.temporal.internal.worker.Poller - start: Poller{name=Local Activity Poller taskQueue="WorkflowTest-startWorkflow(TestWorkflowEnvironment, Worker)-[engine:junit-jupiter]/[class:com.xyz.EventConsumerTest]/[method:startWorkflow(io.temporal.testing.TestWorkflowEnvironment, io.temporal.worker.Worker)]", namespace="namespace", identity=37008@myhostname}
16:11:05.887 [main] INFO io.temporal.internal.worker.Poller - start: Poller{name=Activity Poller taskQueue="namespace", namespace="WorkflowTest-startWorkflow(TestWorkflowEnvironment, Worker)-[engine:junit-jupiter]/[class:com.xyz.EventConsumerTest]/[method:startWorkflow(io.temporal.testing.TestWorkflowEnvironment, io.temporal.worker.Worker)]", identity=37008@myhostname}
16:11:05.888 [main] INFO io.temporal.internal.worker.Poller - start: Poller{name=Host Local Workflow Poller, identity=90621874-bf17-42a0-b6b4-16b5f6433a3b}
16:11:11.266 [main] INFO io.temporal.worker.WorkerFactory - shutdownNow: WorkerFactory{identity=37008@myhostname, uniqueId=90621874-bf17-42a0-b6b4-16b5f6433a3b}
16:11:11.266 [main] INFO io.temporal.internal.worker.Poller - shutdown: Poller{name=Host Local Workflow Poller, identity=90621874-bf17-42a0-b6b4-16b5f6433a3b}
16:11:11.272 [main] INFO io.temporal.worker.WorkerFactory - awaitTermination begin: WorkerFactory{identity=37008@myhostname, uniqueId=90621874-bf17-42a0-b6b4-16b5f6433a3b}
16:11:11.272 [Host Local Workflow Poller: 3] INFO io.temporal.internal.worker.Poller - poll loop is terminated: io.temporal.internal.worker.Poller$PollLoopTask@2f410cb5
16:11:11.272 [Host Local Workflow Poller: 4] INFO io.temporal.internal.worker.Poller - poll loop is terminated: io.temporal.internal.worker.Poller$PollLoopTask@1da8bb99
16:11:11.272 [Host Local Workflow Poller: 2] INFO io.temporal.internal.worker.Poller - poll loop is terminated: io.temporal.internal.worker.Poller$PollLoopTask@368de08f
16:11:11.272 [Host Local Workflow Poller: 1] INFO io.temporal.internal.worker.Poller - poll loop is terminated: io.temporal.internal.worker.Poller$PollLoopTask@56e21162
16:11:11.272 [Host Local Workflow Poller: 5] INFO io.temporal.internal.worker.Poller - poll loop is terminated: io.temporal.internal.worker.Poller$PollLoopTask@18cc9d5b
16:11:11.526 [TemporalShutdownManager: 1] INFO io.temporal.internal.worker.Poller - shutdown: Poller{name=Workflow Poller taskQueue="WorkflowTest-startWorkflow(TestWorkflowEnvironment, Worker)-[engine:junit-jupiter]/[class:com.xyz.EventConsumerTest]/[method:startWorkflow(io.temporal.testing.TestWorkflowEnvironment, io.temporal.worker.Worker)]", namespace="namespace", identity=37008@myhostname}
16:11:11.527 [Workflow Poller taskQueue="WorkflowTest-startWorkflow(TestWorkflowEnvironment, Worker)-[engine:junit-jupiter]/[class:com.xyz.EventConsumerTest]/[method:startWorkflow(io.temporal.testing.TestWorkflowEnvironment, io.temporal.worker.Worker)]", namespace="namespace": 1] INFO io.temporal.internal.worker.Poller - poll loop is terminated: io.temporal.internal.worker.Poller$PollLoopTask@1d1ce3ca
16:11:11.527 [Workflow Poller taskQueue="WorkflowTest-startWorkflow(TestWorkflowEnvironment, Worker)-[engine:junit-jupiter]/[class:com.xyz.EventConsumerTest]/[method:startWorkflow(io.temporal.testing.TestWorkflowEnvironment, io.temporal.worker.Worker)]", namespace="namespace": 2] INFO io.temporal.internal.worker.Poller - poll loop is terminated: io.temporal.internal.worker.Poller$PollLoopTask@61c24b97
16:11:11.529 [TemporalShutdownManager: 1] INFO io.temporal.internal.worker.Poller - shutdown: Poller{name=Activity Poller taskQueue="namespace", namespace="WorkflowTest-startWorkflow(TestWorkflowEnvironment, Worker)-[engine:junit-jupiter]/[class:com.xyz.EventConsumerTest]/[method:startWorkflow(io.temporal.testing.TestWorkflowEnvironment, io.temporal.worker.Worker)]", identity=37008@myhostname}
16:11:11.529 [TemporalShutdownManager: 1] INFO io.temporal.internal.worker.Poller - shutdown: Poller{name=Local Activity Poller taskQueue="WorkflowTest-startWorkflow(TestWorkflowEnvironment, Worker)-[engine:junit-jupiter]/[class:com.xyz.EventConsumerTest]/[method:startWorkflow(io.temporal.testing.TestWorkflowEnvironment, io.temporal.worker.Worker)]", namespace="namespace", identity=37008@myhostname}
16:11:11.530 [Local Activity Poller taskQueue="WorkflowTest-startWorkflow(TestWorkflowEnvironment, Worker)-[engine:junit-jupiter]/[class:com.xyz.EventConsumerTest]/[method:startWorkflow(io.temporal.testing.TestWorkflowEnvironment, io.temporal.worker.Worker)]", namespace="namespace": 1] INFO io.temporal.internal.worker.Poller - poll loop is terminated: io.temporal.internal.worker.Poller$PollLoopTask@775ac2f4
16:11:11.530 [Activity Poller taskQueue="namespace", namespace="WorkflowTest-startWorkflow(TestWorkflowEnvironment, Worker)-[engine:junit-jupiter]/[class:com.xyz.EventConsumerTest]/[method:startWorkflow(io.temporal.testing.TestWorkflowEnvironment, io.temporal.worker.Worker)]": 3] INFO io.temporal.internal.worker.Poller - poll loop is terminated: io.temporal.internal.worker.Poller$PollLoopTask@13f7bf0c
16:11:11.530 [Activity Poller taskQueue="namespace", namespace="WorkflowTest-startWorkflow(TestWorkflowEnvironment, Worker)-[engine:junit-jupiter]/[class:com.xyz.EventConsumerTest]/[method:startWorkflow(io.temporal.testing.TestWorkflowEnvironment, io.temporal.worker.Worker)]": 5] INFO io.temporal.internal.worker.Poller - poll loop is terminated: io.temporal.internal.worker.Poller$PollLoopTask@17e59124
16:11:11.530 [Activity Poller taskQueue="namespace", namespace="WorkflowTest-startWorkflow(TestWorkflowEnvironment, Worker)-[engine:junit-jupiter]/[class:com.xyz.EventConsumerTest]/[method:startWorkflow(io.temporal.testing.TestWorkflowEnvironment, io.temporal.worker.Worker)]": 2] INFO io.temporal.internal.worker.Poller - poll loop is terminated: io.temporal.internal.worker.Poller$PollLoopTask@1604dc95
16:11:11.530 [Activity Poller taskQueue="namespace", namespace="WorkflowTest-startWorkflow(TestWorkflowEnvironment, Worker)-[engine:junit-jupiter]/[class:com.xyz.EventConsumerTest]/[method:startWorkflow(io.temporal.testing.TestWorkflowEnvironment, io.temporal.worker.Worker)]": 1] INFO io.temporal.internal.worker.Poller - poll loop is terminated: io.temporal.internal.worker.Poller$PollLoopTask@130e4636
16:11:11.530 [Activity Poller taskQueue="namespace", namespace="WorkflowTest-startWorkflow(TestWorkflowEnvironment, Worker)-[engine:junit-jupiter]/[class:com.xyz.EventConsumerTest]/[method:startWorkflow(io.temporal.testing.TestWorkflowEnvironment, io.temporal.worker.Worker)]": 4] INFO io.temporal.internal.worker.Poller - poll loop is terminated: io.temporal.internal.worker.Poller$PollLoopTask@72885c3d
16:11:11.783 [main] INFO io.temporal.worker.WorkerFactory - awaitTermination done: WorkerFactory{identity=37008@myhostname, uniqueId=90621874-bf17-42a0-b6b4-16b5f6433a3b}
16:11:11.784 [main] INFO io.temporal.serviceclient.WorkflowServiceStubsImpl - shutdownNow
16:11:11.790 [ForkJoinPool.commonPool-worker-19] DEBUG io.temporal.internal.retryer.GrpcAsyncRetryer - Retrying after failure
io.grpc.StatusRuntimeException: UNAVAILABLE: Channel shutdownNow invoked