谈谈RxJava2中的异常及处理方法
前言
众所周知,RxJava2中当链式调用中抛出异常时,如果没有对应的Consumer去处理异常,则这个异常会被抛出到虚拟机中去,Android上的直接表现就是crash,程序崩溃。
订阅方式
说异常处理前咱们先来看一下RxJava2中Observable订阅方法subscribe()我们常用的几种订阅方式:
//1 subscribe() //2 Disposablesubscribe(ConsumeronNext) //3 Disposablesubscribe(ConsumeronNext,ConsumeronError) //4 Disposablesubscribe(ConsumeronNext,ConsumeronError,ActiononComplete) //5 Disposablesubscribe(ConsumeronNext,ConsumeronError,ActiononComplete,ConsumeronSubscribe) //6 voidsubscribe(Observerobserver)
无参和以Consumer为参数的几种方法内部都是以默认参数补齐的方式最终调用第5个方法,而方法5内部通过LambdaObserver将参数包装成Observer再调用第6个方法
publicfinalDisposablesubscribe(ConsumeronNext,ConsumeronError, ActiononComplete,ConsumeronSubscribe){ ObjectHelper.requireNonNull(onNext,"onNextisnull"); ObjectHelper.requireNonNull(onError,"onErrorisnull"); ObjectHelper.requireNonNull(onComplete,"onCompleteisnull"); ObjectHelper.requireNonNull(onSubscribe,"onSubscribeisnull"); LambdaObserverls=newLambdaObserver (onNext,onError,onComplete,onSubscribe); subscribe(ls); returnls; }
所以使用Consumer参数方式和 Observer参数方式进行订阅除了观察回调来源不一样其他没有任何差别。但就是因为这种差别,在异常情况发生时的处理结果上也会产生差别
异常处理
我们分别进行一下几种方式模拟异常:
1、ObserveronNext中抛出异常(切换线程)
apiService.newJsonKeyData() .doOnSubscribe{t->compositeDisposable.add(t)} .compose(RxScheduler.sync())//封装的线程切换 .subscribe(object:Observer>{ overridefunonComplete(){ } overridefunonSubscribe(d:Disposable){ } overridefunonNext(t:List
){ throwRuntimeException("runtimeexception") } overridefunonError(e:Throwable){ Log.d("error",e.message) } })
结果:不会触发onError,App崩溃
2、ObserveronNext中抛出异常(未切换线程)
Observable.create{ it.onNext("ssss") } .subscribe(object:Observer { overridefunonComplete(){ } overridefunonSubscribe(d:Disposable){ } overridefunonNext(t:String){ Log.d("result::",t) throwRuntimeException("runllllll") } overridefunonError(e:Throwable){ Log.e("sss","sss",e) } })
结果:会触发onError,App未崩溃
3、Observermap操作符中抛出异常
apiService.newJsonKeyData() .doOnSubscribe{t->compositeDisposable.add(t)} .map{ throwRuntimeException("runtimeexception") } .compose(RxScheduler.sync()) .subscribe(object:Observer>{ overridefunonComplete(){ } overridefunonSubscribe(d:Disposable){ } overridefunonNext(t:List
){ } overridefunonError(e:Throwable){ Log.d("error",e.message) } })
结果:会触发Observer的onError,App未崩溃
4、ConsumeronNext中抛出异常
apiService.newJsonKeyData() .doOnSubscribe{t->compositeDisposable.add(t)} .compose(RxScheduler.sync()) .subscribe({ throwRuntimeException("messsasassssssssssssssssssssssssssssssssssssss") },{ Log.d("Error",it.message) })
结果A:有errorConsumer触发errorConsumer,App未崩溃
apiService.newJsonKeyData() .doOnSubscribe{t->compositeDisposable.add(t)} .compose(RxScheduler.sync()) .subscribe{ throwRuntimeException("messsasassssssssssssssssssssssssssssssssssssss") }
结果B:无errorConsumer,App崩溃
那么为什么会出现这些不同情况呢?我们从源码中去一探究竟。
Consumer订阅方式的崩溃与不崩溃
subscribe()传入consumer类型参数最终在Observable中会将传入的参数转换为LambdaObserver再调用subscribe(lambdaObserver)进行订阅。展开 LambdaObserver:(主要看onNext和onError方法中的处理)
. . . @Override publicvoidonNext(Tt){ if(!isDisposed()){ try{ onNext.accept(t); }catch(Throwablee){ Exceptions.throwIfFatal(e); get().dispose(); onError(e); } } } @Override publicvoidonError(Throwablet){ if(!isDisposed()){ lazySet(DisposableHelper.DISPOSED); try{ onError.accept(t); }catch(Throwablee){ Exceptions.throwIfFatal(e); RxJavaPlugins.onError(newCompositeException(t,e)); } }else{ RxJavaPlugins.onError(t); } } . . .
onNext中调用了对应consumer的apply()方法,并且进行了trycatch。因此我们在consumer中进行的工作抛出异常会被捕获触发LambdaObserver的onError。再看onError中,如果订阅未取消且errorConsumer的apply()执行无异常则能正常走完事件流,否则会调用RxJavaPlugins.onError(t)。看到这里应该就能明白了,当订阅时未传入errorConsumer时Observable会指定OnErrorMissingConsumer为默认的errorConsumer,发生异常时抛出OnErrorNotImplementedException。
RxJavaPlugins.onError(t)
上面分析,发现异常最终会流向RxJavaPlugins.onError(t)。这个方法为RxJava2提供的一个全局的静态方法。
publicstaticvoidonError(@NonNullThrowableerror){ Consumerf=errorHandler; if(error==null){ error=newNullPointerException("onErrorcalledwithnull.Nullvaluesaregenerallynotallowedin2.xoperatorsandsources."); }else{ if(!isBug(error)){ error=newUndeliverableException(error); } } if(f!=null){ try{ f.accept(error); return; }catch(Throwablee){ //Exceptions.throwIfFatal(e);TODOdecide e.printStackTrace();//NOPMD uncaught(e); } } error.printStackTrace();//NOPMD uncaught(error); }
查看其源码发现,当errorHandler不为空时异常将由其消耗掉,为空或者消耗过程产生新的异常则RxJava会将异常抛给虚拟机(可能导致程序崩溃)。errorHandler本身是一个Consumer对象,我们可以通过如下方式配置他:
RxJavaPlugins.setErrorHandler(object:Consumer1{ overridefunaccept(t:Throwable?){ TODO("notimplemented")//TochangebodyofcreatedfunctionsuseFile|Settings|FileTemplates. } })
数据操作符中抛出异常
以map操作符为例,map操作符实际上RxJava是将事件流hook了另一个新的ObservableObservableMap
@CheckReturnValue @SchedulerSupport(SchedulerSupport.NONE) publicfinalObservable map(Functionmapper){ ObjectHelper.requireNonNull(mapper,"mapperisnull"); returnRxJavaPlugins.onAssembly(newObservableMap (this,mapper)); }
进入ObservableMap类,发现内部订阅了一个内部静态类MapObserver,重点看MapObserver 的onNext方法
publicvoidonNext(Tt){ if(done){ return; } if(sourceMode!=NONE){ downstream.onNext(null); return; } Uv; try{ v=ObjectHelper.requireNonNull(mapper.apply(t),"Themapperfunctionreturnedanullvalue."); }catch(Throwableex){ fail(ex); return; } downstream.onNext(v); }
onNext中trycatch了mapper.apply(),这个apply执行的就是我们在操作符中实现的function方法。因此在map之类数据变换操作符中产生异常能够自身捕获并发送给最终的Observer。如果此时的订阅对象中能消耗掉异常则事件流正常走onError()结束,如果订阅方式为上以节中的consumer,则崩溃情况为上一节中的分析结果。
Observer的onNext中抛出异常
上述的方式1为一次网络请求,里面涉及到线程的切换。方式2为直接create一个Observable对象,不涉及线程切换,其结果为线程切换后,观察者Observer的onNext()方法中抛出异常无法触发onError(),程序崩溃。
未切换线程的Observable.create
查看create()方法源码,发现内部创建了一个ObservableCreate对象,在调用订阅时会触发subscribeActual() 方法。在 subscribeActual()中再调用我们create时传入的ObservableOnSubscribe对象的subscribe()方法来触发事件流。
@Override protectedvoidsubscribeActual(Observerobserver){ //对我们的观察者使用CreateEmitter进行包装,内部的触发方法是相对应的 CreateEmitterparent=newCreateEmitter (observer); observer.onSubscribe(parent); try{ //source为create时创建的ObservableOnSubscribe匿名内部接口实现类 source.subscribe(parent); }catch(Throwableex){ Exceptions.throwIfFatal(ex); parent.onError(ex); } }
上述代码中的订阅过程是使用trycatch今夕包裹的。订阅及订阅触发后发送的事件流都在一个线程,所以能够捕获整个事件流中的异常。(PS:大家可以尝试下使用 observeOn()切换事件发送线程。会发现异常不能再捕获,程序崩溃)
涉及线程变换时的异常处理
Retrofit进行网络请求返回的Observable对象实质上是RxJava2CallAdapter中生成的BodyObservable,期内部的onNext是没有进行异常捕获的。其实这里是否捕获并不是程序崩溃的根本原因,因为进行网络请求,必然是涉及到线程切换的。就算此处trycatch处理了,也并不能捕获到事件流下游的异常。
@OverridepublicvoidonNext(Responseresponse){ if(response.isSuccessful()){ observer.onNext(response.body()); }else{ terminated=true; Throwablet=newHttpException(response); try{ observer.onError(t); }catch(Throwableinner){ Exceptions.throwIfFatal(inner); RxJavaPlugins.onError(newCompositeException(t,inner)); } } }
以我们在最终的Observer的onNext抛出异常为例,要捕获这次异常那么必须在最终的调用线程中去进行捕获。即.observeOn(AndroidSchedulers.mainThread())切换过来的Android主线程。与其他操作符一样,线程切换时产生了一组新的订阅关系,RxJava内部会创建一个新的观察对象ObservableObserveOn。
@Override publicvoidonNext(Tt){ if(done){ return; } if(sourceMode!=QueueDisposable.ASYNC){ queue.offer(t); } schedule(); } . . . voidschedule(){ if(getAndIncrement()==0){ worker.schedule(this);//执行ObservableObserveOn的run方法 } } . . . @Override publicvoidrun(){ if(outputFused){ drainFused(); }else{ drainNormal(); } }
而执行任务的worker即为对应线程Scheduler的对应实现子类所创建的Worker,以AndroidSchedulers.mainThread()为例,Scheduler实现类为HandlerScheduler,其对应Worker为HandlerWorker,最终任务交给ScheduledRunnable来执行。
privatestaticfinalclassScheduledRunnableimplementsRunnable,Disposable{ privatefinalHandlerhandler; privatefinalRunnabledelegate; privatevolatilebooleandisposed;//TrackedsolelyforisDisposed(). ScheduledRunnable(Handlerhandler,Runnabledelegate){ this.handler=handler; this.delegate=delegate; } @Override publicvoidrun(){ try{ delegate.run(); }catch(Throwablet){ RxJavaPlugins.onError(t); } } @Override publicvoiddispose(){ handler.removeCallbacks(this); disposed=true; } @Override publicbooleanisDisposed(){ returndisposed; } }
会发现,run中进行了trycatch。但catch内消化异常使用的是全局异常处理RxJavaPlugins.onError(t);,而不是某一个观察者的onError。所以在经过切换线程操作符后,观察者onNext中抛出的异常,onError无法捕获。
处理方案
既然知道了问题所在,那么处理问题的方案也就十分清晰了。
1、注册全局的异常处理
RxJavaPlugins.setErrorHandler(object:Consumer{ overridefunaccept(t:Throwable?){ //dosomething } })
2、Consumer作为观察者时,不完全确定没有异常一定要添加异常处理Consumer
apiService.stringData() .doOnSubscribe{t->compositeDisposable.add(t)} .compose(RxScheduler.sync()) .subscribe(Consumer{},Consumer {})
3、Observer可以创建一个BaseObaerver将onNext内部进行trycatch人为的流转到onError中,项目中的观察这都使用这个BaseObserver的子类。
@Override publicvoidonNext(Tt){ try{ onSuccess(t); }catch(Exceptione){ onError(e); } data=t; success=true; }
总结
以上就是这篇文章的全部内容了,希望本文的内容对大家的学习或者工作具有一定的参考学习价值,谢谢大家对毛票票的支持。
声明:本文内容来源于网络,版权归原作者所有,内容由互联网用户自发贡献自行上传,本网站不拥有所有权,未作人工编辑处理,也不承担相关法律责任。如果您发现有涉嫌版权的内容,欢迎发送邮件至:czq8825#qq.com(发邮件时,请将#更换为@)进行举报,并提供相关证据,一经查实,本站将立刻删除涉嫌侵权内容。