在 Python/Java/C++ 等语言中,线程间的通信是通过共享内存实现的。传统的线程模型会使用锁来保护共享的内存;当需要访问共享数据时,线程会首先尝试获取相应的锁,然后再进行相应的读写。这种做法可以最大程度上保证数据的完整性和正确性,避免并发时一段数据被不同的线程同时修改。对于一些常用的数据结构,这些语言都提供了内建的线程安全的实现,这样就不用每次都自己编写并发的部分了。

传统的线程模型

图1 传统的线程模型

然而在 Go 中,情况有所不同。Go 素来以 Goroutine 和 channel 闻名,前者Goroutine 比起传统语言中的线程轻量了不少,时空效率都有很大提升,同时写起来也很方便,只需要在函数调用前加个go就能快捷地创建一个并发的协程。Channel 则是 goroutine 通信的一种方式,也能很便捷地在不同的协程直接传递数据。

不过,在平时的搬砖过程中,我们也能很清楚地感受到 channel 的使用思想跟惯用的 mutex 不太一样。不同于传统的线程模型,在 Go 的 channel 中,数据变成了一种可以流动的东西,在不同的 Goroutine 之间交换。在流动的过程中,Go 保证了一段数据在同一时间只可能被一个 Goroutine 访问,从而避免了使用锁来锁住数据。这也使数据的传递更为优雅——毕竟,柔美的水流总是要比笨重的锁头显得更为轻巧和美妙一些。

CSP 模型

图2 CSP 模型

当然,考虑到实际需求,在 Go 中仍然也提供了传统的互斥锁(sync.Mutex,sync.RWMutex),但Go 的设计者们是鼓励使用 channel 来实现内存共享的。Goroutine 和 channel 作为 Go 独有的并发原语,背后体现出的是 Go 并发设计的核心哲学:

Do not communicate by sharing memory; instead, share memory by communicating.

不要通过共享内存来进行(协程间的)沟通,而要通过沟通来共享内存

0. 约定

考虑到我们的主线是分析 channel 运作的原理,因此一些跟主线关系不大的代码(比如:竞态分析,g的调度)在这里不作讨论,有兴趣的同学可以进一步在互联网上查阅资料。

1. 数据结构

Go 的 Channel 使用起来很方便:

1
2
3
4
// no buffer 的 channel
c := make(chan int)
// 自带 buffer 的 channel
c1 := make(chan int, 10)

如果不看代码,通过平常使用 channel 的经验,能否想象出它的底层是怎么存储的呢?我们不妨试一下,如果认为 channel 也是一个 struct,那么这个 struct 里会有哪些元素呢?

首先最直观的,channel 是区分类型的,并且可以带缓冲区,意味着channel会把类型和缓冲区的大小先记下来。

然后,channel 如果有缓冲区的话,那么意味着 channel 中的元素也可以为任意个,只要小于缓冲区的大小就行了,那么意味着 channel 中还会存储缓冲区现有元素的数量。

接下来,我们知道一个 channel 可以在多个 Goroutine 中使用,并且如果 channel 为满或者为空,那么发送/接收端的 Goroutine 会阻塞;如果满/空的状态消失,那么被阻塞的 Goroutine 又会被重新唤醒。这意味着 channel 中还会存储读写该 channel 的协程。

最后,在使用中我们知道,channel 在使用结束之后应该被关闭,这个状态也应该存储下来。

好了,根据我们上面的思考,channel 这个结构中会存储的成员至少会有:

  • channel 所能传递的类型
  • channel 缓冲区的大小
  • channel 中现有元素的数量
  • channel 中的元素
  • 读写 channel 的 goroutine
  • channel 是否被关闭

那么让我们再来看看,实际情况中,Go channel 的结构会是什么样子的呢?

Go 语言的 Channel 在运行时使用 runtime.hchan 结构体表示。我们在 Go 语言中创建新的 Channel 时,实际上创建的都是如下所示的结构:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
type hchan struct {
qcount uint
dataqsiz uint
buf unsafe.Pointer
elemsize uint16
closed uint32
elemtype *_type
sendx uint
recvx uint
recvq waitq
sendq waitq

lock mutex
}

事实上,我们上面考虑到的已经很接近了。此外,channel 设计时是先入先出的,这意味着:

  • 先进入 Channel 的数据会先出 Channel
  • 先从 Channel 读取数据的 Goroutine 会先接收到数据
  • 先向 Channel 发送数据的 Goroutine 会得到先发送数据的权利

因此,这意味着 channel 的缓冲区和等待读写的 Goroutine 的数据结构都是队列。更具体一点,缓冲区的实现是一个循环队列,而存储等待读写的 Goroutine 的实现是链表队列。这样一来,整个数据结构就很明晰了。

  • elemtype — Channel 所传递元素的类型
  • dataqsiz — Channel 中缓冲区的长度(i.e. 循环队列的长度)
  • qcount — Channel 中的元素个数
  • buf — 指向 Channel 缓冲区数据的指针
  • sendq — 向 Channel 中发送数据的 Goroutine
  • recvq — 从 Channel 中接受数据的 Goroutine
  • sendx, recvx — 缓冲区的具体实现是循环队列,因此分别代表了当前发送/接收操作所处理到的位置
  • closed — channel 是否关闭
  • lock — 保证 channel 不会被多个 Goroutine 同时读写的锁
  1. Create

让我们再来回顾一下,Channel 是如何被 make 出来的:

1
2
3
4
// no buffer 的 channel
c := make(chan int)
// 自带 buffer 的 channel
c1 := make(chan int, 10)

make经过编译之后,正常情况下会对应调用的是runtime.chan中的makechan这个方法,它的方法签名是:

1
func makechan(t *chantype, size int) *hchan

很明显,两个参数分别对应make传入的两个值。

为什么说正常情况呢,当size超过 1<<32时,会调用另一个makechan64()的方法,它的方法体很简单,就是判断下size是否超过1<<64,超过的话直接报out of range,否则再调用makechan()。

1
2
3
4
5
6
7
func makechan64(t *chantype, size int64) *hchan {
if int64(int(size)) != size {
panic(plainError("makechan: size out of range"))
}

return makechan(t, int(size))
}

接下来,让我们来看看makechan()这个方法的真面貌。我们可以把方法执行的过程划分为三部分:

  1. 参数校验
  2. 初始化hchan
  3. 赋值

2.1. 参数校验

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
func makechan(t *chantype, size int) *hchan {
elem := t.elem


// compiler checks this but be safe.
if elem.size >= 1<<16 {
throw("makechan: invalid channel element type")
}
if hchanSize%maxAlign != 0 || elem.align > maxAlign {
throw("makechan: bad alignment")
}


mem, overflow := math.MulUintptr(elem.size, uintptr(size))
if overflow || mem > maxAlloc-hchanSize || size < 0 {
panic(plainError("makechan: size out of range"))
}
...
}

这一部分就是一些参数上的校验,包括了:

  • 校验 channel 传递的元素的 size
  • 校验元素的对齐
  • 校验缓冲区的大小

校验部分的代码大部分都是一些非常底层的设定,跟 channel 的逻辑关系不大,所以简单看看就好。

2.2. 初始化hchan

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
func makechan(t *chantype, size int) *hchan {
...
var c *hchan
switch {
case mem == 0:
c = (*hchan)(mallocgc(hchanSize, nil, true))
case elem.ptrdata == 0:
c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
c.buf = add(unsafe.Pointer(c), hchanSize)
default:
c = new(hchan)
c.buf = mallocgc(mem, elem, true)
}
...
}

对于 channel 结构体hchan的初始化,首先可以分为缓冲区是否为0来考虑。

当缓冲区大小为0时,那么直接为hchan分配大小为hchanSize的一块内存,这里的hchanSize是一个常量。

当缓冲区大小不为0时,根据chantype是否为指针类型,又可以继续分为两种情况考虑:

  • 如果不是指针类型,那么直接给hchan分配大小为hchanSize+mem的一段连续内存即可,mem即为缓冲区大小(sizeof(elem) * 缓冲区的长度),同时让hchan.buf指向缓冲区的内存
  • 如果是指针类型,那么首先给hchan分配自身大小的内存,然后再给hchan.buf分配缓冲区大小的内存,这样做可以让hchan和hchan.buf指向的内存不连续

那么,为什么要这么做呢?这里网上基本上找不到什么资料,其实有一些偏离本文的主题了。但是,基于好奇心,我给 Go 的 mailing list 发了邮件询问,得到了下面的答复(不得不说老外还是挺友好的)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
> I was reading the source code of makechan(), and these questions confused me. I would appreciate it if someone could help me.
>
> Q1. What does Elements do not contain pointers. mean? Does that means that the type of channel is not a pointer type (like chan int and chan *int)?


It means that the element type of the channel is not a pointer and
also does not contain any pointers. For example "struct { a, b int }"
does not contain any pointers, but "struct { a int; b []byte }" does
contain pointers.


> Q2. Why does the allocation strategy of memory allocation differ? It seems like the buf and the hchan should not be allocated into one piece of continuous memory, if the "elements contains pointers", so what's the point? The logically continuous memory might not be physically continuous.

The garbage collector has to always know exactly where pointers are
stored in memory. We could in principle use contiguous allocation for
the case where the element type contains pointers, but we would have
to build a description that tells the garbage collector exactly where
those pointers are. Those descriptions are built by the compiler (on
tip, the code is in cmd/compile/internal/reflectdata/reflect.go), so
the compiler would have to build a new descriptor for every channel
type with an element type that contains pointers. And the descriptor
would have to vary based on the channel size, so it would be based not
just on the channel type but also on the argument passed to "make".
Of course the argument passed to "make" can be a variable, so that
adds another complication.

So it's probably possible to use a contiguous buffer here, but it's
not simple, and it's not the common case, and it's not clear that it
would be worth it.

我的理解是,这跟垃圾回收机制有关。对于带指针的类型,如果 GC 想正确地回收它,那么就需要知道其中有哪些指针,分别是什么类型的。而传入mallocgc()的第二个参数正是类型,这个过程相当于告诉 GC 这个类型中的详细情况,以便于之后 GC 的策略执行(引用计数,染色计数,etc…)

2.3 赋值

1
2
3
4
5
6
7
8
9
func makechan(t *chantype, size int) *hchan {
...
c.elemsize = uint16(elem.size)
c.elemtype = elem
c.dataqsiz = uint(size)
lockInit(&c.lock, lockRankHchan)

return c
}

这一部分主要是一些常规的赋值,以及锁的初始化,代码已经很清晰了。

关于lockInit()中的第二个参数,网上也找不到什么资料,我的理解是,channel 使用的 mutex lock 有一个 rank,如果想获得 high rank 的 lock,就必须要先获取 low rank 的 lock。这个lockRankHchan就是其中的一级。具体的代码可以去runtime.lockrank中看,在此就不赘述了。

  1. Send

对于 channel 的发送,也很简单:

1
c <- x

经过编译器后,实际是对于一个方法的调用:

1
2
3
func chansend1(c *hchan, elem unsafe.Pointer) {
chansend(c, elem, true, getcallerpc())
}

而这个chansend()实际上就是我们需要去研究的方法,它的函数签名是:

1
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool

可以看出,前两个参数分别代表了 send 的 channel 和变量,第三个代表是否阻塞,看起来似乎一直都是true,那么什么时候为false呢?这个问题我们一会再看。第四个参数跟寄存器有关,仅在竞态分析的时候起作用,不是我们的重点。

同样,对于chansend()过程的研究,我们也分步来看。

  1. 参数校验
  2. 加锁
  3. 发送

3.1. 参数校验

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
if c == nil {
if !block {
return false
}
gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
throw("unreachable")
}
if debugChan {
print("chansend: chan=", c, "\n")
}
...
if !block && c.closed == 0 && full(c) {
return false
}
...
}

同样也是一目了然的参数校验。相信大家都知道,向一个nil的 channel 发送数据会导致无限阻塞,体现在代码上,就是block为true,同时c为nil,而第6行就是让 Goroutine park,即等待调度的状态,也就是我们说的无限阻塞的状态。

3.2. 加锁

1
2
3
4
5
6
7
8
9
10
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
...
lock(&c.lock)

if c.closed != 0 {
unlock(&c.lock)
panic(plainError("send on closed channel"))
}
...
}

首先请求获取锁,避免跟别的 Goroutine 发生同时读写同一个通道的问题;然后判断 channel 是否已经关闭了,如果关闭的话直接panic。

3.3. 发送

我们可以将发送的逻辑再进一步细化,可以分为三种情况:

3.3.1. 读等待队列不为空,直接发送

1
2
3
4
5
6
7
8
9
10
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
...
if sg := c.recvq.dequeue(); sg != nil {
// Found a waiting receiver. We pass the value we want to send
// directly to the receiver, bypassing the channel buffer (if any).
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}
...
}

这种情况下,直接把数据发出去就完事了。send()函数就是完成这个过程的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
...
if sg.elem != nil {
sendDirect(c.elemtype, sg, ep)
sg.elem = nil
}
gp := sg.g
unlockf()
gp.param = unsafe.Pointer(sg)
sg.success = true
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
goready(gp, skip+1)
}

首先使用sendDirect()将数据直接拷贝至x = <-c中x的地址,然后将接收的 Goroutine 唤醒。中间还有一些底层上的处理逻辑,在此也略过不研究了。

3.3.2. 读等待队列为空,但是缓冲区还有空间,放入缓冲区

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
...
if c.qcount < c.dataqsiz {
qp := chanbuf(c, c.sendx)
if raceenabled {
racenotify(c, c.sendx, nil)
}
typedmemmove(c.elemtype, qp, ep)
c.sendx++
if c.sendx == c.dataqsiz {
c.sendx = 0
}
c.qcount++
unlock(&c.lock)
return true
}
...
}

第4行的chanbuf()仍然是个简单粗暴的封装:

1
2
3
4
// chanbuf(c, i) is pointer to the i'th slot in the buffer.
func chanbuf(c *hchan, i uint) unsafe.Pointer {
return add(c.buf, uintptr(i)*uintptr(c.elemsize))
}

注释也写的很明白了,返回的就是 buffer 中的第 i 个空。

找到缓冲区的可用空间,接下来就是把数据放入缓冲区了。

然后是循环队列的实现:

首先sendx,代表发送队列的索引++,如果与队列的大小相等时,由于是循环队列,就要=0。这个过程相当于sendx = (sendx + 1) % dataqsiz。

做完这些之后,channel 缓冲区中的元素个数+1,并且释放锁,返回true,就代表发送成功了。

3.3.3. 缓冲区满了,阻塞发送

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
...
if !block {
unlock(&c.lock)
return false
}
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
mysg.elem = ep
mysg.waitlink = nil
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.waiting = mysg
gp.param = nil
c.sendq.enqueue(mysg)

atomic.Store8(&gp.parkingOnChan, 1)
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
KeepAlive(ep)


if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
gp.activeStackChans = false
closed := !mysg.success
gp.param = nil
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
mysg.c = nil
releaseSudog(mysg)
if closed {
if c.closed == 0 {
throw("chansend: spurious wakeup")
}
panic(plainError("send on closed channel"))
}
return true
}

可以看到,首先判断是否需要阻塞发送,如果不阻塞就直接解锁,并且返回false。如果需要阻塞发送(大多数情况),那么会保存现在的状态(sudog),并且告诉调度器,这个 Goroutine 要进入阻塞状态了。然后,将保存好的sudog放入 channel 的发送等待队列中,并且等待唤醒。唤醒之后,做一些收尾工作,对一些状态进行修改,然后返回true。

也许你会想,唤醒之后为什么没有将元素放入 channel 的代码呢?事实上,唤醒发送 Goroutine 的那个接收 Goroutine,在接收时已经直接获取到了发送队列首位所发送的数据了。这么说有一些绕口,一会介绍到接收过程时,将二者对比起来看,或许能有更直观的认识。

3.4. 非阻塞发送

上面有一个阴魂不散的block一直为true,从字面理解我们也能猜到它是是否阻塞的意思。那么,什么时候为false呢?我们知道,用select…case…去发送 channel 时,就是非阻塞的。事实上,对于这样的一段代码

1
2
3
4
5
6
select {
case c <- v:
// ... foo
default:
// ... bar
}

经过编译之后,其实是对于selectnbsend()的调用,形式为

1
2
3
4
5
if selectnbsend(c, v) {
// ... foo
} else {
// ... bar
}

而这个selectnbsend(),其实也非常的简单粗暴

1
2
3
func selectnbsend(c *hchan, elem unsafe.Pointer) (selected bool) {
return chansend(c, elem, false, getcallerpc())
}

于是,谜底就揭晓了。

  1. Receive

在研究过 channel 发送过程之后,再看接收过程其实就简单很多了,很多地方的逻辑基本上是对称的。我们日常使用时,从 channel 中读取,一般有两种写法:

1
2
3
4
x <- c


x, ok := <- c

经过编译之后,相当于

1
2
3
4
5
6
7
8
9
func chanrecv1(c *hchan, elem unsafe.Pointer) {
chanrecv(c, elem, true)
}


func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {
_, received = chanrecv(c, elem, true)
return
}

核心都是对chanrecv()的调用。如法炮制,我们再根据这个方法的执行逻辑,一步步地来分析它做了些什么。我们将它分为以下两步:

  1. 参数校验 & 加锁
  2. 接收

4.1. 参数校验 & 加锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
if c == nil {
if !block {
return
}
gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
throw("unreachable")
}
if !block && empty(c) {
if atomic.Load(&c.closed) == 0 { //TODO why?
return
}
if empty(c) {
if ep != nil {
typedmemclr(c.elemtype, ep)
}
return true, false
}
}
lock(&c.lock)
...
}

同样,对于一个nil的通道,如果我们在它上面接收,也会进入无限阻塞的状态。

4.2. 接收

4.2.1. 写等待队列不为空,直接接收

1
2
3
4
5
6
7
8
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
...
if sg := c.sendq.dequeue(); sg != nil {
recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true, true
}
...
}

同样,核心方法还是这个recv()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
if c.dataqsiz == 0 {
if raceenabled {
racesync(c, sg)
}
if ep != nil {
// copy data from sender
recvDirect(c.elemtype, sg, ep)
}
} else {
// Queue is full. Take the item at the
// head of the queue. Make the sender enqueue
// its item at the tail of the queue. Since the
// queue is full, those are both the same slot.
qp := chanbuf(c, c.recvx)
if raceenabled {
racenotify(c, c.recvx, nil)
racenotify(c, c.recvx, sg)
}
// copy data from queue to receiver
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
// copy data from sender to queue
typedmemmove(c.elemtype, qp, sg.elem)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
}
sg.elem = nil
gp := sg.g
unlockf()
gp.param = unsafe.Pointer(sg)
sg.success = true
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
goready(gp, skip+1)
}

可以看到,recv()是分两种情况接收的。

  • 当没有缓冲区的时候,调用recvDirect(),直接将发送队列的队首发送者的数据拷贝到接收的变量的地址中;
  • 当有缓冲区时,先取缓冲区的队首元素,放入接收者中;然后再将写等待队列队首的发送者的数据放入缓冲区中,然后令recvx++。如果recvx==dataqsiz,因为是循环队列,这时候要令它等于0。最后,因为此时缓冲区是满的,sendx和recvx指向的是同一个地方。

sendx和recvx

图3 sendx和recvx

4.2.2. 缓冲区不为空,从缓冲区里取

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
...
if c.qcount > 0 {
qp := chanbuf(c, c.recvx)
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
typedmemclr(c.elemtype, qp)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.qcount--
unlock(&c.lock)
return true, true
}
...
}

4.2.3. 缓冲区和写队列都为空,阻塞读

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
...
if !block {
unlock(&c.lock)
return false, false
}
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
mysg.elem = ep
mysg.waitlink = nil
gp.waiting = mysg
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.param = nil
c.recvq.enqueue(mysg)
atomic.Store8(&gp.parkingOnChan, 1)
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
gp.activeStackChans = false
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
success := mysg.success
gp.param = nil
mysg.c = nil
releaseSudog(mysg)
return true, success
}

4.3. 非阻塞接收

与发送过程相似,接收时也有一个block的参数。同样,当使用select…case…时

1
2
3
4
5
6
select {
case v, ok = <-c:
// ... foo
default:
// ... bar
}

会编译成

1
2
3
4
func selectnbrecv2(elem unsafe.Pointer, received *bool, c *hchan) (selected bool) {
selected, *received = chanrecv(c, elem, false)
return
}

这样就是非阻塞式的接收了。

  1. Close

close()的过程就非常直观了。

5.1. 前期准备

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
func closechan(c *hchan) {
if c == nil {
panic(plainError("close of nil channel"))
}


lock(&c.lock)
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("close of closed channel"))
}


c.closed = 1
...
}

首先是一些前期的准备,参数校验之类的,判断一下要关闭的 channel 是否为 nil或已经是关闭状态,很明显这两种情况都应该抛出panic。然后将 channel 的关闭状态改为1。

5.2. 释放接收者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
func closechan(c *hchan) {
...
var glist gList
for {
sg := c.recvq.dequeue()
if sg == nil {
break
}
if sg.elem != nil {
typedmemclr(c.elemtype, sg.elem)
sg.elem = nil
}
...
}
...
}

关闭一个 channel 时,要将等待从它这里接收的 Goroutine 全部释放掉,10~11行是将接收返回值的变量先清空,再令其返回nil。至于为什么要多一步,猜想可能是因为gc吧。

5.3. 释放发送者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
func closechan(c *hchan) {
...
for {
sg := c.sendq.dequeue()
if sg == nil {
break
}
sg.elem = nil
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
...
}
...
}

同上,等待着的发送者也应当被释放,但是它们会panic,原因是在关闭的 channel 上发送,panic的代码可以参考下面chansend()。

5.4. 唤醒所有等待的 Goroutine

1
2
3
4
5
6
7
8
9
10
11
func closechan(c *hchan) {
...
unlock(&c.lock)

for !glist.empty() {
gp := glist.pop()
gp.schedlink = 0
goready(gp, 3)
}
...
}

无须赘述,代码胜千言。

参考资料

https://blog.golang.org/codelab-share

https://zhuanlan.zhihu.com/p/299592156

https://draveness.me/golang/docs/part3-runtime/ch06-concurrency/golang-channel