channel是golang中重要的组成部分。也是协程通信间通信的方式

channel常见理论面试题

channel 什么情况下会出现panic

  1. 试图往已close的channel继续发送
  2. 试图关闭已关闭的channel
  3. 试图关闭一个nil的channel

向为nil的channel发送数据会怎样

给定一个 nil channel c:
<-c 从 c 接收将永远阻塞
c <- v 发送值到 c 会永远阻塞

channel close后读的问题

关闭后继续读数据,已在缓冲区内的可以读取到数据,而后得到的是零值(对于int,就是0)。
我们可以通过for循环遍历channel,来获取到已经写入的值

1
2
3
for _, v := range channel{
    
}

对于可能获取到零值,我们可以使用断言判断,ok为false代表channel已关闭后读取的

1
value, ok := <- channel

也可以在close channel之后有意将channel 置为 nil, 如此再使用,并不会读取到零值
对于不带缓冲的ch,和带缓冲的一样,channel close掉之后并不影响读,只影响写入

channel底层源码剖析

go的运行时源码在runtime/chan.go文件下,可以在该文件下查看具体的数据结构,send(channel <-),receive(-> channel),close channel等操作

数据结构

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
type hchan struct {
    qcount   uint          // total data in the queue  代表 chan 中已经接收但还没被取走的元素的个数。内建函数 len 可以返回这个字段的值。
    dataqsiz uint          // size of the circular queue 循环队列的大小
    buf      unsafe.Pointer// points to an array of dataqsiz elements 循环队列的指针
    elemsize uint16        // chan中元素的大小
    closed   uint32        // 是否已closed
    elemtype *_type        // element type chan中元素类型
    sendx    uint          // send index send在buf中的索引
    recvx    uint          // receive index recv在buf中的索引
    recvq    waitq         // list of recv waiters receiver中的等待队列
    sendq    waitq         // list of send waiters sender的等待队列
}

go的channel底层结构包含三部分:

  1. 与buffer相关的属性字段,qcount, dataqsiz,buf。带缓冲区的channel会使用到这些字段,buffer中存放了待接收的数据。使用ring buffer实现
  2. waitq相关的属性,可以理解为是一个 FIFO 的标准队列。其中 recvq 中是正在等待接收数据的 goroutine,sendq 中是等待发送数据的 goroutine。waitq 使用双向链表实现。
  3. 其他属性,全局互斥锁lock,用于保证线程安全、elemtype(元素类型)、closed(通道关闭状态)等

往通道发送数据 send to channel

整体发送流程

  1. 如果是nil channel,则直接阻塞
  2. chan已经被close,如果继续发送,则报panic
  3. 从接收队列中出队一个等待的receiver,发送的元素正好有goroutine等待,则直接将数据交给该goroutine;
    调用send函数,递增sendx,recvx的索引,然后直接把元素给到等他的goroutine,并且唤醒他;
  4. 如果buffer缓存区未满时,ring buffer 还有空间,那么把元素放入buffer,递增索引并返回
  5. 如果buffer缓存区已满,发送者的goroutine就会加入到发送者的等待队列sendq中,直至被唤醒。 如果数据已被取走,或channel被close, 会阻塞当前goroutine,保存好goroutine当前状态,等待被唤醒;

发送流程源码

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
	// 如果是nil channel 则直接阻塞
	if c == nil {
		if !block {
			return false
		}
		gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
		throw("unreachable")
	}

	if debugChan {
		print("chansend: chan=", c, "\n")
	}

	if raceenabled {
		racereadpc(c.raceaddr(), callerpc, funcPC(chansend))
	}

	if !block && c.closed == 0 && full(c) {
		return false
	}

	var t0 int64
	if blockprofilerate > 0 {
		t0 = cputicks()
	}

	lock(&c.lock)
	// chan已经被close的情景,如果继续发送,则报panic
	if c.closed != 0 {
		unlock(&c.lock)
		panic(plainError("send on closed channel"))
	}
    // 从接收队列中出队一个等待的receiver
    // 场景一:性能最好的场景,我投递的元素刚好有人在等着(那我直接给他就完了);
    // 调用的是 send 函数,递增 sendx, recvx 的索引,然后直接把元素给到等他的人,并且唤醒他;
	if sg := c.recvq.dequeue(); sg != nil {
		// Found a waiting receiver. We pass the value we want to send
		// directly to the receiver, bypassing the channel buffer (if any).
		send(c, sg, ep, func() { unlock(&c.lock) }, 3)
		return true
	}
    // 场景二:ring buffer 还有空间,那么把元素放好,递增索引,就可以返回了;
	if c.qcount < c.dataqsiz {
		// Space is available in the channel buffer. Enqueue the element to send.
		qp := chanbuf(c, c.sendx)
		if raceenabled {
			racenotify(c, c.sendx, nil)
		}
		typedmemmove(c.elemtype, qp, ep)
		c.sendx++
		if c.sendx == c.dataqsiz {
			c.sendx = 0
		}
		c.qcount++
		unlock(&c.lock)
		return true
	}

	if !block {
		unlock(&c.lock)
		return false
	}
	// 如果buffer缓存区已满,发送者的goroutine就会加入到发送者的等待队列sendq中,直至被唤醒。
	// 如果数据已被取走,或channel被close, 会阻塞当前goroutine,保存好goroutine当前状态,等待被唤醒;
	gp := getg()
	mysg := acquireSudog()
	mysg.releasetime = 0
	if t0 != 0 {
		mysg.releasetime = -1
	}
	// No stack splits between assigning elem and enqueuing mysg
	// on gp.waiting where copystack can find it.
	mysg.elem = ep
	mysg.waitlink = nil
	mysg.g = gp
	mysg.isSelect = false
	mysg.c = c
	gp.waiting = mysg
	gp.param = nil
	c.sendq.enqueue(mysg)
	// Signal to anyone trying to shrink our stack that we're about
	// to park on a channel. The window between when this G's status
	// changes and when we set gp.activeStackChans is not safe for
	// stack shrinking.
	atomic.Store8(&gp.parkingOnChan, 1)
    // 将当前goroutine放入waiting状态,并释放c.lock锁
	gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
	// Ensure the value being sent is kept alive until the
	// receiver copies it out. The sudog has a pointer to the
	// stack object, but sudogs aren't considered as roots of the
	// stack tracer.
	KeepAlive(ep)

	// someone woke us up.
	if mysg != gp.waiting {
		throw("G waiting list is corrupted")
	}
	gp.waiting = nil
	gp.activeStackChans = false
	closed := !mysg.success
	gp.param = nil
	if mysg.releasetime > 0 {
		blockevent(mysg.releasetime-t0, 2)
	}
	mysg.c = nil
	releaseSudog(mysg)
	if closed {
		if c.closed == 0 {
			throw("chansend: spurious wakeup")
		}
		panic(plainError("send on closed channel"))
	}
	return true
}

往通道接受数据 receive from channel

整体接收流程

  1. 如果是nil channel, 则永久阻塞
  2. 如果通道已close,且通道缓冲没有元素时候,直接返回
  3. 从发送队列sendq中取出一个发送者sender,发送者不为空时候,将发送者数据传递给该goroutine
  4. buffer缓冲队列中有数据情况下,从缓存队列取出数据,传递给当前goroutine(接收者)

receive源码

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
	//  如果是nil channel, 则永久阻塞
	if c == nil {
		if !block {
			return
		}
		gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
		throw("unreachable")
	}

	// Fast path: check for failed non-blocking operation without acquiring the lock.
	if !block && empty(c) {
		
		if atomic.Load(&c.closed) == 0 {
			return
		}
		if empty(c) {
			// The channel is irreversibly closed and empty.
			if raceenabled {
				raceacquire(c.raceaddr())
			}
			if ep != nil {
				typedmemclr(c.elemtype, ep)
			}
			return true, false
		}
	}

	var t0 int64
	if blockprofilerate > 0 {
		t0 = cputicks()
	}

	lock(&c.lock)
    // 如果通道已close,且通道缓冲没有元素时候,直接返回
	if c.closed != 0 && c.qcount == 0 {
		if raceenabled {
			raceacquire(c.raceaddr())
		}
		unlock(&c.lock)
		if ep != nil {
			typedmemclr(c.elemtype, ep)
		}
		return true, false
	}
    // 从发送者队列中取出一个发送者,发送者不为空时候,将发送者数据传递给接收者
	if sg := c.sendq.dequeue(); sg != nil {
		// Found a waiting sender. If buffer is size 0, receive value
		// directly from sender. Otherwise, receive from head of queue
		// and add sender's value to the tail of the queue (both map to
		// the same buffer slot because the queue is full).
		recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
		return true, true
	}
    // 缓冲区buf中有数据,则取出数据给当前的goroutine
	if c.qcount > 0 {
		// Receive directly from queue
		qp := chanbuf(c, c.recvx)
		if raceenabled {
			racenotify(c, c.recvx, nil)
		}
		if ep != nil {
			typedmemmove(c.elemtype, ep, qp)
		}
		typedmemclr(c.elemtype, qp)
		c.recvx++
		if c.recvx == c.dataqsiz {
			c.recvx = 0
		}
		c.qcount--
		unlock(&c.lock)
		return true, true
	}

	if !block {
		unlock(&c.lock)
		return false, false
	}
    // 如果没有发送者且buf中不包含数据,则阻塞当前的goroutine,直至被唤醒
	// no sender available: block on this channel.
	gp := getg()
	mysg := acquireSudog()
	mysg.releasetime = 0
	if t0 != 0 {
		mysg.releasetime = -1
	}
	// No stack splits between assigning elem and enqueuing mysg
	// on gp.waiting where copystack can find it.
	mysg.elem = ep
	mysg.waitlink = nil
	gp.waiting = mysg
	mysg.g = gp
	mysg.isSelect = false
	mysg.c = c
	gp.param = nil
	c.recvq.enqueue(mysg)
	// Signal to anyone trying to shrink our stack that we're about
	// to park on a channel. The window between when this G's status
	// changes and when we set gp.activeStackChans is not safe for
	// stack shrinking.
	atomic.Store8(&gp.parkingOnChan, 1)
	gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)

	// someone woke us up
	if mysg != gp.waiting {
		throw("G waiting list is corrupted")
	}
	gp.waiting = nil
	gp.activeStackChans = false
	if mysg.releasetime > 0 {
		blockevent(mysg.releasetime-t0, 2)
	}
	success := mysg.success
	gp.param = nil
	mysg.c = nil
	releaseSudog(mysg)
	return true, success
}

关闭channel

整体流程

  1. nil的channel关闭直接panic
  2. closed的channel关闭直接panic
  3. 遍历所有的接受队列与发送队列,并依次唤醒对应的goroutine

源码

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
func closechan(c *hchan) {
	// 当关闭的通道是nil时候,直接恐慌
	if c == nil {
		panic(plainError("close of nil channel"))
	}
	// 加锁
	lock(&c.lock)
    // 通道已关闭,再次关闭直接恐慌
	if c.closed != 0 {
		unlock(&c.lock)
		panic(plainError("close of closed channel"))
	}
    ...
	c.closed = 1 // 关闭标志closed置为1
	var glist gList
    // 将接收者添加到glist中
	for {
		sg := c.recvq.dequeue()
		if sg == nil {
			break
		}
		if sg.elem != nil {
			typedmemclr(c.elemtype, sg.elem)
			sg.elem = nil
		}
		if sg.releasetime != 0 {
			sg.releasetime = cputicks()
		}
		gp := sg.g
		gp.param = nil
		if raceenabled {
			raceacquireg(gp, c.raceaddr())
		}
		glist.push(gp)
	}
	// 将发送者添加到glist中
	for {
		sg := c.sendq.dequeue()
		if sg == nil {
			break
		}
		sg.elem = nil
		if sg.releasetime != 0 {
			sg.releasetime = cputicks()
		}
		gp := sg.g
		gp.param = nil
		if raceenabled {
			raceacquireg(gp, c.raceaddr())
		}
		glist.push(gp) // 
	}
	unlock(&c.lock)

	// 循环glist,调用goready唤醒所有接收者和发送者
	for !glist.empty() {
		gp := glist.pop()
		gp.schedlink = 0
		goready(gp, 3)
	}
}

参考文档

https://juejin.cn/post/6898132039812431885