• 企业400电话
  • 微网小程序
  • AI电话机器人
  • 电商代运营
  • 全 部 栏 目

    企业400电话 网络优化推广 AI电话机器人 呼叫中心 网站建设 商标✡知产 微网小程序 电商运营 彩铃•短信 增值拓展业务
    使用Redis实现延时任务的解决方案

    最近在生产环境刚好遇到了延时任务的场景,调研了一下目前主流的方案,分析了一下优劣并且敲定了最终的方案。这篇文章记录了调研的过程,以及初步方案的实现。

    候选方案对比

    下面是想到的几种实现延时任务的方案,总结了一下相应的优势和劣势。

    方案 优势 劣势 选用场景
    JDK 内置的延迟队列 DelayQueue 实现简单 数据内存态,不可靠 一致性相对低的场景
    调度框架和 MySQL 进行短间隔轮询 实现简单,可靠性高 存在明显的性能瓶颈 数据量较少实时性相对低的场景
    RabbitMQ  DLX  TTL,一般称为 死信队列 方案 异步交互可以削峰 延时的时间长度不可控,如果数据需要持久化则性能会降低 -
    调度框架和 Redis 进行短间隔轮询 数据持久化,高性能 实现难度大 常见于支付结果回调方案
    时间轮 实时性高 实现难度大,内存消耗大 实时性高的场景

    如果应用的数据量不高,实时性要求比较低,选用调度框架和 MySQL 进行短间隔轮询这个方案是最优的方案。但是笔者遇到的场景数据量相对比较大,实时性并不高,采用扫库的方案一定会对 MySQL 实例造成比较大的压力。记得很早之前,看过一个PPT叫《盒子科技聚合支付系统演进》,其中里面有一张图片给予笔者一点启发:

    里面刚好用到了调度框架和 Redis 进行短间隔轮询实现延时任务的方案,不过为了分摊应用的压力,图中的方案还做了分片处理。鉴于笔者当前业务紧迫,所以在第一期的方案暂时不考虑分片,只做了一个简化版的实现。

    由于PPT中没有任何的代码或者框架贴出,有些需要解决的技术点需要自行思考,下面会重现一次整个方案实现的详细过程。

    场景设计

    实际的生产场景是笔者负责的某个系统需要对接一个外部的资金方,每一笔资金下单后需要延时30分钟推送对应的附件。这里简化为一个订单信息数据延迟处理的场景,就是每一笔下单记录一条订单消息(暂时叫做 OrderMessage ),订单消息需要延迟5到15秒后进行异步处理。

    否决的候选方案实现思路

    下面介绍一下其它四个不选用的候选方案,结合一些伪代码和流程分析一下实现过程。

    JDK内置延迟队列

    DelayQueue 是一个阻塞队列的实现,它的队列元素必须是 Delayed 的子类,这里做个简单的例子:

    public class DelayQueueMain {
      private static final Logger LOGGER = LoggerFactory.getLogger(DelayQueueMain.class);
      public static void main(String[] args) throws Exception {
        DelayQueueOrderMessage> queue = new DelayQueue>();
        // 默认延迟5秒
        OrderMessage message = new OrderMessage("ORDER_ID_10086");
        queue.add(message);
        // 延迟6秒
        message = new OrderMessage("ORDER_ID_10087", 6);
        queue.add(message);
        // 延迟10秒
        message = new OrderMessage("ORDER_ID_10088", 10);
        queue.add(message);
        ExecutorService executorService = Executors.newSingleThreadExecutor(r -> {
          Thread thread = new Thread(r);
          thread.setName("DelayWorker");
          thread.setDaemon(true);
          return thread;
        });
        LOGGER.info("开始执行调度线程...");
        executorService.execute(() -> {
          while (true) {
            try {
              OrderMessage task = queue.take();
              LOGGER.info("延迟处理订单消息,{}", task.getDescription());
            } catch (Exception e) {
              LOGGER.error(e.getMessage(), e);
            }
          }
        });
        Thread.sleep(Integer.MAX_VALUE);
      }
      private static class OrderMessage implements Delayed {
        private static final DateTimeFormatter F = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
        /**
         * 默认延迟5000毫秒
         */
        private static final long DELAY_MS = 1000L * 5;
        /**
         * 订单ID
         */
        private final String orderId;
        /**
         * 创建时间戳
         */
        private final long timest
        /**
         * 过期时间
         */
        private final long expire;
        /**
         * 描述
         */
        private final String description;
        public OrderMessage(String orderId, long expireSeconds) {
          this.orderId = orderId;
          this.timestamp = System.currentTimeMillis();
          this.expire = this.timestamp + expireSeconds * 1000L;
          this.description = String.format("订单[%s]-创建时间为:%s,超时时间为:%s", orderId,
              LocalDateTime.ofInstant(Instant.ofEpochMilli(timestamp), ZoneId.systemDefault()).format(F),
              LocalDateTime.ofInstant(Instant.ofEpochMilli(expire), ZoneId.systemDefault()).format(F));
        }
        public OrderMessage(String orderId) {
          this.orderId = orderId;
          this.timestamp = System.currentTimeMillis();
          this.expire = this.timestamp + DELAY_MS;
          this.description = String.format("订单[%s]-创建时间为:%s,超时时间为:%s", orderId,
              LocalDateTime.ofInstant(Instant.ofEpochMilli(timestamp), ZoneId.systemDefault()).format(F),
              LocalDateTime.ofInstant(Instant.ofEpochMilli(expire), ZoneId.systemDefault()).format(F));
        }
        public String getOrderId() {
          return orderId;
        }
        public long getTimestamp() {
          return timest
        }
        public long getExpire() {
          return expire;
        }
        public String getDescription() {
          return description;
        }
        @Override
        public long getDelay(TimeUnit unit) {
          return unit.convert(this.expire - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
        }
        @Override
        public int compareTo(Delayed o) {
          return (int) (this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS));
        }
      }
    }

    注意一下, OrderMessage 实现 Delayed 接口,关键是需要实现 Delayed#getDelay()Delayed#compareTo() 。运行一下 main() 方法:

    10:16:08.240 [main] INFO club.throwable.delay.DelayQueueMain - 开始执行调度线程...
    10:16:13.224 [DelayWorker] INFO club.throwable.delay.DelayQueueMain - 延迟处理订单消息,订单[ORDER_ID_10086]-创建时间为:2019-08-20 10:16:08,超时时间为:2019-08-20 10:16:13
    10:16:14.237 [DelayWorker] INFO club.throwable.delay.DelayQueueMain - 延迟处理订单消息,订单[ORDER_ID_10087]-创建时间为:2019-08-20 10:16:08,超时时间为:2019-08-20 10:16:14
    10:16:18.237 [DelayWorker] INFO club.throwable.delay.DelayQueueMain - 延迟处理订单消息,订单[ORDER_ID_10088]-创建时间为:2019-08-20 10:16:08,超时时间为:2019-08-20 10:16:18

    调度框架 + MySQL

    使用调度框架对 MySQL 表进行短间隔轮询是实现难度比较低的方案,通常服务刚上线,表数据不多并且实时性不高的情况下应该首选这个方案。不过要注意以下几点:

    MySQL

    引入 QuartzMySQL 的Java驱动包和 spring-boot-starter-jdbc (这里只是为了方便用相对轻量级的框架实现,生产中可以按场景按需选择其他更合理的框架):

    dependency>
      groupId>mysql/groupId>
      artifactId>mysql-connector-java/artifactId>
      version>5.1.48/version>
      scope>test/scope>
    /dependency>
    dependency>
      groupId>org.springframework.boot/groupId>
      artifactId>spring-boot-starter-jdbc/artifactId>
      version>2.1.7.RELEASE/version>
      scope>test/scope>
    /dependency>
    dependency>
      groupId>org.quartz-scheduler/groupId>
      artifactId>quartz/artifactId>
      version>2.3.1/version>
      scope>test/scope>
    /dependency>

    假设表设计如下:

    CREATE DATABASE `delayTask` CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_520_ci;
    
    USE `delayTask`;
    
    CREATE TABLE `t_order_message`
    (
      id      BIGINT UNSIGNED PRIMARY KEY AUTO_INCREMENT,
      order_id   VARCHAR(50) NOT NULL COMMENT '订单ID',
      create_time DATETIME  NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建日期时间',
      edit_time  DATETIME  NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '修改日期时间',
      retry_times TINYINT   NOT NULL DEFAULT 0 COMMENT '重试次数',
      order_status TINYINT   NOT NULL DEFAULT 0 COMMENT '订单状态',
      INDEX idx_order_id (order_id),
      INDEX idx_create_time (create_time)
    ) COMMENT '订单信息表';
    
    # 写入两条测试数据
    INSERT INTO t_order_message(order_id) VALUES ('10086'),('10087');

    编写代码:

    // 常量
    public class OrderConstants {
    
      public static final int MAX_RETRY_TIMES = 5;
    
      public static final int PENDING = 0;
    
      public static final int SUCCESS = 1;
    
      public static final int FAIL = -1;
    
      public static final int LIMIT = 10;
    }
    
    // 实体
    @Builder
    @Data
    public class OrderMessage {
    
      private Long id;
      private String orderId;
      private LocalDateTime createTime;
      private LocalDateTime editTime;
      private Integer retryTimes;
      private Integer orderStatus;
    }
    
    // DAO
    @RequiredArgsConstructor
    public class OrderMessageDao {
    
      private final JdbcTemplate jdbcTemplate;
    
      private static final ResultSetExtractorListOrderMessage>> M = r -> {
        ListOrderMessage> list = Lists.newArrayList();
        while (r.next()) {
          list.add(OrderMessage.builder()
              .id(r.getLong("id"))
              .orderId(r.getString("order_id"))
              .createTime(r.getTimestamp("create_time").toLocalDateTime())
              .editTime(r.getTimestamp("edit_time").toLocalDateTime())
              .retryTimes(r.getInt("retry_times"))
              .orderStatus(r.getInt("order_status"))
              .build());
        }
        return list;
      };
    
      public ListOrderMessage> selectPendingRecords(LocalDateTime start,
                              LocalDateTime end,
                              ListInteger> statusList,
                              int maxRetryTimes,
                              int limit) {
        StringJoiner joiner = new StringJoiner(",");
        statusList.forEach(s -> joiner.add(String.valueOf(s)));
        return jdbcTemplate.query("SELECT * FROM t_order_message WHERE create_time >= ? AND create_time = ? " +
                "AND order_status IN (?) AND retry_times  ? LIMIT ?",
            p -> {
              p.setTimestamp(1, Timestamp.valueOf(start));
              p.setTimestamp(2, Timestamp.valueOf(end));
              p.setString(3, joiner.toString());
              p.setInt(4, maxRetryTimes);
              p.setInt(5, limit);
            }, M);
      }
    
      public int updateOrderStatus(Long id, int status) {
        return jdbcTemplate.update("UPDATE t_order_message SET order_status = ?,edit_time = ? WHERE id =?",
            p -> {
              p.setInt(1, status);
              p.setTimestamp(2, Timestamp.valueOf(LocalDateTime.now()));
              p.setLong(3, id);
            });
      }
    }
    
    // Service
    @RequiredArgsConstructor
    public class OrderMessageService {
    
      private static final Logger LOGGER = LoggerFactory.getLogger(OrderMessageService.class);
    
      private final OrderMessageDao orderMessageDao;
    
      private static final ListInteger> STATUS = Lists.newArrayList();
    
      static {
        STATUS.add(OrderConstants.PENDING);
        STATUS.add(OrderConstants.FAIL);
      }
    
      public void executeDelayJob() {
        LOGGER.info("订单处理定时任务开始执行......");
        LocalDateTime end = LocalDateTime.now();
        // 一天前
        LocalDateTime start = end.minusDays(1);
        ListOrderMessage> list = orderMessageDao.selectPendingRecords(start, end, STATUS, OrderConstants.MAX_RETRY_TIMES, OrderConstants.LIMIT);
        if (!list.isEmpty()) {
          for (OrderMessage m : list) {
            LOGGER.info("处理订单[{}],状态由{}更新为{}", m.getOrderId(), m.getOrderStatus(), OrderConstants.SUCCESS);
            // 这里其实可以优化为批量更新
            orderMessageDao.updateOrderStatus(m.getId(), OrderConstants.SUCCESS);
          }
        }
        LOGGER.info("订单处理定时任务开始完毕......");
      }
    }
    
    // Job
    @DisallowConcurrentExecution
    public class OrderMessageDelayJob implements Job {
    
      @Override
      public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
        OrderMessageService service = (OrderMessageService) jobExecutionContext.getMergedJobDataMap().get("orderMessageService");
        service.executeDelayJob();
      }
    
      public static void main(String[] args) throws Exception {
        HikariConfig config = new HikariConfig();
        config.setJdbcUrl("jdbc:mysql://localhost:3306/delayTask?useSSL=falsecharacterEncoding=utf8");
        config.setDriverClassName(Driver.class.getName());
        config.setUsername("root");
        config.setPassword("root");
        HikariDataSource dataSource = new HikariDataSource(config);
        OrderMessageDao orderMessageDao = new OrderMessageDao(new JdbcTemplate(dataSource));
        OrderMessageService service = new OrderMessageService(orderMessageDao);
        // 内存模式的调度器
        StdSchedulerFactory factory = new StdSchedulerFactory();
        Scheduler scheduler = factory.getScheduler();
        // 这里没有用到IOC容器,直接用Quartz数据集合传递服务引用
        JobDataMap jobDataMap = new JobDataMap();
        jobDataMap.put("orderMessageService", service);
        // 新建Job
        JobDetail job = JobBuilder.newJob(OrderMessageDelayJob.class)
            .withIdentity("orderMessageDelayJob", "delayJob")
            .usingJobData(jobDataMap)
            .build();
        // 新建触发器,10秒执行一次
        Trigger trigger = TriggerBuilder.newTrigger()
            .withIdentity("orderMessageDelayTrigger", "delayJob")
            .withSchedule(SimpleScheduleBuilder.simpleSchedule().withIntervalInSeconds(10).repeatForever())
            .build();
        scheduler.scheduleJob(job, trigger);
        // 启动调度器
        scheduler.start();
        Thread.sleep(Integer.MAX_VALUE);
      }
    }

    这个例子里面用了 create_time 做轮询,实际上可以添加一个调度时间 schedule_time 列做轮询,这样子才能更容易定制空闲时和忙碌时候的调度策略。上面的示例的运行效果如下:

    11:58:27.202 [main] INFO org.quartz.core.QuartzScheduler - Scheduler meta-data: Quartz Scheduler (v2.3.1) 'DefaultQuartzScheduler' with instanceId 'NON_CLUSTERED'
     Scheduler class: 'org.quartz.core.QuartzScheduler' - running locally.
     NOT STARTED.
     Currently in standby mode.
     Number of jobs executed: 0
     Using thread pool 'org.quartz.simpl.SimpleThreadPool' - with 10 threads.
     Using job-store 'org.quartz.simpl.RAMJobStore' - which does not support persistence. and is not clustered.
    
    11:58:27.202 [main] INFO org.quartz.impl.StdSchedulerFactory - Quartz scheduler 'DefaultQuartzScheduler' initialized from default resource file in Quartz package: 'quartz.properties'
    11:58:27.202 [main] INFO org.quartz.impl.StdSchedulerFactory - Quartz scheduler version: 2.3.1
    11:58:27.209 [main] INFO org.quartz.core.QuartzScheduler - Scheduler DefaultQuartzScheduler_$_NON_CLUSTERED started.
    11:58:27.212 [DefaultQuartzScheduler_QuartzSchedulerThread] DEBUG org.quartz.core.QuartzSchedulerThread - batch acquisition of 1 triggers
    11:58:27.217 [DefaultQuartzScheduler_QuartzSchedulerThread] DEBUG org.quartz.simpl.PropertySettingJobFactory - Producing instance of Job 'delayJob.orderMessageDelayJob', class=club.throwable.jdbc.OrderMessageDelayJob
    11:58:27.219 [HikariPool-1 connection adder] DEBUG com.zaxxer.hikari.pool.HikariPool - HikariPool-1 - Added connection com.mysql.jdbc.JDBC4Connection@10eb8c53
    11:58:27.220 [DefaultQuartzScheduler_QuartzSchedulerThread] DEBUG org.quartz.core.QuartzSchedulerThread - batch acquisition of 0 triggers
    11:58:27.221 [DefaultQuartzScheduler_Worker-1] DEBUG org.quartz.core.JobRunShell - Calling execute on job delayJob.orderMessageDelayJob
    11:58:34.440 [DefaultQuartzScheduler_Worker-1] INFO club.throwable.jdbc.OrderMessageService - 订单处理定时任务开始执行......
    11:58:34.451 [HikariPool-1 connection adder] DEBUG com.zaxxer.hikari.pool.HikariPool - HikariPool-1 - Added connection com.mysql.jdbc.JDBC4Connection@3d27ece4
    11:58:34.459 [HikariPool-1 connection adder] DEBUG com.zaxxer.hikari.pool.HikariPool - HikariPool-1 - Added connection com.mysql.jdbc.JDBC4Connection@64e808af
    11:58:34.470 [HikariPool-1 connection adder] DEBUG com.zaxxer.hikari.pool.HikariPool - HikariPool-1 - Added connection com.mysql.jdbc.JDBC4Connection@79c8c2b7
    11:58:34.477 [HikariPool-1 connection adder] DEBUG com.zaxxer.hikari.pool.HikariPool - HikariPool-1 - Added connection com.mysql.jdbc.JDBC4Connection@19a62369
    11:58:34.485 [HikariPool-1 connection adder] DEBUG com.zaxxer.hikari.pool.HikariPool - HikariPool-1 - Added connection com.mysql.jdbc.JDBC4Connection@1673d017
    11:58:34.485 [HikariPool-1 connection adder] DEBUG com.zaxxer.hikari.pool.HikariPool - HikariPool-1 - After adding stats (total=10, active=0, idle=10, waiting=0)
    11:58:34.559 [DefaultQuartzScheduler_Worker-1] DEBUG org.springframework.jdbc.core.JdbcTemplate - Executing prepared SQL query
    11:58:34.565 [DefaultQuartzScheduler_Worker-1] DEBUG org.springframework.jdbc.core.JdbcTemplate - Executing prepared SQL statement [SELECT * FROM t_order_message WHERE create_time >= ? AND create_time = ? AND order_status IN (?) AND retry_times  ? LIMIT ?]
    11:58:34.645 [DefaultQuartzScheduler_Worker-1] DEBUG org.springframework.jdbc.datasource.DataSourceUtils - Fetching JDBC Connection from DataSource
    11:58:35.210 [DefaultQuartzScheduler_Worker-1] DEBUG org.springframework.jdbc.core.JdbcTemplate - SQLWarning ignored: SQL state '22007', error code '1292', message [Truncated incorrect DOUBLE value: '0,-1']
    11:58:35.335 [DefaultQuartzScheduler_Worker-1] INFO club.throwable.jdbc.OrderMessageService - 处理订单[10086],状态由0更新为1
    11:58:35.342 [DefaultQuartzScheduler_Worker-1] DEBUG org.springframework.jdbc.core.JdbcTemplate - Executing prepared SQL update
    11:58:35.346 [DefaultQuartzScheduler_Worker-1] DEBUG org.springframework.jdbc.core.JdbcTemplate - Executing prepared SQL statement [UPDATE t_order_message SET order_status = ?,edit_time = ? WHERE id =?]
    11:58:35.347 [DefaultQuartzScheduler_Worker-1] DEBUG org.springframework.jdbc.datasource.DataSourceUtils - Fetching JDBC Connection from DataSource
    11:58:35.354 [DefaultQuartzScheduler_Worker-1] INFO club.throwable.jdbc.OrderMessageService - 处理订单[10087],状态由0更新为1
    11:58:35.355 [DefaultQuartzScheduler_Worker-1] DEBUG org.springframework.jdbc.core.JdbcTemplate - Executing prepared SQL update
    11:58:35.355 [DefaultQuartzScheduler_Worker-1] DEBUG org.springframework.jdbc.core.JdbcTemplate - Executing prepared SQL statement [UPDATE t_order_message SET order_status = ?,edit_time = ? WHERE id =?]
    11:58:35.355 [DefaultQuartzScheduler_Worker-1] DEBUG org.springframework.jdbc.datasource.DataSourceUtils - Fetching JDBC Connection from DataSource
    11:58:35.361 [DefaultQuartzScheduler_Worker-1] INFO club.throwable.jdbc.OrderMessageService - 订单处理定时任务开始完毕......
    11:58:35.363 [DefaultQuartzScheduler_QuartzSchedulerThread] DEBUG org.quartz.core.QuartzSchedulerThread - batch acquisition of 1 triggers
    11:58:37.206 [DefaultQuartzScheduler_QuartzSchedulerThread] DEBUG org.quartz.simpl.PropertySettingJobFactory - Producing instance of Job 'delayJob.orderMessageDelayJob', class=club.throwable.jdbc.OrderMessageDelayJob
    11:58:37.206 [DefaultQuartzScheduler_QuartzSchedulerThread] DEBUG org.quartz.core.QuartzSchedulerThread - batch acquisition of 0 triggers

    RabbitMQ死信队列

    使用 RabbitMQ 死信队列依赖于 RabbitMQ 的两个特性: TTLDLX

    TTLTime To Live ,消息存活时间,包括两个维度:队列消息存活时间和消息本身的存活时间。

    DLXDead Letter Exchange ,死信交换器。

    画个图描述一下这两个特性:

    下面为了简单起见, TTL 使用了针对队列的维度。引入 RabbitMQ 的Java驱动:

    dependency>
      groupId>com.rabbitmq/groupId>
      artifactId>amqp-client/artifactId>
      version>5.7.3/version>
      scope>test/scope>
    /dependency>

    代码如下:

    public class DlxMain {
    
      private static final DateTimeFormatter F = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
      private static final Logger LOGGER = LoggerFactory.getLogger(DlxMain.class);
    
      public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        Connection connection = factory.newConnection();
        Channel producerChannel = connection.createChannel();
        Channel consumerChannel = connection.createChannel();
        // dlx交换器名称为dlx.exchange,类型是direct,绑定键为dlx.key,队列名为dlx.queue
        producerChannel.exchangeDeclare("dlx.exchange", "direct");
        producerChannel.queueDeclare("dlx.queue", false, false, false, null);
        producerChannel.queueBind("dlx.queue", "dlx.exchange", "dlx.key");
        MapString, Object> queueArgs = new HashMap>();
        // 设置队列消息过期时间,5秒
        queueArgs.put("x-message-ttl", 5000);
        // 指定DLX相关参数
        queueArgs.put("x-dead-letter-exchange", "dlx.exchange");
        queueArgs.put("x-dead-letter-routing-key", "dlx.key");
        // 声明业务队列
        producerChannel.queueDeclare("business.queue", false, false, false, queueArgs);
        ExecutorService executorService = Executors.newSingleThreadExecutor(r -> {
          Thread thread = new Thread(r);
          thread.setDaemon(true);
          thread.setName("DlxConsumer");
          return thread;
        });
        // 启动消费者
        executorService.execute(() -> {
          try {
            consumerChannel.basicConsume("dlx.queue", true, new DlxConsumer(consumerChannel));
          } catch (IOException e) {
            LOGGER.error(e.getMessage(), e);
          }
        });
        OrderMessage message = new OrderMessage("10086");
        producerChannel.basicPublish("", "business.queue", MessageProperties.TEXT_PLAIN,
            message.getDescription().getBytes(StandardCharsets.UTF_8));
        LOGGER.info("发送消息成功,订单ID:{}", message.getOrderId());
    
        message = new OrderMessage("10087");
        producerChannel.basicPublish("", "business.queue", MessageProperties.TEXT_PLAIN,
            message.getDescription().getBytes(StandardCharsets.UTF_8));
        LOGGER.info("发送消息成功,订单ID:{}", message.getOrderId());
    
        message = new OrderMessage("10088");
        producerChannel.basicPublish("", "business.queue", MessageProperties.TEXT_PLAIN,
            message.getDescription().getBytes(StandardCharsets.UTF_8));
        LOGGER.info("发送消息成功,订单ID:{}", message.getOrderId());
    
        Thread.sleep(Integer.MAX_VALUE);
      }
    
      private static class DlxConsumer extends DefaultConsumer {
    
        DlxConsumer(Channel channel) {
          super(channel);
        }
    
        @Override
        public void handleDelivery(String consumerTag,
                      Envelope envelope,
                      AMQP.BasicProperties properties,
                      byte[] body) throws IOException {
          LOGGER.info("处理消息成功:{}", new String(body, StandardCharsets.UTF_8));
        }
      }
    
      private static class OrderMessage {
    
        private final String orderId;
        private final long timest
        private final String description;
    
        OrderMessage(String orderId) {
          this.orderId = orderId;
          this.timestamp = System.currentTimeMillis();
          this.description = String.format("订单[%s],订单创建时间为:%s", orderId,
              LocalDateTime.ofInstant(Instant.ofEpochMilli(timestamp), ZoneId.systemDefault()).format(F));
        }
    
        public String getOrderId() {
          return orderId;
        }
    
        public long getTimestamp() {
          return timest
        }
    
        public String getDescription() {
          return description;
        }
      }
    }

    运行 main() 方法结果如下:

    16:35:58.638 [main] INFO club.throwable.dlx.DlxMain - 发送消息成功,订单ID:10086
    16:35:58.641 [main] INFO club.throwable.dlx.DlxMain - 发送消息成功,订单ID:10087
    16:35:58.641 [main] INFO club.throwable.dlx.DlxMain - 发送消息成功,订单ID:10088
    16:36:03.646 [pool-1-thread-4] INFO club.throwable.dlx.DlxMain - 处理消息成功:订单[10086],订单创建时间为:2019-08-20 16:35:58
    16:36:03.670 [pool-1-thread-5] INFO club.throwable.dlx.DlxMain - 处理消息成功:订单[10087],订单创建时间为:2019-08-20 16:35:58
    16:36:03.670 [pool-1-thread-6] INFO club.throwable.dlx.DlxMain - 处理消息成功:订单[10088],订单创建时间为:2019-08-20 16:35:58

    时间轮

    时间轮 TimingWheel 是一种高效、低延迟的调度数据结构,底层采用数组实现存储任务列表的环形队列,示意图如下:

    这里暂时不对时间轮和其实现作分析,只简单举例说明怎么使用时间轮实现延时任务。这里使用 Netty 提供的 HashedWheelTimer ,引入依赖:

    dependency>
      groupId>io.netty/groupId>
      artifactId>netty-common/artifactId>
      version>4.1.39.Final/version>
    /dependency>

    代码如下:

    public class HashedWheelTimerMain {
    
      private static final DateTimeFormatter F = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");
    
      public static void main(String[] args) throws Exception {
        AtomicInteger counter = new AtomicInteger();
        ThreadFactory factory = r -> {
          Thread thread = new Thread(r);
          thread.setDaemon(true);
          thread.setName("HashedWheelTimerWorker-" + counter.getAndIncrement());
          return thread;
        };
        // tickDuration - 每tick一次的时间间隔, 每tick一次就会到达下一个槽位
        // unit - tickDuration的时间单位
        // ticksPerWhee - 时间轮中的槽位数
        Timer timer = new HashedWheelTimer(factory, 1, TimeUnit.SECONDS, 60);
        TimerTask timerTask = new DefaultTimerTask("10086");
        timer.newTimeout(timerTask, 5, TimeUnit.SECONDS);
        timerTask = new DefaultTimerTask("10087");
        timer.newTimeout(timerTask, 10, TimeUnit.SECONDS);
        timerTask = new DefaultTimerTask("10088");
        timer.newTimeout(timerTask, 15, TimeUnit.SECONDS);
        Thread.sleep(Integer.MAX_VALUE);
      }
    
      private static class DefaultTimerTask implements TimerTask {
    
        private final String orderId;
        private final long timest
    
        public DefaultTimerTask(String orderId) {
          this.orderId = orderId;
          this.timestamp = System.currentTimeMillis();
        }
    
        @Override
        public void run(Timeout timeout) throws Exception {
          System.out.println(String.format("任务执行时间:%s,订单创建时间:%s,订单ID:%s",
              LocalDateTime.now().format(F), LocalDateTime.ofInstant(Instant.ofEpochMilli(timestamp), ZoneId.systemDefault()).format(F), orderId));
        }
      }
    }

    运行结果:

    任务执行时间:2019-08-20 17:19:49.310,订单创建时间:2019-08-20 17:19:43.294,订单ID:10086
    任务执行时间:2019-08-20 17:19:54.297,订单创建时间:2019-08-20 17:19:43.301,订单ID:10087
    任务执行时间:2019-08-20 17:19:59.297,订单创建时间:2019-08-20 17:19:43.301,订单ID:10088

    一般来说,任务执行的时候应该使用另外的业务线程池,以免阻塞时间轮本身的运动。

    选用的方案实现过程

    最终选用了基于 Redis 的有序集合 Sorted SetQuartz 短轮询进行实现。具体方案是:

    对于第4点处理有两种方案:

    最终暂时选用了方案一,也就是从 Sorted Set 弹出订单ID并且从 Hash 中获取完推送数据之后马上删除这两个集合中对应的数据。方案的流程图大概是这样:

    这里先详细说明一下用到的 Redis 命令。

    Sorted Set相关命令

    ZADD 命令 - 将一个或多个成员元素及其分数值加入到有序集当中。

    ZADD KEY SCORE1 VALUE1.. SCOREN VALUEN

    ZREVRANGEBYSCORE 命令 - 返回有序集中指定分数区间内的所有的成员。有序集成员按分数值递减(从大到小)的次序排列。

    ZREVRANGEBYSCORE key max min [WITHSCORES] [LIMIT offset count]

    max:分数区间 - 最大分数。 min:分数区间 - 最小分数。 WITHSCORES:可选参数,是否返回分数值,指定则会返回得分值。 LIMIT:可选参数,offset和count原理和 MySQLLIMIT offset,size 一致,如果不指定此参数则返回整个集合的数据。 ZREM 命令 - 用于移除有序集中的一个或多个成员,不存在的成员将被忽略。

    ZREM key member [member ...]

    Hash相关命令 HMSET 命令 - 同时将多个field-value(字段-值)对设置到哈希表中。

    HMSET KEY_NAME FIELD1 VALUE1 ...FIELDN VALUEN

    HDEL 命令 - 删除哈希表key中的一个或多个指定字段,不存在的字段将被忽略。

    HDEL KEY_NAME FIELD1.. FIELDN

    Lua相关 加载 Lua 脚本并且返回脚本的 SHA-1 字符串: SCRIPT LOAD script 。 执行已经加载的 Lua 脚本: EVALSHA sha1 numkeys key [key ...] arg [arg ...]unpack 函数可以把 table 类型的参数转化为可变参数,不过需要注意的是 unpack 函数必须使用在非变量定义的函数调用的最后一个参数,否则会失效,详细见 Stackoverflow 的提问 table.unpack() only returns the first element 。

    PS:如果不熟悉Lua语言,建议系统学习一下,因为想用好Redis,一定离不开Lua。

    引入依赖:

    dependencyManagement>
      dependencies>
        dependency>
          groupId>org.springframework.boot/groupId>
          artifactId>spring-boot-dependencies/artifactId>
          version>2.1.7.RELEASE/version>
          type>pom/type>
          scope>import/scope>
        /dependency>
      /dependencies>
    /dependencyManagement>
    
    dependencies>
      dependency>
        groupId>org.quartz-scheduler/groupId>
        artifactId>quartz/artifactId>
        version>2.3.1/version>
      /dependency>
      dependency>
        groupId>redis.clients/groupId>
        artifactId>jedis/artifactId>
        version>3.1.0/version>
      /dependency>
      dependency>
        groupId>org.springframework.boot/groupId>
        artifactId>spring-boot-starter-web/artifactId>
      /dependency>
      dependency>
        groupId>org.springframework.boot/groupId>
        artifactId>spring-boot-starter-jdbc/artifactId>
      /dependency>  
      dependency>
        groupId>org.springframework/groupId>
        artifactId>spring-context-support/artifactId>
        version>5.1.9.RELEASE/version>
      /dependency> 
      dependency>
        groupId>org.projectlombok/groupId>
        artifactId>lombok/artifactId>
        version>1.18.8/version>
        scope>provided/scope>
      /dependency>
      dependency>
        groupId>com.alibaba/groupId>
        artifactId>fastjson/artifactId>
        version>1.2.59/version>
      /dependency>    
    /dependencies>

    编写 Lua 脚本 /lua/enqueue.lua/lua/dequeue.lua

    -- /lua/enqueue.lua
    local zset_key = KEYS[1]
    local hash_key = KEYS[2]
    local zset_value = ARGV[1]
    local zset_score = ARGV[2]
    local hash_field = ARGV[3]
    local hash_value = ARGV[4]
    redis.call('ZADD', zset_key, zset_score, zset_value)
    redis.call('HSET', hash_key, hash_field, hash_value)
    return nil
    
    -- /lua/dequeue.lua
    -- 参考jesque的部分Lua脚本实现
    local zset_key = KEYS[1]
    local hash_key = KEYS[2]
    local min_score = ARGV[1]
    local max_score = ARGV[2]
    local offset = ARGV[3]
    local limit = ARGV[4]
    -- TYPE命令的返回结果是{'ok':'zset'}这样子,这里利用next做一轮迭代
    local status, type = next(redis.call('TYPE', zset_key))
    if status ~= nil and status == 'ok' then
      if type == 'zset' then
        local list = redis.call('ZREVRANGEBYSCORE', zset_key, max_score, min_score, 'LIMIT', offset, limit)
        if list ~= nil and #list > 0 then
          -- unpack函数能把table转化为可变参数
          redis.call('ZREM', zset_key, unpack(list))
          local result = redis.call('HMGET', hash_key, unpack(list))
          redis.call('HDEL', hash_key, unpack(list))
          return result
        end
      end
    end
    return nil

    编写核心API代码:

    // Jedis提供者
    @Component
    public class JedisProvider implements InitializingBean {
    
      private JedisPool jedisPool;
    
      @Override
      public void afterPropertiesSet() throws Exception {
        jedisPool = new JedisPool();
      }
    
      public Jedis provide(){
        return jedisPool.getResource();
      }
    }
    
    // OrderMessage
    @Data
    public class OrderMessage {
    
      private String orderId;
      private BigDecimal amount;
      private Long userId;
    }
    
    // 延迟队列接口
    public interface OrderDelayQueue {
    
      void enqueue(OrderMessage message);
    
      ListOrderMessage> dequeue(String min, String max, String offset, String limit);
    
      ListOrderMessage> dequeue();
    
      String enqueueSha();
    
      String dequeueSha();
    }
    
    // 延迟队列实现类
    @RequiredArgsConstructor
    @Component
    public class RedisOrderDelayQueue implements OrderDelayQueue, InitializingBean {
    
      private static final String MIN_SCORE = "0";
      private static final String OFFSET = "0";
      private static final String LIMIT = "10";
      private static final String ORDER_QUEUE = "ORDER_QUEUE";
      private static final String ORDER_DETAIL_QUEUE = "ORDER_DETAIL_QUEUE";
      private static final String ENQUEUE_LUA_SCRIPT_LOCATION = "/lua/enqueue.lua";
      private static final String DEQUEUE_LUA_SCRIPT_LOCATION = "/lua/dequeue.lua";
      private static final AtomicReferenceString> ENQUEUE_LUA_SHA = new AtomicReference>();
      private static final AtomicReferenceString> DEQUEUE_LUA_SHA = new AtomicReference>();
      private static final ListString> KEYS = Lists.newArrayList();
    
      private final JedisProvider jedisProvider;
    
      static {
        KEYS.add(ORDER_QUEUE);
        KEYS.add(ORDER_DETAIL_QUEUE);
      }
    
      @Override
      public void enqueue(OrderMessage message) {
        ListString> args = Lists.newArrayList();
        args.add(message.getOrderId());
        args.add(String.valueOf(System.currentTimeMillis()));
        args.add(message.getOrderId());
        args.add(JSON.toJSONString(message));
        try (Jedis jedis = jedisProvider.provide()) {
          jedis.evalsha(ENQUEUE_LUA_SHA.get(), KEYS, args);
        }
      }
    
      @Override
      public ListOrderMessage> dequeue() {
        // 30分钟之前
        String maxScore = String.valueOf(System.currentTimeMillis() - 30 * 60 * 1000);
        return dequeue(MIN_SCORE, maxScore, OFFSET, LIMIT);
      }
    
      @SuppressWarnings("unchecked")
      @Override
      public ListOrderMessage> dequeue(String min, String max, String offset, String limit) {
        ListString> args = new ArrayList>();
        args.add(max);
        args.add(min);
        args.add(offset);
        args.add(limit);
        ListOrderMessage> result = Lists.newArrayList();
        try (Jedis jedis = jedisProvider.provide()) {
          ListString> eval = (ListString>) jedis.evalsha(DEQUEUE_LUA_SHA.get(), KEYS, args);
          if (null != eval) {
            for (String e : eval) {
              result.add(JSON.parseObject(e, OrderMessage.class));
            }
          }
        }
        return result;
      }
    
      @Override
      public String enqueueSha() {
        return ENQUEUE_LUA_SHA.get();
      }
    
      @Override
      public String dequeueSha() {
        return DEQUEUE_LUA_SHA.get();
      }
    
      @Override
      public void afterPropertiesSet() throws Exception {
        // 加载Lua脚本
        loadLuaScript();
      }
    
      private void loadLuaScript() throws Exception {
        try (Jedis jedis = jedisProvider.provide()) {
          ClassPathResource resource = new ClassPathResource(ENQUEUE_LUA_SCRIPT_LOCATION);
          String luaContent = StreamUtils.copyToString(resource.getInputStream(), StandardCharsets.UTF_8);
          String sha = jedis.scriptLoad(luaContent);
          ENQUEUE_LUA_SHA.compareAndSet(null, sha);
          resource = new ClassPathResource(DEQUEUE_LUA_SCRIPT_LOCATION);
          luaContent = StreamUtils.copyToString(resource.getInputStream(), StandardCharsets.UTF_8);
          sha = jedis.scriptLoad(luaContent);
          DEQUEUE_LUA_SHA.compareAndSet(null, sha);
        }
      }
    
      public static void main(String[] as) throws Exception {
        DateTimeFormatter f = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");
        JedisProvider jedisProvider = new JedisProvider();
        jedisProvider.afterPropertiesSet();
        RedisOrderDelayQueue queue = new RedisOrderDelayQueue(jedisProvider);
        queue.afterPropertiesSet();
        // 写入测试数据
        OrderMessage message = new OrderMessage();
        message.setAmount(BigDecimal.valueOf(10086));
        message.setOrderId("ORDER_ID_10086");
        message.setUserId(10086L);
        message.setTimestamp(LocalDateTime.now().format(f));
        ListString> args = Lists.newArrayList();
        args.add(message.getOrderId());
        // 测试需要,score设置为30分钟之前
        args.add(String.valueOf(System.currentTimeMillis() - 30 * 60 * 1000));
        args.add(message.getOrderId());
        args.add(JSON.toJSONString(message));
        try (Jedis jedis = jedisProvider.provide()) {
          jedis.evalsha(ENQUEUE_LUA_SHA.get(), KEYS, args);
        }
        ListOrderMessage> dequeue = queue.dequeue();
        System.out.println(dequeue);
      }
    }

    这里先执行一次 main() 方法验证一下延迟队列是否生效:

    [OrderMessage(orderId=ORDER_ID_10086, amount=10086, userId=10086, timestamp=2019-08-21 08:32:22.885)]

    确定延迟队列的代码没有问题,接着编写一个 QuartzJob 类型的消费者 OrderMessageConsumer

    @DisallowConcurrentExecution
    @Component
    public class OrderMessageConsumer implements Job {
    
      private static final AtomicInteger COUNTER = new AtomicInteger();
      private static final ExecutorService BUSINESS_WORKER_POOL = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors(), r -> {
        Thread thread = new Thread(r);
        thread.setDaemon(true);
        thread.setName("OrderMessageConsumerWorker-" + COUNTER.getAndIncrement());
        return thread;
      });
      private static final Logger LOGGER = LoggerFactory.getLogger(OrderMessageConsumer.class);
    
      @Autowired
      private OrderDelayQueue orderDelayQueue;
    
      @Override
      public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
        StopWatch stopWatch = new StopWatch();
        stopWatch.start();
        LOGGER.info("订单消息处理定时任务开始执行......");
        ListOrderMessage> messages = orderDelayQueue.dequeue();
        if (!messages.isEmpty()) {
          // 简单的列表等分放到线程池中执行
          ListListOrderMessage>> partition = Lists.partition(messages, 2);
          int size = partition.size();
          final CountDownLatch latch = new CountDownLatch(size);
          for (ListOrderMessage> p : partition) {
            BUSINESS_WORKER_POOL.execute(new ConsumeTask(p, latch));
          }
          try {
            latch.await();
          } catch (InterruptedException ignore) {
            //ignore
          }
        }
        stopWatch.stop();
        LOGGER.info("订单消息处理定时任务执行完毕,耗时:{} ms......", stopWatch.getTotalTimeMillis());
      }
    
      @RequiredArgsConstructor
      private static class ConsumeTask implements Runnable {
    
        private final ListOrderMessage> messages;
        private final CountDownLatch latch;
    
        @Override
        public void run() {
          try {
            // 实际上这里应该单条捕获异常
            for (OrderMessage message : messages) {
              LOGGER.info("处理订单信息,内容:{}", message);
            }
          } finally {
            latch.countDown();
          }
        }
      }
    }

    上面的消费者设计的时候需要有以下考量:

    其他 Quartz 相关的代码:

    // Quartz配置类
    @Configuration
    public class QuartzAutoConfiguration {
    
      @Bean
      public SchedulerFactoryBean schedulerFactoryBean(QuartzAutowiredJobFactory quartzAutowiredJobFactory) {
        SchedulerFactoryBean factory = new SchedulerFactoryBean();
        factory.setAutoStartup(true);
        factory.setJobFactory(quartzAutowiredJobFactory);
        return factory;
      }
    
      @Bean
      public QuartzAutowiredJobFactory quartzAutowiredJobFactory() {
        return new QuartzAutowiredJobFactory();
      }
    
      public static class QuartzAutowiredJobFactory extends AdaptableJobFactory implements BeanFactoryAware {
    
        private AutowireCapableBeanFactory autowireCapableBeanFactory;
    
        @Override
        public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
          this.autowireCapableBeanFactory = (AutowireCapableBeanFactory) beanFactory;
        }
    
        @Override
        protected Object createJobInstance(TriggerFiredBundle bundle) throws Exception {
          Object jobInstance = super.createJobInstance(bundle);
          // 这里利用AutowireCapableBeanFactory从新建的Job实例做一次自动装配,得到一个原型(prototype)的JobBean实例
          autowireCapableBeanFactory.autowireBean(jobInstance);
          return jobInstance;
        }
      }
    }

    这里暂时使用了内存态的 RAMJobStore 去存放任务和触发器的相关信息,如果在生产环境最好替换成基于 MySQL 也就是 JobStoreTX 进行集群化,最后是启动函数和 CommandLineRunner 的实现:

    @SpringBootApplication(exclude = {DataSourceAutoConfiguration.class, TransactionAutoConfiguration.class})
    public class Application implements CommandLineRunner {
    
      @Autowired
      private Scheduler scheduler;
    
      @Autowired
      private JedisProvider jedisProvider;
    
      public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
      }
    
      @Override
      public void run(String... args) throws Exception {
        // 准备一些测试数据
        prepareOrderMessageData();
        JobDetail job = JobBuilder.newJob(OrderMessageConsumer.class)
            .withIdentity("OrderMessageConsumer", "DelayTask")
            .build();
        // 触发器5秒触发一次
        Trigger trigger = TriggerBuilder.newTrigger()
            .withIdentity("OrderMessageConsumerTrigger", "DelayTask")
            .withSchedule(SimpleScheduleBuilder.simpleSchedule().withIntervalInSeconds(5).repeatForever())
            .build();
        scheduler.scheduleJob(job, trigger);
      }
    
      private void prepareOrderMessageData() throws Exception {
        DateTimeFormatter f = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");
        try (Jedis jedis = jedisProvider.provide()) {
          ListOrderMessage> messages = Lists.newArrayList();
          for (int i = 0; i  100; i++) {
            OrderMessage message = new OrderMessage();
            message.setAmount(BigDecimal.valueOf(i));
            message.setOrderId("ORDER_ID_" + i);
            message.setUserId((long) i);
            message.setTimestamp(LocalDateTime.now().format(f));
            messages.add(message);
          }
          // 这里暂时不使用Lua
          MapString, Double> map = Maps.newHashMap();
          MapString, String> hash = Maps.newHashMap();
          for (OrderMessage message : messages) {
            // 故意把score设计成30分钟前
            map.put(message.getOrderId(), Double.valueOf(String.valueOf(System.currentTimeMillis() - 30 * 60 * 1000)));
            hash.put(message.getOrderId(), JSON.toJSONString(message));
          }
          jedis.zadd("ORDER_QUEUE", map);
          jedis.hmset("ORDER_DETAIL_QUEUE", hash);
        }
      }
    }

    输出结果如下:

    2019-08-21 22:45:59.518  INFO 33000 --- [ryBean_Worker-1] club.throwable.OrderMessageConsumer      : 订单消息处理定时任务开始执行......
    2019-08-21 22:45:59.525  INFO 33000 --- [onsumerWorker-4] club.throwable.OrderMessageConsumer      : 处理订单信息,内容:OrderMessage(orderId=ORDER_ID_91, amount=91, userId=91, timestamp=2019-08-21 22:45:59.475)
    2019-08-21 22:45:59.525  INFO 33000 --- [onsumerWorker-2] club.throwable.OrderMessageConsumer      : 处理订单信息,内容:OrderMessage(orderId=ORDER_ID_95, amount=95, userId=95, timestamp=2019-08-21 22:45:59.475)
    2019-08-21 22:45:59.525  INFO 33000 --- [onsumerWorker-1] club.throwable.OrderMessageConsumer      : 处理订单信息,内容:OrderMessage(orderId=ORDER_ID_97, amount=97, userId=97, timestamp=2019-08-21 22:45:59.475)
    2019-08-21 22:45:59.525  INFO 33000 --- [onsumerWorker-0] club.throwable.OrderMessageConsumer      : 处理订单信息,内容:OrderMessage(orderId=ORDER_ID_99, amount=99, userId=99, timestamp=2019-08-21 22:45:59.475)
    2019-08-21 22:45:59.525  INFO 33000 --- [onsumerWorker-3] club.throwable.OrderMessageConsumer      : 处理订单信息,内容:OrderMessage(orderId=ORDER_ID_93, amount=93, userId=93, timestamp=2019-08-21 22:45:59.475)
    2019-08-21 22:45:59.539  INFO 33000 --- [onsumerWorker-2] club.throwable.OrderMessageConsumer      : 处理订单信息,内容:OrderMessage(orderId=ORDER_ID_94, amount=94, userId=94, timestamp=2019-08-21 22:45:59.475)
    2019-08-21 22:45:59.539  INFO 33000 --- [onsumerWorker-1] club.throwable.OrderMessageConsumer      : 处理订单信息,内容:OrderMessage(orderId=ORDER_ID_96, amount=96, userId=96, timestamp=2019-08-21 22:45:59.475)
    2019-08-21 22:45:59.539  INFO 33000 --- [onsumerWorker-3] club.throwable.OrderMessageConsumer      : 处理订单信息,内容:OrderMessage(orderId=ORDER_ID_92, amount=92, userId=92, timestamp=2019-08-21 22:45:59.475)
    2019-08-21 22:45:59.539  INFO 33000 --- [onsumerWorker-0] club.throwable.OrderMessageConsumer      : 处理订单信息,内容:OrderMessage(orderId=ORDER_ID_98, amount=98, userId=98, timestamp=2019-08-21 22:45:59.475)
    2019-08-21 22:45:59.539  INFO 33000 --- [onsumerWorker-4] club.throwable.OrderMessageConsumer      : 处理订单信息,内容:OrderMessage(orderId=ORDER_ID_90, amount=90, userId=90, timestamp=2019-08-21 22:45:59.475)
    2019-08-21 22:45:59.540  INFO 33000 --- [ryBean_Worker-1] club.throwable.OrderMessageConsumer      : 订单消息处理定时任务执行完毕,耗时:22 ms......
    2019-08-21 22:46:04.515  INFO 33000 --- [ryBean_Worker-2] club.throwable.OrderMessageConsumer      : 订单消息处理定时任务开始执行......
    2019-08-21 22:46:04.516  INFO 33000 --- [onsumerWorker-5] club.throwable.OrderMessageConsumer      : 处理订单信息,内容:OrderMessage(orderId=ORDER_ID_89, amount=89, userId=89, timestamp=2019-08-21 22:45:59.475)
    2019-08-21 22:46:04.516  INFO 33000 --- [onsumerWorker-6] club.throwable.OrderMessageConsumer      : 处理订单信息,内容:OrderMessage(orderId=ORDER_ID_87, amount=87, userId=87, timestamp=2019-08-21 22:45:59.475)
    2019-08-21 22:46:04.516  INFO 33000 --- [onsumerWorker-7] club.throwable.OrderMessageConsumer      : 处理订单信息,内容:OrderMessage(orderId=ORDER_ID_85, amount=85, userId=85, timestamp=2019-08-21 22:45:59.475)
    2019-08-21 22:46:04.516  INFO 33000 --- [onsumerWorker-5] club.throwable.OrderMessageConsumer      : 处理订单信息,内容:OrderMessage(orderId=ORDER_ID_88, amount=88, userId=88, timestamp=2019-08-21 22:45:59.475)
    2019-08-21 22:46:04.516  INFO 33000 --- [onsumerWorker-2] club.throwable.OrderMessageConsumer      : 处理订单信息,内容:OrderMessage(orderId=ORDER_ID_83, amount=83, userId=83, timestamp=2019-08-21 22:45:59.475)
    2019-08-21 22:46:04.516  INFO 33000 --- [onsumerWorker-1] club.throwable.OrderMessageConsumer      : 处理订单信息,内容:OrderMessage(orderId=ORDER_ID_81, amount=81, userId=81, timestamp=2019-08-21 22:45:59.475)
    2019-08-21 22:46:04.516  INFO 33000 --- [onsumerWorker-6] club.throwable.OrderMessageConsumer      : 处理订单信息,内容:OrderMessage(orderId=ORDER_ID_86, amount=86, userId=86, timestamp=2019-08-21 22:45:59.475)
    2019-08-21 22:46:04.516  INFO 33000 --- [onsumerWorker-2] club.throwable.OrderMessageConsumer      : 处理订单信息,内容:OrderMessage(orderId=ORDER_ID_82, amount=82, userId=82, timestamp=2019-08-21 22:45:59.475)
    2019-08-21 22:46:04.516  INFO 33000 --- [onsumerWorker-7] club.throwable.OrderMessageConsumer      : 处理订单信息,内容:OrderMessage(orderId=ORDER_ID_84, amount=84, userId=84, timestamp=2019-08-21 22:45:59.475)
    2019-08-21 22:46:04.516  INFO 33000 --- [onsumerWorker-1] club.throwable.OrderMessageConsumer      : 处理订单信息,内容:OrderMessage(orderId=ORDER_ID_80, amount=80, userId=80, timestamp=2019-08-21 22:45:59.475)
    2019-08-21 22:46:04.516  INFO 33000 --- [ryBean_Worker-2] club.throwable.OrderMessageConsumer      : 订单消息处理定时任务执行完毕,耗时:1 ms......
    ......

    首次执行的时候涉及到一些组件的初始化,会比较慢,后面看到由于我们只是简单打印订单信息,所以定时任务执行比较快。如果在不调整当前架构的情况下,生产中需要注意:

    这里其实有一个性能隐患,命令 ZREVRANGEBYSCORE 的时间复杂度可以视为为 O(N)N 是集合的元素个数,由于这里把所有的订单信息都放进了同一个 Sorted Set ( ORDER_QUEUE )中,所以在一直有新增数据的时候, dequeue 脚本的时间复杂度一直比较高,后续订单量升高之后会此处一定会成为性能瓶颈,后面会给出解决的方案。

    小结

    这篇文章主要从一个实际生产案例的仿真例子入手,分析了当前延时任务的一些实现方案,还基于 RedisQuartz 给出了一个完整的示例。当前的示例只是处于可运行的状态,有些问题尚未解决。下一篇文章会着眼于解决两个方面的问题:

    1. 分片。
    2. 监控。

    还有一点, 架构是基于业务形态演进出来的,很多东西需要结合场景进行方案设计和改进,思路仅供参考,切勿照搬代码 。

    以上所述是小编给大家介绍的使用Redis实现延时任务的解决方案,非常不错,具有一定的参考借鉴价值,需要的朋友参考下吧!

    您可能感兴趣的文章:
    • golang实现redis的延时消息队列功能示例
    • 利用Redis实现延时处理的方法实例
    • redis实现延时队列的两种方式(小结)
    上一篇:基于redis实现token验证用户是否登陆
    下一篇:Redis全量复制与部分复制示例详解
  • 相关文章
  • 

    © 2016-2020 巨人网络通讯 版权所有

    《增值电信业务经营许可证》 苏ICP备15040257号-8

    使用Redis实现延时任务的解决方案 使用,Redis,实现,延时,任务,