40 votes

RxJava2 observable take throws UndeliverableException

Si je comprends bien RxJava2 values.take(1) crée un autre Observable qui ne contient qu'un seul élément de l'Observable original. Quel est le site NE DOIT PAS lancer une exception car il est filtré par l'effet de take(1) comme c'est arrivé la seconde fois.

dans le cas du suivant extrait de code

    Observable<Integer> values = Observable.create(o -> {
        o.onNext(1);
        o.onError(new Exception("Oops"));
    });

    values.take(1)
            .subscribe(
                    System.out::println,
                    e -> System.out.println("Error: " + e.getMessage()),
                    () -> System.out.println("Completed")
            );

Sortie

1
Completed
io.reactivex.exceptions.UndeliverableException: java.lang.Exception: Oops
    at io.reactivex.plugins.RxJavaPlugins.onError(RxJavaPlugins.java:366)
    at io.reactivex.internal.operators.observable.ObservableCreate$CreateEmitter.onError(ObservableCreate.java:83)
    at ch02.lambda$main$0(ch02.java:28)
    at io.reactivex.internal.operators.observable.ObservableCreate.subscribeActual(ObservableCreate.java:40)
    at io.reactivex.Observable.subscribe(Observable.java:10841)
    at io.reactivex.internal.operators.observable.ObservableTake.subscribeActual(ObservableTake.java:30)
    at io.reactivex.Observable.subscribe(Observable.java:10841)
    at io.reactivex.Observable.subscribe(Observable.java:10827)
    at io.reactivex.Observable.subscribe(Observable.java:10787)
    at ch02.main(ch02.java:32)
Caused by: java.lang.Exception: Oops
    ... 8 more
Exception in thread "main" io.reactivex.exceptions.UndeliverableException: java.lang.Exception: Oops
    at io.reactivex.plugins.RxJavaPlugins.onError(RxJavaPlugins.java:366)
    at io.reactivex.internal.operators.observable.ObservableCreate$CreateEmitter.onError(ObservableCreate.java:83)
    at ch02.lambda$main$0(ch02.java:28)
    at io.reactivex.internal.operators.observable.ObservableCreate.subscribeActual(ObservableCreate.java:40)
    at io.reactivex.Observable.subscribe(Observable.java:10841)
    at io.reactivex.internal.operators.observable.ObservableTake.subscribeActual(ObservableTake.java:30)
    at io.reactivex.Observable.subscribe(Observable.java:10841)
    at io.reactivex.Observable.subscribe(Observable.java:10827)
    at io.reactivex.Observable.subscribe(Observable.java:10787)
    at ch02.main(ch02.java:32)
Caused by: java.lang.Exception: Oops
    ... 8 more

Mes questions :

  1. Est-ce que je comprends bien ?
  2. Ce qui se passe réellement pour causer l'exception.
  3. Comment résoudre ce problème du point de vue du consommateur ?

0 votes

Voir FlowableEmitter.tryOnError et les méthodes similaires depuis 2.1.1 .

59voto

Kiskae Points 11240
  1. Oui, mais le fait que l'observable "se termine" ne signifie pas que le code qui s'exécute à l'intérieur create(...) est arrêté. Pour être totalement sûr dans ce cas, vous devez utiliser o.isDisposed() pour voir si l'observable s'est terminé en aval.
  2. L'exception est là parce que RxJava 2 a pour politique de ne JAMAIS autoriser un fichier de type onError l'appel à être perdu. Il est soit livré en aval, soit jeté en tant qu'événement global. UndeliverableException si l'observable s'est déjà terminé. C'est au créateur de l'observable de gérer "correctement" le cas où l'observable s'est terminé et où une exception se produit.
  3. Le problème est que le producteur ( Observable ) et le consommateur ( Subscriber ) ne sont pas d'accord sur le moment où le flux se termine. Puisque le producteur dépasse le consommateur dans ce cas, le problème ne peut être résolu que chez le producteur.

6 votes

Serait if (!o.isDisposed()) { o.onError(new Exception("Oops")); } être une manière correcte de gérer cela ?

2 votes

S'il est acceptable que cette exception soit perdue dans le cas où l'observable n'est plus observé, alors oui. Si l'exception doit vraiment aller quelque part, elle doit être appelée sans condition.

0 votes

@Kiskae Y a-t-il un moyen de gérer cela depuis le consommateur ?

20voto

Ilia Kurtov Points 171

@Kiskae dans le commentaire précédent a correctement répondu sur la raison pour laquelle une telle exception peut se produire.

Voici le lien vers la documentation officielle sur ce thème : RxJava2-wiki .

Parfois, il n'est pas possible de modifier ce comportement ; il existe donc un moyen de gérer cette situation. UndeliverableException 's. Voici un extrait de code qui montre comment éviter les plantages et les mauvais comportements :

RxJavaPlugins.setErrorHandler(e -> {
    if (e instanceof UndeliverableException) {
        e = e.getCause();
    }
    if ((e instanceof IOException) || (e instanceof SocketException)) {
        // fine, irrelevant network problem or API that throws on cancellation
        return;
    }
    if (e instanceof InterruptedException) {
        // fine, some blocking code was interrupted by a dispose call
        return;
    }
    if ((e instanceof NullPointerException) || (e instanceof IllegalArgumentException)) {
        // that's likely a bug in the application
        Thread.currentThread().getUncaughtExceptionHandler()
            .handleException(Thread.currentThread(), e);
        return;
    }
    if (e instanceof IllegalStateException) {
        // that's a bug in RxJava or in a custom operator
        Thread.currentThread().getUncaughtExceptionHandler()
            .handleException(Thread.currentThread(), e);
        return;
    }
    Log.warning("Undeliverable exception received, not sure what to do", e);
});

Ce code est extrait du lien ci-dessus.

Remarque importante. Cette approche définit un gestionnaire d'erreurs global pour RxJava, donc si vous pouvez vous débarrasser de ces exceptions, ce serait la meilleure option.

0 votes

Je n'arrive pas à me débarrasser de ces exceptions et j'ai besoin de certaines données à partir de ce RX, mais lorsque cette erreur se produit, je ne peux pas obtenir mes données à partir de Json que dois-je faire maintenant? :(

0 votes

Lorsque je fais une demande de réseau à l'intérieur de Observable.create mon subscriber n'est pas éliminé au début de l'appel au réseau et l'est déjà lorsque j'obtiens une réponse à l'appel. Existe-t-il un moyen de ne pas obtenir InterruptedException en RxJavaPlugins.setErrorHandler ?

3voto

Sanan Points 11

Kotlin

J'appelle ceci dans la méthode onCreate de MainActivity

private fun initRxErrorHandler(){
    RxJavaPlugins.setErrorHandler { throwable ->
        if (throwable is UndeliverableException) {
            throwable.cause?.let {
                Thread.currentThread().uncaughtExceptionHandler?.uncaughtException(Thread.currentThread(), it)
                return@setErrorHandler
            }
        }
        if (throwable is IOException || throwable is SocketException) {
            // fine, irrelevant network problem or API that throws on cancellation
            return@setErrorHandler
        }
        if (throwable is InterruptedException) {
            // fine, some blocking code was interrupted by a dispose call
            return@setErrorHandler
        }
        if (throwable is NullPointerException || throwable is IllegalArgumentException) {
            // that's likely a bug in the application
            Thread.currentThread().uncaughtExceptionHandler?.uncaughtException(Thread.currentThread(), throwable)
            return@setErrorHandler
        }
        if (throwable is IllegalStateException) {
            // that's a bug in RxJava or in a custom operator
            Thread.currentThread().uncaughtExceptionHandler?.uncaughtException(Thread.currentThread(), throwable)
            return@setErrorHandler
        }
        Log.w("Undeliverable exception", throwable)
    }
}

0voto

Ayush Jain Points 111

Si vous utilisez observable.create(), utilisez simplement tryOnError(). onError() ne garantit pas que l'erreur sera traitée. Il existe plusieurs opérateurs de traitement des erreurs sont là ICI

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X