详解Golang中select的使用与源码分析

01-03 51阅读 0评论

?=背景

golang 中主推 channel 通信。单个 channel 的通信可以通过一个Goroutinechannel数据,另外一个从channel取数据进行。这是阻塞的,因为要想顺利执行完这个步骤,需要 channel 准备好才行,准备好的条件如下:

1.发送

缓存有空间(如果是有缓存的 channel)有等待接收的 GOroutine

2.接收

缓存有数据(如果是有缓存的 channel)有等待发送的 goroutine

channel实际使用中还有如下两个需求,这个时候就需要select了。

同时监听多个channel在没有channel准备好的时候,也可以往下执行。

select 流程

1.空select作用是阻塞当前goroutine。不要用for{}来阻塞goroutine,因为会占用cpu。而select{}不会,因为当前goroutine不会再被调度。

 if len(cases) == 0 {  block()  }

2.配置好poll的顺序。由于是同时监听多个channel的发送或者接收,所以需要按照一定的顺序查看哪个channel准备好了。如果每次采用select中的顺序查看channel是否准备好了,那么只要在前面的channel准备好的足够快,那么会造成后面的channel即使准备好了,也永远不会被执行。打乱顺序的逻辑如下,采用了洗牌算法\color{red}{洗牌算法}洗牌算法,注意此过程中会过滤掉channel为nil的case。\color{red}{注意此过程中会过滤掉 channel 为 nil 的 case。}注意此过程中会过滤掉channel为nil的case。

 // generate permuted order  norder := 0  for i := range scases {  cas := &scases[i]   // Omit cases without channels from the poll and lock orders.  if cas.c == nil {  cas.elem = nil // allow GC  continue  }   j := fastrandn(uint32(norder + 1))  pollorder[norder] = pollorder[j]  pollorder[j] = UInt16(i)  norder++  }

3.配置好lock的顺序。由于可能会修改channel中的数据,所以在打算往channel中发送数据或者从channel接收数据的时候,需要锁住 channel。而一个channel可能被多个select监听,如果两个select对两个channel A和B,分别按照顺序A, B和B,A上锁,是可能会造成死锁的,导致两个select都执行不下去。

详解Golang中select的使用与源码分析

所以select中锁住channel的顺序至关重要,解决方案是按照channel的地址的顺序锁住channel。因为在两个selectchannel有交集的时候,都是按照交集中channel的地址顺序锁channel

实际排序代码如下,采用堆排序算法\color{red}{堆排序算法}堆排序算法按照channel的地址从小到大对channel进行排序。

 // sort the cases by Hchan address to Get the locking order.  // simple heap sort, to guarantee n log n time and constant stack footprint.  for i := range lockorder {  j := i  // Start with the pollorder to permute cases on the same channel.  c := scases[pollorder[i]].c  for j > 0 && scases[lockorder[(j-1)/2]].c.sortkey() < c.sortkey() {  k := (j - 1) / 2  lockorder[j] = lockorder[k]  j = k  }  lockorder[j] = pollorder[i]  }  for i := len(lockorder) - 1; i >= 0; i-- {  o := lockorder[i]  c := scases[o].c  lockorder[i] = lockorder[0]  j := 0  for {  k := j*2 + 1  if k >= i {  break  }  if k+1 < i && scases[lockorder[k]].c.sortkey() < scases[lockorder[k+1]].c.sortkey() {  k++  }  if c.sortkey() < scases[lockorder[k]].c.sortkey() {  lockorder[j] = lockorder[k]  j = k  continue  }  break  }  lockorder[j] = o  }

4.锁住select中的所有channel。要查看channel中的数据了。

 // lock all the channels involved in the select  sellock(scases, lockorder) 

5.第一轮查看是否已有准备好的channel。如果有直接发送数据到channel或者从channel接收数据。注意selectchannel切片中,前面部分是从channel接收数据的case,后半部分是往channel发送数据的case。

详解Golang中select的使用与源码分析

按照pollorder顺序查看是否有channel准备好了。

 for _, casei := range pollorder {  casi = int(casei)  cas = &scases[casi]  c = cas.c  if casi >= nsends {  sg = c.sendq.dequeue()  if sg != nil {  goto recv  }  if c.qcount > 0 {  goto bufrecv  }  if c.Closed != 0 {  goto rclose  }  } else {  if raceenabled {  racereadpc(c.raceaddr(), casePC(casi), chansendpc)  }  if c.closed != 0 {  goto sclose  }  sg = c.recvq.dequeue()  if sg != nil {  goto send  }  if c.qcount < c.dataqsiz {  goto bufsend  }  }  }

6.直接执行default分支

 if !block {  selunlock(scases, lockorder)  casi = -1  goto retc  }

7.第二轮遍历channel创建sudog把当前goroutine放到每个channel的等待列表中去,等待channel准备好时被唤醒。

 // pass 2 - enqueue on all chans  gp = getg()  if gp.waiting != nil {  throw("gp.waiting != nil")  }  nextp = &gp.waiting  for _, casei := range lockorder {  casi = int(casei)  cas = &scases[casi]  c = cas.c  sg := acquireSudog()  sg.g = gp  sg.isSelect = true  // No stack spliTS between assigning elem and enqueuing  // sg on gp.waiting where Copystack can find it.  sg.elem = cas.elem  sg.releasetime = 0  if t0 != 0 {  sg.releasetime = -1  }  sg.c = c  // ConStruct waiting list in lock order.  *nextp = sg  nextp = &sg.waitlink   if casi < nsends {  c.sendq.enqueue(sg)  } else {  c.recvq.enqueue(sg)  }  }

8.等待被唤醒。其中gopark的时候会释放对所有channel占用的锁。

 // wait for someone to wake us up  gp.param = nil  // Signal to anyone trying to shrink our stack that we're about  // to park on a channel. The window between when this G's status  // changes and when we set gp.activeStackChans is not safe for  // stack shrinking.  atomic.Store8(&gp.parkingOnChan, 1)  gopark(selparkcommit, nil, waitReasonSelect, traceEvGoBlockSelect, 1)  gp.activeStackChans = false 

9.被唤醒

锁住所有channel清理当前goroutine的等待sudog找到是被哪个channel唤醒的,并清理每个channel上当前的goroutine对应的sudog
 sellock(scases, lockorder)   gp.selecTDone = 0  sg = (*sudog)(gp.param)  gp.param = nil   // pass 3 - dequeue from unsuccessful chans  // otherwise they stack up on quiet channels  // record the successful case, if any.  // We singly-linked up the SudoGs in lock order.  casi = -1  cas = nil  caseSuccess = false  sglist = gp.waiting  // Clear all elem before unlinking from gp.waiting.  for sg1 := gp.waiting; sg1 != nil; sg1 = sg1.waitlink {  sg1.isSelect = false  sg1.elem = nil  sg1.c = nil  }  gp.waiting = nil   for _, casei := range lockorder {  k = &scases[casei]  if sg == sglist {  // sg has already been dequeued by the G that woke us up.  casi = int(casei)  cas = k  caseSuccess = sglist.success  if sglist.releasetime > 0 {  caseReleaseTime = sglist.releasetime  }  } else {  c = k.c  if int(casei) < nsends {  c.sendq.dequeueSudoG(sglist)  } else {  c.recvq.dequeueSudoG(sglist)  }  }  sgnext = sglist.waitlink  sglist.waitlink = nil  releaseSudog(sglist)  sglist = sgnext  }

到此这篇关于详解Golang中select的使用与源码分析的文章就介绍到这了,更多相关GoLang select内容请搜索云初冀北以前的文章或继续浏览下面的相关文章希望大家以后多多支持云初冀北!

免责声明
本站提供的资源,都来自网络,版权争议与本站无关,所有内容及软件的文章仅限用于学习和研究目的。不得将上述内容用于商业或者非法用途,否则,一切后果请用户自负,我们不保证内容的长久可用性,通过使用本站内容随之而来的风险与本站无关,您必须在下载后的24个小时之内,从您的电脑/手机中彻底删除上述内容。如果您喜欢该程序,请支持正版软件,购买注册,得到更好的正版服务。侵删请致信E-mail:goliszhou@gmail.com
$

发表评论

表情:
评论列表 (暂无评论,51人围观)

还没有评论,来说两句吧...