最近在工作中遇到了一个问题,开启多个进程处理队列会重复读取 Redis 中队列吗?是否因此导致重复执行任务?下面就来通过示例代码详细介绍下。
使用 Supervisor 监听 Laravel 队列任务,其中 Supervisor 的配置如下:
1
2
3
4
5
6
7
8
|
[program:laravel-worker] process_name=%(program_name)s_%(process_num)02d command=php / var /www/xxx.cn/artisan queue:work --queue=sendfile --tries=3 --daemon autostart=true autorestart=true numprocs=8 redirect_stderr=true stdout_logfile=/ var /www/xxx.cn/worker.log |
注意: numprocs = 8
,代表开启 8 个进程来执行 command 中的命令。
如下:
1
2
3
4
5
6
7
8
9
10
11
|
PS C:\Users\tanteng\website\laradock> docker-compose exec php-worker sh /etc/supervisor/conf.d # ps -ef | grep php 7 root 0:00 php / var /www/xxx.cn/artisan queue:work --queue=sendfile --tries=3 --daemon 8 root 0:00 php / var /www/xxx.cn/artisan queue:work --queue=sendfile --tries=3 --daemon 9 root 0:00 php / var /www/xxx.cn/artisan queue:work --queue=sendfile --tries=3 --daemon 10 root 0:00 php / var /www/xxx.cn/artisan queue:work --queue=sendfile --tries=3 --daemon 11 root 0:00 php / var /www/xxx.cn/artisan queue:work --queue=sendfile --tries=3 --daemon 12 root 0:00 php / var /www/xxx.cn/artisan queue:work --queue=sendfile --tries=3 --daemon 13 root 0:00 php / var /www/xxx.cn/artisan queue:work --queue=sendfile --tries=3 --daemon 14 root 0:00 php / var /www/xxx.cn/artisan queue:work --queue=sendfile --tries=3 --daemon 44 root 0:00 grep php |
Laravel 多进程读取队列内容是否会重复
在 Laravel 的某个控制器方法,一次放入多个任务队列:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
public function index(Request $request ) { $this ->dispatch(( new SendFile3())->onQueue( 'sendfile' )); $this ->dispatch(( new SendFile3())->onQueue( 'sendfile' )); $this ->dispatch(( new SendFile3())->onQueue( 'sendfile' )); $this ->dispatch(( new SendFile3())->onQueue( 'sendfile' )); $this ->dispatch(( new SendFile3())->onQueue( 'sendfile' )); $this ->dispatch(( new SendFile3())->onQueue( 'sendfile' )); $this ->dispatch(( new SendFile3())->onQueue( 'sendfile' )); $this ->dispatch(( new SendFile3())->onQueue( 'sendfile' )); $this ->dispatch(( new SendFile3())->onQueue( 'sendfile' )); $this ->dispatch(( new SendFile3())->onQueue( 'sendfile' )); $this ->dispatch(( new SendFile3())->onQueue( 'sendfile' )); $this ->dispatch(( new SendFile3())->onQueue( 'sendfile' )); $this ->dispatch(( new SendFile3())->onQueue( 'sendfile' )); $this ->dispatch(( new SendFile3())->onQueue( 'sendfile' )); $this ->dispatch(( new SendFile3())->onQueue( 'sendfile' )); $this ->dispatch(( new SendFile3())->onQueue( 'sendfile' )); $this ->dispatch(( new SendFile3())->onQueue( 'sendfile' )); $this ->dispatch(( new SendFile3())->onQueue( 'sendfile' )); } |
在队列处理的方法打印日志,打印处理的队列的 ID:
app/Jobs/SendFile3.php
1
2
3
4
5
6
7
8
|
public function handle() { info( 'invoke SendFile3' ); dump( 'invoke handle' ); $rawbody = $this ->job->getRawBody(); $info = json_decode( $rawbody , true); info( 'queue id:' . $info [ 'id' ]); } |
Laravel 使用 Redis 的 list 作为队列的数据结构,并会为每个队列分配一个 ID,数据结构如下:
1
2
3
4
5
6
7
8
9
|
{ "job" : "Illuminate\\Queue\\CallQueuedHandler@call" , "data" : { "commandName" : "App\\Jobs\\SendFile3" , "command" : "O:18:\"App\\Jobs\\SendFile3\":4:{s:6:\"\u0000*\u0000job\";N;s:10:\"connection\";N;s:5:\"queue\";s:8:\"sendfile\";s:5:\"delay\";N;}" }, "id" : "hadBcy3IpNsnOofQQdHohsa451OkQs88" , "attempts" : 1 } |
请求这个控制器路由(或者命令行方式),就可以看到 Redis 中多了很多队列任务了,如图:
这个时候开启 Supervisor 处理队列任务,并查看日志:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
|
[2017-12-23 19:01:01] local.INFO: invoke SendFile3 [2017-12-23 19:01:01] local.INFO: invoke SendFile3 [2017-12-23 19:01:01] local.INFO: invoke SendFile3 [2017-12-23 19:01:01] local.INFO: invoke SendFile3 [2017-12-23 19:01:01] local.INFO: invoke SendFile3 [2017-12-23 19:01:01] local.INFO: invoke SendFile3 [2017-12-23 19:01:01] local.INFO: invoke SendFile3 [2017-12-23 19:01:01] local.INFO: invoke SendFile3 [2017-12-23 19:01:01] local.INFO: queue id:JaClJzhDEvntzLCRIz6uRQkCVLbE8Y9C [2017-12-23 19:01:01] local.INFO: queue id:ukHv0Li4P2VgPa55qU6yEOJM27Mo5YwJ [2017-12-23 19:01:01] local.INFO: queue id:ObMpwDTmnaveBUkU7aan5abt3Agyt90l [2017-12-23 19:01:01] local.INFO: queue id:fo2qZn2ftSdQtdnKOciMK7iJb4qlhRGE [2017-12-23 19:01:01] local.INFO: queue id:uLjFMoOU7Wk7bOAd4zpHb3ccRMJHBtR6 [2017-12-23 19:01:01] local.INFO: queue id:87ULqPBObFmGr16nl5wxFVOi71zGCeRM [2017-12-23 19:01:01] local.INFO: queue id:9UVl0muQLzBqlRI99rChGW2ElXwVEMIE [2017-12-23 19:01:01] local.INFO: queue id:a0vgyZuz9HtmH7DGHEpXqesFTcQU3QAF [2017-12-23 19:01:01] local.INFO: invoke SendFile3 [2017-12-23 19:01:01] local.INFO: invoke SendFile3 [2017-12-23 19:01:01] local.INFO: queue id:2cXuXxopPkgYiV4WO8gv9CJ6CwXeKtYL [2017-12-23 19:01:01] local.INFO: invoke SendFile3 [2017-12-23 19:01:01] local.INFO: queue id:9acTAYa8cxpJX6Q3Gb1sULokotP8reqZ [2017-12-23 19:01:01] local.INFO: invoke SendFile3 [2017-12-23 19:01:01] local.INFO: invoke SendFile3 [2017-12-23 19:01:01] local.INFO: queue id:BPHQvBboChlv4gr2I0vyLVyw9bijtTYJ [2017-12-23 19:01:01] local.INFO: invoke SendFile3 [2017-12-23 19:01:01] local.INFO: queue id:Fm6tNajdxYKtdQbDMYDmwWJFLnNikRyg [2017-12-23 19:01:01] local.INFO: invoke SendFile3 [2017-12-23 19:01:01] local.INFO: queue id:nyAbcvSkBVPbaH3e2ItQkoLJlP1ficib [2017-12-23 19:01:01] local.INFO: queue id:WBHsSVZtP43569UoPXxfLLJcvYmPW7cP [2017-12-23 19:01:01] local.INFO: invoke SendFile3 [2017-12-23 19:01:01] local.INFO: queue id:bliPnKcRSDApwVmKLNxEhaKelhm0RDEY [2017-12-23 19:01:01] local.INFO: invoke SendFile3 [2017-12-23 19:01:01] local.INFO: invoke SendFile3 [2017-12-23 19:01:01] local.INFO: queue id:eOAoQucEIwRz9uZ64xm6IDKgiqj9Xc3W [2017-12-23 19:01:01] local.INFO: invoke SendFile3 [2017-12-23 19:01:01] local.INFO: queue id:lzise9EiqQqINrhALbmAI4qNg7qylpb2 [2017-12-23 19:01:01] local.INFO: invoke SendFile3 [2017-12-23 19:01:01] local.INFO: queue id:WXYKvcfOhS1pPnwOwUTsenoMv5l5EUXe [2017-12-23 19:01:01] local.INFO: invoke SendFile3 [2017-12-23 19:01:01] local.INFO: queue id:XtH5JiwLgnrwWzI02Oyi70pihAOkuJUD [2017-12-23 19:01:01] local.INFO: invoke SendFile3 [2017-12-23 19:01:01] local.INFO: invoke SendFile3 [2017-12-23 19:01:01] local.INFO: queue id:9ehmE5HImlpNubpY0xWN8UVrOzxeMqws [2017-12-23 19:01:01] local.INFO: queue id:C1sK87cpZl47edLA0zhfo7PJ9MIEcoyx [2017-12-23 19:01:01] local.INFO: invoke SendFile3 [2017-12-23 19:01:01] local.INFO: invoke SendFile3 [2017-12-23 19:01:01] local.INFO: invoke SendFile3 [2017-12-23 19:01:01] local.INFO: queue id:2kwl51oH4lyyRrljCReGUCkNiJRDl7oe [2017-12-23 19:01:01] local.INFO: queue id:ObRpoqrYTPYiyv2delMlOXu3sAPpWJlN [2017-12-23 19:01:01] local.INFO: invoke SendFile3 [2017-12-23 19:01:01] local.INFO: queue id:6qgu6W3TapLjSrt688yv9HRXvDDLxntz [2017-12-23 19:01:01] local.INFO: queue id:wiTlERhwn7s9cQkfUF9lLlNADpXjKncI [2017-12-23 19:01:01] local.INFO: invoke SendFile3 [2017-12-23 19:01:01] local.INFO: queue id:ZSLW0VLFBDpL4wjTJzu3Yb3V45pNe807 [2017-12-23 19:01:01] local.INFO: invoke SendFile3 [2017-12-23 19:01:01] local.INFO: queue id:qhZlXLGfGWRluIeNm7VbllmTJZYb2h5n [2017-12-23 19:01:01] local.INFO: invoke SendFile3 [2017-12-23 19:01:01] local.INFO: queue id:LUx1IByD3L2psNl9BZwHhk2knXyRPzW6 [2017-12-23 19:01:01] local.INFO: invoke SendFile3 [2017-12-23 19:01:01] local.INFO: queue id:M2RESPjyo5hpAFxxL0EQbWwsUq4jpmWn [2017-12-23 19:01:01] local.INFO: invoke SendFile3 [2017-12-23 19:01:01] local.INFO: invoke SendFile3 [2017-12-23 19:01:01] local.INFO: queue id:hUsGaiIAOO6ZfGQc5kGHGpsv5RpoRPYO [2017-12-23 19:01:01] local.INFO: queue id:cEHJsOy6bLeZ4NbncPziaHqlarMeyyEF [2017-12-23 19:01:01] local.INFO: invoke SendFile3 [2017-12-23 19:01:01] local.INFO: queue id:w4bkFiJKMU5saqG2xKN3ZRL5BYXGATMk [2017-12-23 19:01:01] local.INFO: invoke SendFile3 [2017-12-23 19:01:01] local.INFO: queue id:0zBuwbxlrEhhxKfYBkVyTY4z35f154sI [2017-12-23 19:01:01] local.INFO: queue id:mvoZvyDPvq4tcPjEy9G7PMtH3MwPkPik [2017-12-23 19:01:01] local.INFO: invoke SendFile3 [2017-12-23 19:01:01] local.INFO: queue id:TLvF74eeidECWKtjZqWvW03UJTRPTL9r [2017-12-23 19:01:01] local.INFO: queue id:me8wyPfgcz0nf9xvcXz0hf2xVxqa1FFS |
这 8 个进程并发处理队列,但从打印的日志看,没有出现同样的 ID. 我们再看一下 Laravel 如何使用 Redis 处理队列的。
分析一下 Laravel 队列的处理
Laravel 中入队列方法
1
2
3
4
5
6
|
public function pushRaw( $payload , $queue = null, array $options = []) { $this ->getConnection()->rpush( $this ->getQueue( $queue ), $payload ); return Arr::get(json_decode( $payload , true), 'id' ); } |
用的是 Redis 的 rpush 命令。
Laravel 中取队列方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
public function pop( $queue = null) { $original = $queue ?: $this -> default ; $queue = $this ->getQueue( $queue ); $this ->migrateExpiredJobs( $queue . ':delayed' , $queue ); if (! is_null ( $this ->expire)) { $this ->migrateExpiredJobs( $queue . ':reserved' , $queue ); } list( $job , $reserved ) = $this ->getConnection()-> eval ( LuaScripts::pop(), 2, $queue , $queue . ':reserved' , $this ->getTime() + $this ->expire ); if ( $reserved ) { return new RedisJob( $this ->container, $this , $job , $reserved , $original ); } } |
这里用的是 lua 脚本取队列,如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
public static function pop() { return <<< 'LUA' local job = redis.call( 'lpop' , KEYS[1]) local reserved = false if (job ~= false) then reserved = cjson.decode(job) reserved[ 'attempts' ] = reserved[ 'attempts' ] + 1 reserved = cjson.encode(reserved) redis.call( 'zadd' , KEYS[2], ARGV[1], reserved) end return {job, reserved} LUA; } |
那么结论是:从 Laravel 的处理方式和打印的日志结果看,即使多个进程读取同一个队列,也不会读取到一样的数据。
© 版权声明
文章版权归作者所有,未经允许请勿转载。
THE END
暂无评论内容