• 企业400电话
  • 微网小程序
  • AI电话机器人
  • 电商代运营
  • 全 部 栏 目

    企业400电话 网络优化推广 AI电话机器人 呼叫中心 网站建设 商标✡知产 微网小程序 电商运营 彩铃•短信 增值拓展业务
    python 阿里云oss实现直传签名与回调验证的示例方法

    签名

    import base64
    import json
    import time
    from datetime import datetime
    import hmac
    from hashlib import sha1
    
    access_key_id = ''
    # 请填写您的AccessKeySecret。
    access_key_secret = ''
    # host的格式为 bucketname.endpoint ,请替换为您的真实信息。
    host = ''
    # callback_url为 上传回调服务器的URL,请将下面的IP和Port配置为您自己的真实信息。
    callback_url = ""
    # 用户上传文件时指定的前缀。
    upload_dir = 'user-dir-prefix/'
    expire_time = 1200
    expire_syncpoint = int(time.time() + expire_time)
    
    policy_dict = {
      'expiration': datetime.utcfromtimestamp(expire_syncpoint).isoformat() + 'Z',
      'conditions': [
        {"bucket": "test-paige"},
        ['starts-with', '$key', 'user/test/']
      ]
    }
    policy = json.dumps(policy_dict).strip()
    policy_encode = base64.b64encode(policy.encode())
    signature = base64.encodebytes(hmac.new(access_key_secret.encode(), policy_encode, sha1).digest())
    
    callback_dict = {
      'callbackUrl': callback_url,
      'callbackBody': 'filename=${object}size=${size}mimeType=${mimeType}height=${imageInfo.height}width=${'
              'imageInfo.width}',
      'callbackBodyType': 'application/json'
    }
    
    callback = base64.b64encode(json.dumps(callback_dict).strip().encode()).decode()
    
    var = {
      'accessid': access_key_id,
      'host': host,
      'policy': policy_encode.decode(),
      'signature': signature.decode().strip(),
      'expire': expire_syncpoint,
      'callback': callback
    }
    
    

    回调验签

    import asyncio
    import base64
    import time
    import aiomysql
    import rsa
    from aiohttp import web, ClientSession
    from urllib import parse
    import uuid
    
    
    def success(msg='', data=None):
      if data is None:
        data = {}
      dict_data = {
        'code': 1,
        'msg': msg,
        'data': data
      }
      return web.json_response(dict_data)
    
    
    def failed(msg='', data=None):
      if data is None:
        data = {}
      dict_data = {
        'code': 0,
        'msg': msg,
        'data': data
      }
      return web.json_response(dict_data)
    
    
    async def handle(request):
      """
      获取连接池
      :param web.BaseRequest request:
      :return:
      """
      authorization_base64 = request.headers['authorization']
      x_oss_pub_key_url_base64 = request.headers['x-oss-pub-key-url']
      pub_key_url = base64.b64decode(x_oss_pub_key_url_base64.encode())
      authorization = base64.b64decode(authorization_base64.encode())
      path = request.path
    
      async with ClientSession() as session:
        async with session.get(pub_key_url.decode()) as resp:
          pub_key_body = await resp.text()
          pubkey = rsa.PublicKey.load_pkcs1_openssl_pem(pub_key_body.encode())
          body = await request.content.read()
          auth_str = parse.unquote(path) + '\n' + body.decode()
          parse_url = parse.parse_qs(body.decode())
          print(parse_url)
          try:
            rsa.verify(auth_str.encode(), authorization, pubkey)
            pool = request.app['mysql_pool']
            async with pool.acquire() as conn:
              async with conn.cursor() as cur:
                id = str(uuid.uuid4())
                url = parse_url['filename'][0]
                mime = parse_url['mimeType'][0]
                disk = 'oss'
                time_at = time.strftime("%Y-%m-%d %H:%I:%S", time.localtime())
                sql = "INSERT INTO media(id,url,mime,disk,created_at,updated_at) VALUES(%s,%s,%s,%s,%s,%s)"
                await cur.execute(sql, (id, url, mime, disk, time_at, time_at))
                await conn.commit()
            dict_data = {
              'id': id,
              'url': url,
              'cdn_url': 'https://cdn.***.net' + '/' + url,
              'mime': mime,
              'disk': disk,
              'created_at': time_at,
              'updated_at': time_at,
            }
            return success(data=dict_data)
          except rsa.pkcs1.VerificationError:
            return failed(msg='验证错误')
    
    
    async def init(loop):
      # 创建连接池
      mysql_pool = await aiomysql.create_pool(host='127.0.0.1', port=3306,
                          user='', password='',
                          db='', loop=loop)
    
      async def on_shutdown(application):
        """
        接收到关闭信号时,要先关闭连接池,并等待连接池关闭成功.
        :param web.Application application:
        :return:
        """
        application['mysql_pool'].close()
        # 没有下面这句话会报错 RuntimeError: Event loop is closed ,因为连接池没有真正关关闭程序就关闭了,引发python的报错
        await application['mysql_pool'].wait_closed()
    
      application = web.Application()
      application.on_shutdown.append(on_shutdown)
      # 把连接池放到 application 实例中
      application['mysql_pool'] = mysql_pool
      application.add_routes([web.get('/', handle), web.post('/oss', handle)])
      return application
    
    
    if __name__ == '__main__':
      loop = asyncio.get_event_loop()
      application = loop.run_until_complete(init(loop))
      web.run_app(application, host='127.0.0.1')
      loop.close()
    
    

    到此这篇关于python 阿里云oss实现直传签名与回调验证的文章就介绍到这了,更多相关python 直传签名与回调验证内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

    您可能感兴趣的文章:
    • Python生成个性签名图片获取GUI过程解析
    • PYTHON实现SIGN签名的过程解析
    • python rsa实现数据加密和解密、签名加密和验签功能
    • 对python函数签名的方法详解
    • python3个性签名设计实现代码
    • python3爬虫之设计签名小程序
    • Python GUI Tkinter简单实现个性签名设计
    上一篇:Python实现验证回文串的几种方法
    下一篇:Python中pycharm编辑器界面风格修改方法
  • 相关文章
  • 

    © 2016-2020 巨人网络通讯 版权所有

    《增值电信业务经营许可证》 苏ICP备15040257号-8

    python 阿里云oss实现直传签名与回调验证的示例方法 python,阿里,云,oss,实现,直传,