那些有趣的数据结构与算法(05)--限流

有时候限流也可以称为防刷, 这两者的界定并不是很明显, 常用的限流算法包括固定窗口, 滑动窗口, 漏桶以及令牌桶算法, 它们都有各自的优势与最适合的使用场景, 算法不分好坏, 之分场景。

1. 固定窗口

固定窗口属于最简单但是也最容易出现问题的限流策略, 假设某接口限制请求频率为10000/minute, 则统计一分钟内接口请求的总次数, 若次数大于10000, 则请求失败, 开始限流, 直到下一个一分钟开始。

使用Redis可以非常轻松的实现该功能:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# 该功能可用Lua优化,详见Lua模块
from redis import Redis
redis = Redis("127.0.0.1", 6379)

def hit_user_access(api):
try:
redis_key = "restrict_access:"+api
access_number = redis.incr(redis_key)
if access_number == 1:
redis.expire(redis_key, 60)
if access_number > MAX_ACCESS_NUMBER:
return True
return False
except Exception as e:
# 日记记录
print(e)

借助于Redis的expire-key功能来实现”当前一分钟”和”下一个一分钟”, 若键过期, 则重新进行计数, 表示下一个一分钟开始了。

固定窗口用来做小流量的防刷比较适合, 但是并不适合作为整体系统的限流。 其原因就在于在这一分钟内接收流量并不一定是平均的。 攻击方可以在每一个一分钟开始的前1秒或几秒中疯狂攻击接口, 使得接口的请求数量在一开始就达到上限, 这样一来后续的正常用户将无法访问。

但是在一些简单的场景下, 例如单个用户的验证码发送条数限制, 当天密码输入失败的最大次数等等, 使用Redis实现的固定窗口不失为一个最佳选择。

2. 滑动窗口

固定窗口很像一步接着一步的走路, 两步之间没有间隙, 但是每一步之间不会重叠。 而滑动窗口则是”拖着脚”在走路, 下一步会部分的覆盖前一步所走过的路径。

如上图所示, 窗口大小为60s, 每过一秒向前移动一小步, 60s过后将会移动一整个窗口, 此时就像固定窗口移动一样。

滑动窗口在限流上其实使用的不是很多, 原因就在于滑动窗口也无法处理1s内请求过载的问题, 但是在监控告警上却是不二之选。

滑动窗口的最大优势就在于”重叠”, 因为窗口在滑动过程中, 势必会跨越前一分钟和后一分钟, 使得控制更加精细。

在具体的实现上, 通常会使用计算的方式来模拟窗口的向右滑动, 也可以说是”薛定谔的滑动”。 这里不考虑限流的滑动窗口实现, 而是转而实现监控告警的功能。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
local key = KEYS[1]

local now_timestamp = tonumber(ARGV[1])
local window_size = tonumber(ARGV[2])
local limit = tonumber(ARGV[3])

local should_clear = now_timestamp - window_size
redis.call("ZREMRANGEBYSCORE", key, 0, should_clear)

local amount = redis.call("ZCARD", key)
if amount < limit then
redis.call("ZADD", key, now_timestamp, now_timestamp)
end
redis.call("EXPIRE", key, window_size)

return amount < limit

在熔断器里面会有这样的技术细节: 5分钟内失败率达到某个阈值时进行熔断, 像这样的需求完全可以使用滑动窗口很好的实现, 不管是使用简单的单机实现, 还是使用Redis的分布式实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
class MovingWindow(object):

def __init__(self, window, rate):
"""
:param window: 窗口大小
:param rate: 移动速率
"""
assert window > 0 and rate > 0, "window and rate should more than zero value"
assert isinstance(window, int) and isinstance(rate, int), "window and rate should be a Integer"
self.window = window
self.rate = rate

self.__window = [0] * self.window
self.__last_moving = int(time.time())

def __shift(self, step):
"""
:param step: 窗口向右滑动的距离
"""
if step > self.window:
self.__clear()
self.__window = self.__window[step:] + [0] * step

def __clock_shift(self):
"""
计算窗口应当滑动的距离
"""
now = int(time.time())
expire = now - self.__last_moving
step = int(expire / self.rate)
if step > 0:
self.__shift(step)
self.__last_moving = now

def incr(self, value):
# 首先将窗口滑动至正确位置
self.__clock_shift()
# 将值添加至窗口最后一个元素上即可
self.__window[-1] += value

def count(self):
"""
:return: 返回当前窗口计数总数
"""
self.__clock_shift()
return sum(self.__window)

def __clear(self):
self.__window = [0] * self.window

def __repr__(self):
return "window: {}".format(self.__window)

3. 令牌桶

令牌桶算法可能是生产用使用的较为广泛的限流算法, 一方面可以限制瞬时流量, 一方面也可以限制一段时间内的流量, 算是比较两全的算法。

令牌桶引入缓冲区, 按照一定的速率生成令牌, 并将其置于令牌桶中。 每一个请求首先尝试从令牌桶中获取令牌, 若无令牌可用, 则直接返回失败。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
class TokenBucket(object):

def __init__(self, capacity, rate):
"""
:param capacity: 桶的容量
:param rate: 生成令牌的速率
"""
assert capacity > 0 and rate > 0, "capacity and rate should more than zero"
assert isinstance(capacity, int) and isinstance(rate, int), "capacity and rate should be integer"
self.capacity = capacity
self.rate = rate
self.__last_clock = int(time.time())
# 初始令牌数为0
self.__bucket = 0

def hit(self):
# 非并发安全的实现
now = int(time.time())
self.__bucket = min(self.capacity, (now - self.__last_clock) * self.rate)
if self.__bucket < 1:
return True
else:
self.__bucket -= 1
self.__last_clock = now
return False

4. 小结

无论是最简单的固定窗口, 还是稍微复杂一些的滑动窗口与令牌桶, 都有其适用的场景。 比如固定窗口适合限制具体的接口某个ip的访问次数, 滑动窗口用于记录一段时间内错误次数, 令牌桶用于秒杀场景下的限流。 在一个系统中综合运用这三种算法完全有可能, 只不过可能会根据业务场景的不同而进行稍加变动而已。