消息队列RabbitMQ的安装配置与PHP中的使用 (2)

AMQPAdvanced Message Queue Protocol(高级消息队列协议),是一个提供统一消息服务的应用层标准高级消息队列协议。

Exchange:交换机,用来接收生成者消息,并路由给与交换机绑定的队列。

Queue:消息队列,用来存储和消息,并对消息进行顺序排列的队列,是消息的容器,也是消息的终点,消费者将消息存储到消息队列后即可返回。消费者,从这里取用消息。

Broker:消息队列服务器实体(这里可以理解为实体机)

Virtual Host:虚拟主机,一个虚拟机里可以存在多个交换机,和消息队列,他可以共享服务器的加密信息和鉴权认证等,也就是说鉴权和加密是以虚拟机为单位的,不能单独对队列进行这些操作。

Connection:链接:一个网络连接

Channel:通道或信道多路复用连接中的一条独立的双向数据流通道,是建立在真实的TCP连接内地虚拟连接,可以安排每隔进程分配一个信道,也可以安排每隔线程分配一个信道。

Binding:绑定,即exchangequeue之间的绑定关系

Consumer:消费者,这里是指对队列内容进行消费(取出消息,并进行相应的处理)的应用程序,可以是一个进程,也可以是一个线程。

Publisher:生成者,这里是指产生消息的应用程序,可以是进程,也可以是线程

Message:消息,这里是指生产者产生并存储于队列的数据。

六、RabbitMQPHP中的应用

AMQP扩展安装

扩展下载地址:

注意事项:

windows系统请点击 DLL

linux下载后需要编译安装

Windows下下载dll之前请检查php的相应版本,并下载对应的版本,否则会出现无法使用的问题

Window下需要在php.ini下加入:

需要先将php_amqp.dll复制到php/ext目录下

[amqp]

extension=php_amqp.dll

然后在httpd.conf下加入:

需要先将rabbitmq.4.dll复制到配置目录下

# rabbitmq

LoadFile  "d:/xampp/php/rabbitmq.4.dll"

这两个dll文件存在于下载的扩展中

一个关于RabbitMQphp封装,此封装经过本人测试是可以使用的,但是有关信道和交换机这块因为本人仅仅是为了设置,所以在分配逻辑上做太多考虑,如果需要使用该方法可以在这方面做出相应的修改。

<?php

 

/**

 * RabbitMQ消息队列封装

 */

 

namespace common\lib;

 

/**

 * RabbitMQ消息队列生产者

 */

class RabbitMQ {

 

    const HOST = '127.0.0.1';

    const PORT = '5672';

    const USER = 'zhangxugang810';

    const PASSWORD = '132133';

    const VHOST = 'huaweiQueue';

 

    private $isTransaction = true;

    private $exchangeName = 'ex_huawei';

    private $queueName = 'queue_huawei';

    private $routeKey = 'route_huawei';

    private $cn;

    private $ch;

    private $ex;

    private $queue;

 

    public function __construct() {

        $this->connect();

        

    }

 

    /**

     * 组装参数

     */

    private function connectParams() {

        return ['host' => static::HOST, 'port' => static::PORT, 'login' => static::USER, 'password' => static::PASSWORD, 'vhost' => static::VHOST];

    }

 

    /**

     * 连接队列服务器并创建channel

     */

    private function connect() {

        $params = $this->connectParams();

        $this->cn = new \AMQPConnection($params);

        if (!$this->cn->connect()) {

            die("不能连接这个BROCKER\n");

        }

        $this->channel();

    }

    

    private function channel(){

        $this->ch = new \AMQPChannel($this->cn);

    }

 

    /**

     * 创建交换机

     */

    private function exchange() {

        $this->ex = new \AMQPExchange($this->ch);

        $this->ex->setName($this->exchangeName);

        date_default_timezone_set("Asia/Shanghai");

    }

    

    /**

     * 创建队列并绑定交换机

     */

    private function queue() {

        //使用哪个信道

        $this->queue = new \AMQPQueue($this->ch);

        $this->queue->setName($this->queueName);

        $this->queue->setFlags(AMQP_DURABLE);

//        echo "Message Total:".$this->queue->declare()."\n";   

//        $this->queue->bind($this->exchangeName, $this->routeKey);

    }

    

    /**

     * 生产者方法 - 单条

     * @param type $msg

     * @return type

     */

    public function sendOne($msg){

        $this->exchange();

        return $this->ex->publish($msg, $this->routeKey);

    }

    

    /**

     * 生产者方法 - 多条

     * @param type $msgs

     * @return type

     */

    public function sendMultiple($msgs){

        $this->exchange();

        if ($this->isTransaction) {//如果启用事务

            $this->ch->startTransaction(); //开始事务

        }

        $result = [];

        foreach($msgs as $k => $msg){

            $result[$k] = $this->ex->publish($msg, $this->routeKey);

        }

        if ($this->isTransaction) {//如果启用事务

            $this->ch->commitTransaction(); //开始事务

        }

        return $result;

    }

    

    /**

     * 消费者方法

     * 这里需要根据相应的业务逻辑进行修改

     */

    public function run() {

        $this->queue();

        echo "Message:\n";

        while (True) {

            $this->queue->consume(function($envelope, $queue) {

                //此处调用业务逻辑处理方法

                static::processMessage($envelope, $queue);

            });

            //$q->consume('processMessage', AMQP_AUTOACK); //自动ACK应答  

        }

        $this->disconnect();

    }

    

    /**

     * 消费者业务逻辑

     * 此处只是示例方法

     * @param type $envelope

     * @param type $queue

     */

    public static function processMessage($envelope, $queue) {

        //此处是业务逻辑处理

        $msg = $envelope->getBody();

        echo $msg . "\n"; //处理消息

        $queue->ack($envelope->getDeliveryTag()); //手动发送ACK应答

    }

 

}

 

七、RabbitMQ单点问题与高可用

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wpfdpx.html