在同一时间内只能有一个协程获取互斥锁并执行操作,在多读少写的情况下,如果长时间没有写操作,那么读取到的会是完全相同的值,完全不需要通过互斥的方式获取,这是读写锁产生的背景。
读写锁通过两种锁来实现,一种为读锁,另一种为写锁。当进行读取操作时,需要加读锁,而进行写入操作时需要加写锁。多个协程可以同时获得读锁并执行。如果此时有协程申请了写锁,那么该写锁会等待所有的读锁都释放后才能获取写锁继续执行。如果当前的协程申请读锁时已经存在写锁,那么读锁会等待写锁释放后再获取锁继续执行。
总之,读锁必须能观察到上一次写锁写入的值,写锁要等待之前的读锁释放才能写入。可能有多个协程获得读锁,但只有一个协程获得写锁。举一个简单的例子,哈希表并不是并发安全的,它只能够并发读取,并发写入时会出现冲突。一种简单的规避方式如下所示,可以在获取map
中的数据时加入RLock
读锁,在写入数据时使用Lock
写锁。
type Stat struct {
counters map[string]int
mutex sync.RWMutex
}
func (s *Stat) getCounter(name string) int {
s.mutex.RLock()
defer s.mutex.RUnlock()
return s.counters[name]
}
func (s *Stat) setCounter(name string) {
s.mutex.Lock()
defer s.mutex.Unlock()
s.counters[name]++
}
读写锁位于sync
标准库中,其结构如下。读写锁复用了互斥锁及信号量这两种机制。
// There is a modified copy of this file in runtime/rwmutex.go.
// If you make any changes here, see if you should make them there.
// A RWMutex is a reader/writer mutual exclusion lock.
// The lock can be held by an arbitrary number of readers or a single writer.
// The zero value for a RWMutex is an unlocked mutex.
//
// A RWMutex must not be copied after first use.
//
// If a goroutine holds a RWMutex for reading and another goroutine might
// call Lock, no goroutine should expect to be able to acquire a read lock
// until the initial read lock is released. In particular, this prohibits
// recursive read locking. This is to ensure that the lock eventually becomes
// available; a blocked Lock call excludes new readers from acquiring the
// lock.
type RWMutex struct {
w Mutex // held if there are pending writers
writerSem uint32 // semaphore for writers to wait for completing readers
readerSem uint32 // semaphore for readers to wait for completing writers
readerCount int32 // number of pending readers
readerWait int32 // number of departing readers
}
读取操作先通过原子操作将readerCount + 1
,如果readerCount ≥ 0
就直接返回,所以如果只有获取读取锁的操作,那么其成本只有一个原子操作。当readerCount < 0
时,说明当前有写锁,当前协程将借助信号量陷入等待状态,如果获取到信号量则立即退出,没有获取到信号量时的逻辑与互斥锁的逻辑相似。
func (rw *RWMutex) RLock() {
if race.Enabled {
_ = rw.w.state
race.Disable()
}
if atomic.AddInt32(&rw.readerCount, 1) < 0 {
// A writer is pending, wait for it.
runtime_SemacquireMutex(&rw.readerSem, false, 0)
}
if race.Enabled {
race.Enable()
race.Acquire(unsafe.Pointer(&rw.readerSem))
}
}
读锁解锁时,如果当前没有写锁,则其成本只有一个原子操作并直接退出。
func (rw *RWMutex) RUnlock() {
if race.Enabled {
_ = rw.w.state
race.ReleaseMerge(unsafe.Pointer(&rw.writerSem))
race.Disable()
}
if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {
// Outlined slow-path to allow the fast-path to be inlined
rw.rUnlockSlow(r)
}
if race.Enabled {
race.Enable()
}
}
如果当前有写锁正在等待,则调用rUnlockSlow
判断当前是否为最后一个被释放的读锁,如果是则需要增加信号量并唤醒写锁。
func (rw *RWMutex) rUnlockSlow(r int32) {
if r+1 == 0 || r+1 == -rwmutexMaxReaders {
race.Enable()
throw("sync: RUnlock of unlocked RWMutex")
}
// A writer is pending.
if atomic.AddInt32(&rw.readerWait, -1) == 0 {
// The last reader unblocks the writer.
runtime_Semrelease(&rw.writerSem, false, 1)
}
}
读写锁申请写锁时要调用Lock
方法,必须先获取互斥锁,因为它复用了互斥锁的功能。接着readerCount
减去rwmutexMaxReaders
阻止后续的读操作。
但获取互斥锁并不一定能直接获取写锁,如果当前已经有其他Goroutine
持有互斥锁的读锁,那么当前协程会加入全局等待队列并进入休眠状态,当最后一个读锁被释放时,会唤醒该协程。
func (rw *RWMutex) Lock() {
if race.Enabled {
_ = rw.w.state
race.Disable()
}
// First, resolve competition with other writers.
rw.w.Lock()
// Announce to readers there is a pending writer.
r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
// Wait for active readers.
if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
runtime_SemacquireMutex(&rw.writerSem, false, 0)
}
if race.Enabled {
race.Enable()
race.Acquire(unsafe.Pointer(&rw.readerSem))
race.Acquire(unsafe.Pointer(&rw.writerSem))
}
}
解锁时,调用Unlock
方法。将readerCount
加上rwmutexMaxReaders
,表示不会堵塞后续的读锁,依次唤醒所有等待中的读锁。当所有的读锁唤醒完毕后会释放互斥锁。
func (rw *RWMutex) Unlock() {
if race.Enabled {
_ = rw.w.state
race.Release(unsafe.Pointer(&rw.readerSem))
race.Disable()
}
// Announce to readers there is no active writer.
r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)
if r >= rwmutexMaxReaders {
race.Enable()
throw("sync: Unlock of unlocked RWMutex")
}
// Unblock blocked readers, if any.
for i := 0; i < int(r); i++ {
runtime_Semrelease(&rw.readerSem, false, 0)
}
// Allow other writers to proceed.
rw.w.Unlock()
if race.Enabled {
race.Enable()
}
}
可以看出,读写锁在写操作时的性能与互斥锁类似,但是在只有读操作时效率要高很多,因为读锁可以被多个协程获取。