聊聊 golang 中 channel
1、引言
Do not communicate by sharing memory; instead, share memory by communicating
Golang
的并发哲学是“不要通过共享内存进行通信,而要通过通信来共享内存”,提倡通过 channel
进行 goroutine
之间的数据传递和同步,而不是通过共享变量(内存)来实现。
func write(chanInt chan int) {for i := 0; i < 10; i++ {chanInt <- i}close(chanInt)
}func read(chanInt chan int, chanExit chan bool) {for {v, ok := <-chanIntif !ok {break}fmt.Println(v)}chanExit <- trueclose(chanExit)
}func TestCSP(t *testing.T) {chanInt := make(chan int, 10)chanExit := make(chan bool)go write(chanInt)go read(chanInt, chanExit)for {select {case _, ok := <-chanExit:if !ok {fmt.Println("done")return}}}}
如上述示例,write
函数负责写,read
函数负责读,chanInt
负责在两个 goroutine
进行数据同步,chanExit
负责监听数据已处理完成,并最终退出。整个程序没有看到锁,非常的优雅。
接下来,来说说 channel
的特性,最后结合底层源码来加深印象。
2、特性
2.1 基本用法
由于 channel
是引用类型,需要用 make
来初始化
chanBuffer := make(chan int, 10)
chanNoBuffer := make(chan int)
这里创建的是可读写的 channel
,区别在于是否有 capacity
(容量)
- 带缓冲区的
channel
,可以存储cap
个数据 - 不带缓冲区的
channel
,一般用于同步
chanWriteOnly := make(chan<- int)
chanReadOnly := make(<-chan int)
这里创建的是只写和只读的 channel
,不过这样写意义不大,一般用于传参,接下来用这两个 chan
把引言示示例中关于 write
和 read
函数给改下
func write(chanInt chan<- int) {for i := 0; i < 10; i++ {chanInt <- i}close(chanInt)
}func read(chanInt <-chan int, chanExit chan bool) {for {v, ok := <-chanIntif !ok {break}fmt.Println(v)}chanExit <- trueclose(chanExit)
}
查看 channel
的长度和容量
func TestChanLenCAP(t *testing.T) {chanInt := make(chan int, 2)chanInt <- 1fmt.Println(len(chanInt)) // 1fmt.Println(cap(chanInt)) // 2
}
关闭 channel
close(ch)
判断 channel
是否已关闭
func TestChanIsClosed(t *testing.T) {chanInt := make(chan int, 10)close(chanInt)if _, ok := <-chanInt; !ok {fmt.Println("closed")}
}
向一个已关闭的 channel
读数据,会读到零值,并且每次读也都是零值,因此可以利用这个特性来判断 channel
是否已关闭。
2.2 异常情况
接下来看看几种需要注意的异常情况
注意: Golang 版本为 1.19.12。不同版本的调度器和运行时的行为可能会有所不同,尤其是与死锁检测相关的机制。这些变化可能导致在某些版本中程序会更快地检测到死锁,而在其他版本中则可能仅仅是阻塞而不报错。
2.2.1 给一个 nil channel
发送数据,
func TestWriteNil(t *testing.T) {var chanInt chan intchanInt <- 1
}
由于 chanInt
还没初始化,值为 nil
,此时代码会阻塞在 chanInt <- 1
这一行,并最终形成死锁。
fatal error: all goroutines are asleep - deadlock!
解法:channel
使用前需要使用 make
初始化。
2.2.2 从一个 nil channel
读数据
func TestReadNil(t *testing.T) {var chanInt chan int<-chanInt
}
由于 chanInt
还没初始化,值为 nil
,此时代码会阻塞在 <-chanInt
这一行,并最终形成死锁。
fatal error: all goroutines are asleep - deadlock!
解法:channel
使用前需要使用 make
初始化。
2.2.3 关闭一个 nil channel
func TestCloseNil(t *testing.T) {var chanInt chan intclose(chanInt)
}
如果尝试关闭一个 nil
的 channel
,会导致运行时错误 panic: close of nil channel
。
panic: close of nil channel [recovered]panic: close of nil channel
解法:channel
使用前需要使用 make
初始化。
前三个异常说明,channel
使用前一定要使用 make
进行初始化。
2.2.4 向一个已关闭的 channel
发数据
func TestWriteClosed(t *testing.T) {chanNoBuffer := make(chan int)close(chanNoBuffer)chanNoBuffer <- 1
}
向一个已关闭的 channel
发送数据会引起 panic
。
panic: send on closed channel [recovered]panic: send on closed channel
这是因为一旦 channel
被关闭,就不能再向其发送数据,但可以继续从中接收数据
。
解法:判断 channel
是否已关闭。
2.2.5 向一个已关闭的 channel
发起重复关闭动作
func TestClosedOnceMore(t *testing.T) {chanNoBuffer := make(chan int)close(chanNoBuffer)close(chanNoBuffer)
}
尝试关闭一个已经关闭的 channel
会导致运行时错误 panic: close of closed channel
。这个错误通常出现在多个 goroutine
试图关闭同一个 channel
或者代码逻辑不正确导致同一个 channel
被关闭多次。
panic: close of closed channel [recovered]panic: close of closed channel
解法:判断 channel
是否已关闭。
2.2.6 向没有缓冲区的 channel
写数据,但没有读取方
func TestSendNoBuffer(t *testing.T) {ch := make(chan int)ch <- 4
}
无缓冲的 channel
是一种同步通信机制,当只有发送方,没有接收方,会陷入阻塞而死锁。
fatal error: all goroutines are asleep - deadlock!
解法:无缓冲 channel
是一种同步通信机制,需要发送和接收操作同时进行。
2.2.7 向没有缓冲区的 channel
读取数据,但没有写入方
func TestReadNoBuffer(t *testing.T) {ch := make(chan int)<-ch
}
尝试从一个无缓冲的 channel
读取数据时,如果没有其他 goroutine
向该 channel
发送数据,读取操作将会阻塞。这会导致程序死锁,并最终导致运行时错误。
fatal error: all goroutines are asleep - deadlock!
解法:无缓冲 channel
是一种同步通信机制,需要发送和接收操作同时进行。
2.2.8 无缓冲区 channel
的发送和接收操作没有同时进行
func ReadNoBufferChan(chanBool chan bool) {<-chanBool
}func TestSendNoBufferChan(t *testing.T) {ch := make(chan bool)ch <- truego ReadNoBufferChan(ch)time.Sleep(1 * time.Second)
}
上面两个异常一直强调,由于无缓冲 channel
是一种同步
通信机制,需要发送和接收操作同时
进行。代码执行到 ch <- chan
时,调度器发现没有任何 goroutine
接收,于是阻塞并死锁。
fatal error: all goroutines are asleep - deadlock!
解法:无缓冲 channel
是一种同步通信机制,需要发送和接收操作同时进行。
func TestSendNoBufferChan(t *testing.T) {ch := make(chan bool)go ReadNoBufferChan(ch)ch <- truetime.Sleep(1 * time.Second)
}
把 go ReadNoBufferChan(ch)
提前,这样就确保了在发送数据之前,有一个 goroutine
正在等待接收数据。
对于无缓冲的 channel
- 读取和写入要成对出现,并且不能在同一个
goroutine
里 - 使用
for
读取数据时,写入方需要关闭channel
2.2.9 向有缓存区的 channel
先读数据
func TestWriteBufferChan(t *testing.T) {ch := make(chan int, 1)if _, ok := <-ch; !ok {fmt.Println("closed")}
}
当尝试从一个空的带缓冲的 channel
读取数据时,读取操作会阻塞,直到有数据被写入 channel
。这是因为即使是带缓冲的 channel
,也需要在读取数据时有数据可读。
带缓冲的 channel
和无缓冲的 channel
的主要区别在于:带缓冲的 channel
可以存储一定数量的数据,而无缓冲的 channel
则需要发送和接收操作同步进行。然而,这并不改变以下事实:当一个 goroutine
试图从空的 channel
读取数据时,它会被阻塞,直到有其他 goroutine
写入数据。
fatal error: all goroutines are asleep - deadlock!
解法:需要在读取数据时有数据可读。
2.2.10 向有缓存区的 channel
写数据,但没有读取数据
func TestReadBufferChan(t *testing.T) {ch := make(chan int, 1)ch <- 1ch <- 2
}
当带缓冲的 channel
在缓冲区满时,写入操作会阻塞,直到有数据被读取以腾出缓冲区空间。如没有读取方,最后就会因阻塞而死锁。
fatal error: all goroutines are asleep - deadlock!
解法:当带缓冲的 channel
在缓冲区满时,需要有读取方,或者增加缓冲区的大小。
注意:对于带缓冲的 channel 在缓冲区没超过容量之前,写入数据,若没有读取,不像不带缓冲区的 channel 那样,不会产生死锁的。
其实,最后这两个带缓冲区 channel
异常情况总结就是
- 若在同一个
goroutine
里,写数据操作一定在读数据操作前 - 若
channel
空了,接收者会阻塞 - 若
channel
满了,发送者会阻塞
3、底层实现
3.1 数据结构
Golang
的 channel
在运行时使用 runtime.hchan
结构体表示。
// runtime/chan.go
type hchan struct {qcount uint // 队列中的数据个数dataqsiz uint // 环形缓冲区的大小buf unsafe.Pointer // 环形缓冲区指针elemsize uint16 // 单个元素的大小closed uint32 // 标志 channel 是否关闭elemtype *_type // 元素的类型sendx uint // 发送操作的索引recvx uint // 接收操作的索引recvq waitq // 等待接收的 goroutine 队列sendq waitq // 等待发送的 goroutine 队列lock mutex // 保护 channel 的锁
}
先看看环形缓冲区相关的字段:
qcount
: 当前缓冲区中的元素个数。dataqsiz
: 环形缓冲区的容量。buf
: 实际存储数据的缓冲区,类型为unsafe.Pointer
(类似C
语言的void *
)。elemsize
: 每个元素的大小。sendx
: 环形缓冲区中下一个待写入的位置。recvx
: 环形缓冲区中下一个待读取的位置。
再来看看发送和接收队列:
recvq
: 等待接收的goroutine
队列。sendq
: 等待发送的goroutine
队列。
这两个队列是通过 waitq
结构体来实现的,waitq
本质上是一个双向链表,链表中的每个节点是一个 sudog
结构体,sudog
代表一个等待中的 goroutine
。
type waitq struct {first *sudoglast *sudog
}
最后看看 lock
字段
lock
锁用于保护channel
数据结构的互斥锁。Golang
使用自旋锁和互斥锁的结合来保证channel
操作的线程安全。
3.2 初始化
func makechan(t *chantype, size int) *hchan {elem := t.elem// compiler checks this but be safe.if elem.size >= 1<<16 {throw("makechan: invalid channel element type")}if hchanSize%maxAlign != 0 || elem.align > maxAlign {throw("makechan: bad alignment")}mem, overflow := math.MulUintptr(elem.size, uintptr(size))if overflow || mem > maxAlloc-hchanSize || size < 0 {panic(plainError("makechan: size out of range"))}// Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers.// buf points into the same allocation, elemtype is persistent.// SudoG's are referenced from their owning thread so they can't be collected.// TODO(dvyukov,rlh): Rethink when collector can move allocated objects.var c *hchanswitch {case mem == 0:// Queue or element size is zero.c = (*hchan)(mallocgc(hchanSize, nil, true))// Race detector uses this location for synchronization.c.buf = c.raceaddr()case elem.ptrdata == 0:// Elements do not contain pointers.// Allocate hchan and buf in one call.c = (*hchan)(mallocgc(hchanSize+mem, nil, true))c.buf = add(unsafe.Pointer(c), hchanSize)default:// Elements contain pointers.c = new(hchan)c.buf = mallocgc(mem, elem, true)}c.elemsize = uint16(elem.size)c.elemtype = elemc.dataqsiz = uint(size)lockInit(&c.lock, lockRankHchan)if debugChan {print("makechan: chan=", c, "; elemsize=", elem.size, "; dataqsiz=", size, "\n")}return c
}
这里主要说下 switch
相关的分支代码
- 第一个分支:如果
channel
的缓冲区大小是0
(也就是创建无缓冲channel
),或channel
中的元素大小是0
(如struct{}{}
,Golang
中“空结构体”是不占内存的,size
为0
)时,调用mallocgc()
在堆上为channel
开辟一段大小为hchanSize
的内存空间。- 这里说下
c.buf = c.raceaddr()
,c.raceaddr()
会返回一个地址,这个地址在内存中不会被实际用于存储数据,但会被数据竞争检测工具(如Golang
的race detector
)用于同步,这也是无缓冲区的channel
用来做数据同步场景的由来。
- 这里说下
- 第二个分支:如果元素不包含指针时。调用
mallocgc
一次性分配hchan
和buf
的内存。 - 第三个分支:默认情况元素类型中有指针类型,调用了两次分配空间的函数
new/mallocgc
。
仔细看,三个分支都调用了 mallocgc
在堆上分配内存,也就说 channel
本身会被 GC
自动回收。
在函数的最后会初始化通道结构的字段,包括元素大小、元素类型、缓冲区大小和锁。
3.2 发送数据
// entry point for c <- x from compiled code
//
//go:nosplit
func chansend1(c *hchan, elem unsafe.Pointer) {chansend(c, elem, true, getcallerpc())
}/** generic single channel send/recv* If block is not nil,* then the protocol will not* sleep but return if it could* not complete.** sleep can wake up with g.param == nil* when a channel involved in the sleep has* been closed. it is easiest to loop and re-run* the operation; we'll see that it's now closed.*/
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {// 当 channel 为 nil 时处理if c == nil {if !block {return false}gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)throw("unreachable")}if debugChan {print("chansend: chan=", c, "\n")}// 竞态检测,是用来分析是否存在数据竞争。go test -race ./...if raceenabled {racereadpc(c.raceaddr(), callerpc, abi.FuncPCABIInternal(chansend))}// Fast path: check for failed non-blocking operation without acquiring the lock.//// After observing that the channel is not closed, we observe that the channel is// not ready for sending. Each of these observations is a single word-sized read// (first c.closed and second full()).// Because a closed channel cannot transition from 'ready for sending' to// 'not ready for sending', even if the channel is closed between the two observations,// they imply a moment between the two when the channel was both not yet closed// and not ready for sending. We behave as if we observed the channel at that moment,// and report that the send cannot proceed.//// It is okay if the reads are reordered here: if we observe that the channel is not// ready for sending and then observe that it is not closed, that implies that the// channel wasn't closed during the first observation. However, nothing here// guarantees forward progress. We rely on the side effects of lock release in// chanrecv() and closechan() to update this thread's view of c.closed and full().if !block && c.closed == 0 && full(c) {return false}var t0 int64if blockprofilerate > 0 {t0 = cputicks()}// 加锁lock(&c.lock)// 检查 channel 是否关闭if c.closed != 0 {unlock(&c.lock)panic(plainError("send on closed channel"))}// 检查是否有等待接收的 goroutineif sg := c.recvq.dequeue(); sg != nil {// Found a waiting receiver. We pass the value we want to send// directly to the receiver, bypassing the channel buffer (if any).send(c, sg, ep, func() { unlock(&c.lock) }, 3)return true}// 检查 channel 缓冲区是否有空位if c.qcount < c.dataqsiz {// Space is available in the channel buffer. Enqueue the element to send.qp := chanbuf(c, c.sendx)if raceenabled {racenotify(c, c.sendx, nil)}typedmemmove(c.elemtype, qp, ep)c.sendx++if c.sendx == c.dataqsiz {c.sendx = 0}c.qcount++unlock(&c.lock)return true}// 非阻塞模式下if !block {unlock(&c.lock)return false}// 阻塞模式下,将当前 goroutine 加入发送队列并挂起,receiver 会帮我们完成后续的工作// Block on the channel. Some receiver will complete our operation for us.gp := getg()mysg := acquireSudog()mysg.releasetime = 0if t0 != 0 {mysg.releasetime = -1}// No stack splits between assigning elem and enqueuing mysg// on gp.waiting where copystack can find it.// 打包 sudogmysg.elem = epmysg.waitlink = nilmysg.g = gpmysg.isSelect = falsemysg.c = cgp.waiting = mysggp.param = nil// 将当前这个发送 goroutine 打包后的 sudog 入队到 channel 的 sendq 队列中c.sendq.enqueue(mysg)// Signal to anyone trying to shrink our stack that we're about// to park on a channel. The window between when this G's status// changes and when we set gp.activeStackChans is not safe for// stack shrinking.// 将这个发送 g 从 Grunning -> Gwaiting// 进入休眠atomic.Store8(&gp.parkingOnChan, 1)gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)// Ensure the value being sent is kept alive until the// receiver copies it out. The sudog has a pointer to the// stack object, but sudogs aren't considered as roots of the// stack tracer.KeepAlive(ep)// 以下唤醒后需要执行的代码// someone woke us up.if mysg != gp.waiting {throw("G waiting list is corrupted")}gp.waiting = nilgp.activeStackChans = falseclosed := !mysg.successgp.param = nilif mysg.releasetime > 0 {blockevent(mysg.releasetime-t0, 2)}mysg.c = nilreleaseSudog(mysg)if closed {// 唤醒后,发现 channel 被关闭了if c.closed == 0 {throw("chansend: spurious wakeup")}panic(plainError("send on closed channel"))}return true
}
代码比较长,可以分为两大部分:异常检测和发送数据
3.2.1 异常检测
代码一开始就排除了在异常章节中 nil channel
的情形,比如未初始化,或是被 GC
回收了。
接着会检测非阻塞模式下,也就是有缓冲区的 channel
,如果还未 close
并且缓冲区已经满了,则直接返回 false
。
func TestASyncSendFull(t *testing.T) {ch := make(chan int, 1) // 创建一个缓冲区大小为 1 的 channelch <- 1 // 向 channel 发送一个元素,此时缓冲区已满select {case ch <- 2: // 尝试发送第二个元素fmt.Println("Successfully sent 2")default: // 缓冲区已满,进入 default 分支fmt.Println("channel is full, unable to send 2")}
}
3.2.2 发送数据
发送数据可以归纳为以下三点
- 直接发送:当
recvq
存在等待的接收者时,那么通过runtime.send
直接将数据发送给阻塞的接收者- 注意:这里不会立马唤醒阻塞的接收者,而是将等待接收数据的
goroutine
标记成可运行状态grunnable
并把该goroutine
放到发送方所在的处理器的runnext
上等待执行,该处理器在下一次调度
时会立刻唤醒数据的接收方;
- 注意:这里不会立马唤醒阻塞的接收者,而是将等待接收数据的
- 异步发送:当
buf
缓冲区存在空余空间时,将发送的数据写入channel
的缓冲区; - 阻塞发送:当不存在缓冲区或者缓冲区已满时,等待其他
goroutine
从channel
接收数据;- 将当前
goroutine
加入sendq
发送队列并挂起,阻塞等待其他的协程从channel
接收数据; - 当唤醒后,检查是否因为
channel
关闭而唤醒,如果是则触发panic
。
- 将当前
发送数据的过程中包含几个会触发 goroutine
调度的时机:
- 发送数据时发现
channel
上存在等待接收数据的goroutine
,立刻设置处理器的runnext
属性,但是并不会立刻触发调度 - 发送数据时并没有找到接收方并且缓冲区已经满了,这时会将自己加入
channel
的sendq
发送队列并调用runtime.goparkunlock
触发goroutine
的调度让出处理器的使用权;
3.3 接收数据
// entry points for <- c from compiled code
//
//go:nosplit
func chanrecv1(c *hchan, elem unsafe.Pointer) {chanrecv(c, elem, true)
}//go:nosplit
func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {_, received = chanrecv(c, elem, true)return
}// chanrecv receives on channel c and writes the received data to ep.
// ep may be nil, in which case received data is ignored.
// If block == false and no elements are available, returns (false, false).
// Otherwise, if c is closed, zeros *ep and returns (true, false).
// Otherwise, fills in *ep with an element and returns (true, true).
// A non-nil ep must point to the heap or the caller's stack.
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {// raceenabled: don't need to check ep, as it is always on the stack// or is new memory allocated by reflect.if debugChan {print("chanrecv: chan=", c, "\n")}// 如果在 nil channel 上进行 recv 操作,那么会永远阻塞if c == nil {// 非阻塞的情况下,要直接返回,非阻塞出现在一些 select 的场景中if !block {return}gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)throw("unreachable")}// Fast path: check for failed non-blocking operation without acquiring the lock.if !block && empty(c) {// After observing that the channel is not ready for receiving, we observe whether the// channel is closed.//// Reordering of these checks could lead to incorrect behavior when racing with a close.// For example, if the channel was open and not empty, was closed, and then drained,// reordered reads could incorrectly indicate "open and empty". To prevent reordering,// we use atomic loads for both checks, and rely on emptying and closing to happen in// separate critical sections under the same lock. This assumption fails when closing// an unbuffered channel with a blocked send, but that is an error condition anyway.if atomic.Load(&c.closed) == 0 {// Because a channel cannot be reopened, the later observation of the channel// being not closed implies that it was also not closed at the moment of the// first observation. We behave as if we observed the channel at that moment// and report that the receive cannot proceed.return}// The channel is irreversibly closed. Re-check whether the channel has any pending data// to receive, which could have arrived between the empty and closed checks above.// Sequential consistency is also required here, when racing with such a send.if empty(c) {// The channel is irreversibly closed and empty.if raceenabled {raceacquire(c.raceaddr())}if ep != nil {typedmemclr(c.elemtype, ep)}return true, false}}var t0 int64if blockprofilerate > 0 {t0 = cputicks()}lock(&c.lock)// 当前 channel 中没有数据可读if c.closed != 0 {if c.qcount == 0 {if raceenabled {raceacquire(c.raceaddr())}unlock(&c.lock)if ep != nil {typedmemclr(c.elemtype, ep)}return true, false}// The channel has been closed, but the channel's buffer have data.} else {// sender 队列中有 sudog 在等待// 直接从该 sudog 中获取数据拷贝到当前 g 即可// Just found waiting sender with not closed.if sg := c.sendq.dequeue(); sg != nil {// Found a waiting sender. If buffer is size 0, receive value// directly from sender. Otherwise, receive from head of queue// and add sender's value to the tail of the queue (both map to// the same buffer slot because the queue is full).recv(c, sg, ep, func() { unlock(&c.lock) }, 3)return true, true}}if c.qcount > 0 {// 直接从 buffer 里拷贝数据// Receive directly from queueqp := chanbuf(c, c.recvx)if raceenabled {racenotify(c, c.recvx, nil)}if ep != nil {typedmemmove(c.elemtype, ep, qp)}typedmemclr(c.elemtype, qp)// 接收索引 +1c.recvx++if c.recvx == c.dataqsiz {c.recvx = 0}// buffer 元素计数 -1c.qcount--unlock(&c.lock)return true, true}// 非阻塞时,且无数据可收if !block {unlock(&c.lock)return false, false}// no sender available: block on this channel.gp := getg()mysg := acquireSudog()mysg.releasetime = 0if t0 != 0 {mysg.releasetime = -1}// No stack splits between assigning elem and enqueuing mysg// on gp.waiting where copystack can find it.// 打包成 sudogmysg.elem = epmysg.waitlink = nilgp.waiting = mysgmysg.g = gpmysg.isSelect = falsemysg.c = cgp.param = nil// 进入 recvq 队列c.recvq.enqueue(mysg)// Signal to anyone trying to shrink our stack that we're about// to park on a channel. The window between when this G's status// changes and when we set gp.activeStackChans is not safe for// stack shrinking.atomic.Store8(&gp.parkingOnChan, 1)gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)// someone woke us up// 被唤醒if mysg != gp.waiting {throw("G waiting list is corrupted")}gp.waiting = nilgp.activeStackChans = falseif mysg.releasetime > 0 {blockevent(mysg.releasetime-t0, 2)}success := mysg.successgp.param = nilmysg.c = nilreleaseSudog(mysg)// 如果 channel 未被关闭,那就是真的 recv 到数据了return true, success
}func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {if c.dataqsiz == 0 {if raceenabled {racesync(c, sg)}if ep != nil {// copy data from sender// 直接从发送者复制数据recvDirect(c.elemtype, sg, ep)}} else {// 缓冲区已满,从队列头部取出数据// Queue is full. Take the item at the// head of the queue. Make the sender enqueue// its item at the tail of the queue. Since the// queue is full, those are both the same slot.qp := chanbuf(c, c.recvx)if raceenabled {racenotify(c, c.recvx, nil)racenotify(c, c.recvx, sg)}// 将数据从队列复制到接收者// copy data from queue to receiverif ep != nil {typedmemmove(c.elemtype, ep, qp)}// 将数据从发送者复制到队列// copy data from sender to queuetypedmemmove(c.elemtype, qp, sg.elem)c.recvx++if c.recvx == c.dataqsiz {c.recvx = 0}c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz}sg.elem = nilgp := sg.gunlockf()gp.param = unsafe.Pointer(sg)sg.success = trueif sg.releasetime != 0 {sg.releasetime = cputicks()}goready(gp, skip+1)
}
在 Golang
的 channel
中,有两种接收方式
num <- ch
num, ok <- ch
这两种分别对应上述源码中的 chanrecv1
和 chanrecv2
,不过最终都会走到 chanrecv
函数。
3.3.1 异常检测
当我们从一个 nil channel
接收数据时(这里 nil
有可能是被 GC
回收导致的),若是非阻塞的 channel
会直接返回,否则会直接调用 runtime.gopark
让出处理器的使用权。
如果当前 channel
已经被 close
并且缓冲区中不存在任何数据,那么会清除 ep
指针中的数据并立刻返回。这里也就说明了为什么可以多次从已关闭的 channel
读取数据而不会报错。
3.3.2 接收数据
从 channel
接收数据可以归纳为以下三种情况:
3.3.2.1 直接接收
当 sendq
发送队列存在等待的发送者时,通过 runtime.recv
从阻塞的发送者或者缓冲区中获取数据。具体分为以下两种场景,可以仔细看 recv
函数
- 场景一
当 buf
缓冲区的容量 dataqsiz
为 0
,也就是同步的 channel
,调用 recvDirect
将 sendq
发送队列中 sudog
存储的 ep
数据直接拷贝到接收者的内存地址中。
- 场景二
当缓冲区已满时(会有两次内存的拷贝)
- 先取出
buf
缓冲区头部的数据发给接收者(第一次拷贝) - 接着取出
sendq
发送队列头的数据拷贝到buf
缓冲区中,并释放一个sudog
阻塞的goroutine
(第二次拷贝)
到这里获取有人会问,为什么不直接从 sendq
取出数据发给接收方,而是要从 buf
里取出发给接收方?
原因在于 Golang
在缓冲模式下,channel
的数据在缓冲区中按照 FIFO
(先入先出)顺序存储。缓冲区头部的数据肯定是最先存入的,那么也就需要最先取出。
这里再说下场景二下关于 recvx
和 sendx
的更新机制。
- 缓冲区已满时的处理逻辑
当 buf
缓冲区满时,recvx
指向的是 buf
的头部位置,这也是下一个将要被接收的数据。注意此时 sendx
也是指向缓冲区的头部位置。因为缓冲区已满,下一次发送会覆盖最旧的数据。
- 从缓冲区读取数据
此时从已满的 buf
缓冲区读取数据,接收者从缓冲区的头部位置 recvx
获取数据,并将数据传递给接收方。并更新 recvx
,使其指向下一个将要被接收的数据位置。
- 将
sendq
拷贝到缓冲区
由于此时 buf
头部的数据已经发送,那么则取出 sendq
头部的数据覆盖刚刚头部的位置所在的数据,并更新 sendx
,使其和 recvx
保持一致,指向下一个要发送的位置。
这两个场景,无论发生哪种情况,运行时都会调用 runtime.goready
将当前处理器的 runnext
设置成发送数据的 goroutine
,在调度器下一次调度时将阻塞的发送方唤醒。
3.3.2.2 异步接收
当 buf
缓冲区的 qcount
大于 0
时,也就是带缓冲的 channel
有数据时,那么会从 buf
缓冲区中 recvx
的索引位置取出数据进行处理:
- 如果接收数据的内存地址不为空,那么会使用
runtime.typedmemmove
将缓冲区中的数据拷贝到内存中,并通过runtime.typedmemclr
清除队列中的数据 - 最后更新
channel
上相关数据:recvx
指向下一个位置(如果移动到了环形队列的队尾,下标需要回到队头),channel
的qcount
长度减一,并释放持有channel
的锁
3.3.2.3 阻塞接收
当不属于上述两种情况,即当 channel
的 sendq
发送队列中不存在等待的 goroutine
并且 buf
缓冲区中也不存在任何数据时,从 channel
中接收数据的操作会变成阻塞的。此时会将当前的goroutine
挂起并加入 channel
的接收队列 recvq
,以便在有数据可用时能够被唤醒。
当然了,若是 goroutine
被唤醒后会完成 channel
的阻塞数据接收。接收完最后进行基本的参数检查,解除 channel
的绑定并释放 sudog
。
结合异常检测那一节,发现从 channel
接收数据时,会触发 goroutine
调度的两个时机:
- 当
channel
为nil
时 - 当
buf
缓冲区中不存在数据并且也不存在数据的发送者时
3.4 关闭管道
最后来看看关闭通道实现
func closechan(c *hchan) {// 关闭一个 nil channel 会直接 panicif c == nil {panic(plainError("close of nil channel"))}// 上锁,这个锁的粒度比较大,一直到释放完所有的 sudog 才解锁lock(&c.lock)// 在 close channel 时,如果 channel 已经关闭过了,直接触发 panicif c.closed != 0 {unlock(&c.lock)panic(plainError("close of closed channel"))}if raceenabled {callerpc := getcallerpc()racewritepc(c.raceaddr(), callerpc, abi.FuncPCABIInternal(closechan))racerelease(c.raceaddr())}c.closed = 1var glist gList// release all readersfor {sg := c.recvq.dequeue()// 弹出的 sudog 是 nil,说明读队列已经空了if sg == nil {break}// sg.elem unsafe.Pointer,指向 sudog 的数据元素// 该元素可能在堆上分配,也可能在栈上if sg.elem != nil {typedmemclr(c.elemtype, sg.elem)sg.elem = nil}if sg.releasetime != 0 {sg.releasetime = cputicks()}// 将 goroutine 入 glist// 为最后将全部 goroutine 都 ready 做准备gp := sg.ggp.param = unsafe.Pointer(sg)sg.success = falseif raceenabled {raceacquireg(gp, c.raceaddr())}glist.push(gp)}// release all writers (they will panic)// 将所有挂在 channel 上的 writer 从 sendq 中弹出// 该操作会使所有 writer panic(向一个关闭的 channel 发数据会引起 panic)for {sg := c.sendq.dequeue()if sg == nil {break}sg.elem = nilif sg.releasetime != 0 {sg.releasetime = cputicks()}// 将所有挂在 channel 上的 writer 从 sendq 中弹出// 该操作会使所有 writer panicgp := sg.ggp.param = unsafe.Pointer(sg)sg.success = falseif raceenabled {raceacquireg(gp, c.raceaddr())}glist.push(gp)}// 在释放所有挂在 channel 上的读或写 sudog 时,是一直在临界区的unlock(&c.lock)// Ready all Gs now that we've dropped the channel lock.for !glist.empty() {gp := glist.pop()gp.schedlink = 0// 使 g 的状态切换到 Grunnablegoready(gp, 3)}
}
3.4.1 异常检测
- 关闭一个
nil channel
会直接panic
- 在
close channel
时,如果channel
已经关闭过了,直接触发panic
3.4.2 释放所有接收方和发送方
关闭 channel
的主要工作是释放所有的 readers
和 writers
。
主要就是取出 recvq
和 sendq
的 sudog
加入到 goroutine
待清除 glist
队列中,与此同时该函数会清除所有 runtime.sudog
上未被处理的元素。同时需要注意的是:在处理 sendq
时有可能会 panic
,在之前的异常情况中列举往一个 close
的 channel
发送数据会引起 panic
。
最后会为所有被阻塞的 goroutine
调用 runtime.goready
触发调度。将所有 glist
队列中的 goroutine
状态从 _Gwaiting
设置为 _Grunnable
状态,等待调度器的调度。
3.4.3 优雅关闭通道
最后说说如何优雅关闭 channel
。
通过之前的异常小节介绍,发现:
- 向已关闭的
channel
发送数据,会导致panic
- 重复关闭
channel
,也会导致panic
同时,还了解了:
- 从一个已关闭的
channel
中接收数据,会得到零值,且不会导致程序异常 - 关闭一个
channel
,那么所有接收这个channel
的select case
都会收到信号
那么这里就引用 How to Gracefully Close Channels 介绍的优雅关闭 channel
方法来收尾。
package _0240623import ("log""math/rand""strconv""sync""testing""time"
)func TesGracefullyCloseChannel(t *testing.T) {rand.Seed(time.Now().UnixNano()) // needed before Go 1.20log.SetFlags(0)// ...const Max = 100000const NumReceivers = 10const NumSenders = 1000wgReceivers := sync.WaitGroup{}wgReceivers.Add(NumReceivers)// ...dataCh := make(chan int)stopCh := make(chan struct{})// stopCh is an additional signal channel.// Its sender is the moderator goroutine shown// below, and its receivers are all senders// and receivers of dataCh.toStop := make(chan string, 1)// The channel toStop is used to notify the// moderator to close the additional signal// channel (stopCh). Its senders are any senders// and receivers of dataCh, and its receiver is// the moderator goroutine shown below.// It must be a buffered channel.var stoppedBy string// moderatorgo func() {stoppedBy = <-toStopclose(stopCh)}()// sendersfor i := 0; i < NumSenders; i++ {go func(id string) {for {value := rand.Intn(Max)if value == 0 {// Here, the try-send operation is// to notify the moderator to close// the additional signal channel.select {case toStop <- "sender#" + id:default:}return}// The try-receive operation here is to// try to exit the sender goroutine as// early as possible. Try-receive and// try-send select blocks are specially// optimized by the standard Go// compiler, so they are very efficient.select {case <-stopCh:returndefault:}// Even if stopCh is closed, the first// branch in this select block might be// still not selected for some loops// (and for ever in theory) if the send// to dataCh is also non-blocking. If// this is unacceptable, then the above// try-receive operation is essential.select {case <-stopCh:returncase dataCh <- value:}}}(strconv.Itoa(i))}// receiversfor i := 0; i < NumReceivers; i++ {go func(id string) {defer wgReceivers.Done()for {// Same as the sender goroutine, the// try-receive operation here is to// try to exit the receiver goroutine// as early as possible.select {case <-stopCh:returndefault:}// Even if stopCh is closed, the first// branch in this select block might be// still not selected for some loops// (and forever in theory) if the receive// from dataCh is also non-blocking. If// this is not acceptable, then the above// try-receive operation is essential.select {case <-stopCh:returncase value := <-dataCh:if value == Max-1 {// Here, the same trick is// used to notify the moderator// to close the additional// signal channel.select {case toStop <- "receiver#" + id:default:}return}log.Println(value)}}}(strconv.Itoa(i))}// ...wgReceivers.Wait()log.Println("stopped by", stoppedBy)
}
这段代码的核心是这里
// moderator
go func() {stoppedBy = <-toStopclose(stopCh)
}()
对于生产者和消费者是 M*N
的情况,显然既不能在生产方关闭通道,也不适合在消费方关闭通道。那么就引入中间方,那就是 toStop
,起个 goroutine
然后 stoppedBy = <-toStop
阻塞在这里,只要生产者和消费者一方满足条件,向 toStop
写入数据了,那么就可以关闭 stopCh
。这也正好契合上面的 moderator
注释,一个 协调者
,用来协调生产者和消费者在 M*N
情况下如何优雅关闭 channel
。
相关文章:

聊聊 golang 中 channel
1、引言 Do not communicate by sharing memory; instead, share memory by communicating Golang 的并发哲学是“不要通过共享内存进行通信,而要通过通信来共享内存”,提倡通过 channel 进行 goroutine 之间的数据传递和同步,而不是通过共享…...

SK Hynix 3D DRAM良率突破56.1%,开启存储新时代
根据韩国财经媒体Business Korea独家报道:在刚刚结束的VLSI 2024国际研讨会上,韩国半导体巨头SK Hynix公布了一项振奋人心的进展:其五层堆叠3D DRAM的制造良率已达到56.1%。此成果标志着3D DRAM技术在商业化道路上迈出了坚实的一步࿰…...

如何封装自动化测试框架?
封装自动化测试框架,测试人员不用关注框架的底层实现,根据指定的规则进行测试用例的创建、执行即可,这样就降低了自动化测试门槛,能解放出更多的人力去做更深入的测试工作。 本篇文章就来介绍下,如何封装自动化测试框…...

基于Java的在线编程考试系统【附源码】
毕业设计(论文) 题目:基于 二级学院: 现代技术学院 专业(方向): 计算机应用技术 班 级: 计科B2015 学 生: 指导教师: 2024年1月 29 日 本科毕业论文(设计)学术诚信声明 本人郑重…...
Beautiful Soup的使用
1、Beautiful Soup简介 Beautiful Soup是一个Python的一个HTML或XML的解析库,我们用它可以方便地从网页中提取数据。 Beautiful Soup 提供一些简单的、Python 式的函数来处理导航、搜索、修改分析树等功能。它是一个工具箱,通过解析文档为用户提供需要抓…...

633. 平方数之和(中等)
633. 平方数之和 1. 题目描述2.详细题解3.代码实现3.1 Python3.2 Java内存溢出溢出代码正确代码与截图 1. 题目描述 题目中转:633. 平方数之和 2.详细题解 本题是167. 两数之和 II - 输入有序数组(中等)题目的变型,由两数之和变…...

GIT回滚
1. 使用 git revert git revert 命令会创建一个新的提交,这个提交会撤销指定提交的更改。这通常用于公共分支(如 main 或 master),因为它不会重写历史。 git revert HEAD # 撤销最近的提交 # 或者指定一个特定的提交哈希值 …...

BEVM基于OP-Stack发布首个以WBTC为GAS连接以太坊和比特币生态的中继链
为了更好的连接以太坊和比特币生态,BEVM团队正在基于OPtimism的OP Stack来构建一个以WBTC为GAS兼容OP-Rollup的中继链,这条中继链将作为一种完全去中心化的中间层,把以太坊上的主流资产(WBTC/ ETH/USDC/USDT等)引入到BEVM网络。 不仅如此&am…...
【vuejs】 $on、$once、$off、$emit 事件监听方法详解以及项目实战
1. Vue实例方法概述 1.1 vm.$on vm.$on是Vue实例用来监听自定义事件的方法。它允许开发者在Vue实例上注册事件监听器,当事件被触发时,指定的回调函数会被执行。 事件监听:vm.$on允许开发者绑定一个或多个事件到Vue实例上,并且可…...

如何下载植物大战僵尸杂交版,最全攻略来了
《植物大战僵尸杂交版》由热爱原版游戏的B站UP主“潜艇伟伟迷”独立开发,带来了创新的游戏体验。如果你是策略游戏的爱好者,下面这份全面的下载和游玩攻略将是你的理想选择。 游戏亮点: 杂交植物系统:结合不同植物特性,…...
小公司全栈是归宿吗?
在软件开发领域,特别是在小公司或初创公司中,全栈开发者的角色确实相对普遍和重要。然而,说“全栈是归宿”可能过于绝对,因为每个开发者的职业路径和兴趣点都是不同的。 以下是关于全栈开发在小公司的一些考虑: 需求…...

对https://registry.npm.taobao.org/tyarn的请求失败,原因:证书过期
今天安装yarn时,报错如下: request to https://registry.npm.taobao.org/yarn failed, reason: certificate has expired 原来淘宝镜像过期了,需要重新搞一下 记录一下解决过程: 1.查看当前npm配置 npm config list 2.清…...
Redisson-Lock-加锁原理
归档 GitHub: Redisson-Lock-加锁原理 Unit-Test RedissonLockTest 说明 源码类:RedissonLock // 加锁入口 Override public void lock() { lock(-1, null, false); }/*** 加锁实现 */ private void lock(long leaseTime, TimeUnit unit, boolean interruptib…...
deepspeed win11 安装
目录 git地址: aio报错: 编译 报错 ops已存在: 修改拷贝代码: git地址: Bug Report: Issues Building DeepSpeed on Windows Issue #5679 microsoft/DeepSpeed GitHub aio报错: setup.py 配置变量 os.environ[DISTUTILS_USE_SDK]=1 os.environ[DS_BUILD_AIO]=…...
Python列表函数append()和extend()的区别
Python列表提供了两个容易混淆的追加函数:append()和extend()。它们之间的使用区别如下: list.append(obj):对象进栈。将一个对象作为整体追加到列表最后,返回Nonelist.extend(iter):可迭代对象的元素逐个进栈。将一个…...

Spring AI 实现调用openAi 多模态大模型
什么是多模态? 多模态(Multimodal)指的是数据或信息的多种表现形式。在人工智能领域,我们经常会听到这个词,尤其是在近期大型模型(如GPT-4)开始支持多模态之后。 模态:模态是指数据的一种形式,例如文本、图像、音频等。每一种形式都是一种模态。多模态:多模态就是将…...
《妃梦千年》第十二章:层层迷雾
第十二章:层层迷雾 苏珊遭遇险境的消息让林清婉感到紧张。她知道,宫中有些人对她的势力感到威胁,试图通过伤害苏珊来打击她。林清婉决定采取更谨慎的措施保护自己和苏珊,同时查明幕后黑手的身份。 几天后,林清婉收到…...

java的字节符输出流基类、File Writer类和Buffered Writer类
一、字节符输出流基类:Writer 1.属于抽象类 2.常用方法 二、字节符输出流Flie Writer类 1.是writer类的子类 2.以字符为数据处理单元向文本文件中写数据 3.示例 4.实现步骤 三、BufferedWriter类 1.是Writer类的子类。 2.带有缓冲区 默认情况下,…...

qt 简单实验 一个可以向右侧拖拽缩放的矩形
1.概要 目的是设置一个可以拖拽缩放的矩形,这里仅用右侧的一个边模拟这个过程。就是为了抓住核心,这个便解决了,其他的边也是一样的。而这个更能体现原理。 2.代码 2.1 resizablerectangle.h #ifndef RESIZABLERECTANGLE_H #define RESIZ…...

Google Adsense----Wordpress插入谷歌广告
1.搭建个人博客,绑定谷歌search consol,注册adsense 详细可以参考这个视频b站视频 2.将个人博客网站关联到Adsense 在adsense里新加网站,输入你的博客网址,双击网站 将这段代码复制到header.php的里面 在wordpress仪表盘的外观-主题文件编辑器,找到header.php将代码复制,…...
生成xcframework
打包 XCFramework 的方法 XCFramework 是苹果推出的一种多平台二进制分发格式,可以包含多个架构和平台的代码。打包 XCFramework 通常用于分发库或框架。 使用 Xcode 命令行工具打包 通过 xcodebuild 命令可以打包 XCFramework。确保项目已经配置好需要支持的平台…...

日语AI面试高效通关秘籍:专业解读与青柚面试智能助攻
在如今就业市场竞争日益激烈的背景下,越来越多的求职者将目光投向了日本及中日双语岗位。但是,一场日语面试往往让许多人感到步履维艰。你是否也曾因为面试官抛出的“刁钻问题”而心生畏惧?面对生疏的日语交流环境,即便提前恶补了…...
ssc377d修改flash分区大小
1、flash的分区默认分配16M、 / # df -h Filesystem Size Used Available Use% Mounted on /dev/root 1.9M 1.9M 0 100% / /dev/mtdblock4 3.0M...
Rust 异步编程
Rust 异步编程 引言 Rust 是一种系统编程语言,以其高性能、安全性以及零成本抽象而著称。在多核处理器成为主流的今天,异步编程成为了一种提高应用性能、优化资源利用的有效手段。本文将深入探讨 Rust 异步编程的核心概念、常用库以及最佳实践。 异步编程基础 什么是异步…...
【python异步多线程】异步多线程爬虫代码示例
claude生成的python多线程、异步代码示例,模拟20个网页的爬取,每个网页假设要0.5-2秒完成。 代码 Python多线程爬虫教程 核心概念 多线程:允许程序同时执行多个任务,提高IO密集型任务(如网络请求)的效率…...
Device Mapper 机制
Device Mapper 机制详解 Device Mapper(简称 DM)是 Linux 内核中的一套通用块设备映射框架,为 LVM、加密磁盘、RAID 等提供底层支持。本文将详细介绍 Device Mapper 的原理、实现、内核配置、常用工具、操作测试流程,并配以详细的…...
Web 架构之 CDN 加速原理与落地实践
文章目录 一、思维导图二、正文内容(一)CDN 基础概念1. 定义2. 组成部分 (二)CDN 加速原理1. 请求路由2. 内容缓存3. 内容更新 (三)CDN 落地实践1. 选择 CDN 服务商2. 配置 CDN3. 集成到 Web 架构 …...

深度学习水论文:mamba+图像增强
🧀当前视觉领域对高效长序列建模需求激增,对Mamba图像增强这方向的研究自然也逐渐火热。原因在于其高效长程建模,以及动态计算优势,在图像质量提升和细节恢复方面有难以替代的作用。 🧀因此短时间内,就有不…...

Git 3天2K星标:Datawhale 的 Happy-LLM 项目介绍(附教程)
引言 在人工智能飞速发展的今天,大语言模型(Large Language Models, LLMs)已成为技术领域的焦点。从智能写作到代码生成,LLM 的应用场景不断扩展,深刻改变了我们的工作和生活方式。然而,理解这些模型的内部…...

[ACTF2020 新生赛]Include 1(php://filter伪协议)
题目 做法 启动靶机,点进去 点进去 查看URL,有 ?fileflag.php说明存在文件包含,原理是php://filter 协议 当它与包含函数结合时,php://filter流会被当作php文件执行。 用php://filter加编码,能让PHP把文件内容…...