深入分析RxJava的Disposable及其工作原理

梦里梦外; 2023-01-04 13:09 585阅读 0赞

在这里插入图片描述

良好的编码习惯告诉我们,任何基于订阅者模式代码,都要注意为注册与注销的配对出现,以避免泄露都问题

Disposable


RxJava通过Disposable(RxJava1中是Subscription)在适当的时机取消订阅、停止数据流的发射。这在Android等具有Lifecycle概念的场景中非常重要,避免造成一些不必要bug以及对象泄露。

  1. private CompositeDisposable compositeDisposable =
  2. new CompositeDisposable();
  3. @Override public void onCreate() {
  4. compositeDisposable.add(backendApi.loadUser()
  5. .subscribe(this::displayUser, this::handleError));
  6. }
  7. @Override public void onDestroy() {
  8. compositeDisposable.clear();
  9. }

上例是Android中的常见写法:使用CompositeDisposable收集所有的Disposable,而后在onDestroy中统一释放。

Disposable作为一个接口,clear最终调用的是各个Disposable的dispose方法:

  1. public interface Disposable {
  2. void dispose();
  3. boolean isDisposed();
  4. }

当然,也出现了一些可以帮助我们自动dispose的框架以较少模板代码。例如RxLifecycle、uber的AutoDispose等。本文旨在介绍Disposable的基本工作原理,对这些三方库有兴趣的同学请去github自行学习。

dispose方法实现


  1. Disposable disposable = Observable.create(
  2. (ObservableOnSubscribe<Integer>) observableEmitter -> {
  3. for (int i = 1; i <= 3; i++) {
  4. observableEmitter.onNext(i);
  5. }
  6. })
  7. .takeUntil(integer -> integer < 3)
  8. .subscribe();

对于以上代码,当调用disposable.dispose();时,代码是如何运行的呢?

Disposable是一个Observer

当调用Observable.subscribe() / subscribe(...)后返回的Disposable对象,是本质是一个LambdaObserver对象

  1. public final Disposable subscribe(@NonNull Consumer<? super T> onNext, @NonNull Consumer<? super Throwable> onError,
  2. @NonNull Action onComplete) {
  3. LambdaObserver<T> ls = new LambdaObserver<>(onNext, onError, onComplete, Functions.emptyConsumer());
  4. subscribe(ls);
  5. return ls; //作为Disposable返回
  6. }

LambdaObserver及众多接口于一身

  1. public final class LambdaObserver<T> extends AtomicReference<Disposable>
  2. implements Observer<T>, Disposable
  • 首先,作为一个Observer,被subscribe()后,通过onNext等向下游发射数据;
  • 其次,作为Disposable对外提供dispose方法;
  • 最后,作为AtomicReference接口,持有一个Disposable的value

    @Override
    public void dispose() {

    1. DisposableHelper.dispose(this);

    }

    public static boolean dispose(AtomicReference field) {

    1. Disposable current = field.get();
    2. Disposable d = DISPOSED;
    3. if (current != d) {
    4. current = field.getAndSet(d);
    5. if (current != d) {
    6. if (current != null) {
    7. current.dispose();
    8. }
    9. return true;
    10. }
    11. }
    12. return false;

    }

如上,调用dispose后,做两件事:

  • 获取AtomicReference当前value,调用其dispose
  • 将当前value设为DISPOSED

onSubscribe中传递Disposable

LambdaObserver中AtomicReference的value是在Observer.onSubscribe中设置的:

  1. @Override
  2. public void onSubscribe(Disposable d) {
  3. if (DisposableHelper.setOnce(this, d)) { //设置value
  4. try {
  5. onSubscribe.accept(this);
  6. } catch (Throwable ex) {
  7. ...
  8. }
  9. }
  10. }

那么Observer.onSubscribe是什么时候被调用的呢?

RxJava的各种操作符都是Observable的实现。操作符链式调用的本质就是创建Observable和Observer,并通过subscribe关联。
subscribe内部最终都会调用到subscribeActual,这是每个操作符都必须实现的方法。

create在subscribeActual中,调用Observer.onSubscrie(),将当前的Disposable(前文说过其实就是当前Observer)作为parent传递给下游

  1. protected void subscribeActual(Observer<? super T> observer) {
  2. CreateEmitter<T> parent = new CreateEmitter<>(observer); // CreateEmitter是一个Diposable
  3. observer.onSubscribe(parent); // Observer.onSubscrie()
  4. try {
  5. source.subscribe(parent);
  6. } catch (Throwable ex) {
  7. ...
  8. }
  9. }

Observer中关联上下游

除create、subscribe这样的终端操作符以外,大部分的操作符的Observer结构如下:

  1. @Override
  2. /** The downstream subscriber. */
  3. protected final Observer<? super R> downstream;
  4. /** The upstream subscription. */
  5. protected Disposable upstream;
  6. public final void onSubscribe(Disposable d) {
  7. ...
  8. this.upstream = d;
  9. downstream.onSubscribe(this);
  10. ...
  11. }
  12. public void dispose() {
  13. upstream.dispose();
  14. }
  • Observer会持有上下游对象:upstream和downstream
  • onSubscribe向下递归调用
  • dipose向上递归调用

在链式订阅中,向下游订阅Observer的同时,也关联了上游的Disposable(Observer)

我们在最下端调用subscribe时,各个Observer会建立上下游关联,当我们在下游调用dispose时,最终会递归调用到顶端(create)的dispose

再看takeUntil的例子

根据以上分析,我们会回到最初takeUntil的例子。前面说过所有的操作符都是Observable的实现:

  • takeUntil 对应 ObservableTakeUntilPredicate;
  • create 对应 ObservableCreate

    ObservableCreate.subscribeActual() // Send 1, 2, 3 down

    ObservableCreate.subscribe(TakeUntilPredicateObserver)

    ObservableTakeUntilPredicate.subscribeActual()

    ObservableTakeUntilPredicate.subscribe()

通过subscribeActual -> onSubscribe的调用,Disposable也有了如下引用链:

  1. CreateEmitter
  2. TakeUntilPredicateObserver
  3. LambdaObserver

当我们针对终端的LambdaObserver调用dispose方法时,通过引用链递会最终调用到CreateEmitter的dispose。

CreateEmitter跟LambdaObserver一样,也会将AtomicReference的value设为DISPOSED

  1. public void dispose() {
  2. DisposableHelper.dispose(this);
  3. }

之后,在onNext中判断状态,当为DISPOSED时,不再发射数据流

  1. @Override
  2. public void onNext(T t) {
  3. if (!isDisposed()) { //是否为DISPOSED
  4. observer.onNext(t);
  5. }
  6. }

关于onComplete


通过如下test,可以发现当onComplete调用后会,会自动调用dispose。

  1. @Test
  2. public void testDisposed(){
  3. boolean isDisposed = false;
  4. TestObserver<Integer> to = Observable.<Integer>create(subscriber -> {
  5. subscriber.setDisposable(new Disposable() {
  6. @Override
  7. public boolean isDisposed() {
  8. return isDisposed;
  9. }
  10. @Override
  11. public void dispose() {
  12. isDisposed = true;
  13. }
  14. });
  15. subscriber.onComplete();
  16. }).test();
  17. to.assertComplete();
  18. assertTrue(isDisposed);
  19. }

ObservableEmitter的onComplete中果然也调用了dispose:

  1. public void onComplete() {
  2. if (!isDisposed()) {
  3. try {
  4. observer.onComplete();
  5. } finally {
  6. dispose();
  7. }
  8. }
  9. }

关于内存泄漏


调用dipose确实可以结束流的发射,但是不等于没有内存泄露。

查看ObservableCreate的源码可以知道,dispose只是只是简单的设置了DISPOSED状态,Observe中关联的上下游对象并没有释放。所以当订阅了一个静态的Observable时,无法避免内存泄漏。

但是当订阅一个Subject时,dispose确实可以有效释放对象,避免内存泄漏:

  1. public void dispose() {
  2. if (super.tryDispose()) {
  3. parent.remove(this); //对象删除
  4. }
  5. }

关于实时性


前面分析知道,对于终端操作符create、subscribe等,其Observer在dispose时会标记当前状态为DISPOSED。但对于其他操作符,其Observer的dispose只是递归向上调用dispose,对正在处理中的数据不会拦截。

调用dispose后,RxJava数据流不一定会立即停止,大部分操作符在调用dispose后,数据依然会发射给下游

关于dispose的实时性测试,下文可供参考
https://medium.com/stepstone-tech/the-curious-case-of-rxjava-disposables-e64ff8a06879

发表评论

表情:
评论列表 (有 0 条评论,585人围观)

还没有评论,来说两句吧...

相关阅读

    相关 RxJava前奏之原理分析

    RxJava 之前奏:原理分析 > 前言:RxJava在国内越来越火,本文只是原理分析的总结。代码不多,但有些绕,一步一步的推进,对于理解RxJava来说还是十分有好处的