• 企业400电话
  • 微网小程序
  • AI电话机器人
  • 电商代运营
  • 全 部 栏 目

    企业400电话 网络优化推广 AI电话机器人 呼叫中心 网站建设 商标✡知产 微网小程序 电商运营 彩铃•短信 增值拓展业务
    再次探讨go实现无限 buffer 的 channel方法

    前言

    总所周知,go 里面只有两种 channel,一种是 unbuffered channel, 其声明方式为

    ch := make(chan interface{})

    另一种是 buffered channel,其声明方式为

    bufferSize := 5
    ch := make(chan interface{},bufferSize)

    对于一个 buffered channel,无论它的 buffer 有多大,它终究是有极限的。这个极限就是该 channel 最初被 make 时,所指定的 bufferSize 。

    jojo,buffer channel 的大小是有极限的,我不做 channel 了。

    一旦 channel 满了的话,再往里面添加元素的话,将会阻塞。

    so how can we make a infinite buffer channel?

    本文参考了 medinum 上面的一篇文章,有兴趣的同学可以直接阅读原文。

    实现

    接口的设计

    首先当然是建一个 struct,在百度翻译的帮助下,我们将这个 struct 取名为 InfiniteChannel

    type InfiniteChannel struct {
    }

    思考一下 channel 的核心行为,实际上就两个,一个流入(Fan in),一个流出(Fan out),因此我们添加如下几个 method。

    func (c *InfiniteChannel) In(val interface{}) {
    	// todo
    }
    
    func (c *InfiniteChannel) Out() interface{} {
    	// todo
    }

    内部实现

    通过 In() 接收的数据,总得需要一个地方来存放。我们可以用一个 slice 来存放,就算用 In() 往里面添加了很多元素,也可以通过 append() 来拓展 sliceslice 的容量可以无限拓展下去(内存足够的话),所以 channel 也是 infiniteInfiniteChannel 的第一个成员就这么敲定下来的。

    type InfiniteChannel struct {
    	data    []interface{}
    }

    用户调用 In()Out() 时,可能是并发的环境,在 go 中如何进行并发编程,最容易想到的肯定是 channel 了,因此我们在内部准备两个 channel,一个 inChan,一个 outChan,用 inChan 来接收数据,用 outChan 来流出数据。

    type InfiniteChannel struct {
    	inChan  chan interface{}
    	outChan chan interface{}
    	data    []interface{}
    }
    
    func (c *InfiniteChannel) In(val interface{}) {
    	c.inChan - val
    }
    
    func (c *InfiniteChannel) Out() interface{} {
    	return -c.outChan
    }

    其中, inChanoutChan 都是 unbuffered channel。

    此外,也肯定是需要一个 select 来处理来自 inChanoutChan 身上的事件。因此我们另起一个协程,在里面做 select 操作。

    func (c *InfiniteChannel) background() {
    	for true {
    		select {
    		case newVal := -c.inChan:
    			c.data = append(c.data, newVal)
            case c.outChan - c.pop():		// pop() 将取出队列的首个元素
    		}
    	}
    }
    func NewInfiniteChannel() *InfiniteChannel {
    	c := InfiniteChannel{
    		inChan:  make(chan interface{}),
    		outChan: make(chan interface{}),
    	}
    	go c.background()	// 注意这里另起了一个协程
    	return c
    }

    ps:感觉这也算是 go 并发编程的一个套路了。即

    1. 在 new struct 的时候,顺手 go 一个 select 协程,select 协程内执行一个 for 循环,不停的 select,监听一个或者多个 channel 的事件。
    2. struct 对外提供的 method,只会操作 struct 内的 channel(在本例中就是 inChan 和 outChan),不会操作 struct 内的其他数据(在本例中,In() 和 Out() 都没有直接操作 data)。
    3. 触发 channel 的事件后,由 select 协程进行数据的更新(在本例中就是 data )。因为只有 select 协程对除 channel 外的数据成员进行读写操作,且 go 保证了对于 channel 的并发读写是安全的,所以代码是并发安全的。
    4. 如果 struct 是 exported ,用户或许会越过 new ,直接手动 make 一个 struct,可以考虑将 struct 设置为 unexported,把它的首字母小写即可。

    pop() 的实现也非常简单。

    // 取出队列的首个元素,如果队列为空,将会返回一个 nil
    func (c *InfiniteChannel) pop() interface{} {
    	if len(c.data) == 0 {
    		return nil
    	}
    	val := c.data[0]
    	c.data = c.data[1:]
    	return val
    }

    测试一下

    用一个协程每秒钟生产一条数据,另一个协程每半秒消费一条数据,并打印。

    func main() {
    	c := NewInfiniteChannel()
    	go func() {
    		for i := 0; i  20; i++ {
    			c.In(i)
    			time.Sleep(time.Second)
    		}
    	}()
    
    	for i := 0; i  50; i++ {
    		val := c.Out()
    		fmt.Print(val)
    		time.Sleep(time.Millisecond * 500)
    	}
    }
    // out
    nil>0nil>1nil>23nil>4nil>nil>5nil>67nil>nil>89nil>nil>1011nil>12nil>13nil>14nil>15nil>16nil>17nil>nil>1819nil>nil>nil>nil>nil>nil>nil>nil>nil>nil>nil>
    Process finished with the exit code 0

    可以看到,将 InfiniteChannel 内没有数据可供消费时,调用 Out() 将会返回一个 nil,不过这也在我们的意料之中,原因是 pop() 在队列为空时,将会返回 nil。

    目前 InfiniteChannel 的行为与标准的 channel 的行为是有出入的,go 中的 channel,在没有数据却仍要取数据时会被阻塞,如何实现这个效果?

    优化

    我认为此处是是整篇文章最有技巧的地方,我第一次看到时忍不住拍案叫绝。

    首先把原来的 background() 摘出来

    func (c *InfiniteChannel) background() {
    	for true {
    		select {
    		case newVal := -c.inChan:
    			c.data = append(c.data, newVal)
    		case c.outChan - c.pop():
    		}
    	}
    }

    outChan 进行一个简单封装

    func (c *InfiniteChannel) background() {
    	for true {
    		select {
    		case newVal := -c.inChan:
    			c.data = append(c.data, newVal)
    		case c.outChanWrapper() - c.pop():
    		}
    	}
    }
    func (c *InfiniteChannel) outChanWrapper() chan interface{} {
    	return c.outChan
    }

    目前为止,一切照旧。

    点睛之笔来了:

    func (c *InfiniteChannel) outChanWrapper() chan interface{} {
    	if len(c.data) == 0 {
    		return nil
    	}
    	return c.outChan
    }

    c.data 为空的时候,返回一个 nil

    background() 中,当执行到 case c.outChan - c.pop(): 时,实际上将会变成:

    case nil - nil:

    go 中,是无法往一个 nilchannel 中发送元素的。例如

    func main() {
    	var c chan interface{}
    	select {
    	case c - 1:
    	}
    }
    // fatal error: all goroutines are asleep - deadlock!
    func main() {
    	var c chan interface{}
    	select {
    	case c - 1:
    	default:
    		fmt.Println("hello world")
    	}
    }
    // hello world

    因此,对于

    select {
    case newVal := -c.inChan:
    	c.data = append(c.data, newVal)
    case c.outChanWrapper() - c.pop():
    }

    将会一直阻塞在 select 那里,直到 inChan 来了数据。

    再测试一下

    012345678910111213141516171819fatal error: all goroutines are asleep - deadlock!

    最后,程序 panic 了,因为死锁了。

    补充

    实际上 channel 除了 In()Out() 外,还有一个行为,即 close(),如果 channel close 后,依旧从其中取元素的话,将会取出该类型的默认值。

    func main() {
    	c := make(chan interface{})
    	close(c)
    	for true {
    		v := -c
    		fmt.Println(v)
    		time.Sleep(time.Second)
    	}
    }
    // output
    // nil>
    // nil>
    // nil>
    // nil>
    func main() {
    	c := make(chan interface{})
    	close(c)
    	for true {
    		v, isOpen := -c
    		fmt.Println(v, isOpen)
    		time.Sleep(time.Second)
    	}
    }
    // output
    // nil> false
    // nil> false
    // nil> false
    // nil> false

    我们也需要实现相同的效果。

    func (c *InfiniteChannel) Close() {
    	close(c.inChan)
    }
    
    func (c *InfiniteChannel) background() {
    	for true {
    		select {
    		case newVal, isOpen := -c.inChan:
    			if isOpen {
    				c.data = append(c.data, newVal)
    			} else {
    				c.isOpen = false
    			}
    		case c.outChanWrapper() - c.pop():
    		}
    	}
    }
    
    func NewInfiniteChannel() *InfiniteChannel {
    	c := InfiniteChannel{
    		inChan:  make(chan interface{}),
    		outChan: make(chan interface{}),
    		isOpen:  true,
    	}
    	go c.background()
    	return c
    }
    
    func (c *InfiniteChannel) outChanWrapper() chan interface{} {
        // 这里添加了对 c.isOpen 的判断
    	if c.isOpen  len(c.data) == 0 {
    		return nil
    	}
    	return c.outChan
    }

    再测试一下

    func main() {
    	c := NewInfiniteChannel()
    	go func() {
    		for i := 0; i  20; i++ {
    			c.In(i)
    			time.Sleep(time.Second)
    		}
    		c.Close()		// 这里调用了 Close
    	}()
    
    	for i := 0; i  50; i++ {
    		val := c.Out()
    		fmt.Print(val)
    		time.Sleep(time.Millisecond * 500)
    	}
    }
    // output
    012345678910111213141516171819nil>nil>nil>nil>nil>nil>nil>nil>nil>nil>nil>nil>nil>nil>nil>nil>nil>nil>nil>nil>nil>nil>nil>nil>nil>nil>nil>nil>nil>nil>
    Process finished with the exit code 0

    符合预期

    遗憾

    目前看上去已经很完美了,但是和标准的 channel 相比,仍然有差距。因为标准的 channel 是有这种用法的

    v,isOpen := - ch

    可以通过 isOpen 变量来获取 channel 的开闭情况。

    因此 InfiniteChannel 也应该提供一个类似的 method

    func (c *InfiniteChannel) OutAndIsOpen() (interface{}, bool) {
    	// todo
    }

    可惜的是,要想得知 InfiniteChannel 是否是 Open 的,就必定要访问 InfiniteChannel 内的 isOpen 成员。

    type InfiniteChannel struct {
    	inChan  chan interface{}
    	outChan chan interface{}
    	data    []interface{}
    	isOpen  bool
    }

    isOpen 并非 channel 类型,根据之前的套路,这种非 channel 类型的成员只应该被 select 协程访问。一旦有多个协程访问,就会出现并发问题,除非加锁。

    我不能接受!所以干脆不提供这个 method 了,嘿嘿。

    完整代码

    func main() {
    	c := NewInfiniteChannel()
    	go func() {
    		for i := 0; i  20; i++ {
    			c.In(i)
    			time.Sleep(time.Second)
    		}
    		c.Close()
    	}()
    
    	for i := 0; i  50; i++ {
    		val := c.Out()
    		fmt.Print(val)
    		time.Sleep(time.Millisecond * 500)
    	}
    }
    
    type InfiniteChannel struct {
    	inChan  chan interface{}
    	outChan chan interface{}
    	data    []interface{}
    	isOpen  bool
    }
    
    func (c *InfiniteChannel) In(val interface{}) {
    	c.inChan - val
    }
    
    func (c *InfiniteChannel) Out() interface{} {
    	return -c.outChan
    }
    
    func (c *InfiniteChannel) Close() {
    	close(c.inChan)
    }
    
    func (c *InfiniteChannel) background() {
    	for true {
    		select {
    		case newVal, isOpen := -c.inChan:
    			if isOpen {
    				c.data = append(c.data, newVal)
    			} else {
    				c.isOpen = false
    			}
    		case c.outChanWrapper() - c.pop():
    		}
    	}
    }
    
    func NewInfiniteChannel() *InfiniteChannel {
    	c := InfiniteChannel{
    		inChan:  make(chan interface{}),
    		outChan: make(chan interface{}),
    		isOpen:  true,
    	}
    	go c.background()
    	return c
    }
    
    // 取出队列的首个元素,如果队列为空,将会返回一个 nil
    func (c *InfiniteChannel) pop() interface{} {
    	if len(c.data) == 0 {
    		return nil
    	}
    	val := c.data[0]
    	c.data = c.data[1:]
    	return val
    }
    
    func (c *InfiniteChannel) outChanWrapper() chan interface{} {
    	if c.isOpen  len(c.data) == 0 {
    		return nil
    	}
    	return c.outChan
    }

    参考

    https://medium.com/capital-one-tech/building-an-unbounded-channel-in-go-789e175cd2cd

    以上就是再次探讨go实现无限 buffer 的 channel方法的详细内容,更多关于go无限 buffer 的 channel的资料请关注脚本之家其它相关文章!

    您可能感兴趣的文章:
    • Go语言中使用 buffered channel 实现线程安全的 pool
    • go原生库的中bytes.Buffer用法
    • C#语言使用gRPC、protobuf(Google Protocol Buffers)实现文件传输功能
    • 详解Django-channels 实现WebSocket实例
    • Django使用Channels实现WebSocket的方法
    • Django Channels 实现点对点实时聊天和消息推送功能
    上一篇:Go遍历struct,map,slice的实现
    下一篇:浅谈Go语言多态的实现与interface使用
  • 相关文章
  • 

    © 2016-2020 巨人网络通讯 版权所有

    《增值电信业务经营许可证》 苏ICP备15040257号-8

    再次探讨go实现无限 buffer 的 channel方法 再次,探讨,实现,无限,buffer,