RxJava Observable 数据转换

RxJava 提供了多种请求数据处理,转换的方法,功能强大而且优雅,只有想不到,没有做不到。本篇主要记录使用 Observable 对请求数据进行转换处理的方法。

merge

很多情况下,在一个页面同时需要请求两个及以上的接口,然后每次请求也都需要更新UI,此时可以将多个请求 “合并”,而不必对每次请求都使用 subscribe 来添加一个请求成功/失败的订阅。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
Observable<User> obLogin = service.login("Michael Cai", "***");
Observable<Message> obMsg = service.message();
Observable.merge(obLogin, obMsg)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<Object>() {
@Override
public void onNext(@NonNull Object o) {
Log.d(TAG, "onNext: " + o);
if (o instanceof Message) {
// TODO
} else if (o instanceof User) {
// TODO
}
}
@Override
public void onError(@NonNull Throwable e) {
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete: ");
}
});

上面的代码创建了两个请求,一个是登录,一个是获取消息,使用 merge 方法合并的 Observable 请求被发起的顺序是按照参数传递的顺序,上面的 Login 请求在 Message 请求的前面被调用。

Observer

  • onSubscribe 仅调用一次,请求被合并后,后面也只调用了一次 subscribe
  • onNext 调用多次,每个请求在成功之后,都会被调用
  • onError
  • onComplete 仅调用一次,当所有请求都完成之后被调用

flatMap

大多数情况下,服务器返回的数据通常是这样的

1
2
3
4
5
{
"message": "Success",
"code": 0,
"data": {}
}

如果直接按照之前的方式,则在请求结果返回回来之后,在 onNext 方法中,还得进行一次判断,如果 code 为0则表示成功,如果 code 不为0,则表示参数错误,得分别进行处理。而 flatMap 则可以帮且解决这个问题,可以将服务器返回的数据先进行转换。

创建服务器返回数据类型

服务器返回的数据 data 是多种多样的,所以使用泛型定义 data

1
2
3
4
5
public class Result<T> {
public int code = 0;
public String message = "";
public T data;
}

api 接口定义

1
2
@GET(ServiceName + "login")
Observable<Result<User>> login(@Query("username") String name, @Query("pwd") String pwd);

发起请求

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
service.login("Michael Cai", "***")
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.flatMap(new Function<Result<User>, ObservableSource<User>>() {
@Override
public ObservableSource<User> apply(@NonNull Result<User> result) throws Exception {
if (result.code == 0) { // 如果 code = 0,则请求成功
return Observable.just(result.data);
}
// 请求失败
return Observable.error(new ApiException(result.code, result.message));
}
})
.subscribe(new Observer<User>() {
@Override
public void onNext(@NonNull User user) {
Log.d(TAG, "onNext: " + user);
}
@Override
public void onError(@NonNull Throwable e) {
Log.e(TAG, "onError: " + e.getMessage());
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete: ");
}
});

当接口请求成功之后,如果服务器返回的 code 是 0,则表示成功,使用 Observable.just 生成 Observable 对象并返回,如果是其它数值,则使用 Observable.error 返回错误。

subscribe 订阅了 Observer 对象之后,以下两种服务器的返回值将分别执行 onNext 和 onError;

  • {“code”:0, “message”:”Success”, “data”:{“name”:”ccf”, “msg”: “what?”, “age”:28 }}
  • {“code”:1, “message”:”Invalidated Pwd”}

Refactor

可以将上面的 flatMap 中的 Funtion 提取出来。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class FunctionResult<T extends Result> implements Function<Result<T>, ObservableSource<T>> {
@Override
public ObservableSource<T> apply(@NonNull Result<T> tResult) throws Exception {
if (tResult.code == 0) {
return Observable.just(tResult.data);
}
return Observable.error(new ApiException(tResult.code, tResult.message));
}
}
// ...
service.login("Michael Cai", "***")
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.flatMap(new FunctionResult())
.subscribe(...);

使用 flatMap 操作数组

继续上面的应用场景,某一个 api 接口服务器返回的 data 是一个用户列表,而我们的某一个需求是获取所有用户的等级求平均值,最简单的方法是获取到所有用户之后遍历获取每一个用户的等级然后得出所有用户的平均等级,然后这样的话就会在 onNext 里面写一个循环,增加了代码的嵌套层次,flatMap 则可以很好的解决这个问题,它可以把数组,列表里面的元素包装到一个 Observable 里面,我们只需要在 onNext 方法里面对每一个获取到的 User 进行处理即可,然后当 onComplete 调用的时候则表示所有的 User 都处理完毕。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
service.users()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.flatMap(new Function<Result<List<User>>, Observable<User>>() {
@Override
public Observable<User> apply(@NonNull Result<List<User>> listResult) throws Exception {
return Observable.fromIterable(listResult.data);
}
})
.subscribe(new Observer<User>() {
@Override
public void onNext(@NonNull User user) {
// TODO
}
@Override
public void onComplete() {
// TODO
}
});

map

有时候,可能我们需要将一种接口定义中的 Observable\ 转换成另一种数据 Entity2,这时就可以使用 map

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
service.login("Michael Cai", "***")
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.map(new Function<User, String>() {
@Override
public String apply(@NonNull User user) throws Exception {
return user.name;
}
})
.subscribe(new Consumer<String>() {
@Override
public void accept(@NonNull String s) throws Exception {
Log.d(TAG, "userName: " + s);
}
});

上面的 map 方法把原本的 接口定义的 Observable\ 转换成了 Observable\

mapflatMap 都是将数据从一种数据类型转换成另外一种数据类,但不同的是:map 是将服务器返回的数据转换成另外一种数据类型,而 flatMap 则是将服务器返回的数据通过定义的规则转换生成一个 Observable 对象,供调用者使用 subscribe 进行订阅,也就是输出类型不同;

zip

合并多个请求,当几个请求都执行完成之后,统一再进行后续操作处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
Observable<User> ou1 = service.user(10001);
Observable<User> ou2 = service.user(10002);
Observable
.zip(ou1, ou2, new BiFunction<User, User, Integer>() {
@Override
public Integer apply(@NonNull User user, @NonNull User user2) throws Exception {
return user.age - user2.age;
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Integer>() {
@Override
public void accept(@NonNull Integer integer) throws Exception {
Log.d(TAG, "accept: " + integer);
}
});

上面的代码将两个通过 id 获取用户信息的请求打包,当两个请求都完成之后,再计算两个用户的年龄差。

zip 方法的最后一个参数,则是所有请求完成之后,将执行的操作,对于只有两个请求,则使用 BiFunction ,对于3个及3个以上的请求,则是 Function3Function4
两者的定义其实相同:

  • BiFunction
1
2
3
4
public interface BiFunction<T1, T2, R> {
@NonNull
R apply(@NonNull T1 t1, @NonNull T2 t2) throws Exception;
}
  • Function3
1
2
3
4
public interface Function3 <T1, T2, T3, R> {
@NonNull
R apply(@NonNull T1 t1, @NonNull T2 t2, @NonNull T3 t3) throws Exception;
}

combineLatest

合并请求结果,当需要把几个请求的结果合并成一个数据值并返回的时候,可以使用 combineLatest

1
2
3
4
5
6
7
8
9
10
Observable
.combineLatest(ou1, ou2, new BiFunction<User, User, Boolean>() {
@Override
public Boolean apply(@NonNull User user, @NonNull User user2) throws Exception {
Log.d(TAG, "apply: " + user);
Log.d(TAG, "apply: " + user2);
return true;
}
})
.subscribe(...);

retry

有些网络请求因网络异常或者请求超时时,需要再次请求,通常会连续请求3次,如果3次都失败则不再请求,普通的常规方法则是自已在代码中使用一个记数器,来记录请求失败的次数,但在 RxJava 中,可以使用 retry 定义多次请求的次数。

1
2
3
4
5
6
7
8
9
10
11
service.login("a", "b")
.subscribeOn(Schedulers.io())
.retry(3, new Predicate<Throwable>() {
@Override
public boolean test(@NonNull Throwable throwable) throws Exception {
// 返回 true 表示需要 retry
Log.e(TAG, "test: retry");
return true;
}
})
.subscribe(...);

chain of requests

有时候,我们需要用A接口的请求结果来请求B接口,可以使用下面的方式来实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
service.user(10001).subscribeOn(Schedulers.io())
.flatMap(new Function<User, ObservableSource<Result<List<User>>>>() {
@Override
public ObservableSource<Result<List<User>>> apply(@NonNull User user) throws Exception {
// 根据 user.tag 获取订阅的用户
return service.rss(user.tag);
}
})
.subscribe(new Consumer<Result<List<User>>>() {
@Override
public void accept(@NonNull Result<List<User>> listResult) throws Exception {
// TODO
}
});

上面的代码先根据 userid 获取用户信息,然后根据用户的 tag 获取相关的用户列表,这种情况就是有两个请求,但后一个请求需要使用前一个接口返回的数据。

concat

concat 用于连续发起两个请求,请求顺序同上,都是在第一个请求成功之后,再发起第二个请求。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
Observable<User> ou1 = ...;
Observable<Topic> ot2 = ...;
Observable.concat(ou1, ot2)
.subscribe(new Observer<Object>() {
@Override
public void onNext(@NonNull Object o) {
}
@Override
public void onError(@NonNull Throwable e) {
}
@Override
public void onComplete() {
}
});

上面定义了两个 Observable,使用 concat 之后,第二个请求需要在第一个请求发起后并成功获取到服务器的响应之后再被发出。和上一节相比,在 onNext 方法中,需要对 Object 进行类型判断也转换,相对来说复杂了。

注意:如果第一个请求失败,则第二个请求不会被继续发起。

first

concat 是将几个请求一个接一下的发出,而有的情况下,可能我们的服务器地址有多个,需要一个地址一个地址的去尝试,只要有一个地址获取到了,则后续请求不会再被发起,此时就可以用 first 实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
Observable<User> ou1 = ...;
Observable<User> ou2 = ...;
Observable
.concat(ou1, ou2)
.first(ur)
.subscribe(new SingleObserver<User>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
}
@Override
public void onSuccess(@NonNull User user) {
Log.d(TAG, "onSuccess: " + user);
}
@Override
public void onError(@NonNull Throwable e) {
Log.e(TAG, "onError: " + e);
}
});

以上两个 Observable 分别指向两个地址,使用 first 方法传一个 User 对象(为什么要传一个 User,用来干什么,暂不清楚),只要第一个地址请求成功,后面的请求不再发出。

再比如说,有某个请求在发出之前,需要先检查内存中是否已经有缓存的数据,然后再检查磁盘中是否有缓存的数据,最后再从网络中抓取,以上前两步,如果有获取到数据,则不再发起后面的请求,此时就可以使用 first 来快速实现。

1
2
3
4
Observable.concat(memory, disk, network)
.first()
.subscribeOn(Schedulers.newThread())
.subscribe(...);

onErrorReturnItem / onErrorReturn

当网络请求失败时,可能需要加载一些默认的数据,此时就可以使用 onErrorReturnItem 来实现。

1
2
3
4
5
6
User defaultUr = ...;
service.login("Michael Cai", "pwd")
.subscribeOn(Schedulers.io())
.onErrorReturnItem(defaultUr)
.subscribe();

请求失败时 Observer 中会获取到一个默认的 User 对象,onNext 方法会被执行,而不会执行 onError

也可以使用 onErrorReturn ,返回一个默认的数据即可,在 OnErrorReturn 里面可以获取到失败的信息;

1
2
3
4
5
6
7
.onErrorReturn(new Function<Throwable, User>() {
@Override
public User apply(@NonNull Throwable throwable) throws Exception {
return ur;
}
})
.subscribe(...);

timer & interval

  • timer : 隔多长时间后执行
  • interval : 每间隔多长时间执行一次
1
2
3
4
5
6
7
8
Observable.timer(2, TimeUnit.SECONDS).subscribe(new Consumer<Long>() {
@Override
public void accept(@NonNull Long aLong) throws Exception {
Log.e(TAG, "accept: " + aLong);
// 如果是 interval 的话,aLong 是执行的次数;
// 如果是 timer 函数,aLong 为0;
}
});

schedulePeriodically

使用 schedulePeriodically 可以创建一个轮循请求,如同 interval 一样。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
final Disposable dis = Observable.create(new ObservableOnSubscribe<User>() {
@Override
public void subscribe(final @NonNull ObservableEmitter<User> e) throws Exception {
Schedulers.newThread().createWorker().schedulePeriodically(new Runnable() {
@Override
public void run() {
if (!e.isDisposed()) {
e.onNext(ur);
}
}
}, 1000, 1000, TimeUnit.MILLISECONDS);
}
}).subscribe(new Consumer<User>() {
@Override
public void accept(@NonNull User user) throws Exception {
Log.e(TAG, "accept: " + user);
}
});

compose

Android 开发中,网络请求和 UI 更新需要放到不同的线程,比较笨的做法是对每一个 Observable 都使用 subscribeOnobserveOn 进行线程切换,如果有100个接口,则要写 100 次,而使用 compose 则可以将过程简化。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
service.login("Michael Cai", "***")
.compose(new ObservableTransformer<User, User>() {
@Override
public ObservableSource<User> apply(@NonNull Observable<User> upstream) {
Log.d(TAG, "apply: " + Thread.currentThread().getName());
return upstream
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());
}
})
.subscribe(new Consumer<User>() {
@Override
public void accept(@NonNull User user) throws Exception {
Log.d(TAG, "apply: " + Thread.currentThread().getName());
// TODO
}
});

可以将上面的 ObservableTransfomer 进行进一步封装:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
class UDObservableTransformer implements ObservableTransformer {
public static UDObservableTransformer transformer = new UDObservableTransformer();
@Override
public ObservableSource apply(@NonNull Observable upstream) {
Log.d(TAG, "apply: " + Thread.currentThread().getName());
return upstream.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread());
}
}
//...
service.login("Michael Cai", "***")
.compose(UDObservableTransformer.transformer)
.subscribe(new Consumer<User>() {
@Override
public void accept(@NonNull User user) throws Exception {
// TODO
}
});

flatMapcompose 的异同:

  • 两者都能实现将一种数据类型转换成另一种数据类型;
  • compose 是更高级的抽象方法,它将影响整个请求流程,比如请求流程中的线程切换都由 compose 方法所指定;compose 方法所指定的 Transformer 将被立即执行,不管有没有被 subscribe
  • flatMap 仅仅对请求结果进行转换处理;

官方解释:

The difference is that compose() is a higher level abstraction: it operates on the entire stream, not individually emitted items. In more specific terms:

compose() is the only way to get the original Observable from the stream. Therefore, operators that affect the whole stream (like subscribeOn() and observeOn()) need to use compose().
In contrast, if you put subscribeOn()/observeOn() in flatMap(), it would only affect the Observable you create in flatMap() but not the rest of the stream.

compose() executes immediately when you create the Observable stream, as if you had written the operators inline. flatMap() executes when its onNext() is called, each time it is called. In other words, flatMap() transforms each item, whereas compose() transforms the whole stream.
flatMap() is necessarily less efficient because it has to create a new Observable every time onNext() is called. compose() operates on the stream as it is.

If you want to replace some operators with reusable code, use compose(). flatMap() has many uses but this is not one of them.

More

  • filter:集合进行过滤
  • each:遍历集合
  • take:取出集合中的前几个
  • skip:跳过前几个元素
  • doOnNext:在onNext的时候调用
  • doOnComplete:在请求完成的时候调用