RxJava—创建操作符

Posted by dingqiang.l on August 15, 2018

create

/**
 * 基本创建
 */
private void create() {
    Observable.create((ObservableOnSubscribe<String>) emitter -> {
        emitter.onNext("这是使用create操作符发送的数据");
        emitter.onComplete();
    }).subscribe(s -> content.setText(s));
}    #### just


/**
 * 快速创建just
 * 发送事件的特点:直接发送传入的事件
 */
private void just() {
    Observable.just("这是使用just操作符发送的数据")
            .subscribe(s -> {
                content.setText(s);
            });
}

fromArray

/**
 * 快速创建fromArray
 * 发送事件的特点:直接发送 传入的数组数据
 */
private void fromArray() {
    sb = new StringBuffer();
    String[] strArr = {"create", "just", "fromArray"};
    Observable.fromArray(strArr)
            .subscribe(str -> {
                sb.append(str).append(" ");
                Log.i("tag", sb.toString().trim());
                content.setText(sb.toString().trim());
            });
} #### fromIterable

/**
 * 快速创建fromIterable
 * 发送事件的特点:直接发送 传入的集合List数据
 */
private void fromIterable() {
    sb = new StringBuffer();
    Observable
            .fromIterable(getListItemStr())
            .subscribe(s -> {
                sb.append(s).append(" ");
                content.setText(sb.toString().trim());
            });
}

timer

/**
 * timer操作符可以延迟执行一段逻辑
 */
private void timer() {
    Observable
            .timer(5, TimeUnit.SECONDS)
            .subscribe(aLong -> runOnUiThread(() -> content.setText("延迟5秒执行")));
} ####interval

/**
 * interval 定时发送事件
 *
 * @param initialDelay 延迟时间
 * @param period       间隔时间
 * @param unit         单位
 */
private void interval(long initialDelay, long period, TimeUnit unit) {
    Observable
            .interval(initialDelay, period, unit)
            .subscribe(new Observer<Long>() {
                Disposable disposable;
                @Override
                public void onSubscribe(Disposable d) {
                    disposable = d;
                }

                @Override
                public void onNext(Long aLong) {
                    if (aLong >= 3 && null != disposable) {
                        disposable.dispose();
                    }
                    runOnUiThread(() -> content.setText("延迟2秒执行,每一秒更新:" + aLong));
                }

                @Override
                public void onError(Throwable e) { }

                @Override
                public void onComplete() {  }
            });
} #### intervalRange

/**
 * intervalRange定时发送事件
 *
 * @param start        发送事件起始序号
 * @param count        发送事件数量
 * @param initialDelay 延迟时间
 * @param period       间隔时间
 * @param unit         单位
 */
private void intervalRange(long start, long count, long initialDelay, long period, TimeUnit unit) {
    Observable
            .intervalRange(start, count, initialDelay, period, unit)
            .subscribe(aLong -> {
                runOnUiThread(() -> content.setText("从" + start + "开始发送" + count + "个数,延迟时间为" + initialDelay + ",间隔时间" + period + ",数据接收:" + aLong));
            });
} #### range

/**
 * range定时发送事件(响应的有rangeLong)
 *
 * @param start 发送事件起始序号
 * @param count 发送事件数量
 */
private void range(int start, int count) {
    Observable
            .range(start, count)
            .subscribe(integer -> {
                runOnUiThread(() -> content.setText("从" + start + "开始发送" + count + "个数,接收数据:" + integer));
            });
}