• 企业400电话
  • 微网小程序
  • AI电话机器人
  • 电商代运营
  • 全 部 栏 目

    企业400电话 网络优化推广 AI电话机器人 呼叫中心 网站建设 商标✡知产 微网小程序 电商运营 彩铃•短信 增值拓展业务
    PostgreSQL 数据同步到ES 搭建操作

    安装python 和dev 开发包

    [root@rtm2 Packages]# rpm -ivh python-devel-2.7.5-58.el7.x86_64.rpm
    准备中...       ################################# [100%]
    正在升级/安装...
     1:python-devel-2.7.5-58.el7  ################################# [100%]
    [root@rtm2 Packages]# ls

    安装 multicorn

    [root@rtm2 multicorn-1.3.5]# make
    Python version is 2.7
    gcc -Wall -Wmissing-prototypes -Wpointer-arith -Wdeclaration-after-statement -Wendif-labels -Wmissing-format-attribute -Wformat-security -fno-strict-aliasing -fwrapv -fexcess-precision=standard -O2 -fPIC -I/usr/include/python2.7 -I/usr/include/python2.7 -I. -I./ -I/opt/pgsql-10/include/server -I/opt/pgsql-10/include/internal -D_GNU_SOURCE -c -o src/errors.o src/errors.c
    gcc -Wall -Wmissing-prototypes -Wpointer-arith -Wdeclaration-after-statement -Wendif-labels -Wmissing-format-attribute -Wformat-security -fno-strict-aliasing -fwrapv -fexcess-precision=standard -O2 -fPIC -I/usr/include/python2.7 -I/usr/include/python2.7 -I. -I./ -I/opt/pgsql-10/include/server -I/opt/pgsql-10/include/internal -D_GNU_SOURCE -c -o src/python.o src/python.c
    gcc -Wall -Wmissing-prototypes -Wpointer-arith -Wdeclaration-after-statement -Wendif-labels -Wmissing-format-attribute -Wformat-security -fno-strict-aliasing -fwrapv -fexcess-precision=standard -O2 -fPIC -I/usr/include/python2.7 -I/usr/include/python2.7 -I. -I./ -I/opt/pgsql-10/include/server -I/opt/pgsql-10/include/internal -D_GNU_SOURCE -c -o src/query.o src/query.c
    gcc -Wall -Wmissing-prototypes -Wpointer-arith -Wdeclaration-after-statement -Wendif-labels -Wmissing-format-attribute -Wformat-security -fno-strict-aliasing -fwrapv -fexcess-precision=standard -O2 -fPIC -I/usr/include/python2.7 -I/usr/include/python2.7 -I. -I./ -I/opt/pgsql-10/include/server -I/opt/pgsql-10/include/internal -D_GNU_SOURCE -c -o src/multicorn.o src/multicorn.c
    gcc -Wall -Wmissing-prototypes -Wpointer-arith -Wdeclaration-after-statement -Wendif-labels -Wmissing-format-attribute -Wformat-security -fno-strict-aliasing -fwrapv -fexcess-precision=standard -O2 -fPIC -shared -o multicorn.so src/errors.o src/python.o src/query.o src/multicorn.o -L/opt/pgsql-10/lib -Wl,--as-needed -Wl,-rpath,'/opt/pgsql-10/lib',--enable-new-dtags -lpthread -ldl -lutil -lm -lpython2.7 -lpthread -ldl -lutil -lm -lpython2.7 -Xlinker -export-dynamic
    .//preflight-check.sh
    cp sql/multicorn.sql sql/multicorn--1.3.5.sql
    [root@rtm2 multicorn-1.3.5]# make install
    Python version is 2.7
    ...

    安装pg-es-fdw-master

    [root@rtm2 multicorn-1.3.5]# cd ../pg-es-fdw-master
    [root@rtm2 pg-es-fdw-master]# ls
    demo.sh dite LICENSE README.md setup.py
    [root@rtm2 pg-es-fdw-master]# python setup.py build
    running build
    running build_py
    creating build
    creating build/lib
    creating build/lib/dite
    copying dite/__init__.py -> build/lib/dite
    [root@rtm2 pg-es-fdw-master]# python setup.py install
    running install
    running bdist_egg
    running egg_info
    creating dite.egg-info
    writing dite.egg-info/PKG-INFO

    安装插件 multicorn

    [postgres@rtm2 ~]$ psql
    psql (10.3)
    Type "help" for help.
    postgres=# select * from pg_extension;
     extname | extowner | extnamespace | extrelocatable | extversion | extconfig | extcondition
    ---------+----------+--------------+----------------+------------+-----------+--------------
     plpgsql |  10 |   11 | f    | 1.0  |   |
    (1 row)
    postgres=# CREATE EXTENSION multicorn;
    CREATE EXTENSION
    postgres=# psql
    postgres=# select * from pg_extension;
     extname | extowner | extnamespace | extrelocatable | extversion | extconfig | extcondition
    -----------+----------+--------------+----------------+------------+-----------+--------------
     plpgsql |  10 |   11 | f    | 1.0  |   |
     multicorn |  10 |   2200 | t    | 1.3.5  |   |
    (2 rows)
    postgres=# CREATE SERVER multicorn_es FOREIGN DATA WRAPPER multicorn OPTIONS(wrapper 'dite.ElasticsearchFDW');
    CREATE SERVER
    postgres=#
    

    es

    [root@rtm2 config]# vi elasticsearch.yml
    node.name: "es-node1"
    network.host: 192.168.31.121
    discovery.zen.ping.unicast.hosts: ["192.168.31.121"]
    [root@rtm2 config]# vi /etc/sysctl.conf
    vm.max_map_count=262144
    sysctl -p
    [root@rtm2 config]# vi /etc/security/limits.conf
    # End of file
     root soft nofile 65536
    root hard nofile 65536
    root soft nproc 4096
    root hard nproc 4096
    ~

    启动es

    [root@rtm2 bin]# ls
    elasticsearch  elasticsearch.in.bat elasticsearch-service-mgr.exe elasticsearch-service-x86.exe plugin.bat
    elasticsearch.bat elasticsearch.in.sh elasticsearch-service-x64.exe plugin       service.bat
    [root@rtm2 bin]# ./bin/elasticsearch
    test=# CREATE FOREIGN TABLE pp_es (id bigint,age bigint) SERVER multicorn_es OPTIONS (host
    test(# '192.168.31.121', port '9200', node 'es-node1', index 'pp');
    CREATE FOREIGN TABLE
    test=#

    创建触发器和外部表

    test=# CREATE OR REPLACE FUNCTION index_pp() RETURNS trigger AS $def$
    test$# BEGIN
    test$# INSERT INTO pp_es (id, age) VALUES
    test$# (NEW.id, NEW.age);
    test$# RETURN NEW;
    test$# END;
    test$# $def$ LANGUAGE plpgsql;
    CREATE FUNCTION
    test=# CREATE TRIGGER es_insert_pp AFTER INSERT ON pp FOR EACH ROW EXECUTE PROCEDURE index_pp();
    CREATE TRIGGER
    test=#

    新增数据测试

    test=# insert into pp (id,age) values (1,11);
    INSERT 0 1
    test=# select * from pp;
     id | age
    ----+-----
     1 | 11
    (1 row)
    test=#
    

    检查es数据

    [root@rtm2 ~]# curl 'http://192.168.31.121:9200/es-node1/_search?q=*:*pretty'
    {
     "took" : 104,
     "timed_out" : false,
     "_shards" : {
     "total" : 5,
     "successful" : 5,
     "failed" : 0
     },
     "hits" : {
     "total" : 2,
     "max_score" : 1.0,
     "hits" : [ {
      "_index" : "es-node1",
      "_type" : "pp",
      "_id" : "1",
      "_score" : 1.0,
      "_source":{"age": "11"}
     }, {
      "_index" : "es-node1",
      "_type" : "pp",
      "_id" : "2",
      "_score" : 1.0,
      "_source":{"age": "22"}
     } ]
     }
    }
    [root@rtm2 ~]#

    创建更新触发器

    test=# CREATE OR REPLACE FUNCTION updadeIndex_pp() RETURNS trigger AS $def$
    BEGIN
    UPDATE pp_es SET
    id = NEW.id,
    age = NEW.age
    where id =NEW.id;
    RETURN NEW;
    END;
    $def$ LANGUAGE plpgsql;
    CREATE FUNCTION
    test=# ^C
    test=#
    test=# CREATE TRIGGER es_update_pp AFTER UPDATE OF id, age ON pp FOR EACH ROW WHEN (OLD.* IS DISTINCT
    test(# FROM NEW.*)EXECUTE PROCEDURE updadeIndex_pp();
    CREATE TRIGGER
    test=#
    

    更新表数据

    test=# select * from pp;
     id | age
    ----+-----
     1 | 11
     2 | 22
     3 | 22
    (3 rows)
    test=# update pp a set a.age = 33 where a.id = 3;
    ERROR: column "a" of relation "pp" does not exist
    LINE 1: update pp a set a.age = 33 where a.id = 3;
          ^
    test=# update pp set age = 33 where id = 3;
    UPDATE 1
    test=# select * from pp;
     id | age
    ----+-----
     1 | 11
     2 | 22
     3 | 33
    (3 rows)
    test=#
    

    es查询变更

    [root@rtm2 ~]# curl 'http://192.168.31.121:9200/es-node1/_search?q=*:*pretty'
    {
     "took" : 4,
     "timed_out" : false,
     "_shards" : {
     "total" : 5,
     "successful" : 5,
     "failed" : 0
     },
     "hits" : {
     "total" : 3,
     "max_score" : 1.0,
     "hits" : [ {
      "_index" : "es-node1",
      "_type" : "pp",
      "_id" : "1",
      "_score" : 1.0,
      "_source":{"age": "11"}
     }, {
      "_index" : "es-node1",
      "_type" : "pp",
      "_id" : "2",
      "_score" : 1.0,
      "_source":{"age": "22"}
     }, {
      "_index" : "es-node1",
      "_type" : "pp",
      "_id" : "3",
      "_score" : 1.0,
      "_source":{"age": "33"}
     } ]
     }
    }
    [root@rtm2 ~]# 

    补充:logstash同步pgsql数据到Elasticsearch

    一、对于logstash的配置我就不在多说,主要是三部分,input、filter、output的配置

    二、配置步骤

    1、input配置

    input {
     stdin {
     }
     jdbc {
      jdbc_connection_string => "jdbc:postgresql://127.0.0.1:5432/world"
      jdbc_user => "postgres"
      jdbc_password => "zhang123"
      jdbc_driver_library => "D:\logstash-6.4.0\bin\pgsql\postgresql-42.2.5.jar"
      jdbc_driver_class => "org.postgresql.Driver"
      jdbc_paging_enabled => "true"
      jdbc_page_size => "300000"
      use_column_value => "true"
      tracking_column => "id"
      statement_filepath => "D:\logstash-6.4.0\bin\pgsql\jdbc.sql"
     schedule => "* * * * *"
     type => "jdbc"
     jdbc_default_timezone =>"Asia/Shanghai"
     }
    }

    2、filter配置

    filter {
     json {
      source => "message"
      remove_field => ["message"]
     }
    }

    3、output 配置,就是elasticsearch的基本配置

    output {
     elasticsearch {
      hosts => ["localhost:9200"]
      index => "test_out"
     template => "D:\logstash-6.4.0\bin\pgsql\es-template.json"
     template_name => "t-statistic-out-logstash"
     template_overwrite => true
     document_type => "out"
      document_id => "%{id}"
     }
     stdout {
      codec => json_lines
     }
    }

    以上就是整个logstash 的jdbc.conf

    4、es-template.json的配置

    {
     "template" : "t-statistis-out-template", 
     "order":1,
     "settings": {
       "index": {
        "refresh_interval": "5s"
       }
      },
     "mappings": {
       "_default_": {
     "_all" : {"enabled":false}, 
        "dynamic_templates": [
         { 
        "message_field" : { 
        "match" : "message", 
        "match_mapping_type" : "string", 
        "mapping" : { "type" : "string", "index" : "not_analyzed" } 
        } 
       }, { 
        "string_fields" : { 
        "match" : "*", 
        "match_mapping_type" : "string", 
        "mapping" : { "type" : "string", "index" : "not_analyzed" } 
        } 
       }
        ],
        "properties": {
         "@timestamp": {
          "type": "date"
         },
         "@version": {
          "type": "keyword"
         },     
      "id": {
          "type": "keyword"
         },
      "name": {
          "type": "keyword"
         },
      "pp": {
          "type": "keyword"
         }  
        }
       }
      },
      "aliases": {}
     
    }

    最后就是就是下载好pgsql的连接驱动,这个官网可以下载;配置好自己的数据库表格的数据

    启动命令:进入到logstash的bin目录下,自己的logstash配置都是放在bin的pgsql这个目录下面(这个自己随意创建位置都可以)

    logstash.bat -f ./pgsql/jdbc.conf

    以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。如有错误或未考虑完全的地方,望不吝赐教。

    您可能感兴趣的文章:
    • PostgreSQL 主备数据宕机恢复测试方案
    • PostgreSQL+Pgpool实现HA主备切换的操作
    • postgresql 如何查看pg_wal目录下xlog文件总大小
    • postgresql之使用lsn 获取 wal文件名的实例
    • 修改postgresql存储目录的操作方式
    • postgresql运维之远程迁移操作
    • postgresql 12版本搭建及主备部署操作
    上一篇:Postgresql 跨库同步表及postgres_fdw的用法说明
    下一篇:基于PostgreSQL pg_hba.conf 配置参数的使用说明
  • 相关文章
  • 

    © 2016-2020 巨人网络通讯 版权所有

    《增值电信业务经营许可证》 苏ICP备15040257号-8

    PostgreSQL 数据同步到ES 搭建操作 PostgreSQL,数据,同步,到,搭建,