您好,登录后才能下订单哦!
密码登录
登录注册
点击 登录注册 即表示同意《亿速云用户服务条款》
# Redis限流的实现方法有哪些
## 目录
1. [限流技术概述](#限流技术概述)
2. [Redis实现限流的优势](#Redis实现限流的优势)
3. [基于计数器的限流](#基于计数器的限流)
4. [滑动窗口限流](#滑动窗口限流)
5. [令牌桶算法](#令牌桶算法)
6. [漏桶算法](#漏桶算法)
7. [分布式限流方案](#分布式限流方案)
8. [Redis模块扩展](#Redis模块扩展)
9. [实际应用场景](#实际应用场景)
10. [性能优化建议](#性能优化建议)
11. [常见问题与解决方案](#常见问题与解决方案)
12. [总结](#总结)
---
## 限流技术概述
限流(Rate Limiting)是保障系统稳定性的核心手段,通过控制单位时间内的请求量,防止系统因突发流量导致资源耗尽。常见的限流维度包括:
- QPS(每秒查询率)
- 并发连接数
- 用户/API级别配额
传统单机限流存在一致性难题,而Redis凭借其单线程特性、原子操作和丰富的数据结构,成为分布式限流的首选方案。
---
## Redis实现限流的优势
1. **原子性保证**:INCR、SETNX等命令的原子执行
2. **高性能**:内存操作达到10万+ QPS
3. **持久化支持**:RDB/AOF保障限流数据不丢失
4. **分布式协调**:多节点共享限流状态
5. **丰富数据结构**:String/Hash/ZSet等灵活组合
---
## 基于计数器的限流
最简单的限流实现,适用于对精度要求不高的场景。
### 基础实现
```python
def is_allowed(key, limit, period):
current = redis.incr(key)
if current == 1:
redis.expire(key, period)
return current <= limit
-- KEYS[1]: 限流key
-- ARGV[1]: 时间窗口(秒)
-- ARGV[2]: 最大阈值
local counter = redis.call("INCR", KEYS[1])
if counter == 1 then
redis.call("EXPIRE", KEYS[1], ARGV[1])
end
return counter <= tonumber(ARGV[2]) and 1 or 0
通过多时间片实现更平滑的流量控制,典型实现方式:
def sliding_window(key, window_size, max_count):
now = time.time()
pipeline = redis.pipeline()
pipeline.zadd(key, {now: now})
pipeline.zremrangebyscore(key, 0, now - window_size)
pipeline.zcard(key)
_, _, count = pipeline.execute()
return count <= max_count
将窗口划分为多个子区间(如60个1秒区间),使用Hash存储:
local current_time = tonumber(ARGV[1])
local window_size = tonumber(ARGV[2])
local limit = tonumber(ARGV[3])
local slots = {}
for i=1, window_size do
slots[i] = current_time - window_size + i
end
local sum = 0
for _, slot in ipairs(slots) do
sum = sum + tonumber(redis.call("HGET", KEYS[1], slot) or 0)
end
if sum >= limit then
return 0
end
redis.call("HSET", KEYS[1], current_time, (redis.call("HGET", KEYS[1], current_time) or 0) + 1)
redis.call("EXPIRE", KEYS[1], window_size * 2)
return 1
允许突发流量的经典算法,Redis实现要点:
key = {
"last_time": 最后更新时间戳,
"tokens": 当前令牌数,
"rate": 每秒生成速率,
"capacity": 桶容量
}
local key = KEYS[1]
local now = tonumber(ARGV[1])
local tokens_requested = tonumber(ARGV[2])
local bucket = redis.call("HMGET", key, "last_time", "tokens", "rate", "capacity")
local last_time = tonumber(bucket[1])
local tokens = tonumber(bucket[2])
local rate = tonumber(bucket[3])
local capacity = tonumber(bucket[4])
-- 初始化桶
if not last_time then
last_time = now
tokens = capacity
redis.call("HMSET", key, "last_time", last_time, "tokens", tokens, "rate", rate, "capacity", capacity)
end
-- 计算新增令牌
local elapsed = now - last_time
local new_tokens = elapsed * rate
if new_tokens > 0 then
tokens = math.min(tokens + new_tokens, capacity)
last_time = now
end
-- 检查令牌是否充足
if tokens >= tokens_requested then
tokens = tokens - tokens_requested
redis.call("HMSET", key, "last_time", last_time, "tokens", tokens)
return 1
end
return 0
恒定速率输出的流量整形算法,与令牌桶的区别: 1. 处理速率恒定 2. 不支持突发流量
local key = KEYS[1]
local now = tonumber(ARGV[1])
local capacity = tonumber(ARGV[2])
local rate = tonumber(ARGV[3])
local bucket = redis.call("HMGET", key, "water", "last_time")
local water = tonumber(bucket[1] or 0)
local last_time = tonumber(bucket[2] or now)
-- 计算流出量
local elapsed = now - last_time
water = math.max(0, water - elapsed * rate)
-- 尝试加水
if water + 1 <= capacity then
redis.call("HMSET", key, "water", water + 1, "last_time", now)
return 1
end
return 0
方案 | 优点 | 缺点 |
---|---|---|
中心化计数器 | 实现简单 | 单点压力大 |
分片计数 | 负载均衡 | 精度下降 |
Redis+Celery | 准实时聚合 | 延迟较高 |
# 使用Redis集群的hash tag保证相同用户路由到同一节点
def cluster_limiter(user_id, limit):
key = f"limiter:{{{user_id}}}"
return redis.incr(key) <= limit
提供原子化的令牌桶实现:
CL.THROTTLE user123 100 400 60 1
参数说明:
- key=user123
- 最大容量=100
- 每秒补充400个令牌
- 时间窗口=60秒
- 本次申请令牌数=1
方法 | QPS | 内存消耗 |
---|---|---|
原生Lua | 35,000 | 低 |
redis-cell | 75,000 | 中 |
多级缓存 | 120,000 | 高 |
location /api/ {
lua_shared_dict my_limit_req_store 100m;
access_by_lua_file /path/to/redis_limiter.lua;
}
// Redisson实现
RRateLimiter rateLimiter = redisson.getRateLimiter("seckill");
rateLimiter.trySetRate(RateType.OVERALL, 1000, 1, RateIntervalUnit.SECONDS);
if(rateLimiter.tryAcquire()) {
// 处理订单
}
# Scrapy中间件示例
def process_request(self, request, spider):
domain = urlparse(request.url).netloc
if not check_redis_limit(f"crawl:{domain}", 5, 60):
raise IgnoreRequest("Domain rate limited")
redis-cli info stats | grep instantaneous_ops
redis-cli slowlog get
现象:时间窗口边缘出现流量突增
解决方案:采用滑动窗口+子区间划分
现象:限流检查导致请求延迟
解决方案:
- 设置合理的连接超时
- 降级策略:本地限流+Redis限流组合
现象:内存快速增长
解决方案:
- 设置自动过期时间
- 定期扫描清理(使用SCAN替代KEYS)
Redis限流方案选型指南:
算法 | 适用场景 | 实现复杂度 | 精准度 |
---|---|---|---|
固定窗口 | 简单快速拦截 | ★☆☆☆☆ | 低 |
滑动窗口 | API精准控制 | ★★★☆☆ | 高 |
令牌桶 | 允许突发流量 | ★★★★☆ | 中 |
漏桶 | 恒定速率输出 | ★★★☆☆ | 高 |
技术演进趋势: 1. 硬件加速:Redis 7.0的Function加速 2. 混合架构:本地限流+Redis协调 3. 自适应限流:基于实时指标的动态调整 “`
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。