最近有朋友有朋友问laravel队列的实现原理和经验,刚好用过所以整理了一下分享给大家。
laravel队列配置参见:
原理分析
创建分发任务方法
class TestController extends Controller { //其他方法 //发送消息 public function SendMessage(Request $request){ ... $this->dispatch((new SendMessage($sendParams))->onQueue('snail:SendMessage')); } }
内部实现:
创建消费任务
命令行运行如下命令:
/home/niuyufu/php/bin/php /home/niuyufu/webroot/assistant_api/artisan queue:work --queue=snail:SendMessage --tries=3 --memory=512 --daemon
内部实现:
队列消息分析:
监控redis对应队列消息,具体产生的消息操作,如下:
tail -f | redis-cli -h 10.94.120.13 -p 6380 monitor | grep "queues:snail"
1492446053.406282 [0 10.95.117.155:57132] "WATCH" "queues:snail:SendMessage:delayed"
1492446053.406452 [0 10.95.117.155:57132] "ZRANGEBYSCORE" "queues:snail:SendMessage:delayed" "-inf" "1492446053"
1492446053.406754 [0 10.95.117.155:57132] "WATCH" "queues:snail:SendMessage:reserved"
1492446053.406842 [0 10.95.117.155:57132] "ZRANGEBYSCORE" "queues:snail:SendMessage:reserved" "-inf" "1492446053"
1492446053.407029 [0 10.95.117.155:57132] "LPOP" "queues:snail:SendMessage"
1492446053.407700 [0 10.95.117.155:57132] "ZADD" "queues:snail:SendMessage:reserved" "1492446113" "{job}"
1492446053.463953 [0 10.95.117.155:57132] "ZREM" "queues:snail:SendMessage:reserved" "{job}"
PS:如果你的redis是codis的话,注意了,因为codis禁用方法列表
KEYS, MOVE, OBJECT, RENAME, RENAMENX, SORT, SCAN, BITOP,MSETNX, BLPOP, BRPOP, BRPOPLPUSH, PSUBSCRIBE,PUBLISH, PUNSUBSCRIBE, SUBSCRIBE, UNSUBSCRIBE, DISCARD, EXEC, MULTI, UNWATCH, WATCH, SCRIPT EXISTS, SCRIPT FLUSH, SCRIPT KILL, SCRIPT LOAD, AUTH, ECHO, SELECT, BGREWRITEAOF, BGSAVE, CLIENT KILL, CLIENT LIST, CONFIG GET, CONFIG SET, CONFIG RESETSTAT, DBSIZE, DEBUG OBJECT, DEBUG SEGFAULT, FLUSHALL, FLUSHDB, INFO, LASTSAVE, MONITOR, SAVE, SHUTDOWN, SLAVEOF, SLOWLOG, SYNC, TIME
所以执行消费任务会有以下错误:
[Predis\Connection\ConnectionException]
Error while reading line from the server. [tcp://100.90.154.39:3000]
解决方法为修改config/queue.php
'redis' => [ 'driver' => 'redis', 'connection' => 'default', 'queue' => 'default', 'expire' => null, //禁用即可 ],
优化日志处理:
如果你的系统有切割文件日志操作。会发现虽然日志被切分了,但程序却没有往新文件里写入。如,2017-04-18 13:50启动的项目,日志会一直打到 snail.log.2017041813上。
改进方案:
app/Jobs/Job.php文件中添加如下方法:
public function releaseLoggerFile(){ $handles=\Log::getMonolog()->getHandlers(); if(!is_array($handles) || empty($handles)){ return; } foreach($handles as $handle){ if(method_exists($handle, "close")){ $handle->close(); } } return; }
在具体job实现类中的handle方法结尾添加:
public function handle() { ... $this->releaseLoggerFile(); //释放log文件 }
线上部署
创建任务shell
#!/bin/sh day=$(date +'%y%m%d') now=$(date +"%F %T") php_command="/home/niuyufu/php/bin/php" command="/home/niuyufu/webroot/snail_api/artisan" log="/home/niuyufu/webroot/log/wave" queue_name_arr=("snail:SendMessage" "snail:SendMessageToApp") case $1 in "run") for queue_name in ${queue_name_arr[*]} do count=$(ps aux | grep artisan |grep queue=${queue_name} | grep -v grep | wc -l) if [ ${count} -lt 1 ];then echo "${now}:${queue_name} process has exit ${count}\n"; nohup $php_command $command queue:work --queue=${queue_name} --tries=3 --memory=512 --daemon >> $log/queueMonitor.log 2>&1 & else echo "${now}:${queue_name} process is runing"; fi done ;; "stop") kill -9 $(ps -ef | grep "queue:work" | grep -v grep | awk '{print $2}' | tr -s '\n' ' ') echo ${now}."Queue process all stop"; ;; "list") ps -ef | grep "queue:work" | grep -v grep ;; *) echo " Usage: QueueMonitorCommandShell.sh [run|stop|list] " ;; esac
总结:
laravel这边的延迟队列使用了三个队列。
queue:default:delayed // 存储延迟任务
queue:default // 存储"生"任务,就是未处理任务
queue:default:reserved // 存储待处理任务
任务在三个队列中进行轮转,最后一定进入到queue:default:reserved,并且成功后把任务从这个队列中删除。