95 votes

Comment les coroutines de Kotlin sont-elles meilleures que RxKotlin ?

Pourquoi voudrais-je utiliser les coroutines de Kotlin ?

Il semble que la bibliothèque RxKotlin soit beaucoup plus polyvalente. Les coroutines de Kotlin semblent nettement moins puissantes et plus difficiles à utiliser en comparaison.

Je base mon opinion sur les coroutines sur cet exposé sur la conception par Andrey Breslav (JetBrains)

Le diaporama de la conférence est accessible ici.


EDIT (merci à @hotkey) :

Une meilleure source sur l'état actuel des coroutines ici.

111voto

Geoffrey Marizy Points 2260

Avis de non-responsabilité : Certaines parties de cette réponse ne sont pas pertinentes puisque les Coroutines ont maintenant l'API de flux, très similaire à celle de Rx. Si vous voulez une réponse à jour, passez à la dernière modification.

Rx se compose de deux parties : le modèle Observable et un ensemble solide d'opérateurs pour les manipuler, les transformer et les combiner. Le modèle Observable, par lui-même, ne fait pas grand-chose. Il en va de même pour les coroutines ; c'est juste un autre paradigme pour gérer l'asynchronisme. Vous pouvez comparer les avantages et les inconvénients des callbacks, des Observables et des coroutines pour résoudre un problème donné, mais vous ne pouvez pas comparer un paradigme avec une bibliothèque complète. C'est comme comparer un langage avec un framework.

Comment les coroutines Kotlin sont-elles meilleures que RxKotlin ? Je n'ai pas encore utilisé les coroutines, mais cela ressemble à async/wait en C#. Vous écrivez simplement du code séquentiel, tout est aussi facile que d'écrire du code synchrone ... sauf qu'il s'exécute de manière asynchrone. C'est plus facile à comprendre.

Pourquoi voudrais-je utiliser les coroutines de Kotlin ? Je vais répondre par moi-même. La plupart du temps, je m'en tiendrai à Rx, car je privilégie l'architecture événementielle. Mais si une situation se présente où j'écris du code séquentiel, et que j'ai besoin d'appeler une méthode asynchrone au milieu, je serai heureux d'utiliser les coroutines pour que cela reste ainsi et éviter de tout envelopper dans des Observable.

Editar : Maintenant que j'utilise des coroutines, il est temps de faire une mise à jour.

RxKotlin n'est qu'un sucre syntaxique pour utiliser RxJava dans Kotlin, je parlerai donc de RxJava et non de RxKotlin dans la suite. Les coroutines sont un concept plus bas et plus général que RxJava, elles servent d'autres cas d'utilisation. Cela dit, il existe un cas d'utilisation où l'on peut comparer RxJava et les coroutines ( channel ), il transmet les données de manière asynchrone. Les coroutines ont ici un avantage clair sur RxJava :

Les coroutines sont plus efficaces pour gérer les ressources

  • Dans RxJava, vous pouvez assigner des calculs à des planificateurs mais subscribeOn() y ObserveOn() sont confuses. Chaque coroutine reçoit un contexte de thread et retourne au contexte parent. Pour un canal, chaque côté (producteur, consommateur) exécute sur son propre contexte. Les coroutines sont plus intuitives sur l'affectation des threads ou des pools de threads.
  • Les coroutines permettent de mieux contrôler le moment où ces calculs sont effectués. Vous pouvez par exemple passer la main ( yield ), donner la priorité ( select ), de paralléliser (plusieurs producer / actor en channel ) ou une ressource de verrouillage ( Mutex ) pour un calcul donné. Cela n'a peut-être pas d'importance sur le serveur (où RxJava est arrivé en premier) mais dans un environnement aux ressources limitées, ce niveau de contrôle peut être nécessaire.
  • En raison de sa nature réactive, la rétropression ne convient pas bien à RxJava. De l'autre côté send() to channel est une fonction suspensive qui se suspend lorsque la capacité du canal est atteinte. Il s'agit d'une contre-pression prête à l'emploi, donnée par la nature. Vous pouvez également offer() au canal, auquel cas l'appel n'est jamais suspendu mais retourne false dans le cas où le canal est plein, reproduisant effectivement onBackpressureDrop() de RxJava. Ou vous pouvez simplement écrire votre propre logique de rétropression personnalisée, ce qui ne sera pas difficile avec les coroutines, surtout par rapport à faire la même chose avec RxJava.

Il existe un autre cas d'utilisation, où les coroutines brillent et cela répondra à votre deuxième question "Pourquoi voudrais-je utiliser les coroutines Kotlin ? Les coroutines sont le remplacement parfait des threads d'arrière-plan ou des AsyncTask (Android). C'est aussi simple que launch { someBlockingFunction() } . Bien sûr, vous pouvez aussi réaliser cela avec RxJava, en utilisant Schedulers y Completable peut-être. Vous n'utiliserez pas (ou peu) le pattern Observer et les opérateurs qui sont la signature de RxJava, un indice que ce travail est hors de portée pour RxJava. La complexité de RxJava (une taxe inutile ici) rendra votre code plus verbeux et moins propre que la version de Coroutine.

La lisibilité est importante. À cet égard, les approches RxJava et coroutines diffèrent beaucoup. Les coroutines sont plus simples que RxJava. Si vous n'êtes pas à l'aise avec map() , flatmap() et la programmation réactive fonctionnelle en général, les manipulations de coroutines sont plus faciles, impliquant des instructions de base : for , if , try/catch ... Mais je trouve personnellement le code des coroutines plus difficile à comprendre pour les tâches non triviales. En particulier, il implique plus d'imbrication et d'indentation alors que l'enchaînement des opérateurs dans RxJava garde tout en ligne. La programmation de style fonctionnel rend le traitement plus explicite. En plus de cela, RxJava peut résoudre des transformations complexes avec quelques opérateurs standard de leur riche (OK, beaucoup trop riche) ensemble d'opérateurs. RxJava brille lorsque vous avez des flux de données complexes nécessitant un grand nombre de combinaisons et de transformations.

J'espère que ces considérations vous aideront à choisir le bon outil en fonction de vos besoins.

Editar: Coroutine a maintenant un flux, une API très, très similaire à Rx. On pourrait comparer les avantages et les inconvénients de chacun, mais la vérité est que les différences sont mineures.

Coroutines est un modèle de conception de la concurrence, avec des bibliothèques complémentaires, l'une d'entre elles étant une API de flux similaire à Rx. Évidemment, les Coroutines ont une portée beaucoup plus large que Rx, il y a beaucoup de choses que les Coroutines peuvent faire que Rx ne peut pas, et je ne peux pas les énumérer toutes. Mais généralement, si j'utilise des Coroutines dans un de mes projets, cela se résume à une seule raison :

Les coroutines sont plus efficaces pour supprimer les callbacks de votre code.

J'évite d'utiliser les callbacks qui nuisent trop à la lisibilité. Les coroutines rendent le code asynchrone simple et facile à écrire. En utilisant le mot-clé suspend, votre code ressemble à un code synchrone.

J'ai vu Rx utilisé dans des projets principalement dans le même but de remplacer les callbacks, mais si vous ne prévoyez pas de modifier votre architecture pour adopter le modèle réactif, Rx sera un fardeau. Considérez cette interface :

interface Foo {
   fun bar(callback: Callback)
}

L'équivalent Coroutine est plus explicite, avec un type de retour et le mot-clé suspend indiquant qu'il s'agit d'une opération asynchrone.

interface Foo {
   suspend fun bar: Result
}

Mais il y a un problème avec l'équivalent Rx :

interface Foo {
   fun bar: Single<Result>
}

Lorsque vous appelez bar() dans la version callback ou Coroutine, vous déclenchez le calcul ; avec la version Rx, vous obtenez une représentation d'un calcul que vous pouvez déclencher à volonté. Vous devez appeler bar() puis vous abonner au Single. En général, ce n'est pas un gros problème, mais c'est un peu déroutant pour les débutants et peut conduire à un problème subtil.

Un exemple de ces problèmes, supposez que la fonction de barre de rappel soit implémentée comme telle :

fun bar(callback: Callback) {
   setCallback(callback)
   refreshData()
}

Si vous ne le portez pas correctement, vous vous retrouverez avec un Single qui ne peut être déclenché qu'une seule fois car refreshData() est appelé dans la fonction bar() et non au moment de l'abonnement. Une erreur de débutant, certes, mais le fait est que Rx est bien plus qu'un remplacement de callback et beaucoup de développeurs ont du mal à comprendre Rx.

Si votre objectif est de transformer une tâche asynchrone de type callback en un paradigme plus agréable, les coroutines sont parfaitement adaptées alors que les Rx ajoutent un peu de complexité.

3 votes

Ainsi, au lieu de tout emballer dans un Observable, vous allez tout emballer dans un Future.

15 votes

Heureusement, les coroutines de Kotlin sont assez différentes de celles de C# et de JS et ne nécessitent pas d'envelopper votre code dans un Future. Vous pouvez consulter peut utiliser des futures avec des coroutines Kotlin, mais idiomatique Le code basé sur les coroutines de Kotlin n'utilise pratiquement pas de futures.

1 votes

On peut réaliser une architecture événementielle en utilisant des canaux assez facilement.

87voto

Roman Elizarov Points 8871

Les coroutines Kotlin sont différentes de Rx. Il est difficile de les comparer, car les coroutines de Kotlin ne sont qu'un élément du langage (avec seulement quelques concepts de base et quelques fonctions de base pour les manipuler), alors que Rx est une bibliothèque assez lourde avec une grande variété d'opérateurs prêts à l'emploi. Les deux sont conçus pour résoudre un problème de programmation asynchrone, mais leur approche de la solution est très différente :

  • Rx s'accompagne d'un style de programmation fonctionnel particulier qui peut être mis en œuvre dans pratiquement n'importe quel langage de programmation sans l'aide du langage lui-même. Ce style fonctionne bien lorsque le problème à résoudre se décompose facilement en une séquence d'opérateurs standard, et moins bien dans le cas contraire.

  • Les coroutines de Kotlin sont une fonctionnalité du langage qui permet aux auteurs de bibliothèques de mettre en œuvre divers styles de programmation asynchrones, y compris, mais sans s'y limiter, le style fonctionnel réactif (Rx). Avec les coroutines Kotlin, vous pouvez également écrire votre code asynchrone dans un style impératif, dans un style basé sur les promesses/futures, dans un style acteur, etc.

Il est plus approprié de comparer Rx avec certaines bibliothèques spécifiques qui sont mises en œuvre sur la base de coroutines Kotlin.

Prenez bibliothèque kotlinx.coroutines comme un exemple. Cette bibliothèque fournit un ensemble de primitives comme async/await et des canaux qui sont généralement intégrés dans d'autres langages de programmation. Il prend également en charge les acteurs légers et sans avenir. Vous pouvez en savoir plus dans le Guide de kotlinx.coroutines par l'exemple .

Chaînes fournies par kotlinx.coroutines peut remplacer ou compléter le Rx dans certains cas d'utilisation. Il existe une Guide des flux réactifs avec coroutines qui approfondit les similitudes et les différences avec le Rx.

0 votes

Qu'en est-il de la gestion des erreurs/exceptions ? Le Rx est-il meilleur que les Coroutines ?

2 votes

Si nous comparons Rx à la bibliothèque kotlinx.coroutines, elles offrent toutes deux des capacités de gestion des erreurs/exceptions à peu près identiques, hormis les différences de style. Vous pouvez soit installer des gestionnaires d'erreurs/exceptions globaux, soit gérer les erreurs localement en utilisant diverses constructions.

2 votes

Je dirais que les coroutines sont définitivement plus flexibles dans la gestion des erreurs parce que vous pouvez utiliser le bon vieux try-catch . Vous bénéficiez d'un contrôle de la portée prêt à l'emploi qui délimite clairement et visuellement ce que vous surveillez. Vous pouvez imbriquer ces blocs et coder des modèles complexes de gestion des erreurs qui restent faciles à comprendre. D'un point de vue syntaxique, une bibliothèque basée sur des fonctions d'ordre supérieur ne peut utiliser que la chaîne de méthodes. Les coroutines disposent de tout le langage.

75voto

Daniele Segato Points 1281

Je connais très bien RxJava et je suis récemment passé à Kotlin Coroutines et Flow.

RxKotlin est fondamentalement la même chose que RxJava, il ajoute juste un peu de sucre syntaxique pour rendre plus confortable / idiomatique l'écriture de code RxJava en Kotlin.

Une comparaison "équitable" entre les coroutines de RxJava et de Kotlin devrait inclure Flow dans le mélange et je vais essayer d'expliquer pourquoi ici. Ce sera un peu long mais je vais essayer de rester aussi simple que possible avec des exemples.

Avec RxJava, vous disposez de différents objets (depuis la version 2) :

// 0-n events without backpressure management
fun observeEventsA(): Observable<String>

// 0-n events with explicit backpressure management
fun observeEventsB(): Flowable<String>

// exactly 1 event
fun encrypt(original: String): Single<String>

// 0-1 events
fun cached(key: String): Maybe<MyData>

// just completes with no specific results
fun syncPending(): Completable

En kotlin coroutines + flow vous n'avez pas besoin de beaucoup d'entités car si vous n'avez pas un flux d'événements vous pouvez juste utiliser des coroutines simples (fonctions de suspension) :

// 0-n events, the backpressure is automatically taken care off
fun observeEvents(): Flow<String>

// exactly 1 event
suspend fun encrypt(original: String): String

// 0-1 events
suspend fun cached(key: String): MyData?

// just completes with no specific results
suspend fun syncPending()

Bonus : support de Kotlin Flow / Coroutines null (support supprimé avec RxJava 2)

Suspension des fonctions sont ce que leur nom laisse entendre : ce sont des fonctions qui peuvent mettre en pause l'exécution du code et la reprendre plus tard lorsque la fonction est terminée ; cela vous permet d'écrire du code qui semble plus naturel.

Et les opérateurs ?

Avec RxJava, vous avez tellement d'opérateurs ( map , filter , flatMap , switchMap ...), et pour la plupart d'entre eux, il existe une version pour chaque type d'entité ( Single.map() , Observable.map() , ...).

Kotlin Coroutines + Flow n'ont pas besoin d'autant d'opérateurs Nous allons voir pourquoi à l'aide de quelques exemples sur les opérateurs les plus courants.

map()

RxJava :

fun getPerson(id: String): Single<Person>
fun observePersons(): Observable<Person>

fun getPersonName(id: String): Single<String> {
  return getPerson(id)
     .map { it.firstName }
}

fun observePersonsNames(): Observable<String> {
  return observePersons()
     .map { it.firstName }
}

Coroutines Kotlin + Flow

suspend fun getPerson(id: String): Person
fun observePersons(): Flow<Person>

suspend fun getPersonName(id: String): String? {
  return getPerson(id).firstName
}

fun observePersonsNames(): Flow<String> {
  return observePersons()
     .map { it.firstName }
}

Vous n'avez pas besoin d'opérateur pour le cas "simple" et c'est assez similaire pour le cas "simple". Flow cas.

flatMap()

Le site flatMap opérateur et ses frères et sœurs switchMap , contactMap existe pour vous permettre de combiner différents objets RxJava et donc d'exécuter du code potentiellement asynchrone tout en mappant vos événements.

Disons que vous avez besoin, pour chaque personne, de récupérer dans une base de données (ou un service distant) son assurance

RxJava

fun fetchInsurance(insuranceId: String): Single<Insurance>

fun getPersonInsurance(id: String): Single<Insurance> {
  return getPerson(id)
    .flatMap { person ->
      fetchInsurance(person.insuranceId)
    }
}

fun observePersonsInsurances(): Observable<Insurance> {
  return observePersons()
    .flatMap { person ->
      fetchInsurance(person.insuranceId) // this is a Single
          .toObservable() // flatMap expect an Observable
    }
}

Voyons avec Kotlin Coroutiens + Flow

suspend fun fetchInsurance(insuranceId: String): Insurance

suspend fun getPersonInsurance(id: String): Insurance {
  val person = getPerson(id)
  return fetchInsurance(person.insuranceId)
}

fun observePersonsInsurances(): Flow<Insurance> {
  return observePersons()
    .map { person ->
      fetchInsurance(person.insuranceId)
    }
}

Comme précédemment, dans le cas de la coroutine simple, nous n'avons pas besoin d'opérateurs, nous écrivons simplement le code comme nous le ferions s'il n'était pas asynchrone, en utilisant simplement des fonctions de suspension.

Et avec Flow ce n'est PAS une faute de frappe, il n'y a pas besoin d'une flatMap nous pouvons simplement utiliser map . Et la raison est que map lambda est une fonction suspensive ! On peut y exécuter du code suspensif ! !!

Nous n'avons pas besoin d'un autre opérateur juste pour ça.

J'ai un peu triché ici.

Rx flatMap , switchMap y concatMap se comportent légèrement différemment. Rx flatMap générer un nouveau flux pour chaque événement, puis les fusionner tous ensemble : l'ordre des nouveaux flux d'événements que vous recevez en sortie est indéterminé, il peut ne pas correspondre à l'ordre ou aux événements en entrée

Rx concatMap "corrige cela et garantit que vous obtiendrez chaque nouveau flux dans le même ordre que vos événements d'entrée.

Rx switchMap se débarrassera de tout flux précédemment en cours d'exécution lorsqu'il recevra de nouveaux événements, seule la dernière entrée reçue compte avec cet opérateur.

Donc vous voyez, ce n'est pas vrai que Flow.map est le même, il est en fait plus similaire à Rx concatMap ce qui est le comportement plus naturel que l'on attend d'un opérateur de carte.

Mais il est vrai que vous avez besoin de moins d'opérateurs, à l'intérieur d'une carte vous pouvez faire n'importe quelle opération asynchrone que vous voulez et reproduire le comportement de flatMap car il s'agit d'une fonction suspendable. L'opérateur équivalent réel à RxJava flatMap es Flow.flatMapMerge opérateur.

L'équivalent du RxJava switchMap peut être réalisé dans Flow en utilisant le conflate() avant l'opérateur map opérateur.

Pour les choses plus complexes, vous pouvez utiliser le flux transform() qui, pour chaque événement, émet un flux de votre choix.

Chaque opérateur de débit accepte une fonction de suspension !

Dans le paragraphe précédent, je vous ai dit J'ai triché . Mais la clé de ce que je voulais dire par Les flux n'ont pas besoin d'autant d'opérateurs est que la plupart des callbacks de l'opérateur sont des fonctions suspensives.

Disons que vous devez filter() mais votre filtre doit effectuer un appel réseau pour savoir si vous devez garder la valeur ou non, avec RxJava vous devez combiner plusieurs opérateurs avec un code illisible, avec Flow vous pouvez simplement utiliser filter() !

fun observePersonsWithValidInsurance(): Flow<Person> {
  return observerPersons()
    .filter { person ->
        val insurance = fetchInsurance(person.insuranceId) // suspending call
        insurance.isValid()
    }
}

delay(), startWith(), concatWith(), ...

Dans RxJava, vous disposez de nombreux opérateurs pour appliquer un délai ou ajouter des éléments avant et après :

  • delay()
  • delaySubscription()
  • startWith(T)
  • startWith(Observable)
  • concatWith(...)

avec kotlin Flow, vous pouvez simplement :

grabMyFlow()
  .onStart {
    // delay by 3 seconds before starting
    delay(3000L)
    // just emitting an item first
    emit("First item!")
    emit(cachedItem()) // call another suspending function and emit the result
  }
  .onEach { value ->
    // insert a delay of 1 second after a value only on some condition
    if (value.length() > 5) {
      delay(1000L)
    }
  }
  .onCompletion {
    val endingSequence: Flow<String> = grabEndingSequence()
    emitAll(endingSequence)
  }

gestion des erreurs

RxJava a beaucoup d'opérateurs pour gérer les erreurs :

  • onErrorResumeWith()
  • onErrorReturn()
  • onErrorComplete()

Avec Flow, vous n'avez pas besoin de beaucoup plus que l'opérateur. catch() :

  grabMyFlow()
    .catch { error ->
       // emit something from the flow
       emit("We got an error: $error.message")
       // then if we can recover from this error emit it
       if (error is RecoverableError) {
          // error.recover() here is supposed to return a Flow<> to recover
          emitAll(error.recover())
       } else {
          // re-throw the error if we can't recover (aka = don't catch it)
          throw error
       }
    }

et avec la fonction de suspension, vous pouvez simplement utiliser try {} catch() {} .

Vous pouvez réaliser TOUS les opérateurs d'erreur de RxJava avec un seul catch car vous obtenez une fonction de suspension.

facile à écrire Opérateurs de flux

Grâce aux coroutines qui alimentent Flow sous le capot, il est beaucoup plus facile d'écrire des opérateurs. Si vous avez déjà écrit un opérateur RxJava, vous verrez à quel point c'est difficile et combien de choses vous devez apprendre.

L'écriture des opérateurs Kotlin Flow est plus facile, vous pouvez vous faire une idée en regardant le code source des opérateurs qui font déjà partie de Flow. aquí . La raison en est que les coroutines facilitent l'écriture de code asynchrone et que les opérateurs semblent plus naturels à utiliser.

En prime, les opérateurs de débit sont tous des fonctions d'extension kotlin, ce qui signifie que vous, ou les bibliothèques, pouvez facilement ajouter des opérateurs et qu'ils ne sembleront pas bizarres à utiliser (dans RxJava observable.lift() o observable.compose() sont nécessaires pour combiner les opérateurs personnalisés).

Le fil amont ne fuit pas en aval

Qu'est-ce que cela signifie ?

Ceci explique pourquoi dans RxJava vous avez subscribeOn() y observeOn() alors que dans Flow vous avez seulement flowOn() .

Prenons l'exemple de RxJava :

urlsToCall()
  .switchMap { url ->
    if (url.scheme == "local") {
       val data = grabFromMemory(url.path)
       Flowable.just(data)
    } else {
       performNetworkCall(url)
        .subscribeOn(Subscribers.io())
        .toObservable()
    }
  }
  .subscribe {
    // in which thread is this call executed?
  }

Alors où est le rappel dans subscribe exécuté ?

La réponse est :

dépend...

si elle provient du réseau, elle est dans un thread IO ; si elle provient de l'autre branche, elle est indéfinie, cela dépend du thread utilisé pour envoyer l'url.

Si vous y pensez, tout code que vous écrivez, vous ne savez pas dans quel thread il va être exécuté : il dépend toujours de l'appelant. Le problème ici est que le Thread ne dépend plus de l'appelant, il dépend de ce que fait un appel de fonction interne.

Supposons que vous ayez ce code standard :

fun callUrl(url: Uri) {
  val callResult = if (url.scheme == "local") {
    grabFromMemory(url.path)
  } else {
    performNetworkCall(url)
  }
  return callResult
}

Imaginez que vous n'ayez aucun moyen de savoir dans quel fil la ligne return callResult est exécuté sans regarder à l'intérieur grabFromMemory() y performNetworkCall() .

Réfléchissez-y une seconde : le fil de discussion change en fonction de la fonction que vous appelez et de ce qu'elle fait à l'intérieur.

Cela arrive tout le temps avec les API de callbacks : vous n'avez aucun moyen de savoir dans quel thread le callback que vous fournissez sera exécuté, à moins qu'il ne soit documenté.

C'est le concept de "fuite du fil amont vers l'aval".

Avec Flow et les Coroutines, ce n'est pas le cas, à moins que vous ne demandiez explicitement ce comportement (à l'aide de Dispatchers.Unconfined ).

suspend fun myFunction() {
  // execute this coroutine body in the main thread
  withContext(Dispatchers.Main) {
    urlsToCall()
      .conflate() // to achieve the effect of switchMap
      .transform { url ->
        if (url.scheme == "local") {
           val data = grabFromMemory(url.path)
           emit(data)
        } else {
           withContext(Dispatchers.IO) {
             performNetworkCall(url)
           }
        }
      }
      .collect {
        // this will always execute in the main thread
        // because this is where we collect,
        // inside withContext(Dispatchers.Main)
      }
  }
}

Le code des coroutines s'exécutera dans le contexte dans lequel il a été exécuté. Et seule la partie avec l'appel réseau sera exécutée sur le thread IO, tandis que tout le reste que nous voyons ici sera exécuté sur le thread principal.

Eh bien, en fait, nous ne savons pas où le code à l'intérieur grabFromMemory() s'exécutera, mais nous ne nous en soucions pas : nous savons qu'elle sera appelée à l'intérieur du thread principal, à l'intérieur de cette fonction de suspension, nous pourrions utiliser un autre Dispatcher, mais nous savons quand il reviendra avec le résultat. val data ce sera à nouveau dans le fil principal.

Cela signifie que, en regardant un morceau de code, il est plus facile de dire dans quel thread il sera exécuté, si vous voyez un dispatcher explicite = c'est ce dispatcher, si vous ne le voyez pas : dans n'importe quel dispatcher de thread l'appel de suspension que vous regardez est appelé.

Concurrence structurée

Ce n'est pas un concept inventé par Kotlin, mais c'est quelque chose qu'ils ont adopté plus que tout autre langage que je connais.

Si ce que j'explique ici n'est pas suffisant pour toi, lis cet article ou regarder cette vidéo .

Alors, qu'est-ce que c'est ?

Avec RxJava, vous vous abonnez à des observables, et ils vous donnent une Disposable objet.

Vous devez prendre soin de vous en débarrasser quand vous n'en avez plus besoin. Donc, ce que vous faites habituellement, c'est de garder une référence à ce fichier (ou de le placer dans un fichier de type CompositeDisposable ) pour appeler plus tard dispose() sur elle quand elle n'est plus nécessaire. Si vous ne le faites pas, le linter vous donnera un avertissement.

RxJava est un peu plus agréable qu'un fil traditionnel. Lorsque vous créez un nouveau thread et que vous exécutez quelque chose dessus, c'est un "feu et oublie", vous n'avez même pas la possibilité de l'annuler : Thread.stop() est déprécié, nuisible, et les implémentations récentes ne font rien. Thread.interrupt() fait échouer votre fil, etc. Toute exception est perdue vous voyez le tableau.

Avec les coroutines et le flow de Kotlin, le concept de "jetable" est inversé. Vous NE POUVEZ PAS créer une coroutine sans un fichier CoroutineContext .

Ce contexte définit le scope de votre coroutine. Toutes les coroutines enfants créées à l'intérieur de celle-ci partageront la même portée.

Si vous vous abonnez à un flux, vous devez être à l'intérieur d'une coroutine ou fournir une portée également.

Vous pouvez toujours garder la référence des coroutines que vous lancez ( Job ) et les annuler. Cela annulera automatiquement tous les enfants de cette coroutine.

Si vous êtes un développeur Android, ces champs d'application vous sont attribués automatiquement. Exemple : viewModelScope et vous pouvez lancer des coroutines à l'intérieur d'un viewModel avec cette portée en sachant qu'elles seront automatiquement annulées lorsque le viewmodel sera effacé.

viewModelScope.launch {
  // my coroutine here
}

Certaines portées se termineront si l'un des enfants échoue, d'autres laisseront chaque enfant quitter son propre cycle de vie sans arrêter les autres enfants si l'un d'eux échoue ( SupervisedJob ).

Pourquoi est-ce une bonne chose ?

Je vais essayer de l'expliquer comme suit Roman Elizarov a fait.

Certains anciens langages de programmation avaient ce concept de goto qui vous permettent de passer d'une ligne de code à une autre à volonté.

Très puissant, mais si on en abuse, on peut se retrouver avec un code très difficile à comprendre, difficile à déboguer et à raisonner.

Les nouveaux langages de programmation ont donc fini par le supprimer complètement du langage.

Lorsque vous utilisez if o while o when il est beaucoup plus facile de raisonner sur le code : peu importe ce qui se passe à l'intérieur de ces blocs, vous finirez par en sortir, c'est un "contexte", vous n'avez pas de sauts bizarres pour entrer et sortir.

Lancer un thread ou s'abonner à un observable de RxJava est similaire au goto : vous exécutez du code qui va continuer jusqu'à ce que l'" ailleurs " soit arrêté.

Avec les coroutines, en exigeant que vous fournissiez un contexte/un champ d'application, vous savez que lorsque votre champ d'application est terminé, tout ce qui se trouve à l'intérieur de ces coroutines se terminera lorsque votre contexte se terminera, peu importe que vous ayez une seule coroutine ou 10 000.

Vous pouvez toujours "goto" avec les coroutines en utilisant GlobalScope ce que vous ne devriez pas faire pour la même raison que vous ne devriez pas utiliser goto dans les langues qui le prévoient.

Froid vs Chaud - ShareFlow et StateFlow

Lorsque nous travaillons avec des flux réactifs, nous avons toujours ce concept de flux froid et de flux chaud. Ce sont des concepts à la fois dans le monde Rx et dans les flux Kotlin.

Froid Les flux sont comme une fonction dans notre code : ils sont là et ne font rien tant que vous ne les appelez pas. Avec un flux, cela signifie que ce que fait le flux est défini, mais qu'il ne fera rien tant que vous ne commencerez pas à le collecter. Et, comme une fonction, si vous le collectez (l'appelez) deux fois, le flux s'exécutera deux fois (par exemple, un flux froid pour exécuter une requête http exécutera la requête deux fois s'il est collecté deux fois).

Hot Les flux ne fonctionnent pas comme ça. Lorsque vous faites appel à plusieurs collectes, elles partagent toutes le même flux chaud sous le capot, ce qui signifie que votre flux chaud s'exécute une fois et que vous pouvez avoir plusieurs observateurs.

Vous pouvez généralement transformer un flux froid en flux chaud avec un certain opérateur.

Sur RxJava vous pouvez utiliser ce concept de Connectable Observable/Flowable.

val coldObservable: Observable<Something> = buildColdObservable()

// create an hot observable from the cold one
val connectableObservable: ConnectableObservable<Something> = coldObservable.publish()

// you can subscribe multiple times to this connectable
val subADisposable: Disposable = connectableObservable.subscribe(subscriberA)
val subBDisposable: Disposable = connectableObservable.subscribe(subscriberB)

// but nothing will be emitted there until you call
val hotDisposable: Disposable = connectableObservable.connect()

// which actually run the cold observable and share the result on bot subscriberA and subscriberB

// while it's active another one can start listening to it
val subCDisposable: Disposable = connectableObservable.subscribe(subscriberC)

Vous disposez également d'autres opérateurs utiles comme refCount() o autoConnect() qui renvoient le Connectable dans un flux standard et sous le capot automatiquement .connect() lorsque le premier abonné est attaché.

buildColdObservable()
   .replay(1) // when a new subscriber is attached receive the last data instantly
   .autoConnect() // keep the cold observable alive while there's some subscriber

Sur le flux vous avez le shareIn() et le stateIn() opérateurs. Vous pouvez voir la conception de l'API aquí . Ils sont moins "manuels" dans la manipulation lorsque vous vous "connectez".

buildColdFlow()
  .shareIn(
    // you need to specify a scope for the cold flow subscription
    scope = myScope,
    // when to "connect"
    started = SharingStarted.WhileSubscribed(),
    // how many events already emitted should be sent to new subscribers
    replay = 1,
  )

portée

Le champ d'application est celui de la concurrence structurée. Sur RxJava, c'est le connect() qui s'abonnent à l'observable froid, cela vous donne une Disposable vous devrez appeler .dispose() sur quelque part. Si vous utilisez refCount() o autoConnect() il est appelé sur le premier abonné et avec refCount() n'est jamais éliminé alors qu'avec autoConnect() est éliminé lorsqu'il n'y a plus d'abonnés.

Avec Flow, vous devez donner un scope dédié pour collecter le flux froid, si vous annulez ce scope, le flux froid cessera d'émettre et ne sera plus utilisable.

a commencé

Alors celui-là est facile

  • RxJava refCount() --> Flux SharingStarted.Lazily , commence à collecter sur le premier abonné
  • RxJava autoConnect() -> Flux SharingStarted.WhileSubscribed() commence à collecter sur le premier abonné et l'annule quand il n'y en a plus.
  • Appel RxJava connect() manuellement avant tout abonnement -> Flux SharingStarted.Eagerly() commence à collecter immédiatement

Le site WhileSubscribed() a des paramètres utiles, Vérifiez-les .

Vous pouvez également définir votre propre logique pour SharingStarted à gérer lors de la collecte du coldFlow.

Comportement et contre-pression

Lorsque vous avez un observable chaud, vous devez toujours faire face à des problèmes de contre-pression. Une source de données étant écoutée par de nombreuses personnes, un auditeur peut être plus lent que les autres.

Débit .shareIn collecte le flux froid dans une coroutine dédiée et émission de tampon par défaut. Cela signifie que si le flux froid est émis trop rapidement, le tampon sera utilisé. Vous pouvez changer ce comportement.

Kotlin SharedFlow vous permettent également d'accéder directement au tampon de relecture pour inspecter l'émission précédente si nécessaire.

La résiliation d'un abonné n'aura aucun effet sur le flux partagé.

en utilisant flowOn() pour modifier le Dispatcher sur l'abonné n'aura pas d'effet sur le flux partagé (utilisez la fonction flowOn() avant de partager si vous devez exécuter le flux froid dans un répartiteur spécifique)

étatIn

Flow dispose d'une version "spéciale" de ShareFlow qui s'appelle StateFlow et vous pouvez utiliser stateIn() pour en créer un à partir d'un autre flux.

A StateFlow a toujours une valeur, il ne peut pas être "vide", donc vous devez fournir la valeur initiale lorsque vous faites stateIn() .

A StateFlow ne peut jamais lancer d'exceptions et ne peut jamais se terminer (de cette manière, il est similaire à l'option BehaviorRelay dans la bibliothèque RxRelay)

A StateFlow n'émettra que si l'état change (c'est comme s'il y avait une fonction intégrée de distinctUntilChanged() .

Sujets RxJava vs Mutable*Flow A Subject dans RxJava est une classe que vous pouvez utiliser pour y pousser manuellement vos données tout en l'utilisant comme un flux.

Dans Flow, vous pouvez utiliser MutableSharedFlow o MutableStateFlow pour obtenir un effet similaire.

Avec les coroutines de Kotlin, vous pouvez également utiliser Channels mais elles sont considérées comme des API de niveau inférieur.

Des inconvénients ?

Flow est toujours en cours de développement et certaines fonctionnalités disponibles dans RxJava pourraient être marquées comme expérimentales dans Kotlin Coroutines Flow ou présenter quelques différences ici et là.

Il se peut qu'un opérateur ou une fonction d'opérateur de niche ne soit pas encore implémenté et que vous deviez l'implémenter vous-même (c'est au moins plus facile).

Mais à part cela, il n'y a pas d'inconvénients que je connaisse.

Cependant, il existe des différences dont il faut être conscient et qui pourraient causer des frictions lors du passage de RxJava et vous obliger à apprendre de nouvelles choses.

La concurrence structurée est un progrès, mais elle introduit de nouveaux concepts qu'il faut apprendre et auxquels il faut s'habituer (scopes, supervisorJob) : l'annulation est gérée de manière complètement différente.

Il y a un certain nombre de pièges à éviter.

Gotcha : Exception d'annulation

Si vous cancel() dans une coroutine ou throw CancellationException() l'exception est propagée aux coroutines parentes, à moins que vous n'ayez utilisé un scope / job Supervisor. La coroutine parent annule également les coroutines sœurs de celle qui a été annulée si cela se produit.

MAIS si vous catch(e: Exception) même en utilisant runCatching {} vous devez vous rappeler de relancer CancellationException() sinon vous aurez des résultats inattendus car la coroutine a été annulée mais votre code essaie toujours de s'exécuter comme s'il ne l'était pas.

Gotcha : UncaughtExceptionHandler

si vous le faites launch { ... } pour créer une nouvelle coroutine et cette coroutine se lance, par défaut Cela mettra fin à la coroutine mais ne fera pas planter l'application et vous pourriez ne pas voir que quelque chose s'est mal passé.

Ce code ne fera pas planter votre application.

launch {
  throw RuntimeException()
}

Dans certains cas, il se peut même que le journal ne contienne rien.

S'il s'agit d'une exception d'annulation, rien ne sera imprimé dans le journal.

6voto

Davide Bertola Points 41

L'exposé/doc que vous avez cité ne parle pas des canaux. Les canaux sont ce qui comble le fossé entre votre compréhension actuelle des coroutines et de la programmation événementielle.

Avec les coroutines et les canaux, vous pouvez faire de la programmation événementielle comme vous avez probablement l'habitude de le faire avec rx, mais vous pouvez le faire avec un code d'apparence synchrone et sans autant d'opérateurs "personnalisés".

Si vous voulez mieux comprendre, je vous suggère de regarder en dehors de Kotlin, où ces concepts sont plus matures et raffinés (et non expérimentaux). Regardez core.async de Clojure, vidéos de Rich Hickey, posts et discussions connexes.

3voto

Samuel Urbanowicz Points 477

Les coroutines sont conçues pour fournir un cadre de programmation asynchrone léger. Léger en termes de ressources nécessaires pour lancer le travail asynchrone. Les coroutines n'obligent pas à utiliser une API externe et sont plus naturelles pour les utilisateurs (programmeurs). En revanche, RxJava + RxKotlin possède un paquetage de traitement de données supplémentaire qui n'est pas vraiment nécessaire dans Kotlin qui possède une API très riche dans la bibliothèque standard pour le traitement des séquences et des collections.

Si vous souhaitez en savoir plus sur l'utilisation pratique des coroutines sur Android, je vous recommande mon article : https://www.netguru.com/codestories/Android-coroutines-%EF%B8%8Fin-2020

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X