• 企业400电话
  • 微网小程序
  • AI电话机器人
  • 电商代运营
  • 全 部 栏 目

    企业400电话 网络优化推广 AI电话机器人 呼叫中心 网站建设 商标✡知产 微网小程序 电商运营 彩铃•短信 增值拓展业务
    python multiprocessing 多进程并行计算的操作

    python的multiprocessing包是标准库提供的多进程并行计算包,提供了和threading(多线程)相似的API函数,但是相比于threading,将任务分配到不同的CPU,避免了GIL(Global Interpreter Lock)的限制。

    下面我们对multiprocessing中的Pool和Process类做介绍。

    Pool

    采用Pool进程池对任务并行处理更加方便,我们可以指定并行的CPU个数,然后 Pool 会自动把任务放到进程池中运行。 Pool 包含了多个并行函数。

    apply apply_async

    apply 要逐个执行任务,在python3中已经被弃用,而apply_async是apply的异步执行版本。并行计算一定要采用apply_async函数。

    import multiprocessing
    import time
    from random import randint, seed
    def f(num):
      seed()
      rand_num = randint(0,10) # 每次都随机生成一个停顿时间
      time.sleep(rand_num)
      return (num, rand_num)
    start_time = time.time()
    cores = multiprocessing.cpu_count()
    pool = multiprocessing.Pool(processes=cores)
    pool_list = []
    result_list = []
    start_time = time.time()
    for xx in xrange(10):
      pool_list.append(pool.apply_async(f, (xx, ))) # 这里不能 get, 会阻塞进程
    result_list = [xx.get() for xx in pool_list]
    #在这里不免有人要疑问,为什么不直接在 for 循环中直接 result.get()呢?这是因为pool.apply_async之后的语句都是阻塞执行的,调用 result.get() 会等待上一个任务执行完之后才会分配下一个任务。事实上,获取返回值的过程最好放在进程池回收之后进行,避免阻塞后面的语句。
    # 最后我们使用一下语句回收进程池:  
    pool.close()
    pool.join()
    print result_list
    print '并行花费时间 %.2f' % (time.time() - start_time)
    print '串行花费时间 %.2f' % (sum([xx[1] for xx in result_list]))
    #[(0, 8), (1, 2), (2, 4), (3, 9), (4, 0), (5, 1), (6, 8), (7, 3), (8, 4), (9, 6)]
    #并行花费时间 14.11
    #串行花费时间 45.00
    

    map map_async

    map_async 是 map的异步执行函数。

    相比于 apply_async, map_async 只能接受一个参数。

    import time
    from multiprocessing import Pool
    def run(fn):
     #fn: 函数参数是数据列表的一个元素
     time.sleep(1)
     return fn*fn
    if __name__ == "__main__":
     testFL = [1,2,3,4,5,6] 
     print '串行:' #顺序执行(也就是串行执行,单进程)
     s = time.time()
     for fn in testFL:
      run(fn)
     e1 = time.time()
     print "顺序执行时间:", int(e1 - s)
     print '并行:' #创建多个进程,并行执行
     pool = Pool(4) #创建拥有5个进程数量的进程池
     #testFL:要处理的数据列表,run:处理testFL列表中数据的函数
     rl =pool.map(run, testFL) 
     pool.close()#关闭进程池,不再接受新的进程
     pool.join()#主进程阻塞等待子进程的退出
     e2 = time.time()
     print "并行执行时间:", int(e2-e1)
     print rl
    # 串行:
    # 顺序执行时间: 6
    # 并行:
    # 并行执行时间: 2
    # [1, 4, 9, 16, 25, 36]
    

    Process

    采用Process必须注意的是,Process对象来创建进程,每一个进程占据一个CPU,所以要建立的进程必须 小于等于 CPU的个数。

    如果启动进程数过多,特别是当遇到CPU密集型任务,会降低并行的效率。

    #16.6.1.1. The Process class
    from multiprocessing import Process, cpu_count
    import os
    import time
    start_time = time.time()
    def info(title):
    #   print(title)
      if hasattr(os, 'getppid'): # only available on Unix
        print 'parent process:', os.getppid()
      print 'process id:', os.getpid()
      time.sleep(3)
    def f(name):
      info('function f')
      print 'hello', name
    if __name__ == '__main__':
    #   info('main line')
      p_list = [] # 保存Process新建的进程
      cpu_num = cpu_count()
      for xx in xrange(cpu_num):
        p_list.append(Process(target=f, args=('xx_%s' % xx,)))
      for xx in p_list:
        xx.start()
      for xx in p_list:
        xx.join()
      print('spend time: %.2f' % (time.time() - start_time))
    parent process: 11741
    # parent process: 11741
    # parent process: 11741
    # process id: 12249
    # process id: 12250
    # parent process: 11741
    # process id: 12251
    # process id: 12252
    # hello xx_1
    # hello xx_0
    # hello xx_2
    # hello xx_3
    # spend time: 3.04
    

    进程间通信

    Process和Pool均支持Queues 和 Pipes 两种类型的通信。

    Queue 队列

    队列遵循先进先出的原则,可以在各个进程间使用。

    # 16.6.1.2. Exchanging objects between processes
    # Queues
    from multiprocessing import Process, Queue
    def f(q):
      q.put([42, None, 'hello'])
    if __name__ == '__main__':
      q = Queue()
      p = Process(target=f, args=(q,))
      p.start()
      print q.get()  # prints "[42, None, 'hello']"
      p.join()
    

    pipe

    from multiprocessing import Process, Pipe
    def f(conn):
      conn.send([42, None, 'hello'])
      conn.close()
    if __name__ == '__main__':
      parent_conn, child_conn = Pipe()
      p = Process(target=f, args=(child_conn,))
      p.start()
      print parent_conn.recv()  # prints "[42, None, 'hello']"
      p.join()
    

    queue 与 pipe比较

    Pipe() can only have two endpoints.

    Queue() can have multiple producers and consumers.

    When to use them

    If you need more than two points to communicate, use a Queue().

    If you need absolute performance, a Pipe() is much faster because Queue() is built on top of Pipe().

    参考:

    https://stackoverflow.com/questions/8463008/python-multiprocessing-pipe-vs-queue

    共享资源

    多进程应该避免共享资源。在多线程中,我们可以比较容易地共享资源,比如使用全局变量或者传递参数。

    在多进程情况下,由于每个进程有自己独立的内存空间,以上方法并不合适。

    此时我们可以通过共享内存和Manager的方法来共享资源。

    但这样做提高了程序的复杂度,并因为同步的需要而降低了程序的效率。

    共享内存

    共享内存仅适用于 Process 类,不能用于进程池 Pool

    # 16.6.1.4. Sharing state between processes
    # Shared memory
    from multiprocessing import Process, Value, Array
    def f(n, a):
      n.value = 3.1415927
      for i in range(len(a)):
        a[i] = -a[i]
    if __name__ == '__main__':
      num = Value('d', 0.0)
      arr = Array('i', range(10))
      p = Process(target=f, args=(num, arr))
      p.start()
      p.join()
      print num.value
      print arr[:]
    # 3.1415927
    # [0, -1, -2, -3, -4, -5, -6, -7, -8, -9]
    

    Manager Class

    Manager Class 既可以用于Process 也可以用于进程池 Pool。

    from multiprocessing import Manager, Process
    def f(d, l, ii):
      d[ii] = ii
      l.append(ii)
    if __name__ == '__main__':
      manager = Manager()
      d = manager.dict()
      l = manager.list(range(10))
      p_list = [] 
      for xx in range(4):
        p_list.append(Process(target=f, args=(d, l, xx)))
      for xx in p_list:
        xx.start()
      for xx in p_list:
        xx.join()
      print d
      print l
    # {0: 0, 1: 1, 2: 2, 3: 3}
    # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3]
    

    补充:python程序多进程运行时间计算/多进程写数据/多进程读数据

    import time
    time_start=time.time()
    time_end=time.time()
    print('time cost',time_end-time_start,'s')
    

    单位为秒,也可以换算成其他单位输出

    注意写测试的时候,函数名要以test开头,否则运行不了。

    多线程中的问题:

    1)多线程存数据:

    def test_save_features_to_db(self):
        df1 = pd.read_csv('/home/sc/PycharmProjects/risk-model/xg_test/statis_data/shixin_company.csv')
        com_list = df1['company_name'].values.tolist()
        # com_list = com_list[400015:400019]
        # print 'test_save_features_to_db'
        # print(com_list)
        p_list = [] # 进程列表
        i = 1
        p_size = len(com_list)
        for company_name in com_list:
          # 创建进程
          p = Process(target=self.__save_data_iter_method, args=[company_name])
          # p.daemon = True
          p_list.append(p)
          # 间歇执行进程
          if i % 20 == 0 or i == p_size: # 20页处理一次, 最后一页处理剩余
            for p in p_list:
              p.start()
            for p in p_list:
              p.join() # 等待进程结束
            p_list = [] # 清空进程列表
          i += 1
    

    总结:多进程写入的时候,不需要lock,也不需要返回值。

    核心p = Process(target=self.__save_data_iter_method, args=[company_name]),其中target指向多进程的一次完整的迭代,arg则是该迭代的输入。

    注意写法args=[company_name]才对,原来写成:args=company_name,args=(company_name)会报如下错:只需要1个参数,而给出了34个参数。

    多进程外层循环则是由输入决定的,有多少个输入就为多少次循环,理解p.start和p.join;

    def __save_data_iter_method(self, com):
        # time_start = time.time()
        # print(com)
        f_d_t = ShiXinFeaturesDealSvc()
        res = f_d_t.get_time_features(company_name=com)
        # 是否失信
        shixin_label = res.shixin_label
        key1 = res.shixin_time
        if key1:
          public_at = res.shixin_time
          company_name = res.time_map_features[key1].company_name
          # print(company_name)
          established_years = res.time_map_features[key1].established_years
          industry_dx_rate = res.time_map_features[key1].industry_dx_rate
          regcap_change_cnt = res.time_map_features[key1].regcap_change_cnt
          share_change_cnt = res.time_map_features[key1].share_change_cnt
          industry_dx_cnt = res.time_map_features[key1].industry_dx_cnt
          address_change_cnt = res.time_map_features[key1].address_change_cnt
          fr_change_cnt = res.time_map_features[key1].fr_change_cnt
          judgedoc_cnt = res.time_map_features[key1].judgedoc_cnt
          bidding_cnt = res.time_map_features[key1].bidding_cnt
          trade_mark_cnt = res.time_map_features[key1].trade_mark_cnt
          network_share_cancel_cnt = res.time_map_features[key1].network_share_cancel_cnt
          cancel_cnt = res.time_map_features[key1].cancel_cnt
          industry_all_cnt = res.time_map_features[key1].industry_all_cnt
          network_share_zhixing_cnt = res.time_map_features[key1].network_share_zhixing_cnt
          network_share_judge_doc_cnt = res.time_map_features[key1].network_share_judge_doc_cnt
          net_judgedoc_defendant_cnt = res.time_map_features[key1].net_judgedoc_defendant_cnt
          judge_doc_cnt = res.time_map_features[key1].judge_doc_cnt
          f_d_do = ShixinFeaturesDto(company_name=company_name, established_years=established_years,
                        industry_dx_rate=industry_dx_rate, regcap_change_cnt=regcap_change_cnt,
                        share_change_cnt=share_change_cnt, industry_all_cnt=industry_all_cnt,
                        industry_dx_cnt=industry_dx_cnt, address_change_cnt=address_change_cnt,
                        fr_change_cnt=fr_change_cnt, judgedoc_cnt=judgedoc_cnt,
                        bidding_cnt=bidding_cnt, trade_mark_cnt=trade_mark_cnt,
                        network_share_cancel_cnt=network_share_cancel_cnt, cancel_cnt=cancel_cnt,
                        network_share_zhixing_cnt=network_share_zhixing_cnt,
                        network_share_judge_doc_cnt=network_share_judge_doc_cnt,
                        net_judgedoc_defendant_cnt=net_judgedoc_defendant_cnt,
                        judge_doc_cnt=judge_doc_cnt, public_at=public_at, shixin_label=shixin_label)
          # time_end = time.time()
          # print('totally cost', time_end - time_start)
          self.cfdbsvc.save_or_update_features(f_d_do)
    def save_or_update_features(self, shixin_features_dto):
        """
        添加或更新:
        插入一行数据, 如果不存在则插入,存在则更新
        """
        self._pg_util = PgUtil()
        p_id = None
        if isinstance(shixin_features_dto, ShixinFeaturesDto):
          p_id = str(uuid.uuid1())
          self._pg_util.execute_sql(
            self.s_b.insert_or_update_row(
              self.model.COMPANY_NAME,
              {
                self.model.ID: p_id,
                # 公司名
                self.model.COMPANY_NAME: shixin_features_dto.company_name,
                # 失信时间
                self.model.PUBLIC_AT: shixin_features_dto.public_at,
                self.model.SHIXIN_LABEL : shixin_features_dto.shixin_label,
                self.model.ESTABLISHED_YEARS: shixin_features_dto.established_years, 
                self.model.INDUSTRY_DX_RATE: shixin_features_dto.industry_dx_rate, 
                self.model.REGCAP_CHANGE_CNT: shixin_features_dto.regcap_change_cnt, 
                self.model.SHARE_CHANGE_CNT: shixin_features_dto.share_change_cnt, 
                self.model.INDUSTRY_ALL_CNT: shixin_features_dto.industry_all_cnt, 
                self.model.INDUSTRY_DX_CNT: shixin_features_dto.industry_dx_cnt, 
                self.model.ADDRESS_CHANGE_CNT: shixin_features_dto.address_change_cnt, 
                self.model.NETWORK_SHARE_CANCEL_CNT: shixin_features_dto.network_share_cancel_cnt,
                self.model.CANCEL_CNT: shixin_features_dto.cancel_cnt, 
                self.model.NETWORK_SHARE_ZHIXING_CNT: shixin_features_dto.network_share_zhixing_cnt,
                self.model.FR_CHANGE_CNT: shixin_features_dto.fr_change_cnt, 
                self.model.JUDGEDOC_CNT: shixin_features_dto.judgedoc_cnt, 
                self.model.NETWORK_SHARE_JUDGE_DOC_CNT: shixin_features_dto.network_share_judge_doc_cnt,
                self.model.BIDDING_CNT: shixin_features_dto.bidding_cnt, 
                self.model.TRADE_MARK_CNT: shixin_features_dto.trade_mark_cnt, 
                self.model.JUDGE_DOC_CNT: shixin_features_dto.judge_doc_cnt 
              },
              [self.model.ADDRESS_CHANGE_CNT,self.model.BIDDING_CNT,self.model.CANCEL_CNT,
               self.model.ESTABLISHED_YEARS,self.model.FR_CHANGE_CNT,self.model.INDUSTRY_ALL_CNT,
               self.model.INDUSTRY_DX_RATE,self.model.INDUSTRY_DX_CNT,self.model.JUDGE_DOC_CNT,
               self.model.JUDGEDOC_CNT,self.model.NETWORK_SHARE_CANCEL_CNT,self.model.NETWORK_SHARE_JUDGE_DOC_CNT,
               self.model.NETWORK_SHARE_ZHIXING_CNT,self.model.REGCAP_CHANGE_CNT,self.model.TRADE_MARK_CNT,
               self.model.SHARE_CHANGE_CNT,self.model.SHIXIN_LABEL,self.model.PUBLIC_AT]
            )
          )
        return p_id
    

    函数中重新初始化了self._pg_util = PgUtil(),否则会报ssl error 和ssl decryption 的错误,背后原因有待研究!

    **2)多进程取数据——(思考取数据为何要多进程)**
      def flush_process(self, lock): #需要传入lock;
        """
        运行待处理的方法队列
        :type lock Lock
        :return 返回一个dict
        """
        # process_pool = Pool(processes=20)
        # data_list = process_pool.map(one_process, self.__process_data_list)
        #
        # for (key, value) in data_list:
        #
        # 覆盖上期变量
        self.__dct_share = self.__manager.Value('tmp', {}) # 进程共享变量
        p_list = [] # 进程列表
        i = 1
        p_size = len(self.__process_data_list)
        for process_data in self.__process_data_list:  **#循环遍历需要同时查找的公司列表!!!self.__process_data_list包含多个process_data,每个process_data包含三种属性?类对象也可以循环????**
          # 创建进程
          p = Process(target=self.__one_process, args=(process_data, lock)) #参数需要lock
          # p.daemon = True
          p_list.append(p)
          # 间歇执行进程
          if i % 20 == 0 or i == p_size: # 20页处理一次, 最后一页处理剩余
            for p in p_list:
              p.start()
            for p in p_list:
              p.join() # 等待进程结束
            p_list = [] # 清空进程列表
          i += 1
        # end for
        self.__process_data_list = [] # 清空订阅
        return self.__dct_share.value
     def __one_process(self, process_data, lock):  #迭代函数
        """
        处理进程
        :param process_data: 方法和参数集等
        :param lock: 保护锁
        """
        fcn = process_data.fcn
        params = process_data.params
        data_key = process_data.data_key
        if isinstance(params, tuple):
          data = fcn(*params) #**注意:*params 与 params区别**
        else:
          data = fcn(params)
        with lock:
          temp_dct = dict(self.__dct_share.value)
          if data_key not in temp_dct:
            temp_dct[data_key] = []
          temp_dct[data_key].append(data)
          self.__dct_share.value = temp_dct
    

    主程序调用:

    def exe_process(self, company_name, open_from, time_nodes):
        """
        多进程执行pre订阅的数据
        :param company_name: 公司名
        :return:
        """
        mul_process_helper = MulProcessHelper()
        lock = Lock()
        self.__get_time_bidding_statistic(company_name, mul_process_helper)
        data = mul_process_helper.flush_process(lock)
        return data
     def __get_time_bidding_statistic(self, company_name, mul_process_helper):
        # 招投标信息
        process_data = ProcessData(f_e_t_svc.get_bidding_statistic_time_node_api, company_name,
                      self.__BIDDING_STATISTIC_TIME) **#此处怎么理解?ProcessData是一个类!!!**
        mul_process_helper.add_process_data_list(process_data)  #同时调用多个api???将api方法当做迭代????用于同时查找多个公司????
     def add_process_data_list(self, process_data):
        """
        添加用于进程处理的方法队列
        :type process_data ProcessData
        :param process_data:
        :return:
        """
        self.__process_data_list.append(process_data)
     class ProcessData(object):
      """
      用于进程处理的的数据
      """
      def __init__(self, fcn, params, data_key):
        self.fcn = fcn # 方法
        self.params = params # 参数
        self.data_key = data_key # 存储到进程共享变量中的名字
    

    以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。如有错误或未考虑完全的地方,望不吝赐教。

    您可能感兴趣的文章:
    • Python基础之进程详解
    • python 多进程和多线程使用详解
    • python 实现多进程日志轮转ConcurrentLogHandler
    • Python多进程与多线程的使用场景详解
    • 解决Python 进程池Pool中一些坑
    • python多进程执行方法apply_async使用说明
    • python 进程池pool使用详解
    • Python使用多进程运行含有任意个参数的函数
    • 详解python网络进程
    上一篇:Tensorflow 读取ckpt文件中的tensor操作
    下一篇:使用Python 统计文件夹内所有pdf页数的小工具
  • 相关文章
  • 

    © 2016-2020 巨人网络通讯 版权所有

    《增值电信业务经营许可证》 苏ICP备15040257号-8

    python multiprocessing 多进程并行计算的操作 python,multiprocessing,多,进程,