[Go] Golang(1.23.2) 源码走读 - Mutex | RWMutex
Mutex
下面是Mutex 结构体和位运算常量定义:
- state: 32 位整数, 高 29 位存储阻塞的队列(最高 2^29-1), 最后一位(32为)存储 mutexLocked (是否上锁),倒数第二位存储 mutexWoken (协调唤醒操作,防止重复唤醒),倒数第三位存储 mutexStarving (是否处于饥饿模式)
- sema: 通过 runtime 方法 runtime_SemacquireMutex 来阻塞/唤醒 goroutine 的信号量
const (
mutexLocked = 1 << iota // mutex is locked // 1
mutexWoken // 2
mutexStarving // 4
mutexWaiterShift = iota // 3 位运算计算队列等待数,要右移、左移的位数
starvationThresholdNs = 1e6 // 饥饿模式等待阈值为 1ms
)
type Mutex struct {
state int32
sema uint32
}
Lock
下面是 Lock 流程:
- CAS 抢锁成功返回
- 抢锁失败,则进入 lockSlow 流程
func (m *Mutex) Lock() {
// Fast path: grab unlocked mutex.
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
return
}
// Slow path (outlined so that the fast path can be inlined)
m.lockSlow()
}
下面是 lockSlow 流程:
- 如果 锁已被占用 并且 锁不为饥饿模式 并且满足自旋条件(runtime_canSpin 方法),进入自旋处理环节
- 在自旋处理环节中,如果当前锁有尚未唤醒的阻塞协程,则通过 CAS 操作将 state 的 mutexWoken 标识置为 1,将局部变量 awoke 置为 true
- 调用 runtime_doSpin 告知调度器 P 当前处于自旋模式
- 更新自旋次数 iter 和锁状态值 old
- continue
func (m *Mutex) lockSlow() {
var waitStartTime int64 // 标识当前 goroutine 在抢锁过程中的等待时长,单位:ns;
starving := false // 标识当前是否处于饥饿模式
awoke := false // 标记当前 goroutine 是否从阻塞中被唤醒
iter := 0 //标识当前 goroutine 参与自旋的次数 runtime_canSpin 判断是否已经达到 go 运行时最大的自旋次数
old := m.state // 当前状态
for {
// Don't spin in starvation mode, ownership is handed off to waiters
// so we won't be able to acquire the mutex anyway.
if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
// Active spinning makes sense.
// Try to set mutexWoken flag to inform Unlock
// to not wake other blocked goroutines.
if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
awoke = true
}
runtime_doSpin()
iter++
old = m.state
continue
}
- 新值 new 中置为已加锁,即尝试抢锁
- 旧值为已被其他 goroutine 加锁或者处于锁已经饥饿模式,可以直接令新值的阻塞协程数加1
- 当前为饥饿模式且旧值已加锁,则将新值置为饥饿模式
- 如果是唤醒后的 goroutine,将该 mutexWoken 标识更新置 0
new := old
// Don't try to acquire starving mutex, new arriving goroutines must queue.
if old&mutexStarving == 0 {
new |= mutexLocked
}
if old&(mutexLocked|mutexStarving) != 0 {
new += 1 << mutexWaiterShift
}
// The current goroutine switches mutex to starvation mode.
// But if the mutex is currently unlocked, don't do the switch.
// Unlock expects that starving mutex has waiters, which will not
// be true in this case.
if starving && old&mutexLocked != 0 {
new |= mutexStarving
}
if awoke {
// The goroutine has been woken from sleep,
// so we need to reset the flag in either case.
if new&mutexWoken == 0 {
throw("sync: inconsistent mutex state")
}
new &^= mutexWoken
}
- 旧值是未加锁状态且为不为饥饿模式,说明加锁成功,返回即可
- 旧值中锁未释放或者处于饥饿模式,则当前 goroutine 需要进入阻塞队列挂起
- queueLifo 标识当前 goroutine 是从阻塞队列被唤起的老 goroutine 还是新进流程的 goroutine
if atomic.CompareAndSwapInt32(&m.state, old, new) {
if old&(mutexLocked|mutexStarving) == 0 {
break // locked the mutex with CAS
}
// If we were already waiting before, queue at the front of the queue.
queueLifo := waitStartTime != 0
if waitStartTime == 0 {
waitStartTime = runtime_nanotime()
}
runtime_SemacquireMutex(&m.sema, queueLifo, 1
下面是从阻塞态被唤醒的 goroutine 流程
- 当前 goroutine 进入阻塞队列时间长达 1 ms,则置为饥饿模式
- 此时锁是饥饿模式,则当前 goroutine 无需竞争可以直接获得锁(Unlock 流程,发现是饥饿模式,会直接唤醒队头的 goroutine)
starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
old = m.state
if old&mutexStarving != 0 {
// If this goroutine was woken and mutex is in starvation mode,
// ownership was handed off to us but mutex is in somewhat
// inconsistent state: mutexLocked is not set and we are still
// accounted as waiter. Fix that.
if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
throw("sync: inconsistent mutex state")
}
delta := int32(mutexLocked - 1<<mutexWaiterShift)
if !starving || old>>mutexWaiterShift == 1 {
// Exit starvation mode.
// Critical to do it here and consider wait time.
// Starvation mode is so inefficient, that two goroutines
// can go lock-step infinitely once they switch mutex
// to starvation mode.
delta -= mutexStarving
}
atomic.AddInt32(&m.state, delta)
break
}
awoke = true
iter = 0
} else {
old = m.state
}
Unlock
下面是 Unlock
流程:
- 通过原子操作解锁 ;
- 倘若解锁时发现,目前参与竞争的仅有自身一个 goroutine,则直接返回即可;
- 倘若发现锁中还有阻塞协程,则走入 unlockSlow 分支.
func (m *Mutex) Unlock() {
new := atomic.AddInt32(&m.state, -mutexLocked)
if new != 0 {
m.unlockSlow(new)
}
}
解锁时,如果是非饥饿模式
- 倘若阻塞队列内无 goroutine 或者 mutexLocked、mutexStarving、mutexWoken 标识位任一不为零,退出流程
- 基于 CAS 操作将 Mutex.state 中的阻塞协程数减 1,倘若成功,则唤起阻塞队列头部的 goroutine,并退出
- 倘若减少阻塞协程数的 CAS 操作失败,则更新此时的 Mutex.state 为新的 old 值,开启下一轮循环
func (m *Mutex) unlockSlow(new int32) {
if (new+mutexLocked)&mutexLocked == 0 {
fatal("sync: unlock of unlocked mutex")
}
if new&mutexStarving == 0 {
old := new
for {
// If there are no waiters or a goroutine has already
// been woken or grabbed the lock, no need to wake anyone.
// In starvation mode ownership is directly handed off from unlocking
// goroutine to the next waiter. We are not part of this chain,
// since we did not observe mutexStarving when we unlocked the mutex above.
// So get off the way.
if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
return
}
// Grab the right to wake someone.
new = (old - 1<<mutexWaiterShift) | mutexWoken
if atomic.CompareAndSwapInt32(&m.state, old, new) {
runtime_Semrelease(&m.sema, false, 1)
return
}
old = m.state
}
}
如果是饥饿模式解锁
- 直接唤醒阻塞队列头部的 goroutine 即可, 其余进来抢锁的 goroutine,发现是饥饿模式,都会被排到队列的队尾。
func (m *Mutex) unlockSlow(new int32) {
if new&mutexStarving == 0 {
} else {
runtime_Semrelease(&m.sema, true, 1)
}
}
RWMutex
下面是 RWMutex
结构体
- rwmutexMaxReaders:共享读锁的 goroutine 数量上限,值为 2^29
- w:RWMutex 内置的互斥锁 sync.Mutex
- writerSem:写锁阻塞队列的信号量
- readerSem:读锁阻塞队列的信号量
- readerCount:无写锁情况下等于占用或等待读锁的 goroutine 数量,存在写锁流程时,该值一般为负数,实际值为读锁流程的 goroutine 数量减 rwmutexMaxReaders(2^29).
- readerWait:当前 goroutine 获取写锁前,还需要等待多少个 goroutine 释放读锁
type RWMutex struct {
w Mutex // held if there are pending writers
writerSem uint32 // semaphore for writers to wait for completing readers
readerSem uint32 // semaphore for readers to wait for completing writers
readerCount atomic.Int32 // number of pending readers
readerWait atomic.Int32 // number of departing readers
}
const rwmutexMaxReaders = 1 << 30
RLock
下面是 RLock
流程:
- 基于原子操作,将 RWMutex 的 readCount 变量加一,表示占用或等待读锁的 goroutine 数加一
- 若 RWMutex.readCount 的新值仍小于 0,说明有 goroutine 未释放写锁,因此将当前 goroutine 添加到读锁的阻塞队列中并阻塞挂起
func (rw *RWMutex) RLock() {
if rw.readerCount.Add(1) < 0 {
// A writer is pending, wait for it.
runtime_SemacquireRWMutexR(&rw.readerSem, false, 0)
}
}
RUnlock
下面是 RUnLock
流程:
基于原子操作,将 RWMutex 的 readCount 变量加一,表示占用或等待读锁的 goroutine 数减一
readCount 的新值小于 0,说明有 goroutine 在等待获取写锁,则走入 RWMutex.rUnlockSlow 的流程中.
func (rw *RWMutex) RUnlock() {
if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {
rw.rUnlockSlow(r)
}
}
Lock
下面是 Lock
流程:
- 对 RWMutex 内置的互斥锁进行加锁操作;
- 基于原子操作,对 RWMutex.readerCount 进行减少 -rwmutexMaxReaders 的操作, 目的是阻塞即将获取写锁的 goroutine
- 倘若此时存在未释放读锁的 gouroutine(r != 0),将当前 goroutine 添加到写锁的阻塞队列中挂起
func (rw *RWMutex) Lock() {
rw.w.Lock()
r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
runtime_SemacquireMutex(&rw.writerSem, false, 0)
}
}
UnLock
下面是 UnLock
流程:
go
func (rw *RWMutex) Unlock() {
r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)
if r >= rwmutexMaxReaders {
fatal("sync: Unlock of unlocked RWMutex")
}
for i := 0; i < int(r); i++ {
runtime_Semrelease(&rw.readerSem, false, 0)
}
rw.w.Unlock()
}
- 基于原子操作,将 RWMutex.readerCount 的值加上 rwmutexMaxReaders
- 唤醒读锁阻塞队列中的所有 goroutine
- 解开 RWMutex 内置的互斥锁