python hbase读取数据发送kafka的方法
本例子实现从 HBase 获取数据,并将其发送到 Kafka。
使用示例:
#!/usr/bin/env python
# coding=utf-8
import json
import sys
import time
import unittest

# Extend the module search path before the third-party imports below.
sys.path.append('/usr/local/lib/python3.5/site-packages')

from thrift import Thrift
from thrift.transport import TSocket
from thrift.transport import TTransport
from thrift.protocol import TBinaryProtocol
from hbase1 import Hbase  # uses the HBase Thrift1 API
from hbase1.ttypes import *
from kafka import KafkaConsumer
from kafka import KafkaProducer
from kafka.errors import KafkaError
class HbaseOpreator:
    """Small client that scans an HBase table through the Thrift1 gateway.

    The constructor opens the Thrift transport; ``__del__`` closes it again.
    """

    def __init__(self, host, port, table='test'):
        """Open a buffered binary-protocol Thrift connection to ``host:port``.

        Args:
            host: Host name or IP address of the HBase Thrift server.
            port: TCP port of the HBase Thrift server.
            table: Table name stored on the instance as ``tableName``.
        """
        self.tableName = table
        self.transport = TTransport.TBufferedTransport(TSocket.TSocket(host, port))
        self.protocol = TBinaryProtocol.TBinaryProtocol(self.transport)
        self.client = Hbase.Client(self.protocol)
        self.transport.open()

    def __del__(self):
        """Close the Thrift transport if the constructor managed to create it."""
        # __init__ may have raised before ``self.transport`` was assigned;
        # guard so that finalisation does not raise a second error.
        transport = getattr(self, 'transport', None)
        if transport is not None:
            transport.close()

    @staticmethod
    def _cell_value(columns, qualifier):
        """Return the value of ``columns[qualifier]``, or None if the cell is absent."""
        cell = columns.get(qualifier)
        return None if cell is None else cell.value

    def scanTablefilter(self, table, *args):
        """Scan ``table`` with a fixed name filter and return the rows as JSON.

        Rows whose ``info:name`` equals ``lilei`` or ``lily`` are selected.
        Each row becomes a dict with the keys ``key``, ``name``, ``age`` and
        ``phone``; a column missing from a row yields ``None`` for that key.

        Args:
            table: Name of the HBase table to scan.
            *args: Accepted for backward compatibility; not used.

        Returns:
            bytes: UTF-8 encoded JSON array with one object per matching row.
        """
        # Other filter strings that can be used here, for example:
        #   "PrefixFilter('123_')"                      - row keys starting with 123_
        #   "RowFilter(=,'regexstring:.aaa')"           - row keys matching a regex
        #   "SingleColumnValueFilter('f','statis_date',=,'binary:20170223')"
        # Conditions are combined with AND / OR. The operator has to be
        # delimited by whitespace (or parentheses), otherwise the filter
        # parser does not recognise it as an operator.
        filter_string = (
            "SingleColumnValueFilter('info','name',=,'binary:lilei')"
            " OR "
            "SingleColumnValueFilter('info','name',=,'binary:lily')"
        )
        scan = TScan()
        scan.filterString = filter_string

        rows = []
        try:
            scanner_id = self.client.scannerOpenWithScan(table, scan, None)
            try:
                batch = self.client.scannerGet(scanner_id)
                # scannerGet returns an empty result once the scan is exhausted.
                while batch:
                    for row in batch:
                        # Build a fresh dict per row: appending one shared dict
                        # would make every list entry show the last row's data.
                        rows.append({
                            'key': row.row,
                            'name': self._cell_value(row.columns, 'info:name'),
                            'age': self._cell_value(row.columns, 'info:age'),
                            'phone': self._cell_value(row.columns, 'info:phone'),
                        })
                    batch = self.client.scannerGet(scanner_id)
                # NOTE(review): json.dumps needs str values; this assumes the
                # Thrift bindings hand back str rather than bytes - confirm.
                return json.dumps(rows).encode(encoding="utf-8")
            finally:
                # Release the server-side scanner even when the scan fails.
                self.client.scannerClose(scanner_id)
        finally:
            print("scanfinish")
def sendKfafkaProduct(data, *, bootstrap_servers=None, topic='test',
                      key=b'lxs', interval=5, repeat=True):
    """Publish ``data`` to Kafka, pausing ``interval`` seconds after each message.

    ``data`` may be a single bytes-like payload (for example the JSON bytes
    returned by ``HbaseOpreator.scanTablefilter``) or an iterable of payloads.
    Every payload is sent, then echoed with ``print``. With ``repeat`` left at
    its default the whole batch is re-sent forever, so the call only ends on
    an exception such as ``KeyboardInterrupt``.

    Args:
        data: One bytes-like message, or an iterable of messages.
        bootstrap_servers: Kafka broker list; defaults to ``['localhost:9092']``.
        topic: Destination topic.
        key: Message key attached to every record.
        interval: Seconds to sleep after each send.
        repeat: Keep re-sending the batch when true; send it once when false.

    Returns:
        None. Returns immediately, without connecting, when ``data`` is an
        empty iterable.
    """
    # Iterating a bytes object yields ints, which the producer rejects as a
    # message value, so a bytes-like argument is treated as one message.
    if isinstance(data, (bytes, bytearray, memoryview)):
        payloads = [data]
    else:
        payloads = list(data)
    if not payloads:
        # Nothing to send; also avoids spinning in the loop below.
        return

    # self.host_port = 'localhost:9092'
    servers = ['localhost:9092'] if bootstrap_servers is None else bootstrap_servers
    producer = KafkaProducer(bootstrap_servers=servers)
    try:
        while True:
            for payload in payloads:
                producer.send(topic, key=key, value=payload)
                time.sleep(interval)
                print(payload)
            if not repeat:
                break
    finally:
        # Close the producer however the loop ends (Ctrl-C included) so its
        # network resources are released.
        producer.close()
if __name__ == '__main__':
    # unittest.main()
    # Connect to the HBase Thrift server, run the filtered scan on the
    # student table and show the resulting JSON payload.
    hbase_operator = HbaseOpreator('10.27.1.138', 9090)
    value = hbase_operator.scanTablefilter('ns_lbi:test_hbase_student')
    print(value)
    # sendKfafkaProduct(value)
以上这篇 Python 从 HBase 读取数据并发送到 Kafka 的方法就是小编分享给大家的全部内容了,希望能给大家一个参考,也希望大家多多支持毛票票。