/**
* 基本创建
*/
private void create() {
Observable.create((ObservableOnSubscribe<String>) emitter -> {
emitter.onNext("这是使用create操作符发送的数据");
emitter.onComplete();
}).subscribe(s -> content.setText(s));
} #### just
/**
* 快速创建just
* 发送事件的特点:直接发送传入的事件
*/
private void just() {
Observable.just("这是使用just操作符发送的数据")
.subscribe(s -> {
content.setText(s);
});
}
/**
* 快速创建fromArray
* 发送事件的特点:直接发送 传入的数组数据
*/
private void fromArray() {
sb = new StringBuffer();
String[] strArr = {"create", "just", "fromArray"};
Observable.fromArray(strArr)
.subscribe(str -> {
sb.append(str).append(" ");
Log.i("tag", sb.toString().trim());
content.setText(sb.toString().trim());
});
} #### fromIterable
/**
* 快速创建fromIterable
* 发送事件的特点:直接发送 传入的集合List数据
*/
private void fromIterable() {
sb = new StringBuffer();
Observable
.fromIterable(getListItemStr())
.subscribe(s -> {
sb.append(s).append(" ");
content.setText(sb.toString().trim());
});
}
/**
* timer操作符可以延迟执行一段逻辑
*/
private void timer() {
Observable
.timer(5, TimeUnit.SECONDS)
.subscribe(aLong -> runOnUiThread(() -> content.setText("延迟5秒执行")));
} ####interval
/**
* interval 定时发送事件
*
* @param initialDelay 延迟时间
* @param period 间隔时间
* @param unit 单位
*/
private void interval(long initialDelay, long period, TimeUnit unit) {
Observable
.interval(initialDelay, period, unit)
.subscribe(new Observer<Long>() {
Disposable disposable;
@Override
public void onSubscribe(Disposable d) {
disposable = d;
}
@Override
public void onNext(Long aLong) {
if (aLong >= 3 && null != disposable) {
disposable.dispose();
}
runOnUiThread(() -> content.setText("延迟2秒执行,每一秒更新:" + aLong));
}
@Override
public void onError(Throwable e) { }
@Override
public void onComplete() { }
});
} #### intervalRange
/**
* intervalRange定时发送事件
*
* @param start 发送事件起始序号
* @param count 发送事件数量
* @param initialDelay 延迟时间
* @param period 间隔时间
* @param unit 单位
*/
private void intervalRange(long start, long count, long initialDelay, long period, TimeUnit unit) {
Observable
.intervalRange(start, count, initialDelay, period, unit)
.subscribe(aLong -> {
runOnUiThread(() -> content.setText("从" + start + "开始发送" + count + "个数,延迟时间为" + initialDelay + ",间隔时间" + period + ",数据接收:" + aLong));
});
} #### range
/**
* range定时发送事件(响应的有rangeLong)
*
* @param start 发送事件起始序号
* @param count 发送事件数量
*/
private void range(int start, int count) {
Observable
.range(start, count)
.subscribe(integer -> {
runOnUiThread(() -> content.setText("从" + start + "开始发送" + count + "个数,接收数据:" + integer));
});
}