谈谈RxJava2中的异常及处理方法
前言
众所周知,RxJava2中当链式调用中抛出异常时,如果没有对应的Consumer去处理异常,则这个异常会被抛出到虚拟机中去,Android上的直接表现就是crash,程序崩溃。
订阅方式
说异常处理前咱们先来看一下RxJava2中Observable订阅方法subscribe()我们常用的几种订阅方式:
//1 subscribe() //2 Disposablesubscribe(ConsumeronNext) //3 Disposablesubscribe(ConsumeronNext,ConsumeronError) //4 Disposablesubscribe(ConsumeronNext,ConsumeronError,ActiononComplete) //5 Disposablesubscribe(ConsumeronNext,ConsumeronError,ActiononComplete,ConsumeronSubscribe) //6 voidsubscribe(Observerobserver)
无参和以Consumer为参数的几种方法内部都是以默认参数补齐的方式最终调用第5个方法,而方法5内部通过LambdaObserver将参数包装成Observer再调用第6个方法
publicfinalDisposablesubscribe(ConsumeronNext,ConsumeronError,
ActiononComplete,ConsumeronSubscribe){
ObjectHelper.requireNonNull(onNext,"onNextisnull");
ObjectHelper.requireNonNull(onError,"onErrorisnull");
ObjectHelper.requireNonNull(onComplete,"onCompleteisnull");
ObjectHelper.requireNonNull(onSubscribe,"onSubscribeisnull");
LambdaObserverls=newLambdaObserver(onNext,onError,onComplete,onSubscribe);
subscribe(ls);
returnls;
}
所以使用Consumer参数方式和 Observer参数方式进行订阅除了观察回调来源不一样其他没有任何差别。但就是因为这种差别,在异常情况发生时的处理结果上也会产生差别
异常处理
我们分别进行一下几种方式模拟异常:
1、ObserveronNext中抛出异常(切换线程)
apiService.newJsonKeyData()
.doOnSubscribe{t->compositeDisposable.add(t)}
.compose(RxScheduler.sync())//封装的线程切换
.subscribe(object:Observer>{
overridefunonComplete(){
}
overridefunonSubscribe(d:Disposable){
}
overridefunonNext(t:List){
throwRuntimeException("runtimeexception")
}
overridefunonError(e:Throwable){
Log.d("error",e.message)
}
})
结果:不会触发onError,App崩溃
2、ObserveronNext中抛出异常(未切换线程)
Observable.create{ it.onNext("ssss") } .subscribe(object:Observer { overridefunonComplete(){ } overridefunonSubscribe(d:Disposable){ } overridefunonNext(t:String){ Log.d("result::",t) throwRuntimeException("runllllll") } overridefunonError(e:Throwable){ Log.e("sss","sss",e) } })
结果:会触发onError,App未崩溃
3、Observermap操作符中抛出异常
apiService.newJsonKeyData()
.doOnSubscribe{t->compositeDisposable.add(t)}
.map{
throwRuntimeException("runtimeexception")
}
.compose(RxScheduler.sync())
.subscribe(object:Observer>{
overridefunonComplete(){
}
overridefunonSubscribe(d:Disposable){
}
overridefunonNext(t:List){
}
overridefunonError(e:Throwable){
Log.d("error",e.message)
}
})
结果:会触发Observer的onError,App未崩溃
4、ConsumeronNext中抛出异常
apiService.newJsonKeyData()
.doOnSubscribe{t->compositeDisposable.add(t)}
.compose(RxScheduler.sync())
.subscribe({
throwRuntimeException("messsasassssssssssssssssssssssssssssssssssssss")
},{
Log.d("Error",it.message)
})
结果A:有errorConsumer触发errorConsumer,App未崩溃
apiService.newJsonKeyData()
.doOnSubscribe{t->compositeDisposable.add(t)}
.compose(RxScheduler.sync())
.subscribe{
throwRuntimeException("messsasassssssssssssssssssssssssssssssssssssss")
}
结果B:无errorConsumer,App崩溃
那么为什么会出现这些不同情况呢?我们从源码中去一探究竟。
Consumer订阅方式的崩溃与不崩溃
subscribe()传入consumer类型参数最终在Observable中会将传入的参数转换为LambdaObserver再调用subscribe(lambdaObserver)进行订阅。展开 LambdaObserver:(主要看onNext和onError方法中的处理)
.
.
.
@Override
publicvoidonNext(Tt){
if(!isDisposed()){
try{
onNext.accept(t);
}catch(Throwablee){
Exceptions.throwIfFatal(e);
get().dispose();
onError(e);
}
}
}
@Override
publicvoidonError(Throwablet){
if(!isDisposed()){
lazySet(DisposableHelper.DISPOSED);
try{
onError.accept(t);
}catch(Throwablee){
Exceptions.throwIfFatal(e);
RxJavaPlugins.onError(newCompositeException(t,e));
}
}else{
RxJavaPlugins.onError(t);
}
}
.
.
.
onNext中调用了对应consumer的apply()方法,并且进行了trycatch。因此我们在consumer中进行的工作抛出异常会被捕获触发LambdaObserver的onError。再看onError中,如果订阅未取消且errorConsumer的apply()执行无异常则能正常走完事件流,否则会调用RxJavaPlugins.onError(t)。看到这里应该就能明白了,当订阅时未传入errorConsumer时Observable会指定OnErrorMissingConsumer为默认的errorConsumer,发生异常时抛出OnErrorNotImplementedException。
RxJavaPlugins.onError(t)
上面分析,发现异常最终会流向RxJavaPlugins.onError(t)。这个方法为RxJava2提供的一个全局的静态方法。
publicstaticvoidonError(@NonNullThrowableerror){
Consumerf=errorHandler;
if(error==null){
error=newNullPointerException("onErrorcalledwithnull.Nullvaluesaregenerallynotallowedin2.xoperatorsandsources.");
}else{
if(!isBug(error)){
error=newUndeliverableException(error);
}
}
if(f!=null){
try{
f.accept(error);
return;
}catch(Throwablee){
//Exceptions.throwIfFatal(e);TODOdecide
e.printStackTrace();//NOPMD
uncaught(e);
}
}
error.printStackTrace();//NOPMD
uncaught(error);
}
查看其源码发现,当errorHandler不为空时异常将由其消耗掉,为空或者消耗过程产生新的异常则RxJava会将异常抛给虚拟机(可能导致程序崩溃)。errorHandler本身是一个Consumer对象,我们可以通过如下方式配置他:
RxJavaPlugins.setErrorHandler(object:Consumer1{ overridefunaccept(t:Throwable?){ TODO("notimplemented")//TochangebodyofcreatedfunctionsuseFile|Settings|FileTemplates. } })
数据操作符中抛出异常
以map操作符为例,map操作符实际上RxJava是将事件流hook了另一个新的ObservableObservableMap
@CheckReturnValue @SchedulerSupport(SchedulerSupport.NONE) publicfinalObservable map(Functionmapper){ ObjectHelper.requireNonNull(mapper,"mapperisnull"); returnRxJavaPlugins.onAssembly(newObservableMap (this,mapper)); }
进入ObservableMap类,发现内部订阅了一个内部静态类MapObserver,重点看MapObserver 的onNext方法
publicvoidonNext(Tt){
if(done){
return;
}
if(sourceMode!=NONE){
downstream.onNext(null);
return;
}
Uv;
try{
v=ObjectHelper.requireNonNull(mapper.apply(t),"Themapperfunctionreturnedanullvalue.");
}catch(Throwableex){
fail(ex);
return;
}
downstream.onNext(v);
}
onNext中trycatch了mapper.apply(),这个apply执行的就是我们在操作符中实现的function方法。因此在map之类数据变换操作符中产生异常能够自身捕获并发送给最终的Observer。如果此时的订阅对象中能消耗掉异常则事件流正常走onError()结束,如果订阅方式为上以节中的consumer,则崩溃情况为上一节中的分析结果。
Observer的onNext中抛出异常
上述的方式1为一次网络请求,里面涉及到线程的切换。方式2为直接create一个Observable对象,不涉及线程切换,其结果为线程切换后,观察者Observer的onNext()方法中抛出异常无法触发onError(),程序崩溃。
未切换线程的Observable.create
查看create()方法源码,发现内部创建了一个ObservableCreate对象,在调用订阅时会触发subscribeActual() 方法。在 subscribeActual()中再调用我们create时传入的ObservableOnSubscribe对象的subscribe()方法来触发事件流。
@Override
protectedvoidsubscribeActual(Observerobserver){
//对我们的观察者使用CreateEmitter进行包装,内部的触发方法是相对应的
CreateEmitterparent=newCreateEmitter(observer);
observer.onSubscribe(parent);
try{
//source为create时创建的ObservableOnSubscribe匿名内部接口实现类
source.subscribe(parent);
}catch(Throwableex){
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
上述代码中的订阅过程是使用trycatch今夕包裹的。订阅及订阅触发后发送的事件流都在一个线程,所以能够捕获整个事件流中的异常。(PS:大家可以尝试下使用 observeOn()切换事件发送线程。会发现异常不能再捕获,程序崩溃)
涉及线程变换时的异常处理
Retrofit进行网络请求返回的Observable对象实质上是RxJava2CallAdapter中生成的BodyObservable,期内部的onNext是没有进行异常捕获的。其实这里是否捕获并不是程序崩溃的根本原因,因为进行网络请求,必然是涉及到线程切换的。就算此处trycatch处理了,也并不能捕获到事件流下游的异常。
@OverridepublicvoidonNext(Responseresponse){ if(response.isSuccessful()){ observer.onNext(response.body()); }else{ terminated=true; Throwablet=newHttpException(response); try{ observer.onError(t); }catch(Throwableinner){ Exceptions.throwIfFatal(inner); RxJavaPlugins.onError(newCompositeException(t,inner)); } } }
以我们在最终的Observer的onNext抛出异常为例,要捕获这次异常那么必须在最终的调用线程中去进行捕获。即.observeOn(AndroidSchedulers.mainThread())切换过来的Android主线程。与其他操作符一样,线程切换时产生了一组新的订阅关系,RxJava内部会创建一个新的观察对象ObservableObserveOn。
@Override
publicvoidonNext(Tt){
if(done){
return;
}
if(sourceMode!=QueueDisposable.ASYNC){
queue.offer(t);
}
schedule();
}
.
.
.
voidschedule(){
if(getAndIncrement()==0){
worker.schedule(this);//执行ObservableObserveOn的run方法
}
}
.
.
.
@Override
publicvoidrun(){
if(outputFused){
drainFused();
}else{
drainNormal();
}
}
而执行任务的worker即为对应线程Scheduler的对应实现子类所创建的Worker,以AndroidSchedulers.mainThread()为例,Scheduler实现类为HandlerScheduler,其对应Worker为HandlerWorker,最终任务交给ScheduledRunnable来执行。
privatestaticfinalclassScheduledRunnableimplementsRunnable,Disposable{
privatefinalHandlerhandler;
privatefinalRunnabledelegate;
privatevolatilebooleandisposed;//TrackedsolelyforisDisposed().
ScheduledRunnable(Handlerhandler,Runnabledelegate){
this.handler=handler;
this.delegate=delegate;
}
@Override
publicvoidrun(){
try{
delegate.run();
}catch(Throwablet){
RxJavaPlugins.onError(t);
}
}
@Override
publicvoiddispose(){
handler.removeCallbacks(this);
disposed=true;
}
@Override
publicbooleanisDisposed(){
returndisposed;
}
}
会发现,run中进行了trycatch。但catch内消化异常使用的是全局异常处理RxJavaPlugins.onError(t);,而不是某一个观察者的onError。所以在经过切换线程操作符后,观察者onNext中抛出的异常,onError无法捕获。
处理方案
既然知道了问题所在,那么处理问题的方案也就十分清晰了。
1、注册全局的异常处理
RxJavaPlugins.setErrorHandler(object:Consumer{ overridefunaccept(t:Throwable?){ //dosomething } })
2、Consumer作为观察者时,不完全确定没有异常一定要添加异常处理Consumer
apiService.stringData()
.doOnSubscribe{t->compositeDisposable.add(t)}
.compose(RxScheduler.sync())
.subscribe(Consumer{},Consumer{})
3、Observer可以创建一个BaseObaerver将onNext内部进行trycatch人为的流转到onError中,项目中的观察这都使用这个BaseObserver的子类。
@Override
publicvoidonNext(Tt){
try{
onSuccess(t);
}catch(Exceptione){
onError(e);
}
data=t;
success=true;
}
总结
以上就是这篇文章的全部内容了,希望本文的内容对大家的学习或者工作具有一定的参考学习价值,谢谢大家对毛票票的支持。
声明:本文内容来源于网络,版权归原作者所有,内容由互联网用户自发贡献自行上传,本网站不拥有所有权,未作人工编辑处理,也不承担相关法律责任。如果您发现有涉嫌版权的内容,欢迎发送邮件至:czq8825#qq.com(发邮件时,请将#更换为@)进行举报,并提供相关证据,一经查实,本站将立刻删除涉嫌侵权内容。