spark2学习笔记四

Posted by Peterliao Blog on June 27, 2017

RDD操作


理解闭包

spark中最困难的事情之一就是理解代码在集群中执行时变量和方法的生命周期范围。在变量生命周期之外使用RDD操作修改变量可能是一个常见的错误。在下面的例子中,有一段使用foreach去增加计数器的代码,但是除了这个操作导致问题外,其他操作也可能会产生相似的问题。

将下面当做本地RDD元素求和,对于操作是否在同一个JVM中,这决定了操作的结果。一个普遍的例子是spark应用是运行在本地模式(–master=local[n])还是发布到一个集群(例如用spark-submit提交到YARN)

{ highlight java } int counter = 0; JavaRDD rdd = sc.parallelize(data);

// Wrong: Don’t do this!! rdd.foreach(x -> counter += x);

println(“Counter value: “ + counter); { endhighlight }

上述代码的结果时未定义的,并且可能不会像预期的那样工作。执行job时,spark将RDD操作分解为多个在一个执行器上执行的task。在执行之前,spark计算出task的闭包。闭包是执行器在RDD上执行必须的变量和方法的集合(在这个例子中时foreach)。这个闭包被序列化发送到每一个执行器。

被发送到每个执行器节点上的闭包中的变量是一个值拷贝,那么当counter变量被foreach引用,这个counter变量就已经和驱动程序节点上的counter变量是不一样的。在驱动程序节点的内存中依然有一个counter变量,但是此变量对于执行器来说是不可见的。执行器仅仅可见被序列化闭包中的值拷贝。结果,counter变量的最终值将还是0,因为所有在counter变量上的操作引用的都是被序列化闭包的counter拷贝值。

在本地模式,在一些环境下,foreach函数可能和驱动程序使用同一个jvm,这将导致foreach函数引用原始counter变量,因此foreach将正确执行并正确更新counter变量。

为了确保在这类场景中运行正确,应该使用累加器变量。在spark中累加器变量专门提供一种机制,当操作被划分到集群中的工作节点上运行时正确更新变量。后续的累加器部分会更详细介绍累加器变量。

一般来说,闭包像循环和局部方法一样被构建,不应该用来改变全局状态。spark没有定义和保证对于引用的闭包外部的变量的更改操作。有些代码可以在本地模式下工作,但这只是偶然的,这样的代码在分布式模式下不会表现得像预期的那样。如果需要一些全局聚合,需要使用累加器。

另一个常见的习惯是尝试用rdd.foreach(println)或rdd.map(println)打印RDD的元素。在单机模式下,这将生成预期的输出并打印所有的RDD元素。然而,在集群模式中,控制台的输出被执行器调用,而不是在驱动程序上,因此驱动程序的控制台不会显示这些,但是结果会打印到工作节点的控制台。要在驱动程序控制台上的打印所有元素,首先可以使用collect()方法将RDD带到驱动节点,然后使用rdd. collect(). foreach(println)打印结果。这个操作可能会导致驱动程序耗尽内存,因为collect()方法将整个RDD提取到一台机器上;如果只需要打印一些元素,更安全的方法是使用take():rdd. take(100). foreach(println)。

使用键值对


虽然大多数Spark操作在包含任何类型对象的RDDs上执行,但是一些特殊的操作只能在键值对的RDDs上使用。最常见的是分布式的shuffle操作,例如按一个key分组或聚合元素。

在Java中,键值对用来自Scala标准库的Tuple2类来表示。您可以简单地调用new Tuple2(a,b)创建一个tuple对象,然后使用tuple._1()和tuple._2()访问其字段。

由键值对组成的RDDs由JavaPairRDD类表示。您可以使用map操作的特殊版本(如mapToPair和flatMapToPair)来用JavaRDDs对象构造JavaPairRDDs对象。JavaPairRDD将具有标准的RDD函数和特殊的键值

例如,下面的代码在键值对RDD上使用reducebykey操作来计算一个文件中每行文本的出现次数

{ highlight java } JavaRDD lines = sc.textFile("data.txt"); JavaPairRDD<String, Integer> pairs = lines.mapToPair(s -> new Tuple2(s, 1)); JavaPairRDD<String, Integer> counts = pairs.reduceByKey((a, b) -> a + b); { endhighlight }

例如,我们还可以使用counts.sortByKey()来对这些键值对按字母顺序进行排序,最后通过collect()函数将它们作为对象数组返回到驱动程序中。

注意:当使用自定义对象作为键值对操作的键时,你必须确认有一个匹配hashcode方法的合适的equal方法。更详细的情况可以参考object.hashcode方法的官方API文档。