坑:Unlock 方法可以被任意的 goroutine 调用释放锁,即使是没持有这个互斥锁的
goroutine,也可以进行这个操作。这是因为,Mutex 本身并没有包含持有这把锁的
goroutine 的信息,所以,Unlock 也不会对此进行检查。Mutex 的这个设计一直保持至
今。
互斥锁是怎么实现的#
1
2
3
4
5
|
如果锁是空闲状态且没等待的goroutine,直接获取锁
正常模式: 当前的`goroutine`会与被唤醒的`goroutine`进行抢锁,如果锁未抢到,则会进入自旋状态,自旋多次后,还未竞争到锁,如果是第1次未获取到锁,则加入到等待队列的尾部,如果超过阈值1毫秒,那么,将这个Mutex设置为饥饿模式。
饥饿模式:饥饿模式下,`mutex`将锁直接交给等待队列的最前面的`goroutine`,新来的`goroutine`不会尝试获取锁,即使锁没有被持有,也不会去抢,也不会`spin`,会加入到等待队列的尾部. 如果当前等待的`goroutine`是最后一个`waiter`,没有其他等待的`goroutine` 或者此`goroutine`等待的时间小于1ms,退出饥饿模式。
|
初版(最普通的做法)#
数据结构
1
2
3
4
|
type Mutex struct {
key int32 // 锁是否被持有的标识
sema int32 // 信号量专用,用以阻塞/唤醒goroutine
}
|
具体实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
|
// 保证成功在val上增加delta的值
func xadd(val *int32, delta int32) (new int32)
{
for {
v := *val
if cas(val, v, v+delta)
{
return v + delta
}
}
panic("unreached")
}
// 请求锁
func (m *Mutex) Lock()
{
if xadd(&m.key, 1) == 1 // 标识加1,如果等于1,成功获取到锁
{
return
}
semacquire(&m.sema) // 否则阻塞等待
}
// 解锁
func (m *Mutex) Unlock() {
if xadd(&m.key, -1) == 0 {
//将标识减去1,如果等于0,则没有其它等待者
return
}
semrelease(&m.sema) // 唤醒其它阻塞的goroutine
}
|
改进版#
因:请求锁的 goroutine 会排队等待获取互斥锁。虽
然这貌似很公平,但是从性能上来看,却不是最优的。因为如果我们能够把锁交给正在占用 CPU 时间片的 goroutine的话,那就不需要做上下文的切换,在高并发的情况下,可能会有更好的性能。
改动点:
此次改动主要就是,新来的 goroutine 也有机会先获取到锁,甚至
一个 goroutine 可能连续获取到锁,打破了先来先得的逻辑。
state 是一个复合型的字段,一个字段包含多个意义,这样可以通过尽可能少的内存来实现互斥锁。这个字段的第一位(最小的一位)来表示这个锁是否被持有,第二位代表是否有唤醒的goroutine,剩余的位数代表的是等待此锁的 goroutine 数。所以,state这一个字段被分成了三部分,代表三个数据。
数据结构
1
2
3
4
5
6
|
type Mutex struct {
state int32 // 不仅仅只是锁持有标识
sema uint32 // 信号量专用,用以阻塞/唤醒goroutine
}
state: 0[等待的gorotine数量]0[唤醒标识]0[锁是否持有标识]
|
具体实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
|
const (
mutexLocked = 1 << iota // mutex is locked
mutexWoken
mutexWaiterShift = iota
)
func (m *Mutex) Lock() {
// Fast path: grab unlocked mutex.
// 快速上锁,当前 state 为 0,说明没人锁, 不阻塞。CAS 上锁后直接返回
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
return
}
// 被唤醒标记,如果是被别的 goroutine 唤醒的那么后面会置 true
awoke := false
for {
// 老的状态值(0,1)
old := m.state
new := old | mutexLocked // 新值要置 mutexLocked 位为 1
// 如果旧状态为1,代表已经被其他goroutine锁住
// 如果 old mutexLocked 位不为 0,那说明有人己经锁上了,那么将 state 变量的 waiter 计数部分 +1
if old&mutexLocked != 0 {
// 那么将 state 变量的 waiter 计数部分 +1
new = old + 1<<mutexWaiterShift
// new = 5
}
// 如果当前goroutine被唤醒
if awoke {
// The goroutine has been woken from sleep,
// so we need to reset the flag in either case.
// 那么清除这个 mutexWoken 位,置为 0
// new = 101
new &^= mutexWoken
}
// CAS 更新,如果 m.state 不等于 old,说明有人也在抢锁,那么 for 循环发起新的一轮竞争。
if atomic.CompareAndSwapInt32(&m.state, old, new) {
// 如果 old mutexLocked 位为 1,说明当前 CAS 是为了更新 waiter 计数。如果为 0,说明是抢锁成功,那么直接 break 退出。
if old&mutexLocked == 0 {
break
}
// 此时如果 sema <= 0 那么阻塞在这里等待唤醒,也就是 park 住。走到这里都是要休眠了。
runtime_Semacquire(&m.sema)
awoke = true // 有人释放了锁,然后当前 goroutine 被 runtime 唤醒了,设置 awoke true
}
}
}
// Unlock unlocks m.
// It is a run-time error if m is not locked on entry to Unlock.
//
// A locked Mutex is not associated with a particular goroutine.
// It is allowed for one goroutine to lock a Mutex and then
// arrange for another goroutine to unlock it.
func (m *Mutex) Unlock() {
// Fast path: drop lock bit.
// 快速将 state 的 mutexLocked 位清 0,然后 new 返回更新后的值,注意此 add 完成后,很有可能新的 goroutine 抢锁,并上锁成功
new := atomic.AddInt32(&m.state, -mutexLocked)
if (new+mutexLocked)&mutexLocked == 0 { // 如果释放了一个己经释放的锁,直接 panic
panic("sync: unlock of unlocked mutex")
}
old := new
for {
// 如果 state 变量的 waiter 计数为 0 说明没人等待锁
// 直接 return 就好
// 同时如果 old 值的 mutexLocked|mutexWoken 任一置 1,
// 说明要么有人己经抢上了锁,要么说明己经有被唤醒的 goroutine 去抢锁了
// 没必要去做通知操作
// If there are no waiters or a goroutine has already
// been woken or grabbed the lock, no need to wake anyone.
if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken) != 0 {
return
}
// Grab the right to wake someone.
// 将 waiter 计数位减一,并设置 awoken 位
new = (old - 1<<mutexWaiterShift) | mutexWoken
if atomic.CompareAndSwapInt32(&m.state, old, new) {
// cas 成功后,再做 sema release 操作,唤醒休眠的 goroutine
runtime_Semrelease(&m.sema)
return
}
old = m.state
}
}
|
再次改进版#
如果新来的 goroutine 或者是被唤醒的 goroutine 首次获取不到锁,它们就会通过自旋(spin,通过循环不断尝试,spin 的逻辑是在runtime 实现的)的方式,尝试检查锁是否被释放。在尝试一定的自旋次数后,再执行原来的逻辑。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
|
func (m *Mutex) Lock() {
// Fast path: 幸运之路,正好获取到锁
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
return
}
awoke := false
iter := 0
for { // 不管是新来的请求锁的goroutine, 还是被唤醒的goroutine,都不断尝试请求锁
old := m.state // 先保存当前锁的状态
new := old | mutexLocked // 新状态设置加锁标志
if old&mutexLocked != 0 { // 锁未释放
if runtime_canSpin(iter) { // 还可以自旋
// 未唤醒&&唤醒标志位为0&&有其他等待的goroutine&&其他goroutine未改变过状态
if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 && atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
awoke = true
}
// 执行自旋
runtime_doSpin()
// 自旋+1
iter++
continue // 自旋,再次尝试请求锁
}
// 自旋也未抢到锁,将等待加1
new = old + 1<<mutexWaiterShift
}
if awoke {
// 唤醒状态
if new&mutexWoken == 0 {
panic("sync: inconsistent mutex state")
}
new &^= mutexWoken // 新状态清除唤醒标记
}
if atomic.CompareAndSwapInt32(&m.state, old, new) {
if old&mutexLocked == 0 { // 旧状态锁已释放,新状态成功持有了锁,直接
break
}
runtime_Semacquire(&m.sema) // 阻塞等待
awoke = true // 被唤醒
iter = 0
}
}
}
|
增加饥饿模式#
因为新来的 goroutine 也参与竞争,有可能每次都会被新来的 goroutine 抢到获取锁的机会,在极端情况下,等待中的 goroutine 可能会一直获取不到锁,这就是饥饿问题。Go 1.9 中 Mutex 增加了饥饿模式
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
|
const (
mutexLocked = 1 << iota // mutex is locked
mutexWoken
mutexStarving // 从state字段中分出一个饥饿标记
mutexWaiterShift = iota
starvationThresholdNs = 1e6
)
func (m *Mutex) Lock() {
// Fast path: 幸运之路,一下就获取到了锁
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
return
}
// Slow path:缓慢之路,尝试自旋竞争或饥饿状态下饥饿goroutine竞争
m.lockSlow()
}
func (m *Mutex) lockSlow() {
var waitStartTime int64
starving := false // 此goroutine的饥饿标记
awoke := false // 唤醒标记
iter := 0 // 自旋次数
old := m.state // 当前的锁的状态
for {// 锁是非饥饿状态,锁还没被释放,尝试自旋
if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 && atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
awoke = true
}
runtime_doSpin()
iter++
old = m.state // 再次获取锁的状态,之后会检查是否锁被释放了
continue
}
new := old
if old&mutexStarving == 0 {
new |= mutexLocked // 非饥饿状态,加锁
}
if old&(mutexLocked|mutexStarving) != 0 {
new += 1 << mutexWaiterShift // waiter数量加1
}
if starving && old&mutexLocked != 0 {
new |= mutexStarving // 设置饥饿状态
}
if awoke {
if new&mutexWoken == 0 {
throw("sync: inconsistent mutex state")
}
new &^= mutexWoken // 新状态清除唤醒标记
}
if atomic.CompareAndSwapInt32(&m.state, old, new) { // 原来锁的状态已释放,并且不是饥饿状态,正常请求到了锁,返回
if old&(mutexLocked|mutexStarving) == 0 {
break // locked the mutex with CAS
}
// 处理饥饿状态
// 如果以前就在队列里面,加入到队列头
queueLifo := waitStartTime != 0
if waitStartTime == 0 {
waitStartTime = runtime_nanotime()
}
// 阻塞等待
runtime_SemacquireMutex(&m.sema, queueLifo, 1) // 唤醒之后检查锁是否应该处于饥饿状态
starving = starving || runtime_nanotime()-waitStartTime > star old = m.state // 如果锁已经处于饥饿状态,直接抢到锁,返回
if old&mutexStarving != 0 {
if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
throw("sync: inconsistent mutex state")
}
// 有点绕,加锁并且将waiter数减1
delta := int32(mutexLocked - 1<<mutexWaiterShift)
if !starving || old>>mutexWaiterShift == 1 {
delta -= mutexStarving // 最后一个waiter或者已经不饥饿了,清
}
atomic.AddInt32(&m.state, delta)
break
}
awoke = true
iter = 0
} else {
old = m.state
}
}
}
|
对于信号量的唤醒的阻塞可以参考这篇文章:https://blog.csdn.net/liyunlong41/article/details/104949898
对于正常模式与饥饿模式下详细解析可以参考这篇文章:https://draveness.me/golang/docs/part3-runtime/ch06-concurrency/golang-sync-primitives/