RxJS : les bases fondamentales
Sommaire
- Introduction
- Les bases fondamentales de RxJS
- Exploration des opérateurs RxJS
- Maîtriser la gestion des erreurs avec RxJS
- Gestion efficace du désabonnement
- Comprendre Unicast et Multicast en RxJS
- Les retours de notre expert
Introduction
La programmation réactive est un paradigme puissant et de plus en plus utilisé pour gérer les flux de données et les évènements asynchrones dans les applications modernes. Contrairement à la programmation impérative traditionnelle, où les étapes de traitement sont définies de manière séquentielle, la programmation réactive permet de réagir aux changements de données ou aux évènements de manière déclarative. Ce paradigme est particulièrement adapté aux interfaces utilisateur réactives, aux applications en temps réel, et à tout système nécessitant une gestion élégante et efficace des évènements.
RxJS (Reactive Extensions for JavaScript) est une bibliothèque populaire qui implémente ce paradigme en JavaScript. Elle introduit la notion d’Observables, des objets représentant des flux de données pouvant être consommés par un ou plusieurs Observateurs. Grâce à RxJS, il devient possible de composer des flux de données et de manipuler les évènements asynchrones avec une grande facilité. Les opérateurs RxJS offrent une multitude de possibilités pour filtrer, transformer, et combiner ces flux, rendant le code plus lisible et maintenable.
Pour mieux comprendre comment RxJS simplifie la gestion des flux de données, il est essentiel de se familiariser avec ses concepts fondamentaux et ses principaux opérateurs. Dans la suite de cet article, nous explorerons les bases de RxJS, en commençant par les différences entre la programmation impérative et réactive à travers des exemples concrets. Ensuite, nous examinerons les diverses catégories d’opérateurs disponibles et leurs utilisations spécifiques. Enfin, nous aborderons des sujets avancés tels que la gestion des erreurs, le désabonnement et les patterns de diffusion de données comme le Multicast.
Les bases fondamentales de RxJS
Programmation impérative vs réactive
Pour illustrer la transition de la programmation impérative à la programmation réactive, prenons un exemple où nous souhaitons dessiner sur une page HTML en suivant les mouvements de la souris. En programmation impérative, cela nécessite plusieurs écouteurs d’évènements imbriqués :
// Programmation impérative
document.addEventListener('mousedown', () => {
function mouseMoveEventListener(e: MouseEvent) {
console.log(`draw point at ${e.x}, ${e.y}`);
}
document.addEventListener('mousemove', mouseMoveEventListener);
document.addEventListener(
'mouseup',
() => document.removeEventListener('mousemove', mouseMoveEventListener),
{ once: true }
);
});
// Programmation réactive
fromEvent(document, 'mousedown').pipe(
switchMap(() => fromEvent<MouseEvent>(document, 'mousemove').pipe(
takeUntil(fromEvent(document, 'mouseup'))
))
).subscribe((e: MouseEvent) => console.log(`draw point at ${e.x}, ${e.y}`));
En programmation réactive, chaque élément issu d’une source de données sera propagé et transformé à travers une suite d’opérations. Au final, on obtient une approche plus déclarative. Le passage d’un flux à un autre est également simplifié, ici j’ai la possibilité d’utiliser switchMap ou mergeMap que nous verrons en détail par la suite. Sur des exemples plus complexes, la programmation impérative nous amènerait à multiplier les imbrications de callback rendant le code difficilement compréhensible.
Bien souvent, les tâches asynchrones sont réalisées à l’aide de Promesses. Cependant, les Observables offrent une alternative avec plusieurs différences majeures.
- Un Observable n’est pas actif tant que sa méthode subscribe n’a pas été appelée. Une Promesse est active dès sa création.
new Observable(subscriber => {
console.log("from observable") ;
subscriber.next(1);
subscriber.complete();
});
console.log("end");
// end
new Promise((resolve, reject) => {
console.log("from promise");
resolve(1);
});
console.log("end");
// end
// from promise
- Un Observable peut produire 0 à n valeurs alors qu’une Promesse génère une seule valeur.
- Un Observable peut être synchrone contrairement à une Promesse qui est toujours asynchrone.
new Observable(subscriber => {
subscriber.next(1);
subscriber.complete();
}).subscribe(console.log);
console.log("end");
// 1
//end
Promise.resolve(1)
.then(console.log);
console.log("end")
// end
// 1
L’atout principal d’RxJS vient de ses nombreux opérateurs. L’ordre de déclaration est un point important, puisqu’il a une incidence sur le résultat final.
of(3, 4).pipe(
filter(i => i % 2),
map(i => i * 2)
).subscribe(console.log);
// 8
of(3, 4).pipe(
map(i => i * 2),
filter(i => i % 2),
).subscribe(console.log);
// 6
// 8
En continuant cette exploration, nous plongerons plus en profondeur dans les nombreux opérateurs disponibles, leur utilisation et leurs avantages. Nous découvrirons comment RxJS peut transformer la manière dont vous gérez les évènements asynchrones et les flux de données dans vos applications, rendant votre code plus réactif, efficace et facile à comprendre.
Exploration des opérateurs RxJS
Avec plus d’une centaine d’opérateurs, il est parfois difficile de choisir le bon. Certains sont très simples à utiliser (filter, map, count), d’autres sont plus complexes et nécessitent quelques exemples pour bien les comprendre.
- concatMap / mergeMap / switchMap / exhaustMap
Ces opérateurs créent un nouveau flux de données pour chaque élément traité. L’orchestration de ces flux varie selon l’opérateur utilisé.
concatMap attend la terminaison du flux précédent avant d’en créer un nouveau contrairement à mergeMap qui le crée immédiatement.
switchMap arrête le flux précédent à l’arrivée d’un nouvel élément.
exhaustMap ignore les nouveaux éléments si le flux précédent n’est pas encore terminé.
switchMap est souvent utilisé pour une recherche utilisateur afin d’interrompre la précédente lors d’une nouvelle saisie.
- zip / forkJoin / combineLatest
Ces opérateurs regroupent les éléments de plusieurs sources de données.
zip fait correspondre les éléments ayant le même indice. Dans le cas où les sources ne produisent pas la même quantité de données, les éléments en trop ne seront pas traités.
forkJoin prend uniquement en compte la dernière valeur produite. Il est souvent utilisé pour regrouper les résultats de plusieurs requêtes http.
combineLatest associe chaque valeur produite avec le dernier élément des autres flux.
- distinct / distinctUntilChanged
distinctUntilChanged empêche d’émettre consécutivement des valeurs identiques.
- filter / find / first / single
Plusieurs opérateurs sont disponibles pour filtrer les éléments d’un flux.
Si aucun élément ne correspond au filtre, filter renverra un flux vide, find produira la valeur undefined, first et single déclencheront une erreur.
Dans le cas où plusieurs éléments sont éligibles au filtre, filter et find retourneront l’ensemble de ces éléments, first renverra uniquement le premier tandis que single provoquera une erreur.
- debounceTime / throttleTime
debounceTime retarde l’émission d’un élément et l’annule si un autre arrive entre-temps. Cela permet par exemple de limiter la création de requêtes http lorsque l’utilisateur saisit du texte dans un champ de recherche.
throttleTime envoie la donnée immédiatement puis ignore les prochains éléments pendant un certain temps.
Maîtriser la gestion des erreurs avec RxJS
Un Observable peut se terminer de plusieurs façons : avec succès, en erreur ou à la suite d’un désabonnement initié par le consommateur. Dans tous les cas, l’Observable entre dans un état terminal et ne peut plus émettre de données.
La méthode throwError sert à créer un Observable se terminant immédiatement par une erreur.
new Observable(subscriber => {
subscriber.error(
new Error('custom error')
);
}).subscribe({
error: console.error,
});
// Error: custom error
throwError(() => new Error('custom error'))
.subscribe({
error: console.error,
});
// Error: custom error
catchError remplace le try catch que l’on utilise habituellement pour intercepter une erreur en programmation impérative. Il permet également de générer de nouvelles données en retournant un autre Observable.
throwError(() => new Error('custom error'))
.pipe(
catchError((e) => {
console.error(e);
return of(1, 2);
})
)
.subscribe(console.log);
// Error: custom error
// 1
// 2
throwError(() => new Error('custom error'))
.pipe(
catchError((e) => {
console.error(e);
return EMPTY;
})
)
.subscribe(console.log);
// Error: custom error
La place de catchError doit être judicieusement choisie pour obtenir le résultat souhaité.
function not2(n: number): Observable<number> {
if (n === 2) {
return throwError(() => new Error(`${n} is not valid`));
}
return of(n);
}
interval(1000)
.pipe(
switchMap((n) => not2(n)),
catchError((e) => {
console.error(e);
return EMPTY;
})
)
.subscribe({
next: console.log,
complete() {
console.log('complete');
},
});
// 0
// 1
// Error : 2 is not valid
// complete interval(1000)
.pipe(
switchMap((n) => not2(n).pipe(
catchError((e) => {
console.error(e);
return EMPTY;
})
)),
)
.subscribe({
next: console.log,
complete() {
console.log('complete');
},
});
// 0
// 1
// Error : 2 is not valid
// 3
// 4
// ....
Le déclenchement d’une erreur dans le premier exemple entraîne la terminaison de l’intervalle. Pour éviter ce comportement, la solution consiste à intercepter l’erreur au sein de switchMap, afin d’empêcher sa propagation jusqu’au flux initial.
Gestion efficace du désabonnement
Lorsqu’on manipule un flux infini de données, il est nécessaire de penser au désabonnement sous peine d’engendrer des problèmes de mémoire.
Il suffit d’appeler la méthode unsubscribe pour demander la terminaison d’un Observable, ce dernier exécutera une fonction de nettoyage si elle est définie.
const obs$ = new Observable(subscriber => {
let i = 0;
const intervalId = setInterval(() => {
subscriber.next(i++);
}, 1000);
return function cleanup() {
console.log('cleanup');
clearInterval(intervalId);
};
});
const subscription = obs$.subscribe(console.log);
function onDestroy() {
console.log('unsubscribe');
subscription.unsubscribe();
}
// 1
// 2
// ...
// unsubscribe
// cleanup
Une fois désabonné, les appels aux méthodes next
, error
, ou complete
n’auront plus aucun effet. De plus, la fonction de nettoyage est toujours exécutée, même si le flux se termine normalement.
const obs$ = new Observable(subscriber => {
setTimeout(() => {
// ne fait rien après unsubscribe
subscriber.next(1);
subscriber.complete();
}, 1000);
});
const sub = obs$.subscribe({
next: console.log,
complete() {
console.log('complete');
},
});
console.log("unsubscribe");
sub.unsubscribe();
// unsubscribe
new Observable((subscriber) => {
subscriber.next(1);
subscriber.complete();
return () => console.log("cleanup");
}).subscribe({
next: console.log,
complete() {
console.log('complete');
},
});
// 1
// complete
// cleanup
Pour gérer efficacement le désabonnement, plusieurs solutions ont émergé dans le monde Angular. Une approche courante consiste à regrouper toutes les souscriptions, permettant ainsi d’appeler la méthode unsubscribe
une seule fois.
@Component({...})
export class AppComponent implements OnInit, OnDestroy {
a$ = interval(500);
b$ = interval(700);
sub = new Subscription();
ngOnInit() {
this.sub.add(this.a$.subscribe(console.log));
this.sub.add(this.b$.subscribe(console.log));
}
ngOnDestroy() {
this.sub.unsubscribe();
}
}
Une autre solution consiste à utiliser la méthode takeUntilDestroyed
fournie par Angular, qui permet d’arrêter le flux automatiquement lorsque le composant est détruit.
@Component({...})
export class AComponent {
interval(1000)
.pipe(takeUntilDestroyed())
.subscribe(console.log);
}
Comprendre Unicast et Multicast en RxJS
Les Observables vus jusqu’à présent sont classés dans la catégorie Unicast. Chaque souscription entraîne la création d’un producteur de données, on a une relation 1 producteur <–> 1 observateur.
const obs$ = new Observable((subscriber) => {
// Unicast : les données sont créées dans la fonction de souscription
const data = [1, 2];
data.forEach((n) => subscriber.next(n));
});
obs$.subscribe((n) => console.log(`Sub1 ${n}`));
// Sub1 1
// Sub1 2
obs$.subscribe((n) => console.log(`Sub2 ${n}`));
// Sub2 1
// Sub2 2
Le Multicast correspond à une relation 1 producteur <–> N observateurs. Le producteur de données est créé en dehors de la fonction de souscription et sera partagé par un ensemble d’Observateurs.
Le Multicast en RxJS fait apparaître la notion de Subject. Similaire à un Observable, il a la possibilité d’envoyer des données vers plusieurs Observateurs.
const subject = new Subject<number>();
const sub1 = subject.subscribe({
next: (n) => console.log(`Sub1 ${n}`),
complete: () => console.log('Sub1 completed'),
});
const sub2 = subject.subscribe({
next: (n) => console.log(`Sub2 ${n}`),
complete: () => console.log('Sub2 completed'),
});
subject.next(1);
// Sub1 1
// Sub2 1
sub1.unsubscribe();
subject.next(2);
// Sub2 2
subject.complete();
// Sub2 completed
subject.next(3);
Un Subject peut lui-même observer un flux. Il servira alors d’intermédiaire entre le flux initial et les Observateurs qui lui sont associés. Grâce au Subject, on a la possibilité de transformer un Observable Unicast en Multicast.
const obs$ = interval(1000).pipe(
tap({
subscribe: () => console.log('subscribe'),
unsubscribe: () => console.log('unsubscribe'),
})
);
const subject = new Subject<number>();
obs$.subscribe(subject);
const sub1 = subject.subscribe((n) => console.log(`Sub1 ${n}`));
const sub2 = subject.subscribe((n) => console.log(`Sub2 ${n}`));
// subscribe
// Sub1 0
// Sub2 0
// Sub1 1
// Sub2 1
setTimeout(() => {
sub1.unsubscribe();
sub2.unsubscribe();
}, 2000);
setTimeout(() => {
const sub3 = subject.subscribe((n) => console.log(`Sub3 ${n}`));
}, 6000);
// Sub3 6
// Sub3 7
// ...
Les lecteurs attentifs auront remarqué que l’absence d’Observateurs à partir 2000 ms n’entraîne pas de désabonnement au flux initial. L’intervalle continue à produire des données qui seront consommées de façon transparente jusqu’à l’arrivée d’un nouvel Observateur.
Pour faciliter le Multicast, RxJS fournit l’opérateur share, qui par défaut, se désabonne du flux principal lorsqu’il n’y a plus d’Observateurs.
const obs$ = interval(1000).pipe(
tap({
subscribe: () => console.log('subscribe'),
unsubscribe: () => console.log('unsubscribe'),
}),
share()
);
const sub1 = obs$.subscribe((n) => console.log(`Sub1 ${n}`));
const sub2 = obs$.subscribe((n) => console.log(`Sub2 ${n}`));
// subscribe
// Sub1 0
// Sub2 0
// Sub1 1
// Sub2 1
setTimeout(() => {
sub1.unsubscribe();
sub2.unsubscribe();
}, 2000);
// unsubscribe
setTimeout(() => {
obs$.subscribe((n) => console.log(`Sub3 ${n}`));
}, 6000);
// subscribe
// Sub3 0
// Sub3 1
// ...
Pour personnaliser ce comportement, il est possible de spécifier une configuration particulière, par exemple avec share({resetOnRefCountZero: false})
.
En outre, deux variantes de Subject sont disponibles pour la consommation d’anciennes valeurs lors de la souscription : BehaviorSubject, qui conserve un seul élément en mémoire avec une valeur d’initialisation, et ReplaySubject, qui permet de mémoriser plusieurs valeurs.
BehaviorSubject
const subject = new BehaviorSubject<number>(0);
subject.subscribe((n) =>
console.log(`Sub1 ${n}`));
// Sub1 0
subject.next(1);
// Sub1 1
subject.subscribe((n) =>
console.log(`Sub2 ${n}`));
// Sub2 1
subject.next(2);
// Sub1 2
// Sub2 2
subject.complete();
ReplaySubject
const subject = new ReplaySubject<number>(2);
subject.subscribe((n) =>
console.log(`Sub1 ${n}`));
subject.next(1);
// Sub1 1
subject.next(2);
// Sub1 2
subject.next(3);
// Sub1 3
subject.subscribe((n) =>
console.log(`Sub2 ${n}`));
// Sub2 2
// Sub2 3
subject.next(4);
// Sub1 4
// Sub2 4
subject.complete();
RxJS définit l’opérateur shareReplay pour mémoriser un ensemble de valeurs qui seront consommées lors des prochaines souscriptions. Contrairement à share, il ne se désabonne pas lorsque le nombre d’Observateurs redescend à zéro.
const subscription = new Subscription();
const obs$ = interval(1000).pipe(
tap({ unsubscribe: () => console.log('unsubscribe') }),
shareReplay(2)
);
subscription.add(obs$.subscribe((n) => console.log(`Sub1 ${n}`)));
// Sub1 0
// Sub1 1
setTimeout(() => {
subscription.add(obs$.subscribe((n) => console.log(`Sub2 ${n}`)));
}, 2500);
// Sub2 0
// Sub2 1
// Sub1 2
// Sub2 2
// Sub1 3
// Sub2 3
setTimeout(() => subscription.unsubscribe(), 4000);
Les retours de notre expert
L’Observable est au cœur de RxJS, servant de fondation à la programmation réactive en représentant les flux de données. Cette approche offre une méthode déclarative pour gérer les évènements et les données asynchrones, rendant les applications plus dynamiques et réactives.
Les nombreux opérateurs de RxJS sont un de ses principaux atouts, permettant de manipuler et transformer les flux de manière flexible et puissante. Une gestion efficace du désabonnement est cruciale pour éviter les fuites de mémoire, surtout avec les flux de longue durée.
La notion de Subject permet de transformer des Observables Unicast en Multicast, permettant à plusieurs Observateurs de partager le même flux de données. Les variantes comme BehaviorSubject
et ReplaySubject
ajoutent des fonctionnalités utiles, comme la conservation et la relecture des valeurs passées.
En somme, RxJS est une bibliothèque robuste et flexible pour gérer les traitements asynchrones, facilitant le développement d’applications réactives et performantes. En maîtrisant ces concepts, vous serez mieux équipé pour créer des applications évolutives et maintenables, pleinement adaptées à la gestion des évènements asynchrones.