RxJava - Introduction

ReactiveX

ReactiveX est une définition étendue et normalisée du pattern observer.

L’objectif est de proposer une API simple pour recevoir une succession d’évènements asynchrones (quelque soit la source) et une suite d’opérateurs pour les modifier et combiner. En utilisant ocrrectment l’API fournie, on évite normalement de tomber dans la fameuse callback hell

Le pattern est très utile dans les contextes fortement évènementiels, en particulier pour les IHM. Ce qui explique qu’on le retrouve beaucoup utilisé sur Angular et Android par exemple.

Le défaut est que cela oblige à penser et coder différemment, il est plutôt difficile de passer d’un code itératif classique à une version réactive. C’est sans doute pour cette raison que certains préfèreront des alternatives comme les coroutines Kotlin ou les fonctions génératrices JavaScript.

ReactiveX est polyglotte, on retrouve une implémentation dans quasiment tous les langages :

Ici nous allons voir en détail la version Java mais les grands principes sont les même pour tous les langages.

Observable

L’observable est l’objet de base de l’API, il s’agit de la représentation d’un flux d’évènements asynchrones.

Un observable peut envoyer :

  • entre 0 et une infinité d’évènements
  • une erreur
  • la fin du flux

On peut donc en déduire qu’un Observable peut être vide, infini, terminer en erreur, terminer normalement.

L’avantage d’un observable c’est qu’il ne laisse pas transparaitre les spécificités techniques de la source. On l’utilise de la même manière quelque soit l’implémentation interne (qui peut donc changer sans rien casser).

Les variantes de RxJava

RxJava possède bien sûr son implémentation d'Observable mais il a également introduit certaines variantes spécifiques :

  • Single (renvoie un et un seul élément)
  • Maybe (renvoie un ou aucun élément)
  • Completable (ne renvoie aucun élément, mais notifie de sa fin)
  • Flowable (un flux permettant de contrôler la backpressure)

La création

À partir de valeurs

Il est possible de créer simplement un flux qui renvoie des valeurs que l’on possède déjà :

1
Observable.just("alpha", "bravo", "charlie", "delta")

mais il est également possible de transformer une collection d’éléments (qui implémente Iterable) :

1
2
List<String> elts = Arrays.asList("alpha", "bravo", "charlie", "delta");
Observable.fromIterable(elts)

ou d’un tableau :

1
2
String[] elts = new String[]{"alpha", "bravo", "charlie", "delta"};
Observable.fromArray(elts)

À partir d’un traitement

Bien que générer un observable (ou un de ses variantes) à partir de valeurs peut être intéressant, il est plus souvent utile de renvoyer le résultat d’un long traitement asynchrone.

Imaginons qu’on ait une méthode String veryLongOperation() et qu’on chercher à récupérer le résultat sous la forme d’un observable, on écrirait ceci :

1
Observable.fromCallable(() -> veryLongOperation())

NB : Pour l’instant le traitement n’est pas asynchrone, il sera exécuté dans le thread qui appelle la méthode subscribe mais on y reviendra plus tard.

Par conversion

Il est possible aussi de convertir les format Java standard :

Cas particuliers

Enfin il est possible de générer des observables spécifiques :

  • qui finisse en erreur : error()
  • qui s’achève immédiatement sans renvoyer d’élément : empty()
  • qui ne termine jamais : never()

Émettre plusieurs valeurs

Pour les cas les plus complexes, quand les cas précédemment cités ne suffisent plus, on peut créer un observable plus complexe à l’aide de la méthode create :

1
2
3
4
5
6
7
Observable.<String>create(emitter -> {
    emitter.onNext("alpha");
    emitter.onNext("bravo");
    emitter.onNext("charlie");
    emitter.onNext("delta");
    emitter.onComplete();
})

Il s’agit de la manière la plus complète de créer un observable et presque tout ce qui est imaginable peut être construit comme ça. Cependant il s’agit aussi de la manière la plus complexe. Il est nécessaire de comprendre le cycle de vie de la souscription pour être sûr de ne pas lancer inutilement des traitements (avant ou après la souscription) et de bien libérer les ressources au moment de la désinscription du client.

La souscription

Maintenant qu’on sait créer des flux de données, on va voir comment les consommer et utiliser les valeurs à l’aide de la méthode subscribe().

Les callbacks

On passe à cette méthode 3 callbacks :

  • onNext (T) -> {} appelé pour chaque élément du flux.
  • onError (Throwable) -> {} appelée si le flux s’arrête à cause d’une erreur.
  • onComplete () -> {} appelée si le flux se termine normalement.

Un exemple de souscription :

1
2
3
4
5
6
Observable.just("alpha", "bravo", "charlie", "delta")
        .subscribe(
                value -> System.out.println(value),
                error -> System.err.println("error occured: " + error.getMessage()),
                () -> System.out.println("no more value")
        )

il est possible de ne pas passer les callback onComplete ou onError. Une exception non gérée sera récupérée par un handler global RxJavaPlugins.onError(Throwable).

La disposition

L’appel à la méthode subscribe renvoie un objet Disposable qui va permettre d’annuler cette souscription et de libérer toutes les ressources allouées. Il suffit pour ça d’appeler la méthode dispose().

Il est important de lier la souscription au cycle de vie du composant qui consomme les données. Cela permet d’éviter les problèmes de fuites de mémoire mais également des bugs en cas d’appels à la référence d’un objet détruit.

En résumé

RxJava est une library permettant de modéliser des flux asynchrones de données :

  • On a vu les notions derrière l’élément de base qui est l'Obserbable.
  • On a vu comment créer un flux à partir de valeurs, d’un traitement ou d’autres object Java.
  • On a vu qu’on pouvait souscrire à ce flux et donc consommer les évènements à l’aide de callbacks.
  • Enfin on a vu comment mettre fin à une souscription pour libérer les ressoures et éviter les fuites de mémoire.

Lors de la prochaine étape, on va voir ce qui fait la véritable force de cette library avec les opérateurs les plus communs.

Load Comments?