• 企业400电话
  • 微网小程序
  • AI电话机器人
  • 电商代运营
  • 全 部 栏 目

    企业400电话 网络优化推广 AI电话机器人 呼叫中心 网站建设 商标✡知产 微网小程序 电商运营 彩铃•短信 增值拓展业务
    go语言同步教程之条件变量

    Go的标准库中有一个类型叫条件变量:sync.Cond。这种类型与互斥锁和读写锁不同,它不是开箱即用的,它需要与互斥锁组合使用:

    // NewCond returns a new Cond with Locker l.
    func NewCond(l Locker) *Cond {
     return Cond{L: l}
    }
    
    // A Locker represents an object that can be locked and unlocked.
    type Locker interface {
     Lock()
     Unlock()
    }

    通过使用 NewCond 函数可以返回 *sync.Cond 类型的结果, *sync.Cond 我们主要操作其三个方法,分别是:

    wait():等待通知

    Signal():单播通知

    Broadcast():广播通知

    具体的函数说明如下:

    // Wait atomically unlocks c.L and suspends execution
    // of the calling goroutine. After later resuming execution,
    // Wait locks c.L before returning. Unlike in other systems,
    // Wait cannot return unless awoken by Broadcast or Signal.
    //
    // Because c.L is not locked when Wait first resumes, the caller
    // typically cannot assume that the condition is true when
    // Wait returns. Instead, the caller should Wait in a loop:
    //
    // c.L.Lock()
    // for !condition() {
    //  c.Wait()
    // }
    // ... make use of condition ...
    // c.L.Unlock()
    //
    func (c *Cond) Wait() {
     c.checker.check()
     t := runtime_notifyListAdd(c.notify)
     c.L.Unlock()
     runtime_notifyListWait(c.notify, t)
     c.L.Lock()
    }
    
    // Signal wakes one goroutine waiting on c, if there is any.
    //
    // It is allowed but not required for the caller to hold c.L
    // during the call.
    func (c *Cond) Signal() {
     c.checker.check()
     runtime_notifyListNotifyOne(c.notify)
    }
    
    // Broadcast wakes all goroutines waiting on c.
    //
    // It is allowed but not required for the caller to hold c.L
    // during the call.
    func (c *Cond) Broadcast() {
     c.checker.check()
     runtime_notifyListNotifyAll(c.notify)
    }

    条件变量sync.Cond本质上是一些正在等待某个条件的线程的同步机制。

    sync.Cond 主要实现一个条件变量,假如 goroutine A 执行前需要等待另外的goroutine B 的通知,那边处于等待的goroutine A 会保存在一个通知列表,也就是说需要某种变量状态的goroutine A 将会等待/Wait在那里,当某个时刻状态改变时负责通知的goroutine B 通过对条件变量通知的方式(Broadcast,Signal)来通知处于等待条件变量的goroutine A, 这样便可首先一种“消息通知”的同步机制。

    以go的http处理为例,在Go的源码中http模块server部分源码中所示,当需要处理一个新的连接的时候,若连接conn是实现自*tls.Conn的情况下,会进行相关的客户端与服务端的“握手”处理Handshake(), 入口代码如下:

    if tlsConn, ok := c.rwc.(*tls.Conn); ok {
      if d := c.server.ReadTimeout; d != 0 {
       c.rwc.SetReadDeadline(time.Now().Add(d))
      }
      if d := c.server.WriteTimeout; d != 0 {
       c.rwc.SetWriteDeadline(time.Now().Add(d))
      }
      if err := tlsConn.Handshake(); err != nil {
       c.server.logf("http: TLS handshake error from %s: %v", c.rwc.RemoteAddr(), err)
       return
      }
      c.tlsState = new(tls.ConnectionState)
      *c.tlsState = tlsConn.ConnectionState()
      if proto := c.tlsState.NegotiatedProtocol; validNPN(proto) {
       if fn := c.server.TLSNextProto[proto]; fn != nil {
        h := initNPNRequest{tlsConn, serverHandler{c.server}}
        fn(c.server, tlsConn, h)
       }
       return
      }
     }

    其中的Handshake函数代码通过使用条件变量的方式来处理新连接握手调用的同步问题:

    func (c *Conn) Handshake() error {
     c.handshakeMutex.Lock()
     defer c.handshakeMutex.Unlock()
    
     for {
      if err := c.handshakeErr; err != nil {
       return err
      }
      if c.handshakeComplete {
       return nil
      }
      if c.handshakeCond == nil {
       break
      }
    
      c.handshakeCond.Wait()
     }
    
     c.handshakeCond = sync.NewCond(c.handshakeMutex)
     c.handshakeMutex.Unlock()
    
     c.in.Lock()
     defer c.in.Unlock()
    
     c.handshakeMutex.Lock()
    
     if c.handshakeErr != nil || c.handshakeComplete {
      panic("handshake should not have been able to complete after handshakeCond was set")
     }
    
     if c.isClient {
      c.handshakeErr = c.clientHandshake()
     } else {
      c.handshakeErr = c.serverHandshake()
     }
     if c.handshakeErr == nil {
      c.handshakes++
     } else {
      c.flush()
     }
    
     if c.handshakeErr == nil  !c.handshakeComplete {
      panic("handshake should have had a result.")
     }
    
     c.handshakeCond.Broadcast()
     c.handshakeCond = nil
    
     return c.hand

    我们也可以再通过一个例子熟悉sync.Cond的使用:

    我们尝试实现一个读写同步的例子,需求是:我们有数个读取器和数个写入器,读取器必须依赖写入器对缓存区进行数据写入后,才可从缓存区中对数据进行读出。我们思考下,要实现类似的功能,除了使用channel,还能如何做?

    写入器每次完成写入数据后,它都需要某种通知机制广播给处于阻塞状态的读取器,告诉它们可以对数据进行访问,这其实跟sync.Cond 的 广播机制是不是很像? 有了这个广播机制,我们可以通过sync.Cond来实现这个例子了:

    package main
    
    import (
     "bytes"
     "fmt"
     "io"
     "sync"
     "time"
    )
    
    type MyDataBucket struct {
     br  *bytes.Buffer
     gmutex *sync.RWMutex
     rcond *sync.Cond //读操作需要用到的条件变量
    }
    
    func NewDataBucket() *MyDataBucket {
     buf := make([]byte, 0)
     db := MyDataBucket{
      br:  bytes.NewBuffer(buf),
      gmutex: new(sync.RWMutex),
     }
     db.rcond = sync.NewCond(db.gmutex.RLocker())
     return db
    }
    
    func (db *MyDataBucket) Read(i int) {
     db.gmutex.RLock()
     defer db.gmutex.RUnlock()
     var data []byte
     var d byte
     var err error
     for {
      //读取一个字节
      if d, err = db.br.ReadByte(); err != nil {
       if err == io.EOF {
        if string(data) != "" {
         fmt.Printf("reader-%d: %s\n", i, data)
        }
        db.rcond.Wait()
        data = data[:0]
        continue
       }
      }
      data = append(data, d)
     }
    }
    
    func (db *MyDataBucket) Put(d []byte) (int, error) {
     db.gmutex.Lock()
     defer db.gmutex.Unlock()
     //写入一个数据块
     n, err := db.br.Write(d)
     db.rcond.Broadcast()
     return n, err
    }
    
    func main() {
     db := NewDataBucket()
    
     go db.Read(1)
    
     go db.Read(2)
    
     for i := 0; i  10; i++ {
      go func(i int) {
       d := fmt.Sprintf("data-%d", i)
       db.Put([]byte(d))
      }(i)
      time.Sleep(100 * time.Millisecond)
     }
    }

    当使用sync.Cond的时候有两点移动要注意的:

    如下面 Wait() 的源码所示,Cond.Wait会自动释放锁等待信号的到来,当信号到来后,第一个获取到信号的Wait将继续往下执行并从新上锁

    func (c *Cond) Wait() {
     c.checker.check()
     t := runtime_notifyListAdd(c.notify)
     c.L.Unlock()
     runtime_notifyListWait(c.notify, t)
     c.L.Lock()
    }

    如果不释放锁, 其它收到信号的gouroutine将阻塞无法继续执行。

    总结

    以上就是这篇文章的全部内容了,希望本文的内容对大家的学习或者工作具有一定的参考学习价值,如果有疑问大家可以留言交流,谢谢大家对脚本之家的支持。

    您可能感兴趣的文章:
    • 详解Go语言变量作用域
    • go语言 全局变量和局部变量实例
    • go语言的初始化顺序,包,变量,init详解
    • go语言的工作空间和GOPATH环境变量介绍
    • Go语言变量创建的五种方法
    • Go语言基本的语法和内置数据类型初探
    • Go语言变量与基础数据类型详情
    上一篇:Go语言开发发送Get和Post请求的示例
    下一篇:golang不到30行代码实现依赖注入的方法
  • 相关文章
  • 

    © 2016-2020 巨人网络通讯 版权所有

    《增值电信业务经营许可证》 苏ICP备15040257号-8

    go语言同步教程之条件变量 语言,同步,教程,之,条件,