/**
* 组合多个被观察者一起发送数据,合并后 按发送顺序串行执行
*/
private void concat() {
Observable.concat(
Observable.just("a", "b"),
Observable.just("c", "d"),
Observable.just("e"),
Observable.just("g", "h"))
.subscribe(str -> Log.i(TAG, "输出:" + str));
} 注:concat与concatArray特性一致,区别在于concat发送数量最多只能发四个,而concatArray可多于4个。 #### merge
/**
* 组合多个被观察者一起发送数据,合并后 按时间线并行执行
*/
private void merge() {
Observable.merge(
Observable.intervalRange(0,5,1,1, TimeUnit.SECONDS),
Observable.intervalRange(2,5,1,1, TimeUnit.SECONDS))
.subscribe(aLong -> Log.i(TAG, "输出:" + aLong));
} 注:merge与mergeArray特性一致,区别在于merge发送数量最多只能发四个,而mergeArray可多于4个。
/**
* 使用concat或merge,如果其中的某个事件抛异常,使用concatArrayDelayError可继续发送未发送完成事件,最后才触发Error
*/
private void concatArrayDelayError() {
Observable.concatArrayDelayError(
Observable.just("1"),
Observable.error(new NullPointerException()),
Observable.just("2"),
Observable.just("3"))
.subscribe(str -> Log.i(TAG, "输出:" + str));
}
/**
* 合并多个事件,生成一个新事件,然后发送
*/
private void zip() {
Observable.zip(
Observable.just(1, 2, 3),
Observable.just("a", "b", "c"),
(integer, s) -> integer+s)
.subscribe(s -> Log.i(TAG, "zip: " + s));
} 注:如果observable1发送数据数量与observable2不等,在没有调用onComplete()方法前,则会合并多余的数据;如果使用just等方法,多余数据则不会合并发送,原因是调用了onComplete(),详见源码。
/**
* 将observable1最新(即最后)发送的数据与observable2发送的数据合并
*/
private void combineLatest() {
Observable.combineLatest(
Observable.just(1, 2, 3),
Observable.just("a", "b", "c"),
(integer, s) -> {
Log.e(TAG, "合并的数据是: " + integer + " " + s);
return integer + s;
})
.subscribe(s -> Log.i(TAG, "zip: " + s));
}
/**
* 将要发送的事件聚合成一个新的事件,然后在发送
*/
private void reduce() {
Observable
.just("a", "b", "c")
.reduce((s, s2) -> {
Log.i(TAG, "本次聚合的数据是: " + s + " " + s2);
return s + s2;
}).subscribe(s -> Log.i(TAG, "最终结果是: " + s));
}
/**
* 将发送的数据发到一个数据结构中(如ArrayList)
*/
private void collect() {
Observable
.just("a", "b", "c")
.collect(ArrayList::new, (BiConsumer<ArrayList<String>, String>) ArrayList::add)
.subscribe(strings -> Log.i(TAG, "输出结果是: " + strings));
}
/**
* 在事件发送之前追加一些数据,或者追加新的事件,追加顺序:后调用则先追加
*/
private void startWith() {
Observable
.just(6)//初始要发送的事件,发送了数据6
.startWith(5)//追加数据5
.startWithArray(3, 4)//追加多个数据3,4
.startWith(Observable.just(1, 2))//追加事件,改追加的事件发送了数据1,2
.subscribe(s -> Log.i(TAG, "发送的事件: " + s));
}
/**
* 统计发送事件数量
*/
private void count() {
Observable
.just("a", "b", "c")
.count()
.subscribe(aLong -> Log.i(TAG, "发送数据事件数量: " + aLong));
}