• 企业400电话
  • 微网小程序
  • AI电话机器人
  • 电商代运营
  • 全 部 栏 目

    企业400电话 网络优化推广 AI电话机器人 呼叫中心 网站建设 商标✡知产 微网小程序 电商运营 彩铃•短信 增值拓展业务
    PHP基于rabbitmq操作类的生产者和消费者功能示例

    本文实例讲述了PHP基于rabbitmq操作类的生产者和消费者功能。分享给大家供大家参考,具体如下:

    注意事项:

    1、accept.php消费者代码需要在命令行执行

    2、'username'=>'asdf','password'=>'123456' 改成自己的帐号和密码

    RabbitMQCommand.php操作类代码

    ?php
    /*
     * amqp协议操作类,可以访问rabbitMQ
     * 需先安装php_amqp扩展
     */
    class RabbitMQCommand{
      public $configs = array();
      //交换机名称
      public $exchange_name = '';
      //队列名称
      public $queue_name = '';
      //路由名称
      public $route_key = '';
      /*
       * 持久化,默认True
       */
      public $durable = True;
      /*
       * 自动删除
       * exchange is deleted when all queues have finished using it
       * queue is deleted when last consumer unsubscribes
       *
       */
      public $autodelete = False;
      /*
       * 镜像
       * 镜像队列,打开后消息会在节点之间复制,有master和slave的概念
       */
      public $mirror = False;
      private $_conn = Null;
      private $_exchange = Null;
      private $_channel = Null;
      private $_queue = Null;
      /*
       * @configs array('host'=>$host,'port'=>5672,'username'=>$username,'password'=>$password,'vhost'=>'/')
       */
      public function __construct($configs = array(), $exchange_name = '', $queue_name = '', $route_key = '') {
        $this->setConfigs($configs);
        $this->exchange_name = $exchange_name;
        $this->queue_name = $queue_name;
        $this->route_key = $route_key;
      }
      private function setConfigs($configs) {
        if (!is_array($configs)) {
          throw new Exception('configs is not array');
        }
        if (!($configs['host']  $configs['port']  $configs['username']  $configs['password'])) {
          throw new Exception('configs is empty');
        }
        if (empty($configs['vhost'])) {
          $configs['vhost'] = '/';
        }
        $configs['login'] = $configs['username'];
        unset($configs['username']);
        $this->configs = $configs;
      }
      /*
       * 设置是否持久化,默认为True
       */
      public function setDurable($durable) {
        $this->durable = $durable;
      }
      /*
       * 设置是否自动删除
       */
      public function setAutoDelete($autodelete) {
        $this->autodelete = $autodelete;
      }
      /*
       * 设置是否镜像
       */
      public function setMirror($mirror) {
        $this->mirror = $mirror;
      }
      /*
       * 打开amqp连接
       */
      private function open() {
        if (!$this->_conn) {
          try {
            $this->_conn = new AMQPConnection($this->configs);
            $this->_conn->connect();
            $this->initConnection();
          } catch (AMQPConnectionException $ex) {
            throw new Exception('cannot connection rabbitmq',500);
          }
        }
      }
      /*
       * rabbitmq连接不变
       * 重置交换机,队列,路由等配置
       */
      public function reset($exchange_name, $queue_name, $route_key) {
        $this->exchange_name = $exchange_name;
        $this->queue_name = $queue_name;
        $this->route_key = $route_key;
        $this->initConnection();
      }
      /*
       * 初始化rabbit连接的相关配置
       */
      private function initConnection() {
        if (empty($this->exchange_name) || empty($this->queue_name) || empty($this->route_key)) {
          throw new Exception('rabbitmq exchange_name or queue_name or route_key is empty',500);
        }
        $this->_channel = new AMQPChannel($this->_conn);
        $this->_exchange = new AMQPExchange($this->_channel);
        $this->_exchange->setName($this->exchange_name);
        $this->_exchange->setType(AMQP_EX_TYPE_DIRECT);
        if ($this->durable)
          $this->_exchange->setFlags(AMQP_DURABLE);
        if ($this->autodelete)
          $this->_exchange->setFlags(AMQP_AUTODELETE);
        $this->_exchange->declare();
        $this->_queue = new AMQPQueue($this->_channel);
        $this->_queue->setName($this->queue_name);
        if ($this->durable)
          $this->_queue->setFlags(AMQP_DURABLE);
        if ($this->autodelete)
          $this->_queue->setFlags(AMQP_AUTODELETE);
        if ($this->mirror)
          $this->_queue->setArgument('x-ha-policy', 'all');
        $this->_queue->declare();
        $this->_queue->bind($this->exchange_name, $this->route_key);
      }
      public function close() {
        if ($this->_conn) {
          $this->_conn->disconnect();
        }
      }
      public function __sleep() {
        $this->close();
        return array_keys(get_object_vars($this));
      }
      public function __destruct() {
        $this->close();
      }
      /*
       * 生产者发送消息
       */
      public function send($msg) {
        $this->open();
        if(is_array($msg)){
          $msg = json_encode($msg);
        }else{
          $msg = trim(strval($msg));
        }
        return $this->_exchange->publish($msg, $this->route_key);
      }
      /*
       * 消费者
       * $fun_name = array($classobj,$function) or function name string
       * $autoack 是否自动应答
       *
       * function processMessage($envelope, $queue) {
          $msg = $envelope->getBody();
          echo $msg."\n"; //处理消息
          $queue->ack($envelope->getDeliveryTag());//手动应答
        }
       */
      public function run($fun_name, $autoack = True){
        $this->open();
        if (!$fun_name || !$this->_queue) return False;
        while(True){
          if ($autoack) $this->_queue->consume($fun_name, AMQP_AUTOACK);
          else $this->_queue->consume($fun_name);
        }
      }
    }
    
    

    send.php生产者代码

    ?php
    set_time_limit(0);
    include_once('RabbitMQCommand.php');
    $configs = array('host'=>'127.0.0.1','port'=>5672,'username'=>'asdf','password'=>'123456','vhost'=>'/');
    $exchange_name = 'class-e-1';
    $queue_name = 'class-q-1';
    $route_key = 'class-r-1';
    $ra = new RabbitMQCommand($configs,$exchange_name,$queue_name,$route_key);
    for($i=0;$i=100;$i++){
      $ra->send(date('Y-m-d H:i:s',time()));
    }
    exit();
    
    

    accept.php消费者代码

    ?php
    error_reporting(0);
    include_once('RabbitMQCommand.php');
    $configs = array('host'=>'127.0.0.1','port'=>5672,'username'=>'asdf','password'=>'123456','vhost'=>'/');
    $exchange_name = 'class-e-1';
    $queue_name = 'class-q-1';
    $route_key = 'class-r-1';
    $ra = new RabbitMQCommand($configs,$exchange_name,$queue_name,$route_key);
    class A{
      function processMessage($envelope, $queue) {
        $msg = $envelope->getBody();
        $envelopeID = $envelope->getDeliveryTag();
        $pid = posix_getpid();
        file_put_contents("log{$pid}.log", $msg.'|'.$envelopeID.''."\r\n",FILE_APPEND);
        $queue->ack($envelopeID);
      }
    }
    $a = new A();
    $s = $ra->run(array($a,'processMessage'),false);
    
    

    更多关于PHP相关内容感兴趣的读者可查看本站专题:《PHP数据结构与算法教程》、《php程序设计算法总结》、《php字符串(string)用法总结》、《PHP数组(Array)操作技巧大全》、《PHP常用遍历算法与技巧总结》及《PHP数学运算技巧总结》

    希望本文所述对大家PHP程序设计有所帮助。

    您可能感兴趣的文章:
    • 安卓开发之mqtt协议实例代码
    • springboot集成mqtt的实践开发
    • vue使用stompjs实现mqtt消息推送通知
    • springboot实现rabbitmq的队列初始化和绑定
    • docker部署rabbitmq集群的实现方法
    • springboot+RabbitMQ+InfluxDB+Grafara监控实践
    • RabbitMQ的配置与安装教程全纪录
    • Spring Boot RabbitMQ 延迟消息实现完整版示例
    • Docker MQTT安装使用教程
    上一篇:PHP7.1实现的AES与RSA加密操作示例
    下一篇:PHP实现redis限制单ip、单用户的访问次数功能示例
  • 相关文章
  • 

    © 2016-2020 巨人网络通讯

    时间:9:00-21:00 (节假日不休)

    地址:江苏信息产业基地11号楼四层

    《增值电信业务经营许可证》 苏B2-20120278

    PHP基于rabbitmq操作类的生产者和消费者功能示例 PHP,基于,rabbitmq,操作,类,