最近在工作中遇到了一个问题,开启多个进程处理队列会重复读取 Redis 中队列吗?是否因此导致重复执行任务?下面就来通过示例代码详细介绍下。
使用 Supervisor 监听 Laravel 队列任务,其中 Supervisor 的配置如下:
| 1 2 3 4 5 6 7 8 | [program:laravel-worker]process_name=%(program_name)s_%(process_num)02dcommand=php /var/www/xxx.cn/artisan queue:work --queue=sendfile --tries=3 --daemonautostart=trueautorestart=truenumprocs=8redirect_stderr=truestdout_logfile=/var/www/xxx.cn/worker.log | 
注意: numprocs = 8,代表开启 8 个进程来执行 command 中的命令。
如下:
| 1 2 3 4 5 6 7 8 9 10 11 | PS C:\Users\tanteng\website\laradock> docker-compose execphp-worker sh/etc/supervisor/conf.d # ps -ef | grep php 7 root  0:00 php /var/www/xxx.cn/artisan queue:work --queue=sendfile --tries=3 --daemon 8 root  0:00 php /var/www/xxx.cn/artisan queue:work --queue=sendfile --tries=3 --daemon 9 root  0:00 php /var/www/xxx.cn/artisan queue:work --queue=sendfile --tries=3 --daemon 10 root  0:00 php /var/www/xxx.cn/artisan queue:work --queue=sendfile --tries=3 --daemon 11 root  0:00 php /var/www/xxx.cn/artisan queue:work --queue=sendfile --tries=3 --daemon 12 root  0:00 php /var/www/xxx.cn/artisan queue:work --queue=sendfile --tries=3 --daemon 13 root  0:00 php /var/www/xxx.cn/artisan queue:work --queue=sendfile --tries=3 --daemon 14 root  0:00 php /var/www/xxx.cn/artisan queue:work --queue=sendfile --tries=3 --daemon 44 root  0:00 grep php | 
Laravel 多进程读取队列内容是否会重复
在 Laravel 的某个控制器方法,一次放入多个任务队列:
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 | publicfunctionindex(Request $request){ $this->dispatch((newSendFile3())->onQueue('sendfile')); $this->dispatch((newSendFile3())->onQueue('sendfile')); $this->dispatch((newSendFile3())->onQueue('sendfile')); $this->dispatch((newSendFile3())->onQueue('sendfile')); $this->dispatch((newSendFile3())->onQueue('sendfile')); $this->dispatch((newSendFile3())->onQueue('sendfile')); $this->dispatch((newSendFile3())->onQueue('sendfile')); $this->dispatch((newSendFile3())->onQueue('sendfile')); $this->dispatch((newSendFile3())->onQueue('sendfile')); $this->dispatch((newSendFile3())->onQueue('sendfile')); $this->dispatch((newSendFile3())->onQueue('sendfile')); $this->dispatch((newSendFile3())->onQueue('sendfile')); $this->dispatch((newSendFile3())->onQueue('sendfile')); $this->dispatch((newSendFile3())->onQueue('sendfile')); $this->dispatch((newSendFile3())->onQueue('sendfile')); $this->dispatch((newSendFile3())->onQueue('sendfile')); $this->dispatch((newSendFile3())->onQueue('sendfile')); $this->dispatch((newSendFile3())->onQueue('sendfile'));} | 
在队列处理的方法打印日志,打印处理的队列的 ID:
app/Jobs/SendFile3.php
| 1 2 3 4 5 6 7 8 | publicfunctionhandle(){ info('invoke SendFile3'); dump('invoke handle'); $rawbody= $this->job->getRawBody(); $info= json_decode($rawbody, true); info('queue id:'. $info['id']);} | 
Laravel 使用 Redis 的 list 作为队列的数据结构,并会为每个队列分配一个 ID,数据结构如下:
| 1 2 3 4 5 6 7 8 9 | { "job": "Illuminate\\Queue\\CallQueuedHandler@call", "data": { "commandName": "App\\Jobs\\SendFile3", "command": "O:18:\"App\\Jobs\\SendFile3\":4:{s:6:\"\u0000*\u0000job\";N;s:10:\"connection\";N;s:5:\"queue\";s:8:\"sendfile\";s:5:\"delay\";N;}" }, "id": "hadBcy3IpNsnOofQQdHohsa451OkQs88", "attempts": 1} | 
请求这个控制器路由(或者命令行方式),就可以看到 Redis 中多了很多队列任务了,如图:
![图片[1]-关于 Laravel Redis 多个进程同时取队列问题详解-爱站](https://2zhan.com/wp-content/uploads/2023/02/2017122594148581.png)
这个时候开启 Supervisor 处理队列任务,并查看日志:
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 | [2017-12-23 19:01:01] local.INFO: invoke SendFile3[2017-12-23 19:01:01] local.INFO: invoke SendFile3[2017-12-23 19:01:01] local.INFO: invoke SendFile3[2017-12-23 19:01:01] local.INFO: invoke SendFile3[2017-12-23 19:01:01] local.INFO: invoke SendFile3[2017-12-23 19:01:01] local.INFO: invoke SendFile3[2017-12-23 19:01:01] local.INFO: invoke SendFile3[2017-12-23 19:01:01] local.INFO: invoke SendFile3[2017-12-23 19:01:01] local.INFO: queue id:JaClJzhDEvntzLCRIz6uRQkCVLbE8Y9C[2017-12-23 19:01:01] local.INFO: queue id:ukHv0Li4P2VgPa55qU6yEOJM27Mo5YwJ[2017-12-23 19:01:01] local.INFO: queue id:ObMpwDTmnaveBUkU7aan5abt3Agyt90l[2017-12-23 19:01:01] local.INFO: queue id:fo2qZn2ftSdQtdnKOciMK7iJb4qlhRGE[2017-12-23 19:01:01] local.INFO: queue id:uLjFMoOU7Wk7bOAd4zpHb3ccRMJHBtR6[2017-12-23 19:01:01] local.INFO: queue id:87ULqPBObFmGr16nl5wxFVOi71zGCeRM[2017-12-23 19:01:01] local.INFO: queue id:9UVl0muQLzBqlRI99rChGW2ElXwVEMIE[2017-12-23 19:01:01] local.INFO: queue id:a0vgyZuz9HtmH7DGHEpXqesFTcQU3QAF[2017-12-23 19:01:01] local.INFO: invoke SendFile3[2017-12-23 19:01:01] local.INFO: invoke SendFile3[2017-12-23 19:01:01] local.INFO: queue id:2cXuXxopPkgYiV4WO8gv9CJ6CwXeKtYL[2017-12-23 19:01:01] local.INFO: invoke SendFile3[2017-12-23 19:01:01] local.INFO: queue id:9acTAYa8cxpJX6Q3Gb1sULokotP8reqZ[2017-12-23 19:01:01] local.INFO: invoke SendFile3[2017-12-23 19:01:01] local.INFO: invoke SendFile3[2017-12-23 19:01:01] local.INFO: queue id:BPHQvBboChlv4gr2I0vyLVyw9bijtTYJ[2017-12-23 19:01:01] local.INFO: invoke SendFile3[2017-12-23 19:01:01] local.INFO: queue id:Fm6tNajdxYKtdQbDMYDmwWJFLnNikRyg[2017-12-23 19:01:01] local.INFO: invoke SendFile3[2017-12-23 19:01:01] local.INFO: queue id:nyAbcvSkBVPbaH3e2ItQkoLJlP1ficib[2017-12-23 19:01:01] local.INFO: queue id:WBHsSVZtP43569UoPXxfLLJcvYmPW7cP[2017-12-23 19:01:01] local.INFO: invoke SendFile3[2017-12-23 19:01:01] local.INFO: queue id:bliPnKcRSDApwVmKLNxEhaKelhm0RDEY[2017-12-23 19:01:01] local.INFO: invoke SendFile3[2017-12-23 19:01:01] local.INFO: invoke SendFile3[2017-12-23 19:01:01] local.INFO: queue id:eOAoQucEIwRz9uZ64xm6IDKgiqj9Xc3W[2017-12-23 19:01:01] local.INFO: invoke SendFile3[2017-12-23 19:01:01] local.INFO: queue id:lzise9EiqQqINrhALbmAI4qNg7qylpb2[2017-12-23 19:01:01] local.INFO: invoke SendFile3[2017-12-23 19:01:01] local.INFO: queue id:WXYKvcfOhS1pPnwOwUTsenoMv5l5EUXe[2017-12-23 19:01:01] local.INFO: invoke SendFile3[2017-12-23 19:01:01] local.INFO: queue id:XtH5JiwLgnrwWzI02Oyi70pihAOkuJUD[2017-12-23 19:01:01] local.INFO: invoke SendFile3[2017-12-23 19:01:01] local.INFO: invoke SendFile3[2017-12-23 19:01:01] local.INFO: queue id:9ehmE5HImlpNubpY0xWN8UVrOzxeMqws[2017-12-23 19:01:01] local.INFO: queue id:C1sK87cpZl47edLA0zhfo7PJ9MIEcoyx[2017-12-23 19:01:01] local.INFO: invoke SendFile3[2017-12-23 19:01:01] local.INFO: invoke SendFile3[2017-12-23 19:01:01] local.INFO: invoke SendFile3[2017-12-23 19:01:01] local.INFO: queue id:2kwl51oH4lyyRrljCReGUCkNiJRDl7oe[2017-12-23 19:01:01] local.INFO: queue id:ObRpoqrYTPYiyv2delMlOXu3sAPpWJlN[2017-12-23 19:01:01] local.INFO: invoke SendFile3[2017-12-23 19:01:01] local.INFO: queue id:6qgu6W3TapLjSrt688yv9HRXvDDLxntz[2017-12-23 19:01:01] local.INFO: queue id:wiTlERhwn7s9cQkfUF9lLlNADpXjKncI[2017-12-23 19:01:01] local.INFO: invoke SendFile3[2017-12-23 19:01:01] local.INFO: queue id:ZSLW0VLFBDpL4wjTJzu3Yb3V45pNe807[2017-12-23 19:01:01] local.INFO: invoke SendFile3[2017-12-23 19:01:01] local.INFO: queue id:qhZlXLGfGWRluIeNm7VbllmTJZYb2h5n[2017-12-23 19:01:01] local.INFO: invoke SendFile3[2017-12-23 19:01:01] local.INFO: queue id:LUx1IByD3L2psNl9BZwHhk2knXyRPzW6[2017-12-23 19:01:01] local.INFO: invoke SendFile3[2017-12-23 19:01:01] local.INFO: queue id:M2RESPjyo5hpAFxxL0EQbWwsUq4jpmWn[2017-12-23 19:01:01] local.INFO: invoke SendFile3[2017-12-23 19:01:01] local.INFO: invoke SendFile3[2017-12-23 19:01:01] local.INFO: queue id:hUsGaiIAOO6ZfGQc5kGHGpsv5RpoRPYO[2017-12-23 19:01:01] local.INFO: queue id:cEHJsOy6bLeZ4NbncPziaHqlarMeyyEF[2017-12-23 19:01:01] local.INFO: invoke SendFile3[2017-12-23 19:01:01] local.INFO: queue id:w4bkFiJKMU5saqG2xKN3ZRL5BYXGATMk[2017-12-23 19:01:01] local.INFO: invoke SendFile3[2017-12-23 19:01:01] local.INFO: queue id:0zBuwbxlrEhhxKfYBkVyTY4z35f154sI[2017-12-23 19:01:01] local.INFO: queue id:mvoZvyDPvq4tcPjEy9G7PMtH3MwPkPik[2017-12-23 19:01:01] local.INFO: invoke SendFile3[2017-12-23 19:01:01] local.INFO: queue id:TLvF74eeidECWKtjZqWvW03UJTRPTL9r[2017-12-23 19:01:01] local.INFO: queue id:me8wyPfgcz0nf9xvcXz0hf2xVxqa1FFS | 
这 8 个进程并发处理队列,但从打印的日志看,没有出现同样的 ID. 我们再看一下 Laravel 如何使用 Redis 处理队列的。
分析一下 Laravel 队列的处理
Laravel 中入队列方法
| 1 2 3 4 5 6 | publicfunctionpushRaw($payload, $queue= null, array$options= []){ $this->getConnection()->rpush($this->getQueue($queue), $payload);  returnArr::get(json_decode($payload, true), 'id');} | 
用的是 Redis 的 rpush 命令。
Laravel 中取队列方法
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 | publicfunctionpop($queue= null){ $original= $queue?: $this->default;  $queue= $this->getQueue($queue);  $this->migrateExpiredJobs($queue.':delayed', $queue);  if(! is_null($this->expire)) {  $this->migrateExpiredJobs($queue.':reserved', $queue); }  list($job, $reserved) = $this->getConnection()->eval(  LuaScripts::pop(), 2, $queue, $queue.':reserved', $this->getTime() + $this->expire );  if($reserved) {  returnnewRedisJob($this->container, $this, $job, $reserved, $original); }} | 
这里用的是 lua 脚本取队列,如下:
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 | publicstaticfunctionpop(){ return<<<'LUA'local job = redis.call('lpop', KEYS[1])local reserved = falseif(job ~= false) thenreserved = cjson.decode(job)reserved['attempts'] = reserved['attempts'] + 1reserved = cjson.encode(reserved)redis.call('zadd', KEYS[2], ARGV[1], reserved)endreturn{job, reserved}LUA;} | 
那么结论是:从 Laravel 的处理方式和打印的日志结果看,即使多个进程读取同一个队列,也不会读取到一样的数据。
© 版权声明
文章版权归作者所有,未经允许请勿转载。
THE END
    


















 
        
暂无评论内容