做了这么久的Android,还没有使用到RxJava。感觉是一件很low 的事情。RxJava 怎么学习,也困扰了自己一段时间。这次蹭着比较闲的一段时间,决定通过源码来学习下RxJava(是的,你没有看错,是通过看源码!!!)。
当然这里需要比较熟悉Java,懂得一些设计模式。给 Android 开发者的 RxJava 详解 对RxJava 的原理以及RxJava 是干什么的都讲的比较清晰。
流程
直接撸起袖子看源码了。
Observable#subscribe 流程
|
|
第1串代码是一个最简单的例子,Observable#create创建一个Observable(被观察者),然后Observable#subscribe(即被观察者订阅了观察者)。
Observable#create 创建被观察者
|
|
直接执行到了RxJavaPlugins#onAssembly
|
|
默认onObservableAssembly为null,所以最终直接返回了一个ObservableCreate对象。所以默认的Observable#create相当于是创建了一个具体的Obsevable(ObservableCreate)。
Observer是直接new 的,所以跳过Observer创建逻辑。
ObservableCreate#subscribe 流程
注意方法的参数是Observer
|
|
第3行,生成一个新的Observer,这里默认返回的就是传入的Observer。
第5行,确定两者的订阅关系。上面已经可以知道,当前的Observable 是ObservableCreate。
|
|
至此,执行的顺序有一些明确了。同时这里出现了一个新对象CreateEmitter。注意这里的Observer对象实际是对例子中Observer对象的包装,所以简单可以理解成直接调用的例子中创建的Observer。
Observer#onSubscribe,从这里可以看出onSubscribe只会执行一次。ObservableOnSubscribe#subscribe。ObservableOnSubscribe对象实际就是例子中new 的。
Observable#subscribe 回调流程
|
|
相当于开发者自身的业务逻辑代码。这里的emitter 前面已经介绍,是一个CreateEmitter。
CreateEmitter#onNext 流程
|
|
第7行,直接调用Observer#onNext。
然后第二次的onNext 以及最后的onComplete 都会按照类似的逻辑进行处理。
至此,Observable 通知Observer 的流程就比较清晰的展示在我们面前了。当然,从这个流程确实看不出RxJava 到底有什么优势。
map 流程
|
|
第9行,map逻辑似乎是将输入的Integer转化成String输出。
Observable#map 流程
|
|
上面最终默认就是返回了一个ObservableMap。所以现在的情况看来,map的操作实际就是生成一个新的Observable(ObservableMap)。
查看构造函数。
|
|
保留了原始的Observable(source,这里其实是ObservableCreate),同时增加了一个成员变量function。
ObservableMap#subscribe 流程
直接省略,看最终subscribeActual的逻辑。
|
|
这里的source 其实就是一个ObservableCreate。最终将传入的Observer和function 包装成ObservableMap$MapObserver。
|
|
MapObserver的中也会保存两个值,一个原始的Observer(actual,可以理解是原始代码中new 的Observer)以及转换的函数mapper。
之后的逻辑就是第一个流程中的,最后会回调到ObservableMap$MapObserver中。
MapObserver#onNext 流程
|
|
第11行,调用Function#apply获取到转换以后的值(当前的例子是:Integer->String)。
第16行,Observer#onNext调用最新的值。
至此,逻辑也非常清晰,map相当于在onNext 之前对数据重新处理。
flatMap 流程
与map 流程非常类似,所以简单说明。
|
|
flatMap 中范型类型与map 中的不一样。返回的是一个ObservableSource。是通过Observable#fromIterable生成的。这里似乎就可以得出一个结论,flatMap 的实现就是将Observable的一个事件转换成一组事件,同时将这一组事件封装成一个Observable。然后最终通知Observer的其实是一组事件。
生成的是一个ObservableFlatMap对象。
subscribeActual重新封装Observer成ObservableFlatMap$MergeObserver。根据上面的分析,最终其实调用的是MergeObserver#onNext。
MergeObserver#onNext 流程
|
|
第8行,生成一个新的ObservableSource,具体的apply逻辑,可以由最上面代码获得。
|
|
即最终调用的Observable是Observable#fromIterable,其实就是一个ObservableFromIterable对象。
回到上面的第24行subscribeInner(p);,最终的调用逻辑其实就是ObservableFromIterable#subscribeActual。
线程切换逻辑
RxJava 的精髓就是在线程之间的切换。从一个线程的Observable 发送消息,到另外一个线程的Observer 接收消息。
主要的逻辑就是subscribeOn和observeOn,用来指定相应的线程的
|
|
上面的例子是假设的是Subscribe 在io 线程,而Observer 在newThread 线程。
这里直接是默认的情况,io 对应的是
Schedulers.IO,而newThread 对应的是Schedulers.NEW_THREAD。
Observable#subscribeOn(Scheduler) 流程
|
|
实际是就是返回一个新的Observable(ObservableSubscribeOn),即将ObservableCreate包装成ObservableSubscribeOn。
Observable#observeOn(Scheduler) 流程
|
|
相当于是直接返回了一个新的Observable(ObservableObserveOn)。但是这个ObservableObserveOn实际上是封装了上面的ObservableSubscribeOn。
所以最后得到的是一个封装了多层的Observable(ObservableObserveOn-> ObservableSubscribeOn->ObservableCreate->new 的ObservableOnSubscribe)。
Observable#subscribe 流程
这里简化了相关逻辑,其实上面Observable#subscribe有具体的逻辑。
ObservableObserveOn#subscribeActual 流程
|
|
相当于是直接调用ObservableSubscribeOn#subscribeActual。
ObservableSubscribeOn#subscribeActual 流程
|
|
此时s 代表的是ObservableObserveOn$ObserveOnObserver。
第3行,onSubscribe回调的线程是当前线程。
重点是第4行。SubscribeTask中封装的是SubscribeTask->SubscribeOnObserver->ObservableObserveOn$ObserveOnObserver->…->最终就是传入的Observer。直接看Scheduler#scheduleDirect逻辑。
Scheduler#scheduleDirect 流程
|
|
上面已经分析出io 对应的是IoScheduler。
|
|
直接创建的是一个IoScheduler$EventLoopWorker。所以最终的逻辑就是EventLoopWorker#schedule
EventLoopWorker#schedule 流程
|
|
只需看最终NewThreadWorker#scheduleActual
|
|
至此就知道了Observable#subscribe回调逻辑线程是在Observable#subscribeOn(Scheduler)中指定的。
继续看原始代码。
|
|
最终是会回调到ObservableObserveOn$ObserveOnObserver#onNext,即被观察者通知观察者。
|
|
默认会执行到第6行,将通知事件加入到queue中。
最后第8行,直接调度事件,同上面的分析。最终可以确定Observer#onNext、Observer#onComplete回调逻辑线程是Observable#observeOn(Scheduler)指定的。
至此,对subscribeOn和observeOn对Observable运行的线程和对Observer运行的线程有了比较直观的了解。回调的逻辑还是按照上面最简单的逻辑,只是在运行的时候封装成Runnable,给不同的线程池调用。
总结
看了一些最简单的基础教程以后,直接写demo 看源码是我自己觉得不错的开源框架的学习方法。后续就要开始在工程中做一些使用的调研了。
RxJava 教程系列
给 Android 开发者的 RxJava 详解
Java开发必会的反编译知识(附支持对Lambda进行反编译的工具)
What’s different in 2.0
RxJava api