Python 监听 salt job 状态,并将任务数据推送到 redis 中的方法
salt 分发任务后,监听进程主动将已完成的任务数据推送到 redis 中,利用 redis 列表实现生产者/消费者模式的消息队列,进行消息传送。
#coding=utf-8 importfnmatch,json,logging importsalt.config importsalt.utils.event fromsalt.utils.redisimportRedisPool importsys,os,datetime,random importmultiprocessing,threading fromjoi.utils.gobsAPIimportPostWeb logger=logging.getLogger(__name__) opts=salt.config.client_config('/data/salt/saltstack/etc/salt/master') r_conn=RedisPool(opts.get('redis_db')).getConn() lock=threading.Lock() classRedisQueueDaemon(object): ''' redis队列监听器 ''' def__init__(self,r_conn): self.r_conn=r_conn#redis连接实例 self.task_queue='task:prod:queue'#任务消息队列 deflisten_task(self): ''' 监听主函数 ''' whileTrue: queue_item=self.r_conn.blpop(self.task_queue,0)[1] print"queueget",queue_item #self.run_task(queue_item) t=threading.Thread(target=self.run_task,args=(queue_item,)) t.start() defrun_task(self,info): ''' 执行操作函数 ''' lock.acquire() info=json.loads(info) ifinfo['type']=='pushTaskData': task_data=self.getTaskData(info['jid']) task_data=json.loads(task_data)iftask_dataelse[] logger.info('获取缓存数据:%s'%task_data) iftask_data: ifself.sendTaskData2bs(task_data): task_data=[] self.setTaskData(info['jid'],task_data) elifinfo['type']=='setTaskState': self.setTaskState(info['jid'],info['state'],info['message']) elifinfo['type']=='setTaskData': self.setTaskData(info['jid'],info['data']) lock.release() defgetTaskData(self,jid): returnself.r_conn.hget('task:'+jid,'data') defsetTaskData(self,jid,data): self.r_conn.hset('task:'+jid,'data',json.dumps(data)) defsendTaskData2bs(self,task_data): logger.info('发送任务数据到后端...') logger.info(task_data) iftask_data: p=PostWeb('/jgapi/verify',task_data,'pushFlowTaskData') result=p.postRes() printresult ifresult['code']: logger.info('发送成功!') returnTrue else: logger.error('发送失败!') returnFalse else: returnTrue defsetTaskState(self,jid,state,message=''): logger.info('到后端设置任务【%s】状态'%str(jid)) p=PostWeb('/jgapi/verify',{'code':jid,'state':'success','message':message},'setTaskState') result=p.postRes() ifresult['code']: logger.info('设置任务【%s】状态成功!'%str(jid)) returnTrue,result else: 
logger.error('设置任务【%s】状态失败!'%str(jid)) returnresult defsalt_job_listener(): ''' saltjob监听器 ''' sevent=salt.utils.event.get_event( 'master', sock_dir=opts['sock_dir'], transport=opts['transport'], opts=opts) whileTrue: ret=sevent.get_event(full=True) ifretisNone: continue iffnmatch.fnmatch(ret['tag'],'salt/job/*/ret/*'): task_key='task:'+ret['data']['jid'] task_state=r_conn.hget(task_key,'state') task_data=r_conn.hget(task_key,'data') iftask_state: jid_data={ 'code':ret['data']['jid'], 'project_id':settings.SALT_MASTER_OPTS['project_id'], 'serverip':ret['data']['id'], 'returns':ret['data']['return'], 'name':ret['data']['id'], 'state':'success'ifret['data']['success']else'failed', } task_data=json.loads(task_data)iftask_dataelse[] task_data.append(jid_data) logger.info("新增数据:%s"%json.dumps(task_data)) r_conn.lpush('task:prod:queue',json.dumps({'type':'setTaskData','jid':ret['data']['jid'],'data':task_data})) #r_conn.hset(task_key,'data',json.dumps(task_data)) iftask_state=='running': iflen(task_data)>=1: logger.info('新增消息到队列:pushTaskData') r_conn.lpush('task:prod:queue',json.dumps({'jid':ret['data']['jid'],'type':'pushTaskData'})) else: logger.info('任务{0}完成,发送剩下的数据到后端...'.format(task_key)) logger.info('新增消息到队列:pushTaskData') r_conn.lpush('task:prod:queue',json.dumps({'jid':ret['data']['jid'],'type':'pushTaskData'})) printdatetime.datetime.now() defrun(): print'startredisproductqueuelisterner...' logger.info('startredisproductqueuelisterner...') multiprocessing.Process(target=RedisQueueDaemon(r_conn).listen_task,args=()).start() print'startsaltjoblisterner...' logger.info('startsaltjoblisterner...') multiprocessing.Process(target=salt_job_listener,args=()).start() ''' p=multiprocessing.Pool(2) print'startredisproductqueuelisterner...' p.apply_async(redis_queue_listenr,()) print'startsaltjoblisterner...' p.apply_async(salt_job_listener,()) p.close() p.join() '''
以上这篇 Python 监听 salt job 状态,并将任务数据推送到 redis 中的方法,就是小编分享给大家的全部内容了,希望能给大家一个参考,也希望大家多多支持毛票票。