delay
/**
* 被观察者延迟一段时间再发送事件(注意耗时操作)
*/
private void delay() {
Observable.just("被观察者延迟一段时间再发送事件")
.delay(2, TimeUnit.SECONDS)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(s -> content.setText(s));
}
do
private void doXXX() {
Observable
.create((ObservableOnSubscribe<String>) emitter -> {
emitter.onNext("a");
emitter.onNext("b");
emitter.onNext("c");
emitter.onError(new Throwable("error"));
})
.doAfterNext(s -> Log.i(TAG, "doAfterNext 执行Next事件之后调用: " + s))//执行Next事件之后调用
.doAfterTerminate(() -> Log.i(TAG, "doAfterTerminate 发送事件完毕后调用,无论正常发送完毕 / 异常终止 "))//发送事件完毕后调用,无论正常发送完毕 / 异常终止
.doFinally(()-> Log.i(TAG, "doFinally 最后执行"))
.doOnError(throwable -> Log.i(TAG, "doOnError: "+throwable.getMessage()))
.subscribe(s -> Log.i(TAG, "subscribe 接收数据: " + s));
}
do有很多延伸方法,此处不一一举例
onErrorReturn
/**
* 遇到错误时,发送1个特殊事件 & 正常终止
*/
private void onErrorReturn() {
Observable
.create((ObservableOnSubscribe<String>) emitter -> {
emitter.onNext("开始发送数据");
emitter.onError(new Throwable("error"));
})
.onErrorReturn(throwable -> {
Log.i(TAG, "onErrorReturn 发生错误了,错误信息: " + throwable.getMessage());
return "onErrorReturn";
})
.subscribe(s -> Log.i(TAG, "接收数据: " + s));
}
注
1、onErrorResumeNext与onErrorReturn类似,区别在于onErrorResumeNext遇到error时会发送一个新的Observable 2、若拦截Exception用onExceptionResumeNext(),则会将错误传递给观察者的onError方法
retry
/**
* 重试,即当出现错误时,让被观察者(Observable)重新发射数据
*/
private void retry() {
Observable
.create((ObservableOnSubscribe<String>) emitter -> {
emitter.onNext("数据源");
emitter.onError(new Throwable("error"));
})
.retry(3)//重试三次,其他重载方法不在次演示
.subscribe(s -> Log.i(TAG, "接收到的数据: " + s));
}
retryWhen遇到错误时,将发生的错误传递给一个新的被观察者(Observable),并决定是否需要重新订阅原始被观察者(Observable)& 发送事件
repeat
/**
* 无条件地、重复发送 被观察者事件
*/
private void repeat() {
Observable
.just("事件源")
.repeat(3)
.subscribe(s -> Log.i(TAG, "接收数据: " + s));
}
1、repeat无条件地、重复发送 2、repeatWhen有条件地、重复发送