理论+算法+实战,教你如何实现亿级流量下的分布式限流

作者: 小麦

更新时间:2022-03-26 14:35:24

4685 阅读

摘要:在互联网应用中,高并发系统会面临一个重大的挑战,那就是大量流高并发访问,比如:天猫的双十一、京东618、秒杀、抢购促销等,这些都是典型的大流量高并发场景。

本文分享自华为云社区《【高并发】如何实现亿级流量下的分布式限流?这些理论你必须掌握!!》,作者:冰 河。

在互联网应用中,高并发系统会面临一个重大的挑战,那就是大量流高并发访问,比如:天猫的双十一、京东618、秒杀、抢购促销等,这些都是典型的大流量高并发场景。

高并发系统限流

短时间内巨大的访问流量,我们如何让系统在处理高并发的同时还能保证自身系统的稳定性?有人会说,增加机器就可以了,因为我的系统是分布式的,所以可以只需要增加机器就可以解决问题了。但是,如果你通过增加机器还是不能解决这个问题怎么办呢?而且这种情况下又不能无限制的增加机器,服务器的硬件资源始终都是有限的,在有限的资源下,我们要应对这种大流量高并发的访问,就不得不采取一些其他的措施来保护我们的后端服务系统了,比如:缓存、异步、降级、限流、静态化等。

这里,我们先说说如何实现限流。

什么是限流?

在高并发系统中,限流通常指的是:对高并发访问或者请求进行限速或者对一个时间内的请求进行限速来保护我们的系统,一旦达到系统的限速规则(比如系统限制的请求速度),则可以采用下面的方式来处理这些请求。

  • 拒绝服务(友好提示或者跳转到错误页面)。
  • 排队或等待(比如秒杀系统)。
  • 服务降级(返回默认的兜底数据)。

其实,就是对请求进行限速,比如10r/s,即每秒只允许10个请求,这样就限制了请求的速度。从某种意义上说,限流,其实就是在一定频率上进行量的限制。

限流一般用来控制系统服务请求的速率,比如:天猫双十一的限流,京东618的限流,12306的抢票等。

限流有哪些使用场景?

这里,我们来举一个例子,假设你做了一个商城系统,某个节假日的时候,突然发现提交订单的接口请求比平时请求量突然上涨了将近50倍,没多久提交订单的接口就超时并且抛出了异常,几乎不可用了。而且,因为订单接口超时不可用,还导致了系统其它服务出现故障。

我们该如何应对这种大流量场景呢?一种典型的处理方案就是限流。当然了,除了限流之外,还有其他的处理方案,我们这篇文章就主要讲限流。

  • 对稀缺资源的秒杀、抢购;
  • 对数据库的高并发读写操作,比如提交订单,瞬间往数据库插入大量的数据;

限流可以说是处理高并发问题的利器,有了限流就可以不用担心瞬间高峰流量压垮系统服务或者服务雪崩,最终做到有损服务而不是不服务。

使用限流同样需要注意的是:限流要评估好,测试好,否则会导致正常的访问被限流。

计数器

计数器法

限流算法中最简单粗暴的一种算法,例如,某一个接口1分钟内的请求不超过60次,我们可以在开始时设置一个计数器,每次请求时,这个计数器的值加1,如果这个这个计数器的值大于60并且与第一次请求的时间间隔在1分钟之内,那么说明请求过多;如果该请求与第一次请求的时间间隔大于1分钟,并且该计数器的值还在限流范围内,那么重置该计数器。

使用计数器还可以用来限制一定时间内的总并发数,比如数据库连接池、线程池、秒杀的并发数;计数器限流只要一定时间内的总请求数超过设定的阀值则进行限流,是一种简单粗暴的总数量限流,而不是平均速率限流。

这个方法有一个致命问题:临界问题——当遇到恶意请求,在0:59时,瞬间请求100次,并且在1:00请求100次,那么这个用户在1秒内请求了200次,用户可以在重置节点突发请求,而瞬间超过我们设置的速率限制,用户可能通过算法漏洞击垮我们的应用。

这个问题我们可以使用滑动窗口解决。

滑动窗口

在上图中,整个红色矩形框是一个时间窗口,在我们的例子中,一个时间窗口就是1分钟,然后我们将时间窗口进行划分,如上图我们把滑动窗口划分为6格,所以每一格代表10秒,每超过10秒,我们的时间窗口就会向右滑动一格,每一格都有自己独立的计数器,例如:一个请求在0:35到达, 那么0:30到0:39的计数器会+1,那么滑动窗口是怎么解决临界点的问题呢?如上图,0:59到达的100个请求会在灰色区域格子中,而1:00到达的请求会在红色格子中,窗口会向右滑动一格,那么此时间窗口内的总请求数共200个,超过了限定的100,所以此时能够检测出来触发了限流。回头看看计数器算法,会发现,其实计数器算法就是窗口滑动算法,只不过计数器算法没有对时间窗口进行划分,所以是一格。

由此可见,当滑动窗口的格子划分越多,限流的统计就会越精确。

漏桶算法

算法的思路就是水(请求)先进入到漏桶里面,漏桶以恒定的速度流出,当水流的速度过大就会直接溢出,可以看出漏桶算法能强行限制数据的传输速率。如下图所示。

漏桶算法不支持突发流量。

令牌桶算法

从上图中可以看出,令牌算法有点复杂,桶里存放着令牌token。桶一开始是空的,token以固定的速率r往桶里面填充,直到达到桶的容量,多余的token会被丢弃。每当一个请求过来时,就会尝试着移除一个token,如果没有token,请求无法通过。

令牌桶算法支持突发流量。

令牌桶算法实现

Guava框架提供了令牌桶算法的实现,可直接使用这个框架的RateLimiter类创建一个令牌桶限流器,比如:每秒放置的令牌桶的数量为5,那么RateLimiter对象可以保证1秒内不会放入超过5个令牌,并且以固定速率进行放置令牌,达到平滑输出的效果。

平滑流量示例

这里,我写了一个使用Guava框架实现令牌桶算法的示例,如下所示。

package io.binghe.limit.guava;

import com.google.common.util.concurrent.RateLimiter;

/**
 * @author binghe
 * @version 1.0.0
 * @description 令牌桶算法
 */
public class TokenBucketLimiter {
    public static void main(String[] args){
        //每秒钟生成5个令牌
        RateLimiter limiter = RateLimiter.create(5);

        //返回值表示从令牌桶中获取一个令牌所花费的时间,单位是秒
        System.out.println(limiter.acquire(1));
        System.out.println(limiter.acquire(1));
        System.out.println(limiter.acquire(1));
        System.out.println(limiter.acquire(1));
        System.out.println(limiter.acquire(1));
        System.out.println(limiter.acquire(1));
        System.out.println(limiter.acquire(1));
        System.out.println(limiter.acquire(1));
        System.out.println(limiter.acquire(1));
        System.out.println(limiter.acquire(1));
    }
}

代码的实现非常简单,就是使用Guava框架的RateLimiter类生成了一个每秒向桶中放入5个令牌的对象,然后不断从桶中获取令牌。我们先来运行下这段代码,输出的结果信息如下所示。

0.0
0.197294
0.191278
0.19997
0.199305
0.200472
0.200184
0.199417
0.200111
0.199759

从输出结果可以看出:第一次从桶中获取令牌时,返回的时间为0.0,也就是没耗费时间。之后每次从桶中获取令牌时,都会耗费一定的时间,这是为什么呢?按理说,向桶中放入了5个令牌后,再从桶中获取令牌也应该和第一次一样并不会花费时间啊!

因为在Guava的实现是这样的:我们使用RateLimiter.create(5)创建令牌桶对象时,表示每秒新增5个令牌,1秒等于1000毫秒,也就是每隔200毫秒向桶中放入一个令牌。

当我们运行程序时,程序运行到RateLimiter limiter = RateLimiter.create(5);时,就会向桶中放入一个令牌,当程序运行到第一个System.out.println(limiter.acquire(1));时,由于桶中已经存在一个令牌,直接获取这个令牌,并没有花费时间。然而程序继续向下执行时,由于程序会每隔200毫秒向桶中放入一个令牌,所以,获取令牌时,花费的时间几乎都是200毫秒左右。

突发流量示例

我们再来看一个突发流量的示例,代码示例如下所示。

package io.binghe.limit.guava;

import com.google.common.util.concurrent.RateLimiter;

/**
 * @author binghe
 * @version 1.0.0
 * @description 令牌桶算法
 */
public class TokenBucketLimiter {
    public static void main(String[] args){
        //每秒钟生成5个令牌
        RateLimiter limiter = RateLimiter.create(5);

        //返回值表示从令牌桶中获取一个令牌所花费的时间,单位是秒
        System.out.println(limiter.acquire(50));
        System.out.println(limiter.acquire(5));
        System.out.println(limiter.acquire(5));
        System.out.println(limiter.acquire(5));
        System.out.println(limiter.acquire(5));
    }
}

上述代码表示的含义为:每秒向桶中放入5个令牌,第一次从桶中获取50个令牌,也就是我们说的突发流量,后续每次从桶中获取5个令牌。接下来,我们运行上述代码看下效果。

0.0
9.998409
0.99109
1.000148
0.999752

运行代码时,会发现当命令行打印出0.0后,会等很久才会打印出后面的输出结果。

程序每秒钟向桶中放入5个令牌,当程序运行到 RateLimiter limiter = RateLimiter.create(5); 时,就会向桶中放入令牌。当运行到 System.out.println(limiter.acquire(50)); 时,发现很快就会获取到令牌,花费了0.0秒。接下来,运行到第一个System.out.println(limiter.acquire(5));时,花费了9.998409秒。小伙们可以思考下,为什么这里会花费10秒中的时间呢?

这是因为我们使用RateLimiter limiter = RateLimiter.create(5);代码向桶中放入令牌时,一秒钟放入5个,而System.out.println(limiter.acquire(50));需要获取50个令牌,也就是获取50个令牌需要花费10秒钟时间,这是因为程序向桶中放入50个令牌需要10秒钟。程序第一次从桶中获取令牌时,很快就获取到了。而第二次获取令牌时,花费了将近10秒的时间。

Guava框架支持突发流量,但是在突发流量之后再次请求时,会被限速,也就是说:在突发流量之后,再次请求时,会弥补处理突发请求所花费的时间。所以,我们的突发示例程序中,在一次从桶中获取50个令牌后,再次从桶中获取令牌,则会花费10秒左右的时间。

Guava令牌桶算法的特点

  • RateLimiter使用令牌桶算法,会进行令牌的累积,如果获取令牌的频率比较低,则不会导致等待,直接获取令牌。
  • RateLimiter由于会累积令牌,所以可以应对突发流量。也就是说如果同时请求5个令牌,由于此时令牌桶中有累积的令牌,能够快速响应请求。
  • RateLimiter在没有足够的令牌发放时,采用的是滞后的方式进行处理,也就是前一个请求获取令牌所需要等待的时间由下一次请求来承受和弥补,也就是代替前一个请求进行等待。(这里,小伙伴们要好好理解下)

HTTP接口限流实战

这里,我们实现Web接口限流,具体方式为:使用自定义注解封装基于令牌桶限流算法实现接口限流。

不使用注解实现接口限流

搭建项目

这里,我们使用SpringBoot项目来搭建Http接口限流项目,SpringBoot项目本质上还是一个Maven项目。所以,小伙伴们可以直接创建一个Maven项目,我这里的项目名称为mykit-ratelimiter-test。接下来,在pom.xml文件中添加如下依赖使项目构建为一个SpringBoot项目。

<parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.2.6.RELEASE</version>
    </parent>

    <modelVersion>4.0.0</modelVersion>
    <groupId>io.mykit.limiter</groupId>
    <artifactId>mykit-ratelimiter-test</artifactId>
    <version>1.0.0-SNAPSHOT</version>
    <packaging>jar</packaging>
    <name>mykit-ratelimiter-test</name>

    <properties>
        <guava.version>28.2-jre</guava.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
            <exclusions>
                <exclusion>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-starter-tomcat</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-starter-logging</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-undertow</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-configuration-processor</artifactId>
            <optional>true</optional>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>
 
        <dependency>
            <groupId>org.aspectj</groupId>
            <artifactId>aspectjweaver</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-aop</artifactId>
        </dependency>

        <dependency>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
            <version>${guava.version}</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version><!--$NO-MVN-MAN-VER$-->
                <configuration>
                    <source>${java.version}</source>
                    <target>${java.version}</target>
                </configuration>
            </plugin>
        </plugins>
    </build>

可以看到,我在项目中除了引用了SpringBoot相关的Jar包外,还引用了guava框架,版本为28.2-jre。

创建核心类

这里,我主要是模拟一个支付接口的限流场景。首先,我们定义一个PayService接口和MessageService接口。PayService接口主要用于模拟后续的支付业务,MessageService接口模拟发送消息。接口的定义分别如下所示。

  • PayService
package io.mykit.limiter.service;
import java.math.BigDecimal;
/**
 * @author binghe
 * @version 1.0.0
 * @description 模拟支付
 */
public interface PayService {
    int pay(BigDecimal price);
}
  • MessageService
package io.mykit.limiter.service;
/**
 * @author binghe
 * @version 1.0.0
 * @description 模拟发送消息服务
 */
public interface MessageService {
    boolean sendMessage(String message);
}

接下来,创建二者的实现类,分别如下。

  • MessageServiceImpl
package io.mykit.limiter.service.impl;
import io.mykit.limiter.service.MessageService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
/**
 * @author binghe
 * @version 1.0.0
 * @description 模拟实现发送消息
 */
@Service
public class MessageServiceImpl implements MessageService {
    private final Logger logger = LoggerFactory.getLogger(MessageServiceImpl.class);
    @Override
    public boolean sendMessage(String message) {
        logger.info("发送消息成功===>>" + message);
        return true;
    }
}
  • PayServiceImpl
package io.mykit.limiter.service.impl;
import io.mykit.limiter.service.PayService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import java.math.BigDecimal;
/**
 * @author binghe
 * @version 1.0.0
 * @description 模拟支付
 */
@Service
public class PayServiceImpl implements PayService {
    private final Logger logger = LoggerFactory.getLogger(PayServiceImpl.class);
    @Override
    public int pay(BigDecimal price) {
        logger.info("支付成功===>>" + price);
        return 1;
    }
}

由于是模拟支付和发送消息,所以,我在具体实现的方法中打印出了相关的日志,并没有实现具体的业务逻辑。

接下来,就是创建我们的Controller类PayController,在PayController类的接口pay()方法中使用了限流,每秒钟向桶中放入2个令牌,并且客户端从桶中获取令牌,如果在500毫秒内没有获取到令牌的话,我们可以则直接走服务降级处理。

PayController的代码如下所示。

package io.mykit.limiter.controller;
import com.google.common.util.concurrent.RateLimiter;
import io.mykit.limiter.service.MessageService;
import io.mykit.limiter.service.PayService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.math.BigDecimal;
import java.util.concurrent.TimeUnit;

/**
 * @author binghe
 * @version 1.0.0
 * @description 测试接口限流
 */
@RestController
public class PayController {
    private final Logger logger = LoggerFactory.getLogger(PayController.class);
    /**
     * RateLimiter的create()方法中传入一个参数,表示以固定的速率2r/s,即以每秒2个令牌的速率向桶中放入令牌
     */
    private RateLimiter rateLimiter = RateLimiter.create(2);

    @Autowired
    private MessageService messageService;
    @Autowired
    private PayService payService;
    @RequestMapping("/boot/pay")
    public String pay(){
        //记录返回接口
        String result = "";
        //限流处理,客户端请求从桶中获取令牌,如果在500毫秒没有获取到令牌,则直接走服务降级处理
        boolean tryAcquire = rateLimiter.tryAcquire(500, TimeUnit.MILLISECONDS);
        if (!tryAcquire){
            result = "请求过多,降级处理";
            logger.info(result);
            return result;
        }
        int ret = payService.pay(BigDecimal.valueOf(100.0));
        if(ret > 0){
            result = "支付成功";
            return result;
        }
        result = "支付失败,再试一次吧...";
        return result;
    }
}

最后,我们来创建mykit-ratelimiter-test项目的核心启动类,如下所示。

package io.mykit.limiter;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * @author binghe
 * @version 1.0.0
 * @description 项目启动类
 */
@SpringBootApplication
public class MykitLimiterApplication {

    public static void main(String[] args){
        SpringApplication.run(MykitLimiterApplication.class, args);
    }
}

至此,我们不使用注解方式实现限流的Web应用就基本完成了。

运行项目

项目创建完成后,我们来运行项目,运行SpringBoot项目比较简单,直接运行MykitLimiterApplication类的main()方法即可。

项目运行成功后,我们在浏览器地址栏输入链接:http://localhost:8080/boot/pay。页面会输出“支付成功”的字样,说明项目搭建成功了。如下所示。

此时,我只访问了一次,并没有触发限流。接下来,我们不停的刷浏览器,此时,浏览器会输出“支付失败,再试一次吧…”的字样,如下所示。

在PayController类中还有一个sendMessage()方法,模拟的是发送消息的接口,同样使用了限流操作,具体代码如下所示。

@RequestMapping("/boot/send/message")
public String sendMessage(){
    //记录返回接口
    String result = "";
    //限流处理,客户端请求从桶中获取令牌,如果在500毫秒没有获取到令牌,则直接走服务降级处理
    boolean tryAcquire = rateLimiter.tryAcquire(500, TimeUnit.MILLISECONDS);
    if (!tryAcquire){
        result = "请求过多,降级处理";
        logger.info(result);
        return result;
    }
    boolean flag = messageService.sendMessage("恭喜您成长值+1");
    if (flag){
        result = "消息发送成功";
        return result;
    }
    result = "消息发送失败,再试一次吧...";
    return result;
}

sendMessage()方法的代码逻辑和运行效果与pay()方法相同,我就不再浏览器访问 http://localhost:8080/boot/send/message 地址的访问效果了,小伙伴们可以自行验证。

不使用注解实现限流缺点

通过对项目的编写,我们可以发现,当在项目中对接口进行限流时,不使用注解进行开发,会导致代码出现大量冗余,每个方法中几乎都要写一段相同的限流逻辑,代码十分冗余。

如何解决代码冗余的问题呢?我们可以使用自定义注解进行实现。

使用注解实现接口限流

使用自定义注解,我们可以将一些通用的业务逻辑封装到注解的切面中,在需要添加注解业务逻辑的方法上加上相应的注解即可。针对我们这个限流的实例来说,可以基于自定义注解实现。

实现自定义注解

实现,我们来创建一个自定义注解,如下所示。

package io.mykit.limiter.annotation;
import java.lang.annotation.*;
/**
 * @author binghe
 * @version 1.0.0
 * @description 实现限流的自定义注解
 */
@Target(value = ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface MyRateLimiter {
    //向令牌桶放入令牌的速率
    double rate();
    //从令牌桶获取令牌的超时时间
    long timeout() default 0;
}

自定义注解切面实现

接下来,我们还要实现一个切面类MyRateLimiterAspect,如下所示。

package io.mykit.limiter.aspect;

import com.google.common.util.concurrent.RateLimiter;
import io.mykit.limiter.annotation.MyRateLimiter;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.aspectj.lang.reflect.MethodSignature;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.concurrent.TimeUnit;

/**
 * @author binghe
 * @version 1.0.0
 * @description 一般限流切面类
 */
@Aspect
@Component
public class MyRateLimiterAspect {

    private RateLimiter rateLimiter = RateLimiter.create(2);

    @Pointcut("execution(public * io.mykit.limiter.controller.*.*(..))")
    public void pointcut(){

    }

    /**
     * 核心切面方法
     */
    @Around("pointcut()")
    public Object process(ProceedingJoinPoint proceedingJoinPoint) throws Throwable{
        MethodSignature signature = (MethodSignature) proceedingJoinPoint.getSignature();

        //使用反射获取方法上是否存在@MyRateLimiter注解
        MyRateLimiter myRateLimiter = signature.getMethod().getDeclaredAnnotation(MyRateLimiter.class);
        if(myRateLimiter == null){
            //程序正常执行,执行目标方法
            return proceedingJoinPoint.proceed();
        }
        //获取注解上的参数
        //获取配置的速率
        double rate = myRateLimiter.rate();
        //获取客户端等待令牌的时间
        long timeout = myRateLimiter.timeout();

        //设置限流速率
        rateLimiter.setRate(rate);

        //判断客户端获取令牌是否超时
        boolean tryAcquire = rateLimiter.tryAcquire(timeout, TimeUnit.MILLISECONDS);
        if(!tryAcquire){
            //服务降级
            fullback();
            return null;
        }
        //获取到令牌,直接执行
        return proceedingJoinPoint.proceed();

    }

    /**
     * 降级处理
     */
    private void fullback() {
        response.setHeader("Content-type", "text/html;charset=UTF-8");
        PrintWriter writer = null;
        try {
            writer =  response.getWriter();
            writer.println("出错了,重试一次试试?");
            writer.flush();;
        } catch (IOException e) {
            e.printStackTrace();
        }finally {
            if(writer != null){
                writer.close();
            }
        }
    }
}

自定义切面的功能比较简单,我就不细说了,大家有啥问题可以关注【冰河技术】微信公众号来进行提问。

接下来,我们改造下PayController类中的sendMessage()方法,修改后的方法片段代码如下所示。

@MyRateLimiter(rate = 1.0, timeout = 500)
@RequestMapping("/boot/send/message")
public String sendMessage(){
    //记录返回接口
    String result = "";
    boolean flag = messageService.sendMessage("恭喜您成长值+1");
    if (flag){
        result = "消息发送成功";
        return result;
    }
    result = "消息发送失败,再试一次吧...";
    return result;
}

运行部署项目

部署项目比较简单,只需要运行MykitLimiterApplication类下的main()方法即可。这里,为了简单,我们还是从浏览器中直接输入链接地址来进行访问

效果如下所示。

接下来,我们不断的刷新浏览器。会出现“消息发送失败,再试一次吧…”的字样,说明已经触发限流操作。

  • 项目源码已提交到github:

 

点击关注,第一时间了解华为云新鲜技术~

版权声明:本文著作权归作者【小麦 】所有,不代表本网站立场。

侵权请联系:root_email@163.com

相关推荐