当前位置: 首页 > news >正文

聊聊 golang 中 channel

1、引言

Do not communicate by sharing memory; instead, share memory by communicating

Golang 的并发哲学是“不要通过共享内存进行通信,而要通过通信来共享内存”,提倡通过 channel 进行 goroutine 之间的数据传递和同步,而不是通过共享变量(内存)来实现。

func write(chanInt chan int) {for i := 0; i < 10; i++ {chanInt <- i}close(chanInt)
}func read(chanInt chan int, chanExit chan bool) {for {v, ok := <-chanIntif !ok {break}fmt.Println(v)}chanExit <- trueclose(chanExit)
}func TestCSP(t *testing.T) {chanInt := make(chan int, 10)chanExit := make(chan bool)go write(chanInt)go read(chanInt, chanExit)for {select {case _, ok := <-chanExit:if !ok {fmt.Println("done")return}}}}

如上述示例,write 函数负责写,read 函数负责读,chanInt 负责在两个 goroutine 进行数据同步,chanExit 负责监听数据已处理完成,并最终退出。整个程序没有看到锁,非常的优雅。

接下来,来说说 channel 的特性,最后结合底层源码来加深印象。

2、特性

2.1 基本用法

由于 channel 是引用类型,需要用 make 来初始化

chanBuffer := make(chan int, 10)
chanNoBuffer := make(chan int)

这里创建的是可读写的 channel,区别在于是否有 capacity(容量)

  • 带缓冲区的 channel,可以存储 cap 个数据
  • 不带缓冲区的 channel,一般用于同步
chanWriteOnly := make(chan<- int)
chanReadOnly := make(<-chan int)

这里创建的是只写和只读的 channel,不过这样写意义不大,一般用于传参,接下来用这两个 chan 把引言示示例中关于 writeread 函数给改下

func write(chanInt chan<- int) {for i := 0; i < 10; i++ {chanInt <- i}close(chanInt)
}func read(chanInt <-chan int, chanExit chan bool) {for {v, ok := <-chanIntif !ok {break}fmt.Println(v)}chanExit <- trueclose(chanExit)
}

查看 channel 的长度和容量

func TestChanLenCAP(t *testing.T) {chanInt := make(chan int, 2)chanInt <- 1fmt.Println(len(chanInt)) // 1fmt.Println(cap(chanInt)) // 2
}

关闭 channel

close(ch)

判断 channel 是否已关闭

func TestChanIsClosed(t *testing.T) {chanInt := make(chan int, 10)close(chanInt)if _, ok := <-chanInt; !ok {fmt.Println("closed")}
}

向一个已关闭的 channel 读数据,会读到零值,并且每次读也都是零值,因此可以利用这个特性来判断 channel 是否已关闭。

2.2 异常情况

接下来看看几种需要注意的异常情况

注意: Golang 版本为 1.19.12。不同版本的调度器和运行时的行为可能会有所不同,尤其是与死锁检测相关的机制。这些变化可能导致在某些版本中程序会更快地检测到死锁,而在其他版本中则可能仅仅是阻塞而不报错。

2.2.1 给一个 nil channel发送数据,

func TestWriteNil(t *testing.T) {var chanInt chan intchanInt <- 1
}

由于 chanInt 还没初始化,值为 nil,此时代码会阻塞在 chanInt <- 1 这一行,并最终形成死锁。

fatal error: all goroutines are asleep - deadlock!

解法:channel 使用前需要使用 make 初始化。

2.2.2 从一个 nil channel 读数据

func TestReadNil(t *testing.T) {var chanInt chan int<-chanInt
}

由于 chanInt 还没初始化,值为 nil,此时代码会阻塞在 <-chanInt 这一行,并最终形成死锁。

fatal error: all goroutines are asleep - deadlock!

解法:channel 使用前需要使用 make 初始化。

2.2.3 关闭一个 nil channel

func TestCloseNil(t *testing.T) {var chanInt chan intclose(chanInt)
}

如果尝试关闭一个 nilchannel,会导致运行时错误 panic: close of nil channel

panic: close of nil channel [recovered]panic: close of nil channel

解法:channel 使用前需要使用 make 初始化。

前三个异常说明,channel 使用前一定要使用 make 进行初始化。

2.2.4 向一个已关闭的 channel 发数据

func TestWriteClosed(t *testing.T) {chanNoBuffer := make(chan int)close(chanNoBuffer)chanNoBuffer <- 1
}

向一个已关闭的 channel 发送数据会引起 panic

panic: send on closed channel [recovered]panic: send on closed channel

这是因为一旦 channel 被关闭,就不能再向其发送数据,但可以继续从中接收数据

解法:判断 channel 是否已关闭。

2.2.5 向一个已关闭的 channel 发起重复关闭动作

func TestClosedOnceMore(t *testing.T) {chanNoBuffer := make(chan int)close(chanNoBuffer)close(chanNoBuffer)
}

尝试关闭一个已经关闭的 channel 会导致运行时错误 panic: close of closed channel。这个错误通常出现在多个 goroutine 试图关闭同一个 channel 或者代码逻辑不正确导致同一个 channel 被关闭多次。

panic: close of closed channel [recovered]panic: close of closed channel

解法:判断 channel 是否已关闭。

2.2.6 向没有缓冲区的 channel 写数据,但没有读取方

func TestSendNoBuffer(t *testing.T) {ch := make(chan int)ch <- 4
}

无缓冲的 channel 是一种同步通信机制,当只有发送方,没有接收方,会陷入阻塞而死锁。

fatal error: all goroutines are asleep - deadlock!

解法:无缓冲 channel 是一种同步通信机制,需要发送和接收操作同时进行。

2.2.7 向没有缓冲区的 channel 读取数据,但没有写入方

func TestReadNoBuffer(t *testing.T) {ch := make(chan int)<-ch
}

尝试从一个无缓冲的 channel 读取数据时,如果没有其他 goroutine 向该 channel 发送数据,读取操作将会阻塞。这会导致程序死锁,并最终导致运行时错误。

fatal error: all goroutines are asleep - deadlock!

解法:无缓冲 channel 是一种同步通信机制,需要发送和接收操作同时进行。

2.2.8 无缓冲区 channel 的发送和接收操作没有同时进行

func ReadNoBufferChan(chanBool chan bool) {<-chanBool
}func TestSendNoBufferChan(t *testing.T) {ch := make(chan bool)ch <- truego ReadNoBufferChan(ch)time.Sleep(1 * time.Second)
}

上面两个异常一直强调,由于无缓冲 channel 是一种同步通信机制,需要发送和接收操作同时进行。代码执行到 ch <- chan 时,调度器发现没有任何 goroutine 接收,于是阻塞并死锁。

fatal error: all goroutines are asleep - deadlock!

解法:无缓冲 channel 是一种同步通信机制,需要发送和接收操作同时进行。

func TestSendNoBufferChan(t *testing.T) {ch := make(chan bool)go ReadNoBufferChan(ch)ch <- truetime.Sleep(1 * time.Second)
}

go ReadNoBufferChan(ch) 提前,这样就确保了在发送数据之前,有一个 goroutine 正在等待接收数据。

对于无缓冲的 channel

  • 读取和写入要成对出现,并且不能在同一个 goroutine
  • 使用 for 读取数据时,写入方需要关闭 channel

2.2.9 向有缓存区的 channel 先读数据

func TestWriteBufferChan(t *testing.T) {ch := make(chan int, 1)if _, ok := <-ch; !ok {fmt.Println("closed")}
}

当尝试从一个空的带缓冲的 channel 读取数据时,读取操作会阻塞,直到有数据被写入 channel。这是因为即使是带缓冲的 channel,也需要在读取数据时有数据可读。

带缓冲的 channel 和无缓冲的 channel 的主要区别在于:带缓冲的 channel 可以存储一定数量的数据,而无缓冲的 channel 则需要发送和接收操作同步进行。然而,这并不改变以下事实:当一个 goroutine 试图从空的 channel 读取数据时,它会被阻塞,直到有其他 goroutine 写入数据。

fatal error: all goroutines are asleep - deadlock!

解法:需要在读取数据时有数据可读。

2.2.10 向有缓存区的 channel 写数据,但没有读取数据

func TestReadBufferChan(t *testing.T) {ch := make(chan int, 1)ch <- 1ch <- 2
}

当带缓冲的 channel 在缓冲区满时,写入操作会阻塞,直到有数据被读取以腾出缓冲区空间。如没有读取方,最后就会因阻塞而死锁。

fatal error: all goroutines are asleep - deadlock!

解法:当带缓冲的 channel 在缓冲区满时,需要有读取方,或者增加缓冲区的大小。

注意:对于带缓冲的 channel 在缓冲区没超过容量之前,写入数据,若没有读取,不像不带缓冲区的 channel 那样,不会产生死锁的。

其实,最后这两个带缓冲区 channel 异常情况总结就是

  • 若在同一个 goroutine 里,写数据操作一定在读数据操作前
  • channel 空了,接收者会阻塞
  • channel 满了,发送者会阻塞

3、底层实现

3.1 数据结构

Golangchannel 在运行时使用 runtime.hchan 结构体表示。

// runtime/chan.go
type hchan struct {qcount   uint           // 队列中的数据个数dataqsiz uint           // 环形缓冲区的大小buf      unsafe.Pointer // 环形缓冲区指针elemsize uint16         // 单个元素的大小closed   uint32         // 标志 channel 是否关闭elemtype *_type         // 元素的类型sendx    uint           // 发送操作的索引recvx    uint           // 接收操作的索引recvq    waitq          // 等待接收的 goroutine 队列sendq    waitq          // 等待发送的 goroutine 队列lock     mutex          // 保护 channel 的锁
}

在这里插入图片描述

先看看环形缓冲区相关的字段:

  • qcount: 当前缓冲区中的元素个数。
  • dataqsiz: 环形缓冲区的容量。
  • buf: 实际存储数据的缓冲区,类型为 unsafe.Pointer(类似 C 语言的 void *)。
  • elemsize: 每个元素的大小。
  • sendx: 环形缓冲区中下一个待写入的位置。
  • recvx: 环形缓冲区中下一个待读取的位置。

再来看看发送和接收队列:

  • recvq: 等待接收的 goroutine 队列。
  • sendq: 等待发送的 goroutine 队列。

这两个队列是通过 waitq 结构体来实现的,waitq 本质上是一个双向链表,链表中的每个节点是一个 sudog 结构体,sudog 代表一个等待中的 goroutine

type waitq struct {first *sudoglast  *sudog
}

最后看看 lock 字段

  • lock 锁用于保护 channel 数据结构的互斥锁。Golang 使用自旋锁和互斥锁的结合来保证 channel 操作的线程安全。

3.2 初始化

func makechan(t *chantype, size int) *hchan {elem := t.elem// compiler checks this but be safe.if elem.size >= 1<<16 {throw("makechan: invalid channel element type")}if hchanSize%maxAlign != 0 || elem.align > maxAlign {throw("makechan: bad alignment")}mem, overflow := math.MulUintptr(elem.size, uintptr(size))if overflow || mem > maxAlloc-hchanSize || size < 0 {panic(plainError("makechan: size out of range"))}// Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers.// buf points into the same allocation, elemtype is persistent.// SudoG's are referenced from their owning thread so they can't be collected.// TODO(dvyukov,rlh): Rethink when collector can move allocated objects.var c *hchanswitch {case mem == 0:// Queue or element size is zero.c = (*hchan)(mallocgc(hchanSize, nil, true))// Race detector uses this location for synchronization.c.buf = c.raceaddr()case elem.ptrdata == 0:// Elements do not contain pointers.// Allocate hchan and buf in one call.c = (*hchan)(mallocgc(hchanSize+mem, nil, true))c.buf = add(unsafe.Pointer(c), hchanSize)default:// Elements contain pointers.c = new(hchan)c.buf = mallocgc(mem, elem, true)}c.elemsize = uint16(elem.size)c.elemtype = elemc.dataqsiz = uint(size)lockInit(&c.lock, lockRankHchan)if debugChan {print("makechan: chan=", c, "; elemsize=", elem.size, "; dataqsiz=", size, "\n")}return c
}

这里主要说下 switch 相关的分支代码

  • 第一个分支:如果 channel 的缓冲区大小是 0(也就是创建无缓冲 channel),或 channel 中的元素大小是 0(如 struct{}{}Golang 中“空结构体”是不占内存的,size0)时,调用 mallocgc() 在堆上为 channel 开辟一段大小为 hchanSize 的内存空间。
    • 这里说下 c.buf = c.raceaddr()c.raceaddr() 会返回一个地址,这个地址在内存中不会被实际用于存储数据,但会被数据竞争检测工具(如 Golangrace detector)用于同步,这也是无缓冲区的 channel 用来做数据同步场景的由来。
  • 第二个分支:如果元素不包含指针时。调用 mallocgc 一次性分配 hchanbuf 的内存。
  • 第三个分支:默认情况元素类型中有指针类型,调用了两次分配空间的函数 new/mallocgc

仔细看,三个分支都调用了 mallocgc 在堆上分配内存,也就说 channel 本身会被 GC 自动回收。

在函数的最后会初始化通道结构的字段,包括元素大小、元素类型、缓冲区大小和锁。

3.2 发送数据

// entry point for c <- x from compiled code
//
//go:nosplit
func chansend1(c *hchan, elem unsafe.Pointer) {chansend(c, elem, true, getcallerpc())
}/** generic single channel send/recv* If block is not nil,* then the protocol will not* sleep but return if it could* not complete.** sleep can wake up with g.param == nil* when a channel involved in the sleep has* been closed.  it is easiest to loop and re-run* the operation; we'll see that it's now closed.*/
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {// 当 channel 为 nil 时处理if c == nil {if !block {return false}gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)throw("unreachable")}if debugChan {print("chansend: chan=", c, "\n")}// 竞态检测,是用来分析是否存在数据竞争。go test -race ./...if raceenabled {racereadpc(c.raceaddr(), callerpc, abi.FuncPCABIInternal(chansend))}// Fast path: check for failed non-blocking operation without acquiring the lock.//// After observing that the channel is not closed, we observe that the channel is// not ready for sending. Each of these observations is a single word-sized read// (first c.closed and second full()).// Because a closed channel cannot transition from 'ready for sending' to// 'not ready for sending', even if the channel is closed between the two observations,// they imply a moment between the two when the channel was both not yet closed// and not ready for sending. We behave as if we observed the channel at that moment,// and report that the send cannot proceed.//// It is okay if the reads are reordered here: if we observe that the channel is not// ready for sending and then observe that it is not closed, that implies that the// channel wasn't closed during the first observation. However, nothing here// guarantees forward progress. We rely on the side effects of lock release in// chanrecv() and closechan() to update this thread's view of c.closed and full().if !block && c.closed == 0 && full(c) {return false}var t0 int64if blockprofilerate > 0 {t0 = cputicks()}// 加锁lock(&c.lock)// 检查 channel 是否关闭if c.closed != 0 {unlock(&c.lock)panic(plainError("send on closed channel"))}// 检查是否有等待接收的 goroutineif sg := c.recvq.dequeue(); sg != nil {// Found a waiting receiver. We pass the value we want to send// directly to the receiver, bypassing the channel buffer (if any).send(c, sg, ep, func() { unlock(&c.lock) }, 3)return true}// 检查 channel 缓冲区是否有空位if c.qcount < c.dataqsiz {// Space is available in the channel buffer. Enqueue the element to send.qp := chanbuf(c, c.sendx)if raceenabled {racenotify(c, c.sendx, nil)}typedmemmove(c.elemtype, qp, ep)c.sendx++if c.sendx == c.dataqsiz {c.sendx = 0}c.qcount++unlock(&c.lock)return true}// 非阻塞模式下if !block {unlock(&c.lock)return false}// 阻塞模式下,将当前 goroutine 加入发送队列并挂起,receiver 会帮我们完成后续的工作// Block on the channel. Some receiver will complete our operation for us.gp := getg()mysg := acquireSudog()mysg.releasetime = 0if t0 != 0 {mysg.releasetime = -1}// No stack splits between assigning elem and enqueuing mysg// on gp.waiting where copystack can find it.// 打包 sudogmysg.elem = epmysg.waitlink = nilmysg.g = gpmysg.isSelect = falsemysg.c = cgp.waiting = mysggp.param = nil// 将当前这个发送 goroutine 打包后的 sudog 入队到 channel 的 sendq 队列中c.sendq.enqueue(mysg)// Signal to anyone trying to shrink our stack that we're about// to park on a channel. The window between when this G's status// changes and when we set gp.activeStackChans is not safe for// stack shrinking.// 将这个发送 g 从 Grunning -> Gwaiting// 进入休眠atomic.Store8(&gp.parkingOnChan, 1)gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)// Ensure the value being sent is kept alive until the// receiver copies it out. The sudog has a pointer to the// stack object, but sudogs aren't considered as roots of the// stack tracer.KeepAlive(ep)// 以下唤醒后需要执行的代码// someone woke us up.if mysg != gp.waiting {throw("G waiting list is corrupted")}gp.waiting = nilgp.activeStackChans = falseclosed := !mysg.successgp.param = nilif mysg.releasetime > 0 {blockevent(mysg.releasetime-t0, 2)}mysg.c = nilreleaseSudog(mysg)if closed {// 唤醒后,发现 channel 被关闭了if c.closed == 0 {throw("chansend: spurious wakeup")}panic(plainError("send on closed channel"))}return true
}

代码比较长,可以分为两大部分:异常检测和发送数据

3.2.1 异常检测

代码一开始就排除了在异常章节中 nil channel 的情形,比如未初始化,或是被 GC 回收了。

接着会检测非阻塞模式下,也就是有缓冲区的 channel,如果还未 close 并且缓冲区已经满了,则直接返回 false


func TestASyncSendFull(t *testing.T) {ch := make(chan int, 1) // 创建一个缓冲区大小为 1 的 channelch <- 1                 // 向 channel 发送一个元素,此时缓冲区已满select {case ch <- 2: // 尝试发送第二个元素fmt.Println("Successfully sent 2")default: // 缓冲区已满,进入 default 分支fmt.Println("channel is full, unable to send 2")}
}

3.2.2 发送数据

发送数据可以归纳为以下三点

  • 直接发送:当 recvq 存在等待的接收者时,那么通过 runtime.send 直接将数据发送给阻塞的接收者
    • 注意:这里不会立马唤醒阻塞的接收者,而是将等待接收数据的 goroutine 标记成可运行状态 grunnable 并把该 goroutine 放到发送方所在的处理器的 runnext 上等待执行,该处理器在下一次调度时会立刻唤醒数据的接收方;
  • 异步发送:当 buf 缓冲区存在空余空间时,将发送的数据写入 channel 的缓冲区;
  • 阻塞发送:当不存在缓冲区或者缓冲区已满时,等待其他 goroutinechannel 接收数据;
    • 将当前 goroutine 加入 sendq 发送队列并挂起,阻塞等待其他的协程从 channel 接收数据;
    • 当唤醒后,检查是否因为 channel 关闭而唤醒,如果是则触发 panic

发送数据的过程中包含几个会触发 goroutine 调度的时机:

  • 发送数据时发现 channel 上存在等待接收数据的 goroutine,立刻设置处理器的 runnext 属性,但是并不会立刻触发调度
  • 发送数据时并没有找到接收方并且缓冲区已经满了,这时会将自己加入 channelsendq 发送队列并调用 runtime.goparkunlock 触发 goroutine 的调度让出处理器的使用权;

3.3 接收数据

// entry points for <- c from compiled code
//
//go:nosplit
func chanrecv1(c *hchan, elem unsafe.Pointer) {chanrecv(c, elem, true)
}//go:nosplit
func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {_, received = chanrecv(c, elem, true)return
}// chanrecv receives on channel c and writes the received data to ep.
// ep may be nil, in which case received data is ignored.
// If block == false and no elements are available, returns (false, false).
// Otherwise, if c is closed, zeros *ep and returns (true, false).
// Otherwise, fills in *ep with an element and returns (true, true).
// A non-nil ep must point to the heap or the caller's stack.
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {// raceenabled: don't need to check ep, as it is always on the stack// or is new memory allocated by reflect.if debugChan {print("chanrecv: chan=", c, "\n")}// 如果在 nil channel 上进行 recv 操作,那么会永远阻塞if c == nil {// 非阻塞的情况下,要直接返回,非阻塞出现在一些 select 的场景中if !block {return}gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)throw("unreachable")}// Fast path: check for failed non-blocking operation without acquiring the lock.if !block && empty(c) {// After observing that the channel is not ready for receiving, we observe whether the// channel is closed.//// Reordering of these checks could lead to incorrect behavior when racing with a close.// For example, if the channel was open and not empty, was closed, and then drained,// reordered reads could incorrectly indicate "open and empty". To prevent reordering,// we use atomic loads for both checks, and rely on emptying and closing to happen in// separate critical sections under the same lock.  This assumption fails when closing// an unbuffered channel with a blocked send, but that is an error condition anyway.if atomic.Load(&c.closed) == 0 {// Because a channel cannot be reopened, the later observation of the channel// being not closed implies that it was also not closed at the moment of the// first observation. We behave as if we observed the channel at that moment// and report that the receive cannot proceed.return}// The channel is irreversibly closed. Re-check whether the channel has any pending data// to receive, which could have arrived between the empty and closed checks above.// Sequential consistency is also required here, when racing with such a send.if empty(c) {// The channel is irreversibly closed and empty.if raceenabled {raceacquire(c.raceaddr())}if ep != nil {typedmemclr(c.elemtype, ep)}return true, false}}var t0 int64if blockprofilerate > 0 {t0 = cputicks()}lock(&c.lock)// 当前 channel 中没有数据可读if c.closed != 0 {if c.qcount == 0 {if raceenabled {raceacquire(c.raceaddr())}unlock(&c.lock)if ep != nil {typedmemclr(c.elemtype, ep)}return true, false}// The channel has been closed, but the channel's buffer have data.} else {// sender 队列中有 sudog 在等待// 直接从该 sudog 中获取数据拷贝到当前 g 即可// Just found waiting sender with not closed.if sg := c.sendq.dequeue(); sg != nil {// Found a waiting sender. If buffer is size 0, receive value// directly from sender. Otherwise, receive from head of queue// and add sender's value to the tail of the queue (both map to// the same buffer slot because the queue is full).recv(c, sg, ep, func() { unlock(&c.lock) }, 3)return true, true}}if c.qcount > 0 {// 直接从 buffer 里拷贝数据// Receive directly from queueqp := chanbuf(c, c.recvx)if raceenabled {racenotify(c, c.recvx, nil)}if ep != nil {typedmemmove(c.elemtype, ep, qp)}typedmemclr(c.elemtype, qp)// 接收索引 +1c.recvx++if c.recvx == c.dataqsiz {c.recvx = 0}// buffer 元素计数 -1c.qcount--unlock(&c.lock)return true, true}// 非阻塞时,且无数据可收if !block {unlock(&c.lock)return false, false}// no sender available: block on this channel.gp := getg()mysg := acquireSudog()mysg.releasetime = 0if t0 != 0 {mysg.releasetime = -1}// No stack splits between assigning elem and enqueuing mysg// on gp.waiting where copystack can find it.// 打包成 sudogmysg.elem = epmysg.waitlink = nilgp.waiting = mysgmysg.g = gpmysg.isSelect = falsemysg.c = cgp.param = nil// 进入 recvq 队列c.recvq.enqueue(mysg)// Signal to anyone trying to shrink our stack that we're about// to park on a channel. The window between when this G's status// changes and when we set gp.activeStackChans is not safe for// stack shrinking.atomic.Store8(&gp.parkingOnChan, 1)gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)// someone woke us up// 被唤醒if mysg != gp.waiting {throw("G waiting list is corrupted")}gp.waiting = nilgp.activeStackChans = falseif mysg.releasetime > 0 {blockevent(mysg.releasetime-t0, 2)}success := mysg.successgp.param = nilmysg.c = nilreleaseSudog(mysg)// 如果 channel 未被关闭,那就是真的 recv 到数据了return true, success
}func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {if c.dataqsiz == 0 {if raceenabled {racesync(c, sg)}if ep != nil {// copy data from sender// 直接从发送者复制数据recvDirect(c.elemtype, sg, ep)}} else {// 缓冲区已满,从队列头部取出数据// Queue is full. Take the item at the// head of the queue. Make the sender enqueue// its item at the tail of the queue. Since the// queue is full, those are both the same slot.qp := chanbuf(c, c.recvx)if raceenabled {racenotify(c, c.recvx, nil)racenotify(c, c.recvx, sg)}// 将数据从队列复制到接收者// copy data from queue to receiverif ep != nil {typedmemmove(c.elemtype, ep, qp)}// 将数据从发送者复制到队列// copy data from sender to queuetypedmemmove(c.elemtype, qp, sg.elem)c.recvx++if c.recvx == c.dataqsiz {c.recvx = 0}c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz}sg.elem = nilgp := sg.gunlockf()gp.param = unsafe.Pointer(sg)sg.success = trueif sg.releasetime != 0 {sg.releasetime = cputicks()}goready(gp, skip+1)
}

Golangchannel 中,有两种接收方式

num <- ch
num, ok <- ch

这两种分别对应上述源码中的 chanrecv1chanrecv2,不过最终都会走到 chanrecv 函数。

3.3.1 异常检测

当我们从一个 nil channel 接收数据时(这里 nil 有可能是被 GC 回收导致的),若是非阻塞的 channel 会直接返回,否则会直接调用 runtime.gopark 让出处理器的使用权。

如果当前 channel 已经被 close 并且缓冲区中不存在任何数据,那么会清除 ep 指针中的数据并立刻返回。这里也就说明了为什么可以多次从已关闭的 channel 读取数据而不会报错。

3.3.2 接收数据

channel 接收数据可以归纳为以下三种情况:

3.3.2.1 直接接收

sendq 发送队列存在等待的发送者时,通过 runtime.recv 从阻塞的发送者或者缓冲区中获取数据。具体分为以下两种场景,可以仔细看 recv 函数

  • 场景一

buf 缓冲区的容量 dataqsiz0,也就是同步的 channel,调用 recvDirectsendq 发送队列中 sudog 存储的 ep 数据直接拷贝到接收者的内存地址中。

  • 场景二

当缓冲区已满时(会有两次内存的拷贝)

  • 先取出 buf 缓冲区头部的数据发给接收者(第一次拷贝)
  • 接着取出 sendq 发送队列头的数据拷贝到 buf 缓冲区中,并释放一个 sudog 阻塞的 goroutine(第二次拷贝)

到这里获取有人会问,为什么不直接从 sendq 取出数据发给接收方,而是要从 buf 里取出发给接收方?

原因在于 Golang 在缓冲模式下,channel 的数据在缓冲区中按照 FIFO(先入先出)顺序存储。缓冲区头部的数据肯定是最先存入的,那么也就需要最先取出。

这里再说下场景二下关于 recvxsendx 的更新机制。

  • 缓冲区已满时的处理逻辑

buf 缓冲区满时,recvx 指向的是 buf 的头部位置,这也是下一个将要被接收的数据。注意此时 sendx 也是指向缓冲区的头部位置。因为缓冲区已满,下一次发送会覆盖最旧的数据。

  • 从缓冲区读取数据

此时从已满的 buf 缓冲区读取数据,接收者从缓冲区的头部位置 recvx 获取数据,并将数据传递给接收方。并更新 recvx,使其指向下一个将要被接收的数据位置。

  • sendq 拷贝到缓冲区

由于此时 buf 头部的数据已经发送,那么则取出 sendq 头部的数据覆盖刚刚头部的位置所在的数据,并更新 sendx,使其和 recvx 保持一致,指向下一个要发送的位置。

这两个场景,无论发生哪种情况,运行时都会调用 runtime.goready 将当前处理器的 runnext 设置成发送数据的 goroutine,在调度器下一次调度时将阻塞的发送方唤醒。

3.3.2.2 异步接收

buf 缓冲区的 qcount 大于 0 时,也就是带缓冲的 channel 有数据时,那么会从 buf 缓冲区中 recvx 的索引位置取出数据进行处理:

  • 如果接收数据的内存地址不为空,那么会使用 runtime.typedmemmove 将缓冲区中的数据拷贝到内存中,并通过 runtime.typedmemclr 清除队列中的数据
  • 最后更新 channel 上相关数据:recvx 指向下一个位置(如果移动到了环形队列的队尾,下标需要回到队头),channelqcount 长度减一,并释放持有 channel 的锁
3.3.2.3 阻塞接收

当不属于上述两种情况,即当 channelsendq 发送队列中不存在等待的 goroutine 并且 buf 缓冲区中也不存在任何数据时,从 channel 中接收数据的操作会变成阻塞的。此时会将当前的goroutine 挂起并加入 channel 的接收队列 recvq,以便在有数据可用时能够被唤醒。

当然了,若是 goroutine 被唤醒后会完成 channel 的阻塞数据接收。接收完最后进行基本的参数检查,解除 channel 的绑定并释放 sudog

结合异常检测那一节,发现从 channel 接收数据时,会触发 goroutine 调度的两个时机:

  • channelnil
  • buf 缓冲区中不存在数据并且也不存在数据的发送者时

3.4 关闭管道

最后来看看关闭通道实现

func closechan(c *hchan) {// 关闭一个 nil channel 会直接 panicif c == nil {panic(plainError("close of nil channel"))}// 上锁,这个锁的粒度比较大,一直到释放完所有的 sudog 才解锁lock(&c.lock)// 在 close channel 时,如果 channel 已经关闭过了,直接触发 panicif c.closed != 0 {unlock(&c.lock)panic(plainError("close of closed channel"))}if raceenabled {callerpc := getcallerpc()racewritepc(c.raceaddr(), callerpc, abi.FuncPCABIInternal(closechan))racerelease(c.raceaddr())}c.closed = 1var glist gList// release all readersfor {sg := c.recvq.dequeue()// 弹出的 sudog 是 nil,说明读队列已经空了if sg == nil {break}// sg.elem unsafe.Pointer,指向 sudog 的数据元素// 该元素可能在堆上分配,也可能在栈上if sg.elem != nil {typedmemclr(c.elemtype, sg.elem)sg.elem = nil}if sg.releasetime != 0 {sg.releasetime = cputicks()}// 将 goroutine 入 glist// 为最后将全部 goroutine 都 ready 做准备gp := sg.ggp.param = unsafe.Pointer(sg)sg.success = falseif raceenabled {raceacquireg(gp, c.raceaddr())}glist.push(gp)}// release all writers (they will panic)// 将所有挂在 channel 上的 writer 从 sendq 中弹出// 该操作会使所有 writer panic(向一个关闭的 channel 发数据会引起 panic)for {sg := c.sendq.dequeue()if sg == nil {break}sg.elem = nilif sg.releasetime != 0 {sg.releasetime = cputicks()}// 将所有挂在 channel 上的 writer 从 sendq 中弹出// 该操作会使所有 writer panicgp := sg.ggp.param = unsafe.Pointer(sg)sg.success = falseif raceenabled {raceacquireg(gp, c.raceaddr())}glist.push(gp)}// 在释放所有挂在 channel 上的读或写 sudog 时,是一直在临界区的unlock(&c.lock)// Ready all Gs now that we've dropped the channel lock.for !glist.empty() {gp := glist.pop()gp.schedlink = 0// 使 g 的状态切换到 Grunnablegoready(gp, 3)}
}

3.4.1 异常检测

  • 关闭一个 nil channel 会直接 panic
  • close channel 时,如果 channel 已经关闭过了,直接触发 panic

3.4.2 释放所有接收方和发送方

关闭 channel 的主要工作是释放所有的 readerswriters

主要就是取出 recvqsendqsudog 加入到 goroutine 待清除 glist 队列中,与此同时该函数会清除所有 runtime.sudog 上未被处理的元素。同时需要注意的是:在处理 sendq 时有可能会 panic,在之前的异常情况中列举往一个 closechannel 发送数据会引起 panic

最后会为所有被阻塞的 goroutine 调用 runtime.goready 触发调度。将所有 glist 队列中的 goroutine 状态从 _Gwaiting 设置为 _Grunnable 状态,等待调度器的调度。

3.4.3 优雅关闭通道

最后说说如何优雅关闭 channel

通过之前的异常小节介绍,发现:

  • 向已关闭的 channel 发送数据,会导致 panic
  • 重复关闭 channel,也会导致 panic

同时,还了解了:

  • 从一个已关闭的 channel 中接收数据,会得到零值,且不会导致程序异常
  • 关闭一个 channel,那么所有接收这个 channelselect case 都会收到信号

那么这里就引用 How to Gracefully Close Channels 介绍的优雅关闭 channel 方法来收尾。

package _0240623import ("log""math/rand""strconv""sync""testing""time"
)func TesGracefullyCloseChannel(t *testing.T) {rand.Seed(time.Now().UnixNano()) // needed before Go 1.20log.SetFlags(0)// ...const Max = 100000const NumReceivers = 10const NumSenders = 1000wgReceivers := sync.WaitGroup{}wgReceivers.Add(NumReceivers)// ...dataCh := make(chan int)stopCh := make(chan struct{})// stopCh is an additional signal channel.// Its sender is the moderator goroutine shown// below, and its receivers are all senders// and receivers of dataCh.toStop := make(chan string, 1)// The channel toStop is used to notify the// moderator to close the additional signal// channel (stopCh). Its senders are any senders// and receivers of dataCh, and its receiver is// the moderator goroutine shown below.// It must be a buffered channel.var stoppedBy string// moderatorgo func() {stoppedBy = <-toStopclose(stopCh)}()// sendersfor i := 0; i < NumSenders; i++ {go func(id string) {for {value := rand.Intn(Max)if value == 0 {// Here, the try-send operation is// to notify the moderator to close// the additional signal channel.select {case toStop <- "sender#" + id:default:}return}// The try-receive operation here is to// try to exit the sender goroutine as// early as possible. Try-receive and// try-send select blocks are specially// optimized by the standard Go// compiler, so they are very efficient.select {case <-stopCh:returndefault:}// Even if stopCh is closed, the first// branch in this select block might be// still not selected for some loops// (and for ever in theory) if the send// to dataCh is also non-blocking. If// this is unacceptable, then the above// try-receive operation is essential.select {case <-stopCh:returncase dataCh <- value:}}}(strconv.Itoa(i))}// receiversfor i := 0; i < NumReceivers; i++ {go func(id string) {defer wgReceivers.Done()for {// Same as the sender goroutine, the// try-receive operation here is to// try to exit the receiver goroutine// as early as possible.select {case <-stopCh:returndefault:}// Even if stopCh is closed, the first// branch in this select block might be// still not selected for some loops// (and forever in theory) if the receive// from dataCh is also non-blocking. If// this is not acceptable, then the above// try-receive operation is essential.select {case <-stopCh:returncase value := <-dataCh:if value == Max-1 {// Here, the same trick is// used to notify the moderator// to close the additional// signal channel.select {case toStop <- "receiver#" + id:default:}return}log.Println(value)}}}(strconv.Itoa(i))}// ...wgReceivers.Wait()log.Println("stopped by", stoppedBy)
}

这段代码的核心是这里

// moderator
go func() {stoppedBy = <-toStopclose(stopCh)
}()

对于生产者和消费者是 M*N 的情况,显然既不能在生产方关闭通道,也不适合在消费方关闭通道。那么就引入中间方,那就是 toStop,起个 goroutine 然后 stoppedBy = <-toStop 阻塞在这里,只要生产者和消费者一方满足条件,向 toStop 写入数据了,那么就可以关闭 stopCh。这也正好契合上面的 moderator 注释,一个 协调者,用来协调生产者和消费者在 M*N 情况下如何优雅关闭 channel

相关文章:

聊聊 golang 中 channel

1、引言 Do not communicate by sharing memory; instead, share memory by communicating Golang 的并发哲学是“不要通过共享内存进行通信&#xff0c;而要通过通信来共享内存”&#xff0c;提倡通过 channel 进行 goroutine 之间的数据传递和同步&#xff0c;而不是通过共享…...

SK Hynix 3D DRAM良率突破56.1%,开启存储新时代

根据韩国财经媒体Business Korea独家报道&#xff1a;在刚刚结束的VLSI 2024国际研讨会上&#xff0c;韩国半导体巨头SK Hynix公布了一项振奋人心的进展&#xff1a;其五层堆叠3D DRAM的制造良率已达到56.1%。此成果标志着3D DRAM技术在商业化道路上迈出了坚实的一步&#xff0…...

如何封装自动化测试框架?

封装自动化测试框架&#xff0c;测试人员不用关注框架的底层实现&#xff0c;根据指定的规则进行测试用例的创建、执行即可&#xff0c;这样就降低了自动化测试门槛&#xff0c;能解放出更多的人力去做更深入的测试工作。 本篇文章就来介绍下&#xff0c;如何封装自动化测试框…...

基于Java的在线编程考试系统【附源码】

毕业设计(论文) 题目&#xff1a;基于 二级学院&#xff1a; 现代技术学院 专业(方向)&#xff1a; 计算机应用技术 班 级&#xff1a; 计科B2015 学 生&#xff1a; 指导教师&#xff1a; 2024年1月 29 日 本科毕业论文&#xff08;设计&#xff09;学术诚信声明 本人郑重…...

Beautiful Soup的使用

1、Beautiful Soup简介 Beautiful Soup是一个Python的一个HTML或XML的解析库&#xff0c;我们用它可以方便地从网页中提取数据。 Beautiful Soup 提供一些简单的、Python 式的函数来处理导航、搜索、修改分析树等功能。它是一个工具箱&#xff0c;通过解析文档为用户提供需要抓…...

633. 平方数之和(中等)

633. 平方数之和 1. 题目描述2.详细题解3.代码实现3.1 Python3.2 Java内存溢出溢出代码正确代码与截图 1. 题目描述 题目中转&#xff1a;633. 平方数之和 2.详细题解 本题是167. 两数之和 II - 输入有序数组&#xff08;中等&#xff09;题目的变型&#xff0c;由两数之和变…...

GIT回滚

1. 使用 git revert git revert 命令会创建一个新的提交&#xff0c;这个提交会撤销指定提交的更改。这通常用于公共分支&#xff08;如 main 或 master&#xff09;&#xff0c;因为它不会重写历史。 git revert HEAD # 撤销最近的提交 # 或者指定一个特定的提交哈希值 …...

BEVM基于OP-Stack发布首个以WBTC为GAS连接以太坊和比特币生态的中继链

为了更好的连接以太坊和比特币生态&#xff0c;BEVM团队正在基于OPtimism的OP Stack来构建一个以WBTC为GAS兼容OP-Rollup的中继链&#xff0c;这条中继链将作为一种完全去中心化的中间层&#xff0c;把以太坊上的主流资产(WBTC/ ETH/USDC/USDT等)引入到BEVM网络。 不仅如此&am…...

【vuejs】 $on、$once、$off、$emit 事件监听方法详解以及项目实战

1. Vue实例方法概述 1.1 vm.$on vm.$on是Vue实例用来监听自定义事件的方法。它允许开发者在Vue实例上注册事件监听器&#xff0c;当事件被触发时&#xff0c;指定的回调函数会被执行。 事件监听&#xff1a;vm.$on允许开发者绑定一个或多个事件到Vue实例上&#xff0c;并且可…...

如何下载植物大战僵尸杂交版,最全攻略来了

《植物大战僵尸杂交版》由热爱原版游戏的B站UP主“潜艇伟伟迷”独立开发&#xff0c;带来了创新的游戏体验。如果你是策略游戏的爱好者&#xff0c;下面这份全面的下载和游玩攻略将是你的理想选择。 游戏亮点&#xff1a; 杂交植物系统&#xff1a;结合不同植物特性&#xff0c…...

小公司全栈是归宿吗?

在软件开发领域&#xff0c;特别是在小公司或初创公司中&#xff0c;全栈开发者的角色确实相对普遍和重要。然而&#xff0c;说“全栈是归宿”可能过于绝对&#xff0c;因为每个开发者的职业路径和兴趣点都是不同的。 以下是关于全栈开发在小公司的一些考虑&#xff1a; 需求…...

对https://registry.npm.taobao.org/tyarn的请求失败,原因:证书过期

今天安装yarn时&#xff0c;报错如下&#xff1a; request to https://registry.npm.taobao.org/yarn failed, reason: certificate has expired 原来淘宝镜像过期了&#xff0c;需要重新搞一下 记录一下解决过程&#xff1a; 1.查看当前npm配置 npm config list 2.清…...

Redisson-Lock-加锁原理

归档 GitHub: Redisson-Lock-加锁原理 Unit-Test RedissonLockTest 说明 源码类&#xff1a;RedissonLock // 加锁入口 Override public void lock() { lock(-1, null, false); }/*** 加锁实现 */ private void lock(long leaseTime, TimeUnit unit, boolean interruptib…...

deepspeed win11 安装

目录 git地址: aio报错: 编译 报错 ops已存在: 修改拷贝代码: git地址: Bug Report: Issues Building DeepSpeed on Windows Issue #5679 microsoft/DeepSpeed GitHub aio报错: setup.py 配置变量 os.environ[DISTUTILS_USE_SDK]=1 os.environ[DS_BUILD_AIO]=…...

Python列表函数append()和extend()的区别

Python列表提供了两个容易混淆的追加函数&#xff1a;append()和extend()。它们之间的使用区别如下&#xff1a; list.append(obj)&#xff1a;对象进栈。将一个对象作为整体追加到列表最后&#xff0c;返回Nonelist.extend(iter)&#xff1a;可迭代对象的元素逐个进栈。将一个…...

Spring AI 实现调用openAi 多模态大模型

什么是多模态? 多模态(Multimodal)指的是数据或信息的多种表现形式。在人工智能领域,我们经常会听到这个词,尤其是在近期大型模型(如GPT-4)开始支持多模态之后。 模态:模态是指数据的一种形式,例如文本、图像、音频等。每一种形式都是一种模态。多模态:多模态就是将…...

《妃梦千年》第十二章:层层迷雾

第十二章&#xff1a;层层迷雾 苏珊遭遇险境的消息让林清婉感到紧张。她知道&#xff0c;宫中有些人对她的势力感到威胁&#xff0c;试图通过伤害苏珊来打击她。林清婉决定采取更谨慎的措施保护自己和苏珊&#xff0c;同时查明幕后黑手的身份。 几天后&#xff0c;林清婉收到…...

java的字节符输出流基类、File Writer类和Buffered Writer类

一、字节符输出流基类&#xff1a;Writer 1.属于抽象类 2.常用方法 二、字节符输出流Flie Writer类 1.是writer类的子类 2.以字符为数据处理单元向文本文件中写数据 3.示例 4.实现步骤 三、BufferedWriter类 1.是Writer类的子类。 2.带有缓冲区 默认情况下&#xff0c…...

qt 简单实验 一个可以向右侧拖拽缩放的矩形

1.概要 目的是设置一个可以拖拽缩放的矩形&#xff0c;这里仅用右侧的一个边模拟这个过程。就是为了抓住核心&#xff0c;这个便解决了&#xff0c;其他的边也是一样的。而这个更能体现原理。 2.代码 2.1 resizablerectangle.h #ifndef RESIZABLERECTANGLE_H #define RESIZ…...

Google Adsense----Wordpress插入谷歌广告

1.搭建个人博客,绑定谷歌search consol,注册adsense 详细可以参考这个视频b站视频 2.将个人博客网站关联到Adsense 在adsense里新加网站,输入你的博客网址,双击网站 将这段代码复制到header.php的里面 在wordpress仪表盘的外观-主题文件编辑器,找到header.php将代码复制,…...

2-17 基于matlab的改进的遗传算法(IGA)对城市交通信号优化分析

基于matlab的改进的遗传算法&#xff08;IGA&#xff09;对城市交通信号优化分析。根据交通流量以及饱和流量&#xff0c;对城市道路交叉口交通信号灯实施合理优化控制&#xff0c;考虑到交通状况的动态变化&#xff0c;及每个交叉口的唯一性。通过实时监测交通流量&#xff0c…...

VOC格式转YOLO格式,xml文件转txt文件简单通用代码

目录 前言 思路介绍 代码 完整代码 拓展代码 前言 很多人在进行目标检测训练时习惯将得到的数据标注为XML文件的VOC格式&#xff0c;或者在网上获取的数据集被标注为XML文件&#xff0c;但是不同的标注工具进行的标注会产生不同的标注xml文件&#xff0c;这里我写了一种通用…...

STL迭代器的基础应用

STL迭代器的应用 迭代器的定义方法&#xff1a; 类型作用定义方式正向迭代器正序遍历STL容器容器类名::iterator 迭代器名常量正向迭代器以只读方式正序遍历STL容器容器类名::const_iterator 迭代器名反向迭代器逆序遍历STL容器容器类名::reverse_iterator 迭代器名常量反向迭…...

【SQL】数据操作语言(DML) - 删除数据:精细管理数据的利刃

目录 前言 DELETE语句的基础使用 删除指定记录 清空表与删除表数据的区别 注意 前言 在数据库管理的日常工作中&#xff0c;数据的删除是一项需要格外小心的操作&#xff0c;因为一旦数据被删除&#xff0c;往往难以恢复。数据操作语言(DML)中的DELETE语句&am…...

异步复制,主库宕机后,数据可能丢失吗?

异步复制是数据库复制的一种方式&#xff0c;它允许主数据库&#xff08;主库&#xff09;在不等待从数据库&#xff08;从库&#xff09;完成数据同步的情况下继续处理事务。这种方式可以提高数据库的性能&#xff0c;因为主库不需要等待数据复制到从库。然而&#xff0c;异步…...

如何在Spring Boot中优雅处理异常

如何在Spring Boot中优雅处理异常 大家好&#xff0c;我是免费搭建查券返利机器人省钱赚佣金就用微赚淘客系统3.0的小编&#xff0c;也是冬天不穿秋裤&#xff0c;天冷也要风度的程序猿&#xff01;今天我们将深入探讨在Spring Boot应用程序中如何优雅地处理异常&#xff0c;以…...

1.3.数据的表示

定点数 原码 最高位是符号位&#xff0c;0表示正号&#xff0c;1表示负号&#xff0c;其余的n-1位表示数值的绝对值。 数值0的原码表示有两种形式&#xff1a; [0]原0 0000000 [-0]原1 0000000 例&#xff1a;1010 最高位为1表示这是一个负数&#xff0c; 其它三位 010…...

【进阶篇-Day4:使用JAVA编写石头迷阵游戏】

目录 1、绘制界面2、打乱石头方块3、移动业务4、游戏判定胜利5、统计步数6、重新游戏7、完整代码&#xff1a; 1、绘制界面 上述思路是&#xff1a;使用一个二维数组存放图片的编号&#xff0c;然后在后持遍历即可获取对应的图片。 代码如下&#xff1a; package com.itheima.s…...

探索 LLamaWorker:基于LLamaSharp的.NET本地大模型服务

LLamaWorker 是一个基于 LLamaSharp 项目开发的 HTTP API 服务器。它提供与 OpenAI 兼容的 API&#xff0c;使得开发者可以轻松地将大型语言模型&#xff08;LLM&#xff09;集成到自己的应用程序中。 1. 背景 在人工智能领域&#xff0c;大型语言模型&#xff08;LLM&#xf…...

Qt开发 | Qt控件 | QTabWidget基本用法 | QListWidget应用详解 | QScrollArea应用详解

文章目录 一、QTabWidget基本用法二、QListWidget应用详解1.列表模式1.1 基本操作1.2 添加自定义item1.3 如何添加右键菜单1.4 QListWidget如何删除item 2.图标模式 三、QScrollArea应用详解 一、QTabWidget基本用法 QTabWidget 是 Qt 框架中的一个类&#xff0c;它提供了一个选…...