Rxjava2_Flowable_Sqlite_Android数据库访问实例
一、使用Rxjava访问数据库的优点:
1.随意的线程控制,数据库操作在一个线程,返回数据处理在ui线程
2.随时订阅和取消订阅,而不必再使用回调函数
3.对读取的数据用rxjava进行过滤,流式处理
4.使用sqlbrite可以原生返回rxjava的格式,同时是响应式数据库框架
(有数据添加和更新时自动调用之前订阅了的读取函数,达到有数据添加自动更新ui的效果,
同时这个特性没有禁止的方法,只能通过取消订阅停止这个功能,对于有的框架这反而是一种累赘)
二、接下来只关注实现过程:
本次实现用rxjava2的Flowable,有背压支持(在不需要背压支持的情况建议使用Observable)
实现一个稳健的、可灵活切换其他数据库的结构,当然是先定义数据库访问接口,然后根据不同的数据库实现接口的方法
定义接口:(对于update,delete,insert,可以选择void类型,来简化调用代码,但缺少了执行结果判断)
publicinterfaceDbSource{ //Stringsql="insertintotable_task(tid,startts)values(tid,startts)"; FlowableinsertNewTask(inttid,intstartts); //Stringsql="select*fromtable_task"; Flowable >getAllTask(); //Stringsql="select*fromtable_taskwhereendts=0"; Flowable
>getRunningTask(); //Stringsql="updatetable_tasksetisuploadend=isuploadendwheretid=tid"; Flowable markUploadEnd(inttid,booleanisuploadend); //Stringsql="deletefromtable_taskwheretid=tidandendts>0"; Flowable deleteTask(inttid); }
三、用Android原生的Sqlite实现数据库操作
publicclassSimpleDbimplementsDbSource{ privatestaticSimpleDbsqlite; privateSqliteHelpersqliteHelper; privateSimpleDb(Contextcontext){ this.sqliteHelper=newSqliteHelper(context); } publicstaticsynchronizedSimpleDbgetInstance(Contextcontext){ if(sqlite==null) sqlite=newSimpleDb(context); returnsqlite; } FlowableinsertNewTask(inttid,intstartts){ returnFlowable.create(newFlowableOnSubscribe (){ @Override publicvoidsubscribe(FlowableEmitter e)throwsException{ //这里数据库操作只做示例代码,主要关注rxjava的Flowable使用方法 ContentValuesvalues=newContentValues(); values.put(“tid”,1); values.put(“startts”,13233); if(sqliteHelper.getWriteableDatabase().insert(TABLE_NAME,null,values)!=-1) e.onNext(true); else e.onNext(false); e.onComplete(); } },BackpressureStrategy.BUFFER); } Flowable >getAllTask(){ returnFlowable.create(newFlowableOnSubscribe
>(){ @Override publicvoidsubscribe(FlowableEmitter
>e)throwsException{ List
taskList=newArrayList<>(); StringBuildersql=newStringBuilder(100); sql.append("select*from"); sql.append(SqliteHelper.TABLE_NAME_TASK); SQLiteDatabasesqLiteDatabase=sqliteHelper.getReadableDatabase(); Cursorcursor=sqLiteDatabase.rawQuery(sql.toString(),null); if(cursor.moveToFirst()){ intcount=cursor.getCount(); for(inta=0;a >getRunningTask(){ returnFlowable.create(newFlowableOnSubscribe >(){ @Override publicvoidsubscribe(FlowableEmitter >e)throwsException{ TaskItemitem=null; StringBuildersql=newStringBuilder(100); sql.append("select*from"); sql.append(SqliteHelper.TABLE_NAME_TASK); sql.append("whereendts=0limit1"); SQLiteDatabasesqLiteDatabase=sqliteHelper.getReadableDatabase(); Cursorcursor=sqLiteDatabase.rawQuery(sql.toString(),null); if(cursor.moveToFirst()){ intcount=cursor.getCount(); if(count==1){ item=newTaskItem(); item.setId(cursor.getInt(0)); item.setTid(cursor.getInt(1)); item.setStartts(cursor.getInt(2)); item.setEndts(cursor.getInt(3)); } } cursor.close(); sqLiteDatabase.close(); e.onNext(Optional.fromNullable(item));//importcom.google.common.base.Optional;//安全检查,待会看调用的代码,配合rxjava很好 e.onComplete(); } },BackpressureStrategy.BUFFER); } Flowable markUploadEnd(inttid,booleanisuploadend){ returnFlowable.create(newFlowableOnSubscribe (){ @Override publicvoidsubscribe(FlowableEmitter e)throwsException{ //这里数据库操作只做示例代码,主要关注rxjava的Flowable使用方法 //数据库操作代码 e.onNext(false);//返回结果 e.onComplete();//返回结束 } },BackpressureStrategy.BUFFER); } Flowable deleteTask(inttid){ returnFlowable.create(newFlowableOnSubscribe (){ @Override publicvoidsubscribe(FlowableEmitter e)throwsException{ //这里数据库操作只做示例代码,主要关注rxjava的Flowable使用方法 //数据库操作代码 e.onNext(false);//返回结果 e.onComplete();//返回结束 } },BackpressureStrategy.BUFFER); } }
四、同一个接口使用sqlbrite的实现方式
publicclassBriteDbimplementsDbSource{ @NonNull protectedfinalBriteDatabasemDatabaseHelper; @NonNull privateFunctionmTaskMapperFunction; @NonNull privateFunction mPoiMapperFunction; @NonNull privateFunction mInterestPoiMapperFunction; //Preventdirectinstantiation. privateBriteDb(@NonNullContextcontext){ DbHelperdbHelper=newDbHelper(context); SqlBritesqlBrite=newSqlBrite.Builder().build(); mDatabaseHelper=sqlBrite.wrapDatabaseHelper(dbHelper,Schedulers.io(); mTaskMapperFunction=this::getTask; mPoiMapperFunction=this::getPoi; mInterestPoiMapperFunction=this::getInterestPoi; } @Nullable privatestaticBriteDbINSTANCE; publicstaticBriteDbgetInstance(@NonNullContextcontext){ if(INSTANCE==null){ INSTANCE=newBriteDb(context); } returnINSTANCE; } @NonNull privateTaskItemgetTask(@NonNullCursorc){ TaskItemitem=newTaskItem(); item.setId(c.getInt(c.getColumnIndexOrThrow(PersistenceContract.TaskEntry.COLUMN_TASK_ID))); item.setTid(c.getInt(c.getColumnIndexOrThrow(PersistenceContract.TaskEntry.COLUMN_TASK_TID))); item.setStartts(c.getInt(c.getColumnIndexOrThrow(PersistenceContract.TaskEntry.COLUMN_TASK_STARTTS))); item.setEndts(c.getInt(c.getColumnIndexOrThrow(PersistenceContract.TaskEntry.COLUMN_TASK_ENDTS))); returnitem; } @Override publicvoidinsertNewTask(inttid,intstartts){ ContentValuesvalues=newContentValues(); values.put(PersistenceContract.TaskEntry.COLUMN_TASK_TID,tid); values.put(PersistenceContract.TaskEntry.COLUMN_TASK_STARTTS,startts); mDatabaseHelper.insert(PersistenceContract.TaskEntry.TABLE_NAME_TASK,values,SQLiteDatabase.CONFLICT_REPLACE); } @Override publicFlowable >getAllTask(){ Stringsql=String.format("SELECT*FROM%s",PersistenceContract.TaskEntry.TABLE_NAME_TASK);//TABLE_NAME_TASK表的名字字符串 returnmDatabaseHelper.createQuery(PersistenceContract.TaskEntry.TABLE_NAME_TASK,sql) .mapToList(mTaskMapperFunction) .toFlowable(BackpressureStrategy.BUFFER); } @Override publicFlowable
>getRunningTask(){ Stringsql=String.format("SELECT*FROM%sWHERE%s=?limit1", PersistenceContract.TaskEntry.TABLE_NAME_TASK,PersistenceContract.TaskEntry.COLUMN_TASK_ENDTS); returnmDatabaseHelper.createQuery(PersistenceContract.TaskEntry.TABLE_NAME_TASK,sql,"0") .mapToOne(cursor->Optional.fromNullable(mTaskMapperFunction.apply(cursor))) .toFlowable(BackpressureStrategy.BUFFER); } @Override publicFlowable markUploadEnd(inttid,booleanisuploadend){ returnFlowable.create(newFlowableOnSubscribe (){ @Override publicvoidsubscribe(FlowableEmitter e)throwsException{ ContentValuesvalues=newContentValues(); if(isuploadend){ values.put(PersistenceContract.TaskEntry.COLUMN_TASK_ISUPLOADEND,1); }else{ values.put(PersistenceContract.TaskEntry.COLUMN_TASK_ISUPLOADEND,0); } Stringselection=PersistenceContract.TaskEntry.COLUMN_TASK_TID+"=?"; //String[]selectionArgs={String.valueOf(tid)}; StringselectionArgs=String.valueOf(tid); intres=mDatabaseHelper.update(PersistenceContract.TaskEntry.TABLE_NAME_TASK,values,selection,selectionArgs); if(res>0){ e.onNext(true);//返回结果 }else{ e.onNext(false);//返回结果 } e.onComplete();//返回结束 } },BackpressureStrategy.BUFFER); } @Override publicFlowable deleteTask(inttid){ returnFlowable.create(newFlowableOnSubscribe (){ @Override publicvoidsubscribe(FlowableEmitter e)throwsException{ Stringselection=PersistenceContract.TaskEntry.COLUMN_TASK_TID+"=?AND"+ PersistenceContract.TaskEntry.COLUMN_TASK_ENDTS+">0"; String[]selectionArgs=newString[1]; selectionArgs[0]=String.valueOf(tid); intres=mDatabaseHelper.delete(PersistenceContract.TaskEntry.TABLE_NAME_TASK,selection,selectionArgs); if(res>0){ e.onNext(true);//返回结果 }else{ e.onNext(false);//返回结果 } e.onComplete();//返回结束 } },BackpressureStrategy.BUFFER); } }
五、数据库调用使用方法
使用了lambda简化了表达式进一步简化代码:
简化方法:在/app/build.gradle里面加入如下内容:(defaultConfig的外面)
// Enables Java 8 language features (lambdas, method references).
compileOptions {
    sourceCompatibility JavaVersion.VERSION_1_8
    targetCompatibility JavaVersion.VERSION_1_8
}
接口调用(获得数据库实例):
//全局定义的实例获取类,以后想要换数据库,只需在这个类里切换即可 publicclassInjection{ publicstaticDbSourcegetDbSource(Contextcontext){ //chooseoneofthem //returnBriteDb.getInstance(context); returnSimpleDb.getInstance(context); } } DbSourcedb=Injection.getInstance(mContext); disposable1=db.getAllTask() .flatMap(Flowable::fromIterable) .filter(task->{//自定义过滤 if(!task.getIsuploadend()){ returntrue; }else{ returnfalse; } }) .subscribe(taskItems->//这里是使用了lambda简化了表达式 doTaskProcess(taskItems) ,throwable->{ throwable.printStackTrace(); },//onCompleted ()->{ if(disposable1!=null&&!disposable1.isDisposed()){ disposable1.dispose(); } }); disposable1=db.getRunningTask() .filter(Optional::isPresent)//判断是否为空,为空的就跳过 .map(Optional::get)//获取到真的参数 .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(taskItem->{//onNext() //hasrunningtask mTid=taskItem.getTid(); },throwable->throwable.printStackTrace()//onError() ,()->disposable1.dispose());//onComplete() disposable1=db.markUploadEnd(tid,isuploadend) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(status->{//onNext() if(status){ //dosomething } },throwable->throwable.printStackTrace()//onError() ,()->disposable1.dispose());//onComplete() disposable1=db.deleteTask(tid) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(status->{//onNext() if(status){ //dosomething } },throwable->throwable.printStackTrace()//onError() ,()->disposable1.dispose());//onComplete()
以上这篇Rxjava2_Flowable_Sqlite_Android数据库访问实例就是小编分享给大家的全部内容了,希望能给大家一个参考,也希望大家多多支持毛票票。