做网站需要学习编程吗,深圳市官方网站,上海企业名录大全官网,郑州网站开发工程师Spark 的介绍与搭建#xff1a;从理论到实践-CSDN博客
Spark 的Standalone集群环境安装与测试-CSDN博客
PySpark 本地开发环境搭建与实践-CSDN博客
目录
一、本地开发与远程提交测试
#xff08;一#xff09;问题背景
#xff08;二#xff09;解决方案
集群环境准…Spark 的介绍与搭建从理论到实践-CSDN博客
Spark 的Standalone集群环境安装与测试-CSDN博客
PySpark 本地开发环境搭建与实践-CSDN博客
目录
一、本地开发与远程提交测试
一问题背景
二解决方案
集群环境准备
Windows 指定远程解析器
问题
验证是否已经安装
三代码提交到集群
自动提交
手动提交
四代码实现与参数传递
单词统计案例
运行过程中出现错误
修改配置文件
参考
二、集群提交spark - submit 脚本
一复习
二spark - submit 的用法
任务的命令格式
具体案例讲解
参数详解
三Driver 驱动进程和 Executor 计算进程
四实战案例
WordCount 程序提交示例
编写提交任务的命令
本地模式测试
Standalone 模式
五Driver 启动位置client 模式与 cluster 模式
基本概念
工作流程对比编辑
集群提交deploy mode
三、总结 在大数据处理领域Apache Spark 是一款强大的分布式计算框架。它提供了高效、灵活的数据处理能力广泛应用于各种数据密集型应用场景。无论是在本地开发环境还是在集群环境中正确地开发和提交 Spark 程序都是至关重要的。本文将深入探讨 Spark 程序在本地开发并远程提交到集群测试的过程以及使用 spark - submit 脚本在集群模式下提交程序的相关知识包括参数配置、运行模式等内容。 一、本地开发与远程提交测试 一问题背景 在本地开发 Spark 程序时常常会面临一些挑战。比如当使用 HDFS 上的数据进行开发时数据量可能过大导致本地运行消耗大量本地资源且运行时间很长。因此需要一种便捷的方法能够在本地编写代码后通过简单操作将代码自动提交到集群上运行。 二解决方案 集群环境准备
首先要启动集群在第一台机器上执行以下命令
start - dfs.sh
cd /opt/installs/spark
sbin/start - master.sh
sbin/start - workers.sh
sbin/start - history - server.shWindows 指定远程解析器 创建一个用于同步本地代码到服务上的文件夹。在 Linux 上创建同步目录将 Windows 上的代码和数据同步到/root/pyspark_code目录下
mkdir -p /root/pyspark_code这样后续右键运行代码时就可以将代码自动提交给集群运行。如果需要换回本地运行可以相应地切换环境。 问题
解决方案在bigdata01中安装pyspark。
我们在linux上配置了阿里云的源不代表在base中也可以使用运行时需要带上源地址
pip install pyspark3.1.2 -i https://pypi.tuna.tsinghua.edu.cn/simple/
验证是否已经安装
pip list 或者 conda list
也可以这么干
pip list | grep pyspark
下载完成之后重启一下 pycharm. 三代码提交到集群 自动提交 手动提交 同步后成功会显示绿色在linux上查看有代码 四代码实现与参数传递 单词统计案例
import os
import timefrom pyspark import SparkContext, SparkConf
import sysif __name__ __main__:# 配置环境os.environ[JAVA_HOME] /opt/installs/jdk# 配置Hadoop的路径就是前面解压的那个路径os.environ[HADOOP_HOME] /opt/installs/hadoop# 配置base环境Python解析器的路径os.environ[PYSPARK_PYTHON] /opt/installs/anaconda3/bin/python3 # 配置base环境Python解析器的路径os.environ[PYSPARK_DRIVER_PYTHON] /opt/installs/anaconda3/bin/python3# 获取sc 对象conf SparkConf().setMaster(spark://bigdata01:7077).setAppName(wordcount单词统计)sc SparkContext(confconf)print(sc)# 编写各种需求代码print(sys.argv[0])# 读取hdfs上的数据fileRdd sc.textFile(sys.argv[1])filterRdd fileRdd.filter(lambda line:len(line.strip()) 0)flatMapRdd filterRdd.flatMap(lambda line: line.split())mapRdd flatMapRdd.map(lambda word: (word,1))reduceBykeyRdd mapRdd.reduceByKey(lambda total,num : total num)reduceBykeyRdd.saveAsTextFile(sys.argv[2])#time.sleep(10000)# 关闭scsc.stop()运行的话直接在本地右键运行即可以上代码需要传递参数所以在ide工具中写。 运行过程中出现错误
Initial job has not accepted any resources; check your cluster UI to ensure that workers are registe有可能是以下几个问题 1、集群资源不足关闭掉没用的资源 2、目前集群中有其他任务占用了资源直接关闭掉 修改配置文件
修改 worker 的内存大小
如何关闭呢http://bigdata01:8080
选择任务后面的 【kill】 按钮
参考
spark报错WARN TaskSchedulerImpl: Initial job has not accepted any resources check your cluster UI..._initial job has not accepted any resources; check -CSDN博客 二、集群提交spark - submit 脚本 一复习 在 Pycharm 中开发好的程序要在 Linux 中执行可以使用spark - submit脚本。例如
/opt/installs/spark/bin/spark - submit --master spark://bigdata01:7077 /opt/installs/spark/examples/src/main/python/pi.py 200spark - submit的作用是将写好的代码提交到 Spark 环境可能是本地或集群环境运行它是基于 Linux 平台实现 Spark 程序提交运行的客户端工具。 二spark - submit 的用法 任务的命令格式
提交任务的命令
spark-submit [options] app jar | pythonfile | R file [app arguments]杀死一个正在运行的任务
spark-submit --kill [submission ID] --master [spark://...]查看某个任务的状态
spark-submit --status [submission ID] --master [spark://...]以上这些其实只需要学习如何提交任务即可因为我们有图形化界面 http://bigdata01:8080 具体案例讲解
# 提交程序的语法
# spark-submit [可选的选项] Python文件 Python文件中用到的参数
spark-submit --master local[2] / spark://bigdata01:7077 / yarn \
……
hdfs://bigdata01:9820/spark/app/pyspark_core_word_args.py /spark/wordcount/input /spark/wordcount/output
参数详解 --master用于指定程序运行的模式有本地模式--master local、Standalone 模式--master spark://master:7077、YARN 模式--master yarn等 5 种模式其作用等同于在代码中设置setMaster。--deploy - mode用于指定 Driver 进程运行位置重点内容后面详细讲解。--name用于指定程序的名称等同于代码中的setAppName。--jars指定一些额外的 jar 包如读写 MySQL 时需要的 MySQL 驱动包。--conf指定当前程序运行的额外配置等同于代码中的set。 三Driver 驱动进程和 Executor 计算进程
Spark 程序在集群模式运行时会启动两种进程Driver 驱动进程和 Executor 计算进程每种进程运行都需要资源。 Driver 进程每个程序有 1 个负责申请资源、解析代码、构建 Job/Task、分配调度 Task、监控 Task 运行构建有向无环图 DAG。Executor 进程每个程序可以有多个分布式启动在多台节点上负责运行 Driver 分配的 Task 任务。可以类比 MR 程序运行在 YARN 上Driver 类似于 Application Master项目经理Executor 类似于 Container包含 Map Task、ReduceTask 等进程。 此外还有一些与资源相关的参数 Driver 资源选项 --driver - memory指定 Driver 进程能够使用的内存大小默认是 1G。--driver - cores指定 Driver 进程能够使用的 CPU 核数默认是 1Core。--supervise指定如果 Driver 故障就自动重启。Executor 资源选项 --executor - memory指定每个 Executor 能够使用的内存。--executor - cores指定每个 Executor 能够使用的 CPU。--total - executor - coresStandalone 集群模式下指定所有 Executor 总共使用的 CPU 核数用于间接指定 Executor 的个数。--num - executorsYARN 集群模式下直接指定 Executor 的个数。--queue指定提交程序到哪个队列中运行。 以上这些参数还可以直接写在代码中可以配置在conf文件
加载顺序优先级代码中配置【set】 参数选项【--conf】 配置文件【公共配置spark-defualt.conf】 默认配置 - 集群管理器配置 - 环境变量配置 - 命令行参数 - 用户提供的SparkConf配置 - 执行时环境变量配置 优先级最高的 用户提供的SparkConf配置 命令行参数 公共的配置spark-default.conf 默认配置 四实战案例 WordCount 程序提交示例
上传 Python 代码文件时要注意注释掉代码中的master、本地环境变量相关内容因为代码中的设置优先级较高去掉这些才能使用spark - submit中编写的配置。以下是一个示例代码
#!/usr/bin/env python
# -*- coding: utf-8 -*-from pyspark import SparkContext, SparkConf
import os
import sys
-------------------------------------------------Description : TODO用于实现词频统计SourceFile : 04.pyspark_core_wordcount_hdfs_args
-------------------------------------------------
if __name__ __main__:# todo:0-设置系统环境变量# os.environ[JAVA_HOME] D:/jdk1.8.0_241# os.environ[HADOOP_HOME] D:/hadoop-3.3.0# os.environ[PYSPARK_PYTHON] D:/Anaconda/python.exe# os.environ[PYSPARK_DRIVER_PYTHON] D:/Anaconda/python.exe# os.environ[HADOOP_USER_NAME] root# todo:1-构建SparkContext# 甚至 任务的名字都可以不写让提交任务的时候指定conf SparkConf().setAppName(SparkSubmitApp)# .setMaster(local[2])\sc SparkContext(confconf)# todo:2-数据处理读取、转换、保存# step1: 读取数据SparkContext对象负责读取文件用传递的第二个参数作为程序的输入地址input_rdd sc.textFile(sys.argv[1])# 输出第一行# print(input_rdd.first())# 打印总行数# print(input_rdd.count())# step2: 处理数据rs_rdd input_rdd\.filter(lambda line: len(line.strip()) 0)\.flatMap(lambda line: line.strip().split( ))\.map(lambda word: (word, 1))\.reduceByKey(lambda tmp,item: tmpitem)# step3: 保存结果# 打印结果rs_rdd.foreach(lambda x: print(x))# 结果保存到文件中路径不能提前存在将第二个参数作为输出路径rs_rdd.saveAsTextFile(sys.argv[2])# todo:3-关闭SparkContextsc.stop() 编写提交任务的命令
本地模式测试
/opt/installs/spark/bin/spark-submit \
--master local[2] \
/home/pyspark_core_word_args.py \
/home/data.txt \
/home/output01如果出现以上这个是因为本地模式之前配置集群时软连接已经被毁了读取数据默认是去hdfs上读取的没结果假如你想运行创建软连接
rm -rf /opt/installs/spark
ln -s /opt/installs/spark-local /opt/installs/spark Standalone 模式
创建软连接
rm -rf /opt/installs/spark
ln -s /opt/installs/spark-standalone /opt/installs/spark
没指定资源的写法
/opt/installs/spark/bin/spark-submit \
--master spark://bigdata01:7077 \
/home/_06_WordCount_远程.py \
hdfs://bigdata01:9820/spark/wordcount/input \
hdfs://bigdata01:9820/spark/wordcount/jiqun01
指定资源的写法
方法一
/opt/installs/spark/bin/spark-submit \
--master spark://bigdata01:7077 \
--driver-memory 512M \
--driver-cores 1 \
--supervise \
--executor-memory 1G \
--executor-cores 1 \
--total-executor-cores 2 \
/home/_06_WordCount_远程.py \
hdfs://bigdata01:9820/spark/wordcount/input \
hdfs://bigdata01:9820/spark/wordcount/jiqun02
方法二
/opt/installs/spark/bin/spark-submit \
--master spark://bigdata01:7077 \
--deploy-mode client \
--driver-memory 1G \
--driver-cores 1 \
--supervise \
--executor-cores 1 \
--executor-memory 1G \
--total-executor-cores 2 \
/root/pyspark_code/main/Demo06.py /spark/wordcount/input/data.txt /spark/wordcount/output09 五Driver 启动位置client 模式与 cluster 模式 基本概念 --deploy-mode client | cluster 只有这两个 当你提交任务时假如等于 client , 此时Driver 进程在提交任务的那台机器上运行。假如在bigdata01上提交任务 spark-submit Driver 就在bigdata01上运行。 假如 等于cluster此时driver 在某一台服务器上运行。集群上哪一台都可以。 pyspark 不支持 cluster 支持 client 集群模式下任何一个Spark程序都包含两种进程Driver和Executor 程序提交以后会先启动Driver进程 Driver进程驱动进程每一个Spark程序都有1个向主节点申请资源去启动Executor进程 Driver等待所有Executor都启动完成会解析代码变成Task任务 Driver会将Task任务调度分配给Executor去运行并且监控所有Task运行 Executor进程计算进程每一个Spark程序都至少有1个 Executor进程会利用Worker节点的资源运行 所有Executor一旦启动成功向Driver反向注册负责运行Driver所分配所有Task任务 工作流程对比 step1客户端提交程序给主节点 step2主节点会根据提交参数在对应的位置启动Driver进程 step3Driver向主节点申请启动Executor计算进程 step4Master根据配置在Worker节点上启动对应的Executor step5所有Executor启动成功以后会向Driver反向注册 step6Driver解析代码根据代码构建Task分配给Executor运行并监控所有Task 集群提交deploy mode
程序启动之后Driver在哪里取决于提交任务的时候deploy mode 的值是什么
DEPLOY_MODE 这个值只有两个一个是client 一个是cluster
client默认值表示Driver运行在客户端节点上【在哪台机 器提交代码的哪台机器就是客户端】
cluster建议值表示Driver可以随机运行在某台从节点上【工作中一般都用cluster模式】
# deploy-mode client
/opt/installs/spark/bin/spark-submit \
--master spark://bigdata01:7077 \
--deploy-mode client \
--driver-memory 512M \
--driver-cores 1 \
--supervise \
--executor-memory 1G \
--executor-cores 1 \
--total-executor-cores 2 \/home/pyspark_core_word_args.py \
/spark/wordcount/input \
/spark/wordcount/output03# deploy-mode cluster
/opt/installs/spark/bin/spark-submit \
--master spark://bigdata01:7077 \
--deploy-mode cluster \
--driver-memory 512M \
--driver-cores 1 \
--supervise \
--executor-memory 1G \
--executor-cores 1 \
--total-executor-cores 2 \/home/pyspark_core_word_args.py \
/spark/wordcount/input \
/spark/wordcount/output05 client 模式该模式下Driver 充当了非常重要的角色任务在运行的时候必须保证Driver的服务正常运行。Driver需要做的事情很多任务在运行过程中Driver不能走。 cluster 模式在该模式下client端提交完之后就可以走了Driver进程放在了AppMaster里面spark集群将任务执行完即可。 假如deploy-modeclient 可以理解为胖客户端模式deploy-modecluster 可以理解为瘦客户端模式。 Exception in thread main org.apache.spark.SparkException: Cluster deploy mode is currently not supported for python applications on standalone clusters. 目前的版本中pysparks 在在不支持模式为cluster的写法所以会报以上错误请知晓换成scala等是不会有问题的。 以下这两张是Yarn集群的图帮助理解概念。
client模式的一个示意图 cluster模式 区别yarn 模式下就有 appmaster client模式 Driver进程和AppMaster是共存关系各玩各的 cluster模式Driver进程和AppMaster合二为一Driver在AppMaster里面。 deploy-mode 有 cluster 和 client 两个可选参数默认为 client。这里以 Spark On Yarn 模式对两者进行说明
在 cluster 模式下Spark Drvier 在应用程序的 Master 进程内运行该进程由集群上的 YARN 管理提交作业的客户端可以在启动应用程序后关闭在 client 模式下Spark Drvier 在提交作业的客户端进程中运行Master 进程仅用于从 YARN 请求资源。 spark-submit 提交任务时client模式和cluster模式有何不同。 以Yarn集群为例 client模式 Driver进程和AppMaster是 不在一起的各玩各的。Driver进程在提交命令的电脑上运行运行期间该服务器不能停止因为Client 在这个模式下起的作用很大。 cluster模式Driver进程和AppMaster合二为一Driver在AppMaster里面。Client端仅仅是提交了代码到集群提交完就没有什么事情了可以关闭。 三、总结 本文详细介绍了 Spark 程序从本地开发远程提交到集群测试的方法以及使用spark - submit脚本在集群模式下提交程序的相关知识。在本地开发时通过合理配置集群环境和同步代码目录可以方便地将代码提交到集群运行。而在集群提交方面spark - submit脚本的参数配置至关重要不同的参数如--master、--deploy - mode、各种资源相关参数等决定了程序在集群中的运行模式和资源分配情况。特别是--deploy - mode的client和cluster模式在 Driver 进程的启动位置和整个程序的运行机制上有很大区别。理解这些内容对于正确开发和高效运行 Spark 程序充分利用集群资源来处理大数据任务具有重要意义。无论是在开发过程中遇到资源问题的排查还是根据实际场景选择合适的提交模式和参数配置都需要对这些知识有深入的掌握以便更好地发挥 Spark 在大数据处理中的优势。