• 企业400电话
  • 微网小程序
  • AI电话机器人
  • 电商代运营
  • 全 部 栏 目

    企业400电话 网络优化推广 AI电话机器人 呼叫中心 网站建设 商标✡知产 微网小程序 电商运营 彩铃•短信 增值拓展业务
    python注册钉钉回调事件的实现

    钉钉API文档:https://ding-doc.dingtalk.com/doc#/serverapi2/skn8ld

    钉钉有回调事件流程,有哪些回调?比如:通讯录回调、审批回调等等,拿通讯录回调来说,就是当你公司组织架构发生变动时,会自动触发你自己注册的回调地址,然后根据回调信息做一些自定义的处理,不得不说,钉钉真的是解决了协同办公的很多问题,非常nice,但就回调事件来说,每个企业只能注册一个回调地址,即使你要监听的是不同的事件、即使还有其他业务线需要用到回调,也只能不多于一个回调,当然这都是出于人家服务多方面考虑的设计,废话不多说,

    好,流程:

      一个注册端向钉钉发送注册请求,钉钉callback你自己的服务url,必须是公网访问的地址,并且该地址返回钉钉json数据来告诉钉钉回调成功

      注册成功之后这个地址就可以接收钉钉的回调post请求,做一些自定义处理,记得返回json

    好,代码

    1、注册端

    mport requests, json
    
    
    class DingDingCallBack(object):
        def __init__(self):
            self.appsecret=''
            self.appkey=''
            self.api_url = "https://oapi.dingtalk.com/gettoken?appkey=%sappsecret=%s"%(self.appkey,self.appsecret)
            self.aes_key = ""
            self.callbackUrl = "回调url"
            self.headers = {
                'Content-Type': 'application/json',
                'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.96 Safari/537.36',
                'username':'zhangsan',
                'password':'123456'
            }
    
        def get_token(self):
            res = requests.get(self.api_url)
            if res.status_code == 200:
                str_res = res.text
                token = (json.loads(str_res)).get('access_token')
                return token
    
        def regist_call_back(self):
            url = 'https://oapi.dingtalk.com/call_back/register_call_back?access_token={ACCESS_TOKEN}'.format(ACCESS_TOKEN=self.get_token())
            data = {
                "call_back_tag": ["bpms_task_change", "bpms_instance_change"], # 审批回调
                "token": "qxN3cm",
                "aes_key": self.aes_key,
                "url":self.callbackUrl,
                }
            data1 = json.dumps(data)
            response = requests.post(url, headers=self.headers, data=data1)
            print(response)
            print(response.text)
    
        def query_callback(self):
            url = 'https://oapi.dingtalk.com/call_back/get_call_back?access_token={ACCESS_TOKEN}'.format(ACCESS_TOKEN=self.get_token())
            response = requests.get(url, headers=self.headers, )
            print(response)
            print(response.text)
    
    
        def update_call_back(self):
            url = 'https://oapi.dingtalk.com/call_back/update_call_back?access_token={}'.format(self.get_token())
            data = {
                "call_back_tag": ["bpms_task_change", "bpms_instance_change","user_add_org","user_leave_org","org_dept_create","org_dept_modify","org_dept_remove"],
                "token": "自定义字符串",
                "aes_key": self.aes_key,
                "url":self.callbackUrl
                }
            data1 = json.dumps(data)
            response = requests.post(url, headers=self.headers, data=data1)
            print(response)
            print(response.text)
    
        def get_fail(self):
            url = 'https://oapi.dingtalk.com/call_back/get_call_back_failed_result?access_token={}'.format(self.get_token())
            response = requests.get(url, headers=self.headers, )
            print(response.text)# todo 删除回调
        def delete_callback(self):
            url = 'https://oapi.dingtalk.com/call_back/delete_call_back?access_token={ACCESS_TOKEN}'.format(ACCESS_TOKEN=self.get_token())
            result = requests.get(url)
            print(result)
            print(result.text)
    
    #
    if __name__ == '__main__':
        dingOBJ = DingDingCallBack()
        # todo 获取钉钉token
    #     # dingOBJ.get_token()
        # todo 获取回调失败
    #     dingOBJ.get_fail()
        # todo 注册回调
        # dingOBJ.regist_call_back()
        # todo 查询回调事件
        dingOBJ.query_callback()
        # todo 修改回调事件
    #     dingOBJ.update_call_back()
        # todo 删除回调
        # dingOBJ.delete_callback()

    2、回调端:以下示例代码为python2,django 框架

    # -*- coding:utf-8 -*-
    
    from django.shortcuts import render
    from django.http import JsonResponse,HttpResponse
    from django.views.generic import View
    from django.views.decorators.csrf import csrf_exempt
    from DingCrypto import DingCrypto
    import random,string,time,json,requests,simplejson
    # Create your views here.from ast import literal_eval
    
    
    encodingAesKey = ''
    key = ''
    token = '自定义字符串'
    appsecret = ''
    appkey = ''
    
    api_url = "https://oapi.dingtalk.com/gettoken?appkey=%sappsecret=%s"%(appkey,appsecret)
    
    
    def randam_string(n):
        ran_str = ''.join(random.sample(string.ascii_letters + string.digits, n))
        return ran_str
    
    # 注册回调
    @csrf_exempt
    def workOrderCallback(request):
        if request.method == 'GET':
            return JsonResponse({'code':200,'msg':'ok'})
    
    
        if request.method == 'POST':
            print request.GET
            dingCrypto = DingCrypto(encodingAesKey,key)
            nonce = randam_string(8)
            timestamp = str(int(round(time.time())))
            encrpyt = dingCrypto.encrypt('success')
            # print nonce,timestamp,token,encrpyt
            signature = dingCrypto.generateSignature(nonce=nonce,timestamp=timestamp,token=token,msg_encrypt=encrpyt)
            new_data = {
                'data': {
                    'msg_signature': signature,
                    'timeStamp': timestamp,
                    'nonce': nonce,
                    'encrypt': encrpyt
                },
            }
    
    
            encrpyt11 = dingCrypto.decrypt(encrpyt)
    
            return JsonResponse(new_data.get('data'))
    
    # 响应回调
    class CallBack(View):
        def get(self,request,*args,**kwargs):
            return JsonResponse({'code':200,'msg':'ok'})
    
        def get_token(self):
            res = requests.get(api_url)
            if res.status_code == 200:
                str_res = res.text
                token = (json.loads(str_res)).get('access_token')
                return token
    
    
        def post(self,request,*args,**kwargs):
    
            # data = request.GET
    
            dingCrypto = DingCrypto(encodingAesKey, key)
            nonce = randam_string(8)
            timestamp = str(int(round(time.time())))
            encrpyt = dingCrypto.encrypt('success')
            # signature = dingCrypto.generateSignature(nonce=nonce,timestamp=timestamp,token=token,msg_encrypt=encrpyt)
            callback = json.loads(dingCrypto.decrypt(json.loads(request.body).get('encrypt')))
            if callback['EventType'] == 'bpms_instance_change': # 审批实例开始,结束
                url = 'https://oapi.dingtalk.com/topapi/processinstance/get?access_token={ACCESS_TOKEN}'.format(ACCESS_TOKEN=self.get_token())
                instace_ = {
                    "process_instance_id": callback['processInstanceId']
                }
                data2 = json.dumps(instace_)
                req = requests.post(url,data=data2)
                data = literal_eval(str(req.text)).get('process_instance')
                excute_workorder(callback['processInstanceId'],data)
            elif callback['EventType'] == 'bpms_task_change':  # 审批任务开始,结束,转交
                print 'bpms_task_change'
            elif callback['EventType'] == 'user_add_org':
                print '用户增加'elif callback['EventType'] == 'user_leave_org':
                print '用户离职'
            elif callback['EventType'] == 'org_dept_create':
                print '组织架构添加'elif callback['EventType'] == 'org_dept_modify':
                print '组织架构变更'elif callback['EventType'] == 'org_dept_remove':
                print '组织架构删除'return HttpResponse(encrpyt)

    到此这篇关于python注册钉钉回调事件的实现的文章就介绍到这了,更多相关python注册钉钉回调事件内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

    您可能感兴趣的文章:
    • Python实现钉钉发送报警消息的方法
    • python3实现钉钉消息推送的方法示例
    • python 调用钉钉机器人的方法
    • Python调用钉钉自定义机器人的实现
    • python钉钉机器人运维脚本监控实例
    • Python制作钉钉加密/解密工具
    • Python实现钉钉订阅消息功能
    • Python第三方包之DingDingBot钉钉机器人
    • 如何基于python对接钉钉并获取access_token
    • 详解使用python3.7配置开发钉钉群自定义机器人(2020年新版攻略)
    • 浅谈Python 钉钉报警必备知识系统讲解
    上一篇:Django的get_absolute_url方法的使用
    下一篇:教你如何在pycharm中安装opencv,tensorflow,keras
  • 相关文章
  • 

    © 2016-2020 巨人网络通讯 版权所有

    《增值电信业务经营许可证》 苏ICP备15040257号-8

    python注册钉钉回调事件的实现 python,注册,钉钉,回调,事件,