添加依赖
compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
compile 'io.reactivex.rxjava2:rxjava:2.0.1'
compile 'io.reactivex.rxjava2:rxandroid:2.1.0'
compile 'io.reactivex.rxjava2:rxjava:2.2.3'
创建数据源(被观察者/可观察者)
just 方法
返回一个可观察对象,该对象发出给定(常量引用)项的信号,然后完成
Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println("just:" + integer);
}
});
just:1
just:2
just:4
just:5
just:6
just:7
just:8
just:9
just:10
fromArray 方法
fromArray 和 Just 几乎是一样的效果,只不过 Just 限制 10 个以内,而 fromArray 并没有限制,查看得知 单个参数 Just 是自行创建 ObservableJust,而多个参数 Just 最终还是回调了 fromArray,这里不再过多演示
create 方法
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
//执行多次
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
//标记事件结束
emitter.onComplete();
//标记事件发送错误
//emitter.onError(new NullPointerException("不能为空"));
}
}).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println("create:" + integer);
}
});
create:1
create:2
create:3
range 方法
使用范围数据,指定输出数据的范围(1-40的数值)
Observable.range(3, 5)
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println("range:" + integer);
}
});
range:3
range:4
range:5
range:6
range:7
interval 方法
指定某一时刻进行数据发送
Observable.interval(10, 1, TimeUnit.SECONDS) // 先等待 10 秒,之后再每一秒发送一次,10 秒这个参数也可以不填,默认用间隔时间参数替代(这里示例 1 秒)
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long l) throws Exception {
System.out.println("interval:" + l);
}
});
2019-03-20 15:30:33.331 interval:0
2019-03-20 15:30:34.331 interval:1
2019-03-20 15:30:35.331 interval:2
2019-03-20 15:30:36.331 interval:3
2019-03-20 15:30:37.331 interval:4
2019-03-20 15:30:38.331 interval:5
..................................
创建事件的接收者(观察者|订阅者),onNext方法中的数据类型必须被观察者指定的泛型
// 表示观察者不对被观察者发送的事件作出任何响应(但被观察者还是可以继续发送事件)
public final Disposable subscribe() {}
// 表示观察者只对被观察者发送的Next事件作出响应
public final Disposable subscribe(Consumer<? super T> onNext) {}
// 表示观察者只对被观察者发送的Next事件 & Error事件作出响应
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError) {}
// 表示观察者只对被观察者发送的Next事件、Error事件 & Complete事件作出响应
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete) {}
// 表示观察者只对被观察者发送的Next事件、Error事件 、Complete事件 & onSubscribe事件作出响应
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Consumer<? super Disposable> onSubscribe) {}
// 表示观察者对被观察者所有的事件做出响应
public final void subscribe(Observer<? super T> observer) {}
Observer 类用法
onSubscribe:订阅的时候被调用,方法参数有 Disposable,可用于取消订阅
onNext(T item):Observable调用这个方法发射数据,方法的参数就是Observable发射的数据,这个方法可能会被调用多次,取决于业务逻辑
onCompleted():正常终止,在没有遇到错误的情况下,Observable在最后一次调用onNext之后调用此方法
onError(Throwable e):当Observable遇到错误或者无法返回期望的数据时会调用这个方法,这个调用会终止Observable,后续不会再调用onNext和onCompleted,onError方法的参数是抛出异常
//观察者
Observable.just(1, 2, 3)
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
System.out.println("onSubscribe");
}
@Override
public void onNext(Integer integer) {
System.out.println("onNext:" + integer);
}
@Override
public void onError(Throwable e) {
System.out.println("onError:" + e.toString());
}
@Override
public void onComplete() {
System.out.println("onComplete");
}
});
onSubscribe
onNext:1
onNext:2
onNext:3
onComplete
Consumer 类用法
Observable.just(1, 2, 3)
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println("accept:" + integer);
}
});
accept:1
accept:2
accept:3
订阅
// 订阅事件,被观察者必须指定了事件的接收者(观察者),整个事件流程才可以被启动
Disposable disposable = observable.subscribe(observer);
// 是否被订阅
disposable.isDisposed();
// 取消订阅
disposable.dispose();