新聞中心
大家好,我是四哥。

在桐柏等地區(qū),都構(gòu)建了全面的區(qū)域性戰(zhàn)略布局,加強(qiáng)發(fā)展的系統(tǒng)性、市場前瞻性、產(chǎn)品創(chuàng)新能力,以專注、極致的服務(wù)理念,為客戶提供網(wǎng)站建設(shè)、網(wǎng)站設(shè)計(jì) 網(wǎng)站設(shè)計(jì)制作按需策劃,公司網(wǎng)站建設(shè),企業(yè)網(wǎng)站建設(shè),品牌網(wǎng)站制作,營銷型網(wǎng)站,成都外貿(mào)網(wǎng)站建設(shè),桐柏網(wǎng)站建設(shè)費(fèi)用合理。
信號量是并發(fā)編程中常見的同步機(jī)制,在標(biāo)準(zhǔn)庫的并發(fā)原語中使用頻繁,比如 Mutex、WaitGroup 等,這些并發(fā)原語的實(shí)現(xiàn)都有信號量的影子,所以我們很有必要學(xué)好弄清楚信號量的實(shí)現(xiàn)原理,做到“知其然,更要知其所以然”,我們才能有更多的“武器”去應(yīng)對實(shí)際面臨的業(yè)務(wù)場景問題。
今天我們就來搞定信號量,通過這篇文章你能掌握:
- 信號量是什么?都有什么操作?
- Go 官方是如何實(shí)現(xiàn)信號量的?
- 實(shí)際場景中如何使用信號量?
- 使用信號量應(yīng)該注意哪些問題?
- 實(shí)現(xiàn)信號量的其他方式?
信號量是什么?都有什么操作?
維基百科上是這樣解釋信號量的:
信號量的概念是計(jì)算機(jī)科學(xué)家 Dijkstra (Dijkstra 算法的發(fā)明者)提出來的,廣泛應(yīng)用在不同的操作系統(tǒng)中。系統(tǒng)中,會給每一個進(jìn)程一個信號量,代表每個進(jìn)程當(dāng)前的狀態(tài),未得到控制權(quán)的進(jìn)程,會在特定的地方被迫停下來,等待可以繼續(xù)進(jìn)行的信號到來。
下文用 G 代表 goroutine。
通俗點(diǎn)解釋就是,信號量通常使用一個整型變量 S 表示一組資源,當(dāng) G 完成對此信號量的等待(wait)時,S 就減 1,當(dāng) G 完成對此信號量的釋放(release)時,S 就加 1。當(dāng)計(jì)數(shù)值為 0 的時候,G 調(diào)用 wait 等待該信號量會阻塞,除非 S 又大于 0,等待的 G 才會解除阻塞,成功返回。
舉個例子,假如圖書館有 10 本《Go 語言編程之旅》,有 1 萬個人都想讀這本書,“僧多粥少”。所以,圖書館管理員先會讓這 1 萬個人進(jìn)行登記,按照登記的順序,借閱此書。如果書全部被借走,那么,其他想看此書的人就需要等待,如果有人還書了,圖書館管理員就會通知下一位同學(xué)來借閱這本書。這里的資源是《Go 語言編程之旅》這十本書,想讀此書的同學(xué)就是 goroutine,圖書管理員就是信號量。
從上面的解釋中我們可以得知什么是信號量,其實(shí)信號量就是一種變量或者抽象數(shù)據(jù)類型,用于控制并發(fā)系統(tǒng)中多個進(jìn)程對公共資源的訪問,訪問具有原子性。信號量主要分為兩類:
- 計(jì)數(shù)信號量,上面說的圖書館借書的例子就是計(jì)數(shù)信號量,它的計(jì)數(shù)可以是任意一個正整數(shù);
- 二值信號量,其實(shí)是一種特殊的計(jì)數(shù)信號量,其值只有 0 或 1,相當(dāng)于互斥量,當(dāng)值為 1 時資源可用,當(dāng)值為 0 時,資源被鎖住,進(jìn)程阻塞無法繼續(xù)執(zhí)行;
PV 操作
信號量定義有兩個操作 P 和 V,P 操作是減少信號量的計(jì)數(shù)值,而 V 操作是增加信號量的計(jì)數(shù)值。
通常初始化時,將信號量 S 指定數(shù)值為 n,就像是一個有 n 個資源的池子。P 操作相當(dāng)于請求資源,如果資源可用,就立即返回;如果沒有資源或者不夠,那么,G 會阻塞等待。V 操作會釋放持有的資源,把資源返還給信號量。
信號量的值除了初始化的操作以外,只能由 P/V 操作改變。
我們一般用信號量保護(hù)一組資源,比如數(shù)據(jù)庫連接池、幾個打印機(jī)資源等等。如果信號量蛻變成二值信號量,那么,它的 P/V 就和互斥鎖的 Lock/Unlock 一樣了。
信號量的實(shí)現(xiàn)-- 官方擴(kuò)展包 Semaphore
在看 Go 源碼時,我們經(jīng)常能夠看到下面這幾個關(guān)于信號量的函數(shù):
func runtime_Semacquire(s *uint32)
func runtime_SemacquireMutex(s *uint32, lifo bool, skipframes int)
func runtime_Semrelease(s *uint32, handoff bool, skipframes int)
這幾個函數(shù)就是信號量的 PV 操作,遺憾的是,它是 Go 運(yùn)行時內(nèi)部使用的,并沒有封裝暴露成一個對外的信號量并發(fā)原語,我們沒有辦法使用。不過沒關(guān)系,Go 在它的擴(kuò)展包中提供了信號量 semaphore,不過這個信號量的類型名并不叫 Semaphore,而是叫 Weighted。
這是一個帶權(quán)重的信號量,接下來我們就重點(diǎn)分析一下這個庫。
Weighted 的實(shí)現(xiàn)思路:使用互斥鎖 + List 實(shí)現(xiàn)的?;コ怄i實(shí)現(xiàn)其它字段的保護(hù),而 List 實(shí)現(xiàn)了一個等待隊(duì)列,等待者的通知是通過 Channel 的通知機(jī)制實(shí)現(xiàn)的。
Weighted 主要包括兩個結(jié)構(gòu)體和幾個常用方法。
結(jié)構(gòu)體
type Weighted struct {
size int64 // 最大資源個數(shù),初始化的時候指定
cur int64 // 計(jì)數(shù)器,當(dāng)前已使用資源數(shù)
mu sync.Mutex // 互斥鎖,對字段保護(hù)
waiters list.List // 等待者列表,當(dāng)前處于阻塞等待的請求者 goroutine
}
每個字段的含義見代碼注釋,其中 waiters 存儲的數(shù)據(jù)是 waiter 對象,waiter 數(shù)據(jù)結(jié)構(gòu)如下:
type waiter struct {
n int64 // 調(diào)用者申請的資源數(shù)
ready chan<- struct{} // 當(dāng)調(diào)用者可以獲取到信號量資源時, close chan,調(diào)用者便會收到通知,成功返回
}
字段含義見注釋。
這里提下初始化資源數(shù)方法 NewWeighted,很簡單:
// 創(chuàng)建資源數(shù)為 n 的信號量
func NewWeighted(n int64) *Weighted {
w := &Weighted{size: n}
return w
}
方法
1.阻塞獲取資源的方法 -- Acquire(),源碼如下:
func (s *Weighted) Acquire(ctx context.Context, n int64) error {
s.mu.Lock()
// 有可用資源,直接返回
if s.size-s.cur >= n && s.waiters.Len() == 0 {
s.cur += n
s.mu.Unlock()
return nil
}
// 程序執(zhí)行到這里說明無足夠資源使用
if n > s.size {
s.mu.Unlock()
<-ctx.Done()
return ctx.Err()
}
// 資源不足,構(gòu)造 waiter,將其加入到等待隊(duì)列
// ready channel 用于通知阻塞的調(diào)用者有資源可用,由釋放資源的 goroutine 負(fù)責(zé) close,起到消息通知的作用
ready := make(chan struct{})
w := waiter{n: n, ready: ready}
elem := s.waiters.PushBack(w) // 加入到等待隊(duì)列
s.mu.Unlock()
// 調(diào)用者陷入 select 阻塞,除非收到外部 ctx 的取消信號或者被通知有資源可用
select {
case <-ctx.Done(): // 收到外面的控制信號
err := ctx.Err()
s.mu.Lock()
select {
case <-ready: // 再次確認(rèn)是否可能是被喚醒的,如果被喚醒了則忽略控制信號,返回 nil 表示成功
err = nil
default: // 收到控制信息且還沒有獲取到資源,就直接將原來添加的 waiter 刪除掉
isFront := s.waiters.Front() == elem // 當(dāng)前 waiter 是否是鏈表頭元素
s.waiters.Remove(elem) // 刪除 waiter
if isFront && s.size > s.cur { // 如果是鏈表頭元素且有資源可用則嘗試喚醒鏈表第一個等待的 waiter
s.notifyWaiters()
}
}
s.mu.Unlock()
return err
case <-ready: // 消息通知,請求資源的 goroutine 被釋放資源的 goroutine 喚醒了
return nil
}
}
詳細(xì)說明可以看注釋,Acquire() 相當(dāng)于 P 操作,可以一次獲取多個資源,如果沒有足夠多的資源,調(diào)用者就會被阻塞??梢酝ㄟ^第一個參數(shù) Context 增加超時或者 cancel 的機(jī)制。如果正常獲取了資源,就返回 nil;否則,就返回 ctx.Err(),信號量不改變。
2.非阻塞獲取資源的方法 -- TryAcquire,源碼如下:
func (s *Weighted) TryAcquire(n int64) bool {
s.mu.Lock()
success := s.size-s.cur >= n && s.waiters.Len() == 0
if success {
s.cur += n
}
s.mu.Unlock()
return success
}
這個方法比較簡單,非阻塞地獲取指定數(shù)量的資源,如果當(dāng)前沒有空閑資源,就直接返回 false。
3.通知等待者 notifyWaiters,源碼如下:
func (s *Weighted) notifyWaiters() {
for {
next := s.waiters.Front() // 獲取隊(duì)頭元素
if next == nil { // 隊(duì)列里沒有元素
break
}
w := next.Value.(waiter)
if s.size-s.cur < w.n { // 資源不滿足請求者的要求
break
}
s.cur += w.n // 增加已用資源
s.waiters.Remove(next)
close(w.ready) // 關(guān)閉 ready channel,用于通知調(diào)用者 goroutine 已經(jīng)獲取到資源,繼續(xù)運(yùn)行
}
}
通過 for 循環(huán)從鏈表頭部開始依次遍歷鏈表中的所有 waiter,并更新計(jì)數(shù)器 weighted.cur,同時將其從鏈表中移除,直到遇到空閑資源小于 waiter.n 為止。
仔細(xì)分析,我們會發(fā)現(xiàn),notifyWaiters 方法是按照 FIFO 方式喚醒調(diào)用者。這樣做的目的是為了避免調(diào)用者出現(xiàn)“餓死”的情況,當(dāng)釋放 10 個資源的時候,如果第一個等待者需要 11 個資源,那么,隊(duì)列中的所有等待者都會繼續(xù)等待,即使有的等待者只需要 1 個資源,否則的話,資源可能總是被那些請求資源數(shù)小的調(diào)用者獲取,這樣一來,請求資源數(shù)巨大的調(diào)用者,就沒有機(jī)會獲得資源了。
4.釋放占用的資源 -- Release(),源碼如下:
func (s *Weighted) Release(n int64) {
s.mu.Lock()
s.cur -= n // 釋放占用資源數(shù)
if s.cur < 0 {
s.mu.Unlock()
panic("semaphore: released more than held")
}
s.notifyWaiters() // 喚醒等待請求資源的 goroutine
s.mu.Unlock()
}
Release() 相當(dāng)于 V 操作,可以將 n 個資源釋放,返還給信號量。
怎么用?
現(xiàn)在我們知道了信號量的實(shí)現(xiàn)原理,針對實(shí)際業(yè)務(wù)場景中又該如何使用呢?我們舉個 worker pool 的例子,也是官網(wǎng)提供的:考拉茲猜想。
“考拉茲猜想”說的是:對于每一個正整數(shù),如果它是奇數(shù),則對它乘 3 再加 1,如果它是偶數(shù),則對它除以 2,如此循環(huán),最終都能夠得到 1。
我們的例子需要實(shí)現(xiàn)的是,對于給出的正整數(shù),計(jì)算循環(huán)多少次之后能得到 1,代碼如下:
func main() {
var (
maxWorkers = runtime.GOMAXPROCS(0) // worker 數(shù)量
sem = semaphore.NewWeighted(int64(maxWorkers)) // 信號量
out = make([]int, 32) // 任務(wù)數(shù)
)
ctx := context.TODO()
for i := range out {
if err := sem.Acquire(ctx, 1); err != nil {
log.Printf("Failed to acquire semaphore: %v", err)
break
}
go func(i int) {
defer sem.Release(1)
out[i] = collatzSteps(i + 1)
}(i)
}
// 等待所有的任務(wù)執(zhí)行完成,也可以通過 WaitGroup 實(shí)現(xiàn)
if err := sem.Acquire(ctx, int64(maxWorkers)); err != nil {
log.Printf("Failed to acquire semaphore: %v", err)
}
fmt.Println(out)
}
func collatzSteps(n int) (steps int) {
if n <= 0 {
panic("nonpositive input")
}
for ; n > 1; steps++ {
if steps < 0 {
panic("too many steps")
}
if n%2 == 0 {
n /= 2
continue
}
const maxInt = int(^uint(0) >> 1)
if n > (maxInt-1)/3 {
panic("overflow")
}
n = 3*n + 1
}
return steps
}
上面的代碼創(chuàng)建數(shù)量與 CPU 核數(shù)相同的 worker,假設(shè)是 4, 相當(dāng)于池子里只有 4 個資源可用,每個 worker 處理完一個整數(shù),才能處理下一個,相當(dāng)于控制住了并發(fā)數(shù)量。
輸出:
[0 1 7 2 5 8 16 3 19 6 14 9 9 17 17 4 12 20 20 7 7 15 15 10 23 10 111 18 18 18 106 5]
如何正確使用信號量?
閱讀完源碼之后,會發(fā)現(xiàn)使用 semaphore 過程中一不小心就會導(dǎo)致錯誤,比如:如果請求的資源數(shù)比最大的資源數(shù)還大,那么,調(diào)用者可能永遠(yuǎn)被阻塞;調(diào)用 Release() 方法時,可以傳遞任意的整數(shù)。但如果傳遞一個比請求到的數(shù)量大的錯誤的數(shù)值,程序就會 panic;如果傳遞一個負(fù)數(shù),會導(dǎo)致資源永久被持有,等等。
使用時有哪些常犯的錯誤:
- 請求的資源數(shù)大于最大的資源數(shù);
- 請求了資源,但是忘記釋放;
- 長時間持有資源,即使不需要它;
- 釋放了未請求過的資源;
使用一項(xiàng)技術(shù),保證不出錯的前提是正確地使用它,對于信號量來說也是一樣,所以使用信號量是應(yīng)該格外小心,確保正確地傳遞參數(shù),請求多少資源,就釋放多少資源。
總結(jié)
本篇文章詳細(xì)介紹了什么是信號量,什么是 PV 操作,官方擴(kuò)展包 semaphore 實(shí)現(xiàn)原理,剖析了實(shí)際場景中的例子,以及使用信號量時的注意事項(xiàng),相信你已經(jīng)掌握了信號量。
除了官方擴(kuò)展包 semaphore 的實(shí)現(xiàn)方式外,還有別的辦法可以實(shí)現(xiàn)信號量,你還知道哪些方式可以實(shí)現(xiàn)嗎?
可以在評論區(qū)留言,期待與大家一起探討!
網(wǎng)頁標(biāo)題:教妹子學(xué)Go并發(fā)原語:啥是Semaphore?
URL地址:http://m.fisionsoft.com.cn/article/cdsjpcc.html


咨詢
建站咨詢
