15 并发操作
15 并发操作
在 cpp 中我们是主线程,然后通过多线程进行并发设计,而在 Go 中,取代线程的是协程,与 cpp 中的 协程 类似,Go 中的协程也是用户级别的,对内核完全透明,它的多协程操作类似于其他语言的多线程。
值得一提的是,通常情况下一个 OS 线程拥有 2MB 左右的栈内存,但是一个 goroutinue 只有 2KB 左右,因此一个进程可以随便创建上万个 goroutine,且由于协程的调度完全基于用户态,可以避免与内核态切换的上下文消耗,因此使用 goroutine 可以非常容易的开发出高性能应用。
基础
这个操作和 cpp 的线程基本完全一致,此处我们直接展示使用:
启动协程
使用 go
关键字即可启动一个协程,看一个错误例子:
func task() {
for i := 0; i < 10; i++ {
fmt.Println("task:", i)
time.Sleep(time.Millisecond * 100)
}
}
func main() {
go task() // 启动协程,会在后台执行这个任务
}
还记得一个线程创建后需要做什么吗,我们需要管控线程,要么进行 join,要么 deteach,对于 go 的协程也是同理,如果主线程退出,那么即使协程没有执行完毕,程序也会退出,由操作系统回收内存页等资源,因此需要让主线程等待协程完成后才退出。
WaitGroup 同步
sync.WaitGroup
用于等待一组协程完成:
var wg sync.WaitGroup
func task(id int) {
defer wg.Done() // 协程结束时计数 -1
fmt.Printf("Task %d running\n", id)
}
func main() {
for i := 0; i < 10; i++ {
wg.Add(1) // 启动协程前计数 +1
go task(i)
}
wg.Wait() // 阻塞直到计数归零
}
注意,需要在创建一个协程前就先增加计数,否则会出现竟态条件,可以自行分析一下如果先启动协程然后再增加计数会导致什么情况。
并发数量控制
Go 运行时的调度器会使用 GOMAXPROCS 来确定需要调度多少核心来执行代码,默认情况下 会调度所有的核心,当然我们也可以自己指定。
cpuNum := runtime.NumCPU() // 看一下有多少核心
runtime.GOMAXPROCS(cpuNum) // 指定使用多少核心
管道
如果我想让多个 goroutine 之间进行通信呢,这时就需要引入 Channel 的概念了。
因为 Go 的并发模型是 Csp,这个我们在 并发设计模式 里已经介绍过了,它是 通过通信实现共享内存 而不是 通过共享内存实现通信,而 Csp 模式的核心组件就是 Channel。
Channel 是语言级别上的 goroutine 的通信方式,一个协程把数据放在管道里,然后就不管了,另外一个协程可以从管道里取出数据,因此可以说 goroutine 是并发执行体,而 channel就是它们之间的桥梁。
创建与操作
chan 是一个引用类型,必须分配内存才能使用,基本语法很简单:
// 声明一个存储 var_type 类型的管道
var var_name chan var_type
对一个管道的操作主要有 发送、接收和关闭 三种操作
// 创建一个可以存储10个 int 类型数据的管道
ch := make(chan int, 10)
// 发送数据,即把数据放入管道
ch <- 42
// 接收数据,从管道中取出数据
val := <-ch
<-ch // 忽略接收值
// 关闭管道
close(ch)
对于关闭管道需要注意的是: 对于管道的关闭不是必须的,它可以被 GC 自动回收。且一个管道关闭后就不能再发送值了,只能读取,如果还有值就继续取出,但是没值时照样可以取出,只不过会取出对应类型的零值。
管道阻塞
无缓冲管道: 即不指定容量的管道,发送方会阻塞直到接收方接收,是一对一的,此时如果没有接收方,发送方就会一直等待而不退出,就会发生死锁。
ch := make(chan int) // 无缓冲
ch <- 10 // 此时就会死锁
有缓冲管道: 缓冲区未满时可以直接发送而不阻塞,只有缓冲区满时还想发送或者缓冲区空了还想接收才会阻塞。
ch := make(chan int, 2)
ch <- 1 // 不阻塞
ch <- 2 // 不阻塞
ch <- 3 // 阻塞! 缓冲区已满
判断管道是否关闭
使用两个返回值,只有管道已关闭且为空的情况下 ok 才会为 false。
val, ok := <-ch
使用 range 遍历,它会自动检测管道关闭状态,当管道关闭且数据取完后会自动退出循环:
ch := make(chan int, 5)
for i := 0; i < 5; i++ {
ch <- i
}
close(ch) // 必须关闭,否则 range 会永久阻塞
for val := range ch {
fmt.Println(val) // 输出 0 1 2 3 4 后自动退出
}
如果管道没有关闭,range
会一直阻塞等待新数据,对于已关闭的管道,range
会先取完所有剩余数据,然后才会退出循环。
单向管道
有的时候我们会将管道作为参数在多个任务函数间传递,很多时候我们在不同的任务函数中使用管道都会对其进行限制,比如限制管道在函数中只能发送或只能接收:
// 只读管道
func consume(ch <-chan int) {
for val := range ch {
fmt.Println(val)
}
}
// 只写管道
func produce(ch chan<- int) {
for i := 0; i < 10; i++ {
ch <- i
}
close(ch)
}
func main() {
ch := make(chan int, 10) // 双向管道
go produce(ch) // 自动转为只写
consume(ch) // 自动转为只读
}
select
select
用于同时监听多个管道操作,类似 Unix 的 I/O 多路复用,比如,我需要查看多个管道,看看哪个里面有值,这种情况下就需要遍历对吧,但是一个管道为空会导致阻塞,导致其他管道数据无法及时处理,select就是为了解决这个问题出来的:
select {
case val := <-ch1:
fmt.Println("从 ch1 接收:", val)
case val := <-ch2:
fmt.Println("从 ch2 接收:", val)
case ch3 <- data:
fmt.Println("发送到 ch3")
default:
fmt.Println("所有管道都阻塞")
}
他会具有这么一个流程:
评估阶段: 检查所有 case 的管道操作是否就绪
选择阶段: 如果有一个 case 就绪,那就选择它,如果有多个 case 就绪则会随机选择一个 case,如果没有任何一个就绪,这是就要看是否有 default,如果有 default,则会选择此分支,否则就会阻塞等待到某个 case 就绪
执行阶段: 执行选择的 case,随后退出 select
并发安全与锁
锁这种同步原语我在 并发 中有详细介绍,原理什么的都一样,此处我们直接看如何在 go 中使用就好。
互斥锁
多个 goroutine 访问共享变量需要加锁:
var (
count int
mu sync.Mutex
)
func increment() {
mu.Lock()
count++
mu.Unlock()
}
检测竞态: go build -race
编译后运行可检测数据竞争。
读写锁
允许多个读操作并发,但写操作完全互斥:
var (
data int
rwMu sync.RWMutex
)
func read() {
rwMu.RLock()
fmt.Println(data)
rwMu.RUnlock()
}
func write(val int) {
rwMu.Lock()
data = val
rwMu.Unlock()
}