Publish events to Spring Cloud Stream channels
The Axon EventBus can be integrated with Spring Integration and Spring Cloud Stream using the following configurations.
As a Message Producer:
@Bean
public IntegrationFlow flow(EventBus eventBus) {
    // Forward events published on the Axon EventBus to the Spring Cloud Stream output channel
    return IntegrationFlows.from(new AxonInboundChannelAdapter(eventBus))
                           .channel(Source.OUTPUT)
                           .get();
}
As a Consumer:
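@Bean
public IntegrationFlow flow(EventBus eventBus) {
    // Hand messages arriving on the Spring Cloud Stream input channel to the Axon EventBus
    return IntegrationFlows.from(Sink.INPUT)
                           .handle(new AxonOutboundChannelAdapter(eventBus))
                           .get();
}
@Autowired
public void configure(EventProcessingConfigurer config) {
    // Default to subscribing (rather than tracking) event processors
    config.usingSubscribingEventProcessors();
}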
Sink.INPUT and Source.OUTPUT are MessageChannels from Spring Cloud Stream.
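To make the producer-side idea concrete, here is a minimal, framework-free sketch of what such an adapter does conceptually: it subscribes to an event bus and forwards every published event to an output channel. `SimpleEventBus` and `RecordingChannel` are hypothetical stand-ins invented for this illustration; they are not Axon, Spring Integration, or Spring Cloud Stream types, and the actual adapters in this PR may work differently.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class ForwardingSketch {

    // Hypothetical stand-in for an event bus: subscribers receive every published event.
    static class SimpleEventBus {
        private final List<Consumer<String>> subscribers = new ArrayList<>();

        void subscribe(Consumer<String> subscriber) {
            subscribers.add(subscriber);
        }

        void publish(String event) {
            subscribers.forEach(subscriber -> subscriber.accept(event));
        }
    }

    // Hypothetical stand-in for an output channel: it only records what was sent.
    static class RecordingChannel {
        final List<String> sent = new ArrayList<>();

        void send(String payload) {
            sent.add(payload);
        }
    }

    public static void main(String[] args) {
        SimpleEventBus bus = new SimpleEventBus();
        RecordingChannel output = new RecordingChannel();

        // The "adapter": subscribe to the bus and forward each event to the channel.
        bus.subscribe(output::send);

        bus.publish("OrderCreated");
        System.out.println(output.sent);
    }
}
```

The consumer side is the mirror image: a handler on the input channel publishes each incoming message onto the event bus.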
@mehdichitforoosh, are you up for adjusting your PR towards a different format? As it stands, I think there's value in having an option to connect to Spring Cloud Stream for events. However, I don't think this should be contained inside the module you've dropped it in right now.
In essence, this extension has been set up to tap into Spring Cloud Discovery for Command Message distribution. What you are providing is a solution to tap into Spring Cloud Stream for Event Message distribution.
That would thus call for the current code to be moved to an axon-springcloud-discovery module, whereas your code would reside in an axon-springcloud-streams module.
If you want to take it to that end, please let us know.
In the absence of a response, which would be understandable after such a long time, we'll likely pick up your code and do this ourselves eventually.
Hi @smcvb. Sorry for my late reply. Yes, I want to complete this PR. What changes should I make?
That's amazing to hear @mehdichitforoosh! As a first stab at revamping this PR, I'd like you to do the following:
- Add a distinct module to this project, dedicated to this solution. Calling the package springcloud-streams and the artifact axon-springcloud-streams would clarify this is different from the existing Spring Cloud Discovery implementation.
- Repoint this PR towards the most recent version of the extension. This might be some effort, true, but it would be most reasonable to think of releasing this with a recent version of the extension. Pointing your PR to master and merging it with the current changes in master should be sufficient to achieve this.
OK, dear Steven @smcvb. I will definitely do it and will let you know in the next few days. Thanks.
Dear @smcvb, I have added the changes. What should I do as the next step?
Thanks for the effort so far @mehdichitforoosh, much appreciated. For now, I'd like to ask you to do the following things:
- Add descriptive class-level Javadoc to any of the classes you have added. When doing so, don't forget to adjust the @author (with your preferred name, of course) and @since (pointing towards 4.5, for now) tags correctly.
- Using "...Axon..." in any of the class names shouldn't be necessary I think. As long as the implementation describes the intent of that class in the translation from Spring-Streams to Axon's idea of Events, Event Processors and Message Sources, we should be good. The AxonProcessorMessageHandler to me for example sounds like a SpringStreamsMessageSource.
- Any of the copyright notices should stretch towards 2021.
- A lot of the classes seem to be duplicated right now. Make sure only a single instance of each is present.
- I am missing unit test classes for the provided code. We definitely need these prior to approving this PR.
- For components like these, a short sample implementation through an integration test would be highly beneficial. This would ensure the process works as desired, but can also serve as a sample to others who want to use this extension.
Likely there are more fine-grained comments to go over too, but let's first take a stab at the above. Again, thanks for the effort so far @mehdichitforoosh! If your time doesn't allow you to proceed with this, I think there might be an option that we (at AxonIQ) take it over. Just let us know whether you prefer that.
Thanks @smcvb for your attention. I will finish this PR soon :-)
Hi Steven @smcvb I think this PR is completed.
- Added Javadoc
- Renamed and refactored all classes
- Added copyright texts
- Removed duplicate classes
- Added unit test classes with JUnit 5, Mockito and Spring Cloud Stream test support
Thanks for your patience. Tell me if there is a problem in the latest commits. Good luck.
Hi @smcvb @idugalic @abuijze. Did you check the latest commits? Please tell me if there is a problem with the code. Thank you.
Hi @mehdichitforoosh. Thank you for this PR. Much appreciated! Conceptually this looks very good!
It is important to note that this extension module is supporting SubscribableMessageSource only. The subscribable stream leaves all the ordering specifics in the hands of brokers (Kafka, for example), which means the events should be published on a consistent partition to ensure ordering. I am totally fine with this, and this is not a blocker or stopper in my opinion. To control events of a certain group to be placed in a dedicated partition, based on an aggregate identifier, for example, the message converter's SequencingPolicy can be utilized. StreamableMessageSource could be considered, but it is not something I would push now (let's let the brokers do the job).
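As a rough illustration of the "consistent partition" point, the sketch below derives a partition index from an aggregate identifier, so that all events of one aggregate end up in the same partition and keep their relative order there. The `partitionFor` helper, the class name, and the sample identifiers are hypothetical; this is not how the extension, a SequencingPolicy, or any particular broker computes partitions, only the general idea under the assumption of a fixed partition count.

```java
import java.util.List;

public class PartitionSketch {

    // Hypothetical helper: maps an aggregate identifier to a partition index in [0, partitionCount).
    // Math.floorMod keeps the result non-negative even when hashCode() is negative.
    static int partitionFor(String aggregateIdentifier, int partitionCount) {
        if (partitionCount <= 0) {
            throw new IllegalArgumentException("partitionCount must be positive");
        }
        return Math.floorMod(aggregateIdentifier.hashCode(), partitionCount);
    }

    public static void main(String[] args) {
        int partitions = 4; // assumed number of partitions on the broker side

        // Events for the same aggregate ("order-1") always map to the same partition.
        for (String aggregateId : List.of("order-1", "order-2", "order-1")) {
            System.out.println(aggregateId + " -> partition " + partitionFor(aggregateId, partitions));
        }
    }
}
```

Note that a mapping like this is only stable while the partition count stays the same; changing the number of partitions reassigns identifiers.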
Hi @idugalic Thank you for your comments.