RxJava  提供了多种请求数据处理,转换的方法,功能强大而且优雅,只有想不到,没有做不到。本篇主要记录使用 Observable  对请求数据进行转换处理的方法。
merge 很多情况下,在一个页面同时需要请求两个及以上的接口,然后每次请求也都需要更新UI,此时可以将多个请求 “合并”,而不必对每次请求都使用 subscribe  来添加一个请求成功/失败的订阅。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
Observable<User> obLogin = service.login("Michael Cai" , "***" );
Observable<Message> obMsg = service.message();
Observable.merge(obLogin, obMsg)
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new  Observer<Object>() {
    
        @Override 
        public  void  onNext (@NonNull Object o)  
            Log.d(TAG, "onNext: "  + o);
            if  (o instanceof  Message) {
                
            } else  if  (o instanceof  User) {
                
            }
        }
        @Override 
        public  void  onError (@NonNull Throwable e)  
        }
        @Override 
        public  void  onComplete ()  
            Log.d(TAG, "onComplete: " );
        }
    });
上面的代码创建了两个请求,一个是登录,一个是获取消息,使用 merge  方法合并的 Observable  请求被发起的顺序是按照参数传递的顺序,上面的 Login  请求在 Message  请求的前面被调用。
Observer 
onSubscribe  仅调用一次,请求被合并后,后面也只调用了一次 subscribe onNext  调用多次,每个请求在成功之后,都会被调用onError onComplete  仅调用一次,当所有请求都完成之后被调用 
flatMap 大多数情况下,服务器返回的数据通常是这样的
1
2
3
4
5
{
  "message" : "Success" ,
  "code" : 0 ,
  "data" : {}
}
如果直接按照之前的方式,则在请求结果返回回来之后,在 onNext 方法中,还得进行一次判断,如果 code  为0则表示成功,如果 code  不为0,则表示参数错误,得分别进行处理。而 flatMap  则可以帮且解决这个问题,可以将服务器返回的数据先进行转换。
创建服务器返回数据类型 服务器返回的数据 data  是多种多样的,所以使用泛型定义 data 。
1
2
3
4
5
public  class  Result <T > 
    public  int  code = 0 ;
    public  String message = "" ;
    public  T data;
}
api 接口定义 1
2
@GET (ServiceName + "login" )
Observable<Result<User>> login(@Query ("username" ) String name, @Query ("pwd" ) String pwd);
发起请求 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
service.login("Michael Cai" , "***" )
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .flatMap(new  Function<Result<User>, ObservableSource<User>>() {
        @Override 
        public  ObservableSource<User> apply (@NonNull Result<User> result)  throws  Exception 
            if  (result.code == 0 ) { 
                return  Observable.just(result.data);
            }
            
            return  Observable.error(new  ApiException(result.code, result.message));
        }
    })
    .subscribe(new  Observer<User>() {
        @Override 
        public  void  onNext (@NonNull User user)  
            Log.d(TAG, "onNext: "  + user);
        }
        @Override 
        public  void  onError (@NonNull Throwable e)  
            Log.e(TAG, "onError: "  + e.getMessage());
        }
        @Override 
        public  void  onComplete ()  
            Log.d(TAG, "onComplete: " );
        }
    });
当接口请求成功之后,如果服务器返回的 code 是 0,则表示成功,使用 Observable.just  生成 Observable  对象并返回,如果是其它数值,则使用 Observable.error  返回错误。
在 subscribe  订阅了 Observer  对象之后,以下两种服务器的返回值将分别执行 onNext 和 onError;
{“code”:0, “message”:”Success”, “data”:{“name”:”ccf”, “msg”: “what?”, “age”:28 }} 
{“code”:1, “message”:”Invalidated Pwd”} 
 
Refactor 可以将上面的 flatMap  中的 Funtion  提取出来。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public  class  FunctionResult <T  extends  Result > implements  Function <Result <T >, ObservableSource <T >> 
    @Override 
    public  ObservableSource<T> apply (@NonNull Result<T> tResult)  throws  Exception 
        if  (tResult.code == 0 ) {
            return  Observable.just(tResult.data);
        }
        return  Observable.error(new  ApiException(tResult.code, tResult.message));
    }
}
service.login("Michael Cai" , "***" )
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .flatMap(new  FunctionResult())
    .subscribe(...);
使用 flatMap 操作数组 继续上面的应用场景,某一个 api  接口服务器返回的 data  是一个用户列表,而我们的某一个需求是获取所有用户的等级求平均值,最简单的方法是获取到所有用户之后遍历获取每一个用户的等级然后得出所有用户的平均等级,然后这样的话就会在 onNext  里面写一个循环,增加了代码的嵌套层次,flatMap  则可以很好的解决这个问题,它可以把数组,列表里面的元素包装到一个 Observable  里面,我们只需要在 onNext  方法里面对每一个获取到的 User  进行处理即可,然后当 onComplete  调用的时候则表示所有的 User  都处理完毕。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
service.users()
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .flatMap(new  Function<Result<List<User>>, Observable<User>>() {
        @Override 
        public  Observable<User> apply (@NonNull Result<List<User>> listResult)  throws  Exception 
            return  Observable.fromIterable(listResult.data);
        }
    })
    .subscribe(new  Observer<User>() {
        @Override 
        public  void  onNext (@NonNull User user)  
            
        }
        @Override 
        public  void  onComplete ()  
            
        }
    });
map 有时候,可能我们需要将一种接口定义中的 Observable\  转换成另一种数据 Entity2 ,这时就可以使用 map 。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
service.login("Michael Cai" , "***" )
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .map(new  Function<User, String>() {
        @Override 
        public  String apply (@NonNull User user)  throws  Exception 
            return  user.name;
        }
    })
    .subscribe(new  Consumer<String>() {
        @Override 
        public  void  accept (@NonNull String s)  throws  Exception 
            Log.d(TAG, "userName: "  + s);
        }
    });
上面的 map  方法把原本的 接口定义的 Observable\  转换成了 Observable\  。
map  和 flatMap  都是将数据从一种数据类型转换成另外一种数据类,但不同的是:map  是将服务器返回的数据转换成另外一种数据类型,而 flatMap  则是将服务器返回的数据通过定义的规则转换生成一个 Observable  对象,供调用者使用 subscribe  进行订阅,也就是输出类型不同;
 
zip 合并多个请求,当几个请求都执行完成之后,统一再进行后续操作处理。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
Observable<User> ou1 = service.user(10001 );
Observable<User> ou2 = service.user(10002 );
Observable
    .zip(ou1, ou2, new  BiFunction<User, User, Integer>() {
        @Override 
        public  Integer apply (@NonNull User user, @NonNull User user2)  throws  Exception 
            return  user.age - user2.age;
        }
    })
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new  Consumer<Integer>() {
        @Override 
        public  void  accept (@NonNull Integer integer)  throws  Exception 
            Log.d(TAG, "accept: "  + integer);
        }
    });
上面的代码将两个通过 id  获取用户信息的请求打包,当两个请求都完成之后,再计算两个用户的年龄差。
zip  方法的最后一个参数,则是所有请求完成之后,将执行的操作,对于只有两个请求,则使用 BiFunction  ,对于3个及3个以上的请求,则是 Function3 、Function4  …
1
2
3
4
public  interface  BiFunction <T1 , T2 , R > 
    @NonNull 
    R apply (@NonNull T1 t1, @NonNull T2 t2)  throws  Exception ;
}
1
2
3
4
public  interface  Function3  <T1 , T2 , T3 , R > 
    @NonNull 
    R apply (@NonNull T1 t1, @NonNull T2 t2, @NonNull T3 t3)  throws  Exception ;
}
combineLatest 合并请求结果,当需要把几个请求的结果合并成一个数据值并返回的时候,可以使用 combineLatest 。
1
2
3
4
5
6
7
8
9
10
Observable
    .combineLatest(ou1, ou2, new  BiFunction<User, User, Boolean>() {
        @Override 
        public  Boolean apply (@NonNull User user, @NonNull User user2)  throws  Exception 
            Log.d(TAG, "apply: "  + user);
            Log.d(TAG, "apply: "  + user2);
            return  true ;
        }
    })
    .subscribe(...);
retry 有些网络请求因网络异常或者请求超时时,需要再次请求,通常会连续请求3次,如果3次都失败则不再请求,普通的常规方法则是自已在代码中使用一个记数器,来记录请求失败的次数,但在 RxJava  中,可以使用 retry  定义多次请求的次数。
1
2
3
4
5
6
7
8
9
10
11
service.login("a" , "b" )
    .subscribeOn(Schedulers.io())
    .retry(3 , new  Predicate<Throwable>() {
        @Override 
        public  boolean  test (@NonNull Throwable throwable)  throws  Exception 
            
            Log.e(TAG, "test: retry" );
            return  true ;
        }
    })
    .subscribe(...);
chain of requests 有时候,我们需要用A接口的请求结果来请求B接口,可以使用下面的方式来实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
service.user(10001 ).subscribeOn(Schedulers.io())
    .flatMap(new  Function<User, ObservableSource<Result<List<User>>>>() {
        @Override 
        public  ObservableSource<Result<List<User>>> apply(@NonNull  User user) throws  Exception {
        	
            return  service.rss(user.tag);
        }
    })
    .subscribe(new  Consumer<Result<List<User>>>() {
        @Override 
        public  void  accept (@NonNull Result<List<User>> listResult)  throws  Exception 
            
        }
    });
上面的代码先根据 userid  获取用户信息,然后根据用户的 tag  获取相关的用户列表,这种情况就是有两个请求,但后一个请求需要使用前一个接口返回的数据。
concat concat  用于连续发起两个请求,请求顺序同上,都是在第一个请求成功之后,再发起第二个请求。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
Observable<User> ou1 = ...;
Observable<Topic> ot2 = ...;
Observable.concat(ou1, ot2)
    .subscribe(new  Observer<Object>() {
        @Override 
        public  void  onNext (@NonNull Object o)  
        }
        @Override 
        public  void  onError (@NonNull Throwable e)  
        }
        @Override 
        public  void  onComplete ()  
        }
    });
上面定义了两个 Observable ,使用 concat  之后,第二个请求需要在第一个请求发起后并成功获取到服务器的响应之后再被发出。和上一节相比,在 onNext  方法中,需要对 Object  进行类型判断也转换,相对来说复杂了。
注意:如果第一个请求失败,则第二个请求不会被继续发起。
 
first concat  是将几个请求一个接一下的发出,而有的情况下,可能我们的服务器地址有多个,需要一个地址一个地址的去尝试,只要有一个地址获取到了,则后续请求不会再被发起,此时就可以用 first  实现。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
Observable<User> ou1 = ...;
Observable<User> ou2 = ...;
Observable
    .concat(ou1, ou2)
    .first(ur)
    .subscribe(new  SingleObserver<User>() {
        @Override 
        public  void  onSubscribe (@NonNull Disposable d)  
        }
        @Override 
        public  void  onSuccess (@NonNull User user)  
            Log.d(TAG, "onSuccess: "  + user);
        }
        @Override 
        public  void  onError (@NonNull Throwable e)  
            Log.e(TAG, "onError: "  + e);
        }
    });
以上两个 Observable  分别指向两个地址,使用 first 方法传一个 User  对象(为什么要传一个 User ,用来干什么,暂不清楚),只要第一个地址请求成功,后面的请求不再发出。
再比如说,有某个请求在发出之前,需要先检查内存中是否已经有缓存的数据,然后再检查磁盘中是否有缓存的数据,最后再从网络中抓取,以上前两步,如果有获取到数据,则不再发起后面的请求,此时就可以使用 first  来快速实现。
 
1
2
3
4
Observable.concat(memory, disk, network)
.first()
.subscribeOn(Schedulers.newThread())
.subscribe(...);
onErrorReturnItem / onErrorReturn 当网络请求失败时,可能需要加载一些默认的数据,此时就可以使用 onErrorReturnItem  来实现。
1
2
3
4
5
6
User defaultUr = ...;
service.login("Michael Cai" , "pwd" )
    .subscribeOn(Schedulers.io())
    .onErrorReturnItem(defaultUr)
    .subscribe();
请求失败时 Observer  中会获取到一个默认的 User  对象,onNext  方法会被执行,而不会执行 onError 。
也可以使用 onErrorReturn  ,返回一个默认的数据即可,在 OnErrorReturn  里面可以获取到失败的信息;
1
2
3
4
5
6
7
.onErrorReturn(new  Function<Throwable, User>() {
    @Override 
    public  User apply (@NonNull Throwable throwable)  throws  Exception 
        return  ur;
    }
})
.subscribe(...);
timer & interval 
timer : 隔多长时间后执行 
interval : 每间隔多长时间执行一次 
 
1
2
3
4
5
6
7
8
Observable.timer(2 , TimeUnit.SECONDS).subscribe(new  Consumer<Long>() {
    @Override 
    public  void  accept (@NonNull Long aLong)  throws  Exception 
        Log.e(TAG, "accept: "  + aLong);
        
        
    }
});
schedulePeriodically 使用 schedulePeriodically  可以创建一个轮循请求,如同 interval  一样。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
final  Disposable dis = Observable.create(new  ObservableOnSubscribe<User>() {
    @Override 
    public  void  subscribe (final  @NonNull ObservableEmitter<User> e)  throws  Exception 
        Schedulers.newThread().createWorker().schedulePeriodically(new  Runnable() {
            @Override 
            public  void  run ()  
                if  (!e.isDisposed()) {
                    e.onNext(ur);
                }
            }
        }, 1000 , 1000 , TimeUnit.MILLISECONDS);
    }
}).subscribe(new  Consumer<User>() {
    @Override 
    public  void  accept (@NonNull User user)  throws  Exception 
        Log.e(TAG, "accept: "  + user);
    }
});
compose 在 Android  开发中,网络请求和 UI 更新需要放到不同的线程,比较笨的做法是对每一个 Observable  都使用 subscribeOn  和 observeOn  进行线程切换,如果有100个接口,则要写 100 次,而使用 compose  则可以将过程简化。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
service.login("Michael Cai" , "***" )
    .compose(new  ObservableTransformer<User, User>() {
        @Override 
        public  ObservableSource<User> apply (@NonNull Observable<User> upstream)  
            Log.d(TAG, "apply: "  + Thread.currentThread().getName());
            return  upstream
            	.subscribeOn(Schedulers.io())
            	.observeOn(AndroidSchedulers.mainThread());
        }
    })
    .subscribe(new  Consumer<User>() {
        @Override 
        public  void  accept (@NonNull User user)  throws  Exception 
            Log.d(TAG, "apply: "  + Thread.currentThread().getName());
            
        }
    });
可以将上面的 ObservableTransfomer  进行进一步封装:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
class  UDObservableTransformer  implements  ObservableTransformer  
    public  static  UDObservableTransformer transformer = new  UDObservableTransformer();
    @Override 
    public  ObservableSource apply (@NonNull Observable upstream)  
        Log.d(TAG, "apply: "  + Thread.currentThread().getName());
        return  upstream.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread());
    }
}
service.login("Michael Cai" , "***" )
    .compose(UDObservableTransformer.transformer)
    .subscribe(new  Consumer<User>() {
        @Override 
        public  void  accept (@NonNull User user)  throws  Exception 
            
        }
    });
flatMap  与 compose  的异同:
两者都能实现将一种数据类型转换成另一种数据类型; 
compose 是更高级的抽象方法,它将影响整个请求流程,比如请求流程中的线程切换都由 compose  方法所指定;compose  方法所指定的 Transformer  将被立即执行,不管有没有被 subscribe ; 
flatMap 仅仅对请求结果进行转换处理; 
 
官方解释:
The difference is that compose() is a higher level abstraction: it operates on the entire stream, not individually emitted items. In more specific terms:
compose() is the only way to get the original Observable from the stream. Therefore, operators that affect the whole stream (like subscribeOn() and observeOn()) need to use compose().
compose() executes immediately when you create the Observable stream, as if you had written the operators inline. flatMap() executes when its onNext() is called, each time it is called. In other words, flatMap() transforms each item, whereas compose() transforms the whole stream.
If you want to replace some operators with reusable code, use compose(). flatMap() has many uses but this is not one of them.
 
More 
filter :集合进行过滤each :遍历集合take :取出集合中的前几个skip :跳过前几个元素doOnNext :在onNext的时候调用doOnComplete :在请求完成的时候调用