PHP基于rabbitmq操作类的生产者和消费者功能示例
本文实例讲述了PHP基于rabbitmq操作类的生产者和消费者功能。分享给大家供大家参考,具体如下:
注意事项:
1、accept.php消费者代码需要在命令行执行
2、'username'=>'asdf','password'=>'123456'改成自己的帐号和密码
RabbitMQCommand.php操作类代码
$host,'port'=>5672,'username'=>$username,'password'=>$password,'vhost'=>'/') */ publicfunction__construct($configs=array(),$exchange_name='',$queue_name='',$route_key=''){ $this->setConfigs($configs); $this->exchange_name=$exchange_name; $this->queue_name=$queue_name; $this->route_key=$route_key; } privatefunctionsetConfigs($configs){ if(!is_array($configs)){ thrownewException('configsisnotarray'); } if(!($configs['host']&&$configs['port']&&$configs['username']&&$configs['password'])){ thrownewException('configsisempty'); } if(empty($configs['vhost'])){ $configs['vhost']='/'; } $configs['login']=$configs['username']; unset($configs['username']); $this->configs=$configs; } /* *设置是否持久化,默认为True */ publicfunctionsetDurable($durable){ $this->durable=$durable; } /* *设置是否自动删除 */ publicfunctionsetAutoDelete($autodelete){ $this->autodelete=$autodelete; } /* *设置是否镜像 */ publicfunctionsetMirror($mirror){ $this->mirror=$mirror; } /* *打开amqp连接 */ privatefunctionopen(){ if(!$this->_conn){ try{ $this->_conn=newAMQPConnection($this->configs); $this->_conn->connect(); $this->initConnection(); }catch(AMQPConnectionException$ex){ thrownewException('cannotconnectionrabbitmq',500); } } } /* *rabbitmq连接不变 *重置交换机,队列,路由等配置 */ publicfunctionreset($exchange_name,$queue_name,$route_key){ $this->exchange_name=$exchange_name; $this->queue_name=$queue_name; $this->route_key=$route_key; $this->initConnection(); } /* *初始化rabbit连接的相关配置 */ privatefunctioninitConnection(){ if(empty($this->exchange_name)||empty($this->queue_name)||empty($this->route_key)){ thrownewException('rabbitmqexchange_nameorqueue_nameorroute_keyisempty',500); } $this->_channel=newAMQPChannel($this->_conn); $this->_exchange=newAMQPExchange($this->_channel); $this->_exchange->setName($this->exchange_name); $this->_exchange->setType(AMQP_EX_TYPE_DIRECT); if($this->durable) $this->_exchange->setFlags(AMQP_DURABLE); if($this->autodelete) $this->_exchange->setFlags(AMQP_AUTODELETE); $this->_exchange->declare(); $this->_queue=newAMQPQueue($this->_channel); $this->_queue->setName($this->queue_name); if($this->durable) $this->_queue->setFlags(AMQP_DURABLE); if($this->autodelete) $this->_queue->setFlags(AMQP_AUTODELETE); if($this->mirror) $this->_queue->setArgument('x-ha-policy','all'); $this->_queue->declare(); $this->_queue->bind($this->exchange_name,$this->route_key); } publicfunctionclose(){ if($this->_conn){ $this->_conn->disconnect(); } } publicfunction__sleep(){ $this->close(); returnarray_keys(get_object_vars($this)); } publicfunction__destruct(){ $this->close(); } /* *生产者发送消息 */ publicfunctionsend($msg){ $this->open(); if(is_array($msg)){ $msg=json_encode($msg); }else{ $msg=trim(strval($msg)); } return$this->_exchange->publish($msg,$this->route_key); } /* *消费者 *$fun_name=array($classobj,$function)orfunctionnamestring *$autoack是否自动应答 * *functionprocessMessage($envelope,$queue){ $msg=$envelope->getBody(); echo$msg."\n";//处理消息 $queue->ack($envelope->getDeliveryTag());//手动应答 } */ publicfunctionrun($fun_name,$autoack=True){ $this->open(); if(!$fun_name||!$this->_queue)returnFalse; while(True){ if($autoack)$this->_queue->consume($fun_name,AMQP_AUTOACK); else$this->_queue->consume($fun_name); } } }
send.php生产者代码
'127.0.0.1','port'=>5672,'username'=>'asdf','password'=>'123456','vhost'=>'/'); $exchange_name='class-e-1'; $queue_name='class-q-1'; $route_key='class-r-1'; $ra=newRabbitMQCommand($configs,$exchange_name,$queue_name,$route_key); for($i=0;$i<=100;$i++){ $ra->send(date('Y-m-dH:i:s',time())); } exit();
accept.php消费者代码
'127.0.0.1','port'=>5672,'username'=>'asdf','password'=>'123456','vhost'=>'/'); $exchange_name='class-e-1'; $queue_name='class-q-1'; $route_key='class-r-1'; $ra=newRabbitMQCommand($configs,$exchange_name,$queue_name,$route_key); classA{ functionprocessMessage($envelope,$queue){ $msg=$envelope->getBody(); $envelopeID=$envelope->getDeliveryTag(); $pid=posix_getpid(); file_put_contents("log{$pid}.log",$msg.'|'.$envelopeID.''."\r\n",FILE_APPEND); $queue->ack($envelopeID); } } $a=newA(); $s=$ra->run(array($a,'processMessage'),false);
更多关于PHP相关内容感兴趣的读者可查看本站专题:《PHP数据结构与算法教程》、《php程序设计算法总结》、《php字符串(string)用法总结》、《PHP数组(Array)操作技巧大全》、《PHP常用遍历算法与技巧总结》及《PHP数学运算技巧总结》
希望本文所述对大家PHP程序设计有所帮助。
声明:本文内容来源于网络,版权归原作者所有,内容由互联网用户自发贡献自行上传,本网站不拥有所有权,未作人工编辑处理,也不承担相关法律责任。如果您发现有涉嫌版权的内容,欢迎发送邮件至:czq8825#qq.com(发邮件时,请将#更换为@)进行举报,并提供相关证据,一经查实,本站将立刻删除涉嫌侵权内容。