Hash Tables and Go's Map Implementation

TrumanWong
12/10/2021

Go's map type is a hash table, one of the most heavily used data structures in the language. A hash table distributes key/value pairs across a set of buckets; given a key, a hash function computes where its pair is stored.

Hash Collisions and How They Are Resolved

A hash collision means several elements may land in the same bucket. Collisions cannot be avoided outright; they must be resolved, and there are two main strategies for doing so: chaining and open addressing.

Chaining

As the figure below shows, chaining links the elements that fall into the same bucket together in a linked list; it is the simplest and most common strategy. As a bucket accumulates elements, new ones can keep being linked on, and no memory needs to be reserved for them in advance. The downsides are that the extra link pointers enlarge the table as a whole, and that the list nodes are not contiguous in memory, so the CPU cache cannot be used effectively. A minimal sketch follows the figure.

[Figure: chaining, where entries that collide in a bucket are linked in a list]
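
To make the idea concrete, here is a minimal self-contained chaining sketch in Go. It is illustrative only and bears no relation to Go's real map internals; chainEntry, chainTable, and the FNV-1a stand-in hashString are all invented for this example.

// A toy separate-chaining hash table (illustrative only).
type chainEntry struct {
	key   string
	value int
	next  *chainEntry // collisions are linked off the same bucket
}

type chainTable struct {
	buckets []*chainEntry // must be created with at least one bucket
}

func (t *chainTable) put(key string, value int) {
	i := hashString(key) % uint32(len(t.buckets))
	for e := t.buckets[i]; e != nil; e = e.next {
		if e.key == key { // key already present: update in place
			e.value = value
			return
		}
	}
	// prepend a new entry; no pre-allocation is required
	t.buckets[i] = &chainEntry{key: key, value: value, next: t.buckets[i]}
}

// hashString is FNV-1a, a simple stand-in hash for the sketch.
func hashString(s string) uint32 {
	h := uint32(2166136261)
	for i := 0; i < len(s); i++ {
		h ^= uint32(s[i])
		h *= 16777619
	}
	return h
}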

Open Addressing

The other main collision-resolution strategy is open addressing, shown below, where every element is stored directly in the bucket array itself. To insert a new entry, slots are examined according to some probe sequence until an unused slot is found. A lookup scans the buckets in the same order until it either finds the target record or reaches an unused slot. A sketch follows the figure.

[Figure: open addressing, where all entries live in the bucket array and are probed in sequence]
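
A comparable open-addressing sketch using linear probing might look like this. Again it is purely illustrative; probeEntry and probeTable are invented, and it reuses hashString from the previous sketch.

// A toy open-addressing table with linear probing (illustrative only).
type probeEntry struct {
	used  bool
	key   string
	value int
}

type probeTable struct {
	slots []probeEntry // all entries live directly in one array
}

func (t *probeTable) put(key string, value int) bool {
	n := uint32(len(t.slots))
	i := hashString(key) % n
	for j := uint32(0); j < n; j++ { // probe at most n slots
		s := &t.slots[(i+j)%n]
		if !s.used || s.key == key { // free slot, or existing key
			s.used, s.key, s.value = true, key, value
			return true
		}
	}
	return false // table is full; a real table would grow instead
}

func (t *probeTable) get(key string) (int, bool) {
	n := uint32(len(t.slots))
	i := hashString(key) % n
	for j := uint32(0); j < n; j++ {
		s := &t.slots[(i+j)%n]
		if !s.used {
			return 0, false // unused slot: the key cannot be present
		}
		if s.key == key {
			return s.value, true
		}
	}
	return 0, false
}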

The Hash Table's Underlying Structure

The underlying implementation of Go's map looks like this:

// runtime/map.go
// A header for a Go map.
type hmap struct {
	// Note: the format of the hmap is also encoded in cmd/compile/internal/reflectdata/reflect.go.
	// Make sure this stays in sync with the compiler's definition.
	count     int // # live cells == size of map.  Must be first (used by len() builtin)
	flags     uint8
	B         uint8  // log_2 of # of buckets (can hold up to loadFactor * 2^B items)
	noverflow uint16 // approximate number of overflow buckets; see incrnoverflow for details
	hash0     uint32 // hash seed

	buckets    unsafe.Pointer // array of 2^B Buckets. may be nil if count==0.
	oldbuckets unsafe.Pointer // previous bucket array of half the size, non-nil only when growing
	nevacuate  uintptr        // progress counter for evacuation (buckets less than this have been evacuated)

	extra *mapextra // optional fields
}

// mapextra holds fields that are not present on all maps.
type mapextra struct {
	// If both key and elem do not contain pointers and are inline, then we mark bucket
	// type as containing no pointers. This avoids scanning such maps.
	// However, bmap.overflow is a pointer. In order to keep overflow buckets
	// alive, we store pointers to all overflow buckets in hmap.extra.overflow and hmap.extra.oldoverflow.
	// overflow and oldoverflow are only used if key and elem do not contain pointers.
	// overflow contains overflow buckets for hmap.buckets.
	// oldoverflow contains overflow buckets for hmap.oldbuckets.
	// The indirection allows to store a pointer to the slice in hiter.
	overflow    *[]*bmap
	oldoverflow *[]*bmap

	// nextOverflow holds a pointer to a free overflow bucket.
	nextOverflow *bmap
}

The bmap struct, which represents a bucket, lists only its first field in the runtime source: a fixed-length array of 8 entries. This array stores, in slot order, the top 8 bits of the hash of each key in the bucket.

// A bucket for a Go map.
type bmap struct {
	// tophash generally contains the top byte of the hash value
	// for each key in this bucket. If tophash[0] < minTopHash,
	// tophash[0] is a bucket evacuation state instead.
	tophash [bucketCnt]uint8
	// Followed by bucketCnt keys and then bucketCnt elems.
	// NOTE: packing all the keys together and then all the elems together makes the
	// code a bit more complicated than alternating key/elem/key/elem/... but it allows
	// us to eliminate padding which would be needed for, e.g., map[int64]int8.
	// Followed by an overflow pointer.
}

At run time, a bmap actually holds more than just the tophash field. Because a hash table may store key/value pairs of arbitrary types, and Go (at the time of writing) had no generics, the memory the pairs occupy can only be deduced at compile time. The remaining fields of bmap are accessed at run time by computing memory addresses, so they do not appear in the struct definition; we can nonetheless reconstruct the layout from the compile-time function cmd/compile/internal/reflectdata/reflect.MapBucketType:

// MapBucketType makes the map bucket type given the type of the map.
func MapBucketType(t *types.Type) *types.Type {
	if t.MapType().Bucket != nil {
		return t.MapType().Bucket
	}

	keytype := t.Key()
	elemtype := t.Elem()
	types.CalcSize(keytype)
	types.CalcSize(elemtype)
	if keytype.Width > MAXKEYSIZE {
		keytype = types.NewPtr(keytype)
	}
	if elemtype.Width > MAXELEMSIZE {
		elemtype = types.NewPtr(elemtype)
	}

	field := make([]*types.Field, 0, 5)

	// The first field is: uint8 topbits[BUCKETSIZE].
	arr := types.NewArray(types.Types[types.TUINT8], BUCKETSIZE)
	field = append(field, makefield("topbits", arr))

	arr = types.NewArray(keytype, BUCKETSIZE)
	arr.SetNoalg(true)
	keys := makefield("keys", arr)
	field = append(field, keys)

	arr = types.NewArray(elemtype, BUCKETSIZE)
	arr.SetNoalg(true)
	elems := makefield("elems", arr)
	field = append(field, elems)

	// If keys and elems have no pointers, the map implementation
	// can keep a list of overflow pointers on the side so that
	// buckets can be marked as having no pointers.
	// Arrange for the bucket to have no pointers by changing
	// the type of the overflow field to uintptr in this case.
	// See comment on hmap.overflow in runtime/map.go.
	otyp := types.Types[types.TUNSAFEPTR]
	if !elemtype.HasPointers() && !keytype.HasPointers() {
		otyp = types.Types[types.TUINTPTR]
	}
	overflow := makefield("overflow", otyp)
	field = append(field, overflow)

	// link up fields
	bucket := types.NewStruct(types.NoPkg, field[:])
	bucket.SetNoalg(true)
	types.CalcSize(bucket)

	// Check invariants that map code depends on.
	if !types.IsComparable(t.Key()) {
		base.Fatalf("unsupported map key type for %v", t)
	}
	if BUCKETSIZE < 8 {
		base.Fatalf("bucket size too small for proper alignment")
	}
	if keytype.Align > BUCKETSIZE {
		base.Fatalf("key align too big for %v", t)
	}
	if elemtype.Align > BUCKETSIZE {
		base.Fatalf("elem align too big for %v", t)
	}
	if keytype.Width > MAXKEYSIZE {
		base.Fatalf("key size to large for %v", t)
	}
	if elemtype.Width > MAXELEMSIZE {
		base.Fatalf("elem size to large for %v", t)
	}
	if t.Key().Width > MAXKEYSIZE && !keytype.IsPtr() {
		base.Fatalf("key indirect incorrect for %v", t)
	}
	if t.Elem().Width > MAXELEMSIZE && !elemtype.IsPtr() {
		base.Fatalf("elem indirect incorrect for %v", t)
	}
	if keytype.Width%int64(keytype.Align) != 0 {
		base.Fatalf("key size not a multiple of key align for %v", t)
	}
	if elemtype.Width%int64(elemtype.Align) != 0 {
		base.Fatalf("elem size not a multiple of elem align for %v", t)
	}
	if bucket.Align%keytype.Align != 0 {
		base.Fatalf("bucket align not multiple of key align %v", t)
	}
	if bucket.Align%elemtype.Align != 0 {
		base.Fatalf("bucket align not multiple of elem align %v", t)
	}
	if keys.Offset%int64(keytype.Align) != 0 {
		base.Fatalf("bad alignment of keys in bmap for %v", t)
	}
	if elems.Offset%int64(elemtype.Align) != 0 {
		base.Fatalf("bad alignment of elems in bmap for %v", t)
	}

	// Double-check that overflow field is final memory in struct,
	// with no padding at end.
	if overflow.Offset != bucket.Width-int64(types.PtrSize) {
		base.Fatalf("bad offset of overflow in bmap for %v", t)
	}

	t.MapType().Bucket = bucket

	bucket.StructType().Map = t
	return bucket
}

From the function above we can see that the reconstructed structure is:

type bmap struct {
    topbits  [8]uint8
    keys     [8]keytype
    elems    [8]elemtype
    overflow uintptr
}

Go stores all keys together and all values together, rather than interleaving key/value pairs, to eliminate the padding that alignment would otherwise require, as the sketch below shows.
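
The saving is easy to verify with unsafe.Sizeof. In the hypothetical layouts below (pair and grouped are made-up types), interleaving an int64 key with an int8 value pads each pair to 16 bytes, while grouping 8 keys before 8 values, as bmap does, wastes nothing:

package main

import (
	"fmt"
	"unsafe"
)

// Interleaved layout: each int64/int8 pair is padded out to 16 bytes.
type pair struct {
	k int64
	v int8
}

// Grouped layout, as in bmap: 8 keys then 8 values, no padding needed.
type grouped struct {
	keys  [8]int64
	elems [8]int8
}

func main() {
	fmt.Println(unsafe.Sizeof([8]pair{})) // 128 bytes (8 * 16)
	fmt.Println(unsafe.Sizeof(grouped{})) // 72 bytes (64 + 8)
}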

When accessing a map with hash[key], the first step is to locate the bucket. In pseudocode:

hash = hashfunc(key)
index = hash % array_size
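
Since Go always keeps the bucket count at a power of two (2^B), the runtime replaces the modulo above with a cheaper bit mask. A sketch of the equivalence (bucketIndex is a made-up helper; runtime.bucketMask computes the same mask):

// With a power-of-two table size, masking equals modulo:
//   hash % (1 << B) == hash & ((1 << B) - 1)
func bucketIndex(hash uintptr, B uint8) uintptr {
	return hash & (uintptr(1)<<B - 1) // same mask as runtime.bucketMask(B)
}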

After locating the bucket, the tophash array is scanned, as shown in the figure below. If a matching hash byte is found in the array, the corresponding key and value can then be reached through pointer arithmetic.

[Figure: bucket lookup, matching the tophash byte and then locating the key and value by offset]

Go also has the notion of an overflow bucket. During an assignment hash[key] = value, when the target bucket already holds 8 entries, the runtime does not immediately open a new bucket in the main array; it places the data in an overflow bucket instead. Every bucket stores an overflow pointer at its end, which points to its overflow bucket. Under normal circumstances, data rarely ends up in overflow buckets.

Likewise, during a lookup, if the key's hash is absent from the target bucket's tophash array, the overflow buckets must be traversed as well. As we will see later, if a map is initialized with a large size hint, it creates some overflow buckets up front and stores them in the extra *mapextra field.

That way, when a bucket overflows, a preallocated bucket can be used instead of requesting more memory; brand-new overflow buckets are allocated only after the preallocated ones are exhausted. A map is rebuilt when either of two conditions is met.

The classic notion of a load factor applies here:

load factor = number of elements in the table / number of buckets

As the load factor rises, more elements pile into the same buckets and operations slow down. In the extreme case of a single bucket, the load factor is at its maximum and a lookup degenerates into scanning an array. Go uses a load factor threshold of 6.5: once it is exceeded, the map grows to twice the size of the old table, as illustrated below.

[Figure: doubling growth, where the new bucket array is twice the size of the old one]
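
As a worked check of the threshold: with B = 4 there are 16 buckets, so growth triggers once the count exceeds 6.5 × 16 = 104. The runtime keeps the check in integer arithmetic by writing 6.5 as 13/2; the sketch below mirrors runtime.overLoadFactor under that assumption (the constants 8, 13, and 2 are taken from runtime/map.go):

// overLoadFactor reports whether count elements in 2^B buckets exceed
// the 6.5 load factor, using 13/2 to stay in integer math.
func overLoadFactor(count int, B uint8) bool {
	return count > 8 && uintptr(count) > 13*((uintptr(1)<<B)/2)
}

// overLoadFactor(104, 4) == false; overLoadFactor(105, 4) == true.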

The old buckets' data is kept in the oldbuckets field and gradually migrated to the new buckets. Once all of an old bucket's data has been moved, the old bucket is cleared. There is also a second kind of rebuild: when there are too many overflow buckets, the map is rebuilt with the same number of buckets as before, to stop slow growth in overflow buckets from turning into a memory leak.

As the figure below shows, a delete on a map proceeds much like an assignment: it locates the target bucket by key, and if the key is present, releases the memory referenced by the key and value. The corresponding tophash slot is set to emptyOne, marking the position as empty.

[Figure: deletion, where the deleted slot's tophash is set to emptyOne]

The delete operation then probes whether all slots after the deleted element are empty. If so, the tophash slot is set to emptyRest instead. The benefit shows up in lookups: on encountering emptyRest the scan can stop immediately, because everything after it is guaranteed to be empty.

Hash Table Internals in Depth

Initialization with make

When a map is initialized with the make keyword, the typecheck1 type-checking phase rewrites the node's OMAKE operation into OMAKEMAP. If make specifies a size for the table, the size constant is converted to type TINT; if no size is given, it defaults to 0 (ir.NewInt(0)).

// cmd/compile/internal/typecheck/typecheck.go
// typecheck1 should ONLY be called from typecheck.
func typecheck1(n ir.Node, top int) ir.Node {
	if n, ok := n.(*ir.Name); ok {
		typecheckdef(n)
	}
    ...
    case ir.OMAKE:
		n := n.(*ir.CallExpr)
		return tcMake(n)
    ...
}

// cmd/compile/internal/typecheck/func.go
func tcMake(n *ir.CallExpr) ir.Node {
	args := n.Args
	if len(args) == 0 {
		base.Errorf("missing argument to make")
		n.SetType(nil)
		return n
	}

	n.Args = nil
	l := args[0]
	l = typecheck(l, ctxType)
	t := l.Type()
	if t == nil {
		n.SetType(nil)
		return n
	}
    
	i := 1
	var nn ir.Node
	switch t.Kind() {
    ...
    case types.TMAP:
		if i < len(args) {
			l = args[i]
			i++
			l = Expr(l)
			l = DefaultLit(l, types.Types[types.TINT])
			if l.Type() == nil {
				n.SetType(nil)
				return n
			}
			if !checkmake(t, "size", &l) {
				n.SetType(nil)
				return n
			}
		} else {
			l = ir.NewInt(0)
		}
		nn = ir.NewMakeExpr(n.Pos(), ir.OMAKEMAP, l, nil)
		nn.SetEsc(n.Esc())
     ...
    }
}

If the second argument to make is not an integer, type checking reports an error.

// cmd/compile/internal/typecheck/typecheck.go
func checkmake(t *types.Type, arg string, np *ir.Node) bool {
	n := *np
	if !n.Type().IsInteger() && n.Type().Kind() != types.TIDEAL {
		base.Errorf("non-integer %s argument in make(%v) - %v", arg, t, n.Type())
		return false
	}
	...
}
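
For example, the following hedged snippet fails to type-check; the exact diagnostic wording varies by Go version:

m1 := make(map[string]int, 10)   // OK: the size hint is an integer
_ = m1
m2 := make(map[string]int, "10") // error: non-integer size argument in make(map[string]int)
_ = m2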

During the compiler's walk phase, the walkExpr function decides whether the runtime call should be runtime.makemap or runtime.makemap64.

// cmd/compile/internal/walk/expr.go
// The result of walkExpr MUST be assigned back to n, e.g.
// 	n.Left = walkExpr(n.Left, init)
func walkExpr(n ir.Node, init *ir.Nodes) ir.Node {
    ...
    n = walkExpr1(n, init)
    ...
}

// cmd/compile/internal/walk/expr.go
func walkExpr1(n ir.Node, init *ir.Nodes) ir.Node {
	switch n.Op() {
        ...
        case ir.OMAKEMAP:
		n := n.(*ir.MakeExpr)
		return walkMakeMap(n, init)
        ...
    }
}

// cmd/compile/internal/walk/builtin.go
// walkMakeMap walks an OMAKEMAP node.
func walkMakeMap(n *ir.MakeExpr, init *ir.Nodes) ir.Node {
	...
	// When hint fits into int, use makemap instead of
	// makemap64, which is faster and shorter on 32 bit platforms.
	fnname := "makemap64"
	argtype := types.Types[types.TINT64]

	// Type checking guarantees that TIDEAL hint is positive and fits in an int.
	// See checkmake call in TMAP case of OMAKE case in OpSwitch in typecheck1 function.
	// The case of hint overflow when converting TUINT or TUINTPTR to TINT
	// will be handled by the negative range checks in makemap during runtime.
	if hint.Type().IsKind(types.TIDEAL) || hint.Type().Size() <= types.Types[types.TUINT].Size() {
		fnname = "makemap"
		argtype = types.Types[types.TINT]
	}

	fn := typecheck.LookupRuntime(fnname)
	fn = typecheck.SubstArgTypes(fn, hmapType, t.Key(), t.Elem())
	return mkcall1(fn, n.Type(), init, reflectdata.TypePtr(n.Type()), typecheck.Conv(hint, argtype), h)
}

makemap64 simply delegates to makemap, after ensuring the requested map length fits in an int.

// runtime/map.go
func makemap64(t *maptype, hint int64, h *hmap) *hmap {
	if int64(int(hint)) != hint {
		hint = 0
	}
	return makemap(t, int(hint), h)
}

makemap computes the number of buckets required, finding the exponent B such that 2^B buckets suffice, and calls makeBucketArray to allocate the buckets and overflow buckets. If overflow buckets are created at initialization, they are recorded in the mapextra field.

// runtime/map.go
// makemap implements Go map creation for make(map[k]v, hint).
// If the compiler has determined that the map or the first bucket
// can be created on the stack, h and/or bucket may be non-nil.
// If h != nil, the map can be created directly in h.
// If h.buckets != nil, bucket pointed to can be used as the first bucket.
func makemap(t *maptype, hint int, h *hmap) *hmap {
	mem, overflow := math.MulUintptr(uintptr(hint), t.bucket.size)
	if overflow || mem > maxAlloc {
		hint = 0
	}

	// initialize Hmap
	if h == nil {
		h = new(hmap)
	}
	h.hash0 = fastrand()

	// Find the size parameter B which will hold the requested # of elements.
	// For hint < 0 overLoadFactor returns false since hint < bucketCnt.
	B := uint8(0)
	for overLoadFactor(hint, B) {
		B++
	}
	h.B = B

	// allocate initial hash table
	// if B == 0, the buckets field is allocated lazily later (in mapassign)
	// If hint is large zeroing this memory could take a while.
	if h.B != 0 {
		var nextOverflow *bmap
		h.buckets, nextOverflow = makeBucketArray(t, h.B, nil)
		if nextOverflow != nil {
			h.extra = new(mapextra)
			h.extra.nextOverflow = nextOverflow
		}
	}

	return h
}
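
As a worked example of the B loop above, make(map[int]int, 100) ends with B = 4, because 100 exceeds 6.5 × 2^3 = 52 but not 6.5 × 2^4 = 104. Using the overLoadFactor sketch from earlier:

B := uint8(0)
for overLoadFactor(100, B) { // holds for B = 0..3, fails at B = 4
	B++
}
// B == 4, so makemap allocates 1<<4 = 16 buckets for a hint of 100.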

runtime.makeBucketArray allocates the memory for the map:

// runtime/map.go
func makeBucketArray(t *maptype, b uint8, dirtyalloc unsafe.Pointer) (buckets unsafe.Pointer, nextOverflow *bmap) {
	base := bucketShift(b)
	nbuckets := base
	// For small b, overflow buckets are unlikely.
	// Avoid the overhead of the calculation.
	if b >= 4 {
		// Add on the estimated number of overflow buckets
		// required to insert the median number of elements
		// used with this value of b.
		nbuckets += bucketShift(b - 4)
		sz := t.bucket.size * nbuckets
		up := roundupsize(sz)
		if up != sz {
			nbuckets = up / t.bucket.size
		}
	}

	if dirtyalloc == nil {
		buckets = newarray(t.bucket, int(nbuckets))
	} else {
		// dirtyalloc was previously generated by
		// the above newarray(t.bucket, int(nbuckets))
		// but may not be empty.
		buckets = dirtyalloc
		size := t.bucket.size * nbuckets
		if t.bucket.ptrdata != 0 {
			memclrHasPointers(buckets, size)
		} else {
			memclrNoHeapPointers(buckets, size)
		}
	}

	if base != nbuckets {
		// We preallocated some overflow buckets.
		// To keep the overhead of tracking these overflow buckets to a minimum,
		// we use the convention that if a preallocated overflow bucket's overflow
		// pointer is nil, then there are more available by bumping the pointer.
		// We need a safe non-nil pointer for the last overflow bucket; just use buckets.
		nextOverflow = (*bmap)(add(buckets, base*uintptr(t.bucketsize)))
		last := (*bmap)(add(buckets, (nbuckets-1)*uintptr(t.bucketsize)))
		last.setoverflow(t, (*bmap)(buckets))
	}
	return buckets, nextOverflow
}

The code above shows that, in the common case, the normal buckets and the overflow buckets occupy one contiguous chunk of memory and are merely referenced from different fields of runtime.hmap; only when more overflow buckets are needed does the runtime create new ones with runtime.newobject.

Initialization from a Literal

If a map is initialized from a literal, the literal is still ultimately lowered to a make operation, with the map's size hint inferred automatically from the number of entries. The core logic lives in the anylit function in cmd/compile/internal/walk/complit.go, which handles literals of every kind.

// cmd/compile/internal/walk/complit.go
func anylit(n ir.Node, var_ ir.Node, init *ir.Nodes) {
	t := n.Type()
	switch n.Op() {
        ...
        case ir.OMAPLIT:
		n := n.(*ir.CompLitExpr)
		if !t.IsMap() {
			base.Fatalf("anylit: not map")
		}
		maplit(n, var_, init)
	}
}

// cmd/compile/internal/walk/complit.go
func maplit(n *ir.CompLitExpr, m ir.Node, init *ir.Nodes) {
	// make the map var
	a := ir.NewCallExpr(base.Pos, ir.OMAKE, nil, nil)
	a.SetEsc(n.Esc())
	a.Args = []ir.Node{ir.TypeNode(n.Type()), ir.NewInt(int64(len(n.List)))}
	litas(m, a, init)

	entries := n.List

	// The order pass already removed any dynamic (runtime-computed) entries.
	// All remaining entries are static. Double-check that.
	for _, r := range entries {
		r := r.(*ir.KeyExpr)
		if !isStaticCompositeLiteral(r.Key) || !isStaticCompositeLiteral(r.Value) {
			base.Fatalf("maplit: entry is not a literal: %v", r)
		}
	}

	if len(entries) > 25 {
		// For a large number of entries, put them in an array and loop.

		// build types [count]Tindex and [count]Tvalue
		tk := types.NewArray(n.Type().Key(), int64(len(entries)))
		te := types.NewArray(n.Type().Elem(), int64(len(entries)))

		tk.SetNoalg(true)
		te.SetNoalg(true)

		types.CalcSize(tk)
		types.CalcSize(te)

		// make and initialize static arrays
		vstatk := readonlystaticname(tk)
		vstate := readonlystaticname(te)

		datak := ir.NewCompLitExpr(base.Pos, ir.OARRAYLIT, nil, nil)
		datae := ir.NewCompLitExpr(base.Pos, ir.OARRAYLIT, nil, nil)
		for _, r := range entries {
			r := r.(*ir.KeyExpr)
			datak.List.Append(r.Key)
			datae.List.Append(r.Value)
		}
		fixedlit(inInitFunction, initKindStatic, datak, vstatk, init)
		fixedlit(inInitFunction, initKindStatic, datae, vstate, init)

		// loop adding structure elements to map
		// for i = 0; i < len(vstatk); i++ {
		//	map[vstatk[i]] = vstate[i]
		// }
		i := typecheck.Temp(types.Types[types.TINT])
		rhs := ir.NewIndexExpr(base.Pos, vstate, i)
		rhs.SetBounded(true)

		kidx := ir.NewIndexExpr(base.Pos, vstatk, i)
		kidx.SetBounded(true)
		lhs := ir.NewIndexExpr(base.Pos, m, kidx)

		zero := ir.NewAssignStmt(base.Pos, i, ir.NewInt(0))
		cond := ir.NewBinaryExpr(base.Pos, ir.OLT, i, ir.NewInt(tk.NumElem()))
		incr := ir.NewAssignStmt(base.Pos, i, ir.NewBinaryExpr(base.Pos, ir.OADD, i, ir.NewInt(1)))

		var body ir.Node = ir.NewAssignStmt(base.Pos, lhs, rhs)
		body = typecheck.Stmt(body) // typechecker rewrites OINDEX to OINDEXMAP
		body = orderStmtInPlace(body, map[string][]*ir.Name{})

		loop := ir.NewForStmt(base.Pos, nil, cond, incr, nil)
		loop.Body = []ir.Node{body}
		*loop.PtrInit() = []ir.Node{zero}

		appendWalkStmt(init, loop)
		return
	}
	// For a small number of entries, just add them directly.

	// Build list of var[c] = expr.
	// Use temporaries so that mapassign1 can have addressable key, elem.
	// TODO(josharian): avoid map key temporaries for mapfast_* assignments with literal keys.
	tmpkey := typecheck.Temp(m.Type().Key())
	tmpelem := typecheck.Temp(m.Type().Elem())

	for _, r := range entries {
		r := r.(*ir.KeyExpr)
		index, elem := r.Key, r.Value

		ir.SetPos(index)
		appendWalkStmt(init, ir.NewAssignStmt(base.Pos, tmpkey, index))

		ir.SetPos(elem)
		appendWalkStmt(init, ir.NewAssignStmt(base.Pos, tmpelem, elem))

		ir.SetPos(tmpelem)
		var a ir.Node = ir.NewAssignStmt(base.Pos, ir.NewIndexExpr(base.Pos, m, tmpkey), tmpelem)
		a = typecheck.Stmt(a) // typechecker rewrites OINDEX to OINDEXMAP
		a = orderStmtInPlace(a, map[string][]*ir.Name{})
		appendWalkStmt(init, a)
	}

	appendWalkStmt(init, ir.NewUnaryExpr(base.Pos, ir.OVARKILL, tmpkey))
	appendWalkStmt(init, ir.NewUnaryExpr(base.Pos, ir.OVARKILL, tmpelem))
}

If the literal has more than 25 entries, the compiler builds two static arrays holding the keys and values and emits a loop that inserts them at run time. With 25 or fewer entries, the compiler instead emits straight-line assignments that add each key/value pair directly.
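
For instance, a hypothetical three-entry literal is lowered roughly as follows; the pseudocode in the comments corresponds to the tmpkey and tmpelem temporaries in maplit above:

m := map[string]int{"a": 1, "b": 2, "c": 3}

// Compiles to approximately:
//   m := make(map[string]int, 3)
//   tmpkey, tmpelem := "a", 1
//   m[tmpkey] = tmpelem
//   ... repeated for "b" and "c"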

Map Access

Map access takes two forms: one returning a single value, v := hash[key], and one returning two. Go has no function overloading, so whether one value or two are returned clearly has to be decided at compile time. During AST construction, v := rating["Go"] is parsed into a node whose left operand has type ONAME, storing the name rating, and whose right operand has type OLITERAL, storing "Go"; the node's Op is OINDEXMAP. Whether hash[key] stands on the left or the right of the assignment determines whether the operation is an access or an assignment.

During type checking, hash[key] and similar operations are turned into OINDEXMAP operations, and in the middle-end the walkExpr function (cmd/compile/internal/walk/expr.go) rewrites those OINDEXMAP operations into code like the following:

v     := hash[key] // => v     := *mapaccess1(maptype, hash, &key)
v, ok := hash[key] // => v, ok := mapaccess2(maptype, hash, &key)

The number of values accepted on the left-hand side of the assignment determines which runtime function is used.

runtime.mapaccess1 first computes the key's hash using the table's hash function and seed, then uses runtime.bucketMask and runtime.add to locate the bucket holding the pair and to extract the top 8 bits of the hash:

// runtime/map.go
// mapaccess1 returns a pointer to h[key].  Never returns nil, instead
// it will return a reference to the zero object for the elem type if
// the key is not in the map.
// NOTE: The returned pointer may keep the whole map live, so don't
// hold onto it for very long.
func mapaccess1(t *maptype, h *hmap, key unsafe.Pointer) unsafe.Pointer {
	if raceenabled && h != nil {
		callerpc := getcallerpc()
		pc := funcPC(mapaccess1)
		racereadpc(unsafe.Pointer(h), callerpc, pc)
		raceReadObjectPC(t.key, key, callerpc, pc)
	}
	if msanenabled && h != nil {
		msanread(key, t.key.size)
	}
	if h == nil || h.count == 0 {
		if t.hashMightPanic() {
			t.hasher(key, 0) // see issue 23734
		}
		return unsafe.Pointer(&zeroVal[0])
	}
	if h.flags&hashWriting != 0 {
		throw("concurrent map read and map write")
	}
	hash := t.hasher(key, uintptr(h.hash0))
	m := bucketMask(h.B)
	b := (*bmap)(add(h.buckets, (hash&m)*uintptr(t.bucketsize)))
	if c := h.oldbuckets; c != nil {
		if !h.sameSizeGrow() {
			// There used to be half as many buckets; mask down one more power of two.
			m >>= 1
		}
		oldb := (*bmap)(add(c, (hash&m)*uintptr(t.bucketsize)))
		if !evacuated(oldb) {
			b = oldb
		}
	}
	top := tophash(hash)
bucketloop:
	for ; b != nil; b = b.overflow(t) {
		for i := uintptr(0); i < bucketCnt; i++ {
			if b.tophash[i] != top {
				if b.tophash[i] == emptyRest {
					break bucketloop
				}
				continue
			}
			k := add(unsafe.Pointer(b), dataOffset+i*uintptr(t.keysize))
			if t.indirectkey() {
				k = *((*unsafe.Pointer)(k))
			}
			if t.key.equal(key, k) {
				e := add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr(t.keysize)+i*uintptr(t.elemsize))
				if t.indirectelem() {
					e = *((*unsafe.Pointer)(e))
				}
				return e
			}
		}
	}
	return unsafe.Pointer(&zeroVal[0])
}

bucketloop 循环中,哈希会依次遍历正常桶和溢出桶中的数据,它会先比较哈希的高 8 位和桶中存储的 tophash,后比较传入的和桶中的值以加速数据的读写。用于选择桶序号的是哈希的最低几位,而用于加速访问的是哈希的高 8 位,这种设计能够减少同一个桶中有大量相等 tophash 的概率影响性能。

runtime.mapaccess2, the other function used to read from the hash table, differs from runtime.mapaccess1 only in that it additionally returns a bool reporting whether the key/value pair exists:

func mapaccess2(t *maptype, h *hmap, key unsafe.Pointer) (unsafe.Pointer, bool) {
	if raceenabled && h != nil {
		callerpc := getcallerpc()
		pc := funcPC(mapaccess2)
		racereadpc(unsafe.Pointer(h), callerpc, pc)
		raceReadObjectPC(t.key, key, callerpc, pc)
	}
	if msanenabled && h != nil {
		msanread(key, t.key.size)
	}
	if h == nil || h.count == 0 {
		if t.hashMightPanic() {
			t.hasher(key, 0) // see issue 23734
		}
		return unsafe.Pointer(&zeroVal[0]), false
	}
	if h.flags&hashWriting != 0 {
		throw("concurrent map read and map write")
	}
	hash := t.hasher(key, uintptr(h.hash0))
	m := bucketMask(h.B)
	b := (*bmap)(add(h.buckets, (hash&m)*uintptr(t.bucketsize)))
	if c := h.oldbuckets; c != nil {
		if !h.sameSizeGrow() {
			// There used to be half as many buckets; mask down one more power of two.
			m >>= 1
		}
		oldb := (*bmap)(add(c, (hash&m)*uintptr(t.bucketsize)))
		if !evacuated(oldb) {
			b = oldb
		}
	}
	top := tophash(hash)
bucketloop:
	for ; b != nil; b = b.overflow(t) {
		for i := uintptr(0); i < bucketCnt; i++ {
			if b.tophash[i] != top {
				if b.tophash[i] == emptyRest {
					break bucketloop
				}
				continue
			}
			k := add(unsafe.Pointer(b), dataOffset+i*uintptr(t.keysize))
			if t.indirectkey() {
				k = *((*unsafe.Pointer)(k))
			}
			if t.key.equal(key, k) {
				e := add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr(t.keysize)+i*uintptr(t.elemsize))
				if t.indirectelem() {
					e = *((*unsafe.Pointer)(e))
				}
				return e, true
			}
		}
	}
	return unsafe.Pointer(&zeroVal[0]), false
}

With the v, ok := hash[k] form, the extra boolean tells us precisely whether, when v is the zero value, v was actually stored in the map or the key is simply absent. This form is therefore the recommended way to check whether an element exists.
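
A short hypothetical usage example of the difference:

counts := map[string]int{"go": 0}

// One-value form: a stored zero and a missing key look identical.
fmt.Println(counts["go"], counts["rust"]) // 0 0

// Two-value form: ok distinguishes the two cases.
if v, ok := counts["rust"]; !ok {
	fmt.Println("absent; zero value returned:", v)
}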

Map Assignment

When an expression of the form hash[k] appears on the left side of an assignment, it is compiled into a call to runtime.mapassign, which closely resembles runtime.mapaccess1. We will examine it in parts. First, the function derives the hash and the bucket from the given key:

// runtime/map.go
// Like mapaccess, but allocates a slot for the key if it is not present in the map.
func mapassign(t *maptype, h *hmap, key unsafe.Pointer) unsafe.Pointer {
	if h == nil {
		panic(plainError("assignment to entry in nil map"))
	}
	if raceenabled {
		callerpc := getcallerpc()
		pc := funcPC(mapassign)
		racewritepc(unsafe.Pointer(h), callerpc, pc)
		raceReadObjectPC(t.key, key, callerpc, pc)
	}
	if msanenabled {
		msanread(key, t.key.size)
	}
	if h.flags&hashWriting != 0 {
		throw("concurrent map writes")
	}
	hash := t.hasher(key, uintptr(h.hash0))

	// Set hashWriting after calling t.hasher, since t.hasher may panic,
	// in which case we have not actually done a write.
	h.flags ^= hashWriting

	if h.buckets == nil {
		h.buckets = newobject(t.bucket) // newarray(t.bucket, 1)
	}

again:
	bucket := hash & bucketMask(h.B)
	if h.growing() {
		growWork(t, h, bucket)
	}
	b := (*bmap)(add(h.buckets, bucket*uintptr(t.bucketsize)))
	top := tophash(hash)

It then iterates over the bucket, comparing the stored tophash bytes against the key's hash; if an existing match is found, the address of the target slot is returned. Here inserti points to the tophash byte of the candidate insertion slot, while insertk and elem hold the addresses where the key and value would be stored; these addresses are obtained via pointer arithmetic:

	var inserti *uint8
	var insertk unsafe.Pointer
	var elem unsafe.Pointer
bucketloop:
	for {
		for i := uintptr(0); i < bucketCnt; i++ {
			if b.tophash[i] != top {
				if isEmpty(b.tophash[i]) && inserti == nil {
					inserti = &b.tophash[i]
					insertk = add(unsafe.Pointer(b), dataOffset+i*uintptr(t.keysize))
					elem = add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr(t.keysize)+i*uintptr(t.elemsize))
				}
				if b.tophash[i] == emptyRest {
					break bucketloop
				}
				continue
			}
			k := add(unsafe.Pointer(b), dataOffset+i*uintptr(t.keysize))
			if t.indirectkey() {
				k = *((*unsafe.Pointer)(k))
			}
			if !t.key.equal(key, k) {
				continue
			}
			// already have a mapping for key. Update it.
			if t.needkeyupdate() {
				typedmemmove(t.key, k, key)
			}
			elem = add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr(t.keysize)+i*uintptr(t.elemsize))
			goto done
		}
		ovf := b.overflow(t)
		if ovf == nil {
			break
		}
		b = ovf
	}

The for loop above walks the normal bucket and its overflow buckets in turn, checking first whether tophash matches and then whether the keys are equal, and breaks out once the traversal is done.

If the current bucket and its overflow chain are full, the runtime calls runtime.hmap.newoverflow to obtain a new bucket, either freshly allocated or taken from the buckets preallocated in hmap.extra.nextOverflow. The new bucket is appended to the end of the existing chain, and the map's noverflow counter is incremented.

	// Did not find mapping for key. Allocate new cell & add entry.

	// If we hit the max load factor or we have too many overflow buckets,
	// and we're not already in the middle of growing, start growing.
	if !h.growing() && (overLoadFactor(h.count+1, h.B) || tooManyOverflowBuckets(h.noverflow, h.B)) {
		hashGrow(t, h)
		goto again // Growing the table invalidates everything, so try again
	}

	if inserti == nil {
		// The current bucket and all the overflow buckets connected to it are full, allocate a new one.
		newb := h.newoverflow(t, b)
		inserti = &newb.tophash[0]
		insertk = add(unsafe.Pointer(newb), dataOffset)
		elem = add(insertk, bucketCnt*uintptr(t.keysize))
	}

	// store new key/elem at insert position
	if t.indirectkey() {
		kmem := newobject(t.key)
		*(*unsafe.Pointer)(insertk) = kmem
		insertk = kmem
	}
	if t.indirectelem() {
		vmem := newobject(t.elem)
		*(*unsafe.Pointer)(elem) = vmem
	}
	typedmemmove(t.key, insertk, key)
	*inserti = top
	h.count++

done:
	if h.flags&hashWriting == 0 {
		throw("concurrent map writes")
	}
	h.flags &^= hashWriting
	if t.indirectelem() {
		elem = *((*unsafe.Pointer)(elem))
	}
	return elem
}

If the key/value pair is not yet in the map, the runtime reserves memory for it, moves the key into place with runtime.typedmemmove, and returns the address of the corresponding value slot, elem. If the pair already exists, the address of its value slot is returned directly. Either way, runtime.mapassign never copies the value into the bucket itself; it only returns the target address, and the actual store is emitted by the compiler:

00018 (+5) CALL runtime.mapassign_fast64(SB)
00020 (5) MOVQ 24(SP), DI               ;; DI = &value
00026 (5) LEAQ go.string."88"(SB), AX   ;; AX = &"88"
00027 (5) MOVQ AX, (DI)                 ;; *DI = AX

runtime.mapassign_fast64runtime.mapassign函数的逻辑差不多,我们需要关注的是后面的三行代码,其中 24(SP) 是该函数返回的值地址,我们通过 LEAQ 指令将字符串的地址存储到寄存器 AX 中,MOVQ 指令将字符串 "88" 存储到了目标地址上完成了这次哈希的写入。

Map Growth

The map is rebuilt when either of the following two conditions holds:

// runtime/map.go
// Like mapaccess, but allocates a slot for the key if it is not present in the map.
func mapassign(t *maptype, h *hmap, key unsafe.Pointer) unsafe.Pointer {
    ...
    if !h.growing() && (overLoadFactor(h.count+1, h.B) || tooManyOverflowBuckets(h.noverflow, h.B)) {
		hashGrow(t, h)
		goto again // Growing the table invalidates everything, so try again
	}
}

Rebuilding goes through the hashGrow function. If the load factor is exceeded, the table doubles in size; if there are too many overflow buckets, a same-size rebuild is performed instead. The new buckets go into the buckets field and the old ones into oldbuckets; the overflow buckets tracked in the mapextra field are promoted the same way.

// runtime/map.go
func hashGrow(t *maptype, h *hmap) {
	// If we've hit the load factor, get bigger.
	// Otherwise, there are too many overflow buckets,
	// so keep the same number of buckets and "grow" laterally.
	bigger := uint8(1)
	if !overLoadFactor(h.count+1, h.B) {
		bigger = 0
		h.flags |= sameSizeGrow
	}
	oldbuckets := h.buckets
	newbuckets, nextOverflow := makeBucketArray(t, h.B+bigger, nil)

	flags := h.flags &^ (iterator | oldIterator)
	if h.flags&iterator != 0 {
		flags |= oldIterator
	}
	// commit the grow (atomic wrt gc)
	h.B += bigger
	h.flags = flags
	h.oldbuckets = oldbuckets
	h.buckets = newbuckets
	h.nevacuate = 0
	h.noverflow = 0

	if h.extra != nil && h.extra.overflow != nil {
		// Promote current overflow buckets to the old generation.
		if h.extra.oldoverflow != nil {
			throw("oldoverflow is not nil")
		}
		h.extra.oldoverflow = h.extra.overflow
		h.extra.overflow = nil
	}
	if nextOverflow != nil {
		if h.extra == nil {
			h.extra = new(mapextra)
		}
		h.extra.nextOverflow = nextOverflow
	}

	// the actual copying of the hash table data is done incrementally
	// by growWork() and evacuate().
}

Note that hashGrow does not actually move any data from the old buckets to the new ones. Evacuation happens incrementally, in a copy-on-write spirit: only when a bucket is actually written to does the runtime decide whether its old data needs to be moved. The core logic lives in the growWork and evacuate functions.

bucket := hash & bucketMask(h.B)
if h.growing() {
	growWork(t, h, bucket)
}

During this incremental copy, not all data is moved at once; only the old bucket needed by the current operation is evacuated. bucket := hash & bucketMask(h.B) gives the position of the new bucket, while the old bucket to evacuate is at bucket & h.oldbucketmask().

In a doubling rebuild, the two new buckets an old bucket's entries can land in are always separated by exactly the old bucket count. In the figure below, for example, the old table has 2 buckets, so the two destination buckets are 2 apart.

[Figure: doubling rebuild, where entries from an old bucket split between two new buckets spaced by the old bucket count]

The figure below shows a same-size rebuild, where entries are simply moved straight across.

[Figure: same-size rebuild, where entries move directly into the corresponding new buckets]

A doubling rebuild must also decide which of the two candidate new buckets each old entry moves to. The governing rule: if hash & bucketMask(h.B) is smaller than the old bucket count (that is, the newly exposed hash bit is 0), the entry must move to the new bucket whose index exactly matches its old bucket, because the entry's bucket number is unchanged; otherwise it moves to that index plus the old bucket count.
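
A concrete example of the rule: suppose the old table has 4 buckets (old B = 2) and doubles to 8. An entry from old bucket 1 lands in new bucket 1 if bit 2 of its hash is 0, or in bucket 1 + 4 = 5 if it is 1. A tiny sketch (newBucket is a made-up helper, not a runtime function):

// newBucket returns where an entry lands after a doubling grow:
// either its old index, or its old index plus the old bucket count,
// depending on the newly exposed hash bit.
func newBucket(hash uintptr, oldB uint8) uintptr {
	return hash & (uintptr(1)<<(oldB+1) - 1)
}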

Map Deletion

The principle behind delete on a map was sketched earlier; its core is the runtime.mapdelete function. Like assignment, deletion computes the top 8 bits of the key's hash and the target bucket, then keeps searching for a matching key, following the overflow-bucket chain to its end if necessary. If the key is found, its data is cleared and the tophash slot is set to emptyOne. If no occupied slots follow it, the slot is set to emptyRest instead, and the runtime loops backwards, checking whether the preceding slots are empty as well:

// runtime/map.go
func mapdelete(t *maptype, h *hmap, key unsafe.Pointer) {
	if raceenabled && h != nil {
		callerpc := getcallerpc()
		pc := funcPC(mapdelete)
		racewritepc(unsafe.Pointer(h), callerpc, pc)
		raceReadObjectPC(t.key, key, callerpc, pc)
	}
	if msanenabled && h != nil {
		msanread(key, t.key.size)
	}
	if h == nil || h.count == 0 {
		if t.hashMightPanic() {
			t.hasher(key, 0) // see issue 23734
		}
		return
	}
	if h.flags&hashWriting != 0 {
		throw("concurrent map writes")
	}

	hash := t.hasher(key, uintptr(h.hash0))

	// Set hashWriting after calling t.hasher, since t.hasher may panic,
	// in which case we have not actually done a write (delete).
	h.flags ^= hashWriting

	bucket := hash & bucketMask(h.B)
	if h.growing() {
		growWork(t, h, bucket)
	}
	b := (*bmap)(add(h.buckets, bucket*uintptr(t.bucketsize)))
	bOrig := b
	top := tophash(hash)
search:
	for ; b != nil; b = b.overflow(t) {
		for i := uintptr(0); i < bucketCnt; i++ {
			if b.tophash[i] != top {
				if b.tophash[i] == emptyRest {
					break search
				}
				continue
			}
			k := add(unsafe.Pointer(b), dataOffset+i*uintptr(t.keysize))
			k2 := k
			if t.indirectkey() {
				k2 = *((*unsafe.Pointer)(k2))
			}
			if !t.key.equal(key, k2) {
				continue
			}
			// Only clear key if there are pointers in it.
			if t.indirectkey() {
				*(*unsafe.Pointer)(k) = nil
			} else if t.key.ptrdata != 0 {
				memclrHasPointers(k, t.key.size)
			}
			e := add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr(t.keysize)+i*uintptr(t.elemsize))
			if t.indirectelem() {
				*(*unsafe.Pointer)(e) = nil
			} else if t.elem.ptrdata != 0 {
				memclrHasPointers(e, t.elem.size)
			} else {
				memclrNoHeapPointers(e, t.elem.size)
			}
			b.tophash[i] = emptyOne
			// If the bucket now ends in a bunch of emptyOne states,
			// change those to emptyRest states.
			// It would be nice to make this a separate function, but
			// for loops are not currently inlineable.
			if i == bucketCnt-1 {
				if b.overflow(t) != nil && b.overflow(t).tophash[0] != emptyRest {
					goto notLast
				}
			} else {
				if b.tophash[i+1] != emptyRest {
					goto notLast
				}
			}
			for {
				b.tophash[i] = emptyRest
				if i == 0 {
					if b == bOrig {
						break // beginning of initial bucket, we're done.
					}
					// Find previous bucket, continue at its last entry.
					c := b
					for b = bOrig; b.overflow(t) != c; b = b.overflow(t) {
					}
					i = bucketCnt - 1
				} else {
					i--
				}
				if b.tophash[i] != emptyOne {
					break
				}
			}
		notLast:
			h.count--
			// Reset the hash seed to make it more difficult for attackers to
			// repeatedly trigger hash collisions. See issue 25237.
			if h.count == 0 {
				h.hash0 = fastrand()
			}
			break search
		}
	}

	if h.flags&hashWriting == 0 {
		throw("concurrent map writes")
	}
	h.flags &^= hashWriting
}