rxjava:我可以使用重试()但延迟?
我在我的Android应用程序中使用rxjava来asynchronous处理networking请求。 现在我只想在经过一段时间后再试一次失败的networking请求。
有什么办法可以在Observable上使用retry(),但是只有在某个延迟之后才能重试。
有没有办法让Observable知道目前正在重试(而不是第一次尝试)?
我看了debounce()/ throttleWithTimeout(),但他们似乎在做一些不同的事情。
编辑:
我想我find了一种方法来做到这一点,但是我有兴趣确认这是正确的做法或者其他更好的方法。
我在做的是这样的:在Observable.OnSubscribe的call()方法中,在调用Subscribers onError()方法之前,我只需让Threadhibernate所需的时间。 所以,每1000毫秒重试,我做这样的事情:
@Override public void call(Subscriber<? super List<ProductNode>> subscriber) { try { Log.d(TAG, "trying to load all products with pid: " + pid); subscriber.onNext(productClient.getProductNodesForParentId(pid)); subscriber.onCompleted(); } catch (Exception e) { try { Thread.sleep(1000); } catch (InterruptedException e1) { e.printStackTrace(); } subscriber.onError(e); } }
由于这个方法在IO线程上运行,所以不会阻塞UI。 我能看到的唯一问题是即使是第一个错误也会延迟报告,所以即使没有重试(),延迟也是存在的。 如果在发生错误之后没有应用延迟,而是在重试之前 (但不是在第一次尝试之前,显然),我希望它更好。
您可以使用retryWhen()
运算符向任何Observable添加重试逻辑。
以下类包含重试逻辑:
RxJava 2.x
public class RetryWithDelay implements Function<Observable<? extends Throwable>, Observable<?>> { private final int maxRetries; private final int retryDelayMillis; private int retryCount; public RetryWithDelay(final int maxRetries, final int retryDelayMillis) { this.maxRetries = maxRetries; this.retryDelayMillis = retryDelayMillis; this.retryCount = 0; } @Override public Observable<?> apply(final Observable<? extends Throwable> attempts) { return attempts .flatMap(new Function<Throwable, Observable<?>>() { @Override public Observable<?> apply(final Throwable throwable) { if (++retryCount < maxRetries) { // When this Observable calls onNext, the original // Observable will be retried (ie re-subscribed). return Observable.timer(retryDelayMillis, TimeUnit.MILLISECONDS); } // Max retries hit. Just pass the error along. return Observable.error(throwable); } }); } }
RxJava 1.x
public class RetryWithDelay implements Func1<Observable<? extends Throwable>, Observable<?>> { private final int maxRetries; private final int retryDelayMillis; private int retryCount; public RetryWithDelay(final int maxRetries, final int retryDelayMillis) { this.maxRetries = maxRetries; this.retryDelayMillis = retryDelayMillis; this.retryCount = 0; } @Override public Observable<?> call(Observable<? extends Throwable> attempts) { return attempts .flatMap(new Func1<Throwable, Observable<?>>() { @Override public Observable<?> call(Throwable throwable) { if (++retryCount < maxRetries) { // When this Observable calls onNext, the original // Observable will be retried (ie re-subscribed). return Observable.timer(retryDelayMillis, TimeUnit.MILLISECONDS); } // Max retries hit. Just pass the error along. return Observable.error(throwable); } }); } }
用法:
// Add retry logic to existing observable. // Retry max of 3 times with a delay of 2 seconds. observable .retryWhen(new RetryWithDelay(3, 2000));
这是一个基于Ben Christensen的代码片段的解决scheme, RetryWhen Example和RetryWhenTestsConditional (我不得不将n.getThrowable()
更改为n
)。 我使用evant / gradle-retrolambda在Android上使lambda表示法工作,但是您不必使用lambdaexpression式(虽然强烈推荐)。 对于延迟,我实现了指数退避,但是可以插入你想要的退避逻辑。 为了完整性,我添加了subscribeOn
和observeOn
操作符。 我为AndroidSchedulers.mainThread()
使用ReactiveX / RxAndroid 。
int ATTEMPT_COUNT = 10; public class Tuple<X, Y> { public final X x; public final Y y; public Tuple(X x, Y y) { this.x = x; this.y = y; } } observable .subscribeOn(Schedulers.io()) .retryWhen( attempts -> { return attempts.zipWith(Observable.range(1, ATTEMPT_COUNT + 1), (n, i) -> new Tuple<Throwable, Integer>(n, i)) .flatMap( ni -> { if (ni.y > ATTEMPT_COUNT) return Observable.error(ni.x); return Observable.timer((long) Math.pow(2, ni.y), TimeUnit.SECONDS); }); }) .observeOn(AndroidSchedulers.mainThread()) .subscribe(subscriber);
而不是使用MyRequestObservable.retry我使用包装函数retryObservable(MyRequestObservable,retrycount,秒),它返回一个新的Observable处理间接的延迟,所以我可以做
retryObservable(restApi.getObservableStuff(), 3, 30) .subscribe(new Action1<BonusIndividualList>(){ @Override public void call(BonusIndividualList arg0) { //success! } }, new Action1<Throwable>(){ @Override public void call(Throwable arg0) { // failed after the 3 retries ! }}); // wrapper code private static <T> Observable<T> retryObservable( final Observable<T> requestObservable, final int nbRetry, final long seconds) { return Observable.create(new Observable.OnSubscribe<T>() { @Override public void call(final Subscriber<? super T> subscriber) { requestObservable.subscribe(new Action1<T>() { @Override public void call(T arg0) { subscriber.onNext(arg0); subscriber.onCompleted(); } }, new Action1<Throwable>() { @Override public void call(Throwable error) { if (nbRetry > 0) { Observable.just(requestObservable) .delay(seconds, TimeUnit.SECONDS) .observeOn(mainThread()) .subscribe(new Action1<Observable<T>>(){ @Override public void call(Observable<T> observable){ retryObservable(observable, nbRetry - 1, seconds) .subscribe(subscriber); } }); } else { // still fail after retries subscriber.onError(error); } } }); } }); }
现在使用RxJava版本1.0+,你可以使用zipWith来延迟重试。
添加修改kjones的答案。
改性
public class RetryWithDelay implements Func1<Observable<? extends Throwable>, Observable<?>> { private final int MAX_RETRIES; private final int DELAY_DURATION; private final int START_RETRY; /** * Provide number of retries and seconds to be delayed between retry. * * @param maxRetries Number of retries. * @param delayDurationInSeconds Seconds to be delays in each retry. */ public RetryWithDelay(int maxRetries, int delayDurationInSeconds) { MAX_RETRIES = maxRetries; DELAY_DURATION = delayDurationInSeconds; START_RETRY = 1; } @Override public Observable<?> call(Observable<? extends Throwable> observable) { return observable .delay(DELAY_DURATION, TimeUnit.SECONDS) .zipWith(Observable.range(START_RETRY, MAX_RETRIES), new Func2<Throwable, Integer, Integer>() { @Override public Integer call(Throwable throwable, Integer attempt) { return attempt; } }); } }
受保罗的答复启发,如果你不担心重retryWhen
Abhijit Sarkar retryWhen
问题,推迟与rxJava2无条件地重新订阅的最简单的方法是:
source.retryWhen(throwables -> throwables.delay(1, TimeUnit.SECONDS))
您可能希望看到更多的样本和重试时的解释时和重复时 。
您可以在RetryWhen操作符中返回的Observable中添加延迟
/** * Here we can see how onErrorResumeNext works and emit an item in case that an error occur in the pipeline and an exception is propagated */ @Test public void observableOnErrorResumeNext() { Subscription subscription = Observable.just(null) .map(Object::toString) .doOnError(failure -> System.out.println("Error:" + failure.getCause())) .retryWhen(errors -> errors.doOnNext(o -> count++) .flatMap(t -> count > 3 ? Observable.error(t) : Observable.just(null).delay(100, TimeUnit.MILLISECONDS)), Schedulers.newThread()) .onErrorResumeNext(t -> { System.out.println("Error after all retries:" + t.getCause()); return Observable.just("I save the world for extinction!"); }) .subscribe(s -> System.out.println(s)); new TestSubscriber((Observer) subscription).awaitTerminalEvent(500, TimeUnit.MILLISECONDS); }
你可以在这里看到更多的例子。 https://github.com/politrons/reactive
retryWhen
一个复杂的,甚至可能是越野车的操作员。 官方文档和至less一个答案在这里使用range
运算符,如果没有重试将会失败。 请参阅我与ReactiveX成员David Karnok的讨论 。
我改进了kjones的答案,将flatMap
更改为concatMap
并添加了RetryDelayStrategy
类。 flatMap
在concatMap
中不保留发射顺序,这对于延迟退避很重要。 如名称所示, RetryDelayStrategy
让用户从各种生成重试延迟的模式中进行select,包括退避。 代码在我的GitHub上可用,完成以下testing用例:
- 一次尝试成功(不重试)
- 1次重试失败
- 尝试重试3次,但在2日成功,因此不会重试第三次
- 第三次重试成功
请参阅setRandomJokes
方法。
与kjones相同的答案,但更新到最新版本对于RxJava 2.x版本:( 'io.reactivex.rxjava2:rxjava:2.1.3')
public class RetryWithDelay implements Function<Flowable<Throwable>, Publisher<?>> { private final int maxRetries; private final long retryDelayMillis; private int retryCount; public RetryWithDelay(final int maxRetries, final int retryDelayMillis) { this.maxRetries = maxRetries; this.retryDelayMillis = retryDelayMillis; this.retryCount = 0; } @Override public Publisher<?> apply(Flowable<Throwable> throwableFlowable) throws Exception { return throwableFlowable.flatMap(new Function<Throwable, Publisher<?>>() { @Override public Publisher<?> apply(Throwable throwable) throws Exception { if (++retryCount < maxRetries) { // When this Observable calls onNext, the original // Observable will be retried (ie re-subscribed). return Flowable.timer(retryDelayMillis, TimeUnit.MILLISECONDS); } // Max retries hit. Just pass the error along. return Flowable.error(throwable); } }); }
}
用法:
//将重试逻辑添加到现有的observable。 //延迟2秒,重试最多3次。
observable .retryWhen(new RetryWithDelay(3, 2000));
只需要这样做:
Observable.just("") .delay(2, TimeUnit.SECONDS) //delay .flatMap(new Func1<String, Observable<File>>() { @Override public Observable<File> call(String s) { L.from(TAG).d("postAvatar="); File file = PhotoPickUtil.getTempFile(); if (file.length() <= 0) { throw new NullPointerException(); } return Observable.just(file); } }) .retry(6) .subscribe(new Action1<File>() { @Override public void call(File file) { postAvatar(file); } }, new Action1<Throwable>() { @Override public void call(Throwable throwable) { } });