• 企业400电话
  • 微网小程序
  • AI电话机器人
  • 电商代运营
  • 全 部 栏 目

    企业400电话 网络优化推广 AI电话机器人 呼叫中心 网站建设 商标✡知产 微网小程序 电商运营 彩铃•短信 增值拓展业务
    nginx日志导入elasticsearch的方法示例

    将nginx日志通过filebeat收集后传入logstash,经过logstash处理后写入elasticsearch。filebeat只负责收集工作,logstash完成日志的格式化,数据的替换,拆分 ,以及将日志写入elasticsearch后的索引的创建。

    1、配置nginx日志格式

    log_format main    '$remote_addr $http_x_forwarded_for [$time_local] $server_name $request ' 
                '$status $body_bytes_sent $http_referer ' 
                '"$http_user_agent" '
                '"$connection" '
                '"$http_cookie" '
                '$request_time '
                '$upstream_response_time';
    

    2、安装配置filebeat,启用nginx module

    tar -zxvf filebeat-6.2.4-linux-x86_64.tar.gz -C /usr/local
    cd /usr/local;ln -s filebeat-6.2.4-linux-x86_64 filebeat
    cd /usr/local/filebeat
    

    启用nginx模块

    ./filebeat modules enable nginx
    
    

    查看模块

    ./filebeat modules list
    
    

    创建配置文件

    vim /usr/local/filebeat/blog_module_logstash.yml
    filebeat.modules:
    - module: nginx
     access:
      enabled: true
      var.paths: ["/home/weblog/blog.cnfol.com_access.log"]
     #error:
     # enabled: true
     # var.paths: ["/home/weblogerr/blog.cnfol.com_error.log"]
    
    
    output.logstash:
     hosts: ["192.168.15.91:5044"]
    
    

    启动filebeat

    ./filebeat -c blog_module_logstash.yml -e
    
    

    3、配置logstash

    tar -zxvf logstash-6.2.4.tar.gz /usr/local
    cd /usr/local;ln -s logstash-6.2.4 logstash
    创建一个nginx日志的pipline文件
    cd /usr/local/logstash
    

    logstash内置的模板目录

    vendor/bundle/jruby/2.3.0/gems/logstash-patterns-core-4.1.2/patterns

    编辑 grok-patterns 添加一个支持多ip的正则

    FORWORD (?:%{IPV4}[,]?[ ]?)+|%{WORD}

    官方grok

    http://grokdebug.herokuapp.com/patterns#

    创建logstash pipline配置文件

    #input {
    # stdin {}
    #}
    # 从filebeat接受数据
    input {
     beats {
     port => 5044
     host => "0.0.0.0"
     }
    }
    
    filter {
     # 添加一个调试的开关
     mutate{add_field => {"[@metadata][debug]"=>true}}
     grok {
     # 过滤nginx日志
     #match => { "message" => "%{NGINXACCESS_TEST2}" }
     #match => { "message" => '%{IPORHOST:clientip} # (?<http_x_forwarded_for>[^\#]*) # \[%{HTTPDATE:[@metadata][webtime]}\] # %{NOTSPACE:hostname} # %{WORD:verb} %{URIPATHPARAM:request} HTTP/%{NUMBER:httpversion} # %{NUMBER:response} # (?:%{NUMBER:bytes}|-) # (?:"(?:%{NOTSPACE:referrer}|-)"|%{NOTSPACE:referrer}|-) # (?:"(?<http_user_agent>[^#]*)") # (?:"(?:%{NUMBER:connection}|-)"|%{NUMBER:connection}|-) # (?:"(?<cookies>[^#]*)") # %{NUMBER:request_time:float} # (?:%{NUMBER:upstream_response_time:float}|-)' }
     #match => { "message" => '(?:%{IPORHOST:clientip}|-) (?:%{TWO_IP:http_x_forwarded_for}|%{IPV4:http_x_forwarded_for}|-) \[%{HTTPDATE:[@metadata][webtime]}\] (?:%{HOSTNAME:hostname}|-) %{WORD:method} %{URIPATHPARAM:request} HTTP/%{NUMBER:httpversion} %{NUMBER:response} (?:%{NUMBER:bytes}|-) (?:"(?:%{NOTSPACE:referrer}|-)"|%{NOTSPACE:referrer}|-) %{QS:agent} (?:"(?:%{NUMBER:connection}|-)"|%{NUMBER:connection}|-) (?:"(?<cookies>[^#]*)") %{NUMBER:request_time:float} (?:%{NUMBER:upstream_response_time:float}|-)' }
        match => { "message" => '(?:%{IPORHOST:clientip}|-) %{FORWORD:http_x_forwarded_for} \[%{HTTPDATE:[@metadata][webtime]}\] (?:%{HOSTNAME:hostname}|-) %{WORD:method} %{URIPATHPARAM:request} HTTP/%{NUMBER:httpversion} %{NUMBER:response} (?:%{NUMBER:bytes}|-) (?:"(?:%{NOTSPACE:referrer}|-)"|%{NOTSPACE:referrer}|-) %{QS:agent} (?:"(?:%{NUMBER:connection}|-)"|%{NUMBER:connection}|-) %{QS:cookie} %{NUMBER:request_time:float} (?:%{NUMBER:upstream_response_time:float}|-)' }
     }
     # 将默认的@timestamp(beats收集日志的时间)的值赋值给新字段@read_tiimestamp
     ruby { 
     #code => "event.set('@read_timestamp',event.get('@timestamp'))"
     #将时区改为东8区
     code => "event.set('@read_timestamp',event.get('@timestamp').time.localtime + 8*60*60)"
     }
     # 将nginx的日志记录时间格式化
     # 格式化时间 20/May/2015:21:05:56 +0000
     date {
     locale => "en"
     match => ["[@metadata][webtime]","dd/MMM/yyyy:HH:mm:ss Z"]
     }
     # 将bytes字段由字符串转换为数字
     mutate {
     convert => {"bytes" => "integer"}
     }
     # 将cookie字段解析成一个json
     #mutate {
     # gsub => ["cookies",'\;',',']
     #} 
     # 如果有使用到cdn加速http_x_forwarded_for会有多个ip,第一个ip是用户真实ip
     if[http_x_forwarded_for] =~ ", "{
         ruby {
             code => 'event.set("http_x_forwarded_for", event.get("http_x_forwarded_for").split(",")[0])'
            }
        }
     # 解析ip,获得ip的地理位置
     geoip {
     source => "http_x_forwarded_for"
     # # 只获取ip的经纬度、国家、城市、时区
     fields => ["location","country_name","city_name","region_name"] 
     }
     # 将agent字段解析,获得浏览器、系统版本等具体信息
     useragent {
     source => "agent"
     target => "useragent"
     }
     #指定要删除的数据
     #mutate{remove_field=>["message"]}
     # 根据日志名设置索引名的前缀
     ruby {
     code => 'event.set("@[metadata][index_pre]",event.get("source").split("/")[-1])'
     } 
     # 将@timestamp 格式化为2019.04.23
     ruby {
     code => 'event.set("@[metadata][index_day]",event.get("@timestamp").time.localtime.strftime("%Y.%m.%d"))'
     }
     # 设置输出的默认索引名
     mutate {
     add_field => {
      #"[@metadata][index]" => "%{@[metadata][index_pre]}_%{+YYYY.MM.dd}"
      "[@metadata][index]" => "%{@[metadata][index_pre]}_%{@[metadata][index_day]}"
     }
     }
     # 将cookies字段解析成json
    # mutate {
    # gsub => [
    #  "cookies", ";", ",",
    #  "cookies", "=", ":"
    # ]
    # #split => {"cookies" => ","}
    # }
    # json_encode {
    # source => "cookies"
    # target => "cookies_json"
    # }
    # mutate {
    # gsub => [
    #  "cookies_json", ',', '","',
    #  "cookies_json", ':', '":"'
    # ]
    # }
    # json {
    # source => "cookies_json"
    # target => "cookies2"
    # }
     # 如果grok解析存在错误,将错误独立写入一个索引
     if "_grokparsefailure" in [tags] {
     #if "_dateparsefailure" in [tags] {
     mutate {
      replace => {
      #"[@metadata][index]" => "%{@[metadata][index_pre]}_failure_%{+YYYY.MM.dd}"
      "[@metadata][index]" => "%{@[metadata][index_pre]}_failure_%{@[metadata][index_day]}"
      }
     }
     # 如果不存在错误就删除message
     }else{
     mutate{remove_field=>["message"]}
     }
    }
    
    output {
     if [@metadata][debug]{
     # 输出到rubydebuyg并输出metadata
     stdout{codec => rubydebug{metadata => true}}
     }else{
     # 将输出内容转换成 "."
     stdout{codec => dots} 
     # 将输出到指定的es
     elasticsearch {
      hosts => ["192.168.15.160:9200"]
      index => "%{[@metadata][index]}"
      document_type => "doc"
     } 
     }
    }
    
    

    启动logstash

    nohup bin/logstash -f test_pipline2.conf &

    以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持脚本之家。

    上一篇:配置Nginx的防盗链的操作方法
    下一篇:nginx 平滑重启的实现方法
  • 相关文章
  • 

    © 2016-2020 巨人网络通讯

    时间:9:00-21:00 (节假日不休)

    地址:江苏信息产业基地11号楼四层

    《增值电信业务经营许可证》 苏B2-20120278

    nginx日志导入elasticsearch的方法示例 nginx,日志,导入,elasticsearch,