文章内容更新请以 WGrape GitHub博客 : 浅谈Go语言的并发控制 为准
前言
本文原创,著作权归WGrape所有,未经授权,严禁转载
一、说明
- 为方便阅读,“线程”、“协程”、“子程序”虽然是有区别的,但在本文中不做区分,存在混用的情况
- 文章虽然以Go语言为主题,但很多原理和思想对于其他语言都是通用的,可举一反三
- 内容尽量浅尝辄止,不过于广泛或细节而偏离主题的同时,也让大家有所收获
二、内容大纲
- 了解并发控制
- Go语言常用的并发控制
- channel存在的问题
- 强大的Context
三、了解并发控制
1、理解
一种协调多个子程序间并发执行的技术,注意“多个”和“并发”这两个关键条件缺一不可
2、目的
实现线程安全,也称为并发安全
3、分类
并发控制主要分为两类,一种是对数据的控制,一种是对行为的控制
- 数据的控制 :数据的共享或者独占
- 行为的控制 :行为的开始或者停止
4、总结
- 并发控制包括锁、非锁两种技术
- 控制的对象主要包括数据、行为两种
- 锁通过对数据的直接控制,可以实现对行为的间接控制(如Redis的Setnx分布式锁)
- 非锁技术通过对行为的直接控制,可以实现对数据的间接控制(如JAVA的synchronized同步语句块)
四、Go语言常用的并发控制
Go主要提供了如下几种并发控制方法
- 锁技术 :锁
- 非锁技术 :同步原语、Channel、Context
1、锁
Go只提供了Mutex和RWMutex两种类型的锁,之所以只提供这两种锁,是因为在应用层所有奇奇怪怪的锁都是基于这两个锁之上的,这两个是最基本的锁。就像在硬件层,总线锁是最基本的锁一样
(1) Mutex 互斥锁
<1> 理解
Mutex的本意是互斥量,如果一个量的值只能为0或1,那么这个量就是互斥量
互斥量的值0或1,可以用来表示上锁和解锁两种状态,这种基于互斥量的锁也就叫做互斥锁,其最重要的特征就是互斥性(独占性),即线程1占有的时候,其他线程必须等待,如图所示
<2> 使用
假设有一个账户,转账线程先获得了锁,但是在过了一段时间后,100元才入账成功。在转账线程没有解锁的这段时间内,读取账户的线程在这段时间内无法获得锁
type Account struct {
money int
lock sync.Mutex
}
func main() {
// 我的账户
var myAccount Account
wg := sync.WaitGroup{}
wg.Add(2)
go func() {
fmt.Print("write thread: try to get lock ...\n")
myAccount.lock.Lock()
fmt.Print("write thread: get lock ! \n")
go func() {
fmt.Print("read thread: try to get lock ...\n")
myAccount.lock.Lock()
fmt.Printf("read thread: get lock ! money = %d \n", myAccount.money)
wg.Done()
}()
time.Sleep(3 * time.Second) // 模拟业务逻辑耗时
myAccount.money = 100
fmt.Print("write thread: write success ! \n")
myAccount.lock.Unlock()
wg.Done()
}()
wg.Wait()
}
由于无法显示动图, 请到原地址查看
<3> 适用场景
适用于读写频率没有显著区别,且任意时刻都必须保证只有一个线程可以写入或访问的情况
(2) RWMutex 读写锁
<1> 互斥锁的缺点
互斥锁有一个比较明显的缺点,如果被线程锁住时间较长,且线程在最后一刻才把数据写入,那么在被线程锁住的这打段时间内,会耽误其他线程的读取操作(比如展示用户当前粉丝数、金币数等),如图所示
<2> 读写锁
为了解决互斥锁占用资源、效率低的问题,就出现了一种新的锁,即读写锁
- 读写锁具有两个状态 :写状态 和 读状态
- 在写状态下,所有线程(读或写)都需要等待获取锁
- 在读状态下,读线程可以并发获得锁,不需要等待,但是写线程必须等待所有读线程结束后才能获得锁
<3> 使用
在互斥锁代码的基础上做如下修改,转账线程开始只用读锁,在最后转账的时候,才用写锁。这样在转账线程未真正转账前,不会影响到读取账户的线程
type Account struct {
money int
lock sync.RWMutex
}
func main() {
// 我的账户
var myAccount Account
wg := sync.WaitGroup{}
wg.Add(2)
go func() {
fmt.Print("write thread: try to get lock ...\n")
myAccount.lock.RLock()
fmt.Print("write thread: get lock ! \n")
go func() {
fmt.Print("read thread: try to get lock ...\n")
myAccount.lock.RLock()
fmt.Printf("read thread: get lock ! money = %d \n", myAccount.money)
wg.Done()
myAccount.lock.RUnlock()
}()
time.Sleep(3 * time.Second) // 模拟业务逻辑耗时
myAccount.lock.RUnlock()
myAccount.lock.Lock()
myAccount.money = 100
fmt.Print("write thread: write success ! \n")
myAccount.lock.Unlock()
wg.Done()
}()
wg.Wait()
}
由于无法显示动图, 请到原地址查看
<4> 适用场景
适用于多读少写的场景
(3) 扩展
<1> Mutex饥饿模式
Mutex内置了公平机制,即增加了饥饿模式,用来解决线程饥饿的问题
<2> 实现其他类型锁
有了Go提供的基本锁,可以在其基础上实现自旋锁、可重入锁
2、同步原语
原语一词译作primitive,是原始、远古的意思,表示已经到了最远始,不可再追溯的时间尽头,引申到计算机中就是不可再分割的、最小的操作语句
同步原语是一组用于子程序间同步的语句,且这组语句在执行过程中不可再分割,即具有原子性
(1) atomic包
Go提供的基于原子读、原子写而实现的一系列原子操作,由于没有使用锁,所以是一种无锁(lock free)技术
<1> 读和写都是非原子的
在并发条件下,读操作、写操作存在不是原子操作的情况,如下图所示
- 非原子读 :线程1读取的时候,被线程2的写入而干扰
- 非原子写 :写=读+写,线程1读出数据后加一,新值还未写入,就被线程2先写入
<2> 原子读和原子写的实现
Go的原子读(Load)和原子写(Store),都是借助硬件层通过汇编实现的
- 原子读 :依赖硬件层的 总线锁 或 缓存锁 实现
- 原子写 :依赖硬件层的CPU指令 CMPXCHG
对于这部分,只需要知道Go底层的Load、Store操作等是通过硬件支持的即可,本文不再描述。下面主要介绍下CMPXCHG指令的思想 :CAS
<3> 理解CAS
理解CAS(Compare And Swap)需要从其本意上去思考,其包含“比较和交换”两个操作,先比较,如果一致则交换。由于每次只是检测是否一致,是否有冲突,所以这是一种基于冲突检测的机制。注意CAS是一种思想,既可以在硬件层实现(如CMPXCHG指令),也可以在应用上层实现(如乐观锁)
在比较中,如果发现不一样,一般的补偿机制是自旋,即不断重试以下操作
- 读取新值
- 再次比较是否一致,如果一致则写入
- 不一致则再次重试整个操作
<4> CAS的常见应用
目前基于CAS实现的技术非常多,常见的有如下几种
- 乐观锁
- MVCC
<5> CAS的优缺点
优点 :性能明显高于锁 缺点 :CPU开销大,ABA问题
<6> 一系列原子操作
既然有了最基本的原子操作 :原子读和原子写。因此很容易就可以在这两个操作的基础上,再实现一系列原子操作(当然还可以扩展出更多 ……),这样的一系列原子操作,都会收录在Go语言的atomic包中
- Increase() :原子性的自增
- Decrease() :原子性的自减
- Add(x) :原子性的对某个数字增加X
<7> 使用
先看第一种不使用原子操作的情况,对 money
并发自增1000次,可以看到最后的结果并不是1000
func main() {
money := 0
wg := sync.WaitGroup{}
wg.Add(1000)
for i := 1; i <= 1000; i++ {
go func() {
money++
wg.Done()
}()
}
wg.Wait()
fmt.Printf("money = %d \n", money)
}
由于无法显示动图, 请到原地址查看
再看使用原子操作的情况,结果是1000,实现了并发场景下的原子性
func main() {
var money int32
wg := sync.WaitGroup{}
wg.Add(1000)
for i := 1; i <= 1000; i++ {
go func() {
atomic.AddInt32(&money, 1)
wg.Done()
}()
}
wg.Wait()
fmt.Printf("money = %d \n", money)
}
由于无法显示动图, 请到原地址查看
<8> 为什么不用锁
上面的代码使用锁,也可以达到相同的效果,但由于锁需要用户控制,容易产生死锁或bug,而原子操作由底层实现,安全可靠。所以一般情况下,只要是可以用原子操作代替锁的场景,都建议使用原子操作
<9> 适用场景
适用于需要对整型数字做并发安全控制的场景
(2) 信号量
信号量是一种由Dijkstra(迪杰斯特拉)发明的子程序间同步技术,包含一个信号量和两个PV原语,P原语用来尝试获取信号(-1),V原语用来新增信号(+1)
Go语言的实现主要由一个数据结构和三个具有原子性的方法组成,如下所示
type Semaphore struct {
value int
}
func (s *Semaphore) add() {
atomic.AddUint64(&s.value, 1)
}
func (s *Semaphore) done() {
atomic.AddUint64(&s.value, -1)
}
func (s *Semaphore) wait() {
for{
if atomic.LoadUint64(&s.value) == 0{
return
}
}
}
<1> 理解
信号量可以理解为门口的信号灯,每新来一辆车就排队(add),门里面的车未完全进入时,下一辆车必须等待(wait),直到门里面的车完全出去后(done),下一辆车才能进入
注意信号量的机制不是队列,图中内容只是为了更形象的理解
<2> 使用
在Go语言中,信号量由WaitGroup对象实现,主要包括add、done、wait三个操作。如果主协程开启了两个子协程,主协程需要等待两个协程工作完全结束后才能停止,实现方法如下
func main(){
wg := sync.WaitGroup{}
fmt.Print("main routine waiting ...\n\n")
wg.Add(2)
go func() {
fmt.Print("goroutine1 working ...\n\n")
time.Sleep(1 * time.Second) // 模拟业务逻辑耗时
fmt.Print("goroutine1 done\n\n")
wg.Done()
}()
go func() {
fmt.Print("goroutine2 working ...\n\n")
time.Sleep(2 * time.Second) // 模拟业务逻辑耗时
fmt.Print("goroutine2 done\n\n")
wg.Done()
}()
wg.Wait()
fmt.Print("main routine stop ...\n\n")
}
由于无法显示动图, 请到原地址查看
<3> 适用场景
适用于一个协程等待另一个或一组协程结束的场景
(3) once
once也是Go提供的一种同步原语,具有一个重要特征 :即使并发,也只会执行一次
<1> 理解
once由一个Once对象和一个Do方法组成,内部基于锁和原子操作
- 锁 :用来保证只有一个协程执行
- 原子操作 :保证 done 字段读取的原子性 ```go // Once 对象 type Once struct { done uint32 // 标记, 记录是否已执行, 0未执行, 1已执行 m Mutex // 锁, 用来保证只有一个协程执行 }
func (o *Once) Do(f func()) { if atomic.LoadUint32(&o.done) == 0 { o.doSlow(f) } }
func (o *Once) doSlow(f func()) { o.m.Lock() defer o.m.Unlock() if o.done == 0 { defer atomic.StoreUint32(&o.done, 1) f() } }
#### <2> 使用
开启10个协程,每个协程都执行某个方法,要求这个方法只能被执行1次
```go
func main() {
var once sync.Once
doOnce := func() {
fmt.Print("\n\ndo only once ...\n\n\n")
}
for i := 0; i < 10; i++ {
go func() {
once.Do(doOnce)
}()
}
time.Sleep(2 * time.Second)
}
由于无法显示动图, 请到原地址查看
<3> 适用场景
适用于在并发条件下,某个操作必须只做一次的场景
(4) cond
cond是Go提供的一种比较少用的同步原语,如果需要在某个条件下才唤醒一个协程或全部的协程的时候,可以考虑下cond。而且它能够让出处理器的使用权,提高 CPU 的利用率,远比我们使用 sleep()
这种方式优雅和高效
<1> 理解
使用步骤如下
- 先使用
NewCond()
创建一个cond变量 - 在不同的协程中,使用cond先
lock()
,然后wait()
等待条件,最后unlock()
- 在需要唤醒不同协程的通知协程中,使用
signal()
唤醒一个协程,或使用lock()
、broadcast()
、unlock()
的方式唤醒所有协程 - 注意唤醒顺序是按照入链表顺序,即先进先出
<2> 使用
func main() {
var condition = false
var wg sync.WaitGroup
wg.Add(3)
mutexLock := sync.Mutex{}
conditionPrimitive := sync.NewCond(&mutexLock)
// 子协程拿到锁后, 发现条件为假, 会阻塞等待, 进入 notifyList 待唤醒列表
go func() {
conditionPrimitive.L.Lock() // 上锁
for condition == false {
fmt.Println("routine1 wait ...")
conditionPrimitive.Wait() // 阻塞等待
}
fmt.Println("routine1 done: condition =", condition)
conditionPrimitive.L.Unlock() // 解锁
wg.Done()
}()
// 子协程拿到锁后, 发现条件为假, 会阻塞等待, 进入 notifyList 待唤醒列表
go func() {
conditionPrimitive.L.Lock() // 上锁
for condition == false {
fmt.Println("routine2 wait ...")
conditionPrimitive.Wait() // 阻塞等待
}
fmt.Println("routine2 done: condition =", condition)
conditionPrimitive.L.Unlock() // 解锁
wg.Done()
}()
// 子协程拿到锁后, 发现条件为假, 会阻塞等待, 进入 notifyList 待唤醒列表
go func() {
conditionPrimitive.L.Lock() // 上锁
for condition == false {
fmt.Println("routine3 wait ...")
conditionPrimitive.Wait() // 阻塞等待
}
fmt.Println("routine3 done: condition =", condition)
conditionPrimitive.L.Unlock() // 解锁
wg.Done()
}()
time.Sleep(2 * time.Second)
// 主协程唤醒先第一个协程, 条件置为真
fmt.Print("\nwakeup: \n")
condition = true
conditionPrimitive.Signal()
// 主协程拿到锁后, 条件置为真, 并广播给剩下的所有子协程
conditionPrimitive.L.Lock() // 上锁
conditionPrimitive.Broadcast() // 广播给待通知链表中的所有协程
fmt.Print("broadcast: \n\n")
conditionPrimitive.L.Unlock() // 解锁
wg.Wait()
}
由于无法显示动图, 请到原地址查看
<3> 适用场景
需要一个或一组协程等待,并在一定条件下才会被唤醒的情况
3、channel
Channel是Go提供的用于协程间通信的数据结构
(1) 使用
func work(done chan bool) {
for {
select {
case <-done:
fmt.Printf("routine done\n\n")
return
default:
fmt.Print("routine doing ...\n")
time.Sleep(300 * time.Millisecond) // 模拟业务逻辑耗时
}
}
}
func main() {
done := make(chan bool)
fmt.Print("main routine doing ...\n\n")
go work(done)
time.Sleep(2 * time.Second) // 模拟业务逻辑耗时
done <- true // 通知子协程停止工作
fmt.Printf("main done\n\n")
}
由于无法显示动图, 请到原地址查看
(2) 适用场景
适用于通知协程的场景
五、channel存在的问题
1、理想情况
在理想的场景下,协程间通信的情况不会很复杂,比如协程间的通信只是单向的、参与通信的协程数量极少、不存在级联操作等。这样用一个channel就可以解决通信的问题,如果不够再增加也不会很复杂
2、现实情况
现实中的场景会要复杂的多,如下所示的消息队列的消费服务,包含了主协程、后台协程、接收协程、调度协程、消费协程共5种协程。
后台协程用于监听信号,如退出信号、热更新信号、重启信号等,所以就必须为上述架构再增加一套协程间通信的设计,设计如下图所示
每两个协程之间都需要两个反向的channel,而且这些协程之间的级联操作很长,就会导致整个服务内部各个协程间的通信过于复杂
3、总结
channel适用于场景简单的少量协程间的通信,一旦发现需要增加很多channel才能解决问题的时候,就要考虑修改架构设计,或者使用一种专门解决大量channel间通信问题的技术 :Context
六、强大的Context
(1) 介绍
Context是Go内置的一种接口类型,在此基础上又定义了三种结构体类型,分别是cancelCtx、timerCtx和valueCtx
Go提供了WithCancel()
、WithDeadline()
、WithTimeout()
、WithValue()
4个用于创建上述3种ctx对象的方法,其中WithValue()
会在当前ctx上写入一个KV键值对,用于实现协程间数据的传递,其他的三种方法都是基于父类ctx新建一个子ctx,用于实现协程退出后,与之相关的子协程的自动结束
(2) 理解
1、先通过 Background()
创建一个根ctx
2、有了这个根节点之后,再继续在其基础上通过With*()
创建子ctx节点。
3、每一个ctx节点的结束,子ctx节点都会结束
4、如果需要传递数据,使用WithValue()
即可
其中ctx之间的派生关系如下图所示,ctx之间可以传递数据,也可以方便的实现自动级联操作
(3) 使用
// 后台协程
func background(ctx context.Context) {
for {
select {
case <-ctx.Done():
fmt.Print("background done\n")
return
default:
}
time.Sleep(500 * time.Millisecond)
}
}
// 接受协程
func receive(ctx context.Context) {
for {
ctx = context.WithValue(ctx, "message", rand.Int31n(100))
dispatch(ctx)
select {
case <-ctx.Done():
fmt.Print("receive done\n")
return
default:
}
time.Sleep(500 * time.Millisecond)
}
}
// 调度协程
func dispatch(ctx context.Context) {
go consume(ctx)
select {
case <-ctx.Done():
fmt.Print("dispatch done\n")
return
default:
}
}
// 消费协程
func consume(ctx context.Context) {
select {
case <-ctx.Done():
fmt.Print("consume done\n")
return
default:
fmt.Printf("consuming ... message = %d\n", ctx.Value("message"))
}
}
func main() {
rootCtx := context.Background()
ctx, cancel := context.WithCancel(rootCtx)
go receive(ctx)
go background(ctx)
time.Sleep(1 * time.Second)
cancel()
time.Sleep(1 * time.Second)
}
由于无法显示动图, 请到原地址查看
(4) 适用场景
适用于协程间复杂通信的场景,如级联操作
七、总结
无论在Go还是其他语言中,一旦遇到并发控制的问题,都可以使用下面的步骤分析并解决 1、要控制数据,还是控制行为 ? 2、应该选择直接控制还是间接控制 ? 3、尽量使用其他技术来代替锁,不要直接使用锁 ! 4、尽量保持线程间关系的简单清晰,不要设计复杂 !