Python 调用 ES、Solr、Phoenix的示例代码
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# *************************************
# @Time:   2019/8/12
# @Author: ZhangFan
# @Desc:   Library
# @File:   MyDatabases.py
# @Update: 2019/8/23
# *************************************
"""Thin helper classes around the Elasticsearch, Phoenix and Solr clients.

Each class stores one client/connection object on the instance and exposes
small methods that print what they are about to do, call the client, and
print and re-raise any error the client reports.
"""
import re

import elasticsearch
import phoenixdb
import pysolr
import pymysql

# Matches the FROM keyword as a whole word, in any letter case, together with
# the whitespace that follows it.
_FROM_KEYWORD = re.compile(r'\bFROM\b\s*', re.IGNORECASE)


class MyELS(object):
    """Helper for running query-string searches against Elasticsearch."""

    def __init__(self):
        # Client object; stays None until connect_to_els() is called.
        self.els_conn = None

    def connect_to_els(self, host, port):
        """Create the Elasticsearch client for the given host and port.

        :param host: host name or address of the Elasticsearch node.
        :param port: port of the Elasticsearch node.
        """
        self.els_conn = elasticsearch.Elasticsearch(
            [{'host': host, 'port': port}])
        print('Executing:ConnectToElasticSearch|%s' % self.els_conn)

    def get_els_data(self, query, index):
        """Search ``index`` with ``query`` and return the ``hits`` section.

        :param query: value passed to the client's ``q`` argument.
        :param index: name of the index to search.
        :return: the ``'hits'`` entry of the search response.
        :raises Exception: whatever the client raised; the error is printed
            first and then re-raised unchanged so that its original type and
            traceback are preserved.
        """
        print('Executing:Search|%s' % query)
        try:
            rst = self.els_conn.search(index=index, q=query)
            return rst['hits']
        except Exception as e:
            print('ElasticSearchError|%s' % e)
            raise


class MyPhoenix(object):
    """Helper for executing SQL through a phoenixdb connection."""

    def __init__(self):
        # Both stay None until connect_to_phoenix() is called.
        self.phoenix_conn = None
        self.phoenix_cursor = None

    def connect_to_phoenix(self, host, port=8765):
        """Open an autocommit connection and a cursor on it.

        :param host: host name or address of the server.
        :param port: port of the server (defaults to 8765).
        """
        address = 'http://{0}:{1}/'.format(host, port)
        print('Executing:ConnectToPhoenix|%s' % address)
        self.phoenix_conn = phoenixdb.connect(address, autocommit=True)
        self.phoenix_cursor = self.phoenix_conn.cursor()

    def set_schema(self, sql, schema):
        """Return ``sql`` with the first table after FROM prefixed by schema.

        Only the first FROM keyword is rewritten. The keyword is matched as
        a whole word in any letter case, and the rest of the statement keeps
        its original casing so string literals and quoted identifiers are
        not altered.

        :param sql: SQL statement to rewrite.
        :param schema: schema name to put in front of the table name.
        :return: the rewritten statement, or ``sql`` unchanged when it
            contains no FROM keyword.
        """
        match = _FROM_KEYWORD.search(sql)
        if match is None:
            # Nothing to qualify (e.g. an UPSERT ... VALUES statement).
            return sql
        head = sql[:match.start()]
        keyword = sql[match.start():match.start() + len('FROM')]
        tail = sql[match.end():].strip()
        # A single space must separate the keyword from the qualified name.
        return '{0}{1} {2}.{3}'.format(head, keyword, schema, tail)

    def execute_phoenix_sql(self, sql):
        """Execute ``sql`` on the open cursor without fetching any rows.

        :param sql: SQL statement to execute.
        """
        print('Executing:Execute|%s' % sql)
        self.phoenix_cursor.execute(sql)

    def get_from_phoenix(self, sql):
        """Execute ``sql`` and return every row of the result.

        :param sql: SQL query to execute.
        :return: the result of ``fetchall()`` on the cursor.
        :raises Exception: whatever the cursor raised while executing; the
            error is printed first and then re-raised unchanged.
        """
        print('Executing:Query|%s' % sql)
        try:
            self.phoenix_cursor.execute(sql)
        except Exception as e:
            print('PhoenixError|%s' % e)
            raise
        return self.phoenix_cursor.fetchall()

    def disconnect_from_phoenix(self):
        """Close the cursor and the connection, if they are open.

        Safe to call when nothing is connected. The connection is closed
        even if closing the cursor fails.
        """
        print('Executing:DisconnectFromHBase')
        cursor, conn = self.phoenix_cursor, self.phoenix_conn
        # Drop the references first so a failed close cannot leave stale
        # handles on the instance.
        self.phoenix_cursor = None
        self.phoenix_conn = None
        try:
            if cursor is not None:
                cursor.close()
        finally:
            if conn is not None:
                conn.close()


class MySolr(object):
    """Helper for searching, adding and deleting documents in Solr."""

    def __init__(self):
        # Both stay None until connect_to_solr() is called.
        self.solr_conn = None
        self.base_url = None

    def connect_to_solr(self, address, selector):
        """Create the pysolr client for ``http://<address>/solr/<selector>/``.

        :param address: ``host`` or ``host:port`` part of the Solr URL.
        :param selector: path segment placed after ``/solr/`` in the URL.
        """
        self.base_url = 'http://{0}/solr/{1}/'.format(address, selector)
        self.solr_conn = pysolr.Solr(self.base_url)
        print('Executing:ConnectToSolr|%s' % self.base_url)

    def get_solr_data(self, query):
        """Run ``query`` and return the matching items as a list.

        :param query: query passed to the client's ``search`` method.
        :return: list of the items yielded by the search result.
        :raises Exception: whatever the client raised; the error is printed
            first and then re-raised unchanged.
        """
        print('Executing:Search|%s' % query)
        try:
            items = self.solr_conn.search(query)
            results = list(items)
        except Exception as e:
            print('SolrError|%s' % e)
            raise
        return results

    def add_solr_data(self, data):
        """Add one document and commit.

        :param data: the document; it is wrapped in a one-element list
            before being handed to the client's ``add`` method.
        :raises Exception: whatever the client raised; the error is printed
            first and then re-raised unchanged.
        """
        print('Executing:add|%s' % data)
        try:
            self.solr_conn.add([data])
            self.solr_conn.commit()
        except Exception as e:
            print('SolrError|%s' % e)
            raise

    def del_solr_byId(self, data):
        """Delete the document whose id is ``data`` and commit.

        :param data: value passed as ``id`` to the client's ``delete``.
        :raises Exception: whatever the client raised; the error is printed
            first and then re-raised unchanged.
        """
        print('Executing:del|%s' % data)
        try:
            self.solr_conn.delete(id=data)
            self.solr_conn.commit()
        except Exception as e:
            print('SolrError|%s' % e)
            raise


if __name__ == '__main__':
    print('Thisistest.')
    ms = MySolr()
    me = MyELS()
    mp = MyPhoenix()
以上就是Python调用ES、Solr、Phoenix的示例代码的详细内容,更多关于Python调用ES、Solr、Phoenix的资料请关注毛票票其它相关文章!