博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
谜之RxJava (三)update 2 —— subscribeOn 和 observeOn 的区别
阅读量:7119 次
发布时间:2019-06-28

本文共 9428 字,大约阅读时间需要 31 分钟。

开头

之前我们分析过subscribeOn这个函数,

现在我们来看下subscribeOnobserveOn这两个函数到底有什么异同。

用过rxjava的旁友都知道,subscribeOnobserveOn都是用来切换线程用的,可是我什么时候用subscribeOn,什么时候用observeOn呢,我们很少知道这两个区别是啥。

友情提示,如果不想看分析过程的,可以直接跳到下面的总结部分。

subscribeOn

先看下OperatorSubscribeOn的核心代码:

public final class OperatorSubscribeOn
implements OnSubscribe
{ final Scheduler scheduler; final Observable
source; public OperatorSubscribeOn(Observable
source, Scheduler scheduler) { this.scheduler = scheduler; this.source = source; } @Override public void call(final Subscriber
subscriber) { final Worker inner = scheduler.createWorker(); subscriber.add(inner); inner.schedule(new Action0() { @Override public void call() { Subscriber
s = new Subscriber
(subscriber) { @Override public void onNext(T t) { subscriber.onNext(t); } @Override public void onError(Throwable e) { try { subscriber.onError(e); } finally { inner.unsubscribe(); } } @Override public void onCompleted() { try { subscriber.onCompleted(); } finally { inner.unsubscribe(); } } .... }; source.unsafeSubscribe(s); } }); }}

这里注意两点:

  1. 因为OperatorSubscribeOn是个OnSubscribe对象,所以在call参数中传入的subscriber就是我们在外面使用Observable.subscribe(a)传入的对象a

  2. 这里source对象指向的是调用subscribeOn之前的那个Observable序列。

明确了这两点,我们就很好的知道了subscribeOn是如何工作,产生神奇的效果了。

其实最最主要的就是一行函数

source.unsafeSubscribe(s);

并且要注意它所在的位置,是在worker的call里面,说白了,就是把source.subscribe这一行调用放在指定的线程里,那么总结起来的结论就是:

subscribeOn的调用,改变了调用前序列所运行的线程。

observeOn

同样看下OperatorObserveOn这个类的主要代码:

public final class OperatorObserveOn
implements Operator
{ private final Scheduler scheduler; private final boolean delayError; /** * @param scheduler the scheduler to use * @param delayError delay errors until all normal events are emitted in the other thread? */ public OperatorObserveOn(Scheduler scheduler, boolean delayError) { this.scheduler = scheduler; this.delayError = delayError; } @Override public Subscriber
call(Subscriber
child) { .... ObserveOnSubscriber
parent = new ObserveOnSubscriber
(scheduler, child, delayError); parent.init(); return parent; } /** Observe through individual queue per observer. */ private static final class ObserveOnSubscriber
extends Subscriber
implements Action0 { final Subscriber
child; final Scheduler.Worker recursiveScheduler; final NotificationLite
on; final boolean delayError; final Queue
queue; // the status of the current stream volatile boolean finished; final AtomicLong requested = new AtomicLong(); final AtomicLong counter = new AtomicLong(); /** * The single exception if not null, should be written before setting finished (release) and read after * reading finished (acquire). */ Throwable error; // do NOT pass the Subscriber through to couple the subscription chain ... unsubscribing on the parent should // not prevent anything downstream from consuming, which will happen if the Subscription is chained public ObserveOnSubscriber(Scheduler scheduler, Subscriber
child, boolean delayError) { this.child = child; this.recursiveScheduler = scheduler.createWorker(); this.delayError = delayError; this.on = NotificationLite.instance(); if (UnsafeAccess.isUnsafeAvailable()) { queue = new SpscArrayQueue(RxRingBuffer.SIZE); } else { queue = new SpscAtomicArrayQueue(RxRingBuffer.SIZE); } } void init() { // don't want this code in the constructor because `this` can escape through the // setProducer call Subscriber
localChild = child; localChild.setProducer(new Producer() { @Override public void request(long n) { if (n > 0L) { BackpressureUtils.getAndAddRequest(requested, n); schedule(); } } }); localChild.add(recursiveScheduler); localChild.add(this); } @Override public void onStart() { // signal that this is an async operator capable of receiving this many request(RxRingBuffer.SIZE); } @Override public void onNext(final T t) { if (isUnsubscribed() || finished) { return; } if (!queue.offer(on.next(t))) { onError(new MissingBackpressureException()); return; } schedule(); } @Override public void onCompleted() { if (isUnsubscribed() || finished) { return; } finished = true; schedule(); } @Override public void onError(final Throwable e) { if (isUnsubscribed() || finished) { RxJavaPlugins.getInstance().getErrorHandler().handleError(e); return; } error = e; finished = true; schedule(); } protected void schedule() { if (counter.getAndIncrement() == 0) { recursiveScheduler.schedule(this); } } // only execute this from schedule() @Override public void call() { long emitted = 0L; long missed = 1L; // these are accessed in a tight loop around atomics so // loading them into local variables avoids the mandatory re-reading // of the constant fields final Queue q = this.queue; final Subscriber
localChild = this.child; final NotificationLite
localOn = this.on; // requested and counter are not included to avoid JIT issues with register spilling // and their access is is amortized because they are part of the outer loop which runs // less frequently (usually after each RxRingBuffer.SIZE elements) for (;;) { long requestAmount = requested.get(); long currentEmission = 0L; while (requestAmount != currentEmission) { boolean done = finished; Object v = q.poll(); boolean empty = v == null; if (checkTerminated(done, empty, localChild, q)) { return; } if (empty) { break; } localChild.onNext(localOn.getValue(v)); currentEmission++; emitted++; } if (requestAmount == currentEmission) { if (checkTerminated(finished, q.isEmpty(), localChild, q)) { return; } } if (currentEmission != 0L) { BackpressureUtils.produced(requested, currentEmission); } missed = counter.addAndGet(-missed); if (missed == 0L) { break; } } if (emitted != 0L) { request(emitted); } } boolean checkTerminated(boolean done, boolean isEmpty, Subscriber
a, Queue
q) { if (a.isUnsubscribed()) { q.clear(); return true; } if (done) { if (delayError) { if (isEmpty) { Throwable e = error; try { if (e != null) { a.onError(e); } else { a.onCompleted(); } } finally { recursiveScheduler.unsubscribe(); } } } else { Throwable e = error; if (e != null) { q.clear(); try { a.onError(e); } finally { recursiveScheduler.unsubscribe(); } return true; } else if (isEmpty) { try { a.onCompleted(); } finally { recursiveScheduler.unsubscribe(); } return true; } } } return false; } }}

这里的代码有点长,我们先注意到它是一个Operator,它没有对上层Observable做任何的控制或者包装。

既然是Operator,那么它的职责就是把一个Subscriber转换成另外一个Subscriber, 我们来关注下转换后的Subscriber对转换前的Subscriber做了些什么事。

首先它是一个ObserveOnSubscriber类, 既然是Subscriber那么肯定有onNext, onCompleteonError 看最主要的onNext

@Overridepublic void onNext(final T t) {    if (isUnsubscribed() || finished) {        return;    }    if (!queue.offer(on.next(t))) {        onError(new MissingBackpressureException());        return;    }    schedule();}

好了,这里做了两件事,首先把结果缓存到一个队列里,然后调用schedule启动传入的worker

我们这里需要注意下:

在调用observeOn前的序列,把结果传入到onNext就是它的工作,它并不关心后续的流程,所以工作就到这里就结束了,剩下的交给ObserveOnSubscriber继续。

protected void schedule() {    if (counter.getAndIncrement() == 0) {        recursiveScheduler.schedule(this);    }}

recursiveScheduler 就是之前我们传入的Scheduler,我们一般会在observeOn传入AndroidScheluders.mainThread()对吧、

接下去,我们看下在scheduler中调用的call方法,这里只列出主要带代码

@Overridepublic void call() {    ...    final Subscriber
localChild = this.child; for (;;) { ... boolean done = finished; Object v = q.poll(); boolean empty = v == null; if (checkTerminated(done, empty, localChild, q)) { return; } if (empty) { break; } localChild.onNext(localOn.getValue(v)); ... } if (emitted != 0L) { request(emitted); }}

OK,在Scheduler启动后, 我们在Observable.subscribe(a)传入的a就是这里的child我们看到,在call中终于调用了它的onNext方法,把真正的结果传了出去,但是在这里,我们是工作在observeOn的线程上的。

那么总结起来的结论就是:

  1. observeOn 对调用之前的序列默不关心,也不会要求之前的序列运行在指定的线程上

  2. observeOn 对之前的序列产生的结果先缓存起来,然后再在指定的线程上,推送给最终的subscriber

复杂情况

我们经常多次使用subscribeOn切换线程,那么以后是否可以组合observeOnsubscribeOn达到自由切换的目的呢?

组合是可以的,但是他们的执行顺序是有条件的,如果仔细分析的话,可以知道observeOn调用之后,再调用subscribeOn是无效的,原因是什么?

因为subscribeOn改变的是subscribe这句调用所在的线程,大多数情况,产生内容和消费内容是在同一线程的,所以改变了产生内容所在的线程,就改变了消费内容所在的线程。

经过上面的阐述,我们知道,observeOn的工作原理是把消费结果先缓存,再切换到新线程上让原始消费者消费,它和生产者是没有一点关系的,就算subscribeOn调用了,也只是改变observeOn这个消费者所在的线程,和OperatorObserveOn中存储的原始消费者一点关系都没有,它还是由observeOn控制。

总结

如果我们有一段这样的序列

Observable.map                    // 操作1.flatMap                // 操作2.subscribeOn(io).map                    //操作3.flatMap                //操作4.observeOn(main).map                    //操作5.flatMap                //操作6.subscribeOn(io)        //!!特别注意.subscribe(handleData)

假设这里我们是在主线程上调用这段代码,

那么

  1. 操作1操作2是在io线程上,因为之后subscribeOn切换了线程

  2. 操作3操作4也是在io线程上,因为在subscribeOn切换了线程之后,并没有发生改变。

  3. 操作5操作6是在main线程上,因为在他们之前的observeOn切换了线程。

  4. 特别注意那一段,对于操作5操作6是无效的

再简单点总结就是

  1. subscribeOn的调用切换之前的线程。

  2. observeOn的调用切换之后的线程。

  3. observeOn之后,不可再调用subscribeOn 切换线程

=========

续 特别感谢给的额外的总结

  1. 下面提到的“操作”包括产生事件、用操作符操作事件以及最终的通过 subscriber 消费事件

  2. 只有第一subscribeOn() 起作用(所以多个 subscribeOn() 毛意义)

  3. 这个 subscribeOn() 控制从流程开始的第一个操作,直到遇到第一个 observeOn()

  4. observeOn() 可以使用多次,每个 observeOn() 将导致一次线程切换(),这次切换开始于这次 observeOn() 的下一个操作

  5. 不论是 subscribeOn() 还是 observeOn(),每次线程切换如果不受到下一个 observeOn() 的干预,线程将不再改变,不会自动切换到其他线程

转载地址:http://wusel.baihongyu.com/

你可能感兴趣的文章
如何在XenDesktop中映射USB设备
查看>>
Java并发编程 基础知识学习总结
查看>>
我又发现一个直接就能安装中文小红帽的方法
查看>>
ACM弱校ACMer A HDU1045Fire Net有感
查看>>
cxgrid实现分组统计和添加Footer
查看>>
刘敏华:2013年网络营销行业展望
查看>>
理解MySQL——架构与概念
查看>>
vsftpd虚拟用户
查看>>
ionic 幻灯指令 ion-slide-box
查看>>
发一个经典的Flask-SQLAlchemy使用场景
查看>>
iOS逆向之自动化重签名
查看>>
java 读取pdf、word、Excel文件
查看>>
递归处理vue菜单数据
查看>>
html5 图片热点area,map的用法
查看>>
Java集合框架知多少——干货!!!
查看>>
P2030 - 【BJOI2006】狼抓兔子
查看>>
【随想】关于图灵机
查看>>
echarts 通过ajax实现动态数据加载
查看>>
结构化方法与面向对象方法之比较
查看>>
Pig调试环境
查看>>