Workflow execution with Workflow.await(condition) times out in unit tests with time skipping enabled
Expected Behavior
The unit test below should always pass.
Actual Behavior
Sometimes the test fails with io.temporal.client.WorkflowNotFoundException. Changing Workflow.await(condition) to Workflow.await(Duration.ofSeconds(100), condition) in TestWorkflowImpl seems to fix the problem, but I'm not sure why.
Attached are the io.temporal TRACE logs from a run where the issue reproduces: bug.log
Steps to Reproduce the Problem
Run the following test:
```java
import io.temporal.activity.ActivityOptions;
import io.temporal.client.WorkflowClient;
import io.temporal.client.WorkflowOptions;
import io.temporal.client.WorkflowStub;
import io.temporal.testing.TestWorkflowEnvironment;
import io.temporal.testing.TestWorkflowExtension;
import io.temporal.worker.Worker;
import io.temporal.worker.WorkflowImplementationOptions;
import io.temporal.workflow.*;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import java.time.Duration;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.IntStream;

import static org.junit.jupiter.api.Assertions.assertEquals;

public class WorkflowExecutionTimeoutTest {

    @RegisterExtension
    public static final TestWorkflowExtension TEST_WORKFLOW_EXTENSION =
            TestWorkflowExtension.newBuilder()
                    .setDoNotStart(true)
                    .build();

    private ProcessEventsWorkflow processWorkflowStub;
    private TestWorkflow testWorkflowStub;

    @BeforeEach
    public void setUpTemporal(TestWorkflowEnvironment testEnv,
                              Worker worker,
                              WorkflowClient workflowClient,
                              WorkflowOptions workflowOptions) {
        worker.registerWorkflowImplementationTypes(
                WorkflowImplementationOptions.newBuilder()
                        .setDefaultActivityOptions(ActivityOptions.newBuilder()
                                .setStartToCloseTimeout(Duration.ofSeconds(10))
                                .build())
                        .build(),
                ProcessEventsWorkflowImpl.class,
                TestWorkflowImpl.class);
        testEnv.start();

        processWorkflowStub = workflowClient.newWorkflowStub(ProcessEventsWorkflow.class,
                WorkflowOptions.newBuilder(workflowOptions)
                        .setWorkflowId("ProcessEventsWorkflow")
                        .build());
        testWorkflowStub = workflowClient.newWorkflowStub(TestWorkflow.class,
                WorkflowOptions.newBuilder(workflowOptions)
                        .setWorkflowId("TestWorkflow")
                        .build());
    }

    @Test
    public void testBug() throws TimeoutException {
        // Create artificial load to reproduce the bug. This seems to help, but the bug still does not always reproduce.
        IntStream.range(0, 20).forEach(index -> new Thread(this::busyWork).start());

        WorkflowClient.start(testWorkflowStub::execute);
        WorkflowStub.fromTyped(processWorkflowStub).signalWithStart("addEvent",
                new Object[]{"testEvent"},
                new Object[]{"TestWorkflow", Duration.ofSeconds(1)});
        WorkflowStub.fromTyped(processWorkflowStub).getResult(10, TimeUnit.SECONDS, Object.class);

        testWorkflowStub.stop(); // <----- fails here
        WorkflowStub.fromTyped(testWorkflowStub).getResult(10, TimeUnit.SECONDS, Object.class);
        assertEquals(Arrays.asList("testEvent"), testWorkflowStub.getEvents());
    }

    private void busyWork() {
        int count = 100000000;
        int sleepIndex = (int) (Math.random() * count);
        while (count-- > 0) {
            if (sleepIndex == count) { // yield at random intervals
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
            Math.sqrt(count);
        }
    }

    @WorkflowInterface
    public interface ProcessEventsWorkflow {
        @WorkflowMethod
        void execute(String targetWorkflowId, Duration keepAliveTimeout);

        @SignalMethod
        void addEvent(String event);
    }

    public static class ProcessEventsWorkflowImpl implements ProcessEventsWorkflow {
        private final Queue<String> events = new LinkedList<>();

        @Override
        public void execute(String targetWorkflowId, Duration keepAliveTimeout) {
            while (true) {
                while (!events.isEmpty()) {
                    String event = events.poll();
                    Workflow.newExternalWorkflowStub(TestWorkflow.class, targetWorkflowId).onEvent(event);
                }
                Workflow.await(keepAliveTimeout, () -> !events.isEmpty());
                if (events.isEmpty()) {
                    return;
                }
            }
        }

        @Override
        public void addEvent(String event) {
            events.add(event);
        }
    }

    @WorkflowInterface
    public interface TestWorkflow {
        @WorkflowMethod
        void execute();

        @SignalMethod
        void stop();

        @SignalMethod
        void onEvent(String event);

        @QueryMethod
        List<String> getEvents();
    }

    public static class TestWorkflowImpl implements TestWorkflow {
        private boolean stop = false;
        private final List<String> events = new ArrayList<>();

        @Override
        public void execute() {
            Workflow.await(() -> stop);
        }

        @Override
        public void stop() {
            stop = true;
        }

        @Override
        public void onEvent(String event) {
            events.add(event);
        }

        @Override
        public List<String> getEvents() {
            return events;
        }
    }
}
```
Specifications
- Version: Temporal Java SDK 1.11.0, 1.12.0, 1.13.0 (reproduces on all of these), Temporal Server 1.16.2
- Platform: Java
I can confirm the issue. It's related to the peculiarities of how time skipping is implemented and to the absence of a total order between some operations in Temporal. When you create load, the client code doesn't stop time skipping soon enough, and time gets advanced A LOT, all the way to the workflow task timeout, which completes the workflow and doesn't let the signal go through. There is also no way for the server to make sure that the code the client is supposed to execute after a long poll returns has actually finished executing.
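To make the ordering problem concrete, here is a toy, single-threaded model of a time-skipping clock. It is a sketch under stated assumptions, not Temporal's implementation: the class `ToySkippingClock`, its `lock`/`unlock`/`skipIfIdle` methods, and the 10-second timeout are all hypothetical. It only captures the idea described above: while nobody holds a lock on the clock, the clock jumps straight to the next pending deadline, so if the client has not re-locked the clock before the skipper runs (for example because the machine is under load), the timeout fires before the signal is sent.

```java
import java.util.PriorityQueue;

// Hypothetical toy model of a time-skipping test clock; NOT Temporal's implementation.
public class ToySkippingClock {
    // A pending deadline in virtual time (a timer, a timeout, ...).
    static final class Deadline {
        final long atMillis;
        final String name;

        Deadline(long atMillis, String name) {
            this.atMillis = atMillis;
            this.name = name;
        }
    }

    private final PriorityQueue<Deadline> deadlines =
            new PriorityQueue<>((a, b) -> Long.compare(a.atMillis, b.atMillis));
    private long nowMillis = 0;
    private int lockCount = 0;

    void schedule(String name, long delayMillis) {
        deadlines.add(new Deadline(nowMillis + delayMillis, name));
    }

    // While the client holds a lock, time must not be skipped.
    void lock() { lockCount++; }
    void unlock() { lockCount--; }

    // If nobody holds a lock, jump virtual time to the earliest deadline and fire it.
    // Returns the fired deadline's name, or null if nothing was skipped.
    String skipIfIdle() {
        if (lockCount > 0 || deadlines.isEmpty()) {
            return null;
        }
        Deadline next = deadlines.poll();
        nowMillis = next.atMillis;
        return next.name;
    }

    // One round: a long poll has just returned to the client, and the client must
    // re-lock the clock before the skipper runs, or the pending timeout fires first.
    static String run(boolean clientLocksBeforeSkipper) {
        ToySkippingClock clock = new ToySkippingClock();
        clock.schedule("workflow timeout", 10_000); // hypothetical 10 s timeout
        if (clientLocksBeforeSkipper) {
            clock.lock();
        }
        String fired = clock.skipIfIdle();
        if (fired != null) {
            // The workflow is already closed, so the client's signal would fail.
            return fired + " fired at " + clock.nowMillis + " ms";
        }
        clock.unlock(); // client sent its signal; release the lock
        return "signal delivered";
    }

    public static void main(String[] args) {
        System.out.println(run(true));  // idle machine: the client wins the race
        System.out.println(run(false)); // loaded machine: the skipper wins the race
    }
}
```

The only difference between the two runs is which side gets scheduled first after the long poll returns, which is why the failure is intermittent and why extra CPU load makes it more likely.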
The fix here is not trivial, and it affects only the test framework, not the core functionality. I will think about the best fix, but I have to temporarily deprioritize it behind other tasks that affect actual production functionality.
Thank you! Could you confirm whether the suggested workaround should work? If I change Workflow.await(condition) to Workflow.await(timeout, condition), the issue seems to stop reproducing, at least locally. Is this a reliable workaround?
No, it's not. The same "bug" (or rather, limitation) will trigger your Workflow.await(timeout, condition) earlier than you expect, just as it triggers the workflow timeout now. I think the only gentle workaround here is to not set a workflow timeout of any kind, or to not use time skipping for such a test.
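A toy model helps show why adding a timeout to the await does not make the test reliable. It is a hypothetical sketch, not Temporal's code: `ToyAwaitTimeout`, `clientStalled`, and the 100-second await timeout are illustrative assumptions. The await's own timer is just another deadline the clock can skip to, so while the client is stalled the await can return with the condition still false after almost no real time. With no deadlines at all, or with skipping disabled, the clock has nothing to jump to.

```java
import java.util.PriorityQueue;

// Hypothetical toy model of a time-skipping clock; NOT Temporal's implementation.
public class ToyAwaitTimeout {
    // A pending deadline in virtual time.
    static final class Deadline {
        final long atMillis;
        final String name;

        Deadline(long atMillis, String name) {
            this.atMillis = atMillis;
            this.name = name;
        }
    }

    private final PriorityQueue<Deadline> deadlines =
            new PriorityQueue<>((a, b) -> Long.compare(a.atMillis, b.atMillis));
    private final boolean timeSkipping;
    private long nowMillis = 0;

    ToyAwaitTimeout(boolean timeSkipping) {
        this.timeSkipping = timeSkipping;
    }

    void schedule(String name, long delayMillis) {
        deadlines.add(new Deadline(nowMillis + delayMillis, name));
    }

    // The client is stalled (e.g. by CPU load) and holds no lock on the clock.
    // With skipping on, virtual time jumps to the earliest deadline and fires it.
    // With skipping off, only a negligible amount of real time passes.
    String clientStalled() {
        if (!timeSkipping || deadlines.isEmpty()) {
            return null;
        }
        Deadline next = deadlines.poll();
        nowMillis = next.atMillis;
        return next.name;
    }

    // Models an await on a "stop" signal that the stalled client has not sent yet.
    // awaitHasTimeout=true models a hypothetical 100 s await timeout; no workflow
    // timeout is scheduled in this model.
    static String simulate(boolean timeSkipping, boolean awaitHasTimeout) {
        ToyAwaitTimeout clock = new ToyAwaitTimeout(timeSkipping);
        if (awaitHasTimeout) {
            clock.schedule("await timeout", 100_000);
        }
        String fired = clock.clientStalled();
        if (fired == null) {
            return "still waiting for stop signal";
        }
        return fired + " at " + clock.nowMillis + " ms, condition still false";
    }

    public static void main(String[] args) {
        System.out.println(simulate(true, true));  // the proposed await(timeout, condition) workaround
        System.out.println(simulate(true, false)); // no timeouts of any kind
        System.out.println(simulate(false, true)); // time skipping disabled
    }
}
```

Only the first scenario fires early; the other two correspond to the two workarounds suggested above.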