kratos源码分析:限流器_kratos 限流

CSDN博客 · · 83 次点击 · · 开始浏览    

1. 为什么需要限流

无论是在单体服务中还是在微服务中,提供的API接口都是有访问上限的
当非预期的请求对系统压力过大,我们就必须考虑使用限流来丢弃部分请求,以保障服务整体可用,以防止压力超出系统承载上限而拖垮系统,比如遇到以下情况:

  1. 业务上:热点业务的突发请求
  2. 代码上:调用方bug导致的突发请求
  3. 安全上:被恶意攻击,例如:DDOS攻击

因此对公开暴露的接口最好都加上限流措施

2. 传统限流的问题

使用限流器的前提是必须知道自身的能够处理的最大并发数,一般在上线前通过压测得到qps,但这种方式有以下几个问题:

  • 每个接口的限流参数可能都不一样,需要限流的接口都要压测,流程繁琐
  • 压测一般是单接口压,但真实线上环境是多个接口一起提供服务,单接口压测的数据比实际能承受的QPS更大,使得qps数据不准确
  • 随着业务迭代,每个接口的处理能力往往也会随之变化,如果需要精确更新限流参数,每次上线前都需要进行压测,变得非常繁琐

那么有没有一种更加简洁的限流机制能实现最大限度的自我保护呢?
过载保护能智能地根据服务自身的负载情况判断是否需要开启保护,其目标为:

  1. 系统不会被请求压垮
  2. 在不被压垮的前提下,保持系统的吞吐量

那如何判断服务的负载是否达到过载的阈值呢?一般有以下几个指标:

  1. cpu负载(cpu load)(常用)
  2. 内存负载(mem load)
  3. 正在处理的请求数(inflight)

一个简单的思路是,只根据cpu负载指标作为是否放行请求的依据:

  • cpu负载小于阈值:就放行请求
  • cpu负载大于等于阈值:拒绝请求

但这样会有负载抖动的问题:因为load是一个结果,即load的统计有延迟性

  1. 当发现负载没达到阈值而放行时,可能放了很多请求过去,而这些请求在真正执行时才会把load推高,导致负载陡增,甚至超过系统负载
    1. 可以类比成非并发安全的check and do,check load没有超过阈值再放行do,但check和do没有原子性
  2. 当发现负载超过阈值时,此时会拒绝请求,但是等到负载小于阈值时,由于过去一段时间都没有放行请求,会造成一段空窗期,这段空窗期导致负载抖降,浪费了系统的处理能力

延迟性就是造成负载抖动的关键

3. kratos限流算法

因此在判断系统是否过载时,cpu load只作为开始保护的一个条件,真正的判断依据是正在处理的请求数inflight
由于正在处理的请求数的统计在请求开始前+1,请求结束后-1,没有延迟,不会造成上面提到的问题

那怎么判断inflight有没有超过阈值呢?就需要根据历史的请求数和平均响应时间来计算出这个阈值

先举个例子:
在这里插入图片描述

已知某大学每年招收2000名学生, 每名学生均需要4年才能毕业离校,请估算目前在你们大学就读的学生总人数
这道题目的答案是8000 (2000 * 4), 原因是一直保持着4年招收学生的人数,而每年学校招收2000人

抽象化来说:

  • L :系统中的存在的物体数量
  • W :物体在系统中的平均等待时间(逗留时间)
  • λ:物体进入系统的速率
    结合上面的案例, λ 就是大学每年招生的人数,W 就是每个学生为了毕业在学校里等待的时间(在学校这个系统里逗留的时间),L 就是目前学校里学生的总人数。
  • 物体进入系统的速率是 λ 个每秒(对比大学招生2000人每年)
  • 整个系统一共保留了W秒的物体(对比大学里一共保留了4个年级的学生)
  • 因此整个系统里有 λW 个物体(对比大学总共里一共有 2000 * 4 = 8000 个学生)

回到后端服务来说,L是吞吐量,W是平均请求耗时,λ是qps
如果cpu load达到80%后,过去一段时间的每秒的请求数qps * 每个请求平均耗时rt,就是最大吞吐量。如果inflight超过了这个值,就需要拒绝请求

kratos限流算法就采用了这个思路,其依靠下面 4 个指标共同确定:

  • cpu:根据EWMA(指数加权平均)计算得到的cpu使用率
    • EWMA会使得数据更平滑,减少噪音的干扰。同时时效性高,最新的数据有最高的权重
  • inflight:系统中正在处理的请求数量
  • pass: 过去一段时间的请求数
  • rt:过去一段时间的请求耗时

限流算法为:

  1. 如果cpu利用率超过80%,进入限流判定
  2. 如果cpu利用率小于80%,但最近1s内触发过限流,也进入限流判定
  3. 限流判定:判断系统中的请求数InFlight是否大于 吞吐量阈值 ( M a x P a s s   ∗   M i n R t   ∗   b u c k e t P e r S e c o n d   /   1000 ) (MaxPass \ * \ MinRt \ * \ bucketPerSecond \ / \ 1000) (MaxPass  MinRt  bucketPerSecond / 1000),如果是进行限流
    1. MaxPass:滑动窗口中,请求数最大的那个窗口的值
    2. MinRt:滑动窗口中,平均响应时间最小的那个窗口的值,单位毫秒
    3. bucketPerSecond:一秒有多少个滑动窗口
    4. 这样MaxPass * bucketPerSecond就是qps,MinRt/1000就是以秒为单位的请求耗时
    5. 整个算式的计算结果就是过去一段时间的最大吞吐量。如果当前系统中正在处理的请求数inflight大于该值,就要限流

4. kratos限流实现

本文涉及的源码:https://github.com/go-kratos/aegis,版本:v0.2.0

4.1 cpu使用率计算

init函数 中,启动了一个goroutine来计算 CPU使用率,这里使用了指数加权平均算法来修正偏差,其计算公式为:
c p u = c p u t − 1   ∗   d e c a y + c p u t   ∗   ( 1 − d e c a y ) cpu = cpu^{t-1} \ * \ decay + cpu^t \ * \ (1 - decay) cpu=cput1  decay+cput  (1decay)
其中dacay=0.95

func init() {  
    go cpuproc()  
}  
  
// cpu = cpuᵗ⁻¹ * decay + cpuᵗ * (1 - decay)  
func cpuproc() {  
    ticker := time.NewTicker(time.Millisecond * 500) // same to cpu sample rate  
    defer func() {  
       ticker.Stop()  
       if err := recover(); err != nil {  
          go cpuproc()  
       }  
    }()  
  
    for range ticker.C {  
       stat := &cpu.Stat{}
       // 获取真实的cpu使用率  
       cpu.ReadStat(stat)  
       stat.Usage = min(stat.Usage, 1000) 
       // 拿到上一次计算的cpu使用率 
       prevCPU := atomic.LoadInt64(&gCPU)
       // 写入本次计算的cpu使用率  
       curCPU := int64(float64(prevCPU)*decay + float64(stat.Usage)*(1.0-decay))  
       atomic.StoreInt64(&gCPU, curCPU)  
    }  
}

4.2 核心结构

type BBR struct {  
    cpu             cpuGetter  
    // 统计请求数的滑动窗口
    passStat        window.RollingCounter
    // 统计响应时间的滑动窗口  
    rtStat          window.RollingCounter 
    // 系统正在处理的请求数 
    inFlight        int64
    // 每秒多少个桶,默认为10  
    bucketPerSecond int64  
    bucketDuration  time.Duration  
  
    // 上次drop的时间  
    prevDropTime atomic.Value  
    maxPASSCache atomic.Value  
    minRtCache   atomic.Value  
  
    opts options  
}

4.3 计算maxPass

计算滑动窗口中,所有桶中pass的最大值

func (l *BBR) maxPASS() int64 {  
    passCache := l.maxPASSCache.Load()  
    if passCache != nil {  
       ps := passCache.(*counterCache)  
       // 距离上次计算不足一个时间单位,用上次计算的  
       if l.timespan(ps.time) < 1 {  
          return ps.val  
       }  
    }  
      
    // 计算所有桶中最大的pass值  
    rawMaxPass := int64(l.passStat.Reduce(func(iterator window.Iterator) float64 {  
       var result = 1.0  
       for i := 1; iterator.Next() && i < l.opts.Bucket; i++ {  
          bucket := iterator.Bucket()  
          count := 0.0  
          for _, p := range bucket.Points {  
             count += p  
          }  
          result = math.Max(result, count)  
       }  
       return result  
    }))  
  
    l.maxPASSCache.Store(&counterCache{  
       val:  rawMaxPass,  
       time: time.Now(),  
    })  
    return rawMaxPass  
}

4.4 计算minRT

计算滑动窗口中,所有桶中rt平均值的最小值

func (l *BBR) minRT() int64 {  
    rtCache := l.minRtCache.Load()  
    if rtCache != nil {  
       rc := rtCache.(*counterCache)  
       // 距离上次计算不足一个时间单位,用上次计算的  
       if l.timespan(rc.time) < 1 {  
          return rc.val  
       }  
    }  
      
    // 计算所有桶中最小的响应时间  
    rawMinRT := int64(math.Ceil(l.rtStat.Reduce(func(iterator window.Iterator) float64 {  
       var result = math.MaxFloat64  
       for i := 1; iterator.Next() && i < l.opts.Bucket; i++ {  
          bucket := iterator.Bucket()  
          if len(bucket.Points) == 0 {  
             continue  
          }  
          total := 0.0  
          for _, p := range bucket.Points {  
             total += p  
          }  
          avg := total / float64(bucket.Count)  
          result = math.Min(result, avg)  
       }  
       return result  
    })))  
    if rawMinRT <= 0 {  
       rawMinRT = 1  
    }  
      
    l.minRtCache.Store(&counterCache{  
       val:  rawMinRT,  
       time: time.Now(),  
    })  
    return rawMinRT  
}

4.5 maxFlight

计算最大吞吐量,等于每秒处理的请求(maxPASS() * bucketPerSecond),乘以请求在系统待了多少秒(minRT() / 1000)
其中maxPASS() 和 minRT() 根据滑动窗口动态计算

func (l *BBR) maxInFlight() int64 {  
    return int64(math.Floor(float64(l.maxPASS()*l.minRT()*l.bucketPerSecond)/1000.0) + 0.5)  
}

4.6 Allow

接下来看限流判定Allow方法:
在这里插入图片描述

  1. shouldDrop判断是否应该放行
  2. 如果可以放行,系统中的请求数inFlight+1
  3. 计算当前时间作为开始执行时间
  4. 返回一个func,用于请求完成后执行
    1. 计算请求的耗时,上报到滑动窗口
    2. 系统中的请求数inFlight-1
    3. 上报请求数+1到滑动窗口
func (l *BBR) Allow() (ratelimit.DoneFunc, error) {  
    if l.shouldDrop() {  
       return nil, ratelimit.ErrLimitExceed  
    }  
  
    // inFlight++  
    atomic.AddInt64(&l.inFlight, 1)  
    // 开始执行时间  
    start := time.Now().UnixNano()  
    ms := float64(time.Millisecond) 

    // 返回请求执行完毕后的回调方法
    return func(ratelimit.DoneInfo) {  
       //结束执行时间  
       if rt := int64(math.Ceil(float64(time.Now().UnixNano()-start)) / ms); rt > 0 {  
          // 上报本次执行耗时  
          l.rtStat.Add(rt)  
       }  
       // inFlight--  
       atomic.AddInt64(&l.inFlight, -1)  
       // 请求数+1  
       l.passStat.Add(1)  
    }, nil  
}

其中shouldDrop为判断是否需要限流:

  1. 如果cpu小于触发阈值,则判断距离上一次触发限流是否在1s内
    1. 在:继续判断inFlight是否超过maxFlight(),进而执行限流
    2. 不在:放行请求
  2. 判断inFlight是否超过maxFlight(),如果是拒绝请求,否则放行
func (l *BBR) shouldDrop() bool {  
    now := time.Duration(time.Now().UnixNano())  
  
    // cpu小于触发阈值  
    if l.cpu() < l.opts.CPUThreshold {  
       // 获取上一次drop的时间  
       prevDropTime, _ := l.prevDropTime.Load().(time.Duration)  
       if prevDropTime == 0 {         
          return false  
       }  
  
       // 当前时间距离上一次drop的实现小于1s,继续执行限流逻辑  
       if time.Duration(now-prevDropTime) <= time.Second {  
          // 判断inFlight都没有超过maxInFlight  
          inFlight := atomic.LoadInt64(&l.inFlight)  
          return inFlight > 1 && inFlight > l.maxInFlight()  
       }  
       l.prevDropTime.Store(time.Duration(0))  
       return false  
    }  
  
    // cpu使用率超过阈值,执行判定限流逻辑  
    inFlight := atomic.LoadInt64(&l.inFlight)  
    drop := inFlight > 1 && inFlight > l.maxInFlight()  
    if drop {  
       prevDrop, _ := l.prevDropTime.Load().(time.Duration)  
       if prevDrop != 0 {  
          return drop  
       }  
       // 存储上一次drop时间  
       l.prevDropTime.Store(now)  
    }  
    return drop  
}

本文来自:CSDN博客

感谢作者:CSDN博客

查看原文:kratos源码分析:限流器_kratos 限流

83 次点击  
加入收藏 微博
暂无回复
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传