如何基于 Redis 实现滑动窗口限流?
滑动窗口限流是一种更精确、更平滑的限流算法,它比固定窗口限流更能应对突发的流量。基于Redis实现滑动窗口限流有多种方案,我将从简单到复杂为您详细解析。
一、 滑动窗口限流原理
核心思想:统计最近N秒内的请求数量,而不是固定时间段的请求数量。
固定窗口 vs 滑动窗口:
固定窗口(10秒内100次):
[ 0-10s ]: 90次
[10-20s ]: 110次 ← 限流!但实际上20秒内只有200次
滑动窗口(10秒内100次):
当前时间15s,统计[5s-15s]的请求数:105次 ← 限流更精确
二、 方案一:Redis Sorted Set(最经典方案)
2.1 实现原理
使用有序集合存储请求的时间戳,通过 ZREMRANGEBYSCORE 移除旧数据,通过 ZCARD 统计窗口内请求数。
-- Lua脚本:保证原子性
local key = KEYS[1] -- 限流键,如"rate_limit:user:123"
local now = tonumber(ARGV[1]) -- 当前时间戳(秒)
local window = tonumber(ARGV[2]) -- 窗口大小(秒)
local limit = tonumber(ARGV[3]) -- 窗口内最大请求数
local unique_id = ARGV[4] -- 请求唯一标识(如UUID)
-- 1. 移除窗口外的旧数据(分数小于 now - window 的数据)
redis.call('ZREMRANGEBYSCORE', key, 0, now - window)
-- 2. 获取当前窗口内的请求数
local current = redis.call('ZCARD', key)
-- 3. 如果未超过限制,则添加当前请求
if current < limit then
-- 使用时间戳作为分数,保证顺序;使用唯一ID作为成员
redis.call('ZADD', key, now, unique_id)
-- 设置过期时间,避免内存泄漏(窗口大小+1秒)
redis.call('EXPIRE', key, window + 1)
return 1 -- 允许通过
else
return 0 -- 拒绝请求
end
2.2 Java实现
@Component
public class SlidingWindowRateLimiter {
@Autowired
private RedisTemplate<String, String> redisTemplate;
private final DefaultRedisScript<Long> rateLimitScript;
public SlidingWindowRateLimiter() {
// 加载Lua脚本
rateLimitScript = new DefaultRedisScript<>();
rateLimitScript.setScriptSource(new ResourceScriptSource(
new ClassPathResource("scripts/rate_limiter.lua")
));
rateLimitScript.setResultType(Long.class);
}
/**
* 检查是否允许请求
* @param key 限流键
* @param window 窗口大小(秒)
* @param limit 窗口内最大请求数
* @return true=允许,false=拒绝
*/
public boolean allowRequest(String key, int window, int limit) {
long now = System.currentTimeMillis() / 1000; // 秒级时间戳
String uniqueId = UUID.randomUUID().toString();
List<String> keys = Collections.singletonList(key);
Object[] args = {now, window, limit, uniqueId};
Long result = redisTemplate.execute(
rateLimitScript,
keys,
args
);
return result != null && result == 1;
}
/**
* 获取剩余请求数
*/
public long getRemainingRequests(String key, int window, int limit) {
long now = System.currentTimeMillis() / 1000;
String windowKey = key;
// 先清理旧数据
redisTemplate.opsForZSet().removeRangeByScore(
windowKey, 0, now - window
);
// 获取当前数量
Long current = redisTemplate.opsForZSet().zCard(windowKey);
return limit - (current != null ? current : 0);
}
}
2.3 使用示例
@RestController
public class ApiController {
@Autowired
private SlidingWindowRateLimiter rateLimiter;
@GetMapping("/api/resource")
public ResponseEntity<?> getResource(@RequestParam String userId) {
String key = "rate_limit:api:resource:user:" + userId;
int window = 60; // 60秒窗口
int limit = 100; // 最多100次请求
if (!rateLimiter.allowRequest(key, window, limit)) {
long remaining = rateLimiter.getRemainingRequests(key, window, limit);
return ResponseEntity.status(429)
.header("X-RateLimit-Remaining", String.valueOf(remaining))
.body("请求过于频繁,请稍后再试");
}
// 处理业务逻辑
return ResponseEntity.ok("成功获取资源");
}
}
2.4 优缺点分析
优点:
- ✅ 精确控制:真正的滑动窗口,统计准确
- ✅ 代码清晰:实现逻辑直观易懂
缺点:
- ❌ 内存消耗:每个请求都存储一个ZSet成员
- ❌ 性能开销:每次请求都需要执行Lua脚本和ZSet操作
- ❌ 大数据量问题:窗口内请求多时,ZSet会很大
三、 方案二:Redis Cell模块(Redis 4.0+,推荐方案)
3.1 什么是Redis Cell?
Redis Cell是一个官方限流模块,实现了滑动窗口算法的漏桶变体,效率极高。
3.2 安装与使用
# 1. 下载模块
wget https://github.com/brandur/redis-cell/releases/download/v0.3.0/redis-cell-v0.3.0-x86_64-unknown-linux-gnu.tar.gz
# 2. 解压并加载
tar -xzf redis-cell*.tar.gz
# 在redis.conf中添加:
# loadmodule /path/to/libredis_cell.so
# 3. 重启Redis
3.3 核心命令:CL.THROTTLE
# 语法:CL.THROTTLE <key> <max_burst> <count> <period> [<quantity>]
# 参数:
# key: 限流键
# max_burst: 最大突发量(令牌桶容量)
# count: 时间窗口内允许的请求数
# period: 时间窗口(秒)
# quantity: 本次消耗的令牌数(默认1)
# 示例:60秒内最多100次请求,突发容量为10
CL.THROTTLE user_api_limit 10 100 60
返回值:
1) (integer) 0 # 0=允许,1=拒绝
2) (integer) 11 # 令牌桶容量(max_burst + 1)
3) (integer) 10 # 剩余令牌数
4) (integer) -1 # 如果被拒绝,多少秒后重试(-1表示无需等待)
5) (integer) 60 # 窗口大小(秒)
3.4 Java实现
@Component
public class RedisCellRateLimiter {
@Autowired
private RedisTemplate<String, String> redisTemplate;
/**
* 使用Redis Cell进行限流
* @param key 限流键
* @param maxBurst 最大突发量
* @param count 窗口内请求数
* @param period 窗口大小(秒)
* @return 限流结果
*/
public RateLimitResult throttle(String key, int maxBurst, int count, int period) {
// 构造CL.THROTTLE命令
String[] args = {
key,
String.valueOf(maxBurst),
String.valueOf(count),
String.valueOf(period),
"1" // 本次消耗1个令牌
};
// 执行命令
List<Long> result = redisTemplate.execute(
(RedisCallback<List<Long>>) connection -> {
RedisSerializer<String> serializer = redisTemplate.getStringSerializer();
byte[][] cmdArgs = new byte[args.length][];
for (int i = 0; i < args.length; i++) {
cmdArgs[i] = serializer.serialize(args[i]);
}
return connection.execute(
"CL.THROTTLE",
cmdArgs,
RedisCellResponseParser.INSTANCE
);
}
);
if (result == null || result.size() < 5) {
throw new RuntimeException("Redis Cell执行失败");
}
return RateLimitResult.builder()
.allowed(result.get(0) == 0)
.limit(result.get(1).intValue())
.remaining(result.get(2).intValue())
.retryAfter(result.get(3).intValue())
.resetAfter(result.get(4).intValue())
.build();
}
@Data
@Builder
public static class RateLimitResult {
private boolean allowed; // 是否允许
private int limit; // 总限制数
private int remaining; // 剩余令牌数
private int retryAfter; // 重试等待时间(秒)
private int resetAfter; // 重置时间(秒)
}
}
3.5 优点
- ✅ 性能极高:C语言实现,效率远超Lua脚本
- ✅ 功能丰富:支持突发流量、返回重试时间
- ✅ 官方维护:Redis官方模块,稳定可靠
四、 方案三:时间分片方案(内存优化版)
4.1 原理
将滑动窗口划分为多个时间片,每个时间片记录该时间段内的请求数。
滑动窗口:60秒
时间片:10秒(分为6个时间片)
存储结构:
key: "rate_limit:user:123"
value: {
"timestamp": 1640995200, // 当前窗口起始时间
"counts": [5, 8, 12, 3, 9, 7] // 6个时间片的计数
}
4.2 Lua脚本实现
-- 参数:
-- KEYS[1]: 限流键
-- ARGV[1]: 当前时间戳(秒)
-- ARGV[2]: 窗口大小(秒)
-- ARGV[3]: 时间片大小(秒)
-- ARGV[4]: 最大请求数
-- ARGV[5]: 本次请求消耗的令牌数(默认1)
local key = KEYS[1]
local now = tonumber(ARGV[1])
local window = tonumber(ARGV[2])
local slice_size = tonumber(ARGV[3])
local max_requests = tonumber(ARGV[4])
local cost = tonumber(ARGV[5] or 1)
-- 计算时间片数量
local slice_count = math.floor(window / slice_size)
-- 获取当前数据
local data = redis.call('HGETALL', key)
local current_slice_index
local counts = {}
if #data == 0 then
-- 第一次初始化
current_slice_index = math.floor(now / slice_size) % slice_count
for i = 0, slice_count-1 do
counts[i] = 0
end
else
-- 解析现有数据
local base_time = tonumber(data[2])
current_slice_index = math.floor(now / slice_size) % slice_count
local base_slice_index = math.floor(base_time / slice_size) % slice_count
-- 初始化计数数组
for i = 0, slice_count-1 do
counts[i] = 0
end
-- 填充有效数据
for i = 3, #data, 2 do
local idx = tonumber(data[i])
local count = tonumber(data[i+1])
counts[idx] = count
end
-- 清理过期的时间片
local elapsed_slices = (current_slice_index - base_slice_index + slice_count) % slice_count
if elapsed_slices > 0 then
for i = 1, elapsed_slices do
local idx = (base_slice_index + i - 1) % slice_count
counts[idx] = 0
end
end
end
-- 计算窗口内总请求数
local total = 0
for i = 0, slice_count-1 do
total = total + counts[i]
end
-- 检查是否超过限制
if total + cost > max_requests then
return {0, total, max_requests - total} -- 拒绝
else
-- 更新当前时间片的计数
counts[current_slice_index] = counts[current_slice_index] + cost
-- 保存回Redis
local save_data = {}
table.insert(save_data, "base_time")
table.insert(save_data, tostring(now))
for i = 0, slice_count-1 do
table.insert(save_data, tostring(i))
table.insert(save_data, tostring(counts[i]))
end
redis.call('HMSET', key, unpack(save_data))
redis.call('EXPIRE', key, window + slice_size)
return {1, total + cost, max_requests - (total + cost)} -- 允许
end
4.3 优缺点
优点:
- ✅ 内存高效:无论多少请求,都只存储固定数量的时间片
- ✅ 性能较好:比ZSet方案更轻量
缺点:
- ❌ 实现复杂:时间片管理和清理逻辑复杂
- ❌ 精度有限:依赖时间片大小,不是完全精确的滑动窗口
五、 方案四:Redis Stream方案(分布式场景)
5.1 原理
利用Redis Stream作为时间窗口的请求日志,自动处理过期数据。
-- Lua脚本:使用Stream实现滑动窗口
local key = KEYS[1] -- Stream键名
local now = tonumber(ARGV[1]) -- 当前时间戳(毫秒)
local window = tonumber(ARGV[2]) -- 窗口大小(毫秒)
local limit = tonumber(ARGV[3]) -- 限制数
local request_id = ARGV[4] -- 请求ID
-- 1. 添加当前请求到Stream
redis.call('XADD', key, '*', 'request', request_id, 'time', now)
-- 2. 获取Stream长度(窗口内的请求数)
local stream_length = redis.call('XLEN', key)
if stream_length > limit then
-- 3. 如果超过限制,尝试清理过期数据
local min_id = tostring(now - window) -- 最小有效ID
-- 使用XRANGE获取最旧的消息
local old_messages = redis.call('XRANGE', key, '-', '+', 'COUNT', 1)
if #old_messages > 0 then
local oldest_id = old_messages[1][1]
local oldest_time = tonumber(string.sub(oldest_id, 1, string.find(oldest_id, '-') - 1))
if oldest_time < now - window then
-- 删除过期的消息
redis.call('XTRIM', key, 'MINID', '~', min_id)
-- 重新获取长度
stream_length = redis.call('XLEN', key)
end
end
end
-- 4. 检查是否超过限制
if stream_length > limit then
-- 删除刚刚添加的消息(因为超限了)
local latest = redis.call('XRANGE', key, '+', '-', 'COUNT', 1)
if #latest > 0 then
redis.call('XDEL', key, latest[1][1])
end
return 0 -- 拒绝
else
-- 设置Stream的过期时间
redis.call('EXPIRE', key, math.ceil(window / 1000) + 1)
return 1 -- 允许
end
六、 生产环境选型指南
| 方案 | 精确度 | 性能 | 内存效率 | 实现复杂度 | 适用场景 |
|---|---|---|---|---|---|
| Sorted Set | 高 | 中 | 低 | 中 | 中小流量,需要精确控制 |
| Redis Cell | 高 | 极高 | 高 | 低 | 生产环境首选 |
| 时间分片 | 中 | 高 | 高 | 高 | 大流量,内存敏感 |
| Redis Stream | 高 | 中 | 中 | 中 | 需要请求日志审计 |
6.1 推荐方案组合
# 不同场景的推荐方案
生产环境通用限流:
推荐: Redis Cell (CL.THROTTLE)
理由: 官方模块,性能最好,功能完善
自定义复杂规则:
推荐: Sorted Set + Lua
理由: 灵活,可完全自定义逻辑
海量用户/高并发:
推荐: 时间分片方案
理由: 内存效率高,可水平扩展
需要审计追踪:
推荐: Redis Stream
理由: 天然的消息日志,便于分析
6.2 Spring Boot集成示例
@Configuration
public class RateLimitConfig {
@Bean
public RedisCellRateLimiter redisCellRateLimiter() {
return new RedisCellRateLimiter();
}
@Bean
public FilterRegistrationBean<RateLimitFilter> rateLimitFilter() {
FilterRegistrationBean<RateLimitFilter> registration = new FilterRegistrationBean<>();
registration.setFilter(new RateLimitFilter());
registration.addUrlPatterns("/api/*");
registration.setOrder(1);
return registration;
}
}
@Slf4j
public class RateLimitFilter extends OncePerRequestFilter {
@Autowired
private RedisCellRateLimiter rateLimiter;
@Override
protected void doFilterInternal(HttpServletRequest request,
HttpServletResponse response,
FilterChain filterChain) throws ServletException, IOException {
String clientIp = getClientIp(request);
String apiPath = request.getRequestURI();
String key = String.format("rate_limit:ip:%s:path:%s", clientIp, apiPath);
// 配置:60秒内最多100次,突发容量20
RedisCellRateLimiter.RateLimitResult result = rateLimiter.throttle(
key, 20, 100, 60
);
if (!result.isAllowed()) {
log.warn("限流触发: IP={}, Path={}, Remaining={}",
clientIp, apiPath, result.getRemaining());
response.setStatus(429);
response.setContentType("application/json");
response.setHeader("X-RateLimit-Limit", String.valueOf(result.getLimit()));
response.setHeader("X-RateLimit-Remaining", String.valueOf(result.getRemaining()));
response.setHeader("Retry-After", String.valueOf(result.getRetryAfter()));
response.getWriter().write(String.format(
"{\"code\":429,\"message\":\"请求过于频繁,请%d秒后重试\",\"retry_after\":%d}",
result.getRetryAfter(), result.getRetryAfter()
));
return;
}
// 添加限流头部信息
response.setHeader("X-RateLimit-Limit", String.valueOf(result.getLimit()));
response.setHeader("X-RateLimit-Remaining", String.valueOf(result.getRemaining()));
response.setHeader("X-RateLimit-Reset", String.valueOf(result.getResetAfter()));
filterChain.doFilter(request, response);
}
private String getClientIp(HttpServletRequest request) {
String ip = request.getHeader("X-Forwarded-For");
if (ip == null || ip.isEmpty() || "unknown".equalsIgnoreCase(ip)) {
ip = request.getRemoteAddr();
}
return ip.split(",")[0].trim();
}
}
七、 高级特性与最佳实践
7.1 多维度限流
// 同时限制IP、用户、全局
public boolean multiDimensionLimit(HttpServletRequest request, String userId) {
// 1. IP维度限流
String ipKey = "rate_limit:ip:" + getClientIp(request) + ":global";
// 2. 用户维度限流
String userKey = "rate_limit:user:" + userId + ":global";
// 3. API全局限流
String apiKey = "rate_limit:api:" + request.getRequestURI();
// 任何一个维度限流都触发整体限流
return checkLimit(ipKey, 1000, 60) // IP: 60秒1000次
&& checkLimit(userKey, 500, 60) // 用户: 60秒500次
&& checkLimit(apiKey, 10000, 60); // API: 60秒10000次
}
7.2 动态配置
@Component
public class DynamicRateLimiter {
@Autowired
private RedisTemplate<String, String> redisTemplate;
private final Map<String, RateLimitConfig> configCache = new ConcurrentHashMap<>();
@Scheduled(fixedDelay = 30000) // 每30秒刷新配置
public void refreshConfig() {
// 从数据库或配置中心加载限流配置
List<RateLimitConfig> configs = loadConfigFromDB();
configCache.clear();
configs.forEach(config ->
configCache.put(config.getKeyPattern(), config)
);
}
public boolean checkLimit(String key, String path) {
// 匹配最符合的配置
RateLimitConfig config = findBestMatchConfig(path);
if (config == null) return true;
String limitKey = config.buildKey(key, path);
return executeLimit(limitKey, config.getLimit(), config.getWindow());
}
}
7.3 监控与告警
@Slf4j
@Component
public class RateLimitMonitor {
@Autowired
private RedisTemplate<String, String> redisTemplate;
@Scheduled(fixedRate = 60000) // 每分钟监控一次
public void monitorRateLimit() {
// 扫描所有限流key
Set<String> keys = redisTemplate.keys("rate_limit:*");
for (String key : keys) {
// 获取拒绝率
Long total = getTotalRequests(key);
Long rejected = getRejectedRequests(key);
if (total > 0) {
double rejectRate = (double) rejected / total;
if (rejectRate > 0.1) { // 拒绝率超过10%告警
log.warn("限流告警: key={}, 拒绝率={}%", key, rejectRate * 100);
sendAlert(key, rejectRate);
}
}
}
}
}
总结
基于Redis实现滑动窗口限流,Redis Cell模块(CL.THROTTLE)是生产环境的最优选择,它性能极高、功能完善。如果无法使用Redis模块,则Sorted Set + Lua脚本是次优选择。
关键要点:
- 精确性优先:滑动窗口比固定窗口更精确,能更好地应对流量突发
- 原子性保证:必须使用Lua脚本保证Redis操作的原子性
- 内存管理:注意设置合理的过期时间,避免内存泄漏
- 监控告警:实时监控限流状态,及时发现异常
- 分层限流:实现IP、用户、API等多维度限流策略
滑动窗口限流的本质是在时间维度上滑动统计,既要保证精确性,又要兼顾性能。选择合适的方案,合理配置参数,才能在高并发系统中既保护服务稳定,又不误伤正常用户。