python制作mysql数据迁移脚本
用python写了个数据迁移脚本,主要是利用从库将大的静态表导出表空间,载导入到目标实例中。
#!/usr/bin/envpython3 #-*-coding:utf8-*- #author:zhanbin.liu #!!!!!DB必须同版本 #python3环境pip3installpymysqlparamiko importos #frompathlibimportPath importsys importpymysql importparamiko #每次只能迁移一个DB下的表,到指定DB #GRANTSELECT,CREATE,RELOAD,ALTER,LOCKTABLESON*.*TO'data_migration'@'192.168.%'IDENTIFIEDBY'data_migration@123'; tables='sqlauto_cluster,sqlauto_user'#以,分割的字符串,如a,b,c tableList=tables.split(',') sourceIp='192.168.1.101' sourceDataBase='/data/mysql/3306/data' sourceDbName='inception_web' sourceDataDir=os.path.join(sourceDataBase,sourceDbName) desIp='192.168.1.102' desDataBase='/data/mysql/3306/data' desDbName='inception_web' desDataDir=os.path.join(desDataBase,desDbName) #fortableintableList: #desFile=Path("%s/%s.ibd"%(desDataDir,table)) #print(desFile) #ifdesFile.is_file(): #print("ok") #else: #print("no") comUser='data_migration' comPwd='data_migration@123' comPort=3306 client=paramiko.SSHClient() client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) deftable_judge(): print("table_judge") sourceTableExist=pymysql.connect(sourceIp,comUser,comPwd,sourceDbName,comPort,charset='utf8') desTableExist=pymysql.connect(desIp,comUser,comPwd,desDbName,comPort,charset='utf8') sourceTables=[] desTables=[] cursor_source=sourceTableExist.cursor() cursor_des=desTableExist.cursor() fortableintableList: #print(table) cursor_source.execute("selectTABLE_NAMEfrominformation_schema.TABLESwhereTABLE_SCHEMA='%s'andTABLE_NAME='%s';"%(sourceDbName,table)) sourceTable_tmp=cursor_source.fetchall() cursor_des.execute("selectTABLE_NAMEfrominformation_schema.TABLESwhereTABLE_SCHEMA='%s'andTABLE_NAME='%s';"%(desDbName,table)) desTable_tmp=cursor_des.fetchall() #print(desTable_tmp) ifsourceTable_tmpis(): sourceTables.append(table) ifdesTable_tmpisnot(): desTables.append(desTable_tmp[0][0]) sourceTableExist.close() desTableExist.close() s=d=0 ifsourceTables!=[]: print('迁移源不存在将要迁移的表:',sourceIp,sourceDbName,sourceTables,'请检查') s=1 ifdesTables!=[]: print('目标库存在将要迁移的表:',desIp,desDbName,desTables,'请移除') d=1 ifs==1ord==1: sys.exit() defdata_sync(): print('data_sync') db_source=pymysql.connect(sourceIp,comUser,comPwd,sourceDbName,comPort,charset='utf8') db_des=pymysql.connect(desIp,comUser,comPwd,desDbName,comPort,charset='utf8') cursor_db_source=db_source.cursor() cursor_db_des=db_des.cursor() fortableintableList: print("正在同步表:",table) cursor_db_source.execute("showcreatetable%s;"%(table)) createTableSQL=cursor_db_source.fetchall()[0][1] print(createTableSQL) try: cursor_db_des.execute(createTableSQL) exceptExceptionaserror: print(error) cursor_db_source.execute("flushtable%swithreadlock;"%(table)) cursor_db_des.execute("altertable%sdiscardtablespace;"%(table)) client.connect(sourceIp,22,'root') stdin1,stdout1,stderr1=client.exec_command("scp%s%s:%s"%(sourceDataDir+"/"+table+".ibd",desIp,desDataDir)) stdin2,stdout2,stderr2=client.exec_command("scp%s%s:%s"%(sourceDataDir+"/"+table+".cfg",desIp,desDataDir)) a_e_1=stderr1.readlines() a_e_2=stderr2.readlines() ifa_e_1!=[]ora_e_2!=[]: print(a_e_1,a_e_2) sys.exit() client.close() client.connect(desIp,22,'root') stdin3,stdout3,stderr3=client.exec_command("chown-Rmysql.mysql%s*"%(desDataDir+"/"+table)) a_e_3=stderr3.readlines() ifa_e_3!=[]: print(a_e_1,a_e_2) sys.exit() client.close() #cursor_db_source.execute("selectsleep(10);") cursor_db_source.execute("unlocktables;") cursor_db_des.execute("altertable%simporttablespace;"%(table)) print("同步完成") cursor_db_source.close() cursor_db_des.close() defdata_checksum(): print('data_checksum') db_source=pymysql.connect(sourceIp,comUser,comPwd,sourceDbName,comPort,charset='utf8') db_des=pymysql.connect(desIp,comUser,comPwd,desDbName,comPort,charset='utf8') cursor_db_source=db_source.cursor() cursor_db_des=db_des.cursor() fortableintableList: print("正在校验表:",table) cursor_db_source.execute("checksumtable%s;"%(table)) ck_s=cursor_db_source.fetchall()[0][1] cursor_db_des.execute("checksumtable%s;"%(table)) ck_d=cursor_db_des.fetchall()[0][1] ifck_s!=ck_d: print("表不一致:",table) else: print("表一致:",table) cursor_db_source.close() cursor_db_des.close() if__name__=="__main__": table_judge() data_sync() data_checksum() print('haha')