thinkphp-queuer消息队列与superviso进程守护

本文简单介绍消息队列的使用,以及在使用过程中的注意事项。
项目环境:thinkphp6.0.3 + think-queue3.0

一、think-queue 安装与基本使用

1、使用 composer 安装 thinkphp6think-queue

# 安装 thinkphp6
composer create-project topthink/think 6.0.*
# 安装 think-queue
composer require topthink/think-queue

2、queue 的配置

think-queue 内置了 Redis,Database,Sync等驱动,
这里介绍使用Redis做消息队列存储,
结合supervisor进程管理使队列常驻进程。
配置文件位于 config/queue.php

return [
    // 驱动类型,可选择 sync(默认):同步执行,database:数据库驱动,redis:Redis驱动//或其他自定义的完整的类名
    'default'     => 'redis', // Redis驱动
    'connections' => [
        ...
        'redis'    => [
            'type'       => 'redis',      // Redis 驱动
            'queue'      => 'default',    // 默认的队列名称
            'host'       => '127.0.0.1',  // redis 主机ip
            'port'       => 6379,         // redis 端口
            'password'   => '123456',     // redis 密码
            'select'     => 0,            // 使用哪一个 db,默认为 db0
            'timeout'    => 0,            // redis连接的超时时间
            'persistent' => false,        // 是否是长连接
        ],
        ...
    ]
];

3、创建任务类

单模块项目推荐使用 app\job 作为任务类的命名空间 多模块项目可用使用 app\module\job 作为任务类的命名空间 也可以放在任意可以自动加载到的地方

任务类不需继承任何类,如果这个类只有一个任务,那么就只需要提供一个fire方法就可以了,如果有多个小任务,就写多个方法,下面发布任务的时候会有区别
每个方法会传入两个参数 think\queue\Job $job(当前的任务对象) 和 $data(发布任务时自定义的数据)

<?php
namespace app\job;
use think\queue\Job;

class MyJob
{
    public function fire(Job $job, $data){
        print("<info>开始执行队列"."</info> \n");
        //....这里执行具体的任务
        if ($job->attempts() > 3) {
            //通过这个方法可以检查这个任务已经重试了几次了
        }

        //如果任务执行成功后 记得删除任务,不然这个任务会重复执行,直到达到最大重试次数后失败后,执行failed方法
        $job->delete();
        // 也可以重新发布这个任务
        $job->release($delay); //$delay为延迟时间
    }

    public function failed($data){
        // ...任务达到最大重试次数后,失败了
        print("<info>队列执行失败"."</info> \n");
    }
}

4、发布任务(生产者)

think\facade\Queue::push($job, $data = '', $queue = null)
think\facade\Queue::later($delay, $job, $data = '', $queue = null)
# 两个方法,前者是立即执行,后者是在$delay秒后执行
  1. $job 是任务名
    单模块的,且命名空间是app\job的,比如上面的例子一,写Job1类名即可
    多模块的,且命名空间是app\module\job的,写model/Job1
    其他的需要些完整的类名,比如上面的例子二,需要写完整的类名app\lib\job\Job2
    如果一个任务类里有多个小任务的话,如上面的例子二,需要用@+方法名app\lib\job\Job2@task1app\lib\job\Job2@task2
  2. $data 是你要传到任务里的参数
  3. $queue 队列名,指定这个任务是在哪个队列上执行,同下面监控队列的时候指定的队列名,可不填
<?php
namespace app\controller;
use think\facade\Queue;

class MyQueue extends Base
{
    public function index($job_name = '', $job_class = '', $job_data = [], $type = 'later', $time = 5) //jio_data 需要发送的数据
    {
        // 当前任务将由哪个类来负责处理。
        $job_class = "app\job\MyJob@fire"; // 任务类 - 执行时调用该类的deal方法
        $job_queue_name = 'Check'; // 队列名称
        empty($job_data) && $job_data = [ 'ts' => time(), 'bizId' => uniqid() , 'data' => $_GET ];

        switch ($type) {
            case 'later':
                $is_push = Queue::later($time, $job_class, $job_data, $job_queue_name); // 延迟发送任务 5秒
                break;
            case 'push':
                $is_push = Queue::push($job_class, $job_data, $job_queue_name);   // 立即发送任务
                break;
        }
        // database 驱动时,返回值为 1|false  ;   redis 驱动时,返回值为 随机字符串|false
        if ($is_push !== false) {
            echo date('Y-m-d H:i:s') . " 新增任务" . "<br>";
        } else {
            echo '新增任务错误';
        }
    }
}

打开浏览器访问 http://域名/MyQueue/index,就可以看到任务新增成功
冯奎博客

5、处理任务(消费者)

打开终端切换到当前项目根目录下,执行下面的命令:

命令 描述
php think queue:work 监听队列
php think queue:listen 监听队列
php think queue:restart 重启队列
php think queue:subscribe 暂无,可能是保留的 官方有什么其他想法但是还没实现

work和listen的区别详细点击(think-queue 解析上)查看,
两种,具体的可选参数可以输入命令加 --help 查看。
(注意:使用不同版本的thinkphp时,可选用的参数不同)
(如:thinkphp6中就取消掉了 --daemon 参数,队列默认为循环模式)

php think queue:work --queue Check

冯奎博客

二、supervisor的安装和配置

supervisor是用Python开发的一个client/server服务,是Linux/Unix系统下的一个进程管理工具。
可以很方便的监听、启动、停止、重启一个或多个进程。

1、安装

1.配置好yum源后,可以直接安装

yum install supervisor

2.Debian/Ubuntu可通过apt安装

apt-get install supervisor

3.Python使用pip安装

pip install supervisor

2、supervisor的相关配置

本项目演示使用yum安装:
打开 supervisor 的主配置文件:/etc/supervisord.conf ,找到最后一行

[include]
;files = supervisord.d/*.ini
files = /etc/supervisord.d/*.conf

最后一行,个人习惯命名conf。
进入子进程配置文件路径:/etc/supervisord.d/,新建 Check.conf,相关配置说明:

# 项目名
[program:Check]
# 脚本目录
directory=/www/.../Check
# 脚本执行命令
command=php think queue:work --queue Check

# supervisor启动的时候是否随着同时启动,默认True
autostart=true
# 当程序exit的时候,这个program不会自动重启,默认unexpected,设置子进程挂掉后自动重启的情况,有三个选项,false,unexpected和true。如果为false的时候,无论什么情况下,都不会被重新启动,如果为unexpected,只有当进程的退出码不在下面的exitcodes里面定义的
autorestart=false
# 这个选项是子进程启动多少秒之后,此时状态如果是running,则我们认为启动成功了。默认值为1
startsecs=1

# 脚本运行的用户身份
user=www

# 日志输出 
stderr_logfile=/tmp/Check_stderr.log 
stdout_logfile=/tmp/Check_stdout.log 
# 把stderr重定向到stdout,默认 false
redirect_stderr = true
# stdout日志文件大小,默认 50MB
stdout_logfile_maxbytes = 20M
# stdout日志文件备份数
stdout_logfile_backups = 20

示例:

#说明同上
[program:Check]
directory=/www/.../Check
command=php think queue:work --queue Check --tries 10
autostart=true
autorestart=false
stderr_logfile=/tmp/Check_stderr.log
stdout_logfile=/tmp/Check_stdout.log
#user = www

对于单模块而言,不同的业务逻辑为了区分可能会存在多个队列名,这种情况将多个队列名用逗号拼接起来,内容如下:

[program:queue]
user=www
command=php /www/wwwroot/tpqueue/think queue:work --queue Check,Check1,Check2

3、supervisor相关命令

启动 supervisor

supervisorctl -c /etc/supervisord.conf

上面这个命令会进入 supervisorctl 的 shell 界面,然后可以执行不同的命令了:

supervisorctl restart <application name>   // 重启指定应用
supervisorctl stop <application name>      // 停止指定应用
supervisorctl start <application name>     // 启动指定应用
supervisorctl restart all   // 重启所有应用
supervisorctl stop all      // 停止所有应用
supervisorctl start all     // 启动所有应用
supervisorctl status        //查看所有进程的状态
supervisorctl update        //配置文件修改后使用该命令加载新的配置
supervisorctl reload        //重新启动配置中的所有程序

若是centos7,加入开机启动:

systemctl start supervisord     //启动supervisor并加载默认配置文件
systemctl enable supervisord    //将supervisor加入开机启动项

执行命令来验证是否为开机启动

systemctl is-enabled supervisord

注意事项:

1、在消息对列表执行过程中,当消费者执行队列出现问题时(达到失败后最大尝试次数),将会自动化清除当前队列。

本文参考文档:

Thinkphp5(think-queue)消息队列结合supervisor进程管理实现队列常驻进程
think-queue 解析上
Supervisor使用详解

冯奎博客
请先登录后发表评论
  • latest comments
  • 总共0条评论