[Go] Golang(1.23.2) 源码走读 - Mutex | RWMutex

14 天前(已编辑)
8

[Go] Golang(1.23.2) 源码走读 - Mutex | RWMutex

Mutex

下面是Mutex 结构体和位运算常量定义:

  • state: 32 位整数, 高 29 位存储阻塞的队列(最高 2^29-1), 最后一位(32为)存储 mutexLocked (是否上锁),倒数第二位存储 mutexWoken (协调唤醒操作,防止重复唤醒),倒数第三位存储 mutexStarving (是否处于饥饿模式)
  • sema: 通过 runtime 方法 runtime_SemacquireMutex 来阻塞/唤醒 goroutine 的信号量
const (
    mutexLocked = 1 << iota // mutex is locked // 1
    mutexWoken  // 2   
    mutexStarving  // 4 
    mutexWaiterShift = iota // 3  位运算计算队列等待数,要右移、左移的位数
    starvationThresholdNs = 1e6   // 饥饿模式等待阈值为 1ms
)

type Mutex struct {
    state int32
    sema  uint32
}

Lock

下面是 Lock 流程:

  • CAS 抢锁成功返回
  • 抢锁失败,则进入 lockSlow 流程
func (m *Mutex) Lock() {
    // Fast path: grab unlocked mutex.
    if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
        return
    }
    // Slow path (outlined so that the fast path can be inlined)
    m.lockSlow()
}

下面是 lockSlow 流程:

  • 如果 锁已被占用 并且 锁不为饥饿模式 并且满足自旋条件(runtime_canSpin 方法),进入自旋处理环节
  • 在自旋处理环节中,如果当前锁有尚未唤醒的阻塞协程,则通过 CAS 操作将 state 的 mutexWoken 标识置为 1,将局部变量 awoke 置为 true
  • 调用 runtime_doSpin 告知调度器 P 当前处于自旋模式
  • 更新自旋次数 iter 和锁状态值 old
  • continue
func (m *Mutex) lockSlow() {
    var waitStartTime int64 // 标识当前 goroutine 在抢锁过程中的等待时长,单位:ns;
    starving := false // 标识当前是否处于饥饿模式
    awoke := false  // 标记当前 goroutine 是否从阻塞中被唤醒
    iter := 0 //标识当前 goroutine 参与自旋的次数 runtime_canSpin 判断是否已经达到 go 运行时最大的自旋次数
    old := m.state // 当前状态
    for {
        // Don't spin in starvation mode, ownership is handed off to waiters
        // so we won't be able to acquire the mutex anyway.
        if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
            // Active spinning makes sense.
            // Try to set mutexWoken flag to inform Unlock
            // to not wake other blocked goroutines.
            if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
                atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
                awoke = true
            }
            runtime_doSpin()
            iter++
            old = m.state
            continue
        }
  • 新值 new 中置为已加锁,即尝试抢锁
  • 旧值为已被其他 goroutine 加锁或者处于锁已经饥饿模式,可以直接令新值的阻塞协程数加1
  • 当前为饥饿模式且旧值已加锁,则将新值置为饥饿模式
  • 如果是唤醒后的 goroutine,将该 mutexWoken 标识更新置 0
new := old
// Don't try to acquire starving mutex, new arriving goroutines must queue.
if old&mutexStarving == 0 {
    new |= mutexLocked
}
if old&(mutexLocked|mutexStarving) != 0 {
    new += 1 << mutexWaiterShift
}
// The current goroutine switches mutex to starvation mode.
// But if the mutex is currently unlocked, don't do the switch.
// Unlock expects that starving mutex has waiters, which will not
// be true in this case.
if starving && old&mutexLocked != 0 {
    new |= mutexStarving
}
if awoke {
    // The goroutine has been woken from sleep,
    // so we need to reset the flag in either case.
    if new&mutexWoken == 0 {
        throw("sync: inconsistent mutex state")
    }
    new &^= mutexWoken
}
  • 旧值是未加锁状态且为不为饥饿模式,说明加锁成功,返回即可
  • 旧值中锁未释放或者处于饥饿模式,则当前 goroutine 需要进入阻塞队列挂起
  • queueLifo 标识当前 goroutine 是从阻塞队列被唤起的老 goroutine 还是新进流程的 goroutine
if atomic.CompareAndSwapInt32(&m.state, old, new) {
    if old&(mutexLocked|mutexStarving) == 0 {
        break // locked the mutex with CAS
    }
    // If we were already waiting before, queue at the front of the queue.
    queueLifo := waitStartTime != 0
    if waitStartTime == 0 {
        waitStartTime = runtime_nanotime()
    }
    runtime_SemacquireMutex(&m.sema, queueLifo, 1

下面是从阻塞态被唤醒的 goroutine 流程

  • 当前 goroutine 进入阻塞队列时间长达 1 ms,则置为饥饿模式
  • 此时锁是饥饿模式,则当前 goroutine 无需竞争可以直接获得锁(Unlock 流程,发现是饥饿模式,会直接唤醒队头的 goroutine)
    starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
    old = m.state
    if old&mutexStarving != 0 {
        // If this goroutine was woken and mutex is in starvation mode,
        // ownership was handed off to us but mutex is in somewhat
        // inconsistent state: mutexLocked is not set and we are still
        // accounted as waiter. Fix that.
        if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
            throw("sync: inconsistent mutex state")
        }
        delta := int32(mutexLocked - 1<<mutexWaiterShift)
        if !starving || old>>mutexWaiterShift == 1 {
            // Exit starvation mode.
            // Critical to do it here and consider wait time.
            // Starvation mode is so inefficient, that two goroutines
            // can go lock-step infinitely once they switch mutex
            // to starvation mode.
            delta -= mutexStarving
        }
        atomic.AddInt32(&m.state, delta)
        break
    }
    awoke = true
    iter = 0
} else {
    old = m.state
}

Unlock

下面是 Unlock 流程:

  • 通过原子操作解锁 ;
  • 倘若解锁时发现,目前参与竞争的仅有自身一个 goroutine,则直接返回即可;
  • 倘若发现锁中还有阻塞协程,则走入 unlockSlow 分支.
func (m *Mutex) Unlock() {
    new := atomic.AddInt32(&m.state, -mutexLocked)
    if new != 0 {
        m.unlockSlow(new)
    }
}

解锁时,如果是非饥饿模式

  • 倘若阻塞队列内无 goroutine 或者 mutexLocked、mutexStarving、mutexWoken 标识位任一不为零,退出流程
  • 基于 CAS 操作将 Mutex.state 中的阻塞协程数减 1,倘若成功,则唤起阻塞队列头部的 goroutine,并退出
  • 倘若减少阻塞协程数的 CAS 操作失败,则更新此时的 Mutex.state 为新的 old 值,开启下一轮循环
func (m *Mutex) unlockSlow(new int32) {
    if (new+mutexLocked)&mutexLocked == 0 {
        fatal("sync: unlock of unlocked mutex")
    }
    if new&mutexStarving == 0 {
        old := new
        for {
            // If there are no waiters or a goroutine has already
            // been woken or grabbed the lock, no need to wake anyone.
            // In starvation mode ownership is directly handed off from unlocking
            // goroutine to the next waiter. We are not part of this chain,
            // since we did not observe mutexStarving when we unlocked the mutex above.
            // So get off the way.
            if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
                return
            }
            // Grab the right to wake someone.
            new = (old - 1<<mutexWaiterShift) | mutexWoken
            if atomic.CompareAndSwapInt32(&m.state, old, new) {
                runtime_Semrelease(&m.sema, false, 1)
                return
            }
            old = m.state
        }
    }

如果是饥饿模式解锁

  • 直接唤醒阻塞队列头部的 goroutine 即可, 其余进来抢锁的 goroutine,发现是饥饿模式,都会被排到队列的队尾。
func (m *Mutex) unlockSlow(new int32) {
    if new&mutexStarving == 0 {
    } else {
        runtime_Semrelease(&m.sema, true, 1)
    }
}

RWMutex

下面是 RWMutex 结构体

  • rwmutexMaxReaders:共享读锁的 goroutine 数量上限,值为 2^29
  • w:RWMutex 内置的互斥锁 sync.Mutex
  • writerSem:写锁阻塞队列的信号量
  • readerSem:读锁阻塞队列的信号量
  • readerCount:无写锁情况下等于占用或等待读锁的 goroutine 数量,存在写锁流程时,该值一般为负数,实际值为读锁流程的 goroutine 数量减 rwmutexMaxReaders(2^29).
  • readerWait:当前 goroutine 获取写锁前,还需要等待多少个 goroutine 释放读锁
type RWMutex struct {
    w           Mutex        // held if there are pending writers
    writerSem   uint32       // semaphore for writers to wait for completing readers
    readerSem   uint32       // semaphore for readers to wait for completing writers
    readerCount atomic.Int32 // number of pending readers
    readerWait  atomic.Int32 // number of departing readers
}

const rwmutexMaxReaders = 1 << 30

RLock

下面是 RLock 流程:

  • 基于原子操作,将 RWMutex 的 readCount 变量加一,表示占用或等待读锁的 goroutine 数加一
  • 若 RWMutex.readCount 的新值仍小于 0,说明有 goroutine 未释放写锁,因此将当前 goroutine 添加到读锁的阻塞队列中并阻塞挂起
func (rw *RWMutex) RLock() {
    if rw.readerCount.Add(1) < 0 {
        // A writer is pending, wait for it.
        runtime_SemacquireRWMutexR(&rw.readerSem, false, 0)
    }
}

RUnlock

下面是 RUnLock 流程:

  • 基于原子操作,将 RWMutex 的 readCount 变量加一,表示占用或等待读锁的 goroutine 数减一

  • readCount 的新值小于 0,说明有 goroutine 在等待获取写锁,则走入 RWMutex.rUnlockSlow 的流程中.

func (rw *RWMutex) RUnlock() {
    if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {
        rw.rUnlockSlow(r)
    }
}

Lock

下面是 Lock 流程:

  • 对 RWMutex 内置的互斥锁进行加锁操作;
  • 基于原子操作,对 RWMutex.readerCount 进行减少 -rwmutexMaxReaders 的操作, 目的是阻塞即将获取写锁的 goroutine
  • 倘若此时存在未释放读锁的 gouroutine(r != 0),将当前 goroutine 添加到写锁的阻塞队列中挂起
func (rw *RWMutex) Lock() {
    rw.w.Lock()
    r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
    if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
        runtime_SemacquireMutex(&rw.writerSem, false, 0)
    }
}

UnLock

下面是 UnLock 流程: go func (rw *RWMutex) Unlock() { r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders) if r >= rwmutexMaxReaders { fatal("sync: Unlock of unlocked RWMutex") } for i := 0; i < int(r); i++ { runtime_Semrelease(&rw.readerSem, false, 0) } rw.w.Unlock() }

  • 基于原子操作,将 RWMutex.readerCount 的值加上 rwmutexMaxReaders
  • 唤醒读锁阻塞队列中的所有 goroutine
  • 解开 RWMutex 内置的互斥锁

使用社交账号登录

  • Loading...
  • Loading...
  • Loading...
  • Loading...
  • Loading...