TrumanWong

Golang互斥锁实现原理

TrumanWong
3/1/2022

互斥锁是一种混合锁,其实现方式包含了自旋锁,同时参考了操作系统锁的实现。sync.Mutex结构比较简单,其包含了表示当前锁状态的state及信号量sema

// A Mutex is a mutual exclusion lock.
// The zero value for a Mutex is an unlocked mutex.
//
// A Mutex must not be copied after first use.
type Mutex struct {
    state int32
    sema  uint32
}

state通过位图的形式存储了当前锁的状态,如下图所示,其中包含锁是否为锁定状态、正在等待被锁唤醒的协程数量、两个和饥饿模式有关的标志。

TrumanWong

为了解决某一个协程可能长时间无法获取锁的问题,Go 1.9之后使用了饥饿模式。在饥饿模式下,unlock会唤醒最先申请加速的协程,从而保证公平。sema是互质锁中实现的信号量,后面会详细讲解其用途。

互斥锁的第1个阶段是使用原子操作快速抢占锁,如果抢占成功则立即返回,如果抢占失败则调用lockSlow方法。

// Lock locks m.
// If the lock is already in use, the calling goroutine
// blocks until the mutex is available.
func (m *Mutex) Lock() {
    // Fast path: grab unlocked mutex.
    if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
        if race.Enabled {
            race.Acquire(unsafe.Pointer(m))
        }
        return
    }
    // Slow path (outlined so that the fast path can be inlined)
    m.lockSlow()
}

lockSlow方法在正常情况下会自旋尝试抢占锁一段时间,而不会立即进入休眠状态,这使得互斥锁在频繁加锁与释放时也能良好工作。锁只有在正常模式下才能够进入自旋状态,runtime_canSpin函数会判断当前是否能进入自旋状态。

func (m *Mutex) lockSlow() {
    var waitStartTime int64
    starving := false
    awoke := false
    iter := 0
    old := m.state
    for {
        // Don't spin in starvation mode, ownership is handed off to waiters
        // so we won't be able to acquire the mutex anyway.
        if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
            // Active spinning makes sense.
            // Try to set mutexWoken flag to inform Unlock
            // to not wake other blocked goroutines.
            if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
                atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
                awoke = true
            }
            runtime_doSpin()
            iter++
            old = m.state
            continue
        }
        new := old
        // Don't try to acquire starving mutex, new arriving goroutines must queue.
        if old&mutexStarving == 0 {
            new |= mutexLocked
        }
        if old&(mutexLocked|mutexStarving) != 0 {
            new += 1 << mutexWaiterShift
        }
        // The current goroutine switches mutex to starvation mode.
        // But if the mutex is currently unlocked, don't do the switch.
        // Unlock expects that starving mutex has waiters, which will not
        // be true in this case.
        if starving && old&mutexLocked != 0 {
            new |= mutexStarving
        }
        if awoke {
            // The goroutine has been woken from sleep,
            // so we need to reset the flag in either case.
            if new&mutexWoken == 0 {
                throw("sync: inconsistent mutex state")
            }
            new &^= mutexWoken
        }
        if atomic.CompareAndSwapInt32(&m.state, old, new) {
            if old&(mutexLocked|mutexStarving) == 0 {
                break // locked the mutex with CAS
            }
            // If we were already waiting before, queue at the front of the queue.
            queueLifo := waitStartTime != 0
            if waitStartTime == 0 {
                waitStartTime = runtime_nanotime()
            }
            runtime_SemacquireMutex(&m.sema, queueLifo, 1)
            starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
            old = m.state
            if old&mutexStarving != 0 {
                // If this goroutine was woken and mutex is in starvation mode,
                // ownership was handed off to us but mutex is in somewhat
                // inconsistent state: mutexLocked is not set and we are still
                // accounted as waiter. Fix that.
                if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
                    throw("sync: inconsistent mutex state")
                }
                delta := int32(mutexLocked - 1<<mutexWaiterShift)
                if !starving || old>>mutexWaiterShift == 1 {
                    // Exit starvation mode.
                    // Critical to do it here and consider wait time.
                    // Starvation mode is so inefficient, that two goroutines
                    // can go lock-step infinitely once they switch mutex
                    // to starvation mode.
                    delta -= mutexStarving
                }
                atomic.AddInt32(&m.state, delta)
                break
            }
            awoke = true
            iter = 0
        } else {
            old = m.state
        }
    }

    if race.Enabled {
        race.Acquire(unsafe.Pointer(m))
    }
}

在下面4种情况下,自旋状态立即终止:

  1. 程序在单核CPU上运行。
  2. 逻辑处理器P小于或等于1。
  3. 当前协程所在的逻辑处理器P的本地队列上有其他协程待运行。
  4. 自旋次数超过了设定的阈值。

进入自旋状态后,runtime_doSpin函数调用的procyield函数是一段汇编代码,会执行30次PAUSE指令占用CPU时间。

//go:linkname sync_runtime_doSpin sync.runtime_doSpin
//go:nosplit
func sync_runtime_doSpin() {
    procyield(active_spin_cnt)
}

TEXT runtime·procyield(SB),NOSPLIT,$0-0
    MOVL	cycles+0(FP), AX
again:
    PAUSE
    SUBL	$1, AX
    JNZ	again
    RET

当长时间未获取到锁时,就进入互斥锁的第2个阶段,使用信号量进行同步。如果加锁操作进入信号量同步阶段,则信号量计数值减1。如果解锁操作进入信号量同步阶段,则信号量计数值加1。当信号量计数值大于0时,意味着有其他协程执行了解锁操作,这时加锁协程可以直接退出。当信号量计数值等于0时,意味着当前加锁协程需要陷入休眠状态。在互斥锁第3个阶段,所有锁的信息都会根据锁的地址存储在全局semtable哈希表中。

// Prime to not correlate with any user patterns.
const semTabSize = 251

var semtable [semTabSize]struct {
    root semaRoot
    pad  [cpu.CacheLinePadSize - unsafe.Sizeof(semaRoot{})]byte
}

哈希函数为根据信号量地址简单取模。

func semroot(addr *uint32) *semaRoot {
    return &semtable[(uintptr(unsafe.Pointer(addr))>>3)%semTabSize].root
}

下图为互斥锁加入等待队列中的示意图,先根据哈希函数查找当前锁存储在哪一个哈希桶bucket中。哈希结果相同的多个锁可能存储在同一个哈希桶中,哈希桶中通过一根双向链表解决哈希冲突问题。

TrumanWong

哈希桶中的链表还被构造成了特殊的treap树,如下图所示。treap树是一种引入了随机数的二叉搜索树,其实现简单,引入的随机数及必要时的旋转保证了比较好的平衡性。

将哈希桶中锁的数据结构设计为二叉搜索树的主要目的是快速查找到当前哈希桶中是否存在已经存在过的锁,这时能够以log2N的时间复杂度进行查找。如果已经查找到存在该锁,则将当前的协程添加到等待队列的尾部。

TrumanWong

如果不存在该锁,则需要向当前treap树中添加一个新的元素。值得注意的是,由于在访问哈希表时,仍然可能面临并发的数据争用,因此这里也需要加锁,但是此处的锁和互斥锁有所不同,其实现方式为先自旋一定次数,如果还没有获取到锁,则调用操作系统级别的锁,在Linux中为pthread mutex互斥锁。所以Go语言中的互斥锁算一种混合锁,它结合了原子操作、自旋、信号量、全局哈希表、等待队列、操作系统级别锁等多种技术,在正常情况下是基本不会进入操作系统级别的锁。

锁被放置到全局的等待队列中并等待被唤醒,唤醒的顺序为从前到后,遵循先入先出的准则,这样保证了公平性。当长时间无法获取锁时,当前的互斥锁会进入饥饿模式。在饥饿模式下,为了保证公平性,新申请锁的协程不会进入自旋状态,而是直接放入等待队列中。放入等待队列中的协程会切换自己的执行状态,让渡执行权利并进入新的调度循环,这不会暂停线程的运行。

互斥锁的释放

互斥锁的释放和互斥锁的锁定相对应,其步骤如下:

  1. 如果当前锁处于普通的锁定状态,即没有进入饥饿状态和唤醒状态,也没有多个协程因为抢占锁陷入堵塞,则Unlock方法在修改mutexLocked状态后立即退出(快速路径)。否则,进入慢路径调用unlockSlow方法。

    // Unlock unlocks m.
    // It is a run-time error if m is not locked on entry to Unlock.
    //
    // A locked Mutex is not associated with a particular goroutine.
    // It is allowed for one goroutine to lock a Mutex and then
    // arrange for another goroutine to unlock it.
    func (m *Mutex) Unlock() {
        if race.Enabled {
            _ = m.state
            race.Release(unsafe.Pointer(m))
        }
    
        // Fast path: drop lock bit.
        new := atomic.AddInt32(&m.state, -mutexLocked)
        if new != 0 {
            // Outlined slow path to allow inlining the fast path.
            // To hide unlockSlow during tracing we skip one extra frame when tracing GoUnblock.
            m.unlockSlow(new)
        }
    }
    
  2. 判断锁是否重复释放。锁不能重复释放,否则会在运行时报错。

    if (new+mutexLocked)&mutexLocked == 0 {
            throw("sync: unlock of unlocked mutex")
        }
    
  3. 如果锁当前处于饥饿状态,则进入信号量同步阶段,到全局哈希表中寻找当前锁的等待队列,以先入先出的顺序唤醒指定协程。

  4. 如果锁当前未处于饥饿状态且当前mutexWoken已设置,则表明有其他申请锁的协程准备从正常状态退出,这时锁释放后不用去当前锁的等待队列中唤醒其他协程,而是直接退出。如果唤醒了等待队列中的协程,则将唤醒的协程放入当前协程所在逻辑处理器P的runnext字段中,存储到runnext字段中的协程会被优先调度。如果在饥饿模式下,则当前协程会让渡自己的执行权利,让被唤醒的协程直接运行,这是通过将runtime_Semrelease函数第2个参数设置为true实现的。

    func (m *Mutex) unlockSlow(new int32) {
        if (new+mutexLocked)&mutexLocked == 0 {
            throw("sync: unlock of unlocked mutex")
        }
        if new&mutexStarving == 0 {
            old := new
            for {
                // If there are no waiters or a goroutine has already
                // been woken or grabbed the lock, no need to wake anyone.
                // In starvation mode ownership is directly handed off from unlocking
                // goroutine to the next waiter. We are not part of this chain,
                // since we did not observe mutexStarving when we unlocked the mutex above.
                // So get off the way.
                if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
                    return
                }
                // Grab the right to wake someone.
                new = (old - 1<<mutexWaiterShift) | mutexWoken
                if atomic.CompareAndSwapInt32(&m.state, old, new) {
                    runtime_Semrelease(&m.sema, false, 1)
                    return
                }
                old = m.state
            }
        } else {
            // Starving mode: handoff mutex ownership to the next waiter, and yield
            // our time slice so that the next waiter can start to run immediately.
            // Note: mutexLocked is not set, the waiter will set it after wakeup.
            // But mutex is still considered locked if mutexStarving is set,
            // so new coming goroutines won't acquire it.
            runtime_Semrelease(&m.sema, true, 1)
        }
    }