互斥锁是一种混合锁,其实现方式包含了自旋锁,同时参考了操作系统锁的实现。sync.Mutex
结构比较简单,其包含了表示当前锁状态的state
及信号量sema
。
// A Mutex is a mutual exclusion lock.
// The zero value for a Mutex is an unlocked mutex.
//
// A Mutex must not be copied after first use.
type Mutex struct {
state int32
sema uint32
}
state
通过位图的形式存储了当前锁的状态,如下图所示,其中包含锁是否为锁定状态、正在等待被锁唤醒的协程数量、两个和饥饿模式有关的标志。
为了解决某一个协程可能长时间无法获取锁的问题,Go 1.9之后使用了饥饿模式。在饥饿模式下,unlock
会唤醒最先申请加速的协程,从而保证公平。sema
是互质锁中实现的信号量,后面会详细讲解其用途。
互斥锁的第1个阶段是使用原子操作快速抢占锁,如果抢占成功则立即返回,如果抢占失败则调用lockSlow
方法。
// Lock locks m.
// If the lock is already in use, the calling goroutine
// blocks until the mutex is available.
func (m *Mutex) Lock() {
// Fast path: grab unlocked mutex.
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
return
}
// Slow path (outlined so that the fast path can be inlined)
m.lockSlow()
}
lockSlow
方法在正常情况下会自旋尝试抢占锁一段时间,而不会立即进入休眠状态,这使得互斥锁在频繁加锁与释放时也能良好工作。锁只有在正常模式下才能够进入自旋状态,runtime_canSpin
函数会判断当前是否能进入自旋状态。
func (m *Mutex) lockSlow() {
var waitStartTime int64
starving := false
awoke := false
iter := 0
old := m.state
for {
// Don't spin in starvation mode, ownership is handed off to waiters
// so we won't be able to acquire the mutex anyway.
if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
// Active spinning makes sense.
// Try to set mutexWoken flag to inform Unlock
// to not wake other blocked goroutines.
if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
awoke = true
}
runtime_doSpin()
iter++
old = m.state
continue
}
new := old
// Don't try to acquire starving mutex, new arriving goroutines must queue.
if old&mutexStarving == 0 {
new |= mutexLocked
}
if old&(mutexLocked|mutexStarving) != 0 {
new += 1 << mutexWaiterShift
}
// The current goroutine switches mutex to starvation mode.
// But if the mutex is currently unlocked, don't do the switch.
// Unlock expects that starving mutex has waiters, which will not
// be true in this case.
if starving && old&mutexLocked != 0 {
new |= mutexStarving
}
if awoke {
// The goroutine has been woken from sleep,
// so we need to reset the flag in either case.
if new&mutexWoken == 0 {
throw("sync: inconsistent mutex state")
}
new &^= mutexWoken
}
if atomic.CompareAndSwapInt32(&m.state, old, new) {
if old&(mutexLocked|mutexStarving) == 0 {
break // locked the mutex with CAS
}
// If we were already waiting before, queue at the front of the queue.
queueLifo := waitStartTime != 0
if waitStartTime == 0 {
waitStartTime = runtime_nanotime()
}
runtime_SemacquireMutex(&m.sema, queueLifo, 1)
starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
old = m.state
if old&mutexStarving != 0 {
// If this goroutine was woken and mutex is in starvation mode,
// ownership was handed off to us but mutex is in somewhat
// inconsistent state: mutexLocked is not set and we are still
// accounted as waiter. Fix that.
if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
throw("sync: inconsistent mutex state")
}
delta := int32(mutexLocked - 1<<mutexWaiterShift)
if !starving || old>>mutexWaiterShift == 1 {
// Exit starvation mode.
// Critical to do it here and consider wait time.
// Starvation mode is so inefficient, that two goroutines
// can go lock-step infinitely once they switch mutex
// to starvation mode.
delta -= mutexStarving
}
atomic.AddInt32(&m.state, delta)
break
}
awoke = true
iter = 0
} else {
old = m.state
}
}
if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
}
在下面4种情况下,自旋状态立即终止:
进入自旋状态后,runtime_doSpin
函数调用的procyield
函数是一段汇编代码,会执行30次PAUSE
指令占用CPU
时间。
//go:linkname sync_runtime_doSpin sync.runtime_doSpin
//go:nosplit
func sync_runtime_doSpin() {
procyield(active_spin_cnt)
}
TEXT runtime·procyield(SB),NOSPLIT,$0-0
MOVL cycles+0(FP), AX
again:
PAUSE
SUBL $1, AX
JNZ again
RET
当长时间未获取到锁时,就进入互斥锁的第2个阶段,使用信号量进行同步。如果加锁操作进入信号量同步阶段,则信号量计数值减1。如果解锁操作进入信号量同步阶段,则信号量计数值加1。当信号量计数值大于0时,意味着有其他协程执行了解锁操作,这时加锁协程可以直接退出。当信号量计数值等于0时,意味着当前加锁协程需要陷入休眠状态。在互斥锁第3个阶段,所有锁的信息都会根据锁的地址存储在全局semtable
哈希表中。
// Prime to not correlate with any user patterns.
const semTabSize = 251
var semtable [semTabSize]struct {
root semaRoot
pad [cpu.CacheLinePadSize - unsafe.Sizeof(semaRoot{})]byte
}
哈希函数为根据信号量地址简单取模。
func semroot(addr *uint32) *semaRoot {
return &semtable[(uintptr(unsafe.Pointer(addr))>>3)%semTabSize].root
}
下图为互斥锁加入等待队列中的示意图,先根据哈希函数查找当前锁存储在哪一个哈希桶bucket
中。哈希结果相同的多个锁可能存储在同一个哈希桶中,哈希桶中通过一根双向链表解决哈希冲突问题。
哈希桶中的链表还被构造成了特殊的treap
树,如下图所示。treap
树是一种引入了随机数的二叉搜索树,其实现简单,引入的随机数及必要时的旋转保证了比较好的平衡性。
将哈希桶中锁的数据结构设计为二叉搜索树的主要目的是快速查找到当前哈希桶中是否存在已经存在过的锁,这时能够以log2N的时间复杂度进行查找。如果已经查找到存在该锁,则将当前的协程添加到等待队列的尾部。
如果不存在该锁,则需要向当前treap树中添加一个新的元素。值得注意的是,由于在访问哈希表时,仍然可能面临并发的数据争用,因此这里也需要加锁,但是此处的锁和互斥锁有所不同,其实现方式为先自旋一定次数,如果还没有获取到锁,则调用操作系统级别的锁,在Linux
中为pthread mutex
互斥锁。所以Go语言中的互斥锁算一种混合锁,它结合了原子操作、自旋、信号量、全局哈希表、等待队列、操作系统级别锁等多种技术,在正常情况下是基本不会进入操作系统级别的锁。
锁被放置到全局的等待队列中并等待被唤醒,唤醒的顺序为从前到后,遵循先入先出的准则,这样保证了公平性。当长时间无法获取锁时,当前的互斥锁会进入饥饿模式。在饥饿模式下,为了保证公平性,新申请锁的协程不会进入自旋状态,而是直接放入等待队列中。放入等待队列中的协程会切换自己的执行状态,让渡执行权利并进入新的调度循环,这不会暂停线程的运行。
互斥锁的释放和互斥锁的锁定相对应,其步骤如下: