如何运行一个spark java程序
我写了一个Java程序的火花。 但是如何从unix命令行运行和编译它。 编译运行时是否需要包含任何jar?
结合官方快速入门指南和在YARN上启动Spark,我们得到:
我们将创build一个非常简单的Spark应用程序SimpleApp.java:
/*** SimpleApp.java ***/ import org.apache.spark.api.java.*; import org.apache.spark.api.java.function.Function; public class SimpleApp { public static void main(String[] args) { String logFile = "$YOUR_SPARK_HOME/README.md"; // Should be some file on your system JavaSparkContext sc = new JavaSparkContext("local", "Simple App", "$YOUR_SPARK_HOME", new String[]{"target/simple-project-1.0.jar"}); JavaRDD<String> logData = sc.textFile(logFile).cache(); long numAs = logData.filter(new Function<String, Boolean>() { public Boolean call(String s) { return s.contains("a"); } }).count(); long numBs = logData.filter(new Function<String, Boolean>() { public Boolean call(String s) { return s.contains("b"); } }).count(); System.out.println("Lines with a: " + numAs + ", lines with b: " + numBs); } }
这个程序只是在文本文件中统计包含'a'的行数和包含'b'的数量。 请注意,您需要将$ YOUR_SPARK_HOMEreplace为安装Spark的位置。 和Scala例子一样,我们初始化一个SparkContext ,尽pipe我们使用特殊的JavaSparkContext类来获得一个Java友好的类。 我们还创buildRDD(由JavaRDD代表)并对其进行转换。 最后,我们通过创build扩展spark.api.java.function.Function的类来将函数传递给Spark。 Java编程指南更详细地描述了这些差异。
为了构build程序,我们还编写了一个Maven pom.xml文件,其中列出了Spark作为依赖项。 请注意,Spark工件被标记为Scala版本。
<project> <groupId>edu.berkeley</groupId> <artifactId>simple-project</artifactId> <modelVersion>4.0.0</modelVersion> <name>Simple Project</name> <packaging>jar</packaging> <version>1.0</version> <repositories> <repository> <id>Akka repository</id> <url>http://repo.akka.io/releases</url> </repository> </repositories> <dependencies> <dependency> <!-- Spark dependency --> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.10</artifactId> <version>0.9.0-incubating</version> </dependency> </dependencies> </project>
如果您还希望从Hadoop的HDFS中读取数据,则还需要为您的HDFS版本添加对hadoop-client的依赖关系:
<dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>...</version> </dependency>
我们根据规范的Maven目录结构来布局这些文件:
$ find . ./pom.xml ./src ./src/main ./src/main/java ./src/main/java/SimpleApp.java
现在,我们可以使用Maven执行应用程序:
$ mvn package $ mvn exec:java -Dexec.mainClass="SimpleApp" ... Lines with a: 46, Lines with b: 23
然后按照从Yarn启动Spark的步骤:
构build一个支持YARN的组件JAR
我们需要一个整合的Spark JAR(捆绑所有必需的依赖关系)在YARN集群上运行Spark作业。 这可以通过设置Hadoop版本和SPARK_YARN环境variables来构build,如下所示:
SPARK_HADOOP_VERSION=2.0.5-alpha SPARK_YARN=true sbt/sbt assembly
组装的JAR将如下所示:./assembly/target/scala-2.10/spark-assembly_0.9.0-incubating-hadoop2.0.5.jar。
构build过程现在也支持新的YARN版本(2.2.x)。 见下文。
准备工作
- 构buildYARN启用的程序集(参见上文)。
- 组装好的jar可以安装到HDFS中或在本地使用。
- 您的应用程序代码必须打包到单独的JAR文件中。
如果要testingYARN部署模式,可以使用当前的Spark示例。 spark-examples_2.10-0.9.0-incubating文件可以通过运行生成:
sbt/sbt assembly
注意:由于您正在阅读的文档是针对Spark版本0.9.0进行孵化的,因此我们假设您已经下载了Spark 0.9.0-孵化或者将其从源代码pipe理中删除。 如果您使用的是不同版本的Spark,那么由sbt package命令生成的jar版本号显然是不同的。
组态
YARN上的Spark大多数configuration与其他部署相同。 有关这些的更多信息,请参阅configuration页面。 这些configuration是特定于YARN上的SPARK。
环境variables:
- SPARK_YARN_USER_ENV ,将环境variables添加到在YARN上启动的Spark进程。 这可以是逗号分隔的环境variables列表,例如
SPARK_YARN_USER_ENV="JAVA_HOME=/jdk64,FOO=bar"
系统属性:
- spark.yarn.applicationMaster.waitTries属性,用于设置ApplicationMaster等待Spark主机的次数,以及等待Spark Context初始化的次数。 缺省值是10。
- spark.yarn.submit.file.replication ,为应用程序上传到HDFS的文件的HDFS复制级别。 这些包括诸如spark jar,app jar和任何分布式caching文件/档案等。
- spark.yarn.preserve.staging.files ,设置为true以在作业结束时保留staged文件(spark jar,app jar,distributed cache files),而不是删除它们。
- spark.yarn.scheduler.heartbeat.interval-ms ,Spark应用程序主节点在YARN ResourceManager中心跳的时间间隔。 默认值是5秒。
- spark.yarn.max.worker.failures ,应用程序失败前的最大工作失败次数。 缺省值是要求的工人数量2,最less为3。
在YARN上启动Spark
确保HADOOP_CONF_DIR或YARN_CONF_DIR指向包含hadoop集群(客户端)configuration文件的目录。 这将用于连接到群集,写入到dfs并提交作业到资源pipe理器。
有两种调度器模式可用于在YARN上启动Spark应用程序。
纱线独立模式下YARN客户端启动Spark应用程序。
启动YARN客户端的命令如下:
SPARK_JAR=<SPARK_ASSEMBLY_JAR_FILE> ./bin/spark-class org.apache.spark.deploy.yarn.Client \ --jar <YOUR_APP_JAR_FILE> \ --class <APP_MAIN_CLASS> \ --args <APP_MAIN_ARGUMENTS> \ --num-workers <NUMBER_OF_WORKER_MACHINES> \ --master-class <ApplicationMaster_CLASS> --master-memory <MEMORY_FOR_MASTER> \ --worker-memory <MEMORY_PER_WORKER> \ --worker-cores <CORES_PER_WORKER> \ --name <application_name> \ --queue <queue_name> \ --addJars <any_local_files_used_in_SparkContext.addJar> \ --files <files_for_distributed_cache> \ --archives <archives_for_distributed_cache>
例如:
# Build the Spark assembly JAR and the Spark examples JAR $ SPARK_HADOOP_VERSION=2.0.5-alpha SPARK_YARN=true sbt/sbt assembly # Configure logging $ cp conf/log4j.properties.template conf/log4j.properties # Submit Spark's ApplicationMaster to YARN's ResourceManager, and instruct Spark to run the SparkPi example $ SPARK_JAR=./assembly/target/scala-2.10/spark-assembly-0.9.0-incubating-hadoop2.0.5-alpha.jar \ ./bin/spark-class org.apache.spark.deploy.yarn.Client \ --jar examples/target/scala-2.10/spark-examples-assembly-0.9.0-incubating.jar \ --class org.apache.spark.examples.SparkPi \ --args yarn-standalone \ --num-workers 3 \ --master-memory 4g \ --worker-memory 2g \ --worker-cores 1 # Examine the output (replace $YARN_APP_ID in the following with the "application identifier" output by the previous command) # (Note: YARN_APP_LOGS_DIR is usually /tmp/logs or $HADOOP_HOME/logs/userlogs depending on the Hadoop version.) $ cat $YARN_APP_LOGS_DIR/$YARN_APP_ID/container*_000001/stdout Pi is roughly 3.13794
以上启动了一个启动默认应用程序主控的YARN客户端程序。 然后SparkPi将作为应用程序主控的子线程运行,YARN客户端将定期轮询应用程序主控进行状态更新并将其显示在控制台中。 一旦你的应用程序运行完毕,客户端将退出。
使用此模式,您的应用程序实际上在运行应用程序主机的远程机器上运行。 因此,涉及本地交互的应用程序将无法正常工作,如火花shell。
前几天我也有同样的问题,昨天设法解决了这个问题。
这就是我所做的:
- 下载sbt并解压并解压缩:http://www.scala-sbt.org/download.html
- 我已经下载了Hadoop 2的Spark Prebuild包,解压缩并解压: http : //www.apache.org/dyn/closer.cgi/spark/spark-1.0.2/spark-1.0.2-bin-hadoop2.tgz
- 我已经创build了独立的应用程序SimpleApp.scala,如下所示: http ://spark.apache.org/docs/latest/quick-start.html#standalone-applications with simple.sbt文件(刚刚从描述中复制)和正确的目录布局
- 确保你有你的path。 转到目录与您的应用程序,并使用
sbt package
build立你的sbt package
- 使用
SPARK_HOME_DIR/sbin/spark_master.sh
启动Spark Server - 去
localhost:8080
,并确保你的服务器正在运行。 从URL复制链接(从服务器描述,不是本地主机,它是端口7077或类似的东西) - 启动使用
SPARK_HOME_DIR/bin/spark-class org.apache.spark.deploy.worker.Worker spark://IP:PORT
工人SPARK_HOME_DIR/bin/spark-class org.apache.spark.deploy.worker.Worker spark://IP:PORT
其中IP:PORT是在6中复制的URL - 将您的应用程序部署到服务器:
SPARK_HOME_DIR/bin/spark-submit --class "SimpleApp" --master URL target/scala-2.10/simple-project_2.10-1.0.jar
这对我有用,希望能帮助你。
帕维尔
如果你想连接到一个外部的独立的Spark实例,
SparkConf conf = new SparkConf() .setAppName("Simple Application") .setMaster("spark://10.3.50.139:7077"); JavaSparkContext sc = new JavaSparkContext(conf);
在这里,您可以根据Spark的运行位置find更多的“主”configuration: http : //spark.apache.org/docs/latest/submitting-applications.html#master-urls