• 企业400电话
  • 微网小程序
  • AI电话机器人
  • 电商代运营
  • 全 部 栏 目

    企业400电话 网络优化推广 AI电话机器人 呼叫中心 网站建设 商标✡知产 微网小程序 电商运营 彩铃•短信 增值拓展业务
    基于Morphia实现MongoDB按小时、按天聚合操作方法

    MongoDB按照天数或小时聚合

    需求

    最近接到需求,需要对用户账户下的设备状态,分别按照天以及小时进行聚合,以此为基础绘制设备状态趋势图.
    实现思路是启动定时任务,对各用户的设备状态数据分别按照小时以及天进行聚合,并存储进数据库中供用户后续查询.
    涉及到的技术栈分别为:Spring Boot,MongoDB,Morphia.

    数据模型

    @Data
    @Builder
    @Entity(value = "rawDevStatus", noClassnameStored = true)
    // 设备状态索引
    @Indexes({
        // 设置数据超时时间(TTL,MongoDB根据TTL在后台进行数据删除操作)
        @Index(fields = @Field("time"), options = @IndexOptions(expireAfterSeconds = 3600 * 24 * 72)),
        @Index(fields = {@Field("userId"), @Field(value = "time", type = IndexType.DESC)})
    })
    public class RawDevStatus {
      @Id
      @JsonProperty(access = JsonProperty.Access.WRITE_ONLY)
      private ObjectId objectId;
      private String userId;
      private Instant time;
      @Embedded("points")
      ListPoint> protocolPoints;
      @Data
      @AllArgsConstructor
      public static class Point {
        /**
         * 协议类型
         */
        private Protocol protocol;
        /**
         * 设备总数
         */
        private Integer total;
        /**
         * 设备在线数目
         */
        private Integer onlineNum;
        /**
         * 处于启用状态设备数目
         */
        private Integer enableNum;
      }
    }

    上述代码是设备状态实体类,其中设备状态数据是按照设备所属协议进行区分的.

    @Data
    @Builder
    @Entity(value = "aggregationDevStatus", noClassnameStored = true)
    @Indexes({
        @Index(fields = @Field("expireAt"), options = @IndexOptions(expireAfterSeconds = 0)),
        @Index(fields = {@Field("userId"), @Field(value = "time", type = IndexType.DESC)})
    })
    public class AggregationDevStatus {
      @Id
      @JsonProperty(access = JsonProperty.Access.WRITE_ONLY)
      private ObjectId objectId;
      /**
       * 用户ID
       */
      private String userId;
      /**
       * 设备总数
       */
      private Double total;
      /**
       * 设备在线数目
       */
      private Double onlineNum;
      /**
       * 处于启用状态设备数目
       */
      private Double enableNum;
      /**
       * 聚合类型(按照小时还是按照天聚合)
       */
      @Property("aggDuration")
      private AggregationDuration aggregationDuration;
      private Instant time;
      /**
       * 动态设置文档过期时间
       */
      private Instant expireAt;
    }

    上述代码是期待的聚合结果,其中构建两个索引:(1)超时索引;(2)复合索引,程序会根据用户名以及时间查询设备状态聚合结果.

    聚合操作符介绍

    聚合操作类似于管道,管道中的每一步操作产生的中间结果作为下一步的输入源,最终输出聚合结果.

    此次聚合主要涉及以下操作:

    •$project:指定输出文档中的字段.
    •$unwind:拆分数据中的数组;
    •match:选择要处理的文档数据;
    •group:根据key分组聚合结果.

    原始聚合语句

    db.getCollection('raw_dev_status').aggregate([
      {$match:
        {
          time:{$gte: ISODate("2019-06-27T00:00:00Z")},
        }
      },
      {$unwind: "$points"},
      {$project:
        {
          userId:1,points:1,
          tmp: {$dateToString: { format: "%Y:%m:%dT%H:00:00Z", date: "$time" } }
        }
      },
      {$project:
        {
          userId:1,points:1,
          groupTime: {$dateFromString: { dateString: "$tmp", format: "%Y:%m:%dT%H:%M:%SZ", } }
        }
      },
      {$group:
        {
          _id:{user_id:'$userId', cal_time:'$groupTime'},
          devTotal:{'$avg':'$points.total'},
          onlineTotal:{'$avg':'$points.onlineNum'},
          enableTotal:{'$avg':'$points.enableNum'}
        }
      },
    ])

    上述代码是按小时聚合数据,以下来逐步介绍处理思路:

    (1) $match

    根据小时聚合数据,因为只需要获取近24小时的聚合结果,所以对数据进行初步筛选.

    (2) $unwind

    raw_dev_status中的设备状态是按照协议区分的数组,因此需要对其进行展开,以便下一步进行筛选;

    (3) $project

      {$project:
        {
          userId:1,points:1,
          tmp: {$dateToString: { format: "%Y:%m:%dT%H:00:00Z", date: "$time" } }
        }
      }

    选择需要输出的数据,分别为:userId,points以及tmp.

    需要注意,为了按照时间聚合,对$time属性进行操作,提取%Y:%m:%dT%H时信息至$tmp作为下一步的聚合依据.

    如果需要按天聚合,则format数据可修改为:%Y:%m:%dT00:00:00Z即可满足要求.

    (4) $project

      {$project:
        {
          userId:1,points:1,
          groupTime: {$dateFromString: { dateString: "$tmp", format: "%Y:%m:%dT%H:%M:%SZ", } }
        }
      }

    因为上一步project操作中,tmp为字符串数据,最终的聚合结果需要时间戳(主要懒,不想在程序中进行转换操作).
    因此,此处对$tmp进行操作,转换为时间类型数据,即groupTime.

    (5) $group

    对聚合结果进行分类操作,并生成最终输出结果.

     {$group:
        {
          # 根据_id进行分组操作,依据是`user_id`以及`$groupTime`
          _id:{user_id:'$userId', cal_time:'$groupTime'},
          # 求设备总数平均值
          devTotal:{'$avg':'$points.total'},
          # 求设备在线数平均值
          onlineTotal:{'$avg':'$points.onlineNum'},
          # ...
          enableTotal:{'$avg':'$points.enableNum'}
        }
      }

    代码编写

    此处ODM选择Morphia,亦可以使用MongoTemplate,原理类似.

     /**
       * 创建聚合条件
       *
       * @param pastTime   过去时间段
       * @param dateToString 格式化字符串(%Y:%m:%dT%H:00:00Z或%Y:%m:%dT00:00:00Z)
       * @return 聚合条件
       */
      private AggregationPipeline createAggregationPipeline(Instant pastTime, String dateToString, String stringToDate) {
        QueryRawDevStatus> query = datastore.createQuery(RawDevStatus.class);
        return datastore.createAggregation(RawDevStatus.class)
            .match(query.field("time").greaterThanOrEq(pastTime))
            .unwind("points", new UnwindOptions().preserveNullAndEmptyArrays(false))
            .match(query.field("points.protocol").equal("ALL"))
            .project(Projection.projection("userId"),
                Projection.projection("points"),
                Projection.projection("convertTime",
                    Projection.expression("$dateToString",
                        new BasicDBObject("format", dateToString)
                            .append("date", "$time"))
                )
            )
            .project(Projection.projection("userId"),
                Projection.projection("points"),
                Projection.projection("convertTime",
                    Projection.expression("$dateFromString",
                        new BasicDBObject("format", stringToDate)
                            .append("dateString", "$convertTime"))
                )
            )
            .group(
                Group.id(Group.grouping("userId"), Group.grouping("convertTime")),
                Group.grouping("total", Group.average("points.total")),
                Group.grouping("onlineNum", Group.average("points.onlineNum")),
                Group.grouping("enableNum", Group.average("points.enableNum"))
            );
      }
      /**
       * 获取聚合结果
       *
       * @param pipeline 聚合条件
       * @return 聚合结果
       */
      private ListAggregationMidDevStatus> getAggregationResult(AggregationPipeline pipeline) {
        ListAggregationMidDevStatus> statuses = new ArrayList>();
        IteratorAggregationMidDevStatus> resultIterator = pipeline.aggregate(
            AggregationMidDevStatus.class, AggregationOptions.builder().allowDiskUse(true).build());
        while (resultIterator.hasNext()) {
          statuses.add(resultIterator.next());
        }
        return statuses;
      }
      //......................................................................................
      // 获取聚合结果(省略若干代码)
      AggregationPipeline pipeline = createAggregationPipeline(pastTime, dateToString, stringToDate);
      ListAggregationMidDevStatus> midStatuses = getAggregationResult(pipeline);
      if (CollectionUtils.isEmpty(midStatuses)) {
        log.warn("Can not get dev status aggregation result.");
        return;
      }

    总结

    以上所述是小编给大家介绍的基于Morphia实现MongoDB按小时、按天聚合操作方法,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对脚本之家网站的支持!
    如果你觉得本文对你有帮助,欢迎转载,烦请注明出处,谢谢!

    您可能感兴趣的文章:
    • JAVA mongodb 聚合几种查询方式详解
    • Mongodb中MapReduce实现数据聚合方法详解
    • Mongodb聚合函数count、distinct、group如何实现数据聚合操作
    • MongoDB教程之聚合(count、distinct和group)
    • MongoDB聚合功能浅析
    上一篇:Mongodb的oplog详解
    下一篇:MongoDB中的定时索引示例详解
  • 相关文章
  • 

    © 2016-2020 巨人网络通讯 版权所有

    《增值电信业务经营许可证》 苏ICP备15040257号-8

    基于Morphia实现MongoDB按小时、按天聚合操作方法 基于,Morphia,实现,MongoDB,