做了这么久的Android,还没有使用到RxJava。感觉是一件很low 的事情。RxJava 怎么学习,也困扰了自己一段时间。这次蹭着比较闲的一段时间,决定通过源码来学习下RxJava(是的,你没有看错,是通过看源码!!!)。
当然这里需要比较熟悉Java,懂得一些设计模式。给 Android 开发者的 RxJava 详解 对RxJava 的原理以及RxJava 是干什么的都讲的比较清晰。
流程
直接撸起袖子看源码了。
Observable#subscribe 流程
|
|
第1串代码是一个最简单的例子,Observable#create
创建一个Observable(被观察者),然后Observable#subscribe
(即被观察者订阅了观察者)。
Observable#create 创建被观察者
|
|
直接执行到了RxJavaPlugins#onAssembly
|
|
默认onObservableAssembly
为null,所以最终直接返回了一个ObservableCreate
对象。所以默认的Observable#create
相当于是创建了一个具体的Obsevable(ObservableCreate
)。
Observer
是直接new 的,所以跳过Observer
创建逻辑。
ObservableCreate#subscribe 流程
注意方法的参数是Observer
|
|
第3行,生成一个新的Observer,这里默认返回的就是传入的Observer。
第5行,确定两者的订阅关系。上面已经可以知道,当前的Observable 是ObservableCreate
。
|
|
至此,执行的顺序有一些明确了。同时这里出现了一个新对象CreateEmitter
。注意这里的Observer
对象实际是对例子中Observer
对象的包装,所以简单可以理解成直接调用的例子中创建的Observer
。
Observer#onSubscribe
,从这里可以看出onSubscribe
只会执行一次。ObservableOnSubscribe#subscribe
。ObservableOnSubscribe
对象实际就是例子中new 的。
Observable#subscribe 回调流程
|
|
相当于开发者自身的业务逻辑代码。这里的emitter 前面已经介绍,是一个CreateEmitter
。
CreateEmitter#onNext 流程
|
|
第7行,直接调用Observer#onNext
。
然后第二次的onNext 以及最后的onComplete 都会按照类似的逻辑进行处理。
至此,Observable 通知Observer 的流程就比较清晰的展示在我们面前了。当然,从这个流程确实看不出RxJava 到底有什么优势。
map 流程
|
|
第9行,map
逻辑似乎是将输入的Integer
转化成String
输出。
Observable#map 流程
|
|
上面最终默认就是返回了一个ObservableMap
。所以现在的情况看来,map
的操作实际就是生成一个新的Observable(ObservableMap
)。
查看构造函数。
|
|
保留了原始的Observable
(source,这里其实是ObservableCreate
),同时增加了一个成员变量function。
ObservableMap#subscribe 流程
直接省略,看最终subscribeActual
的逻辑。
|
|
这里的source 其实就是一个ObservableCreate
。最终将传入的Observer
和function 包装成ObservableMap$MapObserver
。
|
|
MapObserver
的中也会保存两个值,一个原始的Observer
(actual,可以理解是原始代码中new 的Observer)以及转换的函数mapper。
之后的逻辑就是第一个流程中的,最后会回调到ObservableMap$MapObserver
中。
MapObserver#onNext 流程
|
|
第11行,调用Function#apply
获取到转换以后的值(当前的例子是:Integer
->String
)。
第16行,Observer#onNext
调用最新的值。
至此,逻辑也非常清晰,map
相当于在onNext 之前对数据重新处理。
flatMap 流程
与map 流程非常类似,所以简单说明。
|
|
flatMap 中范型类型与map 中的不一样。返回的是一个ObservableSource。是通过Observable#fromIterable
生成的。这里似乎就可以得出一个结论,flatMap 的实现就是将Observable
的一个事件转换成一组事件,同时将这一组事件封装成一个Observable
。然后最终通知Observer
的其实是一组事件。
生成的是一个ObservableFlatMap
对象。
subscribeActual
重新封装Observer
成ObservableFlatMap$MergeObserver
。根据上面的分析,最终其实调用的是MergeObserver#onNext
。
MergeObserver#onNext 流程
|
|
第8行,生成一个新的ObservableSource
,具体的apply
逻辑,可以由最上面代码获得。
|
|
即最终调用的Observable
是Observable#fromIterable
,其实就是一个ObservableFromIterable
对象。
回到上面的第24行subscribeInner(p);
,最终的调用逻辑其实就是ObservableFromIterable#subscribeActual
。
线程切换逻辑
RxJava 的精髓就是在线程之间的切换。从一个线程的Observable 发送消息,到另外一个线程的Observer 接收消息。
主要的逻辑就是subscribeOn
和observeOn
,用来指定相应的线程的
|
|
上面的例子是假设的是Subscribe 在io 线程,而Observer 在newThread 线程。
这里直接是默认的情况,io 对应的是
Schedulers.IO
,而newThread 对应的是Schedulers.NEW_THREAD
。
Observable#subscribeOn(Scheduler) 流程
|
|
实际是就是返回一个新的Observable(ObservableSubscribeOn
),即将ObservableCreate
包装成ObservableSubscribeOn
。
Observable#observeOn(Scheduler) 流程
|
|
相当于是直接返回了一个新的Observable(ObservableObserveOn
)。但是这个ObservableObserveOn
实际上是封装了上面的ObservableSubscribeOn
。
所以最后得到的是一个封装了多层的Observable(ObservableObserveOn
-> ObservableSubscribeOn
->ObservableCreate
->new 的ObservableOnSubscribe
)。
Observable#subscribe 流程
这里简化了相关逻辑,其实上面Observable#subscribe
有具体的逻辑。
ObservableObserveOn#subscribeActual 流程
|
|
相当于是直接调用ObservableSubscribeOn#subscribeActual
。
ObservableSubscribeOn#subscribeActual 流程
|
|
此时s 代表的是ObservableObserveOn$ObserveOnObserver
。
第3行,onSubscribe
回调的线程是当前线程。
重点是第4行。SubscribeTask
中封装的是SubscribeTask
->SubscribeOnObserver
->ObservableObserveOn$ObserveOnObserver
->…->最终就是传入的Observer
。直接看Scheduler#scheduleDirect
逻辑。
Scheduler#scheduleDirect 流程
|
|
上面已经分析出io 对应的是IoScheduler
。
|
|
直接创建的是一个IoScheduler$EventLoopWorker
。所以最终的逻辑就是EventLoopWorker#schedule
EventLoopWorker#schedule 流程
|
|
只需看最终NewThreadWorker#scheduleActual
|
|
至此就知道了Observable#subscribe
回调逻辑线程是在Observable#subscribeOn(Scheduler)
中指定的。
继续看原始代码。
|
|
最终是会回调到ObservableObserveOn$ObserveOnObserver#onNext
,即被观察者通知观察者。
|
|
默认会执行到第6行,将通知事件加入到queue
中。
最后第8行,直接调度事件,同上面的分析。最终可以确定Observer#onNext
、Observer#onComplete
回调逻辑线程是Observable#observeOn(Scheduler)
指定的。
至此,对subscribeOn
和observeOn
对Observable
运行的线程和对Observer
运行的线程有了比较直观的了解。回调的逻辑还是按照上面最简单的逻辑,只是在运行的时候封装成Runnable,给不同的线程池调用。
总结
看了一些最简单的基础教程以后,直接写demo 看源码是我自己觉得不错的开源框架的学习方法。后续就要开始在工程中做一些使用的调研了。
RxJava 教程系列
给 Android 开发者的 RxJava 详解
Java开发必会的反编译知识(附支持对Lambda进行反编译的工具)
What’s different in 2.0
RxJava api