新聞中心
本文轉(zhuǎn)載自微信公眾號「網(wǎng)管叨bi叨」,作者KevinYan11 。轉(zhuǎn)載本文請聯(lián)系網(wǎng)管叨bi叨公眾號。

前言
上一篇文章 我用休眠做并發(fā)控制,搞垮了下游服務(wù) 發(fā)出去后得到不少網(wǎng)友的回應(yīng),有人問自己平時用的方案行不行,有人建議借鑒TCP的擁塞控制策略,動態(tài)地調(diào)整發(fā)起的并發(fā)數(shù),還有人問為啥我要管下游抗不抗得住。
今天我就來總結(jié)幾種調(diào)用下游服務(wù)時做并發(fā)控制的方案。
因為我們這篇文章是科普向的文章,主要目的是總結(jié)一下應(yīng)該怎么在享受并發(fā)帶來效率提升的同時做好并發(fā)控制讓整個系統(tǒng)的上下游都能更穩(wěn)定一些,不對限流、控制到底該哪個服務(wù)加,出了事故誰負(fù)責(zé)做討論。
并發(fā)控制方案
前面我們提到用休眠做并發(fā)控制的最大弊端是,沒有考慮下游服務(wù)的感受,每次開固定數(shù)量的goroutine 去執(zhí)行任務(wù)后,調(diào)用者休眠 1s 再來,而不是等待下游服務(wù)的反饋再開啟下一批任務(wù)執(zhí)行。
- func badConcurrency() {
- batchSize := 500
- for {
- data, _ := queryDataWithSizeN(batchSize)
- if len(data) == 0 {
- break
- }
- for _, item := range data {
- go func(i int) {
- doSomething(i)
- }(item)
- }
- time.Sleep(time.Second * 1)
- }
- }
此外上游還有請求分配不均的問題,休眠的時候完全沒有請求,休眠結(jié)束后不管下游有沒有執(zhí)行完成馬上又發(fā)起一批新的請求。
所以我們應(yīng)該從等待下游反饋和請求分配盡量均勻兩個角度去做并發(fā)控制,當(dāng)然實際項目中應(yīng)該是兩方面結(jié)合才行。
本文的可執(zhí)行示例代碼請訪問下面的鏈接查看:
https://github.com/kevinyan815/gocookbook/blob/master/codes/prevent_over_concurrency/main.go
使用限流器
我們在向下游發(fā)起并發(fā)請求時可以通過限流器做一下限流,如果達(dá)到限制就阻塞直到能再次發(fā)起請求。一聽到阻塞直到blabla 有的同學(xué)是不是馬上內(nèi)心小激動想用 channel 去實現(xiàn)一個限流器啦,「此處應(yīng)用咳嗽聲」其實完全沒必要Golang 官方限流器 time/rate包的 Wait 方法就能給我們提供了這個功能。
- func useRateLimit() {
- limiter := rate.NewLimiter(rate.Every(1*time.Second), 500)
- batchSize := 500
- for {
- data, _ :=queryDataWithSizeN(batchSize)
- if len(data) == 0 {
- fmt.Println("End of all data")
- break
- }
- for _, item := range data {
- // 阻塞直到令牌桶有充足的Token
- err := limiter.Wait(context.Background())
- if err != nil {
- fmt.Println("Error: ", err)
- return
- }
- go func(i int) {
- doSomething(i)
- }(item)
- }
- }
- }
- // 模擬調(diào)用下游服務(wù)
- func doSomething(i int) {
- time.Sleep(2 * time.Second)
- fmt.Println("End:", i)
- }
- // 模擬查詢N條數(shù)據(jù)
- func queryDataWithSizeN(size int) (dataList []int, err error) {
- rand.Seed(time.Now().Unix())
- dataList = rand.Perm(size)
- return
- }
time/rate包提供的限流器采用的是令牌桶算法,使用Wait方法是當(dāng)桶中沒有足夠的令牌時調(diào)用者會阻塞直到能取到令牌,當(dāng)然也可以通過Wait方法接受的Context參數(shù)設(shè)置等待超時時間。限流器往桶中放令牌的速率是恒定的這樣比單純使用time.Sleep請求更均勻些。
關(guān)于time/rate 限流器的使用方法的詳解,請查看我之前的文章:Golang官方限流器的用法詳解
用了限流器了之后,只是讓我們的并發(fā)請求分布地更均勻了,最好我們能在受到下游反饋完成后再開始下次并發(fā)。
使用WaitGroup
我們可以等上批并發(fā)請求都執(zhí)行完后再開始下一批任務(wù),估計大部分同學(xué)聽到這馬上就會想到應(yīng)該加WaitGroup
WaitGroup適合用于并發(fā)-等待的場景:一個goroutine在檢查點(Check Point)等待一組執(zhí)行任務(wù)的 worker goroutine 全部完成,如果在執(zhí)行任務(wù)的這些worker goroutine 還沒全部完成,等待的 goroutine 就會阻塞在檢查點,直到所有woker goroutine 都完成后才能繼續(xù)執(zhí)行。
- func useWaitGroup() {
- batchSize := 500
- for {
- data, _ := queryDataWithSizeN(batchSize)
- if len(data) == 0 {
- fmt.Println("End of all data")
- break
- }
- var wg sync.WaitGroup
- for _, item := range data {
- wg.Add(1)
- go func(i int) {
- doSomething(i)
- wg.Done()
- }(item)
- }
- wg.Wait()
- fmt.Println("Next bunch of data")
- }
- }
這里調(diào)用程序會等待這一批任務(wù)都執(zhí)行完后,再開始查下一批數(shù)據(jù)進(jìn)行下一批請求,等待時間取決于這一批請求中最晚返回的那個響應(yīng)用了多少時間。
使用Semaphore
如果你不想等一批全部完成后再開始下一批,也可以采用一個完成后下一個補上的策略,這種比使用WaitGroup做并發(fā)控制,如果下游資源夠,整個任務(wù)的處理時間會更快一些。這種策略需要使用信號量(Semaphore)做并發(fā)控制,Go 語言里通過擴(kuò)展庫golang.org/x/sync/semaphore 提供了信號量并發(fā)原語。
關(guān)于信號量的使用方法和實現(xiàn)原理,可以讀讀我以前的文章:并發(fā)編程-信號量的使用方法和其實現(xiàn)原理
上面的程序改為使用信號量semaphore.Weighted做并發(fā)控制的示例如下:
- func useSemaphore() {
- var concurrentNum int64 = 10
- var weight int64 = 1
- var batchSize int = 50
- s := semaphore.NewWeighted(concurrentNum)
- for {
- data, _ := queryDataWithSizeN(batchSize)
- if len(data) == 0 {
- fmt.Println("End of all data")
- break
- }
- for _, item := range data {
- s.Acquire(context.Background(), weight)
- go func(i int) {
- doSomething(i)
- s.Release(weight)
- }(item)
- }
- }
- }
使用生產(chǎn)者消費者模式
也有不少讀者回復(fù)說得加線程池才行,因為每個人公司里可能都有在用的線程池實現(xiàn),直接用就行,我在這里就不再獻(xiàn)丑給大家實現(xiàn)線程池了。在我看來我們其實是需要實現(xiàn)一個生產(chǎn)者和消費者模式,讓線程池幫助我們限制只有固定數(shù)量的消費者線程去做下游服務(wù)的調(diào)用,而生產(chǎn)者則是將數(shù)據(jù)存儲里取出來。
channel 正好能夠作為兩者之間的媒介。
- func useChannel() {
- batchSize := 50
- dataChan := make(chan int)
- var wg sync.WaitGroup
- wg.Add(batchSize + 1)
- // 生產(chǎn)者
- go func() {
- for {
- data, _ := queryDataWithSizeN(batchSize)
- if len(data) == 0 {
- break
- }
- for _, item := range data {
- dataChan <- item
- }
- }
- close(dataChan)
- wg.Done()
- }()
- // 消費者
- go func() {
- for i := 0; i < 50; i++ {
- go func() {
- for {
- select {
- case v, ok := <- dataChan:
- if !ok {
- wg.Done()
- return
- }
- doSomething(v)
- }
- }
- }()
- }
- }()
- wg.Wait()
- }
這個代碼實現(xiàn)里,如果用ErrorGroup代替WaitGroup的話還能更簡化一些,這個就留給讀者自己探索吧。
關(guān)于ErrorGroup的用法總結(jié),推薦閱讀文章:覺得WaitGroup不好用?試試ErrorGroup吧!
總結(jié)
通過文章里總結(jié)的一些方法,我們也能看出來并發(fā)編程的場景下,除了關(guān)注發(fā)起的并發(fā)線程數(shù)外,更重要的是還需要關(guān)注被異步調(diào)用的下層服務(wù)的反饋,不是一味的加并發(fā)數(shù)就能解決問題的。理解我們?yōu)槭裁丛诓l(fā)編程中要關(guān)注下層服務(wù)的反饋是很重要的,否則我們列舉的那些方案其實都可以在goroutine里再開goroutine,不關(guān)心是否執(zhí)行完成直接返回,無限套娃下去。
新聞名稱:幾個預(yù)防并發(fā)搞垮下游服務(wù)的方法
URL網(wǎng)址:http://m.5511xx.com/article/codepdj.html


咨詢
建站咨詢
