• 企业400电话
  • 微网小程序
  • AI电话机器人
  • 电商代运营
  • 全 部 栏 目

    企业400电话 网络优化推广 AI电话机器人 呼叫中心 网站建设 商标✡知产 微网小程序 电商运营 彩铃•短信 增值拓展业务
    MySQL 与 Elasticsearch 数据不对称问题解决办法

    MySQL 与 Elasticsearch 数据不对称问题解决办法

    jdbc-input-plugin 只能实现数据库的追加,对于 elasticsearch 增量写入,但经常jdbc源一端的数据库可能会做数据库删除或者更新操作。这样一来数据库与搜索引擎的数据库就出现了不对称的情况。

    当然你如果有开发团队可以写程序在删除或者更新的时候同步对搜索引擎操作。如果你没有这个能力,可以尝试下面的方法。

    这里有一个数据表 article , mtime 字段定义了 ON UPDATE CURRENT_TIMESTAMP 所以每次更新mtime的时间都会变化

    mysql> desc article;
    +-------------+--------------+------+-----+--------------------------------+-------+
    | Field    | Type     | Null | Key | Default            | Extra |
    +-------------+--------------+------+-----+--------------------------------+-------+
    | id     | int(11)   | NO  |   | 0               |    |
    | title    | mediumtext  | NO  |   | NULL              |    |
    | description | mediumtext  | YES |   | NULL              |    |
    | author   | varchar(100) | YES |   | NULL              |    |
    | source   | varchar(100) | YES |   | NULL              |    |
    | content   | longtext   | YES |   | NULL              |    |
    | status   | enum('Y','N')| NO  |   | 'N'              |    |
    | ctime    | timestamp  | NO  |   | CURRENT_TIMESTAMP       |    |
    | mtime    | timestamp  | YES |   | ON UPDATE CURRENT_TIMESTAMP  |    |
    +-------------+--------------+------+-----+--------------------------------+-------+
    7 rows in set (0.00 sec)

    logstash 增加 mtime 的查询规则

    jdbc {
      jdbc_driver_library => "/usr/share/java/mysql-connector-java.jar"
      jdbc_driver_class => "com.mysql.jdbc.Driver"
      jdbc_connection_string => "jdbc:mysql://localhost:3306/cms"
      jdbc_user => "cms"
      jdbc_password => "password"
      schedule => "* * * * *" #定时cron的表达式,这里是每分钟执行一次
      statement => "select * from article where mtime > :sql_last_value"
      use_column_value => true
      tracking_column => "mtime"
      tracking_column_type => "timestamp" 
      record_last_run => true
      last_run_metadata_path => "/var/tmp/article-mtime.last"
     }
    

    创建回收站表,这个事用于解决数据库删除,或者禁用 status = 'N' 这种情况的。

    CREATE TABLE `elasticsearch_trash` (
     `id` int(11) NOT NULL,
     `ctime` timestamp NULL DEFAULT CURRENT_TIMESTAMP,
     PRIMARY KEY (`id`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8

    为 article 表创建触发器

    CREATE DEFINER=`dba`@`%` TRIGGER `article_BEFORE_UPDATE` BEFORE UPDATE ON `article` FOR EACH ROW
    BEGIN
     -- 此处的逻辑是解决文章状态变为 N 的时候,需要将搜索引擎中对应的数据删除。
     IF NEW.status = 'N' THEN
     insert into elasticsearch_trash(id) values(OLD.id);
     END IF;
     -- 此处逻辑是修改状态到 Y 的时候,方式elasticsearch_trash仍然存在该文章ID,导致误删除。所以需要删除回收站中得回收记录。
      IF NEW.status = 'Y' THEN
     delete from elasticsearch_trash where id = OLD.id;
     END IF;
    END
    
    CREATE DEFINER=`dba`@`%` TRIGGER `article_BEFORE_DELETE` BEFORE DELETE ON `article` FOR EACH ROW
    BEGIN
     -- 此处逻辑是文章被删除同事将改文章放入搜索引擎回收站。
     insert into elasticsearch_trash(id) values(OLD.id);
    END
    

    接下来我们需要写一个简单地 Shell 每分钟运行一次,从 elasticsearch_trash 数据表中取出数据,然后使用 curl 命令调用 elasticsearch restful 接口,删除被收回的数据。

    你还可以开发相关的程序,这里提供一个 Spring boot 定时任务例子。

    实体

    package cn.netkiller.api.domain.elasticsearch;
    
    import java.util.Date;
    
    import javax.persistence.Column;
    import javax.persistence.Entity;
    import javax.persistence.Id;
    import javax.persistence.Table;
    
    @Entity
    @Table
    public class ElasticsearchTrash {
     @Id
     private int id;
    
     @Column(columnDefinition = "TIMESTAMP DEFAULT CURRENT_TIMESTAMP")
     private Date ctime;
    
     public int getId() {
     return id;
     }
    
     public void setId(int id) {
     this.id = id;
     }
    
     public Date getCtime() {
     return ctime;
     }
    
     public void setCtime(Date ctime) {
     this.ctime = ctime;
     }
    
    }
    
    

    仓库

    package cn.netkiller.api.repository.elasticsearch;
    
    import org.springframework.data.repository.CrudRepository;
    
    import com.example.api.domain.elasticsearch.ElasticsearchTrash;
    
    public interface ElasticsearchTrashRepository extends CrudRepositoryElasticsearchTrash, Integer>{
    
    
    }
    
    

    定时任务

    package cn.netkiller.api.schedule;
    
    import org.elasticsearch.action.delete.DeleteResponse;
    import org.elasticsearch.client.transport.TransportClient;
    import org.elasticsearch.rest.RestStatus;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.scheduling.annotation.Scheduled;
    import org.springframework.stereotype.Component;
    
    import com.example.api.domain.elasticsearch.ElasticsearchTrash;
    import com.example.api.repository.elasticsearch.ElasticsearchTrashRepository;
    
    @Component
    public class ScheduledTasks {
     private static final Logger logger = LoggerFactory.getLogger(ScheduledTasks.class);
    
     @Autowired
     private TransportClient client;
    
     @Autowired
     private ElasticsearchTrashRepository alasticsearchTrashRepository;
    
     public ScheduledTasks() {
     }
    
     @Scheduled(fixedRate = 1000 * 60) // 60秒运行一次调度任务
     public void cleanTrash() {
     for (ElasticsearchTrash elasticsearchTrash : alasticsearchTrashRepository.findAll()) {
      DeleteResponse response = client.prepareDelete("information", "article", elasticsearchTrash.getId() + "").get();
      RestStatus status = response.status();
      logger.info("delete {} {}", elasticsearchTrash.getId(), status.toString());
      if (status == RestStatus.OK || status == RestStatus.NOT_FOUND) {
      alasticsearchTrashRepository.delete(elasticsearchTrash);
      }
     }
     }
    }
    
    

    Spring boot 启动主程序。

    package cn.netkiller.api;
    
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.scheduling.annotation.EnableScheduling;
    
    @SpringBootApplication
    @EnableScheduling
    public class Application {
    
     public static void main(String[] args) {
     SpringApplication.run(Application.class, args);
     }
    }
     
    
    

    以上就是MySQL 与 Elasticsearch 数据不对称问题解决办法的讲解,如有疑问请留言或者到本站社区交流讨论,感谢阅读,希望能帮助到大家,谢谢大家对本站的支持!

    您可能感兴趣的文章:
    • windows下MySQL5.6版本安装及配置过程附有截图和详细说明
    • Mysql字符串截取函数SUBSTRING的用法说明
    • MySQL日期数据类型、时间类型使用总结
    • MySQL创建用户与授权方法
    • MySQL 的CASE WHEN 语句使用说明
    • mysql update语句的用法详解
    • MySQL提示:The server quit without updating PID file问题的解决办法
    上一篇:MySQL5.7.03 更换高版本到MySQL 5.7.17安装过程及发现问题解决方案
    下一篇:mysql中url时区的陷阱该如何规避详解
  • 相关文章
  • 

    © 2016-2020 巨人网络通讯 版权所有

    《增值电信业务经营许可证》 苏ICP备15040257号-8

    MySQL 与 Elasticsearch 数据不对称问题解决办法 MySQL,与,Elasticsearch,数据,