• 企业400电话
  • 微网小程序
  • AI电话机器人
  • 电商代运营
  • 全 部 栏 目

    企业400电话 网络优化推广 AI电话机器人 呼叫中心 网站建设 商标✡知产 微网小程序 电商运营 彩铃•短信 增值拓展业务
    PHP+RabbitMQ实现消息队列的完整代码

    前言

    为什么使用RabbitMq而不是ActiveMq或者RocketMq?

    首先,从业务上来讲,我并不要求消息的100%接受率,并且,我需要结合php开发,RabbitMq相较RocketMq,延迟较低(微妙级)。至于ActiveMq,貌似问题较多。RabbitMq对各种语言的支持较好,所以选择RabbitMq。

    先安装PHP对应的RabbitMQ,这里用的是 php_amqp 不同的扩展实现方式会有细微的差异.

    php扩展地址: http://pecl.php.net/package/amqp

    具体以官网为准  http://www.rabbitmq.com/getstarted.html

    介绍

    config.php

     ?php
     return [
      //配置
      'host' => [
       'host' => '127.0.0.1',
       'port' => '5672',
       'login' => 'guest',
       'password' => 'guest',
       'vhost'=>'/',
      ],
      //交换机
      'exchange'=>'word',
      //路由
      'routes' => [],
     ];

    BaseMQ.php

     ?php
     /**
      * Created by PhpStorm.
      * User: pc
      * Date: 2018/12/13
      * Time: 14:11
      */
     
     namespace MyObjSummary\rabbitMQ;
     
     /** Member
      *  AMQPChannel
      *  AMQPConnection
      *  AMQPEnvelope
      *  AMQPExchange
      *  AMQPQueue
      * Class BaseMQ
      * @package MyObjSummary\rabbitMQ
      */
     class BaseMQ
     {
      /** MQ Channel
       * @var \AMQPChannel
       */
      public $AMQPChannel ;
     
      /** MQ Link
       * @var \AMQPConnection
       */
      public $AMQPConnection ;
     
      /** MQ Envelope
       * @var \AMQPEnvelope
       */
      public $AMQPEnvelope ;
     
      /** MQ Exchange
       * @var \AMQPExchange
       */
      public $AMQPExchange ;
     
      /** MQ Queue
       * @var \AMQPQueue
       */
      public $AMQPQueue ;
     
      /** conf
       * @var
       */
      public $conf ;
     
      /** exchange
       * @var
       */
      public $exchange ;
     
      /** link
       * BaseMQ constructor.
       * @throws \AMQPConnectionException
       */
      public function __construct()
      {
       $conf = require 'config.php' ;
       if(!$conf)
        throw new \AMQPConnectionException('config error!');
       $this->conf  = $conf['host'] ;
       $this->exchange = $conf['exchange'] ;
       $this->AMQPConnection = new \AMQPConnection($this->conf);
       if (!$this->AMQPConnection->connect())
        throw new \AMQPConnectionException("Cannot connect to the broker!\n");
      }
     
      /**
       * close link
       */
      public function close()
      {
       $this->AMQPConnection->disconnect();
      }
     
      /** Channel
       * @return \AMQPChannel
       * @throws \AMQPConnectionException
       */
      public function channel()
      {
       if(!$this->AMQPChannel) {
        $this->AMQPChannel = new \AMQPChannel($this->AMQPConnection);
       }
       return $this->AMQPChannel;
      }
     
      /** Exchange
       * @return \AMQPExchange
       * @throws \AMQPConnectionException
       * @throws \AMQPExchangeException
       */
      public function exchange()
      {
       if(!$this->AMQPExchange) {
        $this->AMQPExchange = new \AMQPExchange($this->channel());
        $this->AMQPExchange->setName($this->exchange);
       }
       return $this->AMQPExchange ;
      }
     
      /** queue
       * @return \AMQPQueue
       * @throws \AMQPConnectionException
       * @throws \AMQPQueueException
       */
      public function queue()
      {
       if(!$this->AMQPQueue) {
        $this->AMQPQueue = new \AMQPQueue($this->channel());
       }
       return $this->AMQPQueue ;
      }
     
      /** Envelope
       * @return \AMQPEnvelope
       */
      public function envelope()
      {
       if(!$this->AMQPEnvelope) {
        $this->AMQPEnvelope = new \AMQPEnvelope();
       }
       return $this->AMQPEnvelope;
      }
     }

    ProductMQ.php

     ?php
     //生产者 P
     namespace MyObjSummary\rabbitMQ;
     require 'BaseMQ.php';
     class ProductMQ extends BaseMQ
     {
      private $routes = ['hello','word']; //路由key
     
      /**
       * ProductMQ constructor.
       * @throws \AMQPConnectionException
       */
      public function __construct()
      {
       parent::__construct();
      }
     
      /** 只控制发送成功 不接受消费者是否收到
       * @throws \AMQPChannelException
       * @throws \AMQPConnectionException
       * @throws \AMQPExchangeException
       */
      public function run()
      {
       //频道
       $channel = $this->channel();
       //创建交换机对象
       $ex = $this->exchange();
       //消息内容
       $message = 'product message '.rand(1,99999);
       //开始事务
       $channel->startTransaction();
       $sendEd = true ;
       foreach ($this->routes as $route) {
        $sendEd = $ex->publish($message, $route) ;
        echo "Send Message:".$sendEd."\n";
       }
       if(!$sendEd) {
        $channel->rollbackTransaction();
       }
       $channel->commitTransaction(); //提交事务
       $this->close();
       die ;
      }
     }
     try{
      (new ProductMQ())->run();
     }catch (\Exception $exception){
      var_dump($exception->getMessage()) ;
     }

    ConsumerMQ.php

     ?php
     //消费者 C
     namespace MyObjSummary\rabbitMQ;
     require 'BaseMQ.php';
     class ConsumerMQ extends BaseMQ
     {
      private $q_name = 'hello'; //队列名
      private $route = 'hello'; //路由key
     
      /**
       * ConsumerMQ constructor.
       * @throws \AMQPConnectionException
       */
      public function __construct()
      {
       parent::__construct();
      }
     
      /** 接受消息 如果终止 重连时会有消息
       * @throws \AMQPChannelException
       * @throws \AMQPConnectionException
       * @throws \AMQPExchangeException
       * @throws \AMQPQueueException
       */
      public function run()
      {
     
       //创建交换机
       $ex = $this->exchange();
       $ex->setType(AMQP_EX_TYPE_DIRECT); //direct类型
       $ex->setFlags(AMQP_DURABLE); //持久化
       //echo "Exchange Status:".$ex->declare()."\n";
     
       //创建队列
       $q = $this->queue();
       //var_dump($q->declare());exit();
       $q->setName($this->q_name);
       $q->setFlags(AMQP_DURABLE); //持久化
       //echo "Message Total:".$q->declareQueue()."\n";
     
       //绑定交换机与队列,并指定路由键
       echo 'Queue Bind: '.$q->bind($this->exchange, $this->route)."\n";
     
       //阻塞模式接收消息
       echo "Message:\n";
       while(True){
        $q->consume(function ($envelope,$queue){
         $msg = $envelope->getBody();
         echo $msg."\n"; //处理消息
         $queue->ack($envelope->getDeliveryTag()); //手动发送ACK应答
        });
        //$q->consume('processMessage', AMQP_AUTOACK); //自动ACK应答
       }
       $this->close();
      }
     }
     try{
      (new ConsumerMQ)->run();
     }catch (\Exception $exception){
      var_dump($exception->getMessage()) ;
     }

    总结

    以上就是这篇文章的全部内容了,希望本文的内容对大家的学习或者工作具有一定的参考学习价值,谢谢大家对脚本之家的支持。

    您可能感兴趣的文章:
    • PHP如何通过带尾指针的链表实现''队列''
    • PHP7生产环境队列Beanstalkd用法详解
    • PHP Beanstalkd消息队列的安装与使用方法实例详解
    • 详解PHP队列的实现
    • php基于Redis消息队列实现的消息推送的方法
    • PHP队列场景以及实现代码实例详解
    上一篇:PHP实现的数据对象映射模式详解
    下一篇:Discuz不使用插件实现简单的打赏功能
  • 相关文章
  • 

    © 2016-2020 巨人网络通讯 版权所有

    《增值电信业务经营许可证》 苏ICP备15040257号-8

    PHP+RabbitMQ实现消息队列的完整代码 PHP+RabbitMQ,实现,消息,队列,