尝试一下用lua脚本执行redis命令

前言

对于应用内部的服务限流,熔断器Hystrix,作出很好的实践。最近开源的Sentinel也是一个不错的选择。

不过对于集群接口限流,好像没有什么框架。常用的限流算法就是:漏桶算法和令牌桶算法。
本文将利用redis和lua脚本实现令牌桶算法,同时通过spring来驱动脚本。
Guava提供了一个RateLimiter,我们也看一下。

当然还可以用计数器方法,如设定一个计数key,一秒过期,一秒内达到n次就拒绝服务。

两个算法

Google了十几个中文博客,每一篇都是一样的,都说两种算法能限制速率,但是漏桶算法不能应对突发流量而令牌桶可以,又没说为什么。在我看来,两种算法的算法不过是“一正一反”,本质是一样的。无奈英语渣看了一下维基百科"LeakyBucket"词条,Overview里提到,有两种版本的漏桶算法:

漏桶算法(LeakyBucket)

  1. as a meter(作为计量器)
  2. as a queue(作为调度队列)

第一种版本是令牌桶算法的镜像实现,也就是描述起来不一样,原理一样。只要元素能放进桶,就是允许通过。

第二种版本是把桶作为队列,只有元素漏出桶,这个元素才算通过。因为漏桶的速率是恒定的,所以能起到流量整形的作用。

upload successful

漏桶算法指一定速率漏水,流量进来的时候是把水加入桶里,当水> 容量的时候拒绝服务(或者其他策略balabala)。

两个重要参数:

  1. 漏水速率
  2. 桶容量

下面是计量器版本伪代码:

double rate; // leak rate in calls/s
double burst; // bucket size in calls
long refreshTime; // time for last water refresh
double water; // water count at refreshTime
refreshWater() {
    long now = getTimestamp();
    //计算两次请求之间流失的水并相减
    water = max(0, water- (now - refreshTime)*rate);
    refreshTime = now;
}
bool check() {
    refreshWater();
    if (water < burst) { // 水桶还没满,继续加1
        water ++;
        return true;
    } else {
        return false;
    }
}

令牌桶算法(TokenBucket)

upload successful

令牌桶算法是网络流量整形(Traffic Shaping)和速率限制(Rate Limiting)中最常使用的一种算法。典型情况下,令牌桶算法用来控制发送到网络上的数据字节数目,并允许突发数据的发送,用令牌数量代表字节数量。

两个重要参数:

  1. 发牌速率
  2. 令牌桶容量

对于令牌不足的情况,对流量可以进行三种方式的处理:

  1. 丢弃数据包
  2. 放入等待队列直至令牌足够
  3. 进行标记,过载情况下可以进行丢弃

令牌桶算法的原理是系统会以一个恒定的速度往桶里放入令牌,而如果请求需要被处理,则需要先从桶里获取一个令牌,当桶里没有令牌可取时,则拒绝服务(或者其他策略balabala)。从原理上看,令牌桶算法和漏桶算法是相反的,一个“进水”,一个是“漏水”。

丢弃数据包的实现看楼下lua脚本

Guava的RateLimiter就是使用了令牌桶算法。

redis实现

redis在2.6之后内置了对lua脚本的支持。通过lua脚本我们可以执行一些复杂的逻辑操作,同时保证整个操作过程的原子性。

lua脚本写在客户端,下面是一个lua脚本,应该很容易理解

--token_bucket.lua
--keys和argv都是数组
--tonumber()方法是转整数
--local表示本地变量,速度比全局变量快
--..表示拼接字符串

local key = KEYS[1];
--local limit = tonumber(ARGV[1]);
local step = tonumber(ARGV[2]);
local interval = tonumber(ARGV[3]);
local nowTime = tonumber(ARGV[4]);

local lastClearTimeKey = 'lastTimeOf' .. key
local lastClearTime = redis.call('GET', lastClearTimeKey);
local existKey = redis.call('EXISTS', key);

if existKey == 1 then
    local diff = tonumber(nowTime) - tonumber(lastClearTime);
    local value = tonumber(redis.call('GET', key));
    if diff > interval then
        local maxValue = value + diff / interval * step;
        if maxValue > step then
            value = step;
        else
            value = maxValue;
        end
        redis.call('SET', lastClearTimeKey, nowTime);
        redis.call('SET', key, math.floor(value));
    end

    if value <= 0 then
        return 0;
    else
        redis.call('DECR', key);
    end
else
    redis.call('SET', key, step - 1);
    redis.call('SET', lastClearTimeKey, nowTime);
end
return 1;

使用spring驱动

由于使用的是spring-boot,redis-server在本地,所以只要引入spring-data-redisjedis,其他配置默认

<dependency>
    <groupId>org.springframework.data</groupId>
    <artifactId>spring-data-redis</artifactId>
    <version>2.1.0.RELEASE</version>
</dependency>
<dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
    <version>2.9.0</version>
</dependency>

设置序列化器

    @Bean
    @SuppressWarnings("unchecked")
    public RedisTemplate redisTemplate(RedisConnectionFactory redisConnectionFactory) {
        RedisTemplate redisTemplate = new RedisTemplate();
        redisTemplate.setConnectionFactory(redisConnectionFactory);
        // 设置键的序列化器
        redisTemplate.setKeySerializer(new StringRedisSerializer());
        // 默认Serializer,RedisTemplate在序列化key、反序列化返回值的时候找不到设置的Serializer会使用默认Serializer,而默认的默认Serializer是JdkSerializationRedisSerializer,这个会转成很难看的码可能导致lua执行出错
        redisTemplate.setDefaultSerializer( new GenericJackson2JsonRedisSerializer());
        return redisTemplate;
    }

脚本的抽象类是DefaultRedisScript, 使用RedisTemplate调用。 keysList类型,argvs是数组类型,不可以搞混。

public class RedisScriptService {
    private final static Logger log = LoggerFactory.getLogger(RedisScriptService.class);

    @Resource
    private RedisTemplate redisTemplate;

    public void counterConsume(String key, int limit, int step, int interval) {
        DefaultRedisScript<Long> consumeRedisScript = new DefaultRedisScript<>();
        consumeRedisScript.setResultType(Long.class);
        consumeRedisScript.setScriptSource(new ResourceScriptSource(new ClassPathResource("script/token_bucket.lua")));//加载lua脚本文件

        List<Object> keyList = new LinkedList<>();
        keyList.add(key);//通过KEYS[1]取值
        for (int i = 0; i < 15; i++) {
            log.info("result:" + redisTemplate.execute(consumeRedisScript, keyList, new Object[]{limit, step, interval, System.currentTimeMillis()}).toString());
        }
    }
}
2018-09-30 16:23:17.066  INFO 18660 --- [           main] c.h.learning.service.RedisScriptService  : result:1
2018-09-30 16:23:17.067  INFO 18660 --- [           main] c.h.learning.service.RedisScriptService  : result:1
2018-09-30 16:23:17.067  INFO 18660 --- [           main] c.h.learning.service.RedisScriptService  : result:1
2018-09-30 16:23:17.068  INFO 18660 --- [           main] c.h.learning.service.RedisScriptService  : result:1
2018-09-30 16:23:17.069  INFO 18660 --- [           main] c.h.learning.service.RedisScriptService  : result:1
2018-09-30 16:23:17.070  INFO 18660 --- [           main] c.h.learning.service.RedisScriptService  : result:1
2018-09-30 16:23:17.071  INFO 18660 --- [           main] c.h.learning.service.RedisScriptService  : result:1
2018-09-30 16:23:17.072  INFO 18660 --- [           main] c.h.learning.service.RedisScriptService  : result:1
2018-09-30 16:23:17.072  INFO 18660 --- [           main] c.h.learning.service.RedisScriptService  : result:1
2018-09-30 16:23:17.073  INFO 18660 --- [           main] c.h.learning.service.RedisScriptService  : result:1
2018-09-30 16:23:17.074  INFO 18660 --- [           main] c.h.learning.service.RedisScriptService  : result:0
2018-09-30 16:23:17.075  INFO 18660 --- [           main] c.h.learning.service.RedisScriptService  : result:0
2018-09-30 16:23:17.076  INFO 18660 --- [           main] c.h.learning.service.RedisScriptService  : result:0
2018-09-30 16:23:17.077  INFO 18660 --- [           main] c.h.learning.service.RedisScriptService  : result:0
2018-09-30 16:23:17.078  INFO 18660 --- [           main] c.h.learning.service.RedisScriptService  : result:0

result的1是通过,0是不通过。

键没有设置过期,可以优化。

Guava RateLimiter

Guava RateLimiter实现了令牌桶算法。这里有一个中文版的官方文档:Guava官方文档,主要就是声明发牌速率,然后需要判断能否获取令牌,则调用tryAcquire()tryAcquire(int)方法;需要阻塞直至令牌足够,则调用acquire()acquire(int)

qps限制器

假设我们要限制每秒处理task或者qps的值,代码:

public class RateLimiterTest {
    public static void main(String[] args) {
        RateLimiter rateLimiter = RateLimiter.create(5);
        IntStream.range(0, 15).forEach((a) -> {
            if (a == 5) {
                SleepUtil.sleep(2000);
            }
            System.out.println(rateLimiter.acquire());
            if ((a + 1) % 5 == 0) {
                System.out.println();
            }
        });
    }
}
0.0// 0令牌,还不用阻塞
0.143213
0.196686
0.198948
0.199056

0.0// 停了2秒,发了5个令牌
0.0
0.0
0.0
0.0

0.0// 0令牌,还不用阻塞
0.199505
0.198738
0.197634
0.196273

示例里先实例一个限流器,速率为1秒5次,通过acquire()阻塞获得令牌,总调用15次,并返回等待时间。第5次acquire()完的时候,挂起2秒。

  • 从输出来看,第一次取得令牌是不用等待的。
  • 挂起2秒后,接下来的5次不用阻塞,再接下来的5次除了以第一次发生了阻塞(第一次),也就是2秒内只发了5个令牌,不会累积。

为什么看起来0令牌的情况下,第一次调用阻塞时间都是0呢?那是因为RateLimiter可以预消费(acquire()实际上是调用acquire(1), 这和调用acquire(1000) 将得到相同的限制效果,如果存在这样的调用的话),但会影响下一次请求的,也就是说,如果一个高开销的任务抵达一个空闲的RateLimiter,它会被马上许可,但是下一个请求会经历额外的限制,从而来偿付高开销任务。

此外需注意:RateLimiter 并不提供公平性的保证,没有先来先得的概念。

这个类其实实现了令牌不足下多种应对策略

  • require()属于流量整形的实现
  • tryRequire()属于服务限流的实现

后记

除此之外,nginx也有限流模块,一种是限制连接数,一种是使用令牌桶算法,具体效果没实战。

参考

  1. Leaky bucket