坑:Unlock 方法可以被任意的 goroutine 调用释放锁,即使是没持有这个互斥锁的 goroutine,也可以进行这个操作。这是因为,Mutex 本身并没有包含持有这把锁的 goroutine 的信息,所以,Unlock 也不会对此进行检查。Mutex 的这个设计一直保持至 今。

互斥锁是怎么实现的

1
2
3
4
5
如果锁是空闲状态且没等待的goroutine,直接获取锁

正常模式: 当前的`goroutine`会与被唤醒的`goroutine`进行抢锁,如果锁未抢到,则会进入自旋状态,自旋多次后,还未竞争到锁,如果是第1次未获取到锁,则加入到等待队列的尾部,如果超过阈值1毫秒,那么,将这个Mutex设置为饥饿模式。

饥饿模式:饥饿模式下,`mutex`将锁直接交给等待队列的最前面的`goroutine`,新来的`goroutine`不会尝试获取锁,即使锁没有被持有,也不会去抢,也不会`spin`,会加入到等待队列的尾部. 如果当前等待的`goroutine`是最后一个`waiter`,没有其他等待的`goroutine` 或者此`goroutine`等待的时间小于1ms,退出饥饿模式。

初版(最普通的做法)

数据结构

1
2
3
4
type Mutex struct { 
    key int32   // 锁是否被持有的标识
    sema int32  // 信号量专用,用以阻塞/唤醒goroutine 
}

具体实现

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
// 保证成功在val上增加delta的值
func xadd(val *int32, delta int32) (new int32) 
{ 
    for {
        v := *val 
        if cas(val, v, v+delta) 
        { 
            return v + delta 
        }
    }
    panic("unreached") 
}

// 请求锁
func (m *Mutex) Lock() 
{ 
    if xadd(&m.key, 1) == 1 // 标识加1,如果等于1,成功获取到锁
    { 
        return 
    }
    semacquire(&m.sema)     // 否则阻塞等待
}

// 解锁
func (m *Mutex) Unlock() { 
    if xadd(&m.key, -1) == 0 {
        //将标识减去1,如果等于0,则没有其它等待者
        return 
    }
    semrelease(&m.sema) // 唤醒其它阻塞的goroutine 
}

改进版

因:请求锁的 goroutine 会排队等待获取互斥锁。虽 然这貌似很公平,但是从性能上来看,却不是最优的。因为如果我们能够把锁交给正在占用 CPU 时间片的 goroutine的话,那就不需要做上下文的切换,在高并发的情况下,可能会有更好的性能。

改动点: 此次改动主要就是,新来的 goroutine 也有机会先获取到锁,甚至 一个 goroutine 可能连续获取到锁,打破了先来先得的逻辑。 state 是一个复合型的字段,一个字段包含多个意义,这样可以通过尽可能少的内存来实现互斥锁。这个字段的第一位(最小的一位)来表示这个锁是否被持有,第二位代表是否有唤醒的goroutine,剩余的位数代表的是等待此锁的 goroutine 数。所以,state这一个字段被分成了三部分,代表三个数据。

image.png-911.2kB 数据结构

1
2
3
4
5
6
type Mutex struct {
	state int32  // 不仅仅只是锁持有标识
	sema  uint32 // 信号量专用,用以阻塞/唤醒goroutine 
}

state: 0[等待的gorotine数量]0[唤醒标识]0[锁是否持有标识]

具体实现

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
const (
	mutexLocked = 1 << iota // mutex is locked
	mutexWoken
	mutexWaiterShift = iota
)

func (m *Mutex) Lock() {
	// Fast path: grab unlocked mutex.
	// 快速上锁,当前 state 为 0,说明没人锁, 不阻塞。CAS 上锁后直接返回
	if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
		return
	}
	
	// 被唤醒标记,如果是被别的 goroutine 唤醒的那么后面会置 true
	awoke := false
	for {
		// 老的状态值(0,1		old := m.state
		new := old | mutexLocked // 新值要置 mutexLocked 位为 1
		// 如果旧状态为1,代表已经被其他goroutine锁住
		// 如果 old mutexLocked 位不为 0,那说明有人己经锁上了,那么将 state 变量的 waiter 计数部分 +1
		if old&mutexLocked != 0 {
			// 那么将 state 变量的 waiter 计数部分 +1
			new = old + 1<<mutexWaiterShift
			// new = 5
		}
		// 如果当前goroutine被唤醒
		if awoke {
			// The goroutine has been woken from sleep,
			// so we need to reset the flag in either case.
			// 那么清除这个 mutexWoken 位,置为 0
			// new = 101
			new &^= mutexWoken
		}
		// CAS 更新,如果 m.state 不等于 old,说明有人也在抢锁,那么 for 循环发起新的一轮竞争。
		if atomic.CompareAndSwapInt32(&m.state, old, new) {
			// 如果 old mutexLocked 位为 1,说明当前 CAS 是为了更新 waiter 计数。如果为 0,说明是抢锁成功,那么直接 break 退出。
			if old&mutexLocked == 0 {
				break
			}
			// 此时如果 sema <= 0 那么阻塞在这里等待唤醒,也就是 park 住。走到这里都是要休眠了。
			runtime_Semacquire(&m.sema)
			awoke = true // 有人释放了锁,然后当前 goroutine 被 runtime 唤醒了,设置 awoke true
		}
	}
}

// Unlock unlocks m.
// It is a run-time error if m is not locked on entry to Unlock.
//
// A locked Mutex is not associated with a particular goroutine.
// It is allowed for one goroutine to lock a Mutex and then
// arrange for another goroutine to unlock it.
func (m *Mutex) Unlock() {
	// Fast path: drop lock bit.
	// 快速将 state 的 mutexLocked 位清 0,然后 new 返回更新后的值,注意此 add 完成后,很有可能新的 goroutine 抢锁,并上锁成功
	new := atomic.AddInt32(&m.state, -mutexLocked)
	if (new+mutexLocked)&mutexLocked == 0 { // 如果释放了一个己经释放的锁,直接 panic
		panic("sync: unlock of unlocked mutex")
	}

	old := new
	for {
		// 如果 state 变量的 waiter 计数为 0 说明没人等待锁
		// 直接 return 就好
		// 同时如果 old 值的 mutexLocked|mutexWoken 任一置 1		// 说明要么有人己经抢上了锁,要么说明己经有被唤醒的 goroutine 去抢锁了
		// 没必要去做通知操作

		// If there are no waiters or a goroutine has already
		// been woken or grabbed the lock, no need to wake anyone.
		if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken) != 0 {
			return
		}
		// Grab the right to wake someone.
		//  将 waiter 计数位减一,并设置 awoken 位
		new = (old - 1<<mutexWaiterShift) | mutexWoken
		if atomic.CompareAndSwapInt32(&m.state, old, new) {
			// cas 成功后,再做 sema release 操作,唤醒休眠的 goroutine
			runtime_Semrelease(&m.sema)
			return
		}
		old = m.state
	}
}

再次改进版

如果新来的 goroutine 或者是被唤醒的 goroutine 首次获取不到锁,它们就会通过自旋(spin,通过循环不断尝试,spin 的逻辑是在runtime 实现的)的方式,尝试检查锁是否被释放。在尝试一定的自旋次数后,再执行原来的逻辑。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
func (m *Mutex) Lock() { 
    // Fast path: 幸运之路,正好获取到锁
    if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) { 
        return 
    }
    
    awoke := false 
    iter := 0 
    for { // 不管是新来的请求锁的goroutine, 还是被唤醒的goroutine,都不断尝试请求锁
        old := m.state // 先保存当前锁的状态
        new := old | mutexLocked  // 新状态设置加锁标志
        if old&mutexLocked != 0 { // 锁未释放
            if runtime_canSpin(iter) { // 还可以自旋
                // 未唤醒&&唤醒标志位为0&&有其他等待的goroutine&&其他goroutine未改变过状态
                if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 && atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
                    awoke = true 
                }
                // 执行自旋
                runtime_doSpin() 
                // 自旋+1
                iter++ 
                continue // 自旋,再次尝试请求锁
            }
            // 自旋也未抢到锁,将等待加1
            new = old + 1<<mutexWaiterShift 
        }
        
        if awoke {
            // 唤醒状态
            if new&mutexWoken == 0 { 
                panic("sync: inconsistent mutex state") 
            }
            new &^= mutexWoken // 新状态清除唤醒标记
        }
        if atomic.CompareAndSwapInt32(&m.state, old, new) { 
            if old&mutexLocked == 0 { // 旧状态锁已释放,新状态成功持有了锁,直接
                break
            }
            runtime_Semacquire(&m.sema) // 阻塞等待
            awoke = true // 被唤醒
            iter = 0 
        } 
    }
}

增加饥饿模式

因为新来的 goroutine 也参与竞争,有可能每次都会被新来的 goroutine 抢到获取锁的机会,在极端情况下,等待中的 goroutine 可能会一直获取不到锁,这就是饥饿问题。Go 1.9 中 Mutex 增加了饥饿模式

image.png-385.4kB

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
const ( 
    mutexLocked = 1 << iota // mutex is locked 
    mutexWoken 
    mutexStarving // 从state字段中分出一个饥饿标记
    mutexWaiterShift = iota 
    
    starvationThresholdNs = 1e6 
)

func (m *Mutex) Lock() { 
    // Fast path: 幸运之路,一下就获取到了锁
    if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) { 
        return 
    }
    
    // Slow path:缓慢之路,尝试自旋竞争或饥饿状态下饥饿goroutine竞争
    m.lockSlow() 
}

func (m *Mutex) lockSlow() {

    var waitStartTime int64 
    starving := false // 此goroutine的饥饿标记
    awoke := false // 唤醒标记
    iter := 0 // 自旋次数
    old := m.state // 当前的锁的状态

    for {// 锁是非饥饿状态,锁还没被释放,尝试自旋
        if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
            if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 && atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
                awoke = true 
            }
            runtime_doSpin()
            iter++ 
            old = m.state // 再次获取锁的状态,之后会检查是否锁被释放了

            continue 
        }

        new := old
        if old&mutexStarving == 0 { 
            new |= mutexLocked // 非饥饿状态,加锁
        }
        
        if old&(mutexLocked|mutexStarving) != 0 { 
            new += 1 << mutexWaiterShift // waiter数量加1 
        }
        
        if starving && old&mutexLocked != 0 { 
            new |= mutexStarving // 设置饥饿状态
        }
        
        if awoke { 
            if new&mutexWoken == 0 { 
                throw("sync: inconsistent mutex state") 
            }
            new &^= mutexWoken // 新状态清除唤醒标记
        }

        if atomic.CompareAndSwapInt32(&m.state, old, new) { // 原来锁的状态已释放,并且不是饥饿状态,正常请求到了锁,返回
            if old&(mutexLocked|mutexStarving) == 0 { 
                break // locked the mutex with CAS 
            }
            
            // 处理饥饿状态
            // 如果以前就在队列里面,加入到队列头
            queueLifo := waitStartTime != 0 
            if waitStartTime == 0 { 
                waitStartTime = runtime_nanotime() 
            }
            
            // 阻塞等待
            runtime_SemacquireMutex(&m.sema, queueLifo, 1) // 唤醒之后检查锁是否应该处于饥饿状态
            starving = starving || runtime_nanotime()-waitStartTime > star old = m.state // 如果锁已经处于饥饿状态,直接抢到锁,返回
            
            if old&mutexStarving != 0 { 
              if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
					throw("sync: inconsistent mutex state")
				} 
                
                // 有点绕,加锁并且将waiter数减1 
                delta := int32(mutexLocked - 1<<mutexWaiterShift) 
                if !starving || old>>mutexWaiterShift == 1 { 
                    delta -= mutexStarving // 最后一个waiter或者已经不饥饿了,清
                }
                atomic.AddInt32(&m.state, delta) 
                break 
            }
            awoke = true 
            iter = 0
        } else { 
            old = m.state 
        } 
    } 
}

补充

对于信号量的唤醒的阻塞可以参考这篇文章:https://blog.csdn.net/liyunlong41/article/details/104949898

对于正常模式与饥饿模式下详细解析可以参考这篇文章:https://draveness.me/golang/docs/part3-runtime/ch06-concurrency/golang-sync-primitives/