PHP高级编程之消息队列原理与实现方法详解(2)

守护进程需要使用root用户运行,运行后会切换到普通用户,同时创建进程ID文件,以便进程停止的时候使用。

守护进程核心代码https://github.com/netkiller/SOA/blob/master/system/rabbitdaemon.class.php

6.2. 消息队列协议

消息协议是一个数组,将数组序列化或者转为JSON推送到消息队列服务器,这里使用json格式的协议。

$msg = array(
    'Namespace'=>'namespace',
    "Class"=>"Email",
    "Method"=>"smtp",
    "Param" => array(
        $mail, $subject, $message, null
    )
);

序列化后的协议

{"Namespace":"single","Class":"Email","Method":"smtp","Param":["netkiller@msn.com","Hello"," TestHelloWorld",null]}

使用json格式是考虑到通用性,这样推送端可以使用任何语言。如果不考虑兼容,建议使用二进制序列化,例如msgpack效率更好。

6.3. 消息队列处理

消息队列处理核心代码

https://github.com/netkiller/SOA/blob/master/system/rabbitmq.class.php

所以消息的处理在下面一段代码中进行

$this->queue->consume(function($envelope, $queue) {
    $speed = microtime(true);
    $msg = $envelope->getBody();
    $result = $this->loader($msg);
    $queue->ack($envelope->getDeliveryTag()); //手动发送ACK应答
    //$this->logging->info(''.$msg.' '.$result)
    $this->logging->debug('Protocol: '.$msg.' ');
    $this->logging->debug('Result: '. $result.' ');
    $this->logging->debug('Time: '. (microtime(true) - $speed) .'');
});

public function loader($msg = null) 负责拆解协议,然后载入对应的类文件,传递参数,运行方法,反馈结果。

Time 可以输出程序运行所花费的时间,对于后期优化十分有用。

提示

loader() 可以进一步优化,使用多线程每次调用loader将任务提交到线程池中,这样便可以多线程处理消息队列。

6.4. 测试

测试代码 https://github.com/netkiller/SOA/blob/master/test/queue/email.php

<?php
$queueName = 'example';
$exchangeName = 'email';
$routeKey = 'email';
$mail = $argv[1];
$subject = $argv[2];
$message = empty($argv[3]) ? 'Hello World!' : ' '.$argv[3];
$connection = new AMQPConnection(array(
    'host' => '192.168.4.1',
    'port' => '5672',
    'vhost' => '/',
    'login' => 'guest',
    'password' => 'guest'
    ));
$connection->connect() or die("Cannot connect to the broker!\n");
$channel = new AMQPChannel($connection);
$exchange = new AMQPExchange($channel);
$exchange->setName($exchangeName);
$queue = new AMQPQueue($channel);
$queue->setName($queueName);
$queue->setFlags(AMQP_DURABLE);
$queue->declareQueue();
$msg = array(
    'Namespace'=>'namespace',
    "Class"=>"Email",
    "Method"=>"smtp",
    "Param" => array(
        $mail, $subject, $message, null
    )
);
$exchange->publish(json_encode($msg), $routeKey);
printf("[x] Sent %s \r\n", json_encode($msg));
$connection->disconnect();


      

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:http://www.heiqu.com/3758.html