Concurrent map operations in Go

TrumanWong
11/23/2024
This article is based on Go 1.23.

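map is one of the most commonly used data types in Go. One thing to watch out for: when multiple goroutines write to the same map concurrently, the runtime aborts the program with a fatal error (unlike an ordinary panic, it cannot be caught with recover). Some scenarios do need concurrent access to a map, though. In those cases you can guard the map with a mutex, or use sync.Map, the concurrent hash map provided by Go's standard library.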

The concurrency problem with map

The following example verifies that concurrent writes to a map from multiple goroutines crash the program:

package main

func main() {
	m := make(map[int]int)
	for i := 0; i <= 10; i++ {
		go func() {
			for j := 0; j <= 10; j++ {
				m[i] += j
			}
		}()
	}
	select {}
}

Running the program fails with a concurrent map writes error:

fatal error: fatal error: concurrent map writes
concurrent map writes

goroutine 6 [running]:
main.main.func1()
	D:/goworkspace/examples/mapconcurrent/main.go:9 +0x35
created by main.main in goroutine 1
	D:/goworkspace/examples/mapconcurrent/main.go:6 +0x37

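The root cause is that Go does not allow multiple goroutines to write to a map concurrently. The map's underlying struct has a flags field with a bit that records whether a goroutine is currently writing to the map. Before writing, a goroutine first checks this bit: if it is already set, another goroutine is in the middle of a write and the runtime aborts with a fatal error; otherwise the goroutine sets the bit and performs the write.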

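Map assignment is implemented by a family of runtime functions; for string keys it is mapassign_faststr. The excerpt below shows the logic Go uses to detect concurrent map writes: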

// runtime/map_faststr.go
func mapassign_faststr(t *maptype, h *hmap, s string) unsafe.Pointer {
	if h == nil {
		panic(plainError("assignment to entry in nil map"))
	}
	if raceenabled {
		callerpc := getcallerpc()
		racewritepc(unsafe.Pointer(h), callerpc, abi.FuncPCABIInternal(mapassign_faststr))
	}
	// If another goroutine is already writing to the map, abort with a fatal error
	if h.flags&hashWriting != 0 {
		fatal("concurrent map writes")
	}
	key := stringStructOf(&s)
	hash := t.Hasher(noescape(unsafe.Pointer(&s)), uintptr(h.hash0))

	// Set the write flag
	h.flags ^= hashWriting
	...
}
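
To make the check above concrete, here is a minimal single-goroutine sketch of the same idea. The names flaggedMap, assign, errConcurrentWrite and the during hook are hypothetical and exist only for illustration; they are not part of the runtime. Two simplifications are assumed: the sketch returns an error where the real runtime calls fatal, and the overlapping write is simulated deterministically through the hook instead of with a second goroutine.

```go
package main

import (
	"errors"
	"fmt"
)

// hashWriting is named after the runtime's flag bit; the value is illustrative.
const hashWriting uint8 = 4

var errConcurrentWrite = errors.New("concurrent map writes")

// flaggedMap is a hypothetical toy type, not the runtime's hmap.
type flaggedMap struct {
	flags uint8
	data  map[string]int
}

// assign sets the write flag for the duration of the write. The during hook
// runs while the flag is set, which lets us simulate an overlapping writer.
func (m *flaggedMap) assign(key string, val int, during func()) error {
	if m.flags&hashWriting != 0 {
		return errConcurrentWrite // the runtime would call fatal here
	}
	m.flags ^= hashWriting // set the write flag
	if during != nil {
		during()
	}
	m.data[key] = val
	m.flags &^= hashWriting // clear the write flag
	return nil
}

func main() {
	m := &flaggedMap{data: make(map[string]int)}
	var nested error
	err := m.assign("a", 1, func() {
		// This write starts while the outer write is still in progress.
		nested = m.assign("b", 2, nil)
	})
	fmt.Println(err, nested)
}
```

The outer write succeeds, while the nested write sees the flag already set and is rejected. Note that in the real runtime this check is a best-effort diagnostic, not a synchronization mechanism.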

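Although concurrent map writes crash the program, real-world code often does need to operate on a map from several goroutines. We can solve the problem with a lock, as follows: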

package main

import (
	"log"
	"sync"
	"time"
)

func main() {
	m := make(map[int]int, 0)
	var lock sync.Mutex
	for i := 0; i <= 10; i++ {
		go func() {
			for j := 0; j <= 10; j++ {
				lock.Lock()
				m[i] += j
				lock.Unlock()
			}
		}()
	}
	time.Sleep(1 * time.Second)
	log.Println(m)
}
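
The example above waits with time.Sleep, which only works as long as the goroutines happen to finish within one second. As a variation, the following sketch waits with sync.WaitGroup instead, so the map is read only after every writer is done. The helper name sumWithMutex and its workers parameter are assumptions made for this illustration.

```go
package main

import (
	"fmt"
	"sync"
)

// sumWithMutex is a hypothetical helper: it starts `workers` goroutines, each
// adding 0..10 to its own key, and returns the map once all have finished.
func sumWithMutex(workers int) map[int]int {
	m := make(map[int]int)
	var (
		mu sync.Mutex
		wg sync.WaitGroup
	)
	for i := 0; i < workers; i++ {
		wg.Add(1)
		// The key is passed as an argument so the code behaves the same
		// on Go versions before 1.22, where the loop variable was shared.
		go func(key int) {
			defer wg.Done()
			for j := 0; j <= 10; j++ {
				mu.Lock()
				m[key] += j
				mu.Unlock()
			}
		}(i)
	}
	wg.Wait() // all writers are done, so reading m afterwards is safe
	return m
}

func main() {
	fmt.Println(sumWithMutex(11))
}
```

Each key ends up with 0+1+...+10 = 55. If reads greatly outnumber writes, sync.RWMutex is a possible alternative to sync.Mutex.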

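However, a lock usually costs performance, and when high-performance concurrent map access is required, a mutex may not be a good fit. In that case we can use sync.Map, the concurrent hash map provided by Go.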

The concurrent hash map sync.Map

Let's rewrite the code above with sync.Map:

package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	var m sync.Map
	for i := 0; i <= 10; i++ {
		go func() {
			for j := 0; j <= 10; j++ {
				// Add j so the result matches the mutex version (m[i] += j).
				if val, ok := m.Load(i); ok {
					m.Store(i, val.(int)+j)
				} else {
					m.Store(i, j)
				}
			}
		}()
	}
	time.Sleep(1 * time.Second)
	m.Range(func(key, value interface{}) bool {
		fmt.Println(key, value)
		return true
	})
}

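sync.Map is simple to use: Store writes a value and Load reads one.

Load returns two values. The first is the value stored for the key; its type is any (interface{}), so a type assertion is needed before use. The second is a bool that reports whether the key was present.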
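
One caveat about the example above: a Load followed by a Store is two separate operations, and it is only safe there because each goroutine works on its own key. If several goroutines updated the same key, increments could be lost. The following sketch shows one possible way to increment a shared counter atomically using LoadOrStore and CompareAndSwap (the latter is available on sync.Map since Go 1.20). The helpers increment and countConcurrently and the key "hits" are hypothetical names for this illustration.

```go
package main

import (
	"fmt"
	"sync"
)

// increment atomically adds 1 to the int counter stored under key.
// It is a hypothetical helper, not a method of sync.Map.
func increment(m *sync.Map, key any) {
	for {
		// The first caller initializes the counter to 1.
		old, loaded := m.LoadOrStore(key, 1)
		if !loaded {
			return
		}
		// Replace old with old+1 only if nobody changed it in between;
		// otherwise loop and try again with the fresh value.
		if m.CompareAndSwap(key, old, old.(int)+1) {
			return
		}
	}
}

// countConcurrently increments one shared key from several goroutines
// and returns the final counter value.
func countConcurrently(goroutines, perGoroutine int) int {
	var m sync.Map
	var wg sync.WaitGroup
	for g := 0; g < goroutines; g++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for n := 0; n < perGoroutine; n++ {
				increment(&m, "hits")
			}
		}()
	}
	wg.Wait()
	v, ok := m.Load("hits")
	if !ok {
		return 0
	}
	return v.(int) // type assertion: Load returns any
}

func main() {
	fmt.Println(countConcurrently(8, 1000))
}
```

With 8 goroutines doing 1000 increments each, the final value is 8000. For a plain counter, an atomic integer or a mutex is usually simpler; the sketch is only meant to show how the sync.Map methods compose.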

The definition of sync.Map

Next, let's look at the definition of sync.Map:

// sync/map.go
type Map struct {
	mu Mutex

	// read contains the portion of the map's contents that are safe for
	// concurrent access (with or without mu held).
	//
	// The read field itself is always safe to load, but must only be stored with
	// mu held.
	//
	// Entries stored in read may be updated concurrently without mu, but updating
	// a previously-expunged entry requires that the entry be copied to the dirty
	// map and unexpunged with mu held.
	read atomic.Pointer[readOnly]

	// dirty contains the portion of the map's contents that require mu to be
	// held. To ensure that the dirty map can be promoted to the read map quickly,
	// it also includes all of the non-expunged entries in the read map.
	//
	// Expunged entries are not stored in the dirty map. An expunged entry in the
	// clean map must be unexpunged and added to the dirty map before a new value
	// can be stored to it.
	//
	// If the dirty map is nil, the next write to the map will initialize it by
	// making a shallow copy of the clean map, omitting stale entries.
	dirty map[any]*entry

	// misses counts the number of loads since the read map was last updated that
	// needed to lock mu to determine whether the key was present.
	//
	// Once enough misses have occurred to cover the cost of copying the dirty
	// map, the dirty map will be promoted to the read map (in the unamended
	// state) and the next store to the map will make a new dirty copy.
	misses int
}

// readOnly is an immutable struct stored atomically in the Map.read field.
type readOnly struct {
	m       map[any]*entry
	amended bool // true if the dirty map contains some key not in m.
}

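The fields of sync.Map have the following meanings:

mu: a mutex that guards dirty and misses; it must also be held when a new value is stored into the read field.
read: an atomically loaded pointer to a readOnly struct. Its map m can be read without holding mu, and amended is true when dirty contains keys that are not in m.
dirty: an ordinary map that may only be accessed with mu held. When it is non-nil, it contains every non-expunged entry of read.m plus any newly added keys.
misses: the number of loads, since read was last updated, that had to lock mu to determine whether the key was present.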

How Load is implemented

Reads from sync.Map are implemented by the Load method:

// sync/map.go
func (m *Map) Load(key any) (value any, ok bool) {
	read := m.loadReadOnly()
	e, ok := read.m[key]
	if !ok && read.amended {
		m.mu.Lock()
		// Avoid reporting a spurious miss if m.dirty got promoted while we were
		// blocked on m.mu. (If further loads of the same key will not miss, it's
		// not worth copying the dirty map for this key.)
		read = m.loadReadOnly()
		e, ok = read.m[key]
		if !ok && read.amended {
			e, ok = m.dirty[key]
			// Regardless of whether the entry was present, record a miss: this key
			// will take the slow path until the dirty map is promoted to the read
			// map.
			m.missLocked()
		}
		m.mu.Unlock()
	}
	if !ok {
		return nil, false
	}
	return e.load()
}

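As the code shows, Load first looks up the key in read.m. If the key is not found and read.amended is true, it goes on to look in the dirty map. Before doing so it must acquire the lock, and once the lock is held it looks up read.m a second time.

Why look it up again? Between the first lookup and the moment the lock is acquired, another goroutine may have operated on the sync.Map and replaced read (for example, by promoting dirty).

The missLocked method increments the misses counter. Once misses reaches the number of entries in dirty, it promotes dirty to become the new read.m (by swapping a pointer, not by copying entries) and resets dirty to nil: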

func (m *Map) missLocked() {
	m.misses++
	if m.misses < len(m.dirty) {
		return
	}
	m.read.Store(&readOnly{m: m.dirty})
	m.dirty = nil
	m.misses = 0
}

How Store is implemented

Writes to sync.Map are implemented by the Store method:

// sync/map.go
// Store sets the value for a key.
func (m *Map) Store(key, value any) {
	_, _ = m.Swap(key, value)
}

// Swap swaps the value for a key and returns the previous value if any.
// The loaded result reports whether the key was present.
func (m *Map) Swap(key, value any) (previous any, loaded bool) {
	read := m.loadReadOnly()
	if e, ok := read.m[key]; ok {
		if v, ok := e.trySwap(&value); ok {
			if v == nil {
				return nil, false
			}
			return *v, true
		}
	}

	m.mu.Lock()
	read = m.loadReadOnly()
	if e, ok := read.m[key]; ok {
		if e.unexpungeLocked() {
			// The entry was previously expunged, which implies that there is a
			// non-nil dirty map and this entry is not in it.
			m.dirty[key] = e
		}
		if v := e.swapLocked(&value); v != nil {
			loaded = true
			previous = *v
		}
	} else if e, ok := m.dirty[key]; ok {
		if v := e.swapLocked(&value); v != nil {
			loaded = true
			previous = *v
		}
	} else {
		if !read.amended {
			// We're adding the first new key to the dirty map.
			// Make sure it is allocated and mark the read-only map as incomplete.
			m.dirtyLocked()
			m.read.Store(&readOnly{m: read.m, amended: true})
		}
		m.dirty[key] = newEntry(value)
	}
	m.mu.Unlock()
	return previous, loaded
}

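As the code shows, the first step is to look up the key in read.m. If the entry exists, Store tries to update it in place without taking the lock and returns as soon as that succeeds; trySwap is built on a compare-and-swap (CAS) loop, a form of optimistic locking. If this fast path fails, the lock must be acquired before dirty is touched, and read.m is checked once more. If the key is found there, the entry is updated; because read.m and dirty share the same entry pointers, the new value is visible through both, and an entry that had been expunged is first added back to dirty.

Why check read.m again? For the same reason as before: between the first lookup and the moment the lock is acquired, another goroutine may have operated on the sync.Map and replaced read.m.

Finally, when the key exists in neither read.m nor dirty, it has to be inserted into dirty as a new entry. If this is the first new key since the last promotion (read.amended is false), dirtyLocked first allocates dirty and copies the non-expunged entries of read.m into it, and read is marked as amended.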
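
Seen from the caller's side, the return values of Swap reflect the branches described above: loaded is false when the key is newly inserted and true when an existing entry is replaced. A minimal sketch follows; the helper swapTwice and the key "k" are hypothetical names for this illustration, and Swap is available on sync.Map since Go 1.20.

```go
package main

import (
	"fmt"
	"sync"
)

// swapTwice is a hypothetical helper that swaps the same key twice on a
// fresh sync.Map and returns what each call reported.
func swapTwice() (first any, firstLoaded bool, second any, secondLoaded bool) {
	var m sync.Map
	// The key does not exist yet, so a new entry is inserted.
	first, firstLoaded = m.Swap("k", 1)
	// The key exists now, so the previous value is returned.
	second, secondLoaded = m.Swap("k", 2)
	return
}

func main() {
	first, firstLoaded, second, secondLoaded := swapTwice()
	fmt.Println(first, firstLoaded)   // <nil> false
	fmt.Println(second, secondLoaded) // 1 true
}
```

Store is the same operation with both return values discarded.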

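Summary

In read-heavy, write-light scenarios, sync.Map handles concurrent operations efficiently: most reads need no lock at all, and writes are unlikely to conflict with one another.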