Je connais très bien RxJava et je suis récemment passé à Kotlin Coroutines et Flow.
RxKotlin est fondamentalement la même chose que RxJava, il ajoute juste un peu de sucre syntaxique pour rendre plus confortable / idiomatique l'écriture de code RxJava en Kotlin.
Une comparaison "équitable" entre les coroutines de RxJava et de Kotlin devrait inclure Flow dans le mélange et je vais essayer d'expliquer pourquoi ici. Ce sera un peu long mais je vais essayer de rester aussi simple que possible avec des exemples.
Avec RxJava, vous disposez de différents objets (depuis la version 2) :
// 0-n events without backpressure management
fun observeEventsA(): Observable<String>
// 0-n events with explicit backpressure management
fun observeEventsB(): Flowable<String>
// exactly 1 event
fun encrypt(original: String): Single<String>
// 0-1 events
fun cached(key: String): Maybe<MyData>
// just completes with no specific results
fun syncPending(): Completable
En kotlin coroutines + flow vous n'avez pas besoin de beaucoup d'entités car si vous n'avez pas un flux d'événements vous pouvez juste utiliser des coroutines simples (fonctions de suspension) :
// 0-n events, the backpressure is automatically taken care off
fun observeEvents(): Flow<String>
// exactly 1 event
suspend fun encrypt(original: String): String
// 0-1 events
suspend fun cached(key: String): MyData?
// just completes with no specific results
suspend fun syncPending()
Bonus : support de Kotlin Flow / Coroutines null
(support supprimé avec RxJava 2)
Suspension des fonctions sont ce que leur nom laisse entendre : ce sont des fonctions qui peuvent mettre en pause l'exécution du code et la reprendre plus tard lorsque la fonction est terminée ; cela vous permet d'écrire du code qui semble plus naturel.
Et les opérateurs ?
Avec RxJava, vous avez tellement d'opérateurs ( map
, filter
, flatMap
, switchMap
...), et pour la plupart d'entre eux, il existe une version pour chaque type d'entité ( Single.map()
, Observable.map()
, ...).
Kotlin Coroutines + Flow n'ont pas besoin d'autant d'opérateurs Nous allons voir pourquoi à l'aide de quelques exemples sur les opérateurs les plus courants.
map()
RxJava :
fun getPerson(id: String): Single<Person>
fun observePersons(): Observable<Person>
fun getPersonName(id: String): Single<String> {
return getPerson(id)
.map { it.firstName }
}
fun observePersonsNames(): Observable<String> {
return observePersons()
.map { it.firstName }
}
Coroutines Kotlin + Flow
suspend fun getPerson(id: String): Person
fun observePersons(): Flow<Person>
suspend fun getPersonName(id: String): String? {
return getPerson(id).firstName
}
fun observePersonsNames(): Flow<String> {
return observePersons()
.map { it.firstName }
}
Vous n'avez pas besoin d'opérateur pour le cas "simple" et c'est assez similaire pour le cas "simple". Flow
cas.
flatMap()
Le site flatMap
opérateur et ses frères et sœurs switchMap
, contactMap
existe pour vous permettre de combiner différents objets RxJava et donc d'exécuter du code potentiellement asynchrone tout en mappant vos événements.
Disons que vous avez besoin, pour chaque personne, de récupérer dans une base de données (ou un service distant) son assurance
RxJava
fun fetchInsurance(insuranceId: String): Single<Insurance>
fun getPersonInsurance(id: String): Single<Insurance> {
return getPerson(id)
.flatMap { person ->
fetchInsurance(person.insuranceId)
}
}
fun observePersonsInsurances(): Observable<Insurance> {
return observePersons()
.flatMap { person ->
fetchInsurance(person.insuranceId) // this is a Single
.toObservable() // flatMap expect an Observable
}
}
Voyons avec Kotlin Coroutiens + Flow
suspend fun fetchInsurance(insuranceId: String): Insurance
suspend fun getPersonInsurance(id: String): Insurance {
val person = getPerson(id)
return fetchInsurance(person.insuranceId)
}
fun observePersonsInsurances(): Flow<Insurance> {
return observePersons()
.map { person ->
fetchInsurance(person.insuranceId)
}
}
Comme précédemment, dans le cas de la coroutine simple, nous n'avons pas besoin d'opérateurs, nous écrivons simplement le code comme nous le ferions s'il n'était pas asynchrone, en utilisant simplement des fonctions de suspension.
Et avec Flow
ce n'est PAS une faute de frappe, il n'y a pas besoin d'une flatMap
nous pouvons simplement utiliser map
. Et la raison est que map lambda est une fonction suspensive ! On peut y exécuter du code suspensif ! !!
Nous n'avons pas besoin d'un autre opérateur juste pour ça.
J'ai un peu triché ici.
Rx flatMap
, switchMap
y concatMap
se comportent légèrement différemment. Rx flatMap
générer un nouveau flux pour chaque événement, puis les fusionner tous ensemble : l'ordre des nouveaux flux d'événements que vous recevez en sortie est indéterminé, il peut ne pas correspondre à l'ordre ou aux événements en entrée
Rx concatMap
"corrige cela et garantit que vous obtiendrez chaque nouveau flux dans le même ordre que vos événements d'entrée.
Rx switchMap
se débarrassera de tout flux précédemment en cours d'exécution lorsqu'il recevra de nouveaux événements, seule la dernière entrée reçue compte avec cet opérateur.
Donc vous voyez, ce n'est pas vrai que Flow.map
est le même, il est en fait plus similaire à Rx concatMap
ce qui est le comportement plus naturel que l'on attend d'un opérateur de carte.
Mais il est vrai que vous avez besoin de moins d'opérateurs, à l'intérieur d'une carte vous pouvez faire n'importe quelle opération asynchrone que vous voulez et reproduire le comportement de flatMap
car il s'agit d'une fonction suspendable. L'opérateur équivalent réel à RxJava flatMap
es Flow.flatMapMerge
opérateur.
L'équivalent du RxJava switchMap
peut être réalisé dans Flow en utilisant le conflate()
avant l'opérateur map
opérateur.
Pour les choses plus complexes, vous pouvez utiliser le flux transform()
qui, pour chaque événement, émet un flux de votre choix.
Chaque opérateur de débit accepte une fonction de suspension !
Dans le paragraphe précédent, je vous ai dit J'ai triché . Mais la clé de ce que je voulais dire par Les flux n'ont pas besoin d'autant d'opérateurs est que la plupart des callbacks de l'opérateur sont des fonctions suspensives.
Disons que vous devez filter()
mais votre filtre doit effectuer un appel réseau pour savoir si vous devez garder la valeur ou non, avec RxJava vous devez combiner plusieurs opérateurs avec un code illisible, avec Flow vous pouvez simplement utiliser filter()
!
fun observePersonsWithValidInsurance(): Flow<Person> {
return observerPersons()
.filter { person ->
val insurance = fetchInsurance(person.insuranceId) // suspending call
insurance.isValid()
}
}
delay(), startWith(), concatWith(), ...
Dans RxJava, vous disposez de nombreux opérateurs pour appliquer un délai ou ajouter des éléments avant et après :
- delay()
- delaySubscription()
- startWith(T)
- startWith(Observable)
- concatWith(...)
avec kotlin Flow, vous pouvez simplement :
grabMyFlow()
.onStart {
// delay by 3 seconds before starting
delay(3000L)
// just emitting an item first
emit("First item!")
emit(cachedItem()) // call another suspending function and emit the result
}
.onEach { value ->
// insert a delay of 1 second after a value only on some condition
if (value.length() > 5) {
delay(1000L)
}
}
.onCompletion {
val endingSequence: Flow<String> = grabEndingSequence()
emitAll(endingSequence)
}
gestion des erreurs
RxJava a beaucoup d'opérateurs pour gérer les erreurs :
- onErrorResumeWith()
- onErrorReturn()
- onErrorComplete()
Avec Flow, vous n'avez pas besoin de beaucoup plus que l'opérateur. catch()
:
grabMyFlow()
.catch { error ->
// emit something from the flow
emit("We got an error: $error.message")
// then if we can recover from this error emit it
if (error is RecoverableError) {
// error.recover() here is supposed to return a Flow<> to recover
emitAll(error.recover())
} else {
// re-throw the error if we can't recover (aka = don't catch it)
throw error
}
}
et avec la fonction de suspension, vous pouvez simplement utiliser try {} catch() {}
.
Vous pouvez réaliser TOUS les opérateurs d'erreur de RxJava avec un seul catch
car vous obtenez une fonction de suspension.
facile à écrire Opérateurs de flux
Grâce aux coroutines qui alimentent Flow sous le capot, il est beaucoup plus facile d'écrire des opérateurs. Si vous avez déjà écrit un opérateur RxJava, vous verrez à quel point c'est difficile et combien de choses vous devez apprendre.
L'écriture des opérateurs Kotlin Flow est plus facile, vous pouvez vous faire une idée en regardant le code source des opérateurs qui font déjà partie de Flow. aquí . La raison en est que les coroutines facilitent l'écriture de code asynchrone et que les opérateurs semblent plus naturels à utiliser.
En prime, les opérateurs de débit sont tous des fonctions d'extension kotlin, ce qui signifie que vous, ou les bibliothèques, pouvez facilement ajouter des opérateurs et qu'ils ne sembleront pas bizarres à utiliser (dans RxJava observable.lift()
o observable.compose()
sont nécessaires pour combiner les opérateurs personnalisés).
Le fil amont ne fuit pas en aval
Qu'est-ce que cela signifie ?
Ceci explique pourquoi dans RxJava vous avez subscribeOn()
y observeOn()
alors que dans Flow vous avez seulement flowOn()
.
Prenons l'exemple de RxJava :
urlsToCall()
.switchMap { url ->
if (url.scheme == "local") {
val data = grabFromMemory(url.path)
Flowable.just(data)
} else {
performNetworkCall(url)
.subscribeOn(Subscribers.io())
.toObservable()
}
}
.subscribe {
// in which thread is this call executed?
}
Alors où est le rappel dans subscribe
exécuté ?
La réponse est :
dépend...
si elle provient du réseau, elle est dans un thread IO ; si elle provient de l'autre branche, elle est indéfinie, cela dépend du thread utilisé pour envoyer l'url.
Si vous y pensez, tout code que vous écrivez, vous ne savez pas dans quel thread il va être exécuté : il dépend toujours de l'appelant. Le problème ici est que le Thread ne dépend plus de l'appelant, il dépend de ce que fait un appel de fonction interne.
Supposons que vous ayez ce code standard :
fun callUrl(url: Uri) {
val callResult = if (url.scheme == "local") {
grabFromMemory(url.path)
} else {
performNetworkCall(url)
}
return callResult
}
Imaginez que vous n'ayez aucun moyen de savoir dans quel fil la ligne return callResult
est exécuté sans regarder à l'intérieur grabFromMemory()
y performNetworkCall()
.
Réfléchissez-y une seconde : le fil de discussion change en fonction de la fonction que vous appelez et de ce qu'elle fait à l'intérieur.
Cela arrive tout le temps avec les API de callbacks : vous n'avez aucun moyen de savoir dans quel thread le callback que vous fournissez sera exécuté, à moins qu'il ne soit documenté.
C'est le concept de "fuite du fil amont vers l'aval".
Avec Flow et les Coroutines, ce n'est pas le cas, à moins que vous ne demandiez explicitement ce comportement (à l'aide de Dispatchers.Unconfined
).
suspend fun myFunction() {
// execute this coroutine body in the main thread
withContext(Dispatchers.Main) {
urlsToCall()
.conflate() // to achieve the effect of switchMap
.transform { url ->
if (url.scheme == "local") {
val data = grabFromMemory(url.path)
emit(data)
} else {
withContext(Dispatchers.IO) {
performNetworkCall(url)
}
}
}
.collect {
// this will always execute in the main thread
// because this is where we collect,
// inside withContext(Dispatchers.Main)
}
}
}
Le code des coroutines s'exécutera dans le contexte dans lequel il a été exécuté. Et seule la partie avec l'appel réseau sera exécutée sur le thread IO, tandis que tout le reste que nous voyons ici sera exécuté sur le thread principal.
Eh bien, en fait, nous ne savons pas où le code à l'intérieur grabFromMemory()
s'exécutera, mais nous ne nous en soucions pas : nous savons qu'elle sera appelée à l'intérieur du thread principal, à l'intérieur de cette fonction de suspension, nous pourrions utiliser un autre Dispatcher, mais nous savons quand il reviendra avec le résultat. val data
ce sera à nouveau dans le fil principal.
Cela signifie que, en regardant un morceau de code, il est plus facile de dire dans quel thread il sera exécuté, si vous voyez un dispatcher explicite = c'est ce dispatcher, si vous ne le voyez pas : dans n'importe quel dispatcher de thread l'appel de suspension que vous regardez est appelé.
Concurrence structurée
Ce n'est pas un concept inventé par Kotlin, mais c'est quelque chose qu'ils ont adopté plus que tout autre langage que je connais.
Si ce que j'explique ici n'est pas suffisant pour toi, lis cet article ou regarder cette vidéo .
Alors, qu'est-ce que c'est ?
Avec RxJava, vous vous abonnez à des observables, et ils vous donnent une Disposable
objet.
Vous devez prendre soin de vous en débarrasser quand vous n'en avez plus besoin. Donc, ce que vous faites habituellement, c'est de garder une référence à ce fichier (ou de le placer dans un fichier de type CompositeDisposable
) pour appeler plus tard dispose()
sur elle quand elle n'est plus nécessaire. Si vous ne le faites pas, le linter vous donnera un avertissement.
RxJava est un peu plus agréable qu'un fil traditionnel. Lorsque vous créez un nouveau thread et que vous exécutez quelque chose dessus, c'est un "feu et oublie", vous n'avez même pas la possibilité de l'annuler : Thread.stop()
est déprécié, nuisible, et les implémentations récentes ne font rien. Thread.interrupt()
fait échouer votre fil, etc. Toute exception est perdue vous voyez le tableau.
Avec les coroutines et le flow de Kotlin, le concept de "jetable" est inversé. Vous NE POUVEZ PAS créer une coroutine sans un fichier CoroutineContext
.
Ce contexte définit le scope
de votre coroutine. Toutes les coroutines enfants créées à l'intérieur de celle-ci partageront la même portée.
Si vous vous abonnez à un flux, vous devez être à l'intérieur d'une coroutine ou fournir une portée également.
Vous pouvez toujours garder la référence des coroutines que vous lancez ( Job
) et les annuler. Cela annulera automatiquement tous les enfants de cette coroutine.
Si vous êtes un développeur Android, ces champs d'application vous sont attribués automatiquement. Exemple : viewModelScope
et vous pouvez lancer des coroutines à l'intérieur d'un viewModel avec cette portée en sachant qu'elles seront automatiquement annulées lorsque le viewmodel sera effacé.
viewModelScope.launch {
// my coroutine here
}
Certaines portées se termineront si l'un des enfants échoue, d'autres laisseront chaque enfant quitter son propre cycle de vie sans arrêter les autres enfants si l'un d'eux échoue ( SupervisedJob
).
Pourquoi est-ce une bonne chose ?
Je vais essayer de l'expliquer comme suit Roman Elizarov a fait.
Certains anciens langages de programmation avaient ce concept de goto
qui vous permettent de passer d'une ligne de code à une autre à volonté.
Très puissant, mais si on en abuse, on peut se retrouver avec un code très difficile à comprendre, difficile à déboguer et à raisonner.
Les nouveaux langages de programmation ont donc fini par le supprimer complètement du langage.
Lorsque vous utilisez if
o while
o when
il est beaucoup plus facile de raisonner sur le code : peu importe ce qui se passe à l'intérieur de ces blocs, vous finirez par en sortir, c'est un "contexte", vous n'avez pas de sauts bizarres pour entrer et sortir.
Lancer un thread ou s'abonner à un observable de RxJava est similaire au goto : vous exécutez du code qui va continuer jusqu'à ce que l'" ailleurs " soit arrêté.
Avec les coroutines, en exigeant que vous fournissiez un contexte/un champ d'application, vous savez que lorsque votre champ d'application est terminé, tout ce qui se trouve à l'intérieur de ces coroutines se terminera lorsque votre contexte se terminera, peu importe que vous ayez une seule coroutine ou 10 000.
Vous pouvez toujours "goto" avec les coroutines en utilisant GlobalScope
ce que vous ne devriez pas faire pour la même raison que vous ne devriez pas utiliser goto
dans les langues qui le prévoient.
Froid vs Chaud - ShareFlow et StateFlow
Lorsque nous travaillons avec des flux réactifs, nous avons toujours ce concept de flux froid et de flux chaud. Ce sont des concepts à la fois dans le monde Rx et dans les flux Kotlin.
Froid Les flux sont comme une fonction dans notre code : ils sont là et ne font rien tant que vous ne les appelez pas. Avec un flux, cela signifie que ce que fait le flux est défini, mais qu'il ne fera rien tant que vous ne commencerez pas à le collecter. Et, comme une fonction, si vous le collectez (l'appelez) deux fois, le flux s'exécutera deux fois (par exemple, un flux froid pour exécuter une requête http exécutera la requête deux fois s'il est collecté deux fois).
Hot Les flux ne fonctionnent pas comme ça. Lorsque vous faites appel à plusieurs collectes, elles partagent toutes le même flux chaud sous le capot, ce qui signifie que votre flux chaud s'exécute une fois et que vous pouvez avoir plusieurs observateurs.
Vous pouvez généralement transformer un flux froid en flux chaud avec un certain opérateur.
Sur RxJava vous pouvez utiliser ce concept de Connectable Observable/Flowable.
val coldObservable: Observable<Something> = buildColdObservable()
// create an hot observable from the cold one
val connectableObservable: ConnectableObservable<Something> = coldObservable.publish()
// you can subscribe multiple times to this connectable
val subADisposable: Disposable = connectableObservable.subscribe(subscriberA)
val subBDisposable: Disposable = connectableObservable.subscribe(subscriberB)
// but nothing will be emitted there until you call
val hotDisposable: Disposable = connectableObservable.connect()
// which actually run the cold observable and share the result on bot subscriberA and subscriberB
// while it's active another one can start listening to it
val subCDisposable: Disposable = connectableObservable.subscribe(subscriberC)
Vous disposez également d'autres opérateurs utiles comme refCount()
o autoConnect()
qui renvoient le Connectable
dans un flux standard et sous le capot automatiquement .connect()
lorsque le premier abonné est attaché.
buildColdObservable()
.replay(1) // when a new subscriber is attached receive the last data instantly
.autoConnect() // keep the cold observable alive while there's some subscriber
Sur le flux vous avez le shareIn()
et le stateIn()
opérateurs. Vous pouvez voir la conception de l'API aquí . Ils sont moins "manuels" dans la manipulation lorsque vous vous "connectez".
buildColdFlow()
.shareIn(
// you need to specify a scope for the cold flow subscription
scope = myScope,
// when to "connect"
started = SharingStarted.WhileSubscribed(),
// how many events already emitted should be sent to new subscribers
replay = 1,
)
portée
Le champ d'application est celui de la concurrence structurée. Sur RxJava, c'est le connect()
qui s'abonnent à l'observable froid, cela vous donne une Disposable
vous devrez appeler .dispose()
sur quelque part. Si vous utilisez refCount()
o autoConnect()
il est appelé sur le premier abonné et avec refCount()
n'est jamais éliminé alors qu'avec autoConnect()
est éliminé lorsqu'il n'y a plus d'abonnés.
Avec Flow, vous devez donner un scope dédié pour collecter le flux froid, si vous annulez ce scope, le flux froid cessera d'émettre et ne sera plus utilisable.
a commencé
Alors celui-là est facile
- RxJava
refCount()
--> Flux SharingStarted.Lazily
, commence à collecter sur le premier abonné
- RxJava
autoConnect()
-> Flux SharingStarted.WhileSubscribed()
commence à collecter sur le premier abonné et l'annule quand il n'y en a plus.
- Appel RxJava
connect()
manuellement avant tout abonnement -> Flux SharingStarted.Eagerly()
commence à collecter immédiatement
Le site WhileSubscribed()
a des paramètres utiles, Vérifiez-les .
Vous pouvez également définir votre propre logique pour SharingStarted
à gérer lors de la collecte du coldFlow.
Comportement et contre-pression
Lorsque vous avez un observable chaud, vous devez toujours faire face à des problèmes de contre-pression. Une source de données étant écoutée par de nombreuses personnes, un auditeur peut être plus lent que les autres.
Débit .shareIn
collecte le flux froid dans une coroutine dédiée et émission de tampon par défaut. Cela signifie que si le flux froid est émis trop rapidement, le tampon sera utilisé. Vous pouvez changer ce comportement.
Kotlin SharedFlow
vous permettent également d'accéder directement au tampon de relecture pour inspecter l'émission précédente si nécessaire.
La résiliation d'un abonné n'aura aucun effet sur le flux partagé.
en utilisant flowOn()
pour modifier le Dispatcher
sur l'abonné n'aura pas d'effet sur le flux partagé (utilisez la fonction flowOn()
avant de partager si vous devez exécuter le flux froid dans un répartiteur spécifique)
étatIn
Flow dispose d'une version "spéciale" de ShareFlow
qui s'appelle StateFlow
et vous pouvez utiliser stateIn()
pour en créer un à partir d'un autre flux.
A StateFlow
a toujours une valeur, il ne peut pas être "vide", donc vous devez fournir la valeur initiale lorsque vous faites stateIn()
.
A StateFlow
ne peut jamais lancer d'exceptions et ne peut jamais se terminer (de cette manière, il est similaire à l'option BehaviorRelay
dans la bibliothèque RxRelay)
A StateFlow
n'émettra que si l'état change (c'est comme s'il y avait une fonction intégrée de distinctUntilChanged()
.
Sujets RxJava vs Mutable*Flow A Subject
dans RxJava est une classe que vous pouvez utiliser pour y pousser manuellement vos données tout en l'utilisant comme un flux.
Dans Flow, vous pouvez utiliser MutableSharedFlow
o MutableStateFlow
pour obtenir un effet similaire.
Avec les coroutines de Kotlin, vous pouvez également utiliser Channels
mais elles sont considérées comme des API de niveau inférieur.
Des inconvénients ?
Flow est toujours en cours de développement et certaines fonctionnalités disponibles dans RxJava pourraient être marquées comme expérimentales dans Kotlin Coroutines Flow ou présenter quelques différences ici et là.
Il se peut qu'un opérateur ou une fonction d'opérateur de niche ne soit pas encore implémenté et que vous deviez l'implémenter vous-même (c'est au moins plus facile).
Mais à part cela, il n'y a pas d'inconvénients que je connaisse.
Cependant, il existe des différences dont il faut être conscient et qui pourraient causer des frictions lors du passage de RxJava et vous obliger à apprendre de nouvelles choses.
La concurrence structurée est un progrès, mais elle introduit de nouveaux concepts qu'il faut apprendre et auxquels il faut s'habituer (scopes, supervisorJob) : l'annulation est gérée de manière complètement différente.
Il y a un certain nombre de pièges à éviter.
Gotcha : Exception d'annulation
Si vous cancel()
dans une coroutine ou throw CancellationException()
l'exception est propagée aux coroutines parentes, à moins que vous n'ayez utilisé un scope / job Supervisor. La coroutine parent annule également les coroutines sœurs de celle qui a été annulée si cela se produit.
MAIS si vous catch(e: Exception)
même en utilisant runCatching {}
vous devez vous rappeler de relancer CancellationException()
sinon vous aurez des résultats inattendus car la coroutine a été annulée mais votre code essaie toujours de s'exécuter comme s'il ne l'était pas.
Gotcha : UncaughtExceptionHandler
si vous le faites launch { ... }
pour créer une nouvelle coroutine et cette coroutine se lance, par défaut Cela mettra fin à la coroutine mais ne fera pas planter l'application et vous pourriez ne pas voir que quelque chose s'est mal passé.
Ce code ne fera pas planter votre application.
launch {
throw RuntimeException()
}
Dans certains cas, il se peut même que le journal ne contienne rien.
S'il s'agit d'une exception d'annulation, rien ne sera imprimé dans le journal.