过滤即观察者只接收过滤的数据,跳过即不发送
filter
/**
* 指定数据条件过滤
*/
private void filter() {
Observable
.just(1, 2, 3)
.filter(integer -> {
Log.d(TAG, "过滤数据为:2");
return integer == 2;
})
.subscribe(integer -> Log.i(TAG, "接收到过滤的数据为:" + integer));
}
ofType
/**
* 指定数据类型条件过滤
*/
private void ofType() {
Observable
.just(1, "a",2,"b", 3)
.ofType(String.class)
.subscribe(s -> Log.i(TAG, "接收到过滤的数据为:" + s));
}
skip、SkipLast
/**
* 跳过某个事件
*/
private void skipAndSkipLast() {
Observable
.just(1, 2, 3,4,5)
.skip(2)//跳过前个2事件
.skipLast(2)//跳过后2个
.subscribe(integer -> Log.i(TAG, "接收到过滤的数据为:" + integer));
}
distinct
/**
* 过滤重复事件
*/
private void distinct() {
Observable
.just(1, 2, 1,3,3)
.distinct()
.subscribe(integer -> Log.i(TAG, "接收到过滤的数据为:" + integer));
}
distinctUntilChanged
/**
* 过滤连续重复事件
*/
private void distinctUntilChanged() {
Observable
.just(1, 2, 1,3,3)
.distinctUntilChanged()
.subscribe(integer -> Log.i(TAG, "接收到过滤的数据为:" + integer));
}
take
/**
* 观察者最多能接收到的事件数量
*/
private void take() {
Observable
.just(1, 2, 1, 3, 3)
.take(3)//观察者接收前面的
.subscribe(integer -> Log.i(TAG, "接收到过滤的数据为:" + integer));
}
take:观察者接收前面发送的数据; takeLast:观察者接收后面发送的数据;
throttleFirst
/**
* 发送指定时间内第1次事件
*/
private void throttleFirst() {
Observable
.create(emitter -> {
emitter.onNext(1);
Thread.sleep(500);
emitter.onNext(2);
Thread.sleep(400);
emitter.onNext(3);
Thread.sleep(300);
emitter.onNext(4);
})
.throttleFirst (1, TimeUnit.SECONDS)
.subscribe(integer -> Log.i(TAG, "接收到过滤的数据为:" + integer));
}
1、throttleFirst 只接收1秒内发送的第一个事件; 2、throttleLast 只接收1秒内发送的最后一个事件; 3、throttleLast 内部则是对sample的实现; 4、假设在指定时间内,连续点击按钮,则只接收第一或最后一次发送的事件
throttleWithTimeout
/**
* 发送数据事件时,若2次发送事件的间隔<指定时间,就会丢弃前一次的数据,直到指定时间内都没有新数据发射时才会发送后一次的数据
*/
private void throttleWithTimeout() {
Observable
.create(emitter -> {
emitter.onNext(1);
Thread.sleep(500);
emitter.onNext(2);
Thread.sleep(1400);
emitter.onNext(3);
Thread.sleep(1300);
emitter.onNext(4);
Thread.sleep(300);
emitter.onNext(5);
Thread.sleep(1300);
})
.throttleWithTimeout(1, TimeUnit.SECONDS)
.subscribe(integer -> Log.i(TAG, "接收到过滤的数据为:" + integer));
}
firstElement
/**
* 根据指定位置过滤,观察者接收第一个数据
*/
private void firstElement() {
Observable
.just(1,2,3,4)
.firstElement()
.subscribe(integer -> Log.i(TAG, "接收到过滤的数据为:" + integer));
}
1、firstElement:观察者接收第一个发送的数据; 2、lastElement:观察者接收最后一个发送的数据; 3、elementAt:观察者根据事件的索引值接收发送的数据(可越界); 3、elementAtOrError:观察者根据事件的索引值接收发送的数据(越界抛异常);