Sagas et Machine à états au sein de votre architecture micro-services grâce à MassTransit
Définition
La saga est un processus distribué à long terme sur plusieurs services, orchestrée par un coordinateur.
Elle est initialisée par un événement, qui contient un identifiant de corrélation, qui se retrouve ensuite dans tous les messages qui composent cette transaction. C’est bien souvent un Guid, mais ça peut être autre chose.
Ce coordinateur garantit la cohérence des données, et est capable de commander des actions de compensations en cas d’erreurs partielles dans le système. Par définition, une Saga est donc "stateful".
Dans cet article nous prendrons pour exemple la commande d’un café : on donne nos choix, on paie, la machine à café sort la base du café, puis on ajoute les toppings.
Prérequis
- Une bonne connaissance de .NET et des injections de dépendances aidera à la compréhension.
- Il me parait également bon de connaître les principes de base des agents de messages (dit "message broker") et de leur fonctionnement.
- Être également à l’aise avec la programmation asynchrone.
- Avoir lu le précédent article sur MassTransit
Fonctionnement d’une Saga
Un événement en provenance d’un service ou d’une UI lance la Saga, qui à son tour commande une action à un autre service, qui émet à son tour un événement pour signaler la fin de sa tâche. La Saga capte cet événement et réagit en fonction, en lançant la commande suivante telle qu’elle a été programmée.
La Saga peut être vue comme un chef d’orchestre. Elle ne joue aucun instrument (elle ne fait aucune action sur le système lui-même), mais elle commande aux différents acteurs (musiciens) de faire ce pour quoi ils sont programmmés (jouer leur instrument).
Ce fonctionnement a été décrit dans un document de Princeton University, USA en 1986, quand l’informatique n’en était qu’à ses balbutiements.
Il a été ensuite décrit également par Arnon Rotem-Gal-Oz plus tard dans ce document en 2012.
Machine à états
La machine a états est une modélisation des états et des transitions inhérentes à une saga. Chaque instance est liée à un identifiant de corrélation, qui démarre à un état initial et qui transitionne vers l’état final au fur et à mesure des événements captés et des commandes envoyées.
Exemple du Coffeeshop
Pour la suite on va prendre l’exemple d’un coffeeshop. Vous entrez dans le coffeeshop, vous commandez votre café. Puis vous payez et une fois fait, la base du café est réalisée puis les toppings sont ajoutés.
Nous retranscrivons cela via une Saga :
Une UI recueille les commandes et les paiements, un service de machine à café pose la base et un service dédié s’occupe des toppings.
Pour la démo, le paiement peut être refusé, la Saga ordonne alors à l’UI de demander un nouveau paiement. Dans une vraie application, on peut considérer le paiement dans un seul contexte et laisser les Retry Policies gérer cela.
La Saga se termine quand le café est prêt.
Intégration dans MassTransit : Messages
Il est important que tous les messages contiennent un identifiant de corrélation.
Pour cela rien de plus simple, il suffit que toutes vos interfaces de messages aient également CorrelatedBy
. Cela ajoute alors la propriété CorrelationId à votre message.
public interface RequestPaymentCommand : CorrelatedBy<Guid>
{
float Amount { get; set; }
}
Consumer Saga
Saga sans vraiment être une machine à états, cette classe est en fait un aggrégat de consumers. Elle implémente ISaga
, qui lui impose un CorrelationId.
On déclare qu’un événement l’initialise, par l’interface InitiatedBy
. MassTransit enregistre alors automatiquement le CorrélationId dans l’instance de la Saga à l’arrivée du message, garantissant que tous les messages suivants avec le même identifiant de corrélation arrivent dans la même instance.
Tous les événements suivants sont orchestrés par la Saga, de fait, la Saga implémente logiquement Orchestrates
.
Implémentation
public class CoffeeMachineSaga : ISaga,
// Evènement initiateur de la saga
InitiatedBy<OrderSubmittedEvent>,
// Evènements gérés par la saga
Orchestrates<PaymentAcceptedEvent>,
Orchestrates<PaymentRefusedEvent>,
Orchestrates<BaseCoffeeFinishedEvent>,
Orchestrates<ToppingsAddedEvent>
{
public Guid CorrelationId { get; set; }
// Autres propriétés de ma saga nécessaires à la logique métier
// telles que le type de café, les toppings...
// On peut ici aussi rajouter une propriété pour stocker l'état de la saga
public string State {get; private set;} = "Not Started"
// (...)
// URIs des endpoints pour l'envoi des commandes
// (...)
public async Task Consume(ConsumeContext<OrderSubmittedEvent> context)
{
// On enregistre les données du message dans l'instance
// (...)
// On envoie une demande de paiment
var sendEndpoint = await context.GetSendEndpoint(requestPaymentEndpoint);
await sendEndpoint.Send<RequestPaymentCommand>(new { this.CorrelationId, this.Amount });
this.State = "AwaitingPayment";
}
// On réagit au paiment accepté en demandant la base du café
public async Task Consume(ConsumeContext<PaymentAcceptedEvent> context)
{
var sendEndpoint = await context.GetSendEndpoint(createBaseCoffeeEndpoint);
await sendEndpoint.Send<CreateBaseCoffeeCommand>(new { this.CorrelationId, CoffeeType = this.CoffeeTypeRequested, NoTopping = string.IsNullOrWhiteSpace(this.ToppingsRequested) });
this.State = "Paid";
}
// On réagit au paiement refusé en demandant un nouveau paiement
public async Task Consume(ConsumeContext<PaymentRefusedEvent> context)
{
var sendEndpoint = await context.GetSendEndpoint(requestPaymentEndpoint);
await sendEndpoint.Send<RequestPaymentCommand>(new { this.CorrelationId, this.Amount });
}
// Le café de base est fini, sans topping
public async Task Consume(ConsumeContext<BaseCoffeeFinishedEvent> context)
{
// Si on a demandé des toppings
if (!string.IsNullOrWhiteSpace(this.ToppingsRequested))
{
// On les demande
var sendEndpoint = await context.GetSendEndpoint(addToppingsEndpoint);
await sendEndpoint.Send<AddToppingsCommand>(new { this.CorrelationId, Toppings = this.ToppingsRequested.Split(",").Select(t => Enum.Parse<Topping>(t)) });
this.State = "BaseCoffeeOK";
}
else
{
// Sinon on est arrivé à la fin de la saga ici
this.State = "Ended";
// On peut s'imaginer que la saga enregistre quelque chose en base, envoie une commande pour un service externe...
// (...)
}
}
// Les toppings ont été ajoutés
public async Task Consume(ConsumeContext<ToppingsAddedEvent> context)
{
// La saga est terminée
this.State = "Ended";
// On peut s'imaginer que la saga enregistre quelque chose en base, envoie une commande pour un service externe...
// (...)
}
}
Configuration
services.AddMassTransit(cfgMassTransit =>
{
// Ajout du bus
cfgMassTransit.AddBus(...);
// Autres configurations
// (...)
// Enregistrement de la saga
cfgMassTransit.AddSaga<CoffeeMachineSaga>().InMemoryRepository();
});
On peut changer InMemoryRepository()
par une implémentation avec Dapper, EntityFramework…
Saga State Machine (machine à états)
Initilisation
Pour les machines à états, il nous faut d’abord un ensemble de propriétés qui vont évoluer tout au long de la vie de l’instance de la saga, dans une classe dédiée, que l’on suffixe souvent par State. Elle doit contenir le CorrelationId ainsi qu’une propriété qu’on définira comme l’état ("State"), qui peut être un enum, un int ou une string.
En Enum ou Int, il faut alors définir tous les states possibles dès le constructeur de la Saga, ce qui peut être contraignant. On va donc prendre ici un champ de type String.
public class CoffeeState : SagaStateMachineInstance
{
public Guid CorrelationId { get; set; }
public string CurrentState { get; set; }
public string CustomerName { get; set; }
public string ToppingsRequested { get; set; }
public CoffeeType CoffeeTypeRequested { get; set; }
public float Amount { get; set; }
}
Puis on peut commencer à créer notre Saga et sa logique, en lui disant quelle classe de State on prend, ainsi que la propriété dans laquelle enregistrer le state. De base, une machine à états a deux états : Initial et Final.
Dans le cas de notre coffeeshop, on en a d’autres, on peut donc préparer aussi à l’avance les States intermédiaires.
public class CoffeeStateMachine : MassTransitStateMachine<CoffeeState>
{
public CoffeeStateMachine()
{
InstanceState(x => x.CurrentState);
public State AwaitingPayment { get; private set; }
public State Paid { get; private set; }
public State BaseCoffeeOK { get; private set; }
}
}
Configuration
services.AddMassTransit(cfgMassTransit =>
{
// Autres configurations
// (...)
// Configuration du bus
cfgMassTransit.AddBus(registrationContext => Bus.Factory.CreateUsingRabbitMq(cfgBus =>
{
// Config RabbitMQ
// (...)
// Configuration du endpoint de la Saga
var repository = registrationContext.Container.GetService<ISagaRepository<CoffeeState>>();
cfgBus.ReceiveEndpoint("state-machine", e => e.StateMachineSaga(registrationContext.Container.GetService<CoffeeStateMachine>(), repository));
}));
// Enregistrement de la Saga dans le DI
cfgMassTransit.AddSagaStateMachine<CoffeeStateMachine, CoffeeState>().InMemoryRepository();
});
Bindings des events
On a identifié les événements qui vont avoir lieu et vont faire vivre la Saga. Nous allons donc les déclarer et les relier à la saga via l’ID de Corrélation
public class CoffeeStateMachine : MassTransitStateMachine<CoffeeState>
{
public CoffeeStateMachine()
{
// (...)
// On indique à la Saga quelle est la propriété de corrélation dans les messages pour chaque événement par 'CorrelateById'
// Pour l'event 'OrderSubmittedEvent', qui est l'événement déclencheur, on indique par SelectId la propriété d'où provient la corrélation.
Event(() => OrderSubmittedEvent, x => x.CorrelateById(context => context.Message.CorrelationId).SelectId(context => context.Message.CorrelationId));
Event(() => PaymentAcceptedEvent, x => x.CorrelateById(context => context.Message.CorrelationId));
Event(() => PaymentRefusedEvent, x => x.CorrelateById(context => context.Message.CorrelationId));
Event(() => BaseCoffeeFinishedEvent, x => x.CorrelateById(context => context.Message.CorrelationId));
Event(() => ToppingsAddedEvent, x => x.CorrelateById(context => context.Message.CorrelationId));
// (...)
}
// (...)
public Event<OrderSubmittedEvent> OrderSubmittedEvent { get; private set; }
public Event<PaymentAcceptedEvent> PaymentAcceptedEvent { get; private set; }
public Event<PaymentRefusedEvent> PaymentRefusedEvent { get; private set; }
public Event<BaseCoffeeFinishedEvent> BaseCoffeeFinishedEvent { get; private set; }
public Event<ToppingsAddedEvent> ToppingsAddedEvent { get; private set; }
}
Ajout de l’orchestration
Maintenant qu’on a déclaré nos événements et comment notre instance de Saga pouvait s’y retrouver dans les messages, on doit à présent implémenter l’orchestration.
Cela se fait dans le constructeur, sous un format plutôt explicite :
// Quand on est à l'état initial, on attend qu'un type d'événement : OrderSubmittedEvent
Initially(When(OrderSubmittedEvent)
// Action qui est lancée à la réception de l'événement
.Then(x =>
{
x.Instance.CustomerName = x.Data.CustomerName;
x.Instance.CoffeeTypeRequested = x.Data.CoffeeType;
x.Instance.ToppingsRequested = string.Join(",", x.Data.Toppings);
x.Instance.Amount = CoffeePriceCalculator.Compute(x.Data.CoffeeType, x.Data.Toppings);
})
// On envoie une commande...
.SendAsync(requestPaymentEndpoint, context => context.Init<RequestPaymentCommand>(new { context.Instance.CorrelationId, context.Instance.Amount }))
// ... et on change d'état
.TransitionTo(AwaitingPayment));
// Pendant cet état-là, on attend deux types d'événements : PaymentAcceptedEvent et PaymentRefusedEvent
During(AwaitingPayment,
// En cas de paiement accepté
When(PaymentAcceptedEvent)
.SendAsync(createBaseCoffeeEndpoint, context => context.Init<CreateBaseCoffeeCommand>(new { context.Instance.CorrelationId, CoffeeType = context.Instance.CoffeeTypeRequested, NoTopping = string.IsNullOrWhiteSpace(context.Instance.ToppingsRequested) }))
.TransitionTo(Paid),
// En cas de paiement refusé
When(PaymentRefusedEvent)
.SendAsync(requestPaymentEndpoint, context => context.Init<RequestPaymentCommand>(new { context.Instance.CorrelationId, context.Instance.Amount })));
During(Paid, When(BaseCoffeeFinishedEvent)
// Deux possibilités : si toppings demandés, on a un état en plus où il faut aller, sinon on peut passer au dernier état.
.IfElse(context => !string.IsNullOrWhiteSpace(context.Instance.ToppingsRequested), x => x
.SendAsync(addToppingsEndpoint, context => context.Init<AddToppingsCommand>(new { context.Instance.CorrelationId, Toppings = context.Instance.ToppingsRequested.Split(",").Select(t => Enum.Parse<Topping>(t)) }))
.TransitionTo(BaseCoffeeOK),
// Sinon, on prévient que c'est fini et la saga se termine.
x => x.Finalize()));
During(BaseCoffeeOK, When(ToppingsAddedEvent)
// Fin de la saga : passage à l'état Final
.Finalize());
Je vous invite à comparer le code entre la ConsumerSaga et la SagaStateMachine. Vous retrouverez l’implémentation des consumers des commandes émises par la Saga dans mon Github de démonstration.
Liens utiles
Officiels
Documentation
GitHub, Webinars, Discord
Sagas
Princeton University, USA
Arnon Rotem-Gal-Oz
SOAT
Article : 1ere partie
Webinar du 7 mai 2020
Mon Github de démonstration
© SOAT
Toute reproduction interdite sans autorisation de la société SOAT