尝试一下用lua脚本执行redis命令
前言
对于应用内部的服务限流,熔断器Hystrix,作出很好的实践。最近开源的Sentinel也是一个不错的选择。
不过对于集群接口限流,好像没有什么框架。常用的限流算法就是:漏桶算法和令牌桶算法。
本文将利用redis和lua脚本实现令牌桶算法,同时通过spring来驱动脚本。
Guava提供了一个RateLimiter,我们也看一下。
当然还可以用计数器方法,如设定一个计数key,一秒过期,一秒内达到n次就拒绝服务。
两个算法
Google了十几个中文博客,每一篇都是一样的,都说两种算法能限制速率,但是漏桶算法不能应对突发流量而令牌桶可以,又没说为什么。在我看来,两种算法的算法不过是“一正一反”,本质是一样的。无奈英语渣看了一下维基百科"LeakyBucket"词条,Overview
里提到,有两种版本的漏桶算法:
漏桶算法(LeakyBucket)
- as a meter(作为计量器)
- as a queue(作为调度队列)
第一种版本是令牌桶算法的镜像实现,也就是描述起来不一样,原理一样。只要元素能放进桶,就是允许通过。
第二种版本是把桶作为队列,只有元素漏出桶,这个元素才算通过。因为漏桶的速率是恒定的,所以能起到流量整形的作用。
漏桶算法指一定速率漏水,流量进来的时候是把水加入桶里,当水> 容量的时候拒绝服务(或者其他策略balabala)。
两个重要参数:
- 漏水速率
- 桶容量
下面是计量器版本伪代码:
double rate; // leak rate in calls/s
double burst; // bucket size in calls
long refreshTime; // time for last water refresh
double water; // water count at refreshTime
refreshWater() {
long now = getTimestamp();
//计算两次请求之间流失的水并相减
water = max(0, water- (now - refreshTime)*rate);
refreshTime = now;
}
bool check() {
refreshWater();
if (water < burst) { // 水桶还没满,继续加1
water ++;
return true;
} else {
return false;
}
}
令牌桶算法(TokenBucket)
令牌桶算法是网络流量整形(Traffic Shaping)和速率限制(Rate Limiting)中最常使用的一种算法。典型情况下,令牌桶算法用来控制发送到网络上的数据字节数目,并允许突发数据的发送,用令牌数量代表字节数量。
两个重要参数:
- 发牌速率
- 令牌桶容量
对于令牌不足的情况,对流量可以进行三种方式的处理:
- 丢弃数据包
- 放入等待队列直至令牌足够
- 进行标记,过载情况下可以进行丢弃
令牌桶算法的原理是系统会以一个恒定的速度往桶里放入令牌,而如果请求需要被处理,则需要先从桶里获取一个令牌,当桶里没有令牌可取时,则拒绝服务(或者其他策略balabala)。从原理上看,令牌桶算法和漏桶算法是相反的,一个“进水”,一个是“漏水”。
丢弃数据包的实现看楼下lua脚本
Guava的RateLimiter
就是使用了令牌桶算法。
redis实现
redis在2.6之后内置了对lua脚本的支持。通过lua脚本我们可以执行一些复杂的逻辑操作,同时保证整个操作过程的原子性。
lua脚本写在客户端,下面是一个lua脚本,应该很容易理解
--token_bucket.lua
--keys和argv都是数组
--tonumber()方法是转整数
--local表示本地变量,速度比全局变量快
--..表示拼接字符串
local key = KEYS[1];
--local limit = tonumber(ARGV[1]);
local step = tonumber(ARGV[2]);
local interval = tonumber(ARGV[3]);
local nowTime = tonumber(ARGV[4]);
local lastClearTimeKey = 'lastTimeOf' .. key
local lastClearTime = redis.call('GET', lastClearTimeKey);
local existKey = redis.call('EXISTS', key);
if existKey == 1 then
local diff = tonumber(nowTime) - tonumber(lastClearTime);
local value = tonumber(redis.call('GET', key));
if diff > interval then
local maxValue = value + diff / interval * step;
if maxValue > step then
value = step;
else
value = maxValue;
end
redis.call('SET', lastClearTimeKey, nowTime);
redis.call('SET', key, math.floor(value));
end
if value <= 0 then
return 0;
else
redis.call('DECR', key);
end
else
redis.call('SET', key, step - 1);
redis.call('SET', lastClearTimeKey, nowTime);
end
return 1;
使用spring驱动
由于使用的是spring-boot,redis-server在本地,所以只要引入spring-data-redis
和jedis
,其他配置默认
<dependency>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-redis</artifactId>
<version>2.1.0.RELEASE</version>
</dependency>
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>2.9.0</version>
</dependency>
设置序列化器
@Bean
@SuppressWarnings("unchecked")
public RedisTemplate redisTemplate(RedisConnectionFactory redisConnectionFactory) {
RedisTemplate redisTemplate = new RedisTemplate();
redisTemplate.setConnectionFactory(redisConnectionFactory);
// 设置键的序列化器
redisTemplate.setKeySerializer(new StringRedisSerializer());
// 默认Serializer,RedisTemplate在序列化key、反序列化返回值的时候找不到设置的Serializer会使用默认Serializer,而默认的默认Serializer是JdkSerializationRedisSerializer,这个会转成很难看的码可能导致lua执行出错
redisTemplate.setDefaultSerializer( new GenericJackson2JsonRedisSerializer());
return redisTemplate;
}
脚本的抽象类是DefaultRedisScript
, 使用RedisTemplate
调用。
keys
是List
类型,argvs
是数组类型,不可以搞混。
public class RedisScriptService {
private final static Logger log = LoggerFactory.getLogger(RedisScriptService.class);
@Resource
private RedisTemplate redisTemplate;
public void counterConsume(String key, int limit, int step, int interval) {
DefaultRedisScript<Long> consumeRedisScript = new DefaultRedisScript<>();
consumeRedisScript.setResultType(Long.class);
consumeRedisScript.setScriptSource(new ResourceScriptSource(new ClassPathResource("script/token_bucket.lua")));//加载lua脚本文件
List<Object> keyList = new LinkedList<>();
keyList.add(key);//通过KEYS[1]取值
for (int i = 0; i < 15; i++) {
log.info("result:" + redisTemplate.execute(consumeRedisScript, keyList, new Object[]{limit, step, interval, System.currentTimeMillis()}).toString());
}
}
}
2018-09-30 16:23:17.066 INFO 18660 --- [ main] c.h.learning.service.RedisScriptService : result:1
2018-09-30 16:23:17.067 INFO 18660 --- [ main] c.h.learning.service.RedisScriptService : result:1
2018-09-30 16:23:17.067 INFO 18660 --- [ main] c.h.learning.service.RedisScriptService : result:1
2018-09-30 16:23:17.068 INFO 18660 --- [ main] c.h.learning.service.RedisScriptService : result:1
2018-09-30 16:23:17.069 INFO 18660 --- [ main] c.h.learning.service.RedisScriptService : result:1
2018-09-30 16:23:17.070 INFO 18660 --- [ main] c.h.learning.service.RedisScriptService : result:1
2018-09-30 16:23:17.071 INFO 18660 --- [ main] c.h.learning.service.RedisScriptService : result:1
2018-09-30 16:23:17.072 INFO 18660 --- [ main] c.h.learning.service.RedisScriptService : result:1
2018-09-30 16:23:17.072 INFO 18660 --- [ main] c.h.learning.service.RedisScriptService : result:1
2018-09-30 16:23:17.073 INFO 18660 --- [ main] c.h.learning.service.RedisScriptService : result:1
2018-09-30 16:23:17.074 INFO 18660 --- [ main] c.h.learning.service.RedisScriptService : result:0
2018-09-30 16:23:17.075 INFO 18660 --- [ main] c.h.learning.service.RedisScriptService : result:0
2018-09-30 16:23:17.076 INFO 18660 --- [ main] c.h.learning.service.RedisScriptService : result:0
2018-09-30 16:23:17.077 INFO 18660 --- [ main] c.h.learning.service.RedisScriptService : result:0
2018-09-30 16:23:17.078 INFO 18660 --- [ main] c.h.learning.service.RedisScriptService : result:0
result
的1是通过,0是不通过。
键没有设置过期,可以优化。
Guava RateLimiter
Guava RateLimiter
实现了令牌桶算法。这里有一个中文版的官方文档:Guava官方文档,主要就是声明发牌速率,然后需要判断能否获取令牌,则调用tryAcquire()
或tryAcquire(int)
方法;需要阻塞直至令牌足够,则调用acquire()
或acquire(int)
。
qps限制器
假设我们要限制每秒处理task或者qps的值,代码:
public class RateLimiterTest {
public static void main(String[] args) {
RateLimiter rateLimiter = RateLimiter.create(5);
IntStream.range(0, 15).forEach((a) -> {
if (a == 5) {
SleepUtil.sleep(2000);
}
System.out.println(rateLimiter.acquire());
if ((a + 1) % 5 == 0) {
System.out.println();
}
});
}
}
0.0// 0令牌,还不用阻塞
0.143213
0.196686
0.198948
0.199056
0.0// 停了2秒,发了5个令牌
0.0
0.0
0.0
0.0
0.0// 0令牌,还不用阻塞
0.199505
0.198738
0.197634
0.196273
示例里先实例一个限流器,速率为1秒5次,通过acquire()
阻塞获得令牌,总调用15次,并返回等待时间。第5次acquire()
完的时候,挂起2秒。
- 从输出来看,第一次取得令牌是不用等待的。
- 挂起2秒后,接下来的5次不用阻塞,再接下来的5次除了以第一次发生了阻塞(第一次),也就是2秒内只发了5个令牌,不会累积。
为什么看起来0令牌的情况下,第一次调用阻塞时间都是0呢?那是因为RateLimiter
可以预消费(acquire()
实际上是调用acquire(1)
, 这和调用acquire(1000)
将得到相同的限制效果,如果存在这样的调用的话),但会影响下一次请求的,也就是说,如果一个高开销的任务抵达一个空闲的RateLimiter,它会被马上许可,但是下一个请求会经历额外的限制,从而来偿付高开销任务。
此外需注意:RateLimiter 并不提供公平性的保证,没有先来先得的概念。
这个类其实实现了令牌不足下多种应对策略
- require()属于流量整形的实现
- tryRequire()属于服务限流的实现
后记
除此之外,nginx也有限流模块,一种是限制连接数,一种是使用令牌桶算法,具体效果没实战。