如何基于 Redis 实现滑动窗口限流?

如何基于 Redis 实现滑动窗口限流?如何基于 Redis 实现滑动窗口限流?

滑动窗口限流是一种更精确、更平滑的限流算法,它比固定窗口限流更能应对突发的流量。基于Redis实现滑动窗口限流有多种方案,我将从简单到复杂为您详细解析。


一、 滑动窗口限流原理

核心思想:统计最近N秒内的请求数量,而不是固定时间段的请求数量。

固定窗口 vs 滑动窗口:

固定窗口(10秒内100次):
[ 0-10s ]: 90次
[10-20s ]: 110次  ← 限流!但实际上20秒内只有200次

滑动窗口(10秒内100次):
当前时间15s,统计[5s-15s]的请求数:105次 ← 限流更精确

二、 方案一:Redis Sorted Set(最经典方案)

2.1 实现原理

使用有序集合存储请求的时间戳,通过 ZREMRANGEBYSCORE 移除旧数据,通过 ZCARD 统计窗口内请求数。

-- Lua脚本:保证原子性
local key = KEYS[1]  -- 限流键,如"rate_limit:user:123"
local now = tonumber(ARGV[1])  -- 当前时间戳(秒)
local window = tonumber(ARGV[2])  -- 窗口大小(秒)
local limit = tonumber(ARGV[3])  -- 窗口内最大请求数
local unique_id = ARGV[4]  -- 请求唯一标识(如UUID)

-- 1. 移除窗口外的旧数据(分数小于 now - window 的数据)
redis.call('ZREMRANGEBYSCORE', key, 0, now - window)

-- 2. 获取当前窗口内的请求数
local current = redis.call('ZCARD', key)

-- 3. 如果未超过限制,则添加当前请求
if current < limit then
    -- 使用时间戳作为分数,保证顺序;使用唯一ID作为成员
    redis.call('ZADD', key, now, unique_id)
    -- 设置过期时间,避免内存泄漏(窗口大小+1秒)
    redis.call('EXPIRE', key, window + 1)
    return 1  -- 允许通过
else
    return 0  -- 拒绝请求
end

2.2 Java实现

@Component
public class SlidingWindowRateLimiter {
    
    @Autowired
    private RedisTemplate<String, String> redisTemplate;
    
    private final DefaultRedisScript<Long> rateLimitScript;
    
    public SlidingWindowRateLimiter() {
        // 加载Lua脚本
        rateLimitScript = new DefaultRedisScript<>();
        rateLimitScript.setScriptSource(new ResourceScriptSource(
            new ClassPathResource("scripts/rate_limiter.lua")
        ));
        rateLimitScript.setResultType(Long.class);
    }
    
    /**
     * 检查是否允许请求
     * @param key 限流键
     * @param window 窗口大小(秒)
     * @param limit 窗口内最大请求数
     * @return true=允许,false=拒绝
     */
    public boolean allowRequest(String key, int window, int limit) {
        long now = System.currentTimeMillis() / 1000; // 秒级时间戳
        String uniqueId = UUID.randomUUID().toString();
        
        List<String> keys = Collections.singletonList(key);
        Object[] args = {now, window, limit, uniqueId};
        
        Long result = redisTemplate.execute(
            rateLimitScript, 
            keys, 
            args
        );
        
        return result != null && result == 1;
    }
    
    /**
     * 获取剩余请求数
     */
    public long getRemainingRequests(String key, int window, int limit) {
        long now = System.currentTimeMillis() / 1000;
        String windowKey = key;
        
        // 先清理旧数据
        redisTemplate.opsForZSet().removeRangeByScore(
            windowKey, 0, now - window
        );
        
        // 获取当前数量
        Long current = redisTemplate.opsForZSet().zCard(windowKey);
        return limit - (current != null ? current : 0);
    }
}

2.3 使用示例

@RestController
public class ApiController {
    
    @Autowired
    private SlidingWindowRateLimiter rateLimiter;
    
    @GetMapping("/api/resource")
    public ResponseEntity<?> getResource(@RequestParam String userId) {
        String key = "rate_limit:api:resource:user:" + userId;
        int window = 60; // 60秒窗口
        int limit = 100; // 最多100次请求
        
        if (!rateLimiter.allowRequest(key, window, limit)) {
            long remaining = rateLimiter.getRemainingRequests(key, window, limit);
            return ResponseEntity.status(429)
                .header("X-RateLimit-Remaining", String.valueOf(remaining))
                .body("请求过于频繁,请稍后再试");
        }
        
        // 处理业务逻辑
        return ResponseEntity.ok("成功获取资源");
    }
}

2.4 优缺点分析

优点

  • 精确控制:真正的滑动窗口,统计准确
  • 代码清晰:实现逻辑直观易懂

缺点

  • 内存消耗:每个请求都存储一个ZSet成员
  • 性能开销:每次请求都需要执行Lua脚本和ZSet操作
  • 大数据量问题:窗口内请求多时,ZSet会很大

三、 方案二:Redis Cell模块(Redis 4.0+,推荐方案)

3.1 什么是Redis Cell?

Redis Cell是一个官方限流模块,实现了滑动窗口算法的漏桶变体,效率极高。

3.2 安装与使用

# 1. 下载模块
wget https://github.com/brandur/redis-cell/releases/download/v0.3.0/redis-cell-v0.3.0-x86_64-unknown-linux-gnu.tar.gz

# 2. 解压并加载
tar -xzf redis-cell*.tar.gz
# 在redis.conf中添加:
# loadmodule /path/to/libredis_cell.so

# 3. 重启Redis

3.3 核心命令:CL.THROTTLE

# 语法:CL.THROTTLE <key> <max_burst> <count> <period> [<quantity>]
# 参数:
#   key: 限流键
#   max_burst: 最大突发量(令牌桶容量)
#   count: 时间窗口内允许的请求数
#   period: 时间窗口(秒)
#   quantity: 本次消耗的令牌数(默认1)

# 示例:60秒内最多100次请求,突发容量为10
CL.THROTTLE user_api_limit 10 100 60

返回值

1) (integer) 0    # 0=允许,1=拒绝
2) (integer) 11   # 令牌桶容量(max_burst + 1)
3) (integer) 10   # 剩余令牌数
4) (integer) -1   # 如果被拒绝,多少秒后重试(-1表示无需等待)
5) (integer) 60   # 窗口大小(秒)

3.4 Java实现

@Component
public class RedisCellRateLimiter {
    
    @Autowired
    private RedisTemplate<String, String> redisTemplate;
    
    /**
     * 使用Redis Cell进行限流
     * @param key 限流键
     * @param maxBurst 最大突发量
     * @param count 窗口内请求数
     * @param period 窗口大小(秒)
     * @return 限流结果
     */
    public RateLimitResult throttle(String key, int maxBurst, int count, int period) {
        // 构造CL.THROTTLE命令
        String[] args = {
            key, 
            String.valueOf(maxBurst),
            String.valueOf(count),
            String.valueOf(period),
            "1"  // 本次消耗1个令牌
        };
        
        // 执行命令
        List<Long> result = redisTemplate.execute(
            (RedisCallback<List<Long>>) connection -> {
                RedisSerializer<String> serializer = redisTemplate.getStringSerializer();
                byte[][] cmdArgs = new byte[args.length][];
                for (int i = 0; i < args.length; i++) {
                    cmdArgs[i] = serializer.serialize(args[i]);
                }
                return connection.execute(
                    "CL.THROTTLE", 
                    cmdArgs, 
                    RedisCellResponseParser.INSTANCE
                );
            }
        );
        
        if (result == null || result.size() < 5) {
            throw new RuntimeException("Redis Cell执行失败");
        }
        
        return RateLimitResult.builder()
            .allowed(result.get(0) == 0)
            .limit(result.get(1).intValue())
            .remaining(result.get(2).intValue())
            .retryAfter(result.get(3).intValue())
            .resetAfter(result.get(4).intValue())
            .build();
    }
    
    @Data
    @Builder
    public static class RateLimitResult {
        private boolean allowed;      // 是否允许
        private int limit;           // 总限制数
        private int remaining;       // 剩余令牌数
        private int retryAfter;      // 重试等待时间(秒)
        private int resetAfter;      // 重置时间(秒)
    }
}

3.5 优点

  • 性能极高:C语言实现,效率远超Lua脚本
  • 功能丰富:支持突发流量、返回重试时间
  • 官方维护:Redis官方模块,稳定可靠

四、 方案三:时间分片方案(内存优化版)

4.1 原理

将滑动窗口划分为多个时间片,每个时间片记录该时间段内的请求数。

滑动窗口:60秒
时间片:10秒(分为6个时间片)

存储结构:
key: "rate_limit:user:123"
value: {
  "timestamp": 1640995200,  // 当前窗口起始时间
  "counts": [5, 8, 12, 3, 9, 7]  // 6个时间片的计数
}

4.2 Lua脚本实现

-- 参数:
-- KEYS[1]: 限流键
-- ARGV[1]: 当前时间戳(秒)
-- ARGV[2]: 窗口大小(秒)
-- ARGV[3]: 时间片大小(秒)
-- ARGV[4]: 最大请求数
-- ARGV[5]: 本次请求消耗的令牌数(默认1)

local key = KEYS[1]
local now = tonumber(ARGV[1])
local window = tonumber(ARGV[2])
local slice_size = tonumber(ARGV[3])
local max_requests = tonumber(ARGV[4])
local cost = tonumber(ARGV[5] or 1)

-- 计算时间片数量
local slice_count = math.floor(window / slice_size)

-- 获取当前数据
local data = redis.call('HGETALL', key)
local current_slice_index
local counts = {}

if #data == 0 then
    -- 第一次初始化
    current_slice_index = math.floor(now / slice_size) % slice_count
    for i = 0, slice_count-1 do
        counts[i] = 0
    end
else
    -- 解析现有数据
    local base_time = tonumber(data[2])
    current_slice_index = math.floor(now / slice_size) % slice_count
    local base_slice_index = math.floor(base_time / slice_size) % slice_count
    
    -- 初始化计数数组
    for i = 0, slice_count-1 do
        counts[i] = 0
    end
    
    -- 填充有效数据
    for i = 3, #data, 2 do
        local idx = tonumber(data[i])
        local count = tonumber(data[i+1])
        counts[idx] = count
    end
    
    -- 清理过期的时间片
    local elapsed_slices = (current_slice_index - base_slice_index + slice_count) % slice_count
    if elapsed_slices > 0 then
        for i = 1, elapsed_slices do
            local idx = (base_slice_index + i - 1) % slice_count
            counts[idx] = 0
        end
    end
end

-- 计算窗口内总请求数
local total = 0
for i = 0, slice_count-1 do
    total = total + counts[i]
end

-- 检查是否超过限制
if total + cost > max_requests then
    return {0, total, max_requests - total}  -- 拒绝
else
    -- 更新当前时间片的计数
    counts[current_slice_index] = counts[current_slice_index] + cost
    
    -- 保存回Redis
    local save_data = {}
    table.insert(save_data, "base_time")
    table.insert(save_data, tostring(now))
    
    for i = 0, slice_count-1 do
        table.insert(save_data, tostring(i))
        table.insert(save_data, tostring(counts[i]))
    end
    
    redis.call('HMSET', key, unpack(save_data))
    redis.call('EXPIRE', key, window + slice_size)
    
    return {1, total + cost, max_requests - (total + cost)}  -- 允许
end

4.3 优缺点

优点

  • 内存高效:无论多少请求,都只存储固定数量的时间片
  • 性能较好:比ZSet方案更轻量

缺点

  • 实现复杂:时间片管理和清理逻辑复杂
  • 精度有限:依赖时间片大小,不是完全精确的滑动窗口

五、 方案四:Redis Stream方案(分布式场景)

5.1 原理

利用Redis Stream作为时间窗口的请求日志,自动处理过期数据。

-- Lua脚本:使用Stream实现滑动窗口
local key = KEYS[1]  -- Stream键名
local now = tonumber(ARGV[1])  -- 当前时间戳(毫秒)
local window = tonumber(ARGV[2])  -- 窗口大小(毫秒)
local limit = tonumber(ARGV[3])  -- 限制数
local request_id = ARGV[4]  -- 请求ID

-- 1. 添加当前请求到Stream
redis.call('XADD', key, '*', 'request', request_id, 'time', now)

-- 2. 获取Stream长度(窗口内的请求数)
local stream_length = redis.call('XLEN', key)

if stream_length > limit then
    -- 3. 如果超过限制,尝试清理过期数据
    local min_id = tostring(now - window)  -- 最小有效ID
    
    -- 使用XRANGE获取最旧的消息
    local old_messages = redis.call('XRANGE', key, '-', '+', 'COUNT', 1)
    if #old_messages > 0 then
        local oldest_id = old_messages[1][1]
        local oldest_time = tonumber(string.sub(oldest_id, 1, string.find(oldest_id, '-') - 1))
        
        if oldest_time < now - window then
            -- 删除过期的消息
            redis.call('XTRIM', key, 'MINID', '~', min_id)
            
            -- 重新获取长度
            stream_length = redis.call('XLEN', key)
        end
    end
end

-- 4. 检查是否超过限制
if stream_length > limit then
    -- 删除刚刚添加的消息(因为超限了)
    local latest = redis.call('XRANGE', key, '+', '-', 'COUNT', 1)
    if #latest > 0 then
        redis.call('XDEL', key, latest[1][1])
    end
    return 0  -- 拒绝
else
    -- 设置Stream的过期时间
    redis.call('EXPIRE', key, math.ceil(window / 1000) + 1)
    return 1  -- 允许
end

六、 生产环境选型指南

方案 精确度 性能 内存效率 实现复杂度 适用场景
Sorted Set 中小流量,需要精确控制
Redis Cell 极高 生产环境首选
时间分片 大流量,内存敏感
Redis Stream 需要请求日志审计

6.1 推荐方案组合

# 不同场景的推荐方案
生产环境通用限流:
  推荐: Redis Cell (CL.THROTTLE)
  理由: 官方模块,性能最好,功能完善

自定义复杂规则:
  推荐: Sorted Set + Lua
  理由: 灵活,可完全自定义逻辑

海量用户/高并发:
  推荐: 时间分片方案
  理由: 内存效率高,可水平扩展

需要审计追踪:
  推荐: Redis Stream
  理由: 天然的消息日志,便于分析

6.2 Spring Boot集成示例

@Configuration
public class RateLimitConfig {
    
    @Bean
    public RedisCellRateLimiter redisCellRateLimiter() {
        return new RedisCellRateLimiter();
    }
    
    @Bean
    public FilterRegistrationBean<RateLimitFilter> rateLimitFilter() {
        FilterRegistrationBean<RateLimitFilter> registration = new FilterRegistrationBean<>();
        registration.setFilter(new RateLimitFilter());
        registration.addUrlPatterns("/api/*");
        registration.setOrder(1);
        return registration;
    }
}

@Slf4j
public class RateLimitFilter extends OncePerRequestFilter {
    
    @Autowired
    private RedisCellRateLimiter rateLimiter;
    
    @Override
    protected void doFilterInternal(HttpServletRequest request, 
                                   HttpServletResponse response, 
                                   FilterChain filterChain) throws ServletException, IOException {
        
        String clientIp = getClientIp(request);
        String apiPath = request.getRequestURI();
        String key = String.format("rate_limit:ip:%s:path:%s", clientIp, apiPath);
        
        // 配置:60秒内最多100次,突发容量20
        RedisCellRateLimiter.RateLimitResult result = rateLimiter.throttle(
            key, 20, 100, 60
        );
        
        if (!result.isAllowed()) {
            log.warn("限流触发: IP={}, Path={}, Remaining={}", 
                    clientIp, apiPath, result.getRemaining());
            
            response.setStatus(429);
            response.setContentType("application/json");
            response.setHeader("X-RateLimit-Limit", String.valueOf(result.getLimit()));
            response.setHeader("X-RateLimit-Remaining", String.valueOf(result.getRemaining()));
            response.setHeader("Retry-After", String.valueOf(result.getRetryAfter()));
            
            response.getWriter().write(String.format(
                "{\"code\":429,\"message\":\"请求过于频繁,请%d秒后重试\",\"retry_after\":%d}",
                result.getRetryAfter(), result.getRetryAfter()
            ));
            return;
        }
        
        // 添加限流头部信息
        response.setHeader("X-RateLimit-Limit", String.valueOf(result.getLimit()));
        response.setHeader("X-RateLimit-Remaining", String.valueOf(result.getRemaining()));
        response.setHeader("X-RateLimit-Reset", String.valueOf(result.getResetAfter()));
        
        filterChain.doFilter(request, response);
    }
    
    private String getClientIp(HttpServletRequest request) {
        String ip = request.getHeader("X-Forwarded-For");
        if (ip == null || ip.isEmpty() || "unknown".equalsIgnoreCase(ip)) {
            ip = request.getRemoteAddr();
        }
        return ip.split(",")[0].trim();
    }
}

七、 高级特性与最佳实践

7.1 多维度限流

// 同时限制IP、用户、全局
public boolean multiDimensionLimit(HttpServletRequest request, String userId) {
    // 1. IP维度限流
    String ipKey = "rate_limit:ip:" + getClientIp(request) + ":global";
    // 2. 用户维度限流  
    String userKey = "rate_limit:user:" + userId + ":global";
    // 3. API全局限流
    String apiKey = "rate_limit:api:" + request.getRequestURI();
    
    // 任何一个维度限流都触发整体限流
    return checkLimit(ipKey, 1000, 60)  // IP: 60秒1000次
        && checkLimit(userKey, 500, 60)   // 用户: 60秒500次
        && checkLimit(apiKey, 10000, 60); // API: 60秒10000次
}

7.2 动态配置

@Component
public class DynamicRateLimiter {
    
    @Autowired
    private RedisTemplate<String, String> redisTemplate;
    
    private final Map<String, RateLimitConfig> configCache = new ConcurrentHashMap<>();
    
    @Scheduled(fixedDelay = 30000) // 每30秒刷新配置
    public void refreshConfig() {
        // 从数据库或配置中心加载限流配置
        List<RateLimitConfig> configs = loadConfigFromDB();
        configCache.clear();
        configs.forEach(config -> 
            configCache.put(config.getKeyPattern(), config)
        );
    }
    
    public boolean checkLimit(String key, String path) {
        // 匹配最符合的配置
        RateLimitConfig config = findBestMatchConfig(path);
        if (config == null) return true;
        
        String limitKey = config.buildKey(key, path);
        return executeLimit(limitKey, config.getLimit(), config.getWindow());
    }
}

7.3 监控与告警

@Slf4j
@Component
public class RateLimitMonitor {
    
    @Autowired
    private RedisTemplate<String, String> redisTemplate;
    
    @Scheduled(fixedRate = 60000) // 每分钟监控一次
    public void monitorRateLimit() {
        // 扫描所有限流key
        Set<String> keys = redisTemplate.keys("rate_limit:*");
        
        for (String key : keys) {
            // 获取拒绝率
            Long total = getTotalRequests(key);
            Long rejected = getRejectedRequests(key);
            
            if (total > 0) {
                double rejectRate = (double) rejected / total;
                if (rejectRate > 0.1) { // 拒绝率超过10%告警
                    log.warn("限流告警: key={}, 拒绝率={}%", key, rejectRate * 100);
                    sendAlert(key, rejectRate);
                }
            }
        }
    }
}

总结

基于Redis实现滑动窗口限流,Redis Cell模块(CL.THROTTLE)是生产环境的最优选择,它性能极高、功能完善。如果无法使用Redis模块,则Sorted Set + Lua脚本是次优选择。

关键要点

  1. 精确性优先:滑动窗口比固定窗口更精确,能更好地应对流量突发
  2. 原子性保证:必须使用Lua脚本保证Redis操作的原子性
  3. 内存管理:注意设置合理的过期时间,避免内存泄漏
  4. 监控告警:实时监控限流状态,及时发现异常
  5. 分层限流:实现IP、用户、API等多维度限流策略

滑动窗口限流的本质是在时间维度上滑动统计,既要保证精确性,又要兼顾性能。选择合适的方案,合理配置参数,才能在高并发系统中既保护服务稳定,又不误伤正常用户。