Python 从 HBase 读取数据并发送到 Kafka 的方法
本例子实现从 HBase 获取数据,并将其发送到 Kafka。
使用的示例代码如下:
#!/usr/bin/env python
# coding=utf-8
"""Scan rows from an HBase table through the Thrift1 gateway and publish them to Kafka."""
import sys
import time
import json

# Kept ahead of the third-party imports, as in the original script; presumably
# the packages below are resolved from this directory -- confirm per deployment.
sys.path.append('/usr/local/lib/python3.5/site-packages')

from thrift import Thrift
from thrift.transport import TSocket
from thrift.transport import TTransport
from thrift.protocol import TBinaryProtocol
from hbase1 import Hbase  # HBase Thrift1 bindings
from hbase1.ttypes import *
from kafka import KafkaConsumer
from kafka import KafkaProducer
from kafka.errors import KafkaError
import unittest


class HbaseOpreator:
    """Small wrapper around an HBase Thrift1 client connection.

    The connection is opened in the constructor and released by ``close()``
    (also invoked from ``__del__`` and when leaving a ``with`` block).
    """

    # Filter applied by scanTablefilter: rows whose info:name is "lilei" or
    # "lily".  OR is written as a separate, space-delimited token, as in the
    # HBase filter-language examples.
    # Other filter expressions that can be used the same way:
    #   "PrefixFilter('123_')"                  -- row keys starting with 123_
    #   "RowFilter(=, 'regexstring:.aaa')"      -- row keys matching a regex
    #   "SingleColumnValueFilter('f', 'statis_date', =, 'binary:20170223')"
    NAME_FILTER = (
        "SingleColumnValueFilter('info','name',=,'binary:lilei')"
        " OR "
        "SingleColumnValueFilter('info','name',=,'binary:lily')"
    )

    def __init__(self, host, port, table='test'):
        """Connect to the HBase Thrift server at ``host:port``.

        ``table`` is stored as ``self.tableName``; the scan method takes its
        table name as an explicit argument instead.
        """
        self.tableName = table
        self.transport = TTransport.TBufferedTransport(TSocket.TSocket(host, port))
        self.protocol = TBinaryProtocol.TBinaryProtocol(self.transport)
        self.client = Hbase.Client(self.protocol)
        self.transport.open()

    def close(self):
        """Close the Thrift transport if the constructor got far enough to create it."""
        transport = getattr(self, 'transport', None)
        if transport is not None:
            transport.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()
        return False

    def __del__(self):
        # getattr-guarded close(): __del__ also runs when __init__ raised
        # before self.transport was assigned.
        self.close()

    @staticmethod
    def _to_text(value):
        """Decode bytes to str so the value is JSON-serialisable; pass anything else through."""
        if isinstance(value, (bytes, bytearray)):
            return value.decode('utf-8')
        return value

    @classmethod
    def _cell(cls, columns, name):
        """Return the text value of column ``name`` in ``columns``, or None if absent.

        The key is tried as given and then UTF-8 encoded.
        NOTE(review): whether the generated bindings key this map by str or
        bytes depends on how they were generated -- confirm for your setup.
        """
        cell = columns.get(name)
        if cell is None:
            cell = columns.get(name.encode('utf-8'))
        if cell is None:
            return None
        return cls._to_text(cell.value)

    def scanTablefilter(self, table, *args):
        """Scan ``table`` with ``NAME_FILTER`` and return the rows as JSON bytes.

        Each row becomes a dict with the keys ``key``, ``name``, ``age`` and
        ``phone`` (read from the ``info`` column family); a column that is
        missing from a row is reported as ``null``.  Extra positional
        arguments are accepted and ignored, as before.

        Returns:
            UTF-8 encoded JSON array with one object per matching row.
        """
        rows = []
        scanner_id = None
        try:
            scan = TScan()
            scan.filterString = self.NAME_FILTER
            scanner_id = self.client.scannerOpenWithScan(table, scan, None)

            # Keep fetching until scannerGet returns an empty result.
            batch = self.client.scannerGet(scanner_id)
            while batch:
                for row in batch:
                    # A fresh dict per row: reusing one dict would make every
                    # list entry alias the last row read.
                    rows.append({
                        'key': self._to_text(row.row),
                        'name': self._cell(row.columns, 'info:name'),
                        'age': self._cell(row.columns, 'info:age'),
                        'phone': self._cell(row.columns, 'info:phone'),
                    })
                batch = self.client.scannerGet(scanner_id)
            return json.dumps(rows).encode(encoding="utf-8")
        finally:
            # Release the server-side scanner even when the scan fails midway.
            if scanner_id is not None:
                self.client.scannerClose(scanner_id)
            print("scanfinish")


def _encode_message(item):
    """Return ``item`` as bytes; text is UTF-8 encoded, anything else is rejected."""
    if isinstance(item, (bytes, bytearray)):
        return bytes(item)
    if isinstance(item, type(u'')):
        return item.encode('utf-8')
    raise TypeError('Kafka message must be bytes or text, got %r' % type(item))


def _as_messages(data):
    """Normalise ``data`` into a list of byte payloads, one per Kafka message."""
    if isinstance(data, (bytes, bytearray, type(u''))):
        # A single payload must not be iterated: iterating bytes yields ints.
        return [_encode_message(data)]
    return [_encode_message(item) for item in data]


def sendKfafkaProduct(data, topic='test', key=b'lxs', bootstrap_servers=None,
                      interval=5, rounds=None):
    """Publish ``data`` to Kafka repeatedly, pausing ``interval`` seconds after each send.

    Args:
        data: one payload (bytes or text) or an iterable of payloads; each
            payload is sent as one message.
        topic: destination topic.
        key: message key used for every message.
        bootstrap_servers: broker list; defaults to ``['localhost:9092']``.
        interval: seconds to sleep after each message.
        rounds: how many times to send the full set of messages; ``None``
            (the default) repeats until the process is interrupted.

    Raises:
        TypeError: if a payload is neither bytes nor text.
    """
    messages = _as_messages(data)
    if not messages:
        # Nothing to send; also avoids a busy loop when rounds is None.
        return

    producer = KafkaProducer(bootstrap_servers=bootstrap_servers or ['localhost:9092'])
    try:
        completed = 0
        while rounds is None or completed < rounds:
            for message in messages:
                producer.send(topic, key=key, value=message)
                time.sleep(interval)
                print(message)
            completed += 1
    finally:
        # Runs on interruption as well, so the producer is always shut down.
        producer.close()


if __name__ == '__main__':
    # unittest.main()
    operator = HbaseOpreator('10.27.1.138', 9090)
    value = operator.scanTablefilter('ns_lbi:test_hbase_student')
    print(value)
    # Uncomment to publish the scan result to Kafka (repeats until interrupted):
    # sendKfafkaProduct(value)
以上这篇 Python 从 HBase 读取数据并发送到 Kafka 的方法就是小编分享给大家的全部内容了,希望能给大家一个参考,也希望大家多多支持毛票票。