AMQP:Advanced Message Queue Protocol(高级消息队列协议),是一个提供统一消息服务的应用层标准高级消息队列协议。
Exchange:交换机,用来接收生成者消息,并路由给与交换机绑定的队列。
Queue:消息队列,用来存储和消息,并对消息进行顺序排列的队列,是消息的容器,也是消息的终点,消费者将消息存储到消息队列后即可返回。消费者,从这里取用消息。
Broker:消息队列服务器实体(这里可以理解为实体机)
Virtual Host:虚拟主机,一个虚拟机里可以存在多个交换机,和消息队列,他可以共享服务器的加密信息和鉴权认证等,也就是说鉴权和加密是以虚拟机为单位的,不能单独对队列进行这些操作。
Connection:链接:一个网络连接
Channel:通道或信道多路复用连接中的一条独立的双向数据流通道,是建立在真实的TCP连接内地虚拟连接,可以安排每隔进程分配一个信道,也可以安排每隔线程分配一个信道。
Binding:绑定,即exchange与queue之间的绑定关系
Consumer:消费者,这里是指对队列内容进行消费(取出消息,并进行相应的处理)的应用程序,可以是一个进程,也可以是一个线程。
Publisher:生成者,这里是指产生消息的应用程序,可以是进程,也可以是线程
Message:消息,这里是指生产者产生并存储于队列的数据。
六、RabbitMQ在PHP中的应用
AMQP扩展安装
扩展下载地址:
注意事项:
windows系统请点击 DLL
linux下载后需要编译安装
Windows下下载dll之前请检查php的相应版本,并下载对应的版本,否则会出现无法使用的问题
Window下需要在php.ini下加入:
需要先将php_amqp.dll复制到php/ext目录下
[amqp]
extension=php_amqp.dll
然后在httpd.conf下加入:
需要先将rabbitmq.4.dll复制到配置目录下
# rabbitmq
LoadFile "d:/xampp/php/rabbitmq.4.dll"
这两个dll文件存在于下载的扩展中
一个关于RabbitMQ的php封装,此封装经过本人测试是可以使用的,但是有关信道和交换机这块因为本人仅仅是为了设置,所以在分配逻辑上做太多考虑,如果需要使用该方法可以在这方面做出相应的修改。
<?php
/**
* RabbitMQ消息队列封装
*/
namespace common\lib;
/**
* RabbitMQ消息队列生产者
*/
class RabbitMQ {
const HOST = '127.0.0.1';
const PORT = '5672';
const USER = 'zhangxugang810';
const PASSWORD = '132133';
const VHOST = 'huaweiQueue';
private $isTransaction = true;
private $exchangeName = 'ex_huawei';
private $queueName = 'queue_huawei';
private $routeKey = 'route_huawei';
private $cn;
private $ch;
private $ex;
private $queue;
public function __construct() {
$this->connect();
}
/**
* 组装参数
*/
private function connectParams() {
return ['host' => static::HOST, 'port' => static::PORT, 'login' => static::USER, 'password' => static::PASSWORD, 'vhost' => static::VHOST];
}
/**
* 连接队列服务器并创建channel
*/
private function connect() {
$params = $this->connectParams();
$this->cn = new \AMQPConnection($params);
if (!$this->cn->connect()) {
die("不能连接这个BROCKER\n");
}
$this->channel();
}
private function channel(){
$this->ch = new \AMQPChannel($this->cn);
}
/**
* 创建交换机
*/
private function exchange() {
$this->ex = new \AMQPExchange($this->ch);
$this->ex->setName($this->exchangeName);
date_default_timezone_set("Asia/Shanghai");
}
/**
* 创建队列并绑定交换机
*/
private function queue() {
//使用哪个信道
$this->queue = new \AMQPQueue($this->ch);
$this->queue->setName($this->queueName);
$this->queue->setFlags(AMQP_DURABLE);
// echo "Message Total:".$this->queue->declare()."\n";
// $this->queue->bind($this->exchangeName, $this->routeKey);
}
/**
* 生产者方法 - 单条
* @param type $msg
* @return type
*/
public function sendOne($msg){
$this->exchange();
return $this->ex->publish($msg, $this->routeKey);
}
/**
* 生产者方法 - 多条
* @param type $msgs
* @return type
*/
public function sendMultiple($msgs){
$this->exchange();
if ($this->isTransaction) {//如果启用事务
$this->ch->startTransaction(); //开始事务
}
$result = [];
foreach($msgs as $k => $msg){
$result[$k] = $this->ex->publish($msg, $this->routeKey);
}
if ($this->isTransaction) {//如果启用事务
$this->ch->commitTransaction(); //开始事务
}
return $result;
}
/**
* 消费者方法
* 这里需要根据相应的业务逻辑进行修改
*/
public function run() {
$this->queue();
echo "Message:\n";
while (True) {
$this->queue->consume(function($envelope, $queue) {
//此处调用业务逻辑处理方法
static::processMessage($envelope, $queue);
});
//$q->consume('processMessage', AMQP_AUTOACK); //自动ACK应答
}
$this->disconnect();
}
/**
* 消费者业务逻辑
* 此处只是示例方法
* @param type $envelope
* @param type $queue
*/
public static function processMessage($envelope, $queue) {
//此处是业务逻辑处理
$msg = $envelope->getBody();
echo $msg . "\n"; //处理消息
$queue->ack($envelope->getDeliveryTag()); //手动发送ACK应答
}
}
七、RabbitMQ单点问题与高可用