• 企业400电话
  • 微网小程序
  • AI电话机器人
  • 电商代运营
  • 全 部 栏 目

    企业400电话 网络优化推广 AI电话机器人 呼叫中心 网站建设 商标✡知产 微网小程序 电商运营 彩铃•短信 增值拓展业务
    python脚本使用阿里云slb对恶意攻击进行封堵的实现

    环境准备:

    1.安装python3.7和相关的依赖

    并安装redis缓存数据库

    pip install aliyun-python-sdk-core
    pip install aliyun-python-sdk-slb
    pip intall IPy
    pip intall redis
    pip intall paramiko

    2.添加ram访问控制的编程接口用户

    3.添加slb的访问控制策略并和需要频控的slb进行绑定

    redis封堵ip的格式

    脚本程序目录

    Aliyun_SLB_Manager
    ├── helpers
    │   ├── common.py
    │   ├── email.py
    │   ├── remote.py
    │   └── slb.py
    ├── logs
    │   └── run_20210204.log
    └── run.py

    # 程序核心就是使用shell命令对nginx的日志中出现的ip地址 和 访问的接口进行过滤,找出访问频繁的那些程序加入slb黑名单,同时加入redis缓存,因为slb有封堵ip个数限制,redis中存储的ip需要设置过期时间,对比后删除slb中封堵的Ip

    # grep 04/Feb/2021:15:4 /data/www/logs/nginx_log/access/masterapi.chinasoft.cn_access.log | grep '/api' | awk '{print $1}' | sort | uniq -c | sort -r -n | head -200
      2454 114.248.45.15
      1576 47.115.122.23
      1569 47.107.239.148
      269 112.32.217.52
    
    grep 04/Feb/2021:14:5 /data/www/logs/nginx_log/access/masterapi.chinasoft.cn_access.log | grep '/api' | awk '{print $1}' | awk -F ':' '{print $2}' | sort | uniq -c | sort -r -n | head -200 | awk '{if ($1 >15)print $1,$2}'
    
    [root@alisz-edraw-api-server-web01:~]# grep 04/Feb/2021:15:4 /data/www/logs/nginx_log/access/masterapi.chinasoft.cn_access.log | grep '/api' | awk '{print $1}' | sort | uniq -c | sort -r -n | head -3
      2454 114.248.45.15
      1576 47.115.122.23
      1569 47.107.239.148

    python脚本
    主入口程序

    run.py

    import time
    from helpers.email import send_mail
    from helpers.remote import get_black_ips
    from helpers.common import is_white_ip,get_ban_ip_time,set_ban_ip_time,groups
    from helpers.slb import slb_add_host,slb_del_host,slb_get_host
    
    if __name__ == "__main__":
      # aliyun 访问控制针对 slb 的管理用户
      # 用户登录名称 slb-frequency-user@xxx.onaliyun.com
      accessKeyId = 'id'
      accessSecret = 'pass'
    
      # slb 访问控制策略id
      acl_id = 'acl-slb'
      # reginid 查询地址:https://help.aliyun.com/document_detail/40654.html?spm=a2c6h.13066369.0.0.54a17471VmN3kA
      region_id = 'cn-shenzhen'
      # 黑名单限制个数 300
      slb_limit = 200
      # 每10分钟访问限制阈值
      threshold = 50
      # 接收邮箱
      mails = ['reblue520@chinasoft.cn']
    
      # 远程ssh执行grep过滤出可疑ip
      res = get_black_ips(threshold)
      deny_host_list = res[0]
      hosts_with_count = res[1]
      hosts_with_count = sorted(hosts_with_count.items(), key=lambda x: x[1] , reverse=True)
      print(hosts_with_count)
      # exit()
      # 等待被ban的ip , 过滤掉ip白名单
      deny_hosts = []
      for host in deny_host_list:
        if (is_white_ip(host) == False):
          deny_hosts.append(host + '/32')
    
      # 获取所有已经被ban的ip
      response = slb_get_host(accessKeyId , accessSecret , acl_id , region_id)
      denied_hosts = []
      if('AclEntrys' in response.keys()):
        for item in response['AclEntrys']['AclEntry']:
          denied_hosts.append(item['AclEntryIP'])
    
      # 被ban超过2天,首先移除
      must_del_hosts = []
      denied_hosts_clone = denied_hosts.copy()
      for host in denied_hosts:
        if (get_ban_ip_time(host) == 0 or (get_ban_ip_time(host)  int(round(time.time())) - 2* 24 * 3600)):
          must_del_hosts.append(host)
          denied_hosts_clone.remove(host)
    
      # 排除相同的
      deny_hosts_new = []
      for item in deny_hosts:
        if(item not in denied_hosts_clone):
          deny_hosts_new.append(item)
    
      # 两者和超过300的限制
      if((len(denied_hosts_clone)+len(deny_hosts_new))>slb_limit):
        denied_hosts_detail = {}
        for host in denied_hosts_clone:
          denied_hosts_detail[host] = get_ban_ip_time(host)
        # 需要排除的数量
        num = len(denied_hosts_clone) + len(deny_hosts_new) - slb_limit
        denied_hosts_detail = sorted(denied_hosts_detail.items(), key=lambda x: x[1])
        denied_hosts_detail = denied_hosts_detail[:num]
        for item in denied_hosts_detail:
          must_del_hosts.append(item[0])
    
      print("denied:",denied_hosts)
      print("delete:",must_del_hosts)
      print("add:",deny_hosts_new)
      # exit()
      # 先删除一部分 must_del_hosts
      if(len(must_del_hosts)>0):
        if (len(must_del_hosts)>50):
          must_del_hosts_clone = groups(must_del_hosts,50)
          for item in must_del_hosts_clone:
            slb_del_host(item, accessKeyId, accessSecret, acl_id, region_id)
            time.sleep(1)
        else :
          slb_del_host(must_del_hosts, accessKeyId, accessSecret, acl_id, region_id)
    
      # 再新增 deny_hosts_new
      if(len(deny_hosts_new)>0):
        if(len(deny_hosts_new)>50):
          deny_hosts_new_clone = groups(deny_hosts_new,50)
          for item in deny_hosts_new_clone:
            slb_add_host(item, accessKeyId, accessSecret, acl_id, region_id)
            time.sleep(1)
        else:
          slb_add_host(deny_hosts_new, accessKeyId, accessSecret, acl_id, region_id)
    
      # 记录ip被禁时间
      for host in deny_hosts_new:
        set_ban_ip_time(host)
    
      if (len(deny_hosts_new) >= 1):
        mail_content = ''
        if(len(must_del_hosts) > 0):
          mail_content += "以下黑名单已被解禁("+str(len(must_del_hosts))+"):\n"+"\n".join(must_del_hosts) + "\n"
        mail_content += "\n新增以下ip黑名单("+str(len(deny_hosts_new))+"):\n"+"\n".join(deny_hosts_new)
        mail_content += "\n\n10分钟访问超过15次("+str(len(hosts_with_count))+"):\n"
        for item in hosts_with_count:
          mail_content += str(item[1]) + " " + str(item[0]) + "\n"
        mail_content += "\n\n黑名单("+str(len(denied_hosts))+"个):\n"
        for item in denied_hosts:
          mail_content += str(item) + "\n"
        send_mail(mail_content , mails)

    slb操作相关的脚本
    slb.py

    import logging , json
    
    from aliyunsdkcore.client import AcsClient
    from aliyunsdkslb.request.v20140515.AddAccessControlListEntryRequest import AddAccessControlListEntryRequest
    from aliyunsdkslb.request.v20140515.RemoveAccessControlListEntryRequest import RemoveAccessControlListEntryRequest
    from aliyunsdkslb.request.v20140515.DescribeAccessControlListAttributeRequest import DescribeAccessControlListAttributeRequest
    
    
    # 阿里云slb访问控制里添加ip
    def slb_add_host(hosts, accessKeyId, accessSecret, acl_id, region_id):
      client = AcsClient(accessKeyId, accessSecret, region_id)
      request = AddAccessControlListEntryRequest()
      request.set_accept_format('json')
      logging.info("正在封印IP:%s" % ",".join(hosts))
    
      try:
        add_hosts = []
        for host in hosts:
          add_hosts.append({"entry": host, "comment": "deny"})
    
        request.set_AclEntrys(add_hosts)
        request.set_AclId(acl_id)
        response = client.do_action_with_exception(request)
        print(response)
      except BaseException as e:
        logging.error("添加黑名单失败,原因:%s" % e)
    
    
    # slb删除ip
    def slb_del_host(hosts, accessKeyId, accessSecret, acl_id , region_id = 'us-west-1'):
      logging.info("正在解封IP:%s" % ",".join(hosts))
      try:
        del_hosts = []
        for host in hosts:
          del_hosts.append({"entry": host, "comment": "deny"})
    
        client = AcsClient(accessKeyId, accessSecret, region_id)
        request = RemoveAccessControlListEntryRequest()
        request.set_accept_format('json')
        request.set_AclEntrys(del_hosts)
        request.set_AclId(acl_id)
    
        client.do_action_with_exception(request)
        logging.info("slb删除IP:%s成功" % ",".join(hosts)) # 查看调用接口结果
        logging.info("slb删除IP:%s成功" % ",".join(hosts)) # 查看调用接口结果
      except BaseException as e:
        logging.error("移出黑名单失败,原因:%s" % e)
    
    
    # 阿里云slb获取IP黑名单列表
    def slb_get_host(accessKeyId, accessSecret, acl_id, region_id):
      client = AcsClient(accessKeyId, accessSecret, region_id)
      request = DescribeAccessControlListAttributeRequest()
      request.set_accept_format('json')
    
      try:
        request.set_AclId(acl_id)
        response = client.do_action_with_exception(request)
        data_sub = json.loads((response.decode("utf-8")))
        return data_sub
      except BaseException as e:
        logging.error("获取黑名单失败,原因:%s" % e)

    远程操作日志的脚本
    remote.py

    #!/usr/bin/python
    # -*- coding: UTF-8 -*-
    
    import datetime
    import re
    import paramiko
    
    
    def get_black_ips(threshold = 100):
      # file = '/data/www/logs/nginx_log/access/*api*_access.log'
      file = '/data/www/logs/nginx_log/access/masterapi.chinasoft.cn_access.log'
      # 可以 ssh 访问服务器 nginx 日志的用户信息
      username = 'apache'
      passwd = 'pass'
    
      ten_min_time = (datetime.datetime.now() - datetime.timedelta(minutes=10)).strftime("%d/%b/%Y:%H:%M")
      ten_min_time = ten_min_time[:-1]
    
      # 线上 需要对日志进行过滤的目标服务器,一般是内网ip,本地调试时可以直接使用外网ip方便调试
      ssh_hosts = ['1.1.1.1']
      deny_host_list = []
      for host in ssh_hosts:
    
        '''
        # 过滤日志文件,需要显示如下效果,次数 ip地址,需要定位具体的api接口,否则误伤率极高
        # grep 04/Feb/2021:15:2 /data/www/logs/nginx_log/access/masterapi.chinasoft.cn_access.log | grep '/api' | awk '{print $1}' | sort | uniq -c | sort -r -n | head -5 | awk '{if ($1 >15)print $1,$2}'
        2998 116.248.89.2
        2381 114.248.45.15
        1639 47.107.239.148
        1580 47.115.122.23
        245 59.109.149.45
        '''
        shell = (
              # "grep %s %s | grep '/index.php?submod=checkoutmethod=indexpid' | awk '{print $1}' | awk -F ':' '{print $2}' | sort | uniq -c | sort -r -n | head -200 | awk '{if ($1 >15)print $1,$2}'") % (
              # grep 04/Feb/2021:14:5 /data/www/logs/nginx_log/access/masterapi.chinasoft.cn_access.log | grep '/api/user' | awk '{print $1}' | awk -F ':' '{print $2}' | sort | uniq -c | sort -r -n | head -200 | awk '{if ($1 >15)print $1,$2}'
              "grep %s %s | grep '/api' | awk '{print $1}' | sort | uniq -c | sort -r -n | head -200 | awk '{if ($1 >2000)print $1,$2}'") % (
              ten_min_time, file)
        print(shell)
        ssh = paramiko.SSHClient()
        ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        ssh.connect(host, port=2020, username=username, password=passwd)
        stdin, stdout, stderr = ssh.exec_command(shell)
        result = stdout.read().decode(encoding="utf-8")
        deny_host_re = re.compile(r'\d{1,99} \d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}')
        deny_host_re = deny_host_re.findall(result)
        deny_host_list = deny_host_list + deny_host_re
    
      uniq_host = {}
      for host_str in deny_host_list:
        tmp = host_str.split(' ')
        if tmp[1] in uniq_host:
          uniq_host[tmp[1]] += int(tmp[0])
        else:
          uniq_host[tmp[1]] = int(tmp[0])
    
      deny_host_list = []
      for v in uniq_host:
        if (uniq_host[v] > threshold):
          deny_host_list.append(v)
    
      return [deny_host_list , uniq_host]

    发送邮件的脚本
    email.py

    #!/usr/bin/python
    # -*- coding: UTF-8 -*-
    
    import smtplib
    from email.mime.text import MIMEText
    from email.header import Header
    import logging
    
    
    def send_mail(host , receivers):
      # 发送邮件的服务器,用户信息
      mail_host = "smtpdm-ap-southeast-1.aliyun.com"
      mail_user = "admin@mail.chinasoft.com"
      mail_pass = "pass"
    
      sender = 'admin@mail.chinasoft.com'
    
      message = MIMEText('chinasoft国内接口被刷,单个IP最近10分钟内访问超过阈值100次会收到此邮件告警!!!!\n%s' % (host), 'plain', 'utf-8')
      message['From'] = Header("chinasoft国内接口被刷", 'utf-8')
    
      subject ='[DDOS]购买链接接口异常链接!!'
      message['Subject'] = Header(subject, 'utf-8')
    
      try:
        smtpObj = smtplib.SMTP(mail_host, 80)
        smtpObj.login(mail_user, mail_pass)
        smtpObj.sendmail(sender, receivers, message.as_string())
        logging.info("邮件发送成功")
      except smtplib.SMTPException as e:
        logging.error("发送邮件失败,原因:%s" % e)

    配置文件
    common.py

    #!/usr/bin/python
    # -*- coding: UTF-8 -*-
    
    import IPy
    from functools import reduce
    import redis,time
    
    
    def groups(L1,len1):
      groups=zip(*(iter(L1),)*len1)
      L2=[list(i) for i in groups]
      n=len(L1) % len1
      L2.append(L1[-n:]) if n !=0 else L2
      return L2
    
    
    def ip_into_int(ip):
      return reduce(lambda x, y: (x  8) + y, map(int, ip.split('.')))
    
    
    # 过滤掉内网ip
    def is_internal_ip(ip):
      ip = ip_into_int(ip)
      net_a = ip_into_int('10.255.255.255') >> 24
      net_b = ip_into_int('172.31.255.255') >> 20
      net_c = ip_into_int('192.168.255.255') >> 16
      return ip >> 24 == net_a or ip >> 20 == net_b or ip >> 16 == net_c
    
    
    # 是否为白名单ip (公司内网+集群内网ip+slb和需要互访的服务器ip避免误杀)
    def is_white_ip(ip):
      if (is_internal_ip(ip)):
        return True
      white_hosts = [
        # web-servers
        '1.1.1.1',
        '1.1.1.2',
      ];
      for white in white_hosts:
        if (ip in IPy.IP(white)):
          return True
      return False
    
    
    def get_ban_ip_time(ip):
      pool = redis.ConnectionPool(host='127.0.0.1', port=6379, db=1)
      client = redis.Redis(connection_pool=pool)
      key = 'slb_ban_'+ip
      val = client.get(key)
      if val == None:
        return 0
      else :
        return int(val)
    
    
    def set_ban_ip_time(ip):
      pool = redis.ConnectionPool(host='127.0.0.1', port=6379, db=1)
      client = redis.Redis(connection_pool=pool)
      key = 'slb_ban_'+ip
      timestamp = time.time()
      timestamp = int(round(timestamp))
      return client.set(key , timestamp , 86400)

    本地可以直接运行run.py进行调试

    到此这篇关于python脚本使用阿里云slb对恶意攻击进行封堵的实现的文章就介绍到这了,更多相关python脚本阿里云slb内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

    您可能感兴趣的文章:
    • 利用Python+阿里云实现DDNS动态域名解析的方法
    • 阿里云 CentOS7.4 安装 Python3.6的方法讲解
    • Python3编程实现获取阿里云ECS实例及监控的方法
    • 在阿里云服务器上配置CentOS+Nginx+Python+Flask环境
    上一篇:用60行代码实现Python自动抢微信红包
    下一篇:python中的unittest框架实例详解
  • 相关文章
  • 

    © 2016-2020 巨人网络通讯 版权所有

    《增值电信业务经营许可证》 苏ICP备15040257号-8

    python脚本使用阿里云slb对恶意攻击进行封堵的实现 python,脚本,使用,阿里,云,