    Several methods for parallelizing pandas apply, explained

    1. pandarallel (pip install pandarallel)

    For the simple case of a pandas DataFrame df and a function func to apply, just replace the classic apply with parallel_apply.

    from pandarallel import pandarallel
     
    # Initialization
    pandarallel.initialize()
     
    # Standard pandas apply
    df.apply(func)
     
    # Parallel apply
    df.parallel_apply(func)
    

    Note that if you do not want to parallelize a computation, the classic apply method is still available.

    You can also display one progress bar per worker CPU by passing progress_bar=True to the initialize function.
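    As a minimal sketch of the options above (assuming pandarallel is installed; the function double and nb_workers=2 are arbitrary choices for illustration):

    ```python
    import pandas as pd
    from pandarallel import pandarallel

    def double(v):
        # Must be a named, picklable function so the workers can import it.
        return v * 2

    # progress_bar=True shows one bar per worker; nb_workers defaults to all CPUs.
    pandarallel.initialize(progress_bar=True, nb_workers=2)

    df = pd.DataFrame({"x": range(100)})
    result = df["x"].parallel_apply(double)
    ```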

    2. joblib (pip install joblib)

     https://pypi.python.org/pypi/joblib

    # Embarrassingly parallel helper: to make it easy to write readable parallel code and debug it quickly
     
    import time
    from math import sqrt
    from joblib import Parallel, delayed
     
    def test():
      start = time.time()
      # Serial baseline: n_jobs=1 runs everything in the calling process.
      result1 = Parallel(n_jobs=1)(delayed(sqrt)(i**2) for i in range(10000))
      end = time.time()
      print(end - start)
      # Same work spread over 8 worker processes.
      result2 = Parallel(n_jobs=8)(delayed(sqrt)(i**2) for i in range(10000))
      end2 = time.time()
      print(end2 - end)
     
    test()
    

    ------- Output ----------

    0.4434356689453125
    0.6346755027770996
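    The parallel run is slower here because each sqrt call is far cheaper than the cost of dispatching it to a worker process. A sketch of the same comparison where each task does enough work for parallelism to have a chance to pay off (assuming joblib is installed; busy and its parameters are illustrative stand-ins):

    ```python
    import time
    from joblib import Parallel, delayed

    def busy(n):
        # A non-trivial task: sum of squares up to n.
        return sum(i * i for i in range(n))

    start = time.time()
    serial = [busy(20000) for _ in range(200)]
    t_serial = time.time() - start

    start = time.time()
    parallel = Parallel(n_jobs=4)(delayed(busy)(20000) for _ in range(200))
    t_parallel = time.time() - start

    # Same results either way; only the wall time differs.
    print(t_serial, t_parallel)
    ```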

    3. multiprocessing

    import multiprocessing as mp
     
    # f must be a module-level (picklable) function; one worker per CPU.
    with mp.Pool(mp.cpu_count()) as pool:
      df['newcol'] = pool.map(f, df['col'])
    

    multiprocessing.cpu_count()

    Returns the number of CPUs in the system.

    This number is not the same as the number of CPUs the current process can use. The number of usable CPUs can be obtained with len(os.sched_getaffinity(0)).

    May raise NotImplementedError.

    See also os.cpu_count().
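    A minimal stdlib check of that distinction (os.sched_getaffinity is only available on some platforms, e.g. Linux, hence the hasattr guard):

    ```python
    import os
    import multiprocessing as mp

    # Total CPUs in the system.
    total = mp.cpu_count()
    print("system CPUs:", total)

    # CPUs this process is actually allowed to run on (platform-dependent API).
    if hasattr(os, "sched_getaffinity"):
        usable = len(os.sched_getaffinity(0))
        print("usable CPUs:", usable)
    ```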

    4. Performance comparison of the methods

    (1) Code

    import time
    import pandas as pd
    import multiprocessing as mp
    from joblib import Parallel, delayed
    from pandarallel import pandarallel
    from tqdm import tqdm
     
     
    def get_url_len(url):
      url_list = url.split(".")
      time.sleep(0.01) # sleep 0.01 s to simulate complex logic
      return len(url_list)
     
    def test1(data):
      """
      Baseline: no optimization
      """
      start = time.time()
      data['len'] = data['url'].apply(get_url_len)
      end = time.time()
      cost_time = end - start
      res = sum(data['len'])
      print("res:{}, cost time:{}".format(res, cost_time))
     
    def test_mp(data):
      """
      Optimized with multiprocessing
      """
      start = time.time()
      with mp.Pool(mp.cpu_count()) as pool:
        data['len'] = pool.map(get_url_len, data['url'])
      end = time.time()
      cost_time = end - start
      res = sum(data['len'])
      print("test_mp \t res:{}, cost time:{}".format(res, cost_time))
     
    def test_pandarallel(data):
      """
      Optimized with pandarallel
      """
      start = time.time()
      pandarallel.initialize()
      data['len'] = data['url'].parallel_apply(get_url_len)
      end = time.time()
      cost_time = end - start
      res = sum(data['len'])
      print("test_pandarallel \t res:{}, cost time:{}".format(res, cost_time))
     
     
    def test_delayed(data):
      """
      Optimized with joblib delayed
      """
      def key_func(subset):
        subset["len"] = subset["url"].apply(get_url_len)
        return subset
     
      start = time.time()
      data_grouped = data.groupby(data.index)
      # data_grouped is iterable, so tqdm can wrap it to show a progress bar
      results = Parallel(n_jobs=8)(delayed(key_func)(group) for name, group in tqdm(data_grouped))
      data = pd.concat(results)
      end = time.time()
      cost_time = end - start
      res = sum(data['len'])
      print("test_delayed \t res:{}, cost time:{}".format(res, cost_time))
     
     
    if __name__ == '__main__':
      
      columns = ['title', 'url', 'pub_old', 'pub_new']
      temp = pd.read_csv("./input.csv", names=columns, nrows=10000)
      data = temp
      """
      # Enlarge the dataset 100x if needed (DataFrame.append was removed in pandas 2.0):
      data = pd.concat([temp] * 100)
      """
      print(len(data))
      """
      test1(data)
      test_mp(data)
      test_pandarallel(data)
      """
      test_delayed(data)
    

    (2) Output

    1k
    res:4338, cost time:0.0018074512481689453
    test_mp   res:4338, cost time:0.2626469135284424
    test_pandarallel   res:4338, cost time:0.3467681407928467
     
    10k
    res:42936, cost time:0.008773326873779297
    test_mp   res:42936, cost time:0.26111721992492676
    test_pandarallel   res:42936, cost time:0.33237743377685547
     
    100k
    res:426742, cost time:0.07944369316101074
    test_mp   res:426742, cost time:0.294996976852417
    test_pandarallel   res:426742, cost time:0.39208269119262695
     
    1M
    res:4267420, cost time:0.8074917793273926
    test_mp   res:4267420, cost time:0.9741342067718506
    test_pandarallel   res:4267420, cost time:0.6779992580413818
     
    10M
    res:42674200, cost time:8.027287006378174
    test_mp   res:42674200, cost time:7.751036882400513
    test_pandarallel   res:42674200, cost time:4.404983282089233

    With the sleep statement in get_url_len (simulating complex logic) and 1k rows, the results are:

    1k
    res:4338, cost time:10.054503679275513
    test_mp   res:4338, cost time:0.35697126388549805
    test_pandarallel   res:4338, cost time:0.43415403366088867
    test_delayed   res:4338, cost time:2.294757843017578
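    test_delayed is the slowest parallel variant above because groupby(data.index) puts every row in its own group, so each row becomes a separate task and dispatch overhead dominates. A sketch that splits the frame into one large chunk per worker instead (assuming pandas and joblib are installed; apply_chunk and the synthetic data are illustrative):

    ```python
    import pandas as pd
    from joblib import Parallel, delayed

    def get_url_len(url):
        return len(url.split("."))

    def apply_chunk(chunk):
        # Work on a copy so workers never mutate the parent frame.
        chunk = chunk.copy()
        chunk["len"] = chunk["url"].apply(get_url_len)
        return chunk

    data = pd.DataFrame({"url": ["www.example.com"] * 1000})

    # Split into 8 contiguous chunks, roughly one per worker.
    chunk_size = -(-len(data) // 8)  # ceiling division
    chunks = [data.iloc[i:i + chunk_size] for i in range(0, len(data), chunk_size)]

    results = Parallel(n_jobs=8)(delayed(apply_chunk)(c) for c in chunks)
    data = pd.concat(results)
    ```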

    5. Summary

    (1) If the dataset is small, parallel processing is slower than plain single-process execution;

    (2) If the applied function's logic is simple, parallel processing is slower than plain single-process execution.
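    A corollary of (2): for a function as simple as get_url_len, a vectorized pandas expression beats any form of parallel apply, because it avoids per-row Python calls entirely. A sketch (the dot-counting equivalence is an assumption that holds for non-empty strings):

    ```python
    import pandas as pd

    data = pd.DataFrame({"url": ["www.example.com", "a.b.c.d"]})

    # Equivalent to apply(get_url_len): number of '.'-separated parts
    # equals the count of literal dots plus one.
    data["len"] = data["url"].str.count(r"\.") + 1
    print(data["len"].tolist())
    ```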

    6. Problems and solutions

    (1)ImportError: This platform lacks a functioning sem_open implementation, therefore, the required synchronization primitives needed will not function, see issue 3770.

    https://www.jianshu.com/p/0be1b4b27bde

    (2) Checking the number of physical CPUs, cores per CPU, and logical CPUs on Linux

    https://lover.blog.csdn.net/article/details/113951192

    (3) Using progress bars

    https://www.jb51.net/article/206219.htm

    This concludes this walkthrough of several ways to parallelize pandas apply.
