Go 1.23
写成。Go
语言中的协程和信道可以很方便地用于并发编程。需要警惕的是编写正确的并发程序是非常难的,由于单条计算机指令执行得非常快,而且并发程序执行具有不确定性,每次执行的结果都可能不一致,因此往往难以调试。
关于协程并发常遇到的问题有:死锁Deadlock
、数据竞争Data Race
和协程泄漏Goroutine Leak
。
首先介绍最简单的死锁错误的调试,这个错误可以在执行的时候抛出。对于无缓冲信道来说,信道发送消息send
和接收消息receive
都是阻塞block
的,即无缓冲信道在发送消息和接收消息的时候,信道所在的协程都处于阻塞状态。
死锁是指所有协程都在等待资源释放而彼此互不相让的情况。当我们将信道用于并发编程时,如果处理不当,就可能发生死锁。关于死锁的调试,只需要借助go run
命令来运行程序即可。
func main() {
ch := make(chan int)
go func() {
ch <- 1
}()
ch <- 2
fmt.Println(<-ch)
}
运行上面代码,程序将会输出:
fatal error: all goroutines are asleep - deadlock!
将fmt.Println(<-ch)
注释掉即可排除这个错误。这是因为信道至少涉及2个协程,由于程序没有任何一个协程再往信道ch
中写入值,无法再次读取到值,因此会抛出deadlock
错误。
死锁的错误相对而言比较好排查,但伴随协程的并发编程,真正灾难性的错误其实是数据竞争。数据竞争相当常见,而且在业务逻辑非常复杂的时候比较隐晦,不易察觉,同时也可能非常难于调试。
数据竞争,往往出现的条件为:当两个协程并发地访问同一个共享变量,并且其中至少一个是要执行写操作时。
package main
import (
"fmt"
"sync"
)
func main() {
var counter = 0
var wg sync.WaitGroup
for i := 0; i < 40; i++ {
wg.Add(1)
go func() {
val := counter
val++
counter = val
wg.Done()
}()
}
wg.Wait()
fmt.Println(counter)
}
上述代码中,程序成功执行,大概率会如预期输出40
。这是由于40
个协程在多核CPU
的情况下能够“正常”处理。这也是为什么有些并发问题要等上线运行很长一段时间,或者在并发量突增到某一个阈值后才会出现问题。上述示例程序中的代码如果调大循环的次数,比如调大到40000
个并发,那么counter
输出的值小于40000是大概率的事。
Go
语言本身提供了一个工具,在执行go run
命令时,可以通过参数-race
来开启数据竞争检测:
$ go run -race main.go
==================
WARNING: DATA RACE
Read at 0x00c000014158 by goroutine 8:
main.main.func1()
/data/docker/golang/examples/channel/main.go:14 +0x33
Previous write at 0x00c000014158 by goroutine 43:
main.main.func1()
/data/docker/golang/examples/channel/main.go:16 +0x45
Goroutine 8 (running) created at:
main.main()
/data/docker/golang/examples/channel/main.go:13 +0x78
Goroutine 43 (finished) created at:
main.main()
/data/docker/golang/examples/channel/main.go:13 +0x78
==================
==================
WARNING: DATA RACE
Write at 0x00c000014158 by goroutine 8:
main.main.func1()
/data/docker/golang/examples/channel/main.go:16 +0x45
Previous write at 0x00c000014158 by goroutine 26:
main.main.func1()
/data/docker/golang/examples/channel/main.go:16 +0x45
Goroutine 8 (running) created at:
main.main()
/data/docker/golang/examples/channel/main.go:13 +0x78
Goroutine 26 (finished) created at:
main.main()
/data/docker/golang/examples/channel/main.go:13 +0x78
==================
40
Found 2 data race(s)
exit status 66
可以发现有提示WARNING: DATA RACE
,且提示Found 2 data race(s)
,也就是检测到2
个数据竞争的情况,第一个DATA RACE
,Goroutine 8
正在读取Read at
,而之前的Goroutine 43
在写入Previous write at
,同时给出了位置main2.go:14
。第二个DATA RACE
,可以清晰看到,写数据发生了数据竞争。此时,我们可以通过加锁的方式来同步数据,当然也可以通过标准库中的原子操作包来实现同步。
Go
从版本1.2
开始才有数据竞争检测器。Go
语言提供的原子操作是非侵入式的,由标准库sync/atomic
提供,相对于锁机制,这个标准库开销更小、性能更高。调用sync/atomic
中的函数可以对几种简单的类型进行原子操作。这些类型包括int32
、int64
、uint32
、uint64
、uintptr
和unsafe.Pointer
。每种原子操作支持增或减、比较并交换、载入、存储和交换。
对上面的示例稍加修改,改成无数据竞争的程序:
package main
import (
"fmt"
"sync"
"sync/atomic"
)
func main() {
var counter int32 = 0
var wg sync.WaitGroup
for i := 0; i < 40; i++ {
wg.Add(1)
go func() {
atomic.AddInt32(&counter, 1)
wg.Done()
}()
}
wg.Wait()
fmt.Println(counter)
}
运行-race
检测:
$ go run -race main.go
40
可以看到,不会再检测到数据竞争的情况。
协程泄漏是指协程由于某种原因,在协程执行后并未正确结束,而是处于阻塞状态,那么此时垃圾回收机制也无法回收这些协程占用的资源,这就导致无法释放的协程数量越来越多,于是发生泄漏。
协程泄漏对于非服务类型的程序一般影响较小,因为每次打开的程序即使出现协程泄漏,当程序退出后,所有泄漏的协程也都会清理,例如一般的控制台程序。
对于服务类型的程序,如Web服务,如果发生协程泄漏,那么随着时间的推移,得不到释放的数量将越来越多,直至到达一定阈值后服务崩溃。
协程泄漏一般是由于信道使用不当或者锁同步机制使用错误所导致的。信道如果发送不接收、接收不发送或者向空信道发送和接收数据都将会导致阻塞,从而可能导致协程泄漏。
为了对协程泄漏进行调试,需要一个监控来及时获取到当前程序中的协程数量,runtime.NumGoroutine()
方法可以获取当前运行中的协程数量,因此可以通过它确认是否发生了协程泄漏。
package main
import (
"fmt"
"os"
"runtime"
"strconv"
"time"
)
// 协程泄露,只接收数据
func consumer(ch chan int) {
fmt.Println("consume...")
fmt.Println(<-ch)
}
func checkLeak() {
fmt.Println("check goroutine leak...")
arr := make([]int, 0)
for {
arr = append(arr, runtime.NumGoroutine())
time.Sleep(1 * time.Second)
if len(arr) >= 5 {
n := arr[len(arr)-1] - arr[0]
f, _ := strconv.ParseFloat(strconv.Itoa(n), 32)
f2, _ := strconv.ParseFloat(strconv.Itoa(len(arr)), 32)
if f/f2 > 0.5 {
fmt.Println("检测到Goroutine泄露")
os.Exit(100)
}
}
}
}
func main() {
ch := make(chan int)
go checkLeak()
for {
time.Sleep(time.Second)
go consumer(ch)
}
}
运行结果如下:
$ go run main.go
check goroutine leak...
consume...
consume...
consume...
consume...
consume...
检测到Goroutine泄露
exit status 100
在示例程序中,定义了一个消费者函数consumer
,它只负责从信道ch
中接收数据并打印提示信息。checkLeak
函数用于检测协程Goroutine
泄漏,它每隔1秒通过调用runtime.NumGoroutine()
获取当前运行的协程数量,并保存到切片arr
中。
当切片的长度大于5
时,计算一下采集的协程数量上升情况,可以简单计算一下斜率,如果一直增长,那么斜率大于0,这里取阈值为0.5。需要注意的是,这里计算斜率的算法并不准确,只是为了简化运算,用采集的最后一个值减去第一个采集的值,除以当前的数量,如果大于0.5,就认为会一直上升,非常可能出现协程泄漏。
在协程访问共享变量时,为防止出现数据竞争,需要给共享变量加锁进行保护。如果没有及时解锁,那么将导致其他协程一直阻塞,等待加锁的协程解锁。如果处理不当,非常容易出现协程泄露。下面给出一个检测锁同步情况下的协程泄漏示例程序:
package main
import (
"fmt"
"os"
"runtime"
"strconv"
"sync"
"time"
)
var mutex sync.Mutex
var counter = 0
// 协程泄露,mutex未释放
func adder() {
fmt.Println("adding...")
mutex.Lock()
counter++
fmt.Println("counter = ", counter)
}
func checkLeak() {
fmt.Println("check goroutine leak...")
arr := make([]int, 0)
for {
arr = append(arr, runtime.NumGoroutine())
time.Sleep(1 * time.Second)
if len(arr) >= 5 {
n := arr[len(arr)-1] - arr[0]
f, _ := strconv.ParseFloat(strconv.Itoa(n), 32)
f2, _ := strconv.ParseFloat(strconv.Itoa(len(arr)), 32)
if f/f2 > 0.5 {
fmt.Println("检测到Goroutine泄露")
os.Exit(100)
}
}
}
}
func main() {
go checkLeak()
for {
time.Sleep(time.Second)
go adder()
}
}
运行结果如下:
$ go run main.go
check goroutine leak...
adding...
counter = 1
adding...
adding...
adding...
检测到Goroutine泄露
exit status 100
在示例程序中,函数adde
在内部对全局变量counter
执行加1操作。为了防止数据竞争,在操作前调用mutex.Lock()
加锁,但是操作完成后并未及时解锁,这将导致其他协程一直等待。
另外,WaitGroup
用于等待一组协程操作的完成,它也可能导致协程泄漏。在使用时,如果我们没有正确设置任务数,就可能会使协程阻塞,并导致协程泄露。