本文简单介绍消息队列的使用,以及在使用过程中的注意事项。
项目环境:thinkphp6.0.3 + think-queue3.0
composer
安装 thinkphp6
与 think-queue
# 安装 thinkphp6
composer create-project topthink/think 6.0.*
# 安装 think-queue
composer require topthink/think-queue
queue
的配置think-queue
内置了 Redis,Database,Sync等驱动,
这里介绍使用Redis做消息队列存储,
结合supervisor进程管理使队列常驻进程。
配置文件位于 config/queue.php
return [
// 驱动类型,可选择 sync(默认):同步执行,database:数据库驱动,redis:Redis驱动//或其他自定义的完整的类名
'default' => 'redis', // Redis驱动
'connections' => [
...
'redis' => [
'type' => 'redis', // Redis 驱动
'queue' => 'default', // 默认的队列名称
'host' => '127.0.0.1', // redis 主机ip
'port' => 6379, // redis 端口
'password' => '123456', // redis 密码
'select' => 0, // 使用哪一个 db,默认为 db0
'timeout' => 0, // redis连接的超时时间
'persistent' => false, // 是否是长连接
],
...
]
];
单模块项目推荐使用 app\job 作为任务类的命名空间 多模块项目可用使用 app\module\job 作为任务类的命名空间 也可以放在任意可以自动加载到的地方
任务类不需继承任何类,如果这个类只有一个任务,那么就只需要提供一个fire方法就可以了,如果有多个小任务,就写多个方法,下面发布任务的时候会有区别
每个方法会传入两个参数 think\queue\Job $job(当前的任务对象) 和 $data(发布任务时自定义的数据)
<?php
namespace app\job;
use think\queue\Job;
class MyJob
{
public function fire(Job $job, $data){
print("<info>开始执行队列"."</info> \n");
//....这里执行具体的任务
if ($job->attempts() > 3) {
//通过这个方法可以检查这个任务已经重试了几次了
}
//如果任务执行成功后 记得删除任务,不然这个任务会重复执行,直到达到最大重试次数后失败后,执行failed方法
$job->delete();
// 也可以重新发布这个任务
$job->release($delay); //$delay为延迟时间
}
public function failed($data){
// ...任务达到最大重试次数后,失败了
print("<info>队列执行失败"."</info> \n");
}
}
think\facade\Queue::push($job, $data = '', $queue = null)
think\facade\Queue::later($delay, $job, $data = '', $queue = null)
# 两个方法,前者是立即执行,后者是在$delay秒后执行
app\job
的,比如上面的例子一,写Job1类名即可app\module\job
的,写model/Job1
app\lib\job\Job2
@+方法名
,app\lib\job\Job2@task1
、app\lib\job\Job2@task2
<?php
namespace app\controller;
use think\facade\Queue;
class MyQueue extends Base
{
public function index($job_name = '', $job_class = '', $job_data = [], $type = 'later', $time = 5) //jio_data 需要发送的数据
{
// 当前任务将由哪个类来负责处理。
$job_class = "app\job\MyJob@fire"; // 任务类 - 执行时调用该类的deal方法
$job_queue_name = 'Check'; // 队列名称
empty($job_data) && $job_data = [ 'ts' => time(), 'bizId' => uniqid() , 'data' => $_GET ];
switch ($type) {
case 'later':
$is_push = Queue::later($time, $job_class, $job_data, $job_queue_name); // 延迟发送任务 5秒
break;
case 'push':
$is_push = Queue::push($job_class, $job_data, $job_queue_name); // 立即发送任务
break;
}
// database 驱动时,返回值为 1|false ; redis 驱动时,返回值为 随机字符串|false
if ($is_push !== false) {
echo date('Y-m-d H:i:s') . " 新增任务" . "<br>";
} else {
echo '新增任务错误';
}
}
}
打开浏览器访问 http://域名/MyQueue/index
,就可以看到任务新增成功
打开终端切换到当前项目根目录下,执行下面的命令:
命令 | 描述 |
---|---|
php think queue:work | 监听队列 |
php think queue:listen | 监听队列 |
php think queue:restart | 重启队列 |
php think queue:subscribe | 暂无,可能是保留的 官方有什么其他想法但是还没实现 |
work和listen的区别详细点击(think-queue 解析上)查看,
两种,具体的可选参数可以输入命令加 --help 查看。
(注意:使用不同版本的thinkphp时,可选用的参数不同)
(如:thinkphp6中就取消掉了 --daemon
参数,队列默认为循环模式)
php think queue:work --queue Check
supervisor是用Python开发的一个client/server服务,是Linux/Unix系统下的一个进程管理工具。
可以很方便的监听、启动、停止、重启一个或多个进程。
1.配置好yum源后,可以直接安装
yum install supervisor
2.Debian/Ubuntu可通过apt安装
apt-get install supervisor
3.Python使用pip安装
pip install supervisor
本项目演示使用yum安装:
打开 supervisor
的主配置文件:/etc/supervisord.conf
,找到最后一行
[include]
;files = supervisord.d/*.ini
files = /etc/supervisord.d/*.conf
最后一行,个人习惯命名conf。
进入子进程配置文件路径:/etc/supervisord.d/
,新建 Check.conf
,相关配置说明:
# 项目名
[program:Check]
# 脚本目录
directory=/www/.../Check
# 脚本执行命令
command=php think queue:work --queue Check
# supervisor启动的时候是否随着同时启动,默认True
autostart=true
# 当程序exit的时候,这个program不会自动重启,默认unexpected,设置子进程挂掉后自动重启的情况,有三个选项,false,unexpected和true。如果为false的时候,无论什么情况下,都不会被重新启动,如果为unexpected,只有当进程的退出码不在下面的exitcodes里面定义的
autorestart=false
# 这个选项是子进程启动多少秒之后,此时状态如果是running,则我们认为启动成功了。默认值为1
startsecs=1
# 脚本运行的用户身份
user=www
# 日志输出
stderr_logfile=/tmp/Check_stderr.log
stdout_logfile=/tmp/Check_stdout.log
# 把stderr重定向到stdout,默认 false
redirect_stderr = true
# stdout日志文件大小,默认 50MB
stdout_logfile_maxbytes = 20M
# stdout日志文件备份数
stdout_logfile_backups = 20
示例:
#说明同上
[program:Check]
directory=/www/.../Check
command=php think queue:listen --queue Check --tries 3
autostart=true
autorestart=false
stderr_logfile=/tmp/Check_stderr.log
stdout_logfile=/tmp/Check_stdout.log
#user = www
对于单模块而言,不同的业务逻辑为了区分可能会存在多个队列名,这种情况将多个队列名用逗号拼接起来,内容如下:
[program:queue]
user=www
command=php /www/wwwroot/tpqueue/think queue:listen --queue Check,Check1,Check2 --tries 3
启动 supervisor
systemctl start supervisord
然后可以执行不同的命令,查看队列状态,及对队列进行操作:
supervisorctl restart <application name> // 重启指定应用
supervisorctl stop <application name> // 停止指定应用
supervisorctl start <application name> // 启动指定应用
supervisorctl restart all // 重启所有应用
supervisorctl stop all // 停止所有应用
supervisorctl start all // 启动所有应用
supervisorctl status // 查看所有进程的状态
supervisorctl update // 配置文件修改后使用该命令加载新的配置
supervisorctl reload // 重新启动配置中的所有程序
输入以下命令可进入 supervisorctl 的 shell 界面
supervisorctl -c /etc/supervisord.conf
若是centos7,加入开机启动:
systemctl start supervisord //启动supervisor并加载默认配置文件
systemctl enable supervisord //将supervisor加入开机启动项
执行命令来验证是否为开机启动
systemctl is-enabled supervisord
1、在消息对列表执行过程中,当消费者执行队列出现问题时(达到失败后最大尝试次数),将会自动化清除当前队列。
Thinkphp5(think-queue)消息队列结合supervisor进程管理实现队列常驻进程
think-queue 解析上
Supervisor使用详解
本文为冯奎原创文章,转载无需和我联系,但请注明来自冯奎博客fengkui.net
最新评论