• 企业400电话
  • 微网小程序
  • AI电话机器人
  • 电商代运营
  • 全 部 栏 目

    企业400电话 网络优化推广 AI电话机器人 呼叫中心 网站建设 商标✡知产 微网小程序 电商运营 彩铃•短信 增值拓展业务
    golang实现redis的延时消息队列功能示例

    前言

    在学习过程中发现redis的zset还可以用来实现轻量级的延时消息队列功能,虽然可靠性还有待提高,但是对于一些对数据可靠性要求不那么高的功能要求完全可以实现。本次主要采用了redis中zset中的zadd, zrangebyscore 和 zdel来实现一个小demo。

    提前准备 安装redis, redis-go

    因为用的是macOS, 直接

    $ brew install redis
    $ go get github.com/garyburd/redigo/redis

    又因为比较懒,生成任务的唯一id时,直接采用了bson中的objectId,所以:

    $ go get gopkg.in/mgo.v2/bson

    唯一id不是必须有,但如果之后有实际应用需要携带,便于查找相应任务。

    生产者

    通过一个for循环生成10w个任务, 每一个任务有不同的时间

    func producer() {
     count := 0
     //生成100000个任务
     for count  100000 {
     count++
     dealTime := int64(rand.Intn(5)) + time.Now().Unix()
     uuid := bson.NewObjectId().Hex()
     redis.Client.AddJob(job.JobMessage{
     Id: uuid,
     DealTime: dealTime,
     }, + int64(dealTime))
     }
    }
    
    

    其中AddJob函数在另一个包中, 将上一个函数中随机生成的时间作为需要处理的时间戳.

    // 添加任务
    func (client *RedisClient) AddJob(msg *job.JobMessage, dealTime int64) {
     conn := client.Get()
     defer conn.Close()
    
     key := "JOB_MESSAGE_QUEUE"
     conn.Do("zadd", key, dealTime, util.JsonEncode(msg))
    }
    
    

    消费者

    消费者处理流程分为两个步骤:

    因为在获取小于等于当前时间戳的任务时,可能有多个go routine同时读到了当前任务,而只有一个任务可以来处理当前任务。因此我们需要通过一个方案来判断究竟由谁来处理这个任务(当然如果只有一个消费者可以读到就直接处理):这个时候可以通过redis的删除操作来获取,因为删除指定value时只有成功的操作才会返回不为0,所以我们可以认为删除当前队列成功的那个go routine拿到了当前的任务。

    下面是代码:

    // 消费者
    func consumer() {
     // 启动10个go routine一起去拿
     count := 0
     for count  10 {
     go func() {
     for {
     jobs := redis.Client.GetJob()
     if len(jobs) = 0 {
      time.Sleep(time.Second * 1)
      continue
     }
     currentJob := jobs[0]
     // 如果当前抢redis队列成功,
     if redis.Client.DelJob(currentJob) > 0 {
      var jobMessage job.JobMessage
      util.JsonDecode(currentJob, jobMessage) //自定义的json解析函数
      handleMessage(jobMessage)
     }
    
     }
    
     }()
     count++
     }
    }
    
    // 处理任务用函数
    func handleMessage(msg *job.JobMessage) {
     fmt.Printf("deal job: %s, require time: %d \n", msg.Id, msg.DealTime)
     go func() {
     countChan - true
     }()
    }

    redis部分的代码,获取任务和删除任务

    // 获取任务
    func (client *RedisClient) GetJob() []string {
     conn := client.Get()
     defer conn.Close()
    
     key := "JOB_MESSAGE_QUEUE"
     timeNow := time.Now().Unix()
     ret, err := redis.Strings(conn.Do("zrangebyscore", key, 0, timeNow, "limit", 0, 1))
     if err != nil {
     panic(err)
     }
     return ret
    }
    
    // 删除当前任务, 用来判断是否抢到了当前任务
    func (client *RedisClient) DelJob(value string) int {
     conn := client.Get()
     defer conn.Close()
    
     key := "JOB_MESSAGE_QUEUE"
     ret, err := redis.Int(conn.Do("zrem", key, value))
     if err != nil {
     panic(err)
     }
     return ret
    }
    
    

    代码大抵如此。最后跑起来之后,大概每3-4秒钟能够处理掉1w个任务,速度上确实是...

    以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持脚本之家。

    您可能感兴趣的文章:
    • Java实现Redis延时消息队列
    上一篇:Go 如何基于IP限制HTTP访问频率的方法实现
    下一篇:go切片的copy和view的使用方法
  • 相关文章
  • 

    © 2016-2020 巨人网络通讯 版权所有

    《增值电信业务经营许可证》 苏ICP备15040257号-8

    golang实现redis的延时消息队列功能示例 golang,实现,redis,的,延时,