Golang切片底层原理

TrumanWong
11/30/2021
TrumanWong
提示:本文基于Go 1.17写成

数据结构

切片是一种轻量级的数据结构,提供了访问数组任意元素的功能。和数组不同的是,切片是长度可变的序列,且不用指定固定长度,一个切片一般写作[]T,其中T代表slice中元素的类型。以下是切片的数据结构定义:

// runtime/slice.go
type slice struct {
	array unsafe.Pointer
	len   int
	cap   int
}

从定义中我们可以看到,slice有三个属性:

编译时使用NewSlice函数新建一个切片类型,并需要传递切片元素的类型。从中可以看出,切片元素的类型elem是在编译期间确定的。

fun NewSlice(elem *Type) *Type {
    t := New(TSLICE)
    t.Extra = Slice{Elem: elem}
    elem.Cache.slice = t
    return t
}

切片的初始化

字面量初始化

当使用形如[]int{1, 2, 3}的字面量创建新的切片时,会创建一个array数组[3]int{1,2,3}存储于静态区中,并在堆区创建一个新的切片,在程序启动时将静态区的数据复制到堆区,这样可以加快切片的初始化过程。其核心逻辑位于cmd/compile/internal/walk/complit.goslicelit函数中,其抽象的伪代码如下。

var vstat [3]int
vstat[0] = 1
vstat[1] = 2
vstat[2] = 3
var vauto *[3]int = new([3]int)
*vauto = vstat
slice := vauto[:]

关键字初始化

如果使用字面量的方式创建切片,大部分工作会在编译期间完成。但是当使用make关键字创建切片的时候,很多工作都需要运行时的参与;调用方必须向make函数传入切片的大小及可选的容量,类型检查期间的cmd/compile/internal/typecheck/typecheck.gotypecheck1函数会校验入参:

// cmd/compile/internal/typecheck/typecheck.go
// typecheck1 should ONLY be called from typecheck.
func typecheck1(n ir.Node, top int) ir.Node {
	if n, ok := n.(*ir.Name); ok {
		typecheckdef(n)
	}

	switch n.Op() {
     ...
     case ir.OMAKE:
		n := n.(*ir.CallExpr)
		return tcMake(n)
     ...
 }
 ...
}

// cmd/compile/internal/typecheck/func.go
// tcMake typechecks an OMAKE node.
func tcMake(n *ir.CallExpr) ir.Node {
	args := n.Args
	if len(args) == 0 {
		base.Errorf("missing argument to make")
		n.SetType(nil)
		return n
	}

	n.Args = nil
	l := args[0]
	l = typecheck(l, ctxType)
	t := l.Type()
	if t == nil {
		n.SetType(nil)
		return n
	}

	i := 1
	var nn ir.Node
	switch t.Kind() {
	default:
		base.Errorf("cannot make type %v", t)
		n.SetType(nil)
		return n

	case types.TSLICE:
		if i >= len(args) {
			base.Errorf("missing len argument to make(%v)", t)
			n.SetType(nil)
			return n
		}

		l = args[i]
		i++
		l = Expr(l)
		var r ir.Node
		if i < len(args) {
			r = args[i]
			i++
			r = Expr(r)
		}

		if l.Type() == nil || (r != nil && r.Type() == nil) {
			n.SetType(nil)
			return n
		}
		if !checkmake(t, "len", &l) || r != nil && !checkmake(t, "cap", &r) {
			n.SetType(nil)
			return n
		}
		if ir.IsConst(l, constant.Int) && r != nil && ir.IsConst(r, constant.Int) && constant.Compare(l.Val(), token.GTR, r.Val()) {
			base.Errorf("len larger than cap in make(%v)", t)
			n.SetType(nil)
			return n
		}
		nn = ir.NewMakeExpr(n.Pos(), ir.OMAKESLICE, l, r)

  ...
 }
 ...
}

从上面代码可以看出,tcMake函数不仅会检查len是否传入,还会保证传入的容量cap一定大于或等于len。除了校验参数之外,当前函数会将OMAKE节点转换成OMAKESLICE,中间代码生成的cmd/compile/internal/walk/builtin.gowalkMakeSlice函数会依据下面两个条件转换OMAKESLICE类型的节点:

编译时对于字面量的重要优化是判断变量应该被分配到栈中还是应该逃逸到堆区。如果make函数初始化了一个太大的切片,则该切片会逃逸到堆中。如果分配了一个比较小的切片,则会直接在栈中分配。

字面量内存逃逸的核心逻辑如下:

// cmd/compile/internal/walk/builtin.go
// walkMakeSlice walks an OMAKESLICE node.
func walkMakeSlice(n *ir.MakeExpr, init *ir.Nodes) ir.Node {
	l := n.Len
	r := n.Cap
	if r == nil {
		r = safeExpr(l, init)
		l = r
	}
	t := n.Type()
	if t.Elem().NotInHeap() {
		base.Errorf("%v can't be allocated in Go; it is incomplete (or unallocatable)", t.Elem())
	}
	if n.Esc() == ir.EscNone {
		if why := escape.HeapAllocReason(n); why != "" {
			base.Fatalf("%v has EscNone, but %v", n, why)
		}
		// var arr [r]T
		// n = arr[:l]
		i := typecheck.IndexConst(r)
		if i < 0 {
			base.Fatalf("walkExpr: invalid index %v", r)
		}

		// cap is constrained to [0,2^31) or [0,2^63) depending on whether
		// we're in 32-bit or 64-bit systems. So it's safe to do:
		//
		// if uint64(len) > cap {
		//     if len < 0 { panicmakeslicelen() }
		//     panicmakeslicecap()
		// }
		nif := ir.NewIfStmt(base.Pos, ir.NewBinaryExpr(base.Pos, ir.OGT, typecheck.Conv(l, types.Types[types.TUINT64]), ir.NewInt(i)), nil, nil)
		niflen := ir.NewIfStmt(base.Pos, ir.NewBinaryExpr(base.Pos, ir.OLT, l, ir.NewInt(0)), nil, nil)
		niflen.Body = []ir.Node{mkcall("panicmakeslicelen", nil, init)}
		nif.Body.Append(niflen, mkcall("panicmakeslicecap", nil, init))
		init.Append(typecheck.Stmt(nif))

		t = types.NewArray(t.Elem(), i) // [r]T
		var_ := typecheck.Temp(t)
		appendWalkStmt(init, ir.NewAssignStmt(base.Pos, var_, nil))  // zero temp
		r := ir.NewSliceExpr(base.Pos, ir.OSLICE, var_, nil, l, nil) // arr[:l]
		// The conv is necessary in case n.Type is named.
		return walkExpr(typecheck.Expr(typecheck.Conv(r, n.Type())), init)
	}

	// n escapes; set up a call to makeslice.
	// When len and cap can fit into int, use makeslice instead of
	// makeslice64, which is faster and shorter on 32 bit platforms.

	len, cap := l, r

	fnname := "makeslice64"
	argtype := types.Types[types.TINT64]

	// Type checking guarantees that TIDEAL len/cap are positive and fit in an int.
	// The case of len or cap overflow when converting TUINT or TUINTPTR to TINT
	// will be handled by the negative range checks in makeslice during runtime.
	if (len.Type().IsKind(types.TIDEAL) || len.Type().Size() <= types.Types[types.TUINT].Size()) &&
		(cap.Type().IsKind(types.TIDEAL) || cap.Type().Size() <= types.Types[types.TUINT].Size()) {
		fnname = "makeslice"
		argtype = types.Types[types.TINT]
	}
	fn := typecheck.LookupRuntime(fnname)
	ptr := mkcall1(fn, types.Types[types.TUNSAFEPTR], init, reflectdata.TypePtr(t.Elem()), typecheck.Conv(len, argtype), typecheck.Conv(cap, argtype))
	ptr.MarkNonNil()
	len = typecheck.Conv(len, types.Types[types.TINT])
	cap = typecheck.Conv(cap, types.Types[types.TINT])
	sh := ir.NewSliceHeaderExpr(base.Pos, t, ptr, len, cap)
	return walkExpr(typecheck.Expr(sh), init)
}

n.Esc()代表是否判断出变量需要逃逸。

如果没有逃逸,那么切片运行时最终会被分配在栈中。而如果发生了逃逸,那么运行时调用makesliceXX函数会将切片分配在堆中。当切片的长度和容量小于int类型的最大值时,会调用makeslice函数,反之调用makeslice64函数创建切片。makeslice64函数最终也调用了makeslice函数。makeslice函数会先判断要申请的内存大小是否超过了理论上系统可以分配的内存大小,并判断其长度是否小于容量。再调用mallocgc函数在堆中申请内存,申请的内存大小为类型大小×容量

// runtime/slice.go
func makeslice(et *_type, len, cap int) unsafe.Pointer {
	mem, overflow := math.MulUintptr(et.size, uintptr(cap))
	if overflow || mem > maxAlloc || len < 0 || len > cap {
		// NOTE: Produce a 'len out of range' error instead of a
		// 'cap out of range' error when someone does make([]T, bignumber).
		// 'cap out of range' is true too, but since the cap is only being
		// supplied implicitly, saying len is clearer.
		// See golang.org/issue/4085.
		mem, overflow := math.MulUintptr(et.size, uintptr(len))
		if overflow || mem > maxAlloc || len < 0 {
			panicmakeslicelen()
		}
		panicmakeslicecap()
	}

	return mallocgc(mem, et, true)
}

func makeslice64(et *_type, len64, cap64 int64) unsafe.Pointer {
	len := int(len64)
	if int64(len) != len64 {
		panicmakeslicelen()
	}

	cap := int(cap64)
	if int64(cap) != cap64 {
		panicmakeslicecap()
	}

	return makeslice(et, len, cap)
}

切片扩容原理

切片使用append函数添加元素,但不是使用了append函数就需要进行扩容,如下代码向长度为3,容量为4的切片a中添加元素后不需要扩容。

切片增加元素后长度超过了现有容量,例如b一开始的长度和容量都为3,但使用append函数后,其容量变为了6

// 扩容
a := make([]int, 3, 4)
append(a, 1) // cap(a) = 4

// 不扩容
b := make([]int, 3, 3)
append(b, 1) // cap(a) = 6

切片扩容的现象说明了Go语言并不会在每次append时都进行扩容,也不会每增加一个元素就扩容一次,这是由于扩容常会涉及内存的分配,减慢append的速度。

append函数在运行时调用了runtime/slice.go文件下的growslice函数:

// runtime/slice.go
func growslice(et *_type, old slice, cap int) slice {
	if raceenabled {
		callerpc := getcallerpc()
		racereadrangepc(old.array, uintptr(old.len*int(et.size)), callerpc, funcPC(growslice))
	}
	if msanenabled {
		msanread(old.array, uintptr(old.len*int(et.size)))
	}

	if cap < old.cap {
		panic(errorString("growslice: cap out of range"))
	}

	if et.size == 0 {
		// append should not create a slice with nil pointer but non-zero len.
		// We assume that append doesn't need to preserve old.array in this case.
		return slice{unsafe.Pointer(&zerobase), old.len, cap}
	}

	newcap := old.cap
	doublecap := newcap + newcap
	if cap > doublecap {
		newcap = cap
	} else {
		if old.cap < 1024 {
			newcap = doublecap
		} else {
			// Check 0 < newcap to detect overflow
			// and prevent an infinite loop.
			for 0 < newcap && newcap < cap {
				newcap += newcap / 4
			}
			// Set newcap to the requested cap when
			// the newcap calculation overflowed.
			if newcap <= 0 {
				newcap = cap
			}
		}
	}
    ...
}

上面的代码显示了扩容的核心逻辑,Go语言中切片扩容的策略为:

growslice函数会根据切片的类型,分配不同大小的内存。为了对齐内存,申请的内存可能大于实际的类型大小×容量大小。如果切片需要扩容,那么最后需要到堆区申请内存。要注意的是,扩容后新的切片不一定拥有新的地址。因此在使用append函数时,通常会采用a=append(a,T)的形式。根据et.ptrdata是否判断切片类型为指针,执行不同的逻辑。

var p unsafe.Pointer
if et.ptrdata == 0 {
    p = mallocgc(capmem, nil, false)
    // The append() that calls growslice is going to overwrite from old.len to cap (which will be the new length).
    // Only clear the part that will not be overwritten.
    memclrNoHeapPointers(add(p, newlenmem), capmem-newlenmem)
} else {
    // Note: can't use rawmem (which avoids zeroing of memory), because then GC can scan uninitialized memory.
    p = mallocgc(capmem, et, true)
    if lenmem > 0 && writeBarrier.enabled {
        // Only shade the pointers in old.array since we know the destination slice p
        // only contains nil pointers because it has been cleared during alloc.
        bulkBarrierPreWriteSrcOnly(uintptr(p), uintptr(old.array), lenmem-et.size+et.ptrdata)
    }
}
memmove(p, old.array, lenmem)

return slice{p, old.len, newcap}

当切片类型不是指针时,分配内存后只需要将内存后面的值清空,memmove(p, old.array, lenmem)函数用于将旧切片的值赋给新的切片。整个过程的抽象表示如下。

oldSlice = make([]int, 3, 3)
newSlice = append(oldSlice, 1) => newSlice = malloc(newcap * sizeof(int))
newSlice[1] = old[1]
newSlice[2] = old[2]
newSlice[3] = old[3]

当切片类型为指针,涉及垃圾回收写屏障开启时,对旧切片中指针指向的对象进行标记。

切片的复制

复制的切片不会改变指向底层的数据源,但有些时候希望建一个新的数组,并且与旧数组不共享相同的数据源,这时可以使用copy函数。

当我们使用copy(a, b)的形式对切片进行拷贝时,编译期间的cmd/compile/internal/walk/builtin.gowalkCopy函数会分两种情况进行处理拷贝操作,如果当前copy不是在运行时调用的,copy(a, b)会被直接转换成下面的代码:

n := len(a)
if n > len(b) { n = len(b) }
if a.ptr != b.ptr { memmove(a.ptr, b.ptr, n*sizeof(elem(a))) }

上面代码中memmove负责拷贝内存。而如果拷贝是在运行时发生的,编译器会使用runtime/slice.goslicecopy函数替换运行期间调用的copy函数:

// runtime/slice.go
// slicecopy is used to copy from a string or slice of pointerless elements into a slice.
func slicecopy(toPtr unsafe.Pointer, toLen int, fromPtr unsafe.Pointer, fromLen int, width uintptr) int {
	if fromLen == 0 || toLen == 0 {
		return 0
	}

	n := fromLen
	if toLen < n {
		n = toLen
	}

	if width == 0 {
		return n
	}

	size := uintptr(n) * width
	if raceenabled {
		callerpc := getcallerpc()
		pc := funcPC(slicecopy)
		racereadrangepc(fromPtr, size, callerpc, pc)
		racewriterangepc(toPtr, size, callerpc, pc)
	}
	if msanenabled {
		msanread(fromPtr, size)
		msanwrite(toPtr, size)
	}

	if size == 1 { // common case worth about 2x to do here
		// TODO: is this still worth it with new memmove impl?
		*(*byte)(toPtr) = *(*byte)(fromPtr) // known to be a byte pointer
	} else {
		memmove(toPtr, fromPtr, size)
	}
	return n
}

从上面代码可以看出,无论是编译期间拷贝还是运行期间拷贝,两种拷贝方式都会通过memmove将整块内存的内容拷贝到目标的内存区域中。

相比于依次拷贝元素,runtime.memmove能够提供更好的性能。需要注意的是,整块拷贝内存仍然会占用非常多的资源,在大切片上执行拷贝操作时一定要注意对性能的影响。