What is the reactive way to handle result of the SendEvent?
I created simple test machine with trigger-less transitions, my tests using StateMachineTestPlan shouldGetToTheFinishAndWinStepTest and shouldGetToTheFinishAndLooseStepTest as well as nonreactive tests shouldGetToTheFinishAndWinNonReactiveTest and shouldGetToTheFinishAndLooseNonReactiveTest using sendEvent(...).subscribe() work as expected however getting flux from SendEvent and "flatmapping" them in tests 'shouldGetToTheFinishAndWinReactiveTest' and 'shouldGetToTheFinishAndLooseReactiveTest' do not work. Is it a bug or a feature? What am I missing? Thanks in advance!
package ssm;
import lombok.extern.log4j.Log4j2;
import org.junit.jupiter.api.Test;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.statemachine.StateMachine;
import org.springframework.statemachine.StateMachineEventResult;
import org.springframework.statemachine.config.StateMachineBuilder;
import org.springframework.statemachine.test.StateMachineTestPlan;
import org.springframework.statemachine.test.StateMachineTestPlanBuilder;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import java.time.Duration;
import java.util.EnumSet;
import java.util.concurrent.atomic.AtomicReference;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.Is.is;
@Log4j2
public class StateMachineTest {
enum States {START, POINT1, POINT2, FINISH, PODIUM}
enum Events {GO}
StateMachine<States, Events> createStateMachine(Duration runnerSpeed, boolean shouldWin) throws Exception {
StateMachineBuilder.Builder<States, Events> builder = StateMachineBuilder.builder();
builder.configureStates()
.withStates()
.initial(States.START)
.states(EnumSet.allOf(States.class));
builder.configureTransitions()
.withExternal().source(States.START).target(States.POINT1).event(Events.GO)
.actionFunction(ctx -> Mono.delay(runnerSpeed).doOnNext(aLong -> log.info("Point1 - check")).then())
.and()
.withExternal().source(States.POINT1).target(States.POINT2)
.actionFunction(ctx -> Mono.delay(runnerSpeed).doOnNext(aLong -> log.info("Point2 - check")).then())
.and()
.withExternal().source(States.POINT2).target(States.FINISH)
.actionFunction(ctx -> Mono.delay(runnerSpeed).doOnNext(aLong -> log.info("Finish!")).then())
.and()
.withExternal().source(States.FINISH).target(States.PODIUM)
.guard(stateContext -> {
log.info(shouldWin ? "Winner!" : "Too Slow!");
return shouldWin;
})
// Where is the .guardFunction(Guard<States, Events>) or .guard(ReactveGuard<States, Events>) ??
// .guard(new ReactiveGuard<States, Events>() {
// @Override
// public Mono<Boolean> apply(StateContext<States, Events> ctx) {
// log.info(shouldWin ? "Winner!" : "Too Slow!");
// return Mono.just(shouldWin);
// }
// })
;
builder.configureConfiguration()
.withConfiguration()
.autoStartup(false)
.machineId(this.getClass().getSimpleName());
return builder.build();
}
@Test
public void shouldGetToTheFinishAndWinStepTest() throws Exception {
var ssm = createStateMachine(Duration.ofMillis(100), true);
StateMachineTestPlan<States, Events> plan =
StateMachineTestPlanBuilder.<States, Events>builder()
.defaultAwaitTime(1)
.stateMachine(ssm, this.getClass().getSimpleName())
.step()
.expectStateMachineStarted(1)
.expectState(States.START)
.and()
.step()
.sendEvent(Events.GO)
.expectStateChanged(4)
.expectStates(States.PODIUM)
.and()
.build();
plan.test();
}
@Test
public void shouldGetToTheFinishAndLooseStepTest() throws Exception {
var ssm = createStateMachine(Duration.ofMillis(100), false);
StateMachineTestPlan<States, Events> plan =
StateMachineTestPlanBuilder.<States, Events>builder()
.defaultAwaitTime(1)
.stateMachine(ssm, this.getClass().getSimpleName())
.step()
.expectStateMachineStarted(1)
.expectState(States.START)
.and()
.step()
.sendEvent(Events.GO)
.expectStateChanged(3)
.expectStates(States.FINISH)
.and()
.build();
plan.test();
}
@Test
public void shouldGetToTheFinishAndWinReactiveTest() throws Exception {
AtomicReference<StateMachine<States, Events>> machine = new AtomicReference<>();
Mono.just(createStateMachine(Duration.ofMillis(100), false))
.flatMap(ssm -> ssm.startReactively().thenReturn(ssm))
.doOnNext(machine::set)
.flatMapMany(ssm -> ssm.sendEvent(Mono.just(MessageBuilder.withPayload(Events.GO).build())))
.flatMap(StateMachineEventResult::complete)
.blockLast();
assertThat(machine.get().getState().getId(), is(States.PODIUM));
}
@Test
public void shouldGetToTheFinishAndLooseReactiveTest() throws Exception {
AtomicReference<StateMachine<States, Events>> machine = new AtomicReference<>();
Mono.just(createStateMachine(Duration.ofMillis(100), true))
.flatMap(ssm -> ssm.startReactively().thenReturn(ssm))
.doOnNext(machine::set)
.flatMapMany(ssm -> ssm.sendEvent(Mono.just(MessageBuilder.withPayload(Events.GO).build())))
.flatMap(StateMachineEventResult::complete)
.blockLast();
assertThat(machine.get().getState().getId(), is(States.FINISH));
}
@Test
public void shouldGetToTheFinishAndWinNonReactiveTest() throws Exception {
var ssm = createStateMachine(Duration.ofMillis(100), false);
ssm.startReactively().block();
ssm.sendEvent(Mono.just(MessageBuilder.withPayload(Events.GO).build()))
.subscribe(r -> log.info("event {}, {}", r.getResultType(), ssm.getState()));
Thread.sleep(1000);
assertThat(ssm.getState().getId(), is(States.FINISH));
}
@Test
public void shouldGetToTheFinishAndLooseNonReactiveTest() throws Exception {
var ssm = createStateMachine(Duration.ofMillis(100), true);
ssm.startReactively().block();
ssm.sendEvent(Mono.just(MessageBuilder.withPayload(Events.GO).build()))
.publishOn(Schedulers.boundedElastic())
.subscribe(r -> log.info("event {}, {}", r.getResultType(), ssm.getState()));
Thread.sleep(1000);
assertThat(ssm.getState().getId(), is(States.PODIUM));
}
}
I struggled with this a bit but found a workaround. Simply cast StateMachineTransitionBuilder on StateMachineTransitionConfigurer (in your case, builder.configureTransitions()) to access additional configuration methods. Then use StateMachineTransitionBuilder.addTransition method to include your ReactiveGuard.