There are very few examples of how to implement the saga pattern with Kafka, so I tried to come up with my own implementation of the choreography-based saga pattern using Kafka. I guess I will need to implement the outbox pattern as well.
/**
 * Lifecycle hooks for a saga step.
 *
 * @param <M> message type associated with the saga; bounded by {@code Message<?>}
 */
public interface SagaPattern<M extends Message<?>> {

    /** Hook meant to run before the saga step starts its work. */
    void beforeStart();

    /** Hook meant to run after the saga step's work, before the step ends. */
    void beforeEnd();
}
/**
 * A saga step that is driven by events and knows how to compensate for them.
 *
 * @param <M> message type, bounded by {@code Message<E>}
 * @param <E> type of the event this saga reacts to
 */
public interface ChoreographySaga<M extends Message<E>, E> extends SagaPattern<M> {

    /**
     * Reacts to an incoming event.
     *
     * @param event the event to handle
     */
    void on(E event);

    /**
     * Undoes the effects associated with the given event.
     *
     * @param event the event whose effects should be compensated
     */
    void compensate(E event);
}
/**
 * Template for a choreography saga step.
 *
 * <p>{@code on} runs {@code beforeStart}, {@code onTransaction} and
 * {@code beforeEnd} in that order. If any of them throws, the failure is
 * logged and {@code compensate} is invoked for the same event.
 *
 * @param <E> type of the event this saga reacts to
 */
abstract class AbstractChoreographySaga<E> implements ChoreographySaga<Message<E>, E> {

    // System.Logger lives in java.lang, so no extra import is required.
    // Named after the concrete saga class so log lines identify the failing step.
    private final System.Logger logger = System.getLogger(getClass().getName());

    /**
     * Runs the saga step for the given event and compensates on failure.
     *
     * <p>A failure of the step itself is logged and not rethrown once
     * compensation has completed. If compensation throws, that exception
     * propagates with the original failure attached as a suppressed exception.
     *
     * @param event the event to handle
     */
    @Override
    public void on(E event) {
        try {
            beforeStart();
            onTransaction(event);
            beforeEnd();
        } catch (Exception error) {
            // Record the cause before compensating; otherwise the failure is invisible.
            logger.log(System.Logger.Level.ERROR, "Saga step failed; running compensation", error);
            compensateAfterFailure(event, error);
        }
    }

    /**
     * Invokes {@code compensate} and keeps the original failure attached if
     * compensation itself throws.
     */
    private void compensateAfterFailure(E event, Exception cause) {
        try {
            compensate(event);
        } catch (RuntimeException compensationError) {
            // addSuppressed rejects self-suppression, so guard against the same instance.
            if (compensationError != cause) {
                compensationError.addSuppressed(cause);
            }
            throw compensationError;
        }
    }

    /**
     * Performs the actual work of this saga step.
     *
     * @param event the event to handle
     */
    abstract void onTransaction(E event);

    @Override
    public void beforeStart() {
        // do nothing yet
    }

    @Override
    public void beforeEnd() {
        // do nothing yet
    }
}
/**
 * Saga step for order placement: on an {@code OrderCreatedEvent} it saves the
 * order and publishes a {@code PaymentCreatedEvent}; compensation deletes the
 * order by its id.
 */
public class OrderPlacementSaga extends AbstractChoreographySaga<OrderCreatedEvent> {

    private final MessageBroker<Message<?>> messageBroker;
    private final OrderRepository orderRepository;

    /**
     * @param messageBroker   broker used to publish the follow-up message
     * @param orderRepository repository used to save and delete orders
     */
    public OrderPlacementSaga(MessageBroker<Message<?>> messageBroker, OrderRepository orderRepository) {
        this.messageBroker = messageBroker;
        this.orderRepository = orderRepository;
    }

    /**
     * Consumer entry point; delegates to the template in the superclass.
     *
     * @param event the received order-created event
     */
    @Override
    @MessageConsumer(topics = KafkaTopic.ORDER, groupId = KafkaGroup.ORDER_GROUP)
    public void on(OrderCreatedEvent event) {
        super.on(event);
    }

    /**
     * Saves the order built from the event, then publishes the payment event.
     *
     * @param event the received order-created event
     */
    @Override
    void onTransaction(OrderCreatedEvent event) {
        // todo outbox pattern
        var placedOrder = Order.fromEvent(event);
        orderRepository.save(placedOrder);

        // NOTE(review): this publishes to KafkaTopic.ORDER, the same topic this
        // saga consumes from — confirm that is intended.
        var paymentEvent = PaymentCreatedEvent.fromOrder(placedOrder);
        messageBroker.publish(KafkaMessage.asGenericMessage(KafkaTopic.ORDER, paymentEvent));
    }

    /**
     * Deletes the order identified by the event's order id.
     *
     * @param event the event whose order should be removed
     */
    @Override
    public void compensate(OrderCreatedEvent event) {
        var orderId = event.getOrderId();
        orderRepository.deleteById(orderId);
    }
}