日韩无码专区无码一级三级片|91人人爱网站中日韩无码电影|厨房大战丰满熟妇|AV高清无码在线免费观看|另类AV日韩少妇熟女|中文日本大黄一级黄色片|色情在线视频免费|亚洲成人特黄a片|黄片wwwav色图欧美|欧亚乱色一区二区三区

RELATEED CONSULTING
相關(guān)咨詢
選擇下列產(chǎn)品馬上在線溝通
服務(wù)時間:8:30-17:00
你可能遇到了下面的問題
關(guān)閉右側(cè)工具欄

新聞中心

這里有您想知道的互聯(lián)網(wǎng)營銷解決方案
幾個預(yù)防并發(fā)搞垮下游服務(wù)的方法

本文轉(zhuǎn)載自微信公眾號「網(wǎng)管叨bi叨」,作者KevinYan11 。轉(zhuǎn)載本文請聯(lián)系網(wǎng)管叨bi叨公眾號。

前言

上一篇文章 我用休眠做并發(fā)控制,搞垮了下游服務(wù) 發(fā)出去后得到不少網(wǎng)友的回應(yīng),有人問自己平時用的方案行不行,有人建議借鑒TCP的擁塞控制策略,動態(tài)地調(diào)整發(fā)起的并發(fā)數(shù),還有人問為啥我要管下游抗不抗得住。

今天我就來總結(jié)幾種調(diào)用下游服務(wù)時做并發(fā)控制的方案。

因為我們這篇文章是科普向的文章,主要目的是總結(jié)一下應(yīng)該怎么在享受并發(fā)帶來效率提升的同時做好并發(fā)控制讓整個系統(tǒng)的上下游都能更穩(wěn)定一些,不對限流、控制到底該哪個服務(wù)加,出了事故誰負(fù)責(zé)做討論。

并發(fā)控制方案

前面我們提到用休眠做并發(fā)控制的最大弊端是,沒有考慮下游服務(wù)的感受,每次開固定數(shù)量的goroutine 去執(zhí)行任務(wù)后,調(diào)用者休眠 1s 再來,而不是等待下游服務(wù)的反饋再開啟下一批任務(wù)執(zhí)行。

 
 
 
  1. func badConcurrency() { 
  2.  batchSize := 500 
  3.  for { 
  4.   data, _ := queryDataWithSizeN(batchSize) 
  5.   if len(data) == 0 { 
  6.    break 
  7.   } 
  8.  
  9.   for _, item := range data { 
  10.    go func(i int) { 
  11.     doSomething(i) 
  12.    }(item) 
  13.   } 
  14.  
  15.   time.Sleep(time.Second * 1) 
  16.  } 

此外上游還有請求分配不均的問題,休眠的時候完全沒有請求,休眠結(jié)束后不管下游有沒有執(zhí)行完成馬上又發(fā)起一批新的請求。

所以我們應(yīng)該從等待下游反饋和請求分配盡量均勻兩個角度去做并發(fā)控制,當(dāng)然實際項目中應(yīng)該是兩方面結(jié)合才行。

本文的可執(zhí)行示例代碼請訪問下面的鏈接查看:

https://github.com/kevinyan815/gocookbook/blob/master/codes/prevent_over_concurrency/main.go

使用限流器

我們在向下游發(fā)起并發(fā)請求時可以通過限流器做一下限流,如果達(dá)到限制就阻塞直到能再次發(fā)起請求。一聽到阻塞直到blabla 有的同學(xué)是不是馬上內(nèi)心小激動想用 channel 去實現(xiàn)一個限流器啦,「此處應(yīng)用咳嗽聲」其實完全沒必要Golang 官方限流器 time/rate包的 Wait 方法就能給我們提供了這個功能。

 
 
 
  1. func useRateLimit() { 
  2.  limiter := rate.NewLimiter(rate.Every(1*time.Second), 500) 
  3.  batchSize := 500 
  4.  for { 
  5.   data, _ :=queryDataWithSizeN(batchSize) 
  6.   if len(data) == 0 { 
  7.    fmt.Println("End of all data") 
  8.    break 
  9.   } 
  10.  
  11.   for _, item := range data { 
  12.    // 阻塞直到令牌桶有充足的Token 
  13.    err := limiter.Wait(context.Background()) 
  14.    if err != nil { 
  15.     fmt.Println("Error: ", err) 
  16.     return 
  17.    } 
  18.    go func(i int) { 
  19.     doSomething(i) 
  20.    }(item) 
  21.   } 
  22.  } 
  23.  
  24. // 模擬調(diào)用下游服務(wù) 
  25. func doSomething(i int) { 
  26.  time.Sleep(2 * time.Second) 
  27.  fmt.Println("End:", i) 
  28.  
  29. // 模擬查詢N條數(shù)據(jù) 
  30. func queryDataWithSizeN(size int) (dataList []int, err error) { 
  31.  rand.Seed(time.Now().Unix()) 
  32.  dataList = rand.Perm(size) 
  33.  return 

time/rate包提供的限流器采用的是令牌桶算法,使用Wait方法是當(dāng)桶中沒有足夠的令牌時調(diào)用者會阻塞直到能取到令牌,當(dāng)然也可以通過Wait方法接受的Context參數(shù)設(shè)置等待超時時間。限流器往桶中放令牌的速率是恒定的這樣比單純使用time.Sleep請求更均勻些。

關(guān)于time/rate 限流器的使用方法的詳解,請查看我之前的文章:Golang官方限流器的用法詳解

用了限流器了之后,只是讓我們的并發(fā)請求分布地更均勻了,最好我們能在受到下游反饋完成后再開始下次并發(fā)。

使用WaitGroup

我們可以等上批并發(fā)請求都執(zhí)行完后再開始下一批任務(wù),估計大部分同學(xué)聽到這馬上就會想到應(yīng)該加WaitGroup

WaitGroup適合用于并發(fā)-等待的場景:一個goroutine在檢查點(Check Point)等待一組執(zhí)行任務(wù)的 worker goroutine 全部完成,如果在執(zhí)行任務(wù)的這些worker goroutine 還沒全部完成,等待的 goroutine 就會阻塞在檢查點,直到所有woker goroutine 都完成后才能繼續(xù)執(zhí)行。

 
 
 
  1. func useWaitGroup() { 
  2.  
  3.  batchSize := 500 
  4.  for { 
  5.   data, _ := queryDataWithSizeN(batchSize) 
  6.   if len(data) == 0 { 
  7.    fmt.Println("End of all data") 
  8.    break 
  9.   } 
  10.   var wg sync.WaitGroup 
  11.   for _, item := range data { 
  12.    wg.Add(1) 
  13.    go func(i int) { 
  14.     doSomething(i) 
  15.     wg.Done() 
  16.    }(item) 
  17.   } 
  18.   wg.Wait() 
  19.  
  20.   fmt.Println("Next bunch of data") 
  21.  } 

這里調(diào)用程序會等待這一批任務(wù)都執(zhí)行完后,再開始查下一批數(shù)據(jù)進(jìn)行下一批請求,等待時間取決于這一批請求中最晚返回的那個響應(yīng)用了多少時間。

使用Semaphore

如果你不想等一批全部完成后再開始下一批,也可以采用一個完成后下一個補上的策略,這種比使用WaitGroup做并發(fā)控制,如果下游資源夠,整個任務(wù)的處理時間會更快一些。這種策略需要使用信號量(Semaphore)做并發(fā)控制,Go 語言里通過擴(kuò)展庫golang.org/x/sync/semaphore 提供了信號量并發(fā)原語。

關(guān)于信號量的使用方法和實現(xiàn)原理,可以讀讀我以前的文章:并發(fā)編程-信號量的使用方法和其實現(xiàn)原理

上面的程序改為使用信號量semaphore.Weighted做并發(fā)控制的示例如下:

 
 
 
  1. func useSemaphore() { 
  2.  var concurrentNum int64 = 10 
  3.  var weight int64 = 1 
  4.  var batchSize int = 50 
  5.  s := semaphore.NewWeighted(concurrentNum) 
  6.  for { 
  7.   data, _ := queryDataWithSizeN(batchSize) 
  8.   if len(data) == 0 { 
  9.    fmt.Println("End of all data") 
  10.    break 
  11.   } 
  12.  
  13.   for _, item := range data { 
  14.       s.Acquire(context.Background(), weight) 
  15.    go func(i int) { 
  16.     doSomething(i) 
  17.     s.Release(weight) 
  18.    }(item) 
  19.   } 
  20.  
  21.  } 

使用生產(chǎn)者消費者模式

也有不少讀者回復(fù)說得加線程池才行,因為每個人公司里可能都有在用的線程池實現(xiàn),直接用就行,我在這里就不再獻(xiàn)丑給大家實現(xiàn)線程池了。在我看來我們其實是需要實現(xiàn)一個生產(chǎn)者和消費者模式,讓線程池幫助我們限制只有固定數(shù)量的消費者線程去做下游服務(wù)的調(diào)用,而生產(chǎn)者則是將數(shù)據(jù)存儲里取出來。

channel 正好能夠作為兩者之間的媒介。

 
 
 
  1. func useChannel() { 
  2.  batchSize := 50 
  3.  dataChan := make(chan int) 
  4.  var wg sync.WaitGroup 
  5.  wg.Add(batchSize + 1) 
  6.  // 生產(chǎn)者 
  7.  go func() { 
  8.   for { 
  9.    data, _ := queryDataWithSizeN(batchSize) 
  10.    if len(data) == 0 { 
  11.     break 
  12.    } 
  13.    for _, item := range data { 
  14.     dataChan <- item 
  15.    } 
  16.   } 
  17.   close(dataChan) 
  18.   wg.Done() 
  19.  }() 
  20.     // 消費者 
  21.  go func() { 
  22.   for i := 0; i < 50; i++ { 
  23.    go func() { 
  24.     for { 
  25.      select { 
  26.      case v, ok := <- dataChan: 
  27.       if !ok { 
  28.        wg.Done() 
  29.        return 
  30.       } 
  31.       doSomething(v) 
  32.      } 
  33.     } 
  34.    }() 
  35.   } 
  36.  }() 
  37.  
  38.  wg.Wait() 

這個代碼實現(xiàn)里,如果用ErrorGroup代替WaitGroup的話還能更簡化一些,這個就留給讀者自己探索吧。

關(guān)于ErrorGroup的用法總結(jié),推薦閱讀文章:覺得WaitGroup不好用?試試ErrorGroup吧!

總結(jié)

通過文章里總結(jié)的一些方法,我們也能看出來并發(fā)編程的場景下,除了關(guān)注發(fā)起的并發(fā)線程數(shù)外,更重要的是還需要關(guān)注被異步調(diào)用的下層服務(wù)的反饋,不是一味的加并發(fā)數(shù)就能解決問題的。理解我們?yōu)槭裁丛诓l(fā)編程中要關(guān)注下層服務(wù)的反饋是很重要的,否則我們列舉的那些方案其實都可以在goroutine里再開goroutine,不關(guān)心是否執(zhí)行完成直接返回,無限套娃下去。


新聞名稱:幾個預(yù)防并發(fā)搞垮下游服務(wù)的方法
URL網(wǎng)址:http://m.5511xx.com/article/codepdj.html