map 转换
map转换,将泛型指定的对象转换成其他类型的对象,可进行多次转换
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("100");
emitter.onComplete();
}
}).map(new Function<String, Integer>() {
@Override
public Integer apply(String s) throws Exception {
return Integer.valueOf(s);
}
}).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println("这里就已经将 String 转换成 Integer 了" + integer);
}
});
这里就已经将 String 转换成 Integer了100
filter 过滤
filter过滤,将指定的对象过滤掉,可进行多次过滤
Observable.just(1, 2, 3, 4, 5, 7, 8)
.filter(new Predicate<Integer>() {
public boolean test(Integer integer) throws Exception {
//大于5的过滤掉
return integer < 5;
}
})
.subscribe(new Consumer<Integer>() {
public void accept(Integer integer) throws Exception {
System.out.println("这里就能获取到过滤后的整数了" + integer);
}
});
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
这里就能获取到过滤后的整数了1
这里就能获取到过滤后的整数了2
这里就能获取到过滤后的整数了3
这里就能获取到过滤后的整数了4
flatMap 平铺
flatMap平铺,一对多的转化(类似嵌套),这里需要注意的是, flatMap并不保证事件的顺序, 如果需要保证顺序则需要使用concatMap
Observable.create(new ObservableOnSubscribe<Integer>() {
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
}
}).flatMap(new Function<Integer, ObservableSource<String>>() {
public ObservableSource<String> apply(Integer integer) throws Exception {
final List<String> list = new ArrayList<>();
for (int i = 0; i < 3; i++) {
list.add("I am value " + integer);
}
return Observable.fromIterable(list).delay(10, TimeUnit.MILLISECONDS);
}
}).subscribe(new Consumer<String>() {
public void accept(String s) throws Exception {
System.out.println("TAG:" + s);
}
});
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
D/TAG: I am value 1
D/TAG: I am value 1
D/TAG: I am value 1
D/TAG: I am value 3
D/TAG: I am value 3
D/TAG: I am value 3
D/TAG: I am value 2
D/TAG: I am value 2
D/TAG: I am value 2
distinct 去重复
Observable.just(1, 1, 2, 2, 3, 3)
.distinct()
.subscribe(new Consumer<Integer>() {
public void accept(Integer integer) throws Exception {
System.out.println("这里就能获取去重复之后的整数了:" + integer);
}
});
1
2
3
4
5
6
7
8
2
3
4
5
6
7
8
这里就能获取去重复之后的整数了:1
这里就能获取去重复之后的整数了:2
这里就能获取去重复之后的整数了:3
take 取出固定个数
Observable.just(1, 2, 3, 4, 5, 6)
.take(2)
.subscribe(new Consumer<Integer>() {
public void accept(Integer integer) throws Exception {
System.out.println("这里就能取出固定个数之后的整数了:" + integer);
}
});
1
2
3
4
5
6
7
8
2
3
4
5
6
7
8
这里就能取出固定个数之后的整数了:1
这里就能取出固定个数之后的整数了:2
这里就能取出固定个数之后的整数了:3
toList 打包成集合
Observable.just(1, 2, 3, 4, 5, 6)
.toList()
.subscribe(new Consumer<List<Integer>>() {
public void accept(List<Integer> integers) throws Exception {
System.out.println("这里可以直接打包成一个集合,集合的长度为" + integers.size());
}
});
1
2
3
4
5
6
7
8
2
3
4
5
6
7
8
这里可以直接打包成一个集合,集合的长度为6
delay 延迟
使得被观察者延迟一段时间再发送事件
// 1. 指定延迟时间
// 参数1 = 时间;参数2 = 时间单位
delay(long delay,TimeUnit unit)
// 2. 指定延迟时间 & 调度器
// 参数1 = 时间;参数2 = 时间单位;参数3 = 线程调度器
delay(long delay,TimeUnit unit,mScheduler scheduler)
// 3. 指定延迟时间 & 错误延迟
// 错误延迟,即:若存在Error事件,则如常执行,执行后再抛出错误异常
// 参数1 = 时间;参数2 = 时间单位;参数3 = 错误延迟参数
delay(long delay,TimeUnit unit,boolean delayError)
// 4. 指定延迟时间 & 调度器 & 错误延迟
// 参数1 = 时间;参数2 = 时间单位;参数3 = 线程调度器;参数4 = 错误延迟参数
delay(long delay,TimeUnit unit,mScheduler scheduler,boolean delayError): 指定延迟多长时间并添加调度器,错误通知可以设置是否延迟
Observable.just(1, 2, 3)
.delay(3, TimeUnit.SECONDS) // 延迟3s再发送,由于使用类似,所以此处不作全部展示
.subscribe(new Consumer<Integer>() {
public void accept(Integer integer) throws Exception {
System.out.println("延迟测试" + integer);
}
});
1
2
3
4
5
6
7
8
2
3
4
5
6
7
8
zip 压合
将两个被观察者对象组合成一个,如果两个被观察者的大小不同,合并的次数将以最小那个为主
Observable<Integer> observable1 = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
Log.d(TAG, "emit 1");
emitter.onNext(1);
Log.d(TAG, "emit 2");
emitter.onNext(2);
Log.d(TAG, "emit 3");
emitter.onNext(3);
Log.d(TAG, "emit 4");
emitter.onNext(4);
Log.d(TAG, "emit complete1");
emitter.onComplete();
}
});
Observable<String> observable2 = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
Log.d(TAG, "emit A");
emitter.onNext("A");
Log.d(TAG, "emit B");
emitter.onNext("B");
Log.d(TAG, "emit C");
emitter.onNext("C");
Log.d(TAG, "emit complete2");
emitter.onComplete();
}
});
Observable.zip(observable1, observable2, new BiFunction<Integer, String, String>() {
@Override
public String apply(Integer integer, String s) throws Exception {
return integer + s;
}).subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe");
}
@Override
public void onNext(String value) {
Log.d(TAG, "onNext: " + value);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError");
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});
D/TAG: onSubscribe
D/TAG: emit 1
D/TAG: emit 2
D/TAG: emit 3
D/TAG: emit 4
D/TAG: emit complete1
D/TAG: emit A
D/TAG: onNext: 1A
D/TAG: emit B
D/TAG: onNext: 2B
D/TAG: emit C
D/TAG: onNext: 3C
D/TAG: emit complete2
D/TAG: onComplete
执行的顺序是第一个被观察者全部执行完毕之后才执行第二个被观察者对象,导致这种现象是因为他们都在同一个线程中,现在进行一下线程调度
Observable<Integer> observable1 = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
Log.d(TAG, "emit 1");
emitter.onNext(1);
Log.d(TAG, "emit 2");
emitter.onNext(2);
Log.d(TAG, "emit 3");
emitter.onNext(3);
Log.d(TAG, "emit 4");
emitter.onNext(4);
Log.d(TAG, "emit complete1");
emitter.onComplete();
}
}).subscribeOn(Schedulers.io());
Observable<String> observable2 = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
Log.d(TAG, "emit A");
emitter.onNext("A");
Log.d(TAG, "emit B");
emitter.onNext("B");
Log.d(TAG, "emit C");
emitter.onNext("C");
Log.d(TAG, "emit complete2");
emitter.onComplete();
}
}).subscribeOn(Schedulers.io());
Observable.zip(observable1, observable2, new BiFunction<Integer, String, String>() {
@Override
public String apply(Integer integer, String s) throws Exception {
return integer + s;
}
}).subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe");
}
@Override
public void onNext(String value) {
Log.d(TAG, "onNext: " + value);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError");
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});
D/TAG: onSubscribe
D/TAG: emit A
D/TAG: emit 1
D/TAG: onNext: 1A
D/TAG: emit B
D/TAG: emit 2
D/TAG: onNext: 2B
D/TAG: emit C
D/TAG: emit 3
D/TAG: onNext: 3C
D/TAG: emit complete2
D/TAG: onComplete