新聞中心
本文轉(zhuǎn)載自微信公眾號(hào)「網(wǎng)管叨bi叨」,作者KevinYan11 。轉(zhuǎn)載本文請(qǐng)聯(lián)系網(wǎng)管叨bi叨公眾號(hào)。

前言
上一篇文章 我用休眠做并發(fā)控制,搞垮了下游服務(wù) 發(fā)出去后得到不少網(wǎng)友的回應(yīng),有人問(wèn)自己平時(shí)用的方案行不行,有人建議借鑒TCP的擁塞控制策略,動(dòng)態(tài)地調(diào)整發(fā)起的并發(fā)數(shù),還有人問(wèn)為啥我要管下游抗不抗得住。
今天我就來(lái)總結(jié)幾種調(diào)用下游服務(wù)時(shí)做并發(fā)控制的方案。
因?yàn)槲覀冞@篇文章是科普向的文章,主要目的是總結(jié)一下應(yīng)該怎么在享受并發(fā)帶來(lái)效率提升的同時(shí)做好并發(fā)控制讓整個(gè)系統(tǒng)的上下游都能更穩(wěn)定一些,不對(duì)限流、控制到底該哪個(gè)服務(wù)加,出了事故誰(shuí)負(fù)責(zé)做討論。
并發(fā)控制方案
前面我們提到用休眠做并發(fā)控制的最大弊端是,沒(méi)有考慮下游服務(wù)的感受,每次開(kāi)固定數(shù)量的goroutine 去執(zhí)行任務(wù)后,調(diào)用者休眠 1s 再來(lái),而不是等待下游服務(wù)的反饋再開(kāi)啟下一批任務(wù)執(zhí)行。
- func badConcurrency() {
- batchSize := 500
- for {
- data, _ := queryDataWithSizeN(batchSize)
- if len(data) == 0 {
- break
- }
- for _, item := range data {
- go func(i int) {
- doSomething(i)
- }(item)
- }
- time.Sleep(time.Second * 1)
- }
- }
此外上游還有請(qǐng)求分配不均的問(wèn)題,休眠的時(shí)候完全沒(méi)有請(qǐng)求,休眠結(jié)束后不管下游有沒(méi)有執(zhí)行完成馬上又發(fā)起一批新的請(qǐng)求。
所以我們應(yīng)該從等待下游反饋和請(qǐng)求分配盡量均勻兩個(gè)角度去做并發(fā)控制,當(dāng)然實(shí)際項(xiàng)目中應(yīng)該是兩方面結(jié)合才行。
本文的可執(zhí)行示例代碼請(qǐng)?jiān)L問(wèn)下面的鏈接查看:
https://github.com/kevinyan815/gocookbook/blob/master/codes/prevent_over_concurrency/main.go
使用限流器
我們?cè)谙蛳掠伟l(fā)起并發(fā)請(qǐng)求時(shí)可以通過(guò)限流器做一下限流,如果達(dá)到限制就阻塞直到能再次發(fā)起請(qǐng)求。一聽(tīng)到阻塞直到blabla 有的同學(xué)是不是馬上內(nèi)心小激動(dòng)想用 channel 去實(shí)現(xiàn)一個(gè)限流器啦,「此處應(yīng)用咳嗽聲」其實(shí)完全沒(méi)必要Golang 官方限流器 time/rate包的 Wait 方法就能給我們提供了這個(gè)功能。
- func useRateLimit() {
- limiter := rate.NewLimiter(rate.Every(1*time.Second), 500)
- batchSize := 500
- for {
- data, _ :=queryDataWithSizeN(batchSize)
- if len(data) == 0 {
- fmt.Println("End of all data")
- break
- }
- for _, item := range data {
- // 阻塞直到令牌桶有充足的Token
- err := limiter.Wait(context.Background())
- if err != nil {
- fmt.Println("Error: ", err)
- return
- }
- go func(i int) {
- doSomething(i)
- }(item)
- }
- }
- }
- // 模擬調(diào)用下游服務(wù)
- func doSomething(i int) {
- time.Sleep(2 * time.Second)
- fmt.Println("End:", i)
- }
- // 模擬查詢N條數(shù)據(jù)
- func queryDataWithSizeN(size int) (dataList []int, err error) {
- rand.Seed(time.Now().Unix())
- dataList = rand.Perm(size)
- return
- }
time/rate包提供的限流器采用的是令牌桶算法,使用Wait方法是當(dāng)桶中沒(méi)有足夠的令牌時(shí)調(diào)用者會(huì)阻塞直到能取到令牌,當(dāng)然也可以通過(guò)Wait方法接受的Context參數(shù)設(shè)置等待超時(shí)時(shí)間。限流器往桶中放令牌的速率是恒定的這樣比單純使用time.Sleep請(qǐng)求更均勻些。
關(guān)于time/rate 限流器的使用方法的詳解,請(qǐng)查看我之前的文章:Golang官方限流器的用法詳解
用了限流器了之后,只是讓我們的并發(fā)請(qǐng)求分布地更均勻了,最好我們能在受到下游反饋完成后再開(kāi)始下次并發(fā)。
使用WaitGroup
我們可以等上批并發(fā)請(qǐng)求都執(zhí)行完后再開(kāi)始下一批任務(wù),估計(jì)大部分同學(xué)聽(tīng)到這馬上就會(huì)想到應(yīng)該加WaitGroup
WaitGroup適合用于并發(fā)-等待的場(chǎng)景:一個(gè)goroutine在檢查點(diǎn)(Check Point)等待一組執(zhí)行任務(wù)的 worker goroutine 全部完成,如果在執(zhí)行任務(wù)的這些worker goroutine 還沒(méi)全部完成,等待的 goroutine 就會(huì)阻塞在檢查點(diǎn),直到所有woker goroutine 都完成后才能繼續(xù)執(zhí)行。
- func useWaitGroup() {
- batchSize := 500
- for {
- data, _ := queryDataWithSizeN(batchSize)
- if len(data) == 0 {
- fmt.Println("End of all data")
- break
- }
- var wg sync.WaitGroup
- for _, item := range data {
- wg.Add(1)
- go func(i int) {
- doSomething(i)
- wg.Done()
- }(item)
- }
- wg.Wait()
- fmt.Println("Next bunch of data")
- }
- }
這里調(diào)用程序會(huì)等待這一批任務(wù)都執(zhí)行完后,再開(kāi)始查下一批數(shù)據(jù)進(jìn)行下一批請(qǐng)求,等待時(shí)間取決于這一批請(qǐng)求中最晚返回的那個(gè)響應(yīng)用了多少時(shí)間。
使用Semaphore
如果你不想等一批全部完成后再開(kāi)始下一批,也可以采用一個(gè)完成后下一個(gè)補(bǔ)上的策略,這種比使用WaitGroup做并發(fā)控制,如果下游資源夠,整個(gè)任務(wù)的處理時(shí)間會(huì)更快一些。這種策略需要使用信號(hào)量(Semaphore)做并發(fā)控制,Go 語(yǔ)言里通過(guò)擴(kuò)展庫(kù)golang.org/x/sync/semaphore 提供了信號(hào)量并發(fā)原語(yǔ)。
關(guān)于信號(hào)量的使用方法和實(shí)現(xiàn)原理,可以讀讀我以前的文章:并發(fā)編程-信號(hào)量的使用方法和其實(shí)現(xiàn)原理
上面的程序改為使用信號(hào)量semaphore.Weighted做并發(fā)控制的示例如下:
- func useSemaphore() {
- var concurrentNum int64 = 10
- var weight int64 = 1
- var batchSize int = 50
- s := semaphore.NewWeighted(concurrentNum)
- for {
- data, _ := queryDataWithSizeN(batchSize)
- if len(data) == 0 {
- fmt.Println("End of all data")
- break
- }
- for _, item := range data {
- s.Acquire(context.Background(), weight)
- go func(i int) {
- doSomething(i)
- s.Release(weight)
- }(item)
- }
- }
- }
使用生產(chǎn)者消費(fèi)者模式
也有不少讀者回復(fù)說(shuō)得加線程池才行,因?yàn)槊總€(gè)人公司里可能都有在用的線程池實(shí)現(xiàn),直接用就行,我在這里就不再獻(xiàn)丑給大家實(shí)現(xiàn)線程池了。在我看來(lái)我們其實(shí)是需要實(shí)現(xiàn)一個(gè)生產(chǎn)者和消費(fèi)者模式,讓線程池幫助我們限制只有固定數(shù)量的消費(fèi)者線程去做下游服務(wù)的調(diào)用,而生產(chǎn)者則是將數(shù)據(jù)存儲(chǔ)里取出來(lái)。
channel 正好能夠作為兩者之間的媒介。
- func useChannel() {
- batchSize := 50
- dataChan := make(chan int)
- var wg sync.WaitGroup
- wg.Add(batchSize + 1)
- // 生產(chǎn)者
- go func() {
- for {
- data, _ := queryDataWithSizeN(batchSize)
- if len(data) == 0 {
- break
- }
- for _, item := range data {
- dataChan <- item
- }
- }
- close(dataChan)
- wg.Done()
- }()
- // 消費(fèi)者
- go func() {
- for i := 0; i < 50; i++ {
- go func() {
- for {
- select {
- case v, ok := <- dataChan:
- if !ok {
- wg.Done()
- return
- }
- doSomething(v)
- }
- }
- }()
- }
- }()
- wg.Wait()
- }
這個(gè)代碼實(shí)現(xiàn)里,如果用ErrorGroup代替WaitGroup的話還能更簡(jiǎn)化一些,這個(gè)就留給讀者自己探索吧。
關(guān)于ErrorGroup的用法總結(jié),推薦閱讀文章:覺(jué)得WaitGroup不好用?試試ErrorGroup吧!
總結(jié)
通過(guò)文章里總結(jié)的一些方法,我們也能看出來(lái)并發(fā)編程的場(chǎng)景下,除了關(guān)注發(fā)起的并發(fā)線程數(shù)外,更重要的是還需要關(guān)注被異步調(diào)用的下層服務(wù)的反饋,不是一味的加并發(fā)數(shù)就能解決問(wèn)題的。理解我們?yōu)槭裁丛诓l(fā)編程中要關(guān)注下層服務(wù)的反饋是很重要的,否則我們列舉的那些方案其實(shí)都可以在goroutine里再開(kāi)goroutine,不關(guān)心是否執(zhí)行完成直接返回,無(wú)限套娃下去。
網(wǎng)頁(yè)題目:幾個(gè)預(yù)防并發(fā)搞垮下游服務(wù)的方法
網(wǎng)頁(yè)網(wǎng)址:http://m.fisionsoft.com.cn/article/codepdj.html


咨詢
建站咨詢
