RxJava—组合操作符

Posted by dingqiang.l on August 15, 2018

concat

/**
 * 组合多个被观察者一起发送数据,合并后 按发送顺序串行执行
 */
private void concat() {
    Observable.concat(
            Observable.just("a", "b"),
            Observable.just("c", "d"),
            Observable.just("e"),
            Observable.just("g", "h"))
            .subscribe(str -> Log.i(TAG, "输出:" + str));
} 注:concat与concatArray特性一致,区别在于concat发送数量最多只能发四个,而concatArray可多于4个。 #### merge
/**
 * 组合多个被观察者一起发送数据,合并后 按时间线并行执行
 */
private void merge() {
    Observable.merge(
            Observable.intervalRange(0,5,1,1, TimeUnit.SECONDS),
            Observable.intervalRange(2,5,1,1, TimeUnit.SECONDS))
            .subscribe(aLong -> Log.i(TAG, "输出:" + aLong));
} 注:merge与mergeArray特性一致,区别在于merge发送数量最多只能发四个,而mergeArray可多于4个。

concatDelayError 、mergeDelayError、concatArrayDelayError、mergeArrayDelayError

/**
 * 使用concat或merge,如果其中的某个事件抛异常,使用concatArrayDelayError可继续发送未发送完成事件,最后才触发Error
 */
private void concatArrayDelayError() {
    Observable.concatArrayDelayError(
            Observable.just("1"),
            Observable.error(new NullPointerException()),
            Observable.just("2"),
            Observable.just("3"))
            .subscribe(str -> Log.i(TAG, "输出:" + str));

}

zip

/**
 * 合并多个事件,生成一个新事件,然后发送
 */
private void zip() {
    Observable.zip(
            Observable.just(1, 2, 3),
            Observable.just("a", "b", "c"),
            (integer, s) -> integer+s)
            .subscribe(s -> Log.i(TAG, "zip: " + s));
} 注:如果observable1发送数据数量与observable2不等,在没有调用onComplete()方法前,则会合并多余的数据;如果使用just等方法,多余数据则不会合并发送,原因是调用了onComplete(),详见源码。

combineLatest

/**
 * 将observable1最新(即最后)发送的数据与observable2发送的数据合并
 */
private void combineLatest() {
    Observable.combineLatest(
            Observable.just(1, 2, 3),
            Observable.just("a", "b", "c"),
            (integer, s) -> {
                Log.e(TAG, "合并的数据是: " + integer + " " + s);
                return integer + s;
            })
            .subscribe(s -> Log.i(TAG, "zip: " + s));
}

reduce

/**
 * 将要发送的事件聚合成一个新的事件,然后在发送
 */
private void reduce() {
    Observable
            .just("a", "b", "c")
            .reduce((s, s2) -> {
                Log.i(TAG, "本次聚合的数据是: " + s + "   " + s2);
                return s + s2;
            }).subscribe(s -> Log.i(TAG, "最终结果是: " + s));
}

collect

/**
 * 将发送的数据发到一个数据结构中(如ArrayList)
 */
private void collect() {
    Observable
            .just("a", "b", "c")
            .collect(ArrayList::new, (BiConsumer<ArrayList<String>, String>) ArrayList::add)
            .subscribe(strings -> Log.i(TAG, "输出结果是: " + strings));
}

startWith

/**
 * 在事件发送之前追加一些数据,或者追加新的事件,追加顺序:后调用则先追加
 */
private void startWith() {
    Observable
            .just(6)//初始要发送的事件,发送了数据6
            .startWith(5)//追加数据5
            .startWithArray(3, 4)//追加多个数据3,4
            .startWith(Observable.just(1, 2))//追加事件,改追加的事件发送了数据1,2
            .subscribe(s -> Log.i(TAG, "发送的事件: " + s));
}

count

/**
 * 统计发送事件数量
 */
private void count() {
    Observable
            .just("a", "b", "c")
            .count()
            .subscribe(aLong -> Log.i(TAG, "发送数据事件数量: " + aLong));
}