• 企业400电话
  • 微网小程序
  • AI电话机器人
  • 电商代运营
  • 全 部 栏 目

    企业400电话 网络优化推广 AI电话机器人 呼叫中心 网站建设 商标✡知产 微网小程序 电商运营 彩铃•短信 增值拓展业务
    windowns使用PySpark环境配置和基本操作

    下载依赖

    首先需要下载hadoop和spark,解压,然后设置环境变量。
    hadoop清华源下载
    spark清华源下载

    HADOOP_HOME => /path/hadoop
    SPARK_HOME => /path/spark

    安装pyspark。

    pip install pyspark

    基本使用

    可以在shell终端,输入pyspark,有如下回显:

    输入以下指令进行测试,并创建SparkContext,SparkContext是任何spark功能的入口点。

    >>> from pyspark import SparkContext
    >>> sc = SparkContext("local", "First App")
    
    

    如果以上不会报错,恭喜可以开始使用pyspark编写代码了。
    不过,我这里使用IDE来编写代码,首先我们先在终端执行以下代码关闭SparkContext。

    >>> sc.stop()

    下面使用pycharm编写代码,如果修改了环境变量需要先重启pycharm。
    在pycharm运行如下程序,程序会起本地模式的spark计算引擎,通过spark统计abc.txt文件中a和b出现行的数量,文件路径需要自己指定。

    from pyspark import SparkContext
    
    sc = SparkContext("local", "First App")
    logFile = "abc.txt"
    logData = sc.textFile(logFile).cache()
    numAs = logData.filter(lambda s: 'a' in s).count()
    numBs = logData.filter(lambda s: 'b' in s).count()
    print("Line with a:%i,line with b:%i" % (numAs, numBs))
    
    

    运行结果如下:

    20/03/11 16:15:57 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
    Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
    Setting default log level to "WARN".
    To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
    20/03/11 16:15:58 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
    Line with a:3,line with b:1

    这里说一下,同样的工作使用python可以做,spark也可以做,使用spark主要是为了高效的进行分布式计算。
    戳pyspark教程
    戳spark教程

    RDD

    RDD代表Resilient Distributed Dataset,它们是在多个节点上运行和操作以在集群上进行并行处理的元素,RDD是spark计算的操作对象。
    一般,我们先使用数据创建RDD,然后对RDD进行操作。
    对RDD操作有两种方法:
    Transformation(转换) - 这些操作应用于RDD以创建新的RDD。例如filter,groupBy和map。
    Action(操作) - 这些是应用于RDD的操作,它指示Spark执行计算并将结果发送回驱动程序,例如count,collect等。

    创建RDD

    parallelize是从列表创建RDD,先看一个例子:

    from pyspark import SparkContext
    
    
    sc = SparkContext("local", "count app")
    words = sc.parallelize(
        ["scala",
         "java",
         "hadoop",
         "spark",
         "akka",
         "spark vs hadoop",
         "pyspark",
         "pyspark and spark"
         ])
    print(words)

    结果中我们得到一个对象,就是我们列表数据的RDD对象,spark之后可以对他进行操作。

    ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:195

    Count

    count方法返回RDD中的元素个数。

    from pyspark import SparkContext
    
    
    sc = SparkContext("local", "count app")
    words = sc.parallelize(
        ["scala",
         "java",
         "hadoop",
         "spark",
         "akka",
         "spark vs hadoop",
         "pyspark",
         "pyspark and spark"
         ])
    print(words)
    
    counts = words.count()
    print("Number of elements in RDD -> %i" % counts)

    返回结果:

    Number of elements in RDD -> 8

    Collect

    collect返回RDD中的所有元素。

    from pyspark import SparkContext
    
    
    sc = SparkContext("local", "collect app")
    words = sc.parallelize(
        ["scala",
         "java",
         "hadoop",
         "spark",
         "akka",
         "spark vs hadoop",
         "pyspark",
         "pyspark and spark"
         ])
    coll = words.collect()
    print("Elements in RDD -> %s" % coll)
    
    

    返回结果:

    Elements in RDD -> ['scala', 'java', 'hadoop', 'spark', 'akka', 'spark vs hadoop', 'pyspark', 'pyspark and spark']

    foreach

    每个元素会使用foreach内的函数进行处理,但是不会返回任何对象。
    下面的程序中,我们定义的一个累加器accumulator,用于储存在foreach执行过程中的值。

    from pyspark import SparkContext
    sc = SparkContext("local", "ForEach app")
    
    accum = sc.accumulator(0)
    data = [1, 2, 3, 4, 5]
    rdd = sc.parallelize(data)
    
    
    def increment_counter(x):
        print(x)
        accum.add(x)
     return 0
    
    s = rdd.foreach(increment_counter)
    print(s)  # None
    print("Counter value: ", accum)
    
    

    返回结果:

    None
    Counter value:  15

    filter

    返回一个包含元素的新RDD,满足过滤器的条件。

    from pyspark import SparkContext
    sc = SparkContext("local", "Filter app")
    words = sc.parallelize(
        ["scala",
         "java",
         "hadoop",
         "spark",
         "akka",
         "spark vs hadoop",
         "pyspark",
         "pyspark and spark"]
    )
    words_filter = words.filter(lambda x: 'spark' in x)
    filtered = words_filter.collect()
    print("Fitered RDD -> %s" % (filtered))
    
     
    
    Fitered RDD -> ['spark', 'spark vs hadoop', 'pyspark', 'pyspark and spark']

    也可以改写成这样:

    from pyspark import SparkContext
    sc = SparkContext("local", "Filter app")
    words = sc.parallelize(
        ["scala",
         "java",
         "hadoop",
         "spark",
         "akka",
         "spark vs hadoop",
         "pyspark",
         "pyspark and spark"]
    )
    
    
    def g(x):
        for i in x:
            if "spark" in x:
                return i
    
    words_filter = words.filter(g)
    filtered = words_filter.collect()
    print("Fitered RDD -> %s" % (filtered))

    map

    将函数应用于RDD中的每个元素并返回新的RDD。

    from pyspark import SparkContext
    sc = SparkContext("local", "Map app")
    words = sc.parallelize(
        ["scala",
         "java",
         "hadoop",
         "spark",
         "akka",
         "spark vs hadoop",
         "pyspark",
         "pyspark and spark"]
    )
    words_map = words.map(lambda x: (x, 1, "_{}".format(x)))
    mapping = words_map.collect()
    print("Key value pair -> %s" % (mapping))

    返回结果:

    Key value pair -> [('scala', 1, '_scala'), ('java', 1, '_java'), ('hadoop', 1, '_hadoop'), ('spark', 1, '_spark'), ('akka', 1, '_akka'), ('spark vs hadoop', 1, '_spark vs hadoop'), ('pyspark', 1, '_pyspark'), ('pyspark and spark', 1, '_pyspark and spark')]

    Reduce

    执行指定的可交换和关联二元操作后,然后返回RDD中的元素。

    from pyspark import SparkContext
    from operator import add
    
    
    sc = SparkContext("local", "Reduce app")
    nums = sc.parallelize([1, 2, 3, 4, 5])
    adding = nums.reduce(add)
    print("Adding all the elements -> %i" % (adding))

     这里的add是python内置的函数,可以使用ide查看:

    def add(a, b):
        "Same as a + b."
        return a + b

    reduce会依次对元素相加,相加后的结果加上其他元素,最后返回结果(RDD中的元素)。

    Adding all the elements -> 15

    Join

    返回RDD,包含两者同时匹配的键,键包含对应的所有元素。

    from pyspark import SparkContext
    
    
    sc = SparkContext("local", "Join app")
    x = sc.parallelize([("spark", 1), ("hadoop", 4), ("python", 4)])
    y = sc.parallelize([("spark", 2), ("hadoop", 5)])
    print("x =>", x.collect())
    print("y =>", y.collect())
    joined = x.join(y)
    final = joined.collect()
    print( "Join RDD -> %s" % (final))
    
    

    返回结果:

    x => [('spark', 1), ('hadoop', 4), ('python', 4)]
    y => [('spark', 2), ('hadoop', 5)]
    Join RDD -> [('hadoop', (4, 5)), ('spark', (1, 2))]

    到此这篇关于windowns使用PySpark环境配置和基本操作的文章就介绍到这了,更多相关PySpark环境配置 内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

    您可能感兴趣的文章:
    • PyCharm搭建Spark开发环境实现第一个pyspark程序
    • PyCharm+PySpark远程调试的环境配置的方法
    上一篇:聊聊Python中end=和sep=的区别
    下一篇:keras的get_value运行越来越慢的解决方案
  • 相关文章
  • 

    © 2016-2020 巨人网络通讯 版权所有

    《增值电信业务经营许可证》 苏ICP备15040257号-8

    windowns使用PySpark环境配置和基本操作 windowns,使用,PySpark,环境,