• 企业400电话
  • 微网小程序
  • AI电话机器人
  • 电商代运营
  • 全 部 栏 目

    企业400电话 网络优化推广 AI电话机器人 呼叫中心 网站建设 商标✡知产 微网小程序 电商运营 彩铃•短信 增值拓展业务
    golang将多路复异步io转成阻塞io的方法详解

    前言

    本文主要给大家介绍了关于golang 如何将多路复异步io转变成阻塞io的相关内容,分享出来供大家参考学习,下面话不多说了,来一起看看详细的介绍:

    package main
    
    import (
     "net"
    )
    
    func handleConnection(c net.Conn) {
     //读写数据
     buffer := make([]byte, 1024)
     c.Read(buffer)
     c.Write([]byte("Hello from server"))
    }
    
    func main() {
     l, err := net.Listen("tcp", "host:port")
     if err != nil {
     return
     }
     defer l.Close()
     for {
     c, err := l.Accept()
     if err!= nil {
     return
     }
     go handleConnection(c)
     }
    }

    对于我们都会写上面的代码,很简单,的确golang的网络部分对于我们隐藏了太多东西,我们不用像c++一样去调用底层的socket函数,也不用去使用epoll等复杂的io多路复用相关的逻辑,但是上面的代码真的就像我们看起来的那样在调用accept和read时阻塞吗?

    // Multiple goroutines may invoke methods on a Conn simultaneously.
    //官方注释:多个goroutines可能同时调用方法在一个连接上,我的理解就是所谓的惊群效应吧
    //换句话说就是你多个goroutines监听同一个连接同一个事件,所有的goroutines都会触发,
    //这只是我的猜测,有待验证。
    type Conn interface {
     Read(b []byte) (n int, err error)
     Write(b []byte) (n int, err error)
     Close() error
     LocalAddr() Addr
     RemoteAddr() Addr
     SetDeadline(t time.Time) error
     SetReadDeadline(t time.Time) error
     SetWriteDeadline(t time.Time) error
    }
    
    type conn struct {
     fd *netFD
    }

    这里面又一个Conn接口,下面conn实现了这个接口,里面只有一个成员netFD.

    // Network file descriptor.
    type netFD struct {
     // locking/lifetime of sysfd + serialize access to Read and Write methods
     fdmu fdMutex
    
     // immutable until Close
     sysfd  int
     family  int
     sotype  int
     isConnected bool
     net   string
     laddr  Addr
     raddr  Addr
    
     // wait server
     pd pollDesc
    }
    
    func (fd *netFD) accept() (netfd *netFD, err error) {
     //................
     for {
     s, rsa, err = accept(fd.sysfd)
     if err != nil {
     nerr, ok := err.(*os.SyscallError)
     if !ok {
     return nil, err
     }
     switch nerr.Err {
     /* 如果错误是EAGAIN说明Socket的缓冲区为空,未读取到任何数据
        则调用fd.pd.WaitRead,*/
     case syscall.EAGAIN:
     if err = fd.pd.waitRead(); err == nil {
      continue
     }
     case syscall.ECONNABORTED:
     continue
     }
     return nil, err
     }
     break
     }
     //.........
     //代码过长不再列出,感兴趣看go的源码,runtime 下的fd_unix.go
     return netfd, nil
    }

    上面代码段是accept部分,这里我们注意当accept有错误发生的时候,会检查这个错误是否是syscall.EAGAIN,如果是,则调用WaitRead将当前读这个fd的goroutine在此等待,直到这个fd上的读事件再次发生为止。当这个socket上有新数据到来的时候,WaitRead调用返回,继续for循环的执行,这样以来就让调用netFD的Read的地方变成了同步“阻塞”。有兴趣的可以看netFD的读和写方法,都有同样的实现。

    到这里所有的疑问都集中到了pollDesc上,它到底是什么呢?

    const (
     pdReady uintptr = 1
     pdWait uintptr = 2
    )
    
    // Network poller descriptor.
    type pollDesc struct {
     link *pollDesc // in pollcache, protected by pollcache.lock
     lock mutex // protects the following fields
     fd  uintptr
     closing bool
     seq  uintptr // protects from stale timers and ready notifications
     rg  uintptr // pdReady, pdWait, G waiting for read or nil
     rt  timer // read deadline timer (set if rt.f != nil)
     rd  int64 // read deadline
     wg  uintptr // pdReady, pdWait, G waiting for write or nil
     wt  timer // write deadline timer
     wd  int64 // write deadline
     user uint32 // user settable cookie
    }
    
    type pollCache struct {
     lock mutex
     first *pollDesc
    }

    pollDesc网络轮询器是Golang中针对每个socket文件描述符建立的轮询机制。 此处的轮询并不是一般意义上的轮询,而是Golang的runtime在调度goroutine或者GC完成之后或者指定时间之内,调用epoll_wait获取所有产生IO事件的socket文件描述符。当然在runtime轮询之前,需要将socket文件描述符和当前goroutine的相关信息加入epoll维护的数据结构中,并挂起当前goroutine,当IO就绪后,通过epoll返回的文件描述符和其中附带的goroutine的信息,重新恢复当前goroutine的执行。这里我们可以看到pollDesc中有两个变量wg和rg,其实我们可以把它们看作信号量,这两个变量有几种不同的状态:

    继续接着上面的WaitRead调用说起,go在这里到底做了什么让当前的goroutine挂起了呢。

    func net_runtime_pollWait(pd *pollDesc, mode int) int {
     err := netpollcheckerr(pd, int32(mode))
     if err != 0 {
     return err
     }
     // As for now only Solaris uses level-triggered IO.
     if GOOS == "solaris" {
     netpollarm(pd, mode)
     }
     for !netpollblock(pd, int32(mode), false) {
     err = netpollcheckerr(pd, int32(mode))
     if err != 0 {
     return err
     }
     // Can happen if timeout has fired and unblocked us,
     // but before we had a chance to run, timeout has been reset.
     // Pretend it has not happened and retry.
     }
     return 0
    }
    
    
    // returns true if IO is ready, or false if timedout or closed
    // waitio - wait only for completed IO, ignore errors
    func netpollblock(pd *pollDesc, mode int32, waitio bool) bool {
     //根据读写模式获取相应的pollDesc中的读写信号量
     gpp := pd.rg
     if mode == 'w' {
     gpp = pd.wg
     }
    
     for {
     old := *gpp
     //已经准备好直接返回true
     if old == pdReady {
     *gpp = 0
     return true
     }
     if old != 0 {
     throw("netpollblock: double wait")
     }
      //设置gpp pdWait
     if atomic.Casuintptr(gpp, 0, pdWait) {
     break
     }
     }
    
     if waitio || netpollcheckerr(pd, mode) == 0 {
     gopark(netpollblockcommit, unsafe.Pointer(gpp), "IO wait", traceEvGoBlockNet, 5)
     }
    
     old := atomic.Xchguintptr(gpp, 0)
     if old > pdWait {
     throw("netpollblock: corrupted state")
     }
     return old == pdReady
    }

    当调用WaitRead时经过一段汇编最重调用了上面的net_runtime_pollWait函数,该函数循环调用了netpollblock函数,返回true表示io已准备好,返回false表示错误或者超时,在netpollblock中调用了gopark函数,gopark函数调用了mcall的函数,该函数用汇编来实现,具体功能就是把当前的goroutine挂起,然后去执行其他可执行的goroutine。到这里整个goroutine挂起的过程已经结束,那当goroutine可读的时候是如何通知该goroutine呢,这就是epoll的功劳了。

    func netpoll(block bool) *g {
     if epfd == -1 {
     return nil
     }
     waitms := int32(-1)
     if !block {
     waitms = 0
     }
     var events [128]epollevent
    retry:
     //每次最多监听128个事件
     n := epollwait(epfd, events[0], int32(len(events)), waitms)
     if n  0 {
     if n != -_EINTR {
     println("runtime: epollwait on fd", epfd, "failed with", -n)
     throw("epollwait failed")
     }
     goto retry
     }
     var gp guintptr
     for i := int32(0); i  n; i++ {
     ev := events[i]
     if ev.events == 0 {
     continue
     }
     var mode int32
     //读事件
     if ev.events(_EPOLLIN|_EPOLLRDHUP|_EPOLLHUP|_EPOLLERR) != 0 {
     mode += 'r'
     }
     //写事件
     if ev.events(_EPOLLOUT|_EPOLLHUP|_EPOLLERR) != 0 {
     mode += 'w'
     }
     if mode != 0 {
      //把epoll中的data转换成pollDesc
     pd := *(**pollDesc)(unsafe.Pointer(ev.data))
     netpollready(gp, pd, mode)
     }
     }
     if block  gp == 0 {
     goto retry
     }
     return gp.ptr()
    }

    这里就是熟悉的代码了,epoll的使用,看起来亲民多了。pd:=*(**pollDesc)(unsafe.Pointer(ev.data))这是最关键的一句,我们在这里拿到当前可读时间的pollDesc,上面我们已经说了,当pollDesc的读写信号量保存为G pointer时当前goroutine就会挂起。而在这里我们调用了netpollready函数,函数中把相应的读写信号量G指针擦出,置为pdReady,G-pointer状态被抹去,当前goroutine的G指针就放到可运行队列中,这样goroutine就被唤醒了。

    可以看到虽然我们在写tcp server看似一个阻塞的网络模型,在其底层实际上是基于异步多路复用的机制来实现的,只是把它封装成了跟阻塞io相似的开发模式,这样是使得我们不用去关注异步io,多路复用等这些复杂的概念以及混乱的回调函数。

    总结

    以上就是这篇文章的全部内容了,希望本文的内容对大家的学习或者工作具有一定的参考学习价值,如果有疑问大家可以留言交流,谢谢大家对脚本之家的支持。

    您可能感兴趣的文章:
    • Go语言同步与异步执行多个任务封装详解(Runner和RunnerAsync)
    上一篇:Golang中数据结构Queue的实现方法详解
    下一篇:利用Golang实现TCP连接的双向拷贝详解
  • 相关文章
  • 

    © 2016-2020 巨人网络通讯 版权所有

    《增值电信业务经营许可证》 苏ICP备15040257号-8

    golang将多路复异步io转成阻塞io的方法详解 golang,将,多路,复,异步,转成,