RxJava框架介绍

RxJava 到底是什么?使用 RxJava 到底有什么好处呢?其实 RxJavaReactiveX 使用 Java 语言实现的版本。
什么是 ReactiveX 呢?简单的说,ReactiveX 就是 观察者模式+迭代器模式+函数式编程三种模式的结合体。

  • 扩展观察者模式,通过使用可观察的对象序列流来表述一系列事件,订阅者进行占点观察并对序列流做出反应;
  • 借鉴迭代器模式,对多个对象序列进行迭代输出,订阅者可以依次处理不同的对象序列;
  • 使用函数式编程思想(functional programming),简化问题解决的步骤;

基本概念

RxJava 中的两大核心就是 Observable (被观察者,也就是事件源)和 Subscriber (观察者),由 Observable 发出一系列的事件,Subscriber 进行订阅接收并进行处理,看起来就好像是设计模式中的观察者模式,但是跟观察者模式不同的地方就在于,如果没有观察者 SubscriberObservable 是不会发出任何事件的。

由于 Observable 发出的事件并不仅限于一个,有可能是多个的,如何确保每一个事件都能发送到 Subscriber 上进行处理呢?ReactiveX 借鉴了设计模式的迭代器模式,对事件进行迭代轮询next()、hasNext(),在迭代过程中如果出现异常则直接抛出(throws Exceptions),下表是 Observable 和迭代器 Iterable 的对比:

事件Event 迭代器 Iterable Observable
接收数据 T next() onNext(T)
遇到错误 throw Exception onError(Exception)
迭代完成 !hasNext() onComplete()

Iterable 不同的地方在于,迭代器模式在事件处理上采用的是“同步/拉式”的方式,而 Observable 采用的是“异步/推式”的方式。

  • Observable:事件源,英文释义“可观察的”,在观察者模式中称为“被观察者”或“可观察对象”;
  • Observer:观察者,可接收 ObservableSubject 发出的数据;
  • Consumer:观察者,只能处理 Observable 发出的接收数据处理方法,不能处理 onErroronComplete
  • SubjectSubject 是一个比较特殊的对象,既可充当事件源,也可充当观察者;
  • ObservableEmitter:事件发起者,调用对象的 onNextonErroronComplete 分别表示发出一个迭代事件,异常事件,完成的事件;对象的 isDisposed() 方法用于判断 subscribe() 方法返回的对象,用户是否已经取消订阅,如果用户取消订阅了,为了节省系统开销,此时即可结束流程,而不必要再调用 onNext 或者 onComplete 等方法;
  • Disposablesubscribe()方法返回的对象,该对象有 isDisposed() 和dispose() 方法分别用于 判断事件源是否已经迭代完成 和 取消订阅;

HelloWorld

1
2
3
4
5
6
7
Observable.just(100)
.subscribe(new Consumer<Integer>() {
@Override
public void accept(@NonNull Integer integer) throws Exception {
Log.e(TAG, "accept: " + integer);
}
});
  • Observable.just 快速创建一个 Observable 对象;
  • Observalbe 对象实例的 subscribe 方法将 Observable 对象和 Observer 对象关联起来;

使用 ObservableOnSubscribe 生成 Observable

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(final @NonNull ObservableEmitter<Integer> e) throws Exception {
if (!e.isDisposed()) {
e.onNext(1);
}
Observable.timer(2000, TimeUnit.MILLISECONDS)
.subscribe(new Consumer<Long>() {
@Override
public void accept(@NonNull Long aLong) throws Exception {
if (!e.isDisposed())
e.onNext(2);
e.onComplete();
}
});
}
})

上面的代码在第一次 onNext 调用之后,通过 timer 在 2s 后再次调用 onNext 发出事件,此时先判断用户是否已经取消掉了订阅,如果没有取消订阅,则才会再发出事件;