深入分析RxJava的Disposable及其工作原理
良好的编码习惯告诉我们,任何基于订阅者模式代码,都要注意为注册与注销的配对出现,以避免泄露都问题
Disposable
RxJava通过Disposable
(RxJava1中是Subscription
)在适当的时机取消订阅、停止数据流的发射。这在Android等具有Lifecycle概念的场景中非常重要,避免造成一些不必要bug以及对象泄露。
private CompositeDisposable compositeDisposable =
new CompositeDisposable();
@Override public void onCreate() {
compositeDisposable.add(backendApi.loadUser()
.subscribe(this::displayUser, this::handleError));
}
@Override public void onDestroy() {
compositeDisposable.clear();
}
上例是Android中的常见写法:使用CompositeDisposable
收集所有的Disposable,而后在onDestroy中统一释放。
Disposable作为一个接口,clear最终调用的是各个Disposable的dispose方法:
public interface Disposable {
void dispose();
boolean isDisposed();
}
当然,也出现了一些可以帮助我们自动dispose的框架以较少模板代码。例如RxLifecycle
、uber的AutoDispose
等。本文旨在介绍Disposable的基本工作原理,对这些三方库有兴趣的同学请去github自行学习。
dispose方法实现
Disposable disposable = Observable.create(
(ObservableOnSubscribe<Integer>) observableEmitter -> {
for (int i = 1; i <= 3; i++) {
observableEmitter.onNext(i);
}
})
.takeUntil(integer -> integer < 3)
.subscribe();
对于以上代码,当调用disposable.dispose();
时,代码是如何运行的呢?
Disposable是一个Observer
当调用Observable.subscribe() / subscribe(...)
后返回的Disposable对象,是本质是一个LambdaObserver
对象
public final Disposable subscribe(@NonNull Consumer<? super T> onNext, @NonNull Consumer<? super Throwable> onError,
@NonNull Action onComplete) {
LambdaObserver<T> ls = new LambdaObserver<>(onNext, onError, onComplete, Functions.emptyConsumer());
subscribe(ls);
return ls; //作为Disposable返回
}
LambdaObserver及众多接口于一身
public final class LambdaObserver<T> extends AtomicReference<Disposable>
implements Observer<T>, Disposable
- 首先,作为一个Observer,被
subscribe()
后,通过onNext
等向下游发射数据; - 其次,作为Disposable对外提供
dispose
方法; 最后,作为
AtomicReference
接口,持有一个Disposable的value@Override
public void dispose() {DisposableHelper.dispose(this);
}
public static boolean dispose(AtomicReference
field) { Disposable current = field.get();
Disposable d = DISPOSED;
if (current != d) {
current = field.getAndSet(d);
if (current != d) {
if (current != null) {
current.dispose();
}
return true;
}
}
return false;
}
如上,调用dispose后,做两件事:
- 获取
AtomicReference
当前value,调用其dispose - 将当前value设为
DISPOSED
onSubscribe中传递Disposable
LambdaObserver中AtomicReference的value是在Observer.onSubscribe
中设置的:
@Override
public void onSubscribe(Disposable d) {
if (DisposableHelper.setOnce(this, d)) { //设置value
try {
onSubscribe.accept(this);
} catch (Throwable ex) {
...
}
}
}
那么Observer.onSubscribe是什么时候被调用的呢?
RxJava的各种操作符都是Observable的实现。操作符链式调用的本质就是创建Observable和Observer,并通过subscribe关联。
subscribe内部最终都会调用到subscribeActual,这是每个操作符都必须实现的方法。
create在subscribeActual中,调用Observer.onSubscrie()
,将当前的Disposable(前文说过其实就是当前Observer)作为parent传递给下游
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<>(observer); // CreateEmitter是一个Diposable
observer.onSubscribe(parent); // Observer.onSubscrie()
try {
source.subscribe(parent);
} catch (Throwable ex) {
...
}
}
Observer中关联上下游
除create、subscribe这样的终端操作符以外,大部分的操作符的Observer结构如下:
@Override
/** The downstream subscriber. */
protected final Observer<? super R> downstream;
/** The upstream subscription. */
protected Disposable upstream;
public final void onSubscribe(Disposable d) {
...
this.upstream = d;
downstream.onSubscribe(this);
...
}
public void dispose() {
upstream.dispose();
}
- Observer会持有上下游对象:upstream和downstream
- onSubscribe向下递归调用
- dipose向上递归调用
在链式订阅中,向下游订阅Observer的同时,也关联了上游的Disposable(Observer)
我们在最下端调用subscribe时,各个Observer会建立上下游关联,当我们在下游调用dispose时,最终会递归调用到顶端(create)的dispose
再看takeUntil的例子
根据以上分析,我们会回到最初takeUntil的例子。前面说过所有的操作符都是Observable的实现:
- takeUntil 对应
ObservableTakeUntilPredicate
; create 对应
ObservableCreate
ObservableCreate.subscribeActual() // Send 1, 2, 3 down
↑
ObservableCreate.subscribe(TakeUntilPredicateObserver)
↑
ObservableTakeUntilPredicate.subscribeActual()
↑
ObservableTakeUntilPredicate.subscribe()
通过subscribeActual -> onSubscribe的调用,Disposable也有了如下引用链:
CreateEmitter
↑
TakeUntilPredicateObserver
↑
LambdaObserver
当我们针对终端的LambdaObserver调用dispose方法时,通过引用链递会最终调用到CreateEmitter的dispose。
CreateEmitter跟LambdaObserver一样,也会将AtomicReference的value设为DISPOSED
public void dispose() {
DisposableHelper.dispose(this);
}
之后,在onNext中判断状态,当为DISPOSED时,不再发射数据流
@Override
public void onNext(T t) {
if (!isDisposed()) { //是否为DISPOSED
observer.onNext(t);
}
}
关于onComplete
通过如下test,可以发现当onComplete调用后会,会自动调用dispose。
@Test
public void testDisposed(){
boolean isDisposed = false;
TestObserver<Integer> to = Observable.<Integer>create(subscriber -> {
subscriber.setDisposable(new Disposable() {
@Override
public boolean isDisposed() {
return isDisposed;
}
@Override
public void dispose() {
isDisposed = true;
}
});
subscriber.onComplete();
}).test();
to.assertComplete();
assertTrue(isDisposed);
}
ObservableEmitter的onComplete中果然也调用了dispose:
public void onComplete() {
if (!isDisposed()) {
try {
observer.onComplete();
} finally {
dispose();
}
}
}
关于内存泄漏
调用dipose确实可以结束流的发射,但是不等于没有内存泄露。
查看ObservableCreate的源码可以知道,dispose只是只是简单的设置了DISPOSED状态,Observe中关联的上下游对象并没有释放。所以当订阅了一个静态的Observable时,无法避免内存泄漏。
但是当订阅一个Subject时,dispose确实可以有效释放对象,避免内存泄漏:
public void dispose() {
if (super.tryDispose()) {
parent.remove(this); //对象删除
}
}
关于实时性
前面分析知道,对于终端操作符create、subscribe等,其Observer在dispose时会标记当前状态为DISPOSED
。但对于其他操作符,其Observer的dispose只是递归向上调用dispose,对正在处理中的数据不会拦截。
调用dispose后,RxJava数据流不一定会立即停止,大部分操作符在调用dispose后,数据依然会发射给下游
关于dispose的实时性测试,下文可供参考
https://medium.com/stepstone-tech/the-curious-case-of-rxjava-disposables-e64ff8a06879
还没有评论,来说两句吧...