為什么需要降載
微服務集群中,調(diào)用鏈路錯綜復雜,作為服務提供者需要有一種保護自己的機制,防止調(diào)用方無腦調(diào)用壓垮自己,保證自身服務的高可用。
最常見的保護機制莫過于限流機制,使用限流器的前提是必須知道自身的能夠處理的最大并發(fā)數(shù),一般在上線前通過壓測來得到最大并發(fā)數(shù),而且日常請求過程中每個接口的限流參數(shù)都不一樣,同時系統(tǒng)一直在不斷的迭代其處理能力往往也會隨之變化,每次上線前都需要進行壓測然后調(diào)整限流參數(shù)變得非常繁瑣。
那么有沒有一種更加簡潔的限流機制能實現(xiàn)最大限度的自我保護呢?
什么是自適應降載
自適應降載能非常智能的保護服務自身,根據(jù)服務自身的系統(tǒng)負載動態(tài)判斷是否需要降載。
設(shè)計目標:
- 保證系統(tǒng)不被拖垮。
- 在系統(tǒng)穩(wěn)定的前提下,保持系統(tǒng)的吞吐量。
那么關(guān)鍵就在于如何衡量服務自身的負載呢?
判斷高負載主要取決于兩個指標:
- cpu 是否過載。
- 最大并發(fā)數(shù)是否過載。
以上兩點同時滿足時則說明服務處于高負載狀態(tài),則進行自適應降載。
同時也應該注意高并發(fā)場景 cpu 負載、并發(fā)數(shù)往往波動比較大,從數(shù)據(jù)上我們稱這種現(xiàn)象為毛刺,毛刺現(xiàn)象可能會導致系統(tǒng)一直在頻繁的進行自動降載操作,所以我們一般獲取一段時間內(nèi)的指標均值來使指標更加平滑。實現(xiàn)上可以采用準確的記錄一段時間內(nèi)的指標然后直接計算平均值,但是需要占用一定的系統(tǒng)資源。
統(tǒng)計學上有一種算法:滑動平均(exponential moving average),可以用來估算變量的局部均值,使得變量的更新與歷史一段時間的歷史取值有關(guān),無需記錄所有的歷史局部變量就可以實現(xiàn)平均值估算,非常節(jié)省寶貴的服務器資源。
滑動平均算法原理 參考這篇文章講的非常清楚。
變量 V 在 t 時刻記為 Vt,θt 為變量 V 在 t 時刻的取值,即在不使用滑動平均模型時 Vt=θt,在使用滑動平均模型后,Vt 的更新公式如下:
Vt=β?Vt?1+(1?β)?θt
- β = 0 時 Vt = θt
- β = 0.9 時,大致相當于過去 10 個 θt 值的平均
- β = 0.99 時,大致相當于過去 100 個 θt 值的平均
代碼實現(xiàn)
接下來我們來看下 go-zero 自適應降載的代碼實現(xiàn)。
core/load/adaptiveshedder.go
自適應降載接口定義:
// 回調(diào)函數(shù)Promise interface { // 請求成功時回調(diào)此函數(shù) Pass() // 請求失敗時回調(diào)此函數(shù) Fail()}// 降載接口定義Shedder interface { // 降載檢查 // 1. 允許調(diào)用,需手動執(zhí)行 Promise.accept()/reject()上報實際執(zhí)行任務結(jié)構(gòu) // 2. 拒絕調(diào)用,將會直接返回err:服務過載錯誤 ErrServiceOverloaded Allow() (Promise, error)}
接口定義非常精簡意味使用起來其實非常簡單,對外暴露一個`Allow()(Promise,error)。
go-zero 使用示例:
業(yè)務中只需調(diào)該方法判斷是否降載,如果被降載則直接結(jié)束流程,否則執(zhí)行業(yè)務最后使用返回值 Promise 根據(jù)執(zhí)行結(jié)果回調(diào)結(jié)果即可。
func UnarySheddingInterceptor(shedder load.Shedder, metrics *stat.Metrics) grpc.UnaryServerInterceptor { ensureSheddingStat() return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (val interface{}, err error) { sheddingStat.IncrementTotal() var promise load.Promise // 檢查是否被降載 promise, err = shedder.Allow() // 降載,記錄相關(guān)日志與指標 if err != nil { metrics.AddDrop() sheddingStat.IncrementDrop() return } // 最后回調(diào)執(zhí)行結(jié)果 defer func() { // 執(zhí)行失敗 if err == context.DeadlineExceeded { promise.Fail() // 執(zhí)行成功 } else { sheddingStat.IncrementPass() promise.Pass() } }() // 執(zhí)行業(yè)務方法 return handler(ctx, req) }}
接口實現(xiàn)類定義 :
主要包含三類屬性
- cpu 負載閾值:超過此值意味著 cpu 處于高負載狀態(tài)。
- 冷卻期:假如服務之前被降載過,那么將進入冷卻期,目的在于防止降載過程中負載還未降下來立馬加壓導致來回抖動。因為降低負載需要一定的時間,處于冷卻期內(nèi)應該繼續(xù)檢查并發(fā)數(shù)是否超過限制,超過限制則繼續(xù)丟棄請求。
- 并發(fā)數(shù):當前正在處理的并發(fā)數(shù),當前正在處理的并發(fā)平均數(shù),以及最近一段內(nèi)的請求數(shù)與響應時間,目的是為了計算當前正在處理的并發(fā)數(shù)是否大于系統(tǒng)可承載的最大并發(fā)數(shù)。
// option參數(shù)模式ShedderOption func(opts *shedderOptions)// 可選配置參數(shù)shedderOptions struct { // 滑動時間窗口大小 window time.Duration // 滑動時間窗口數(shù)量 buckets int // cpu負載臨界值 cpuThreshold int64}// 自適應降載結(jié)構(gòu)體,需實現(xiàn) Shedder 接口adaptiveShedder struct { // cpu負載臨界值 // 高于臨界值代表高負載需要降載保證服務 cpuThreshold int64 // 1s內(nèi)有多少個桶 windows int64 // 并發(fā)數(shù) flying int64 // 滑動平滑并發(fā)數(shù) avgFlying float64 // 自旋鎖,一個服務共用一個降載 // 統(tǒng)計當前正在處理的請求數(shù)時必須加鎖 // 無損并發(fā),提高性能 avgFlyingLock syncx.SpinLock // 最后一次拒絕時間 dropTime *syncx.AtomicDuration // 最近是否被拒絕過 droppedRecently *syncx.AtomicBool // 請求數(shù)統(tǒng)計,通過滑動時間窗口記錄最近一段時間內(nèi)指標 passCounter *collection.RollingWindow // 響應時間統(tǒng)計,通過滑動時間窗口記錄最近一段時間內(nèi)指標 rtCounter *collection.RollingWindow}
自適應降載構(gòu)造器:
func NewAdaptiveShedder(opts ...ShedderOption) Shedder { // 為了保證代碼統(tǒng)一 // 當開發(fā)者關(guān)閉時返回默認的空實現(xiàn),實現(xiàn)代碼統(tǒng)一 // go-zero很多地方都采用了這種設(shè)計,比如Breaker,日志組件 if !enabled.True() { return newNopShedder() } // options模式設(shè)置可選配置參數(shù) options := shedderOptions{ // 默認統(tǒng)計最近5s內(nèi)數(shù)據(jù) window: defaultWindow, // 默認桶數(shù)量50個 buckets: defaultBuckets, // cpu負載 cpuThreshold: defaultCpuThreshold, } for _, opt := range opts { opt(&options) } // 計算每個窗口間隔時間,默認為100ms bucketDuration := options.window / time.Duration(options.buckets) return &adaptiveShedder{ // cpu負載 cpuThreshold: options.cpuThreshold, // 1s的時間內(nèi)包含多少個滑動窗口單元 windows: int64(time.Second / bucketDuration), // 最近一次拒絕時間 dropTime: syncx.NewAtomicDuration(), // 最近是否被拒絕過 droppedRecently: syncx.NewAtomicBool(), // qps統(tǒng)計,滑動時間窗口 // 忽略當前正在寫入窗口(桶),時間周期不完整可能導致數(shù)據(jù)異常 passCounter: collection.NewRollingWindow(options.buckets, bucketDuration, collection.IgnoreCurrentBucket()), // 響應時間統(tǒng)計,滑動時間窗口 // 忽略當前正在寫入窗口(桶),時間周期不完整可能導致數(shù)據(jù)異常 rtCounter: collection.NewRollingWindow(options.buckets, bucketDuration, collection.IgnoreCurrentBucket()), }}
降載檢查 Allow()
:
檢查當前請求是否應該被丟棄,被丟棄業(yè)務側(cè)需要直接中斷請求保護服務,也意味著降載生效同時進入冷卻期。如果放行則返回 promise,等待業(yè)務側(cè)執(zhí)行回調(diào)函數(shù)執(zhí)行指標統(tǒng)計。
// 降載檢查func (as *adaptiveShedder) Allow() (Promise, error) { // 檢查請求是否被丟棄 if as.shouldDrop() { // 設(shè)置drop時間 as.dropTime.Set(timex.Now()) // 最近已被drop as.droppedRecently.Set(true) // 返回過載 return nil, ErrServiceOverloaded } // 正在處理請求數(shù)加1 as.addFlying(1) // 這里每個允許的請求都會返回一個新的promise對象 // promise內(nèi)部持有了降載指針對象 return &promise{ start: timex.Now(), shedder: as, }, nil}
檢查是否應該被丟棄shouldDrop()
:
// 請求是否應該被丟棄func (as *adaptiveShedder) shouldDrop() bool { // 當前cpu負載超過閾值 // 服務處于冷卻期內(nèi)應該繼續(xù)檢查負載并嘗試丟棄請求 if as.systemOverloaded() || as.stillHot() { // 檢查正在處理的并發(fā)是否超出當前可承載的最大并發(fā)數(shù) // 超出則丟棄請求 if as.highThru() { flying := atomic.LoadInt64(&as.flying) as.avgFlyingLock.Lock() avgFlying := as.avgFlying as.avgFlyingLock.Unlock() msg := fmt.Sprintf( "dropreq, cpu: %d, maxPass: %d, minRt: %.2f, hot: %t, flying: %d, avgFlying: %.2f", stat.CpuUsage(), as.maxPass(), as.minRt(), as.stillHot(), flying, avgFlying) logx.Error(msg) stat.Report(msg) return true } } return false}
cpu 閾值檢查 systemOverloaded()
:
cpu 負載值計算算法采用的滑動平均算法,防止毛刺現(xiàn)象。每隔 250ms 采樣一次 β 為 0.95,大概相當于歷史 20 次 cpu 負載的平均值,時間周期約為 5s。
// cpu 是否過載func (as *adaptiveShedder) systemOverloaded() bool { return systemOverloadChecker(as.cpuThreshold)}// cpu 檢查函數(shù)systemOverloadChecker = func(cpuThreshold int64) bool { return stat.CpuUsage() >= cpuThreshold}// cpu滑動平均值curUsage := internal.RefreshCpu()prevUsage := atomic.LoadInt64(&cpuUsage)// cpu = cpu??1 * beta + cpu? * (1 - beta)// 滑動平均算法usage := int64(float64(prevUsage)*beta + float64(curUsage)*(1-beta))atomic.StoreInt64(&cpuUsage, usage)
檢查是否處于冷卻期 stillHot
:
判斷當前系統(tǒng)是否處于冷卻期,如果處于冷卻期內(nèi),應該繼續(xù)嘗試檢查是否丟棄請求。主要是防止系統(tǒng)在過載恢復過程中負載還未降下來,立馬又增加壓力導致來回抖動,此時應該嘗試繼續(xù)丟棄請求。
func (as *adaptiveShedder) stillHot() bool { // 最近沒有丟棄請求 // 說明服務正常 if !as.droppedRecently.True() { return false } // 不在冷卻期 dropTime := as.dropTime.Load() if dropTime == 0 { return false } // 冷卻時間默認為1s hot := timex.Since(dropTime) < coolOffDuration // 不在冷卻期,正常處理請求中 if !hot { // 重置drop記錄 as.droppedRecently.Set(false) } return hot}
檢查當前正在處理的并發(fā)數(shù)highThru()
:
一旦 當前處理的并發(fā)數(shù) > 并發(fā)數(shù)承載上限 則進入降載狀態(tài)。
這里為什么要加鎖呢?因為自適應降載時全局在使用的,為了保證并發(fā)數(shù)平均值正確性。
為什么這里要加自旋鎖呢?因為并發(fā)處理過程中,可以不阻塞其他的 goroutine 執(zhí)行任務,采用無鎖并發(fā)提高性能。
func (as *adaptiveShedder) highThru() bool { // 加鎖 as.avgFlyingLock.Lock() // 獲取滑動平均值 // 每次請求結(jié)束后更新 avgFlying := as.avgFlying // 解鎖 as.avgFlyingLock.Unlock() // 系統(tǒng)此時最大并發(fā)數(shù) maxFlight := as.maxFlight() // 正在處理的并發(fā)數(shù)和平均并發(fā)數(shù)是否大于系統(tǒng)的最大并發(fā)數(shù) return int64(avgFlying) > maxFlight && atomic.LoadInt64(&as.flying) > maxFlight}
如何得到正在處理的并發(fā)數(shù)與平均并發(fā)數(shù)呢?
當前正在的處理并發(fā)數(shù)統(tǒng)計其實非常簡單,每次允許請求時并發(fā)數(shù) +1,請求完成后 通過 promise 對象回調(diào)-1 即可,并利用滑動平均算法求解平均并發(fā)數(shù)即可。
type promise struct { // 請求開始時間 // 統(tǒng)計請求處理耗時 start time.Duration shedder *adaptiveShedder}func (p *promise) Fail() { // 請求結(jié)束,當前正在處理請求數(shù)-1 p.shedder.addFlying(-1)}func (p *promise) Pass() { // 響應時間,單位毫秒 rt := float64(timex.Since(p.start)) / float64(time.Millisecond) // 請求結(jié)束,當前正在處理請求數(shù)-1 p.shedder.addFlying(-1) p.shedder.rtCounter.Add(math.Ceil(rt)) p.shedder.passCounter.Add(1)}func (as *adaptiveShedder) addFlying(delta int64) { flying := atomic.AddInt64(&as.flying, delta) // 請求結(jié)束后,統(tǒng)計當前正在處理的請求并發(fā) if delta < 0 { as.avgFlyingLock.Lock() // 估算當前服務近一段時間內(nèi)的平均請求數(shù) as.avgFlying = as.avgFlying*flyingBeta + float64(flying)*(1-flyingBeta) as.avgFlyingLock.Unlock() }}
得到了當前的系統(tǒng)數(shù)還不夠 ,我們還需要知道當前系統(tǒng)能夠處理并發(fā)數(shù)的上限,即最大并發(fā)數(shù)。
請求通過數(shù)與響應時間都是通過滑動窗口來實現(xiàn)的,關(guān)于滑動窗口的實現(xiàn)可以參考 自適應熔斷器
那篇文章。
當前系統(tǒng)的最大并發(fā)數(shù) = 窗口單位時間內(nèi)的最大通過數(shù)量 * 窗口單位時間內(nèi)的最小響應時間。
// 計算每秒系統(tǒng)的最大并發(fā)數(shù)// 最大并發(fā)數(shù) = 最大請求數(shù)(qps)* 最小響應時間(rt)func (as *adaptiveShedder) maxFlight() int64 { // windows = buckets per second // maxQPS = maxPASS * windows // minRT = min average response time in milliseconds // maxQPS * minRT / milliseconds_per_second // as.maxPass()*as.windows - 每個桶最大的qps * 1s內(nèi)包含桶的數(shù)量 // as.minRt()/1e3 - 窗口所有桶中最小的平均響應時間 / 1000ms這里是為了轉(zhuǎn)換成秒 return int64(math.Max(1, float64(as.maxPass()*as.windows)*(as.minRt()/1e3)))} // 滑動時間窗口內(nèi)有多個桶// 找到請求數(shù)最多的那個// 每個桶占用的時間為 internal ms// qps指的是1s內(nèi)的請求數(shù),qps: maxPass * time.Second/internalfunc (as *adaptiveShedder) maxPass() int64 { var result float64 = 1 // 當前時間窗口內(nèi)請求數(shù)最多的桶 as.passCounter.Reduce(func(b *collection.Bucket) { if b.Sum > result { result = b.Sum } }) return int64(result)}// 滑動時間窗口內(nèi)有多個桶// 計算最小的平均響應時間// 因為需要計算近一段時間內(nèi)系統(tǒng)能夠處理的最大并發(fā)數(shù)func (as *adaptiveShedder) minRt() float64 { // 默認為1000ms result := defaultMinRt as.rtCounter.Reduce(func(b *collection.Bucket) { if b.Count <= 0 { return } // 請求平均響應時間 avg := math.Round(b.Sum / float64(b.Count)) if avg < result { result = avg } }) return result}
參考資料
項目地址
https://github.com/zeromicro/go-zero
歡迎使用 go-zero
并 star 支持我們!
微信交流群
關(guān)注『微服務實踐』公眾號并點擊 交流群 獲取社區(qū)群二維碼。