![Spark大数据编程实用教程](https://wfqqreader-1252317822.image.myqcloud.com/cover/942/36922942/b_36922942.jpg)
2.4 运行Spark程序(分布式方式)
本节介绍Spark程序的第二种运行方式:分布式运行,Spark程序分布在集群的多个节点上并行处理,这样可以利用集群强大的计算能力,且扩展方便。
Spark程序分布式运行要依赖特定的集群管理器,最常用的有 Yarn 和 Standalone。把Spark程序在Yarn上运行称为Spark on Yarn,同理,把Spark程序在Standalone上运行称为Spark on Standalone。
不管是Spark on Yarn还是Spark on Standalone,都统称为Spark程序的运行模式。
同时,根据 Client 和 Driver 是否在一个进程,又可以分为 client 和 cluster 两种部署模式,Spark on Yarn和Spark on Standalone都支持这两种部署模式。
因此,对Spark程序分布式运行来说,可以分为4类,如表2-1所示。本节将按照表中序号,依次介绍每种分类。
表2-1 Spark运行分类表
![](https://epubservercos.yuewen.com/16501E/19573973308593806/epubprivate/OEBPS/Images/978-7-111-65100-0_42_06.jpg?sign=1739280825-NDIrJHl88ADqPeY4VoEzZ5MApQGP0cJh-0-982e27e28e55b377b63f349332a8b408)
2.4.1 Spark on Yarn
如前所述,Spark on Yarn 有两种部署模式:client 和 cluster,本节针对这两种部署模式用示例进行说明。对于每个示例,会先介绍其具体的操作,然后介绍Spark程序在Yarn上的运行过程。
1.Spark on Yarn(client deploy mode)
本节以DFSReadWriteTest为例,说明Spark on Yarn 的client deploy mode。
DFSReadWriteTest 是 spark-examples_2.11-2.3.0.jar 自带的一个示例,它会读取本地文件进行单词计数,然后将本地文件上传到 HDFS,从 HDFS 读取该文件,使用 Spark 进行计数,最后比对两次计数的结果。
(1)提交Spark程序到Yarn上,以client deploy mode运行
运行命令如下。
![](https://epubservercos.yuewen.com/16501E/19573973308593806/epubprivate/OEBPS/Images/978-7-111-65100-0_43_01.jpg?sign=1739280825-Sk1DWG8vZzZkrMUZtjZCYibgrheqSV1K-0-1c4e9b24332249e1007484a2396a3187)
具体参数说明如下。
●--class org.apache.spark.examples.DFSReadWriteTest,指明此次程序的Main Class;
●--master yarn,指明将程序提交到Yarn上运行;
●/home/user/spark-2.3.0-bin-hadoop2.7/examples/jars/spark-examples_2.11-2.3.0.jar,程序所在的jar包;
●/etc/profile,表示本地文件路径(注意,不需要用file:///来表示本地文件路径);
●/output,是文件输出路径(位于HDFS上)。
要运行该程序,除了Yarn启动外,还要确保HDFS也已启动;
Spark在Yarn上运行,不需要在Spark中startall.sh脚本,也就是说不需要启动Master、Worker这些和Standalone有关的组件;
deploy mode 可以通过 spark-submit 后面传参--deploy-mode 来指定,默认是 client 模式,即 spark-submit 后面如果不加--deploy-mode,则部署模式是 client,如果要指定 cluster 模式,则要加--deploy-mode cluster,从上面的命令可以看出,其部署模式是client。
程序执行后,先得到/etc/profile的单词计数结果A;将profile上传到HDFS的/output目录;读取该文件,并使用RDD操作进行单词计数,计数结果写回/output/dfs_read_write_test目录,存储为文件B;最后比较A和B,如结果正确,则输出如下。
![](https://epubservercos.yuewen.com/16501E/19573973308593806/epubprivate/OEBPS/Images/978-7-111-65100-0_43_05.jpg?sign=1739280825-KxWLeoJ3VVplpSXHXKG8640baiORo96F-0-fff3ef18716c2f3560fb0cda3fc806c8)
初次运行程序时,可能会有以下两个报错。
报错1的报错信息如下所示。
![](https://epubservercos.yuewen.com/16501E/19573973308593806/epubprivate/OEBPS/Images/978-7-111-65100-0_43_06.jpg?sign=1739280825-pfg6PnbBNPRyGtnkc7alslqy0Aes0TY2-0-25bc958591a1ebf459d902ac0f622114)
报错原因:没有设置环境变量:HADOOP_CONF_DIR 或YARN_CONF_DIR。
解决办法:在/etc/profile中增加下面的内容。
![](https://epubservercos.yuewen.com/16501E/19573973308593806/epubprivate/OEBPS/Images/978-7-111-65100-0_43_07.jpg?sign=1739280825-q0YNEZcLOSEDPr3FyZpWCRCzHKXY3qSb-0-257739a8b7f49cf61cb275b3f2ae8f80)
![](https://epubservercos.yuewen.com/16501E/19573973308593806/epubprivate/OEBPS/Images/978-7-111-65100-0_44_01.jpg?sign=1739280825-yJ3MyFMZo7zxDvSCoREjcbXBRuGTMvUi-0-1834953d56210b2005be8fddfbb4a8c1)
切回到普通用户,使刚才的配置生效。
![](https://epubservercos.yuewen.com/16501E/19573973308593806/epubprivate/OEBPS/Images/978-7-111-65100-0_44_02.jpg?sign=1739280825-nwfFEuRH5dERvVih1DUGUmcbfNfHdIaA-0-36832ddb079fc2c40b1a160748ab9381)
报错2的报错信息如下所示。
报错原因:Container(容器)的内存超出了虚拟内存限制,Container 的虚拟内存为2.1GB,但使用了2.3GB。
![](https://epubservercos.yuewen.com/16501E/19573973308593806/epubprivate/OEBPS/Images/978-7-111-65100-0_44_03.jpg?sign=1739280825-l57jV8LV2Pm27d9UAtjUgukXd8icKSDV-0-c36a885dbf41b60204522388c5b0ebfd)
Container是Yarn上的资源抽象,Yarn上的Container目前包括CPU和内存两种资源;
提交到Yarn上的程序,会在Container中运行。例如,Spark on Yarn程序运行时,其Executor就是在Container中运行的。
解决办法:
方法一:改变分配 Container 的最小物理内存值,将 yarn.scheduler.minimum-allocation-mb设置成2GB,重启Yarn,每个Container向RM申请的虚拟内存为2GB×2.1=4.2GB。
yarn.scheduler.minimum-allocation-mb:默认值是1GB;
yarn.scheduler.maximum-allocation-mb:默认值是8GB。
方法二:改变分配Container的虚拟内存比例,将yarn.nodemanager.vmem-pmem-ratio设置成3,重启Yarn,每个Container向RM申请的虚拟内存为1GB×3=3GB。
yarn.nodemanager.vmem-pmem-ratio:默认值是2.1,是Container中虚拟内存/物理内存的值。
方法三:不检查虚拟机内存限制,将yarn.nodemanager.vmem-check-enabled设置为false,重启Yarn。
yarn.nodemanager.vmem-check-enabled:默认值是 true,它会检查 Container 中虚拟内存的使用是否超过yarn.scheduler.minimum-allocation-mb*yarn.nodemanager.vmem-pmem-ratio。
(2)Spark程序在Yarn上的执行过程(client deploy mode)
本例中,DFSReadWriteTest是一个Spark程序,部署模式是client,那么,它在Yarn上的执行过程是怎样的?图 2-6 列出了Spark程序在 Yarn 上的执行过程(client deploy mode),同MapReduce程序一样,Spark程序在Yarn上也是运行在Container之中的。具体说明如下。
1)client模式下,Client和Driver在一个进程内,向Resource Manager发出请求;
2)Resource Manager 指定一个节点启动 Container,用来运行 AM(Application Master);AM向Resource Manager申请Container来执行程序,Resource Manager向AM返回可用节点;
3)AM同可用节点的NodeManager通信,在每个节点上启动Container,每个Container中运行一个Spark的Executor,Executor再运行若干Tasks;
4)Driver 与 Executor 通信,向其分配 Task 并运行,并监测其状态,直到整个任务完成;
5)总任务完成后,Driver清理Executor,通知AM、AM向ResourceManager请求释放Cotainer,所有资源清理完毕后,AM注销并退出、Client退出。
![](https://epubservercos.yuewen.com/16501E/19573973308593806/epubprivate/OEBPS/Images/978-7-111-65100-0_45_01.jpg?sign=1739280825-Bbhk0tvIUtwDxpsCDvD1WbttGVqGrU77-0-0b969ef597bdd9880ba20e9194515b8d)
图2-6 Spark程序在Yarn上执行过程图(client deploy mode)
AM的作用是Container的申请、释放和管理,它是Yarn中的一个概念;
Spark Driver负责Spark任务的管理和监控;
Client负责Spark任务的提交。
2.Spark on Yarn(cluster deploy mode)
本节继续以DFSReadWriteTest为例,说明Spark on Yarn的cluster deploy mode。
(1)提交DFSReadWriteTest到Yarn运行(cluster deploy mode)
下面使用Cluster模式在Yarn上运行Spark程序,命令如下,增加了--deploy-mode cluster的参数。
![](https://epubservercos.yuewen.com/16501E/19573973308593806/epubprivate/OEBPS/Images/978-7-111-65100-0_45_05.jpg?sign=1739280825-UEmcSNvW1ZElBSkhQbOCq6uhCDIxkpfe-0-18f28408ccbfe4ef31f58901c8b2dbeb)
![](https://epubservercos.yuewen.com/16501E/19573973308593806/epubprivate/OEBPS/Images/978-7-111-65100-0_46_01.jpg?sign=1739280825-wqVHZroP1mwBdqfSXoYnMqyDLlE1lzbt-0-2d2b6038bb0b7ee9e8b37b36c54c92c6)
(2)Spark程序在Yarn上执行过程(cluster deploy mode)
在cluster deploy mode下,Spark on Yarn 执行过程如图2-7所示,Client和Driver分离,Driver在另一个节点,Driver和AM合并在同一个进程内,执行过程如下。
1)Client向ResourceManager提交Application请求;
2)ResourceManager指定一个节点,启动Container来运行AM和Spark Driver;AM根据任务情况向ResourceManager申请Container;ResourceManager返回可以运行Container的NodeManger;
3)AM与这些NodeManager通信,启动Container,在Container中运行Executor;
4)Spark Driver与Executor通信,向它们分配Task,并监控Task执行状态;
5)所有Task执行完毕后,清理Executor(总任务执行完毕后,有的Executor已经执行完毕,有的 Executor 可能还在执行 Task),清理完毕后,Driver 通知 AM,AM 请求ResourceManager,释放所有Container;Client收到Application FINISHED后退出。
![](https://epubservercos.yuewen.com/16501E/19573973308593806/epubprivate/OEBPS/Images/978-7-111-65100-0_46_02.jpg?sign=1739280825-reDvQ4WrlcjAlGB2NvbyTubnilIuVD0y-0-c4681c2c801731e0b684d1a5d23554b2)
图2-7 Spark程序在Yarn上执行过程图(cluster deploy mode)
2.4.2 Spark on Standalone
Standalone是Spark自带的一个集群管理器,主/从式架构,包括Master和Worker两种角色,Master管理所有的Worker,Worker负责单个节点的管理。
Spark程序可以在Standalone上运行,好处是简单、方便,可以快速部署;缺点是不通用,只支持Spark,不支持MapReduce等,此外功能没有专门的集群管理器如Yarn等强大。
如前所述,Spark on Standalone有两种部署模式:client和cluster,本节针对这两种部署模式,用示例说明。每个示例,都会先介绍具体的操作,然后介绍Spark程序在 Standalone上的运行过程。
1.Spark on Standalone(client deploy mode)
(1)部署Standalone
本书的Spark Standalone框架部署如图2-8所示,分为6层,与之前类似,第1层到第5层是已有的,第6层为Standalone,也是本节需要构建的。Standalone包含在Spark Package中,因此,不需要要装额外的Package,直接在Spark中配置即可。
![](https://epubservercos.yuewen.com/16501E/19573973308593806/epubprivate/OEBPS/Images/978-7-111-65100-0_47_01.jpg?sign=1739280825-3rKuuc89o2IKaViEPsJQ8Ygi8fRK2VXo-0-539eaa8cddb8f6ae8409c358ac532b90)
图2-8 Spark Standalone部署图
Standalone的Master和Worker都部署在scaladev虚拟机上,具体步骤如下。
1)配置slaves文件,该文件保存了整个集群中被管理节点的主机名。先复制模板文件;
![](https://epubservercos.yuewen.com/16501E/19573973308593806/epubprivate/OEBPS/Images/978-7-111-65100-0_47_02.jpg?sign=1739280825-cH9q4QVIIVMMsTDqAMykyGMe2QQqbjUm-0-4a91bb161183513c7773cae575b547ff)
2)编辑slaves文件;
![](https://epubservercos.yuewen.com/16501E/19573973308593806/epubprivate/OEBPS/Images/978-7-111-65100-0_47_03.jpg?sign=1739280825-4MFpDA27BDQ1op949rQzbPROpsD8N6e8-0-05c0f6fb17798f5309e4bf91ff6b4e90)
3)将localhost修改为scaladev;
![](https://epubservercos.yuewen.com/16501E/19573973308593806/epubprivate/OEBPS/Images/978-7-111-65100-0_47_04.jpg?sign=1739280825-zShZslWKTumhioYxReAkM2TJW0dSewJN-0-8edd4e54568bf9bdb76293ae7414fdb2)
4)添加JAVA_HOME;
![](https://epubservercos.yuewen.com/16501E/19573973308593806/epubprivate/OEBPS/Images/978-7-111-65100-0_47_05.jpg?sign=1739280825-T1IigYljkm95VUS17kfTWqryb9LHWs8U-0-3651e08d369a988de830b4fcf1b314a0)
5)编辑spark-env.sh文件;
![](https://epubservercos.yuewen.com/16501E/19573973308593806/epubprivate/OEBPS/Images/978-7-111-65100-0_47_06.jpg?sign=1739280825-MsCX7MmPKv8NdrKESKOoExCofOMBxxmZ-0-bdfd397f5afcfe536c1dd44c95c9a931)
6)在最后一行增加下面的内容;
![](https://epubservercos.yuewen.com/16501E/19573973308593806/epubprivate/OEBPS/Images/978-7-111-65100-0_48_01.jpg?sign=1739280825-HUinsdHBcd4UPNEkA6uuniskleclzoyt-0-28895a5d35c2836927acf4df848ced7d)
7)启动Standalone集群;
![](https://epubservercos.yuewen.com/16501E/19573973308593806/epubprivate/OEBPS/Images/978-7-111-65100-0_48_02.jpg?sign=1739280825-LgKkUSJvBstPWIQKVXTfjyj4RCXbb5II-0-b1501b59700bfb02ffe88fb7acaa1d5e)
8)验证,使用jps查看当前运行的Java进程,如下所示,如果Master和Worker都在,则说明启动、配置成功;
![](https://epubservercos.yuewen.com/16501E/19573973308593806/epubprivate/OEBPS/Images/978-7-111-65100-0_48_03.jpg?sign=1739280825-FkBgKvDP316VBeQDjVcMi0aVAn87eCpC-0-7ae10f372bca771b1d56df9f4ef1da26)
9)查看Standalone的Web监控界面,如图2-9所示。
在Host浏览器中输入http://192.168.0.226:8080,其中192.168.0.226是scaladev的IP地址,是Master所在的IP。在Web监控界面可以查看集群信息、Spark的Application运行信息等。
![](https://epubservercos.yuewen.com/16501E/19573973308593806/epubprivate/OEBPS/Images/978-7-111-65100-0_48_04.jpg?sign=1739280825-ts11YFElLOuiDIrYUE7Aj8Nddpm8ehoU-0-875805c7c3098c466dc628b08cd9d4bc)
图2-9 Standalone的Web界面
(2)提交Spark程序到Standalone上,以client deploy mode运行
提交前应确保HDFS已经启动,HDFS上/output目录下已经清空。具体命令如下。
![](https://epubservercos.yuewen.com/16501E/19573973308593806/epubprivate/OEBPS/Images/978-7-111-65100-0_48_05.jpg?sign=1739280825-JziJnqc9lJbzx54eDBGq9BMTYBxR2Dnd-0-07f5aa98e6fd941cebf0ade93e1ccafb)
其中,-master spark://scaladev:7077表示连接Standalone集群,scaladev是Master所在的主机名,没有指定--deploy-mode cluster,则部署模式为默认的client。
(3)Spark程序在Standalone上的运行过程(client deploy mode)
client部署模式下,Spark程序在Standalone的运行过程如图2-10所示。
1)Client初始化,内部启动Client模块和Driver模块,并向Master发送Application请求;
2)Master接收请求,为其分配Worker,并通知Worker启动Executor;
3)Executor向Driver注册,Driver向Executor发送Task,Executor执行Task,并反馈执行状态,Driver再根据Excutorer的当前情况,继续发送Task,直到整个Job完成。
![](https://epubservercos.yuewen.com/16501E/19573973308593806/epubprivate/OEBPS/Images/978-7-111-65100-0_49_01.jpg?sign=1739280825-RH1R7EPUygi7GUYxRWB5iESf0cECfRVa-0-5fa937526d93d31ac3d57c4bd8da7c47)
图2-10 Spark程序在Standalone上的运行过程(client deploy mode)
2.Spark on Standalone(cluster deploy mode)
(1)提交Spark程序到Standalone,以cluster deploy mode运行
具体命令如下。
![](https://epubservercos.yuewen.com/16501E/19573973308593806/epubprivate/OEBPS/Images/978-7-111-65100-0_49_02.jpg?sign=1739280825-iZxpX4VXpDtmJ5D1fSebxRn9LYZXUmug-0-0808a1fd79031d31915ee4449ea96a4e)
有4点需要特别注意。
●采用cluster deploy mode 时,Driver 需要一个处理器,后续Executor还需要另外的处理器,如果虚拟机 scaladev 只有 1 个处理器的话,就会出现资源不足的警告,导致程序运行失败,如下所示;
![](https://epubservercos.yuewen.com/16501E/19573973308593806/epubprivate/OEBPS/Images/978-7-111-65100-0_49_03.jpg?sign=1739280825-ZAfmuZwsBmND9u6L6qCpbJ1fhlI6FBWq-0-63f467641c1d55aa9a8373c99c898406)
解决办法为:增加虚拟机的处理器为两个。
● 命令参数中,--master spark://scaladev:6066 用来指定 Master 的 URL,cluster deploy mode下,Client会向Master提交Rest URL,spark://scaladev:6066就是Spark的Rest URL;如果还是使用原来的参数--master spark://scaladev:7077,则会报下面的错误;
![](https://epubservercos.yuewen.com/16501E/19573973308593806/epubprivate/OEBPS/Images/978-7-111-65100-0_49_04.jpg?sign=1739280825-ZVlhypu6IodWb4ingYOyDMVUKVYDSxn3-0-4be89ac729218d548c58021640b5af4a)
●HDFS的路径前面要加hdfs://,因为Cluster Mode下,core-site.xml中的defaultFS设置不起作用;
●Client提交成功后就会退出,而不是等待Application结束后才退出。
cluster deploy mode的还有专门的Driver日志,位于Driver节点Spark目录的work目录下,会创建Driver开头的目录,例如:driver-20180815020432-0015,在这个下面有stderr和stdout两个日志文件。
(2)Spark程序在Standalone上的运行过程(cluster deploy mode)
cluster deploy mode下,Spark程序在Standalone的运行过程如图2-11所示。
1)Client初始化,内部启动Client模块,并向Master注册Driver模块,并等待Driver信息,待后续Driver模块正常运行,Client退出;
2)Master 接收请求,分配一个 Worker,并通知此 Worker 运行 Driver 模块,Driver 向Master发送Application请求;
3)Master接收请求,分配Worker,并通知这些Worker启动Executor;
4)Executor向Driver注册,Driver向Executor发送Task,Executor执行Task,并反馈执行状态,Driver再根据Executor的当前情况,继续发送Task,直到整个Job完成。
![](https://epubservercos.yuewen.com/16501E/19573973308593806/epubprivate/OEBPS/Images/978-7-111-65100-0_50_02.jpg?sign=1739280825-O4B3yuNjXi8zauWF2mJGNuyKBG7imJao-0-a62844e7d653e13f3202870e4338abca)
图2-11 Spark程序在Standalone上的运行过程(cluster deploy mode)
3.Spark on Standalone的日志
Spark程序运行时,其日志在排查问题时非常重要。其中,Standalone 的日志分为两类:框架日志和应用日志。
框架日志是指Master和Worker的日志,Master日志位于Master的Spark目录下的logs目录下,文件名为:spark-user-org.apache.spark.deploy.master.Master-1-scaladev.out;Worker位于每个Worker节点的Spark目录下的logs目录下,文件名为:spark-user-org.apache.spark.deploy.worker.Worker-1-scaladev.out。
应用日志是指每个Spark程序运行的日志,因为一个Spark程序可能会启动多个Executor,每个Executor都会有一个日志文件,位于Executor所在的Worker节点的Spark目录的work目录下,每个Spark运行会分配一个ID,运行时在控制台会打印该ID的值,如下所示。
![](https://epubservercos.yuewen.com/16501E/19573973308593806/epubprivate/OEBPS/Images/978-7-111-65100-0_51_01.jpg?sign=1739280825-DBIVlCBZGsZkKRLOtYGQjxvD4mG7nbKd-0-cffec70401170490d137d1de56c3ae92)
列出woker目录下的内容,命令如下。
![](https://epubservercos.yuewen.com/16501E/19573973308593806/epubprivate/OEBPS/Images/978-7-111-65100-0_51_02.jpg?sign=1739280825-cTVrvGGMF13plPbBJQtIwvFwHkVflrDy-0-44ea87cbcf6d2104ec236dc5206af415)
显示内容如下,在Worker下,每个ID会有一个目录。
![](https://epubservercos.yuewen.com/16501E/19573973308593806/epubprivate/OEBPS/Images/978-7-111-65100-0_51_03.jpg?sign=1739280825-Rm4brtWYrI2DWuZPsKIKu5lzabnOXByq-0-a357a63aecae81f88bcb621b987d814e)
列出下面路径的内容,命令如下。
![](https://epubservercos.yuewen.com/16501E/19573973308593806/epubprivate/OEBPS/Images/978-7-111-65100-0_51_04.jpg?sign=1739280825-KnshuCfb7cdVl9ubiiYGv0k4UFhaWcvb-0-5c7a6c1789895044f015941d56fe01f3)
显示内容如下,可见每个Executor的日志在该目录下。
![](https://epubservercos.yuewen.com/16501E/19573973308593806/epubprivate/OEBPS/Images/978-7-111-65100-0_51_05.jpg?sign=1739280825-FoDtz8WTJIJ4IA8hBA1AfnaqDwa6J1fa-0-a458d33bdc23f4dd89987abc115fccb4)
应用日志是分散在Worker节点上的,Executor在哪个Worker节点上运行,日志就在此Worker节点上。
此外,如果使用cluster部署模式,在Client的Spark目录work目录下,还会有对应的driver日志。