TrumanWong

Golang读写锁原理

TrumanWong
3/1/2022

背景

在同一时间内只能有一个协程获取互斥锁并执行操作,在多读少写的情况下,如果长时间没有写操作,那么读取到的会是完全相同的值,完全不需要通过互斥的方式获取,这是读写锁产生的背景。

读写锁通过两种锁来实现,一种为读锁,另一种为写锁。当进行读取操作时,需要加读锁,而进行写入操作时需要加写锁。多个协程可以同时获得读锁并执行。如果此时有协程申请了写锁,那么该写锁会等待所有的读锁都释放后才能获取写锁继续执行。如果当前的协程申请读锁时已经存在写锁,那么读锁会等待写锁释放后再获取锁继续执行。

总之,读锁必须能观察到上一次写锁写入的值,写锁要等待之前的读锁释放才能写入。可能有多个协程获得读锁,但只有一个协程获得写锁。举一个简单的例子,哈希表并不是并发安全的,它只能够并发读取,并发写入时会出现冲突。一种简单的规避方式如下所示,可以在获取map中的数据时加入RLock读锁,在写入数据时使用Lock写锁。

type Stat struct {
    counters map[string]int
    mutex sync.RWMutex
}

func (s *Stat) getCounter(name string) int {
    s.mutex.RLock()
    defer s.mutex.RUnlock()
    return s.counters[name]
}

func (s *Stat) setCounter(name string) {
    s.mutex.Lock()
    defer s.mutex.Unlock()
    s.counters[name]++
}

读写锁原理

读写锁位于sync标准库中,其结构如下。读写锁复用了互斥锁及信号量这两种机制。

// There is a modified copy of this file in runtime/rwmutex.go.
// If you make any changes here, see if you should make them there.

// A RWMutex is a reader/writer mutual exclusion lock.
// The lock can be held by an arbitrary number of readers or a single writer.
// The zero value for a RWMutex is an unlocked mutex.
//
// A RWMutex must not be copied after first use.
//
// If a goroutine holds a RWMutex for reading and another goroutine might
// call Lock, no goroutine should expect to be able to acquire a read lock
// until the initial read lock is released. In particular, this prohibits
// recursive read locking. This is to ensure that the lock eventually becomes
// available; a blocked Lock call excludes new readers from acquiring the
// lock.
type RWMutex struct {
    w           Mutex  // held if there are pending writers
    writerSem   uint32 // semaphore for writers to wait for completing readers
    readerSem   uint32 // semaphore for readers to wait for completing writers
    readerCount int32  // number of pending readers
    readerWait  int32  // number of departing readers
}

读取操作先通过原子操作将readerCount + 1,如果readerCount ≥ 0就直接返回,所以如果只有获取读取锁的操作,那么其成本只有一个原子操作。当readerCount < 0时,说明当前有写锁,当前协程将借助信号量陷入等待状态,如果获取到信号量则立即退出,没有获取到信号量时的逻辑与互斥锁的逻辑相似。

func (rw *RWMutex) RLock() {
    if race.Enabled {
        _ = rw.w.state
        race.Disable()
    }
    if atomic.AddInt32(&rw.readerCount, 1) < 0 {
        // A writer is pending, wait for it.
        runtime_SemacquireMutex(&rw.readerSem, false, 0)
    }
    if race.Enabled {
        race.Enable()
        race.Acquire(unsafe.Pointer(&rw.readerSem))
    }
}

读锁解锁时,如果当前没有写锁,则其成本只有一个原子操作并直接退出。

func (rw *RWMutex) RUnlock() {
    if race.Enabled {
        _ = rw.w.state
        race.ReleaseMerge(unsafe.Pointer(&rw.writerSem))
        race.Disable()
    }
    if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {
        // Outlined slow-path to allow the fast-path to be inlined
        rw.rUnlockSlow(r)
    }
    if race.Enabled {
        race.Enable()
    }
}

如果当前有写锁正在等待,则调用rUnlockSlow判断当前是否为最后一个被释放的读锁,如果是则需要增加信号量并唤醒写锁。

func (rw *RWMutex) rUnlockSlow(r int32) {
    if r+1 == 0 || r+1 == -rwmutexMaxReaders {
        race.Enable()
        throw("sync: RUnlock of unlocked RWMutex")
    }
    // A writer is pending.
    if atomic.AddInt32(&rw.readerWait, -1) == 0 {
        // The last reader unblocks the writer.
        runtime_Semrelease(&rw.writerSem, false, 1)
    }
}

读写锁申请写锁时要调用Lock方法,必须先获取互斥锁,因为它复用了互斥锁的功能。接着readerCount减去rwmutexMaxReaders阻止后续的读操作。

但获取互斥锁并不一定能直接获取写锁,如果当前已经有其他Goroutine持有互斥锁的读锁,那么当前协程会加入全局等待队列并进入休眠状态,当最后一个读锁被释放时,会唤醒该协程。

func (rw *RWMutex) Lock() {
    if race.Enabled {
        _ = rw.w.state
        race.Disable()
    }
    // First, resolve competition with other writers.
    rw.w.Lock()
    // Announce to readers there is a pending writer.
    r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
    // Wait for active readers.
    if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
        runtime_SemacquireMutex(&rw.writerSem, false, 0)
    }
    if race.Enabled {
        race.Enable()
        race.Acquire(unsafe.Pointer(&rw.readerSem))
        race.Acquire(unsafe.Pointer(&rw.writerSem))
    }
}

解锁时,调用Unlock方法。将readerCount加上rwmutexMaxReaders,表示不会堵塞后续的读锁,依次唤醒所有等待中的读锁。当所有的读锁唤醒完毕后会释放互斥锁。

func (rw *RWMutex) Unlock() {
    if race.Enabled {
        _ = rw.w.state
        race.Release(unsafe.Pointer(&rw.readerSem))
        race.Disable()
    }

    // Announce to readers there is no active writer.
    r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)
    if r >= rwmutexMaxReaders {
        race.Enable()
        throw("sync: Unlock of unlocked RWMutex")
    }
    // Unblock blocked readers, if any.
    for i := 0; i < int(r); i++ {
        runtime_Semrelease(&rw.readerSem, false, 0)
    }
    // Allow other writers to proceed.
    rw.w.Unlock()
    if race.Enabled {
        race.Enable()
    }
}

可以看出,读写锁在写操作时的性能与互斥锁类似,但是在只有读操作时效率要高很多,因为读锁可以被多个协程获取。