• 企业400电话
  • 微网小程序
  • AI电话机器人
  • 电商代运营
  • 全 部 栏 目

    企业400电话 网络优化推广 AI电话机器人 呼叫中心 网站建设 商标✡知产 微网小程序 电商运营 彩铃•短信 增值拓展业务
    python公司内项目对接钉钉审批流程的实现

    最近把组内的一个项目对接钉钉审批接口,通过python3.6。

    钉钉官方文档

    废话不多说了,上代码:

    import requests
    import json
    import time
    from dingtalk.crypto import DingTalkCrypto
    
    from django.conf import settings
    # settings.BASE_DIR
    
    
    class Crypto(object):
        def __init__(self, token):
            # 随便填的字符串
            self.token = token
            # 自己生成的43位随机字符串
            self.aes_key = settings.DINGDING.get("DINGTALK_AES_TOKEN")
            # 钉钉企业ID
            self.corp_id = settings.DINGDING.get("CorpId") #
            print("corp_id:", self.corp_id)
            self.nonce = settings.DINGDING.get("nonce")
            self.crypto = DingTalkCrypto(
                token=self.nonce,
                encoding_aes_key=self.aes_key,
                corpid_or_suitekey=self.corp_id
            )
    
        def encrypt_success(self):
            # 返回加密success
            result = self.crypto.encrypt_message(
                msg="success",
                nonce=self.nonce,
                timestamp=int(time.time()*1000)
            )
            return result
    
    
    class DING(object):
        def __init__(self, approve_process):
            self.AgentId = settings.DINGDING.get("AgentId")
            self.AppKey = settings.DINGDING.get("AppKey")
            self.AppSecret = settings.DINGDING.get("AppSecret")
            self.dingding_url = settings.DINGDING.get("URL")
            self.process_code = settings.DINGDING.get("APPROVE_PROCESS").get(approve_process)['process_code']
            self.aes_key = settings.DINGDING.get("DINGTALK_AES_TOKEN")
            self.nonce = settings.DINGDING.get("nonce")
    
        def get_token(self):
            '''
            获取钉钉的token
            :return: 钉钉token
            '''
            url = self.dingding_url + '/gettoken?appkey={}appsecret={}'.format(self.AppKey, self.AppSecret)
            req = requests.get(url)
            req = json.loads(req.text)
            return req['access_token']
    
    # def createCallbackDd():
    #     '''
    #     注册钉钉回调函数
    #     :return:
    #     '''
    #     url = 'https://oapi.dingtalk.com/call_back/register_call_back?access_token=' + self.getToken()
    #     data = {
    #         "call_back_tag": ["bpms_task_change", "bpms_instance_change"],  #这两个回调种类是审批的
    #         "token": TOKEN,  #自定义的字符串
    #         "aes_key": AES_KEY, #自定义的43位字符串,密钥
    #         "url": URL  #回调地址
    #     }
    #     requests.post(url, data=json.dumps(data))
    #     return ('OK')
    
        def create_process(self, originator_user_id, dept_id, form_component_value_vo, approvers, cc_list, has_cc=0):
            '''
            创建钉钉审批
            approvers为list 元素为钉钉userid   cc_list同理
            '''
            url = self.dingding_url + '/topapi/processinstance/create?access_token=' + self.get_token()
            print("form_component_value_vo:", form_component_value_vo)
            if has_cc == 0:
                data = {
                    'agent_id': self.AgentId,
                    'process_code': self.process_code,  #工单id
                    'originator_user_id': originator_user_id,
                    'dept_id': dept_id,  #创建人的钉钉部门id
                    'form_component_values': str(form_component_value_vo), #钉钉后台配置的需要填写的字段,
                    'approvers': approvers,
                    'cc_list': cc_list,
                    'cc_position': 'START_FINISH'  # 发起和完成时与抄送
                }
            else:
                data = {
                    'agent_id': self.AgentId,
                    'process_code': self.process_code,  #工单id
                    'originator_user_id': originator_user_id, #创建人的钉钉userid
                    'dept_id': dept_id,  #创建人的钉钉部门id
                    'form_component_values': str(form_component_value_vo), #钉钉后台配置的需要填写的字段,
                    'approvers': approvers,
                }
            print("dingding_utils:", data)
            response = requests.post(url, data=data)
            return response.json()
    
        def get_status(self, process_instance_id):
            url = self.dingding_url + '/topapi/processinstance/get?access_token=' + self.get_token()
            data = {
                "process_instance_id": process_instance_id
            }
            response = requests.post(url, data=data)
            return response.json()
    
        def register_callback(self, call_back_url):
            # 注册回调
            url = self.dingding_url + '/call_back/register_call_back?access_token=' + self.get_token()
            print("self.get_token():", self.get_token())
            data = {
                "call_back_tag": ['bpms_task_change', 'bpms_instance_change'],
                "token": self.nonce,
                "aes_key": self.aes_key,
                "url": call_back_url,
            }
            response = requests.post(url, data=json.dumps(data))
            return response.json()
    
        def get_callback(self):
            url = self.dingding_url + '/call_back/get_call_back?access_token=' + self.get_token()
            req = requests.get(url)
            req = json.loads(req.text)
            return req
    
        def create_process_approver_v2(self, originator_user_id, dept_id, form_component_value_vo, approvers, cc_list):
            '''
            创建钉钉审批
            '''
            url = self.dingding_url + '/topapi/processinstance/create?access_token=' + self.get_token()
            data = {
                'agent_id': self.AgentId,
                'process_code': self.process_code,
                'originator_user_id': originator_user_id,
                'dept_id': dept_id,
                'form_component_values': str(form_component_value_vo),
                'approvers_v2': json.dumps(approvers)
            }
            if cc_list:
                data['cc_list'] = cc_list
                data['cc_position'] = 'FINISH'
    
            response = requests.post(url, data=data)
            return response.json()
    
        def create_process_approver_v2_test(self, originator_user_id, dept_id, form_component_value_vo):
            '''
            创建钉钉审批
            '''
            url = self.dingding_url + '/topapi/processinstance/create?access_token=' + self.get_token()
            data = {
                'agent_id': self.AgentId,
                'process_code': self.process_code,
                'originator_user_id': originator_user_id,
                'dept_id': dept_id,
                'form_component_values': str(form_component_value_vo),
                'approvers_v2': json.dumps([
                    {
                        "task_action_type": "NONE",
                        "user_ids": ["dingding_id"],   # 单独审批人
                    },
                    {
                        "task_action_type": "OR",
                        "user_ids": ["dingding_id1", "dingding_id2"],   # 或签
                    },
                    {
                        "task_action_type": "AND",
                        "user_ids": ["dingding_id1", "dingding_id2"],  # 会签
                    }
                ])
            }
    
            response = requests.post(url, data=data)
            return response.json()
    
    
    if __name__ == "__main__":
    
        import django, os, sys
        sys.path.append('xxxxxx')   # 项目路径
        os.environ['DJANGO_SETTINGS_MODULE'] = 'xx.settings'
        # print("settings.DINGDING", settings.DINGDING)
        ding = DING("create_xx")
        # print(ding.get_token())
        # info = [{'name': '单行输入框','value': 'testixxxxxxxx'}]
        # # print(ding.create_process('11', 11, info))
        a = [
            {'name': "输入框1", 'value': "value1"},
            {'name': "输入框2", 'value': "value2"},
        ]
        # print(ding.create_process_test('11', 11, a))
        # print(ding.create_process_approver_v2_test('11', 11, a))
        # print(ding.create_process_test2())
        # print(ding.get_status('xxx'))
        print(ding.get_status('xx'))
    
        # # 验证  回调
        # a = ding.get_token()
        # print(a)
        # c = Crypto(a)
        # print(c.encrypt_success())
    
        # 注册回调
        # print(ding.register_callback("http://xxxx.vaiwan.com/xxx"))
        # print(ding.get_callback())

    说明:

      1 Crypto类用于对接钉钉回调用的。一个公司只有一个corpId,并且一个corpid只能注册一个回调地址。我司有公共组注册好了回调。只要接入公司内的回调即可。所以我实际没有使用到Crypto。

      2  在钉钉管理后台中创建应用后会有这三个东西:AgentId、AppKey,AppSecret  。在创建钉钉审批流程,可以从审批流程浏览器中获取到APPROVE_PROCESS。别忘啦给这个流程审批接口权限。这些官方文档有说。

      3  配置setting变量:

    DINGDING = {
        "AgentId": 123,
        "AppKey": "xx",
        "AppSecret": "xx",
        "URL": "https://oapi.dingtalk.com",
        "APPROVE_PROCESS": { # process_code
            "create_xx": {
                "process_code": "abc", # 审批流程的id
        },
        "DINGTALK_AES_TOKEN": "abc",
        "nonce": "abc",
        "CorpId": "abc",
    }

     4 接口形式创建的审批流程,与钉钉管理后台创建的流程有一些不同:

        1 不能在不同的审批环节设置不同的抄送人

        2 不能审批流程前后有相同的人,不能自动显示成 “自动同意”(管理后台设置成去重后,但是接口指定审批人场景,不支持)

     5 其他如:审批内容、或签,会签代码里都有示例。

    到此这篇关于python公司内项目对接钉钉审批流程的实现的文章就介绍到这了,更多相关python对接钉钉审批内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

    您可能感兴趣的文章:
    • Python实现钉钉发送报警消息的方法
    • python3实现钉钉消息推送的方法示例
    • python 调用钉钉机器人的方法
    • python钉钉机器人运维脚本监控实例
    • Python制作钉钉加密/解密工具
    • Python第三方包之DingDingBot钉钉机器人
    • 如何基于python对接钉钉并获取access_token
    • 详解使用python3.7配置开发钉钉群自定义机器人(2020年新版攻略)
    • python使用自定义钉钉机器人的示例代码
    • Python 实现 T00ls 自动签到脚本代码(邮件+钉钉通知)
    • 浅谈Python 钉钉报警必备知识系统讲解
    • Python钉钉报警及Zabbix集成钉钉报警的示例代码
    上一篇:教你如何在pycharm中安装opencv,tensorflow,keras
    下一篇:python 使用GDAL实现栅格tif转矢量shp的方式小结
  • 相关文章
  • 

    © 2016-2020 巨人网络通讯 版权所有

    《增值电信业务经营许可证》 苏ICP备15040257号-8

    python公司内项目对接钉钉审批流程的实现 python,公司,内,项目,对接,