Go 1.23
写成。map
是Go
语言中最常用的数据类型之一。在使用map
时需要特别注意,多个协程并发操作map
会抛出panic
异常。然而,在某些场景需要并发操作map
,这时可以采用互斥锁方案解决并发问题,或者可以使用Go
语言提供的并发散列表sync.Map
。
map
的并发问题下面我们用一个示例验证下“多个协程并发操作map
会抛出panic
异常”:
package main
func main() {
m := make(map[int]int)
for i := 0; i <= 10; i++ {
go func() {
for j := 0; j <= 10; j++ {
m[i] += j
}
}()
}
select {}
}
运行后程序抛出concurrent map writes
异常:
fatal error: fatal error: concurrent map writes
concurrent map writes
goroutine 6 [running]:
main.main.func1()
D:/goworkspace/examples/mapconcurrent/main.go:9 +0x35
created by main.main in goroutine 1
D:/goworkspace/examples/mapconcurrent/main.go:6 +0x37
根本原因在于Go
不允许多个携程并发操作map
,结构体map
包含一个标识字段(记录是否有协程正在写map
),当用户协程需要写map
时,第一步会检测该标识字段,如果其他协程也在写map
,则抛出panic
异常;如果没有协程在写map
,则更新该标识字段,并执行写操作。
写map
的实现函数为mapassign_faststr
,这里摘抄出Go
语言检测多个协程并发操作map
的逻辑,如下所示:
// runtime/map_faststr.go
func mapassign_faststr(t *maptype, h *hmap, s string) unsafe.Pointer {
if h == nil {
panic(plainError("assignment to entry in nil map"))
}
if raceenabled {
callerpc := getcallerpc()
racewritepc(unsafe.Pointer(h), callerpc, abi.FuncPCABIInternal(mapassign_faststr))
}
// 如果检测到其他协程正在写map,抛出panic异常
if h.flags&hashWriting != 0 {
fatal("concurrent map writes")
}
key := stringStructOf(&s)
hash := t.Hasher(noescape(unsafe.Pointer(&s)), uintptr(h.hash0))
// 设置写标识
h.flags ^= hashWriting
...
}
虽然多个协程并发操作map
确实会抛出panic
异常,但是往往有业务场景需要并发操作map
。这时我们可以使用锁来解决并发问题,代码如下:
package main
import (
"log"
"sync"
"time"
)
func main() {
m := make(map[int]int, 0)
var lock sync.Mutex
for i := 0; i <= 10; i++ {
go func() {
for j := 0; j <= 10; j++ {
lock.Lock()
m[i] += j
lock.Unlock()
}
}()
}
time.Sleep(1 * time.Second)
log.Println(m)
}
但是锁通常会降低性能,如果需要高性能的并发操作map
,通过互斥锁的方式这时就不太合适了。因此我们可以使用Go提供的并发散列表sync.Map
。
sync.Map
我们将上面代码改成用sync.Map
来实现,如下所示:
package main
import (
"fmt"
"sync"
"time"
)
func main() {
var m sync.Map
for i := 0; i <= 10; i++ {
go func() {
for j := 0; j <= 10; j++ {
if val, ok := m.Load(i); ok {
m.Store(i, val.(int)+1)
} else {
m.Store(i, 1)
}
}
}()
}
time.Sleep(1 * time.Second)
m.Range(func(key, value interface{}) bool {
fmt.Println(key, value)
return true
})
}
sync.Map
的使用非常简单,Store
用于写入数据,Load
用于读取数据。
Load
方法有两个返回值:第一个是键值对的值value
,数据类型为interface
,使用时需要进行类型转换,第二个值类型为bool
,表示该键值对是否存在。sync.Map
的定义接下来我们看sync.Map
的定义,如下所示:
// sync/map.go
type Map struct {
mu Mutex
// read contains the portion of the map's contents that are safe for
// concurrent access (with or without mu held).
//
// The read field itself is always safe to load, but must only be stored with
// mu held.
//
// Entries stored in read may be updated concurrently without mu, but updating
// a previously-expunged entry requires that the entry be copied to the dirty
// map and unexpunged with mu held.
read atomic.Pointer[readOnly]
// dirty contains the portion of the map's contents that require mu to be
// held. To ensure that the dirty map can be promoted to the read map quickly,
// it also includes all of the non-expunged entries in the read map.
//
// Expunged entries are not stored in the dirty map. An expunged entry in the
// clean map must be unexpunged and added to the dirty map before a new value
// can be stored to it.
//
// If the dirty map is nil, the next write to the map will initialize it by
// making a shallow copy of the clean map, omitting stale entries.
dirty map[any]*entry
// misses counts the number of loads since the read map was last updated that
// needed to lock mu to determine whether the key was present.
//
// Once enough misses have occurred to cover the cost of copying the dirty
// map, the dirty map will be promoted to the read map (in the unamended
// state) and the next store to the map will make a new dirty copy.
misses int
}
// readOnly is an immutable struct stored atomically in the Map.read field.
type readOnly struct {
m map[any]*entry
amended bool // true if the dirty map contains some key not in m.
}
sync.Map
各字段含义如下:
Load
方法的底层实现sync.Map
读取数据由Load
方法实现,代码如下所示:
// sync/map.go
func (m *Map) Load(key any) (value any, ok bool) {
read := m.loadReadOnly()
e, ok := read.m[key]
if !ok && read.amended {
m.mu.Lock()
// Avoid reporting a spurious miss if m.dirty got promoted while we were
// blocked on m.mu. (If further loads of the same key will not miss, it's
// not worth copying the dirty map for this key.)
read = m.loadReadOnly()
e, ok = read.m[key]
if !ok && read.amended {
e, ok = m.dirty[key]
// Regardless of whether the entry was present, record a miss: this key
// will take the slow path until the dirty map is promoted to the read
// map.
m.missLocked()
}
m.mu.Unlock()
}
if !ok {
return nil, false
}
return e.load()
}
从上面代码可以看到,第一步先从read.m
中查找键值对,如果没有查找到并且read.amended
为true
,则继续从散列表dirty
中查找。继续查找时需要先加锁,加锁之后再次查找散列表read.m
。
sync.Map
,从而修改散列表read.m
。方法missLocked
用于更新计数器misses
,当misses
累加到一定值时,还会将散列表dirty
的全量数据复制到散列表read.m
中:
func (m *Map) missLocked() {
m.misses++
if m.misses < len(m.dirty) {
return
}
m.read.Store(&readOnly{m: m.dirty})
m.dirty = nil
m.misses = 0
}
Store
方法的底层实现sync.Map
写入数据由Store
方法实现,代码如下所示:
// sync/map.go
// Store sets the value for a key.
func (m *Map) Store(key, value any) {
_, _ = m.Swap(key, value)
}
// Swap swaps the value for a key and returns the previous value if any.
// The loaded result reports whether the key was present.
func (m *Map) Swap(key, value any) (previous any, loaded bool) {
read := m.loadReadOnly()
if e, ok := read.m[key]; ok {
if v, ok := e.trySwap(&value); ok {
if v == nil {
return nil, false
}
return *v, true
}
}
m.mu.Lock()
read = m.loadReadOnly()
if e, ok := read.m[key]; ok {
if e.unexpungeLocked() {
// The entry was previously expunged, which implies that there is a
// non-nil dirty map and this entry is not in it.
m.dirty[key] = e
}
if v := e.swapLocked(&value); v != nil {
loaded = true
previous = *v
}
} else if e, ok := m.dirty[key]; ok {
if v := e.swapLocked(&value); v != nil {
loaded = true
previous = *v
}
} else {
if !read.amended {
// We're adding the first new key to the dirty map.
// Make sure it is allocated and mark the read-only map as incomplete.
m.dirtyLocked()
m.read.Store(&readOnly{m: read.m, amended: true})
}
m.dirty[key] = newEntry(value)
}
m.mu.Unlock()
return previous, loaded
}
从上面代码可以看到,第一步先从散列表read.m
中查找键值对。如果存在该键值对,则通过乐观锁方式修改散列表read.m
,修改成功后直接返回。方法trySwap
底层就是基于乐观锁CAS
实现的。注意,在继续尝试操作散列表dirty
之前,首先需要加锁,并且还需要再次检测散列表read.m
是否存在该键值对,如果存在,则同时修改散列表read.m
与散列表dirty
。
read.m
?原因与之前介绍的一样,在第一次查找之后到加锁成功之间,其他协程可能会操作sync.Map
,从而修改散列表read.m
。最后,当散列表read.m
与散列表dirty
都不存在该键-值对时,显然此时需要新插入该键值对,在插入之前还有可能需要将散列表dirty
的全量数据复制到散列表read.m
中。
如果是读多写少的场景,sync.Map
的并发操作效率会比较高,因为这时大部分的数据读取操作是完全不需要加锁的,数据写入操作冲突的可能性也比较低。