用python简单实现mysql数据同步到ElasticSearch的教程
之前博客有用logstash-input-jdbc同步mysql数据到ElasticSearch,但是由于同步时间最少是一分钟一次,无法满足线上业务,所以只能自己实现一个,但是时间比较紧,所以简单实现一个
思路:
网上有很多思路用什么mysql的binlog功能什么的,但是我对mysql了解实在有限,所以用一个很呆板的办法查询mysql得到数据,再插入es,因为数据量不大,而且10秒间隔同步一次,效率还可以,为了避免服务器之间的时间差和mysql更新和查询产生的时间差,所以在查询更新时间条件时是和上一次同步开始时间比较,这样不管数据多少,更新耗时多少都不会少数据,因为原则是同步不漏掉任何数据,也可以程序多开将时间差和间隔时间差异化,因为用mysql中一个id当作es中的id,也避免了重复数据
使用:
只需要按照escongif.py写配置文件,然后写sql文件,最后直接执行mstes.py就可以了,我这个也是参考logstash-input-jdbc的配置形式
MsToEs
|----esconfig.py(配置文件)
|----mstes.py(同步程序)
|----sql_manage.py(数据库管理)
|----aa.sql(需要用到sql文件)
|----bb.sql(需要用到sql文件)
sql_manage.py:
#-*-coding:utf-8-*- __author__="ZJL" fromsqlalchemy.poolimportQueuePool fromsqlalchemyimportcreate_engine fromsqlalchemy.ormimportsessionmaker,scoped_session importtraceback importesconfig #用于不需要回滚和提交的操作 deffind(func): defwrapper(self,*args,**kwargs): try: returnfunc(self,*args,**kwargs) exceptExceptionase: print(traceback.format_exc()) print(str(e)) returntraceback.format_exc() finally: self.session.close() returnwrapper classMysqlManager(object): def__init__(self): mysql_connection_string=esconfig.mysql.get("mysql_connection_string") self.engine=create_engine('mysql+pymysql://'+mysql_connection_string+'?charset=utf8',poolclass=QueuePool, pool_recycle=3600) #self.DB_Session=sessionmaker(bind=self.engine) #self.session=self.DB_Session() self.DB_Session=sessionmaker(bind=self.engine,autocommit=False,autoflush=True,expire_on_commit=False) self.db=scoped_session(self.DB_Session) self.session=self.db() @find defselect_all_dict(self,sql,keys): a=self.session.execute(sql) a=a.fetchall() lists=[] foriina: iflen(keys)==len(i): data_dict={} fork,vinzip(keys,i): data_dict[k]=v lists.append(data_dict) else: returnFalse returnlists #关闭 defclose(self): self.session.close()
aa.sql:
select CONVERT(c.`id`,CHAR)asid, c.`code`ascode, c.`project_name`asproject_name, c.`name`asname, date_format(c.`update_time`,'%Y-%m-%dT%H:%i:%s')asupdate_time, from`cc`c wheredate_format(c.`update_time`,'%Y-%m-%dT%H:%i:%s')>='::datetime_now';
bb.sql:
select CONVERT(c.`id`,CHAR)asid, CONVERT(c.`age`,CHAR)asage, c.`code`ascode, c.`name`asname, c.`project_name`asproject_name, date_format(c.`update_time`,'%Y-%m-%dT%H:%i:%s')asupdate_time, from`bb`c wheredate_format(c.`update_time`,'%Y-%m-%dT%H:%i:%s')>='::datetime_now';
esconfig.py:
#-*-coding:utf-8-*- #__author__="ZJL" #sql文件名与es中的type名一致 mysql={ #mysql连接信息 "mysql_connection_string":"root:123456@127.0.0.1:3306/xxx", #sql文件信息 "statement_filespath":[ #sql对应的es索引和es类型 { "index":"a1", "sqlfile":"aa.sql", "type":"aa" }, { "index":"a1", "sqlfile":"bb.sql", "type":"bb" }, ], } #es的ip和端口 elasticsearch={ "hosts":"127.0.0.1:9200", } #字段顺序与sql文件字段顺序一致,这是存进es中的字段名,这里用es的type名作为标识 db_field={ "aa": ("id", "code", "name", "project_name", "update_time", ), "bb": ("id", "code", "age", "project_name", "name", "update_time", ), } es_config={ #间隔多少秒同步一次 "sleep_time":10, #为了解决服务器之间时间差问题 "time_difference":3, #show_json用来展示导入的json格式数据, "show_json":False, }
mstes.py:
#-*-coding:utf-8-*- #__author__="ZJL" fromsql_manageimportMysqlManager fromesconfigimportmysql,elasticsearch,db_field,es_config fromelasticsearchimportElasticsearch fromelasticsearchimporthelpers importtraceback importtime classTongBu(object): def__init__(self): try: #是否展示json数据在控制台 self.show_json=es_config.get("show_json") #间隔多少秒同步一次 self.sleep_time=es_config.get("sleep_time") #为了解决同步时数据更新产生的误差 self.time_difference=es_config.get("time_difference") #当前时间,留有后用 self.datetime_now="" #es的ip和端口 es_host=elasticsearch.get("hosts") #连接es self.es=Elasticsearch(es_host) #连接mysql self.mm=MysqlManager() except: print(traceback.format_exc()) deftongbu_es_mm(self): try: #同步开始时间 start_time=time.time() print("start..............",time.strftime("%Y-%m-%d%H:%M:%S",time.localtime(start_time))) #这个list用于批量插入es actions=[] #获得所有sql文件list statement_filespath=mysql.get("statement_filespath",[]) ifself.datetime_now: #当前时间加上时间差(间隔时间加上执行同步用掉的时间,等于上一次同步开始时间)再字符串格式化 #sql中格式化时间时年月日和时分秒之间不能空格,不然导入es时报解析错误,所以这里的时间格式化也统一中间加一个T self.datetime_now=time.strftime("%Y-%m-%dT%H:%M:%S",time.localtime(time.time()-(self.sleep_time+self.time_difference))) else: self.datetime_now="1999-01-01T00:00:00" ifstatement_filespath: forfilepathinstatement_filespath: #sql文件 sqlfile=filepath.get("sqlfile") #es的索引 es_index=filepath.get("index") #es的type es_type=filepath.get("type") #读取sql文件内容 withopen(sqlfile,"r")asopf: sqldatas=opf.read() #::datetime_now是一个自定义的特殊字符串用于增量更新 if"::datetime_now"insqldatas: sqldatas=sqldatas.replace("::datetime_now",self.datetime_now) else: sqldatas=sqldatas #es和sql字段的映射 dict_set=db_field.get(es_type) #访问mysql,得到一个list,元素都是字典,键是字段名,值是数据 db_data_list=self.mm.select_all_dict(sqldatas,dict_set) ifdb_data_list: #将数据拼装成es的格式 fordb_dataindb_data_list: action={ "_index":es_index, "_type":es_type, "@timestamp":time.strftime("%Y-%m-%dT%H:%M:%S",time.localtime(time.time())), "_source":db_data } #如果没有id字段就自动生成 es_id=db_data.get("id","") ifes_id: action["_id"]=es_id #是否显示json再终端 ifself.show_json: print(action) #将拼装好的数据放进list中 actions.append(action) #list不为空就批量插入数据到es中 iflen(actions)>0: helpers.bulk(self.es,actions) exceptExceptionase: print(traceback.format_exc()) else: end_time=time.time() print("end...................",time.strftime("%Y-%m-%d%H:%M:%S",time.localtime(start_time))) self.time_difference=end_time-start_time finally: #报错就关闭数据库 self.mm.close() defmain(): tb=TongBu() #间隔多少秒同步一次 sleep_time=tb.sleep_time #死循环执行导入数据,加上时间间隔 whileTrue: tb.tongbu_es_mm() time.sleep(sleep_time) if__name__=='__main__': main()
以上这篇用python简单实现mysql数据同步到ElasticSearch的教程就是小编分享给大家的全部内容了,希望能给大家一个参考,也希望大家多多支持毛票票。