Apache Spark - 部署


Spark应用程序使用spark-submit,是一个shell命令,用于在集群上部署Spark应用程序。它通过统一的接口使用所有相应的集群管理器。因此,您不必为每一个应用程序配置。

例子

让我们采用之前使用 shell 命令进行字数统计的相同示例。在这里,我们考虑与 Spark 应用程序相同的示例。

输入样本

以下文本是输入数据,文件名为in.txt

people are not as beautiful as they look, 
as they walk or as they talk. 
they are only as beautiful  as they love, 
as they care as they share.

看下面的程序 -

SparkWordCount.scala

import org.apache.spark.SparkContext 
import org.apache.spark.SparkContext._ 
import org.apache.spark._  

object SparkWordCount { 
   def main(args: Array[String]) { 

      val sc = new SparkContext( "local", "Word Count", "/usr/local/spark", Nil, Map(), Map()) 
		
      /* local = master URL; Word Count = application name; */  
      /* /usr/local/spark = Spark Home; Nil = jars; Map = environment */ 
      /* Map = variables to work nodes */ 
      /*creating an inputRDD to read text file (in.txt) through Spark context*/ 
      val input = sc.textFile("in.txt") 
      /* Transform the inputRDD into countRDD */ 
		
      val count = input.flatMap(line ⇒ line.split(" ")) 
      .map(word ⇒ (word, 1)) 
      .reduceByKey(_ + _) 
       
      /* saveAsTextFile method is an action that effects on the RDD */  
      count.saveAsTextFile("outfile") 
      System.out.println("OK"); 
   } 
} 

将上述程序保存到名为SparkWordCount.scala的文件中,并将其放置在名为Spark-application的用户定义目录中。

注意- 在将 inputRDD 转换为 countRDD 时,我们使用 flatMap() 将行(来自文本文件)标记为单词,使用 map() 方法来计算单词频率,使用 reduceByKey() 方法来计算每个单词的重复次数。

请按照以下步骤提交此申请。通过终端执行spark-application目录下的所有步骤。

第 1 步:下载 Spark Ja

编译需要Spark core jar,因此,从以下链接下载spark-core_2.10-1.3.0.jar Spark core jar并将jar文件从下载目录移动到spark-application目录。

第二步:编译程序

使用下面给出的命令编译上述程序。该命令应从spark-application 目录执行。这里,/usr/local/spark/ lib/spark- assembly-1.4.0-hadoop2.6.0.jar 是取自 Spark 库的 Hadoop 支持 jar。

$ scalac -classpath "spark-core_2.10-1.3.0.jar:/usr/local/spark/lib/spark-assembly-1.4.0-hadoop2.6.0.jar" SparkPi.scala

第 3 步:创建 JAR

使用以下命令创建 Spark 应用程序的 jar 文件。这里,wordcount是 jar 文件的文件名。

jar -cvf wordcount.jar SparkWordCount*.class spark-core_2.10-1.3.0.jar/usr/local/spark/lib/spark-assembly-1.4.0-hadoop2.6.0.jar

第四步:提交spark申请

使用以下命令提交 Spark 应用程序 -

spark-submit --class SparkWordCount --master local wordcount.jar

如果执行成功,您将看到下面给出的输出。OK输入以下输出用于用户识别,这是程序的最后一行如果仔细阅读以下输出,您会发现不同的东西,例如 -

  • 在端口 42954 上成功启动服务“sparkDriver”
  • MemoryStore 启动容量为 267.3 MB
  • 在 http://192.168.1.217:4040 启动 SparkUI
  • 添加JAR文件:/home/hadoop/piapplication/count.jar
  • ResultStage 1(SparkPi.scala:11 处的 saveAsTextFile)在 0.566 秒内完成
  • 已停止 Spark Web UI http://192.168.1.217:4040
  • 内存存储已清除
15/07/08 13:56:04 INFO Slf4jLogger: Slf4jLogger started 
15/07/08 13:56:04 INFO Utils: Successfully started service 'sparkDriver' on port 42954. 
15/07/08 13:56:04 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@192.168.1.217:42954] 
15/07/08 13:56:04 INFO MemoryStore: MemoryStore started with capacity 267.3 MB 
15/07/08 13:56:05 INFO HttpServer: Starting HTTP Server 
15/07/08 13:56:05 INFO Utils: Successfully started service 'HTTP file server' on port 56707. 
15/07/08 13:56:06 INFO SparkUI: Started SparkUI at http://192.168.1.217:4040 
15/07/08 13:56:07 INFO SparkContext: Added JAR file:/home/hadoop/piapplication/count.jar at http://192.168.1.217:56707/jars/count.jar with timestamp 1436343967029 
15/07/08 13:56:11 INFO Executor: Adding file:/tmp/spark-45a07b83-42ed-42b3b2c2-823d8d99c5af/userFiles-df4f4c20-a368-4cdd-a2a7-39ed45eb30cf/count.jar to class loader 
15/07/08 13:56:11 INFO HadoopRDD: Input split: file:/home/hadoop/piapplication/in.txt:0+54 
15/07/08 13:56:12 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 2001 bytes result sent to driver 
 (MapPartitionsRDD[5] at saveAsTextFile at SparkPi.scala:11), which is now runnable 
15/07/08 13:56:12 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 1 (MapPartitionsRDD[5] at saveAsTextFile at SparkPi.scala:11) 
15/07/08 13:56:13 INFO DAGScheduler: ResultStage 1 (saveAsTextFile at SparkPi.scala:11) finished in 0.566 s 
15/07/08 13:56:13 INFO DAGScheduler: Job 0 finished: saveAsTextFile at SparkPi.scala:11, took 2.892996 s
OK 
15/07/08 13:56:13 INFO SparkContext: Invoking stop() from shutdown hook 
15/07/08 13:56:13 INFO SparkUI: Stopped Spark web UI at http://192.168.1.217:4040 
15/07/08 13:56:13 INFO DAGScheduler: Stopping DAGScheduler 
15/07/08 13:56:14 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped! 
15/07/08 13:56:14 INFO Utils: path = /tmp/spark-45a07b83-42ed-42b3-b2c2823d8d99c5af/blockmgr-ccdda9e3-24f6-491b-b509-3d15a9e05818, already present as root for deletion. 
15/07/08 13:56:14 INFO MemoryStore: MemoryStore cleared 
15/07/08 13:56:14 INFO BlockManager: BlockManager stopped 
15/07/08 13:56:14 INFO BlockManagerMaster: BlockManagerMaster stopped 
15/07/08 13:56:14 INFO SparkContext: Successfully stopped SparkContext 
15/07/08 13:56:14 INFO Utils: Shutdown hook called 
15/07/08 13:56:14 INFO Utils: Deleting directory /tmp/spark-45a07b83-42ed-42b3b2c2-823d8d99c5af 
15/07/08 13:56:14 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!  

第 5 步:检查输出

程序执行成功后,您将在spark-application目录中找到名为outfile的目录。

以下命令用于打开和检查 outfile 目录中的文件列表。

$ cd outfile 
$ ls 
Part-00000 part-00001 _SUCCESS

检查part-00000文件中输出的命令是 -

$ cat part-00000 
(people,1) 
(are,2) 
(not,1) 
(as,8) 
(beautiful,2) 
(they, 7) 
(look,1)

检查part-00001文件中输出的命令是 -

$ cat part-00001 
(walk, 1) 
(or, 1) 
(talk, 1) 
(only, 1) 
(love, 1) 
(care, 1) 
(share, 1)

请阅读以下部分以了解有关“spark-submit”命令的更多信息。

Spark-提交语法

spark-submit [options] <app jar | python file> [app arguments]

选项

序列号 选项 描述
1 - 掌握 Spark://host:port、mesos://host:port、yarn 或 local。
2 --部署模式 是否在本地(“客户端”)或集群内的一台工作计算机(“集群”)上启动驱动程序(默认:客户端)。
3 - 班级 您的应用程序的主类(对于 Java / Scala 应用程序)。
4 - 姓名 您的应用程序的名称。
5 --罐子 要包含在驱动程序和执行程序类路径中的以逗号分隔的本地 jar 列表。
6 --包 要包含在驱动程序和执行程序类路径中的 jar 的 Maven 坐标的逗号分隔列表。
7 --存储库 以逗号分隔的其他远程存储库列表,用于搜索使用 --packages 给出的 Maven 坐标。
8 --py 文件 要放置在 Python 应用程序的 PYTHON PATH 上的以逗号分隔的 .zip、.egg 或 .py 文件列表。
9 --文件 要放置在每个执行程序的工作目录中的以逗号分隔的文件列表。
10 --conf (属性=val) 任意 Spark 配置属性。
11 --属性文件 从中加载额外属性的文件的路径。如果未指定,这将查找conf/spark-defaults。
12 --驱动程序内存 驱动程序内存(例如1000M、2G)(默认:512M)。
13 --driver-java-选项 传递给驱动程序的额外 Java 选项。
14 --驱动程序库路径 要传递给驱动程序的额外库路径条目。
15 --驱动程序类路径

要传递给驱动程序的额外类路径条目。

请注意,使用 --jars 添加的 jar 会自动包含在类路径中。

16 --执行器内存 每个执行器的内存(例如1000M、2G)(默认:1G)。
17 号 --代理用户 提交申请时模拟的用户。
18 --帮助,-h 显示此帮助消息并退出。
19 --详细,-v 打印额外的调试输出。
20 - 版本 打印当前 Spark 的版本。
21 --驱动程序核心数 驱动程序的核心(默认值:1)。
22 - 监督 如果给出,则在失败时重新启动驱动程序。
23 - 杀 如果给出,则杀死指定的驱动程序。
24 - 地位 如果给出,则请求指定驱动程序的状态。
25 --总执行器核心数 所有执行器的核心总数。
26 --执行器核心 每个执行器的核心数。(默认值:YARN 模式下为 1,或独立模式下工作线程上的所有可用核心)。