使用步骤
build.gradle引入依赖
compile 'io.reactivex:rxjava:1.0.14'
compile 'io.reactivex:rxandroid:1.0.1'
创建数据源(被观察者/可观察者),泛型必须是Object的子类
Observable observable = Observable.create(new Observable.OnSubscribe<String>() {
//事件源,可以指定
@Override
public void call(Subscriber<? super String> subscriber) {
//执行多次
subscriber.onNext("第一次执行");
subscriber.onNext("第二次执行");
//标记事件结束
subscriber.onCompleted();
//标记事件发送错误
//subscriber.onError();
}
});
//from(T[]),返回的对象一般都是数值类型
Integer[] items = {1, 2, 3, 4, 5, 6, 7, 8, 9};
Observable observable = Observable.from(items);
//指定某一时刻进行数据发送
Observable observable = Observable.interval(1, 1, TimeUnit.SECONDS);//每隔一秒发送数据
//just(T...),处理任意类型的数组集合或数值,参数上限10个,参数类型必须一致
Integer[] items1 = {1, 2, 3, 4, 5, 6};
Integer[] items2 = {3, 5, 6, 8, 3, 8};
Observable observable = Observable.just(items1, items2);
//使用范围数据,指定输出数据的范围(1-40的数值)
Observable observable = Observable.range(1, 40);
####创建事件的接收者(观察者|订阅者),onNext方法中的数据类型必须被观察者指定的泛型
onNext(T item):Observable调用这个方法发射数据,方法的参数就是Observable发射的数据,这个方法可能会被调用多次,取决于业务逻辑
onCompleted():正常终止,在没有遇到错误的情况下,Observable在最后一次调用onNext之后调用此方法
onError(Throwable e):当Observable遇到错误或者无法返回期望的数据时会调用这个方法,这个调用会终止Observable,后续不会再调用onNext和onCompleted,onError方法的参数是抛出异常
Observer<String> observer = new Observer<String>() {
@Override
public void onNext(String s) {
System.out.println("onNext" + s);
}
@Override
public void onCompleted() {
System.out.println("onCompleted");
}
@Override
public void onError(Throwable e) {
System.out.println("onError" + e.getMessage());
}
};
//订阅者
Subscriber<String> subscriber = new Subscriber<String>() {
@Override
public void onStart(String s) {
System.out.println("事件开始了");
}
@Override
public void onNext(String s) {
System.out.println("onNext" + s);
}
@Override
public void onCompleted() {
System.out.println("onCompleted");
}
@Override
public void onError(Throwable e) {
System.out.println("onError" + e.getMessage());
}
};
//对订阅者进行简化,更简单
Action1<String> action1 = new Action1<String>() {
@Override
public void call(String s) {
System.out.println("call" + s);
}
};
订阅事件,被观察者必须指定了事件的接收者(观察者|订阅者),整个事件流程才可以被启动
observable.subscribe(observer);
observable.subscribe(subscriber);
//选择性参数方法,可对onNext,onCompleted,onError选择性使用,一般只需要onNext方法就足够
observable.subscribe(action1);
//自动创建 Subscriber ,并使用 onNextAction 来定义 onNext()
observable.subscribe(onNextAction);
//自动创建 Subscriber ,并使用 onNextAction 和 onErrorAction 来定义 onNext() 和 onError()
observable.subscribe(onNextAction, onErrorAction);
//自动创建 Subscriber ,并使用 onNextAction、 onErrorAction 和 onCompletedAction 来定义 onNext()、 onError() 和 onCompleted()
observable.subscribe(onNextAction, onErrorAction, onCompletedAction);
简单解释一下这段代码中出现的 Action1 和 Action0。
Action0 是 RxJava 的一个接口,它只有一个方法 call(),这个方法是无参无返回值的;
由于 onCompleted() 方法也是无参无返回值的,因此 Action0 可以被当成一个包装对象,将 onCompleted() 的内容打包起来将自己作为一个参数传入 subscribe() 以实现不完整定义的回调。
这样其实也可以看做将 onCompleted() 方法作为参数传进了 subscribe(),相当于其他某些语言中的『闭包』。
Action1 也是一个接口,它同样只有一个方法 call(T param),这个方法也无返回值,但有一个参数;
与 Action0 同理,由于 onNext(T obj) 和 onError(Throwable error) 也是单参数无返回值的,因此 Action1 可以将 onNext(obj) 和 onError(error) 打包起来传入 subscribe() 以实现不完整定义的回调。
事实上,虽然 Action0 和 Action1 在 API 中使用最广泛,但 RxJava 是提供了多个 ActionX 形式的接口 (例如 Action2, Action3) 的,它们可以被用以包装不同的无返回值的方法
订阅者是观察者的子类,区别在于订阅者可以取消订阅(在程序销毁后)
if(subscriber != null && !subscriber.isUnsubscribed()) {
subscriber.unsubscribe();
}