php实现延迟队列

发布时间:2020-06-22 06:51:54 作者:www333335555
来源:网络 阅读:477

延迟队列,顾名思义它是一种带有延迟功能的消息队列。 那么,是在什么场景下我才需要这样的队列呢?

一、背景

先看看一下业务场景:

通常解决以上问题,最简单直接的办法就是定时去扫表。

扫表存在的问题是:

延时队列能对于上述需求能很好的解决

二、调研

调研了市场上一些开源的方案,以下:

基本以上原因打算自己写一个,平常使用php多,项目基本redis的zset结构作为存储,用php语言实现 ,实现原理参考了有赞团队:https://tech.youzan.com/queuing_delay/

三、目标

四、架构设计与说明

总体架构
php实现延迟队列

采用master-work架构模式,主要包括6个模块:

五、部署

环境依赖:PHP 5.4+ 安装sockets,redis,pcntl,pdo_mysql 拓展

step1:安装数据库用于存储一些topic以及告警信息
create database dq;
#存放告警信息
CREATE TABLE `dq_alert` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `host` varchar(255) NOT NULL DEFAULT '',
  `port` int(11) NOT NULL DEFAULT '0',
  `user` varchar(255) NOT NULL DEFAULT '',
  `pwd` varchar(255) NOT NULL DEFAULT '',
  `ext` varchar(2048) NOT NULL DEFAULT '',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8;
#存放redis信息
CREATE TABLE `dq_redis` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `t_name` varchar(200) NOT NULL DEFAULT '',
  `t_content` varchar(2048) NOT NULL DEFAULT '',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=14 DEFAULT CHARSET=utf8;
#存储注册信息
CREATE TABLE `dq_topic` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `t_name` varchar(1024) NOT NULL DEFAULT '',
  `delay` int(11) NOT NULL DEFAULT '0',
  `callback` varchar(1024) NOT NULL DEFAULT '',
  `timeout` int(11) NOT NULL DEFAULT '3000',
  `email` varchar(1024) NOT NULL DEFAULT '',
  `topic` varchar(255) NOT NULL DEFAULT '',
  `createor` varchar(1024) NOT NULL DEFAULT '',
  `status` tinyint(4) NOT NULL DEFAULT '1',
  `method` varchar(32) NOT NULL DEFAULT 'GET',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=6 DEFAULT CHARSET=utf8;
step2:在DqConfg.文件中配置数据库信息: DqConf::$db step3: 启动http服务

在DqConf.php文件中修改php了路径 $logPath

命令:

php DqHttpServer.php --port 8088

访问:http://127.0.0.1:8088,出现配置界面
php实现延迟队列
redis信息格式:host:post:auth 比如 127.0.0.1:6379:12345

stop4:启动服务进程:

php DqInit.php --port 6789
看到如下信息说明启动成功
php实现延迟队列

stop5:配置告信息(比如redis宕机)

php实现延迟队列

stop6:注册topic

php实现延迟队列
php实现延迟队列

step7: 写入数据,在项目根目录下新建test.php文件写入
<?php
include_once 'DqLoader.php';
date_default_timezone_set("PRC");
//可配置多个
$server=array(
    '127.0.0.1:6789',
);
$dqClient = new DqClient();
$dqClient->addServer($server);

$topic ='order_openvip_checker'; //topic在后台注册
$id = uniqid();
$data=array(
    'id'=>$id,
    'body'=>array(
        'a'=>1,
        'b'=>2,
        'c'=>3,
        'ext'=>str_repeat('a',64),
    ),
    //可选,设置后以这个通知时间为准,默认延时时间在注册topic的时候指定
    'fix_time'=>date('Y-m-d 23:50:50'),
);

//添加
$boolRet = $dqClient->add($topic, $data);
echo 'add耗时:'.(msectime() - $time)."ms\n";
//查询
$time = msectime();
$result = $dqClient->get($topic, $id);
echo 'get耗时:'.(msectime() - $time)."ms\n";

//删除
$time = msectime();
$boolRet = $dqClient->del($topic,$id);
echo 'del耗时:'.(msectime() - $time)."ms\n";

执行php test.php

step8:查看日志

默认日志目录在项目目录的logs目录下,在DqConf.php修改$logPath

step9:如果配置文件有改动

ps -ef | grep dq-master| grep -v grep | head -n 1 | awk '{print $2}' | xargs kill -USR2

六、性能测试

需要安装pthreads拓展:

测试原理:使用多线程模拟并发,在1s内能成功返回请求成功的个数

php DqBench  concurrency  requests
concurrency:并发数
requests: 每个并发产生的请求数

测试环境:内存 8G ,8核cpu,2个redis和1个dq-server 部署在一个机器上,数据包64字节
qps:2400

七、值得一提的性能优化点:

八、异常处理

如果调用通知接口在超时时间内,没有收到回复认为通知失败,系统会重新把数据放入队列,重新通知,系统默认最大通知10次(可以在Dqconf.php文件中修改$notify_exp_nums)通知间隔为2n+1,比如第一次1分钟,通知失败,第二次3分钟后,直到收到回复,超出最大通知次数后系统自动丢弃,同时发邮件通知

ps:网络抖动在所难免,通知接口如果涉及到核心的服务,一定要保证幂等!!

九、线上情况

线上部署了两个实例每个机房部一个,4个redis作存储,服务稳定运行数月,各项指标均符合预期

主要接入业务:

项目地址: https://github.com/chenlinzhong/php-delayqueue

推荐阅读:
  1. rabbitmq延迟队列之php实现
  2. 【laravel】 Laravel延迟队列

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

延迟队列 高可用

上一篇:超融合市场的战争远未结束,谁将最终胜出?

下一篇:keepalived+双主

相关阅读

您好,登录后才能下订单哦!

密码登录
登录注册
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》