• 企业400电话
  • 微网小程序
  • AI电话机器人
  • 电商代运营
  • 全 部 栏 目

    企业400电话 网络优化推广 AI电话机器人 呼叫中心 网站建设 商标✡知产 微网小程序 电商运营 彩铃•短信 增值拓展业务
    python实现MySQL指定表增量同步数据到clickhouse的脚本

    python实现MySQL指定表增量同步数据到clickhouse,脚本如下:

    #!/usr/bin/env python3
    # _*_ coding:utf8 _*_
     
    from pymysqlreplication import BinLogStreamReader
    from pymysqlreplication.row_event import (DeleteRowsEvent,UpdateRowsEvent,WriteRowsEvent,)
    import clickhouse_driver
    import configparser
    import os
     
    configfile='repl.ini'
    ########## 配置文件repl.ini 操作 ##################
    def create_configfile(configfile,log_file,log_pos):
      config = configparser.ConfigParser()
     
      if not os.path.exists(configfile):
        config['replinfo'] = {'log_file':log_file,'log_pos':str(log_pos)}
     
        with open(configfile,'w+') as f:
          config.write(f)
     
    ### repl.ini 写操作 ##################
    def write_config(configfile,log_file,log_pos):
      config = configparser.ConfigParser()
      config.read(configfile)
     
      config.set('replinfo','log_file',log_file)
      config.set('replinfo','log_pos',str(log_pos))
     
      if os.path.exists(configfile):
        with open(configfile,'w+') as f:
          config.write(f)
      else:
        create_configfile(configfile)
     
    ### 配置文件repl.ini 读操作 ##################
    def read_config(configfile):
      config = configparser.ConfigParser()
      config.read(configfile)
      # print(config['replinfo']['log_file'])
      # print(config['replinfo']['log_pos'])
      return (config['replinfo']['log_file'],int(config['replinfo']['log_pos']))
     
    ############# clickhouse 操作 ##################
    def ops_clickhouse(db,table,sql):
      column_type_dic={}
      try:
        client = clickhouse_driver.Client(host='127.0.0.1',\
    
                         port=9000,\
    
                         user='default',\
    
                         password='clickhouse')
        # sql="select name,type from system.columns where database='{0}' and table='{1}'".format(db,table)
        client.execute(sql)
     
      except Exception as error:
        message = "获取clickhouse里面的字段类型错误. %s" % (error)
        # logger.error(message)
        print(message)
        exit(1)
     
    MYSQL_SETTINGS = {'host':'127.0.0.1','port':13306,'user':'root','passwd':'Root@0101'}
    only_events=(DeleteRowsEvent, WriteRowsEvent, UpdateRowsEvent)
    def main():
      ## 每次重启时,读取上次同步的log_file,log_pos
      (log_file,log_pos) = read_config(configfile)
      # print(log_file+'|'+ str(log_pos))
      print('-----------------------------------------------------------------------------')
      stream = BinLogStreamReader(connection_settings=MYSQL_SETTINGS, resume_stream=True, blocking=True, \
    
                    server_id=10,
                     only_tables='t_repl', only_schemas='test', \
    
                    log_file=log_file,log_pos=log_pos, \
    
                    only_events=only_events, \
    
                    fail_on_table_metadata_unavailable=True, slave_heartbeat=10)
     
      try:
        for binlogevent in stream:
          for row in binlogevent.rows:
            ## delete操作
            if isinstance(binlogevent, DeleteRowsEvent):
              info = dict(row["values"].items())
              # print("DELETE FROM `%s`.`%s` WHERE %s = %s ;" %(binlogevent.schema ,binlogevent.table,binlogevent.primary_key,info[binlogevent.primary_key]) )
              # print("ALTER TABLE `%s`.`%s` DELETE WHERE %s = %s ;" %(binlogevent.schema ,binlogevent.table,binlogevent.primary_key,info[binlogevent.primary_key]) )
              sql="ALTER TABLE `%s`.`%s` DELETE WHERE %s = %s ;" %(binlogevent.schema ,binlogevent.table,binlogevent.primary_key,info[binlogevent.primary_key])
     
            ## update 操作
            elif isinstance(binlogevent, UpdateRowsEvent):
              info_before = dict(row["before_values"].items())
              info_after = dict(row["after_values"].items())
              # info_set = str(info_after).replace(":","=").replace("{","").replace("}","")
              info_set = str(info_after).replace(":", "=").replace("{", "").replace("}", "").replace("'","")
              # print("UPDATE `%s`.`%s` SET %s WHERE %s = %s ;"%(binlogevent.schema,binlogevent.table,info_set,binlogevent.primary_key,info_before[binlogevent.primary_key]  ) )
              # print("ALTER TABLE %s.%s UPDATE %s WHERE %s = %s ;"%(binlogevent.schema,binlogevent.table,info_set,binlogevent.primary_key,info_before[binlogevent.primary_key]  ) )
              sql = "ALTER TABLE %s.%s UPDATE %s WHERE %s = %s ;"%(binlogevent.schema,binlogevent.table,info_set,binlogevent.primary_key,info_before[binlogevent.primary_key]  )
     
            ## insert 操作
            elif isinstance(binlogevent, WriteRowsEvent):
              info = dict(row["values"].items())
              # print("INSERT INTO %s.%s(%s)VALUES%s ;"%(binlogevent.schema,binlogevent.table , ','.join(info.keys()) ,str(tuple(info.values())) ) )
              sql = "INSERT INTO %s.%s(%s)VALUES%s ;"%(binlogevent.schema,binlogevent.table , ','.join(info.keys()) ,str(tuple(info.values())) )
            ops_clickhouse('test', 't_repl',sql )
     
            # 当前log_file,log_pos写入配置文件
            write_config(configfile, stream.log_file, stream.log_pos)
     
      except Exception as e:
        print(e)
      finally:
        stream.close()
     
    if __name__ == "__main__":
      main()
     
     
     
    '''
    BinLogStreamReader()参数
    ctl_connection_settings:集群保存模式信息的连接设置
    resume_stream:从位置或binlog的最新事件或旧的可用事件开始
    log_file:设置复制开始日志文件
    log_pos:设置复制开始日志pos(resume_stream应该为true)
    auto_position:使用master_auto_position gtid设置位置
    blocking:在流上读取被阻止
    only_events:允许的事件数组
    ignored_events:被忽略的事件数组
    only_tables:包含要观看的表的数组(仅适用于binlog_format ROW)
    ignored_tables:包含要跳过的表的数组
    only_schemas:包含要观看的模式的数组
    ignored_schemas:包含要跳过的模式的数组
    freeze_schema:如果为true,则不支持ALTER TABLE。速度更快。
    skip_to_timestamp:在达到指定的时间戳之前忽略所有事件。
    report_slave:在SHOW SLAVE HOSTS中报告奴隶。
    slave_uuid:在SHOW SLAVE HOSTS中报告slave_uuid。
    fail_on_table_metadata_unavailable:如果我们无法获取有关row_events的表信息,应该引发异常
    slave_heartbeat:(秒)主站应主动发送心跳连接。这也减少了复制恢复时GTID复制的流量(在许多事件在binlog中跳过的情况下)。请参阅mysql文档中的MASTER_HEARTBEAT_PERIOD以了解语义
    '''

    知识点扩展:

    MySQL备份-增量同步

    mysql增量同步主要使用binlog文件进行同步,binlog文件主要记录的是数据库更新操作相关的内容。

    1. 备份数据的意义

    针对不同业务,7*24小时提供服务和数据的重要性不同。
    数据库数据是比较核心的数据,对企业的经营至关重要,数据库备份显得尤为重要。

    2. 备份数据库

    MySQL数据库自带的备份命令 `mysqldump`,基本使用方法:
    语法:`mysqldump -u username -p password dbname > filename.sql`

    执行备份命令

    `mysqldump -uroot -pmysqladmin db_test > /opt/mysql_bak.sql`

    查看备份内容

    `grep -v "#|\*|--|^$" /opt/mysql_bak.sql`

    到此这篇关于python实现MySQL指定表增量同步数据到clickhouse的脚本的文章就介绍到这了,更多相关python实现MySQL增量同步数据内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

    您可能感兴趣的文章:
    • 基于python的mysql复制工具详解
    • 由Python编写的MySQL管理工具代码实例
    • python实现读取excel写入mysql的小工具详解
    • Python操作MySQL数据库的简单步骤分享
    • Python爬虫爬取全球疫情数据并存储到mysql数据库的步骤
    • Python爬取腾讯疫情实时数据并存储到mysql数据库的示例代码
    • 解决python mysql insert语句的问题
    • python 在mysql中插入null空值的操作
    • 用python开发一款操作MySQL的小工具
    上一篇:详解python的xlwings库读写excel操作总结
    下一篇:基于tensorflow __init__、build 和call的使用小结
  • 相关文章
  • 

    © 2016-2020 巨人网络通讯 版权所有

    《增值电信业务经营许可证》 苏ICP备15040257号-8

    python实现MySQL指定表增量同步数据到clickhouse的脚本 python,实现,MySQL,指定,表,