thinkphp6使用think-queue怎么实现普通队列和延迟队列

发布时间:2022-04-20 13:37:22 作者:zzz
来源:亿速云 阅读:683

本文小编为大家详细介绍“thinkphp6使用think-queue怎么实现普通队列和延迟队列”,内容详细,步骤清晰,细节处理妥当,希望这篇“thinkphp6使用think-queue怎么实现普通队列和延迟队列”文章能帮助大家解决疑惑,下面跟着小编的思路慢慢深入,一起来学习新知识吧。

###TP6 队列

TP6 中使用 think-queue 可以实现普通队列和延迟队列。

think-queue 是thinkphp 官方提供的一个消息队列服务,它支持消息队列的一些基本特性:

消息队列实现过程

1、通过生产者推送消息到消息队列服务中

2、消息队列服务将收到的消息存入redis队列中(zset)

3、消费者进行监听队列,当监听到队列有新的消息时,获取队列第一条

4、处理获取下来的消息调用业务类进行处理相关业务

5、业务处理后,需要从队列中删除消息

composer 安装 think-queue

composer require topthink/think-queue

配置文件

安装完 think-queue 后会在 config 目录中生成 queue.php,这个文件是队列的配置文件。

tp6中提供了多种消息队列的实现方式,默认使用sync,我这里选择使用Redis。

return [
    'default'     => 'redis',
    'connections' => [
        'sync'     => [
            'type' => 'sync',
        ],
        'database' => [
            'type'       => 'database',
            'queue'      => 'default',
            'table'      => 'jobs',
            'connection' => null,
        ],
        'redis'    => [
            'type'       => 'redis',
            'queue'      => 'default',
            'host'       => env('redis.host', '127.0.0.1'),
            'port'       => env('redis.port', '6379'),
            'password'   => env('redis.password','123456'),
            'select'     => 0,
            'timeout'    => 0,
            'persistent' => false,
        ],
    ],
    'failed'      => [
        'type'  => 'none',
        'table' => 'failed_jobs',
    ],
];

创建目录及队列消费类文件

在 app 目录下创建 queue 目录,然后在该目录下新建一个抽象类 Queue.php 文件,作为基础类

<?phpnamespace app\queue;use think\facade\Cache;use think\queue\Job;use think\facade\Log;/**
 * Class Queue 队列消费基础类
 * @package app\queue
 */abstract class Queue{
    /**
     * @describe:fire是消息队列默认调用的方法
     * @param \think\queue\Job $job
     * @param $message
     */
    public function fire(Job $job, $data)
    {
        if (empty($data)) {
            Log::error(sprintf('[%s][%s] 队列无消息', __CLASS__, __FUNCTION__));
            return ;
        }

        $jobId = $job->getJobId(); // 队列的数据库id或者redis key
        // $jobClassName = $job->getName(); // 队列对象类
        // $queueName = $job->getQueue(); // 队列名称

        // 如果已经执行中或者执行完成就不再执行了
        if (!$this->checkJob($jobId, $data)) {
            $job->delete();
            Cache::store('redis')->delete($jobId);
            return ;
        }

        // 执行业务处理
        if ($this->execute($data)) {
            Log::record(sprintf('[%s][%s] 队列执行成功', __CLASS__, __FUNCTION__));
            $job->delete(); // 任务执行成功后删除
            Cache::store('redis')->delete($jobId); // 删除redis中的缓存
        } else {
            // 检查任务重试次数
            if ($job->attempts() > 3) {
                Log::error(sprintf('[%s][%s] 队列执行重试次数超过3次,执行失败', __CLASS__, __FUNCTION__));
                 // 第1种处理方式:重新发布任务,该任务延迟10秒后再执行;也可以不指定秒数立即执行
                //$job->release(10); 
                // 第2种处理方式:原任务的基础上1分钟执行一次并增加尝试次数
                //$job->failed();   
                // 第3种处理方式:删除任务
                $job->delete(); // 任务执行后删除
                Cache::store('redis')->delete($jobId); // 删除redis中的缓存
            }
        }
    }

    /**
     * 消息在到达消费者时可能已经不需要执行了
     * @param  string  $jobId
     * @param $message
     * @return bool 任务执行的结果
     * @throws \Psr\SimpleCache\InvalidArgumentException
     */
    protected function checkJob(string $jobId, $message): bool
    {
        // 查询redis
        $data = Cache::store('redis')->get($jobId);
        if (!empty($data)) {
            return false;
        }
        Cache::store('redis')->set($jobId, $message);
        return true;
    }

    /**
     * @describe: 根据消息中的数据进行实际的业务处理
     * @param $data 数据
     * @return bool 返回结果
     */
    abstract protected function execute($data): bool;}

所有真正的消费类继承基础抽象类

<?phpnamespace app\queue\test;use app\queue\Queue;class Test extends Queue{
    protected function execute($data): bool
    {
       // 具体消费业务逻辑
    }}

生产者逻辑

use think\facade\Queue;

// 普通队列生成调用方式
Queue::push($job, $data, $queueName);
// 例:
Queue::push(Test::class, $data, $queueName);

// 延时队列生成调用方式
Queue::later($delay, $job, $data, $queueName);
// 例如使用延时队列 10 秒后执行:
Queue::later(10 , Test::class, $data, $queueName);

开启进程监听任务并执行

php think queue:listen
php think queue:work

命令模式介绍

命令模式
命令行参数

读到这里,这篇“thinkphp6使用think-queue怎么实现普通队列和延迟队列”文章已经介绍完毕,想要掌握这篇文章的知识点还需要大家自己动手实践使用过才能领会,如果想了解更多相关内容的文章,欢迎关注亿速云行业资讯频道。

推荐阅读:
  1. php实现延迟队列
  2. SpringBoot:RabbitMQ 延迟队列

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

thinkphp think-queue

上一篇:Redis热点数据问题怎么解决

下一篇:Python数学相关模块怎么用

相关阅读

您好,登录后才能下订单哦!

密码登录
登录注册
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》