RxJava—过滤操作符

Posted by dingqiang.l on August 15, 2018

过滤即观察者只接收过滤的数据,跳过即不发送

filter

/**
 * 指定数据条件过滤
 */
private void filter() {
    Observable
            .just(1, 2, 3)
            .filter(integer -> {
                Log.d(TAG, "过滤数据为:2");
                return integer == 2;
            })
            .subscribe(integer -> Log.i(TAG, "接收到过滤的数据为:" + integer));
}

ofType

/**
 * 指定数据类型条件过滤
 */
private void ofType() {
    Observable
            .just(1, "a",2,"b", 3)
            .ofType(String.class)
            .subscribe(s -> Log.i(TAG, "接收到过滤的数据为:" + s));
}

skip、SkipLast

/**
 * 跳过某个事件
 */
private void skipAndSkipLast() {
    Observable
            .just(1, 2, 3,4,5)
            .skip(2)//跳过前个2事件
            .skipLast(2)//跳过后2个
            .subscribe(integer -> Log.i(TAG, "接收到过滤的数据为:" + integer));
}

distinct

/**
 * 过滤重复事件
 */
private void distinct() {
    Observable
            .just(1, 2, 1,3,3)
            .distinct()
            .subscribe(integer -> Log.i(TAG, "接收到过滤的数据为:" + integer));
}

distinctUntilChanged

/**
 * 过滤连续重复事件
 */
private void distinctUntilChanged() {
    Observable
            .just(1, 2, 1,3,3)
            .distinctUntilChanged()
            .subscribe(integer -> Log.i(TAG, "接收到过滤的数据为:" + integer));
}

take

/**
 * 观察者最多能接收到的事件数量
 */
private void take() {
    Observable
            .just(1, 2, 1, 3, 3)
            .take(3)//观察者接收前面的
            .subscribe(integer -> Log.i(TAG, "接收到过滤的数据为:" + integer));
}

take:观察者接收前面发送的数据; takeLast:观察者接收后面发送的数据;

throttleFirst

/**
 * 发送指定时间内第1次事件
 */
private void throttleFirst() {
    Observable
            .create(emitter -> {
                emitter.onNext(1);
                Thread.sleep(500);
                emitter.onNext(2);
                Thread.sleep(400);
                emitter.onNext(3);
                Thread.sleep(300);
                emitter.onNext(4);
            })
            .throttleFirst (1, TimeUnit.SECONDS)
            .subscribe(integer -> Log.i(TAG, "接收到过滤的数据为:" + integer));
}

1、throttleFirst 只接收1秒内发送的第一个事件; 2、throttleLast 只接收1秒内发送的最后一个事件; 3、throttleLast 内部则是对sample的实现; 4、假设在指定时间内,连续点击按钮,则只接收第一或最后一次发送的事件

throttleWithTimeout

/**
 * 发送数据事件时,若2次发送事件的间隔<指定时间,就会丢弃前一次的数据,直到指定时间内都没有新数据发射时才会发送后一次的数据
 */
private void throttleWithTimeout() {
    Observable
            .create(emitter -> {
                emitter.onNext(1);
                Thread.sleep(500);
                emitter.onNext(2);
                Thread.sleep(1400);
                emitter.onNext(3);
                Thread.sleep(1300);
                emitter.onNext(4);
                Thread.sleep(300);
                emitter.onNext(5);
                Thread.sleep(1300);
            })
            .throttleWithTimeout(1, TimeUnit.SECONDS)
            .subscribe(integer -> Log.i(TAG, "接收到过滤的数据为:" + integer));
}

firstElement

/**
 * 根据指定位置过滤,观察者接收第一个数据
 */
private void firstElement() {
    Observable
            .just(1,2,3,4)
            .firstElement()
            .subscribe(integer -> Log.i(TAG, "接收到过滤的数据为:" + integer));
}

1、firstElement:观察者接收第一个发送的数据; 2、lastElement:观察者接收最后一个发送的数据; 3、elementAt:观察者根据事件的索引值接收发送的数据(可越界); 3、elementAtOrError:观察者根据事件的索引值接收发送的数据(越界抛异常);