Swoole和Redis实现的并发队列处理系统

由于PHP不支持多线程,但是作为一个完善的系统,有很多操作都是需要异步完成的。为了完成这些异步操作,我们做了一个基于Redis队列任务系统。

大家知道,一个消息队列处理系统主要分为两大部分:消费者和生产者。

在我们的系统中,主系统作为生产者,任务系统作为消费者。

 

具体的工作流程如下:

1、主系统将需要需要处理的任务名称+任务参数push到队列中。

2、任务系统实时的对任务队列进行pop,pop出来一个任务就fork一个子进程,由子进程完成具体的任务逻辑。

 

具体代码如下:

1 /** 2 * 启动守护进程 3 */ 4 public function runAction() { 5 Tools::log_message('ERROR', 'daemon/run' . ' | action: restart', 'daemon-'); 6 while (true) { 7 $this->fork_process(); 8 } 9 exit; 10 } 11 12 /** 13 * 创建子进程 14 */ 15 private function fork_process() { 16 $ppid = getmypid(); 17 $pid = pcntl_fork(); 18 if ($pid == 0) {//子进程 19 $pid = posix_getpid(); 20 //echo "* Process {$pid} was created \n\n"; 21 $this->mq_process(); 22 exit; 23 } else {//主进程 24 $pid = pcntl_wait($status, WUNTRACED); //取得子进程结束状态 25 if (pcntl_wifexited($status)) { 26 //echo "\n\n* Sub process: {$pid} exited with {$status}"; 27 //Tools::log_message('INFO', 'daemon/run succ' . '|status:' . $status . '|pid:' . $ppid . '|childpid:' . $pid ); 28 } else { 29 Tools::log_message('ERROR', 'daemon/run fail' . '|status:' . $status . '|pid:' . $ppid . '|childpid:' . $pid, 'daemon-'); 30 } 31 } 32 } 33 34 /** 35 * 业务任务队列处理 36 */ 37 private function mq_process() { 38 $data_pop = $this->masterRedis->rPop($this->redis_list_key); 39 $data = json_decode($data_pop, 1); 40 if (!$data) { 41 return FALSE; 42 } 43 $worker = '_task_' . $data['worker']; 44 $class_name = isset($data['class']) ? $data['class'] : 'TaskproModel'; 45 $params = $data['params']; 46 $class = new $class_name(); 47 $class->$worker($params); 48 return TRUE; 49 }

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/zysgpz.html