spark2学习笔记二

Posted by Peterliao Blog on June 19, 2017

连接spark



spark对于jdk和scala的版本都有严格的要求,根据官方文档的说明,spark2.1.1支持jdk7和以上,但是jdk7.0在spark2.0.0就已经被标注过时。并且官网预估在spark2.2.0将取消对于jdk7的支持,因此推荐大家使用jdk8,另外spark2.1.1支持的scala版本为2.11。另外使用jdk8版本将能够直接使用lambda表达式,否则你就只能使用spark的org.apache.spark.api.java.function包。

初始化spark



创建一个spark程序的首要步骤是新建一个JavaSparkContext对象,有了JavaSparkContext对象程序就能够和集群通信。在新建一个JavaSparkContext对象前,我们要新建一个带有应用信息的SparkConf对象。

    SparkConf conf = new SparkConf().setAppName(appName).setMaster(master);  
    JavaSparkContext sc = new JavaSparkContext(conf);

appName参数是集群web界面上展示的应用名称,master参数是spark、Mesos或者YARN集群的集群URL,当应用运行在本地时也可以是
为”local”字符串。实际上,当程序在集群中执行时,你一般不想将master硬编码到程序中,这样的话你能够使用spark-submit发布应
用,能够在命令参数中设置它的值。并且,在本地测试和单元测试时,在执行spark测试应用时你能够传递local参数。

使用spark-shell



在spark-shell中,已经为你创建了一个名为sc的内嵌的SparkContext对象。新建自己的SparkContext对象将是无效的。你能通过设置 –master参数来设置sc连接的master,同时也能通过–jars参数添加一个由逗号分隔的jar包集合。并且你也能通过–packages添加包 依赖到自己的spark-shell会话中。使用–help参数将能够获取到spark-shell的参数帮助。

弹性分布式数据集(RDD)



spark提出了一个弹性分布式数据集RDD的概念,RDD是一个能具有并行操作和容错能力的元素集合。有两种方式来创建RDD:对一个存在于驱动程序的集合进行并行化,或者引用一个存在于外部存储系统的数据集,例如共享文件系统,HDFS,HBase或者任何提供Hadoop输入格式的数据源。


并行化后的集合是在驱动程序通过调用JavaSparkContext的parallelizer方法处理一个存在的集合创建的。集合元素是被拷贝去形成能进行并行化处理的分布式数据集。例如

    List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
    JavaRDD<Integer> distData = sc.parallelize(data);

RDD一旦创建成功,就能够进行并行化的操作。例如,我们可以通过调用distData.reduce((a, b) -> a + b)去计算集合元素的累加和,后面将会介绍在RDD上的并行操作。 **

并行集合的一个重要参数是分区数,这个参数决定将集合划分为多少份。spark将在集群中为每个分区上执行一个任务,一般集群中的每个CPU分成2-4个分区。一般说来spark会自动根据集群情况设置分区数,你也可以通过parallelize方法的第二个参数设置分区数,例如sc.parallelize(data, 10)。

外部数据集



spark能从hadoop支持的任何数据集创建分布式数据集,包括本地文件系统,HDFS,Cassandra, HBase, Amazon S3等。spark支持文本文件,流文件和其他任何hadoop输入格式的数据。


文本文件RDDs能使用SparkContext的textFile方法创建,此方法需要提供文件的URI,比如本地文件路径,hdfs://,s3n://等URI,textFile方法将文件解析为文件行的集合。例如

    JavaRDD<String> distFile = sc.textFile("data.txt");

一旦创建完毕,distFile就能进行数据集操作。例如我们可以使用map和reduce函数计算文件行数:distFile.map(s -> s.length()).reduce((a,b) -> a + b)。 注意

  • 如果使用本地文件路径,工作节点必须能够访问同路径文件。否则拷贝文件到工作节点或者使用网络挂载的共享文件系统。
  • 所有spark给予文件的输入方法,都支持目录,压缩文件和通配符。例如你能够使用textFile(“/my/directory”), textFile (“/my/directory/.txt”)和textFile(“/my/directory/.gz”)。
  • textFile方法也有第二个可选参数来控制文件的分区数。默认来说,spark为文件的每个块创建一个分区,hdfs中每块默认为128MB, 但是你也可以通过更大的值创建更多的分区。注意分区数不能够少于默认分区数。

除了文本文件,spark的java api也支持其他的一些数据格式。

  • JavaSparkContext.wholeTextFiles读取一个包含多个小文件的目录,返回一个(文件名,文件内容)的键值对集,这一点和 textFile返回文件每行内容的集合不一致。
  • 对于流文件,使用SparkContext的sequenceFile[K, V]方法,K和V文件中key和value的类型,并且必须为Hadoop的writable的子 类,例如intWritable和Text。
  • 对于其他Hadoop输入格式,你能使用JavaSparkContext.hadoopRDD方法。方法有Joncon类,输入格式类,建值类这些参数。这些参 数的设置方法和设置hadoop的job的参数是一样的。你也可以为给予hadoop新api的hadoop输入格式数据使用JavaSparkContext.newAPIHadoopRDD方法。
  • JavaRDD.saveAsObjectFile和JavaSparkContext.objectFile支持将RDD以java序列化对象集合保存,这样不如Avro这样的专用格 式高效,但是它提供了一种通用的简单方法。