2 votes

Comment attendre le flowable onComplete ?

Dans RxJava2, voici mon code :

public void myMethod() {
Flowable.create(e->{
  // do sth.
}, BackpressureStrategy.BUFFER)
.subscribeOn(Schedulers.computation())
.observeOn(Schedulers.io())
.subscribe(new Subscriber<ContentsWithChannelVO>() {
  @Override
  public void onSubscribe(Subscription subscription) { // do sth. }

  @Override
  public void onNext(ContentsWithChannelVO contentAndChannel) { // do sth. }

  @Override
  public void onError(Throwable throwable) { // do sth. }

  @Override
  public void onComplete() { // do sth. }
});
doSomething();
}

Voici un problème de synchronisation.

Je veux savoir comment faire en sorte que ma méthode attende que la méthode flowable onComplete, c'est-à-dire doSomething() sera appelé après le flowable onCompleted.

Je l'ai cherché mais cela ne m'aide pas.

2voto

Vang Points 843

Vous pouvez utiliser concat mais vous devez envelopper votre méthode dans un autre observable.

Concat attend de s'abonner à chaque Observable supplémentaire que vous lui passez. jusqu'à ce que l'Observable précédent soit terminé.

1voto

kd304 Points 8369

Il faudrait pour cela bloquer la consommation du flux (ce qui n'est généralement pas recommandé) :

public void myMethod() {
    Flowable.create(e -> {
       // do sth.
    }, BackpressureStrategy.BUFFER)
    .subscribeOn(Schedulers.computation(), false)
    .blockingSubscribe(new Subscriber<ContentsWithChannelVO>() {
        @Override
        public void onSubscribe(Subscription subscription) { // do sth. }

        @Override
        public void onNext(ContentsWithChannelVO contentAndChannel) { // do sth. }

        @Override
        public void onError(Throwable throwable) { // do sth. }

        @Override
        public void onComplete() { // do sth. }
    });

    doSomething();
}

Dans ce cas, avoir observeOn est inutile. Notez également que vous devez utiliser subscribeOn() con false sinon le flux se bloque.

Si vous ne voulez pas exécuter doSomething si la source échoue, utilisez blockingForEach :

public void myMethod() {
    Flowable.create(e -> {
       // do sth.
    }, BackpressureStrategy.BUFFER)
    .subscribeOn(Schedulers.computation(), false)
    .blockingForEach(contentAndChannel -> { /* do sth. */ });

    doSomething();
}

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X