网站使用标题做路径,线下课程seo,网络短剧免费观看,营销网站中最重要的部分是目录
1.开篇
2. PySpark介绍
3. PySpark基础准备
3.1 PySpark安装
3.2 掌握PySpark执行环境入口对象的构建
3.3 理解PySpark的编程模型
4. PySpark#xff1a;RDD对象数据输入
4.1 RDD对象概念#xff1a;PySpark支持多种数据的输入#xff0c;完成后会返回RDD类的对…目录
1.开篇
2. PySpark介绍
3. PySpark基础准备
3.1 PySpark安装
3.2 掌握PySpark执行环境入口对象的构建
3.3 理解PySpark的编程模型
4. PySparkRDD对象数据输入
4.1 RDD对象概念PySpark支持多种数据的输入完成后会返回RDD类的对象
4.2 Python数据容器转RDD对象.parallelize(数据容器对象)
4.3 RDD存在很多计算的方法
4.4 读取文件转RDD对象通过SparkContext入口对象来读取文件构建RDD对象
5. PySparkRDD对象数据计算一
5.1 给Spark设置环境变量不设置的时候控制台会报错出现找不到python.exe解释器的情况
5.2 RDD的map方法将RDD的数据根据函数进行一条条处理
5.3 RDD的flatMap方法基本和map一样但是多一个功能将嵌套list给转成单list[[1, 2, 3], [4, 5, 6]]转成[1, 2, 3, 4, 5, 6]
5.4 RDD的reduceByKey方法将key分组后进行value逻辑处理
6. 数据计算案例一完成使用PySpark进行单词技术的案例
7. PySparkRDD对象数据计算二
7.1 RDD的filter方法传入T泛型数据返回bool为false 的数据丢弃为true的数据保留函数对RDD数据逐个处理得到True的保留至返回值的RDD中
7.2 RDD的distinct方法对RDD数据进行去重返回新RDD
7.3 RDD的sortBy方法对RDD的容器按照指定规则排序返回新RDD
8. 数据计算案例二计算城市中的商品以及销售额
8.1 需求
8.2 文件数据
8.3 需求一实现处理结果自动返回的是一个二元元组
8.4 需求二实现将字典中的数据处理返回一个list
8.5 需求三实现过滤除北京的数据并只返回一个参数category是list列表并进行去重去重后的结果进行collect输出
9. 将RDD的结果数据输出为Python对象的各类方法 导航
Python第二语言一、Python start-CSDN博客
Python第二语言二、Python语言基础-CSDN博客
Python第二语言三、Python函数def-CSDN博客
Python第二语言四、Python数据容器-CSDN博客
Python第二语言五、Python文件相关操作-CSDN博客
Python第二语言六、Python异常-CSDN博客
Python第二语言七、Python模块-CSDN博客
Python第二语言八、Python包-CSDN博客
Python第二语言九、Python第一阶段实操-CSDN博客
Python第二语言十、Python面向对象上-CSDN博客
Python第二语言十一、Python面向对象下-CSDN博客
Python第二语言十二、SQL入门和实战-CSDN博客
Python第二语言十三、PySpark实战-CSDN博客
Python第二语言十四、高阶基础-CSDN博客 1.开篇
PySpark大数据计算第三方库Spark是大数据开发的核心技术python的spark中使用map时 Python worker exited unexpectedly (crashed) 将原本的python12解释器降低版本到python10版本解释器降低python解释器版本因为版本不兼容 记得下载使用的包
2. PySpark介绍
Apache Spark是用于大规模数据large-scala data处理的统一unifield分析引擎Spark是一款分布式的计算框架用于调度成百上千的服务器集群计算TB、PB乃至EB级别的海量数据Python On SparkPython语言是Spark重点支持的方向
PySpark第三方库
PySpark是由Spark官方开发的Python语言第三方库Python开发者可以使用pip程序快速安装PySpark并像其它第三方库一样使用主要作用 进行数据处理提交至Spark集群进行分布式集群计算
3. PySpark基础准备
3.1 PySpark安装
安装命令 pip install pyspark
加速下载命令pip install -i https://pypi.tuna.tsinghua.edu.cn/simple pyspark 3.2 掌握PySpark执行环境入口对象的构建
PySpark是分布式集群的操作setMaster(xxx).\setAppName(xxx)是用来控制集群的代码图中代码用的是单机的setAppName是Spark任务的名称PySpark的执行环境入口对象是类SparkContext的类对象所有PySpark的功能都是从SparkContext对象作为开始
# 导包
from pyspark import SparkConf, SparkContext# 创建SparkConf类对象
conf SparkConf().setMaster(local[*]).\setAppName(test_spark_app)# 基于SparkConf类对象创建SparkContext类对象
sc SparkContext(confconf)# 打印PySpark的运行版本
print(sc.version)# 停止SparkContext对象的运行
sc.stop() 3.3 理解PySpark的编程模型
SparkContext类对象是PySpark编程中一切功能的入口
PySpark的编程三大步骤 数据输入通过SparkContex类对象的成员方法完成数据的读取操作读取后得到RDD类对象数据处理计算通过RDD类对象的成员方法完成各种数据计算的需求数据输出将处理完成后的RDD对象调用各种成员方法完成写出文件转换位list等操作
4. PySparkRDD对象数据输入
RDD就是PySpark计算后返回的对象容器
4.1 RDD对象概念PySpark支持多种数据的输入完成后会返回RDD类的对象
RDD全称为弹性分布式数据集Resilient Distributed Datasets
PySpark针对数据的处理都是以RDD对象作为载体 数据存储在RDD内各类数据的计算方法也都是RDD的成员方法RDD的数据计算方法返回值依旧是RDD对象比如说JSON文件、文本文件、数据库数据都是可以通过SparkContext类对象经过RDD对象的处理并返回给文件文件或JSON文件或者数据库
4.2 Python数据容器转RDD对象.parallelize(数据容器对象)
提示 字符串会被拆分出1个个的字符存入RDD对象字典仅有key会被存入RDD对象RDD对象返回的是容器与list一样结果
from pyspark import SparkConf, SparkContextconf SparkConf().setMaster(local[*]).setAppName(test_spark_app)
sc SparkContext(confconf)# 通过parallelize方法将Python对象加载到Spark内称为RDD对象
rdd1 sc.parallelize([1, 2, 3, 4, 5])
rdd2 sc.parallelize((1, 2, 3, 4, 5))
rdd3 sc.parallelize(abcdefg)
rdd4 sc.parallelize({1, 2, 3, 4, 5})
rdd5 sc.parallelize({key1: value1, key2: value2})# 使用collect方法查看RDD中的内容
print(rdd1.collect())
print(rdd2.collect())
print(rdd3.collect())
print(rdd4.collect())
print(rdd5.collect())sc.stop() 4.3 RDD存在很多计算的方法 4.4 读取文件转RDD对象通过SparkContext入口对象来读取文件构建RDD对象
from pyspark import SparkConf, SparkContextconf SparkConf().setMaster(local[*]).setAppName(test_spark_app)
sc SparkContext(confconf)# 通过textFile方法读取文件数据加载到Spark内成为RDD对象
rdd sc.textFile(dataText)# 打印RDD内容
print(rdd.collect())
sc.stop() 小结
RDD对象称之为分布式弹性数据集是PySpark中数据计算的载体可以 提供数据存储提供数据计算的各类方法数据计算的方法返回值依旧是RDDRDD迭代计算
5. PySparkRDD对象数据计算一
可以对list容器计算可以对dict字典容器计算可以对str字符串进行计算所有的容器都可以通过RDD计算
5.1 给Spark设置环境变量不设置的时候控制台会报错出现找不到python.exe解释器的情况
os.path.exists 返回值为True或False
from pyspark import SparkConf, SparkContext
import os# 配置Spark环境变量
os.environ[PYSPARK_PYTHON] C:/Users/Administrator/AppData/Local/Programs/Python/Python310/python.exe# 检查PYSPARK_PYTHON路径
print(os.path.exists(C:/Users/Administrator/AppData/Local/Programs/Python/Python310/python.exe))
# 检查PYSPARK_DRIVER_PYTHON路径
print(os.path.exists(C:/Users/Administrator/AppData/Local/Programs/Python/Python310/python.exe)) 5.2 RDD的map方法将RDD的数据根据函数进行一条条处理
1. 介绍
RDD对象内置丰富的成员方法算子map算子是将RDD的数据一条条处理处理的逻辑是将python中的函数作为参数进行传递这个函数参数会将RDD种的每条数据都进行处理最终返回一个新的RDD对象 map()中的参数 (T) → UT代表传入一个参数U代表一个返回值意思代表传入的参数是一个还有一个返回值T是泛型不用指定数据类型map()中的参数 (T) → TT代表传入一个参数T代表一个返回值意思代表传入的参数是一个还有一个返回值T是泛型传入的是什么值那么返回的就是什么类型
2. func函数传递 func函数作为参数代表的是RDD中的每个值都会进行func函数的处理是RDD中的每一个元素都会被RDD处理一遍
可以简写成rdd2 rdd.map(lambda x: x * 10) # 简写的函数 3. 案例
这里存在一个大坑如果是python312版本去使用map函数会报错 Python worker exited unexpectedly (crashed) 降低版本即可我用的版本10结果RDD中的每一个元素都会被传递给func进行处理*10操作
from pyspark import SparkConf, SparkContext
import os# 配置Spark环境变量
os.environ[PYSPARK_PYTHON] C:/Users/Administrator/AppData/Local/Programs/Python/Python310/python.exe
conf SparkConf().setMaster(local[*]).setAppName(test_spark_app)sc SparkContext(confconf)# 准备一个RDD
rdd sc.parallelize([1, 2, 3, 4, 5])
rdd2 rdd.map(lambda x: x * 10) # 简写的函数
print(rdd2.collect())
sc.stop() 4. map链式调用
from pyspark import SparkConf, SparkContext
import os# 配置Spark环境变量
os.environ[PYSPARK_PYTHON] C:/Users/Administrator/AppData/Local/Programs/Python/Python310/python.exe
conf SparkConf().setMaster(local[*]).setAppName(test_spark_app)sc SparkContext(confconf)rdd sc.parallelize([1, 2, 3, 4, 5])rdd2 rdd.map(lambda x: x * 10).map(lambda x: x 5) # 链式调用将map进行第一个*10数据计算再进行map5数据计算print(rdd2.collect()) 5. 小结
map算子成员方法 接受一个处理函数可用lambda表达式快速编写对RDD内的元素逐个处理并返回一个新的RDD链式调用对于返回值是新RDD的算子可以通过链式调用的方式多次调用算子
5.3 RDD的flatMap方法基本和map一样但是多一个功能将嵌套list给转成单list[[1, 2, 3], [4, 5, 6]]转成[1, 2, 3, 4, 5, 6]
from pyspark import SparkConf, SparkContext
import osif __name__ __main__:os.environ[PYSPARK_PYTHON] C:/Users/Administrator/AppData/Local/Programs/Python/Python310/python.execonf SparkConf().setMaster(local[*]).setAppName(test_spark_app)sc SparkContext(confconf)rdd sc.parallelize([zhangSan lisi yiyi, zhangSan yiyi wangWu, wangWu yiyi zhangSan])print(rdd.map(lambda x: x.split( )).collect())print(-----------------------------------------)print(rdd.flatMap(lambda x: x.split( )).collect()) # 将嵌套list转成单list对数据接触嵌套 5.4 RDD的reduceByKey方法将key分组后进行value逻辑处理 二元元组[(a, 1), (a, 1), (b, 1)]这就是二元元组元组中只有两个元素 自动按照key分组完成组内数据value的聚合操作就是会按照元组中的key就是a, a, b进行key的value聚合1, 1, 1是valuevalue聚合的逻辑是按照传入的func函数逻辑来进行聚合 假设这是二元元组数据要进行reduceByKey算子处理
reduceByKey计算方式
1. 思路
先分组key值等于a和a一组b和b一组然后在进行函数lambda a, b: ab进行处理也即是分组后aaa, bbbb结果[(b, 3), (a, 2)]再解释b有三个值那么lambda a, b: ab中表示的是b1, 1, 1 的三个值去进行函数处理的时候先是第一个1和第二1进行相加这时候相加是ab分组后与key无关系那么第一个1和第二个1相加后等于2这时候发现还有第三个1这时候再次把第一次相加的结果与第三个1进行ab处理21是前后者参数的相加处理最终得到按照key分组聚合value的结果最终解释将数据分组后每个组的数据进行lambda a, b: a b 操作每个组中的数据进行a b操作意思就是将当前组的所有value进行相加操作
2. 实现
功能针对KV型RDD自动按照key分组然后根据提供的聚合逻辑完成组内数据value的聚合操作rdd.reduceByKey(func) from pyspark import SparkConf, SparkContext
import osif __name__ __main__:os.environ[PYSPARK_PYTHON] C:/Users/Administrator/AppData/Local/Programs/Python/Python310/python.execonf SparkConf().setMaster(local[*]).setAppName(test_spark_app)sc SparkContext(confconf)rdd sc.parallelize([(a, 1), (a, 1), (b, 1), (b, 1), (b, 1)])result rdd.reduceByKey(lambda a, b: a b) # 分组计算print(result.collect())
6. 数据计算案例一完成使用PySpark进行单词技术的案例
题目读取文件求出文件中单词出现的次数文件 思路 先将字符串进行读取然后按照空格分割[key, key]在进行分割后的数组重组为key, 1 的形式后面利用rdd的reduceByKey方法将分组后的key进行聚合操作因为value都是1所以可以得出对单词出现的次数进行统计操作 根据 (key, 1) 重组后的数据应该是 [(key1, 1), [key1, 1], (key2, 1), [key2, 1]] 然后得出最终结果 from pyspark import SparkConf, SparkContext
import osif __name__ __main__:os.environ[PYSPARK_PYTHON] C:/Users/Administrator/AppData/Local/Programs/Python/Python310/python.execonf SparkConf().setMaster(local[*]).setAppName(test_spark_app)sc SparkContext(confconf)# 1.读取数据文件假设你有一个大文件里面有 300MB 的数据如果你指定分区数为 3Spark 会尝试将这个文件分成 3 个分区每个分区大约 100MB。如果你的集群有 3 个节点每个节点可以并行处理一个分区这样就可以更快地完成任务。file sc.textFile(word, 3) # (xx , 3)3是指文件被分成的最小分区数partitions# 2.将所有单词读取出来words file.flatMap(lambda line: line.split( )) # 结果[python, java, ...]# 3.将所有单词加1做valueword_one words.map(lambda x: (x, 1)) # 结果[(python, 1), (java, 1), (php, 1), (c#, 1),...]# 4.分组并求和result word_one.reduceByKey(lambda a, b: a b)# 5.打印结果print(result.collect())
7. PySparkRDD对象数据计算二
7.1 RDD的filter方法传入T泛型数据返回bool为false 的数据丢弃为true的数据保留函数对RDD数据逐个处理得到True的保留至返回值的RDD中 功能过滤想要的数据进行保留filter算子作用 接受一个处理函数可用lambda快速编写函数对RDD数据逐个处理得到True的保留至返回值的RDD中
from pyspark import SparkConf, SparkContext
import osif __name__ __main__:os.environ[PYSPARK_PYTHON] C:/Users/Administrator/AppData/Local/Programs/Python/Python310/python.execonf SparkConf().setMaster(local[*]).setAppName(test_spark_app)sc SparkContext(confconf)rdd sc.parallelize([1, 2, 3, 4, 5])# 保留基数print(rdd.filter(lambda x: x % 2 1).collect()) 7.2 RDD的distinct方法对RDD数据进行去重返回新RDD
from pyspark import SparkConf, SparkContext
import osif __name__ __main__:os.environ[PYSPARK_PYTHON] C:/Users/Administrator/AppData/Local/Programs/Python/Python310/python.execonf SparkConf().setMaster(local[*]).setAppName(test_spark_app)sc SparkContext(confconf)rdd sc.parallelize([1, 1, 2, 3, 4, 5, 4, 5])# 对rdd对象进行去重print(rdd.distinct().collect()) 7.3 RDD的sortBy方法对RDD的容器按照指定规则排序返回新RDD
func: (T) → U告知按照rdd中的哪个数据进行排序比如lambda x: x[1] 表示按照rdd中的第二列元素进行排序numPartitions目前默认就为1
结果
按照元组tople中的第二位元素进行排序按照降序
lambda x: x[1]计算规则将所有容器的每一个元素按照函数规则处理x是遍历的元组x[1]是传入的元组的第二位元素所以规则就是按照元组的第二位元素进行降序排序
from pyspark import SparkConf, SparkContext
import osif __name__ __main__:os.environ[PYSPARK_PYTHON] C:/Users/Administrator/AppData/Local/Programs/Python/Python310/python.execonf SparkConf().setMaster(local[*]).setAppName(test_spark_app)sc SparkContext(confconf)rdd sc.parallelize([(zhangSan, 99), (lisi, 88), (wangWu, 100)])# 对结果进行排序final_rdd rdd.sortBy(lambda x: x[1], ascendingFalse, numPartitions1)print(final_rdd.collect()) sortBy算子小结 接收一个处理函数可用lambda快速编写函数表示用来决定排序的依据可以控制升序或降序全局排序需要设置分区数为1
8. 数据计算案例二计算城市中的商品以及销售额
8.1 需求 需求一各个城市销售额排名从大到小 先按行读取文件并对json进行split分割按照|符号得到最终的字典使用Spark.reduceByKey进行分组分组时传递func计算函数将所有分组后的城市销售额进行ab的形式聚合起来最终得到结果并按照降序的排序方式排序输出 需求二全部城市有哪些商品类别在售卖 文件读取后将城市的categpry商品类别distinct使用去重 需求三北京市有哪些商品类别在售卖 将除了北京市的所有数据进行filter过滤过滤后只留下category并进行去重得到结果
8.2 文件数据 {id:1,timestamp:2024-06-01T01:03.00Z,category:电脑,areaName:杭州,money:3000}|{id:2,timestamp:2024-06-01T01:03.00Z,category:电脑,areaName:杭州,money:3500}
{id:3,timestamp:2024-06-01T01:03.00Z,category:食品,areaName:杭州,money:3000}|{id:4,timestamp:2024-06-01T01:03.00Z,category:食品,areaName:杭州,money:3700}
{id:5,timestamp:2024-06-01T01:03.00Z,category:服饰,areaName:北京,money:3000}|{id:6,timestamp:2024-06-01T01:03.00Z,category:服饰,areaName:北京,money:3900}
8.3 需求一实现处理结果自动返回的是一个二元元组
from pyspark import SparkConf, SparkContext
import osif __name__ __main__:os.environ[PYSPARK_PYTHON] C:/Users/Administrator/AppData/Local/Programs/Python/Python310/python.execonf SparkConf().setMaster(local[*]).setAppName(test_spark_app)sc SparkContext(confconf)# 1.读取文件得到RDDfile_rdd sc.textFile(orders)# 2. 取出一个个JSON字符串json_str_rdd file_rdd.flatMap(lambda x: x.split(|))# 3. 将一个个JSON字符串转换为字典dict_rdd json_str_rdd.map(lambda x: json.loads(x))# print(dict_rdd.collect())# 4.取出城市和销售额数据city_with_money_rdd dict_rdd.map(lambda x: (x[areaName], int(x[money])))# 5.按城市分组按销售额聚合city_result_rdd city_with_money_rdd.reduceByKey(lambda a, b: a b)# 6.按销售额聚合结果进行排序result_rdd city_result_rdd.sortBy(lambda x: x[1], ascendingFalse, numPartitions1)print(需求1的结果, result_rdd.collect())
前三步数据结果 完整数据结果 8.4 需求二实现将字典中的数据处理返回一个list
from pyspark import SparkConf, SparkContext
import osif __name__ __main__:os.environ[PYSPARK_PYTHON] C:/Users/Administrator/AppData/Local/Programs/Python/Python310/python.execonf SparkConf().setMaster(local[*]).setAppName(test_spark_app)sc SparkContext(confconf)# 1.读取文件得到RDDfile_rdd sc.textFile(orders)# 2. 取出一个个JSON字符串json_str_rdd file_rdd.flatMap(lambda x: x.split(|))# 3. 将一个个JSON字符串转换为字典dict_rdd json_str_rdd.map(lambda x: json.loads(x))# 4.取出全部的商品类别category_rdd dict_rdd.map(lambda x: x[category]).distinct()print(需求2的结果, category_rdd.collect()) 8.5 需求三实现过滤除北京的数据并只返回一个参数category是list列表并进行去重去重后的结果进行collect输出
from pyspark import SparkConf, SparkContext
import osif __name__ __main__:os.environ[PYSPARK_PYTHON] C:/Users/Administrator/AppData/Local/Programs/Python/Python310/python.execonf SparkConf().setMaster(local[*]).setAppName(test_spark_app)sc SparkContext(confconf)# 1.读取文件得到RDDfile_rdd sc.textFile(orders)# 2. 取出一个个JSON字符串json_str_rdd file_rdd.flatMap(lambda x: x.split(|))# 3. 将一个个JSON字符串转换为字典dict_rdd json_str_rdd.map(lambda x: json.loads(x))# 4. 过滤北京的数据beijing_data_rdd dict_rdd.filter(lambda x: x[areaName] 北京)# 5.取出全部商品类别result_rdd beijing_data_rdd.map(lambda x: x[category]).distinct()print(需求3的结果, result_rdd.collect()) 9. 将RDD的结果数据输出为Python对象的各类方法
数据输出将RDD输出的值转成文件或Python对象collect算子将各个分区内的数据统一收集到Driver中形成一个list对象reduce算子对RDD数据集按照你传入的逻辑进行聚合task算子取出RDD的前N个元素组合成list返回count算子计算RDD有多少条数据返回值是一个数字
from pyspark import SparkConf, SparkContext
import osos.environ[PYSPARK_PYTHON] C:/Users/Administrator/AppData/Local/Programs/Python/Python310/python.exe
conf SparkConf().setMaster(local[*]).setAppName(test_spark_app)
sc SparkContext(confconf)if __name__ __main__:rdd sc.parallelize([1, 2, 3, 4, 5])# collect算子输出RDD为list对象rdd_list: list rdd.collect()print(collect算子结果, rdd_list)print(collect算子类型是, type(rdd_list))# reduce算子对RDD进行两两聚合num rdd.reduce(lambda a, b: a b)print(reduce算子结果, num)# take算子取出RDD前N个元素组成list返回take_list rdd.take(3)print(take算子结果, take_list)# count统计rdd内有多少条数据返回值为数字num_count rdd.count()print(count算子结果, num_count)