spark2学习笔记三

Posted by Peterliao Blog on June 20, 2017

RDD操作


RDDs提供两类操作,转化:用一个存在的RDD创建一个新的数据集(RDD),执行:在数据集上执行计算返回一个值到到驱动程序。例如,map是一个转化函数,传递每一个数据集元素到相应函数,返回函数处理后的值组成的数据集(RDD)。另一方面来说,reduce操作是一个执行操作,用一系列函数聚合数据集的元素,然后返回最后的结果到驱动函数。但是reducebykey返回一个分布式数据集。


spark的所有转化操作都是懒执行的,懒执行的意思是操作不会立即执行获取结果,而是记住应用于基础数据集(例如文件)上的转化。转化只在执行操作执行返回结果到驱动程序需要他们的结果时才执行,这个设计使得spark更高效的运行。例如,map函数被使用到reduce函数中时,只有reduce函数返回结果到驱动函数时,map函数才会执行。


默认来说,当有执行操作在转化数据集上执行时每个被转化的数据集都要重新计算。另外,你能够使用persist或者cache函数持久化一个RDD到内存,下一次你查询使用它将会快很多。spark也支持持久化RDD到硬盘或者在多节点间复制。


基础

下面的示例程序介绍了RDD基础

    JavaRDD<String> lines = sc.textFile("data.txt");
    JavaRDD<Integer> lineLengths = lines.map(s -> s.length());
    int totalLength = lineLengths.reduce((a, b) -> a + b);

第一行用一个外部文件定义了一个基本的RDD.这个数据集没有加载到内存,除非在数据集上有执行操作。lines仅仅是一个指向文件的指针,第二行定义lineLengths为map转化操作的结果。由于懒执行机制lineLengths不是立马计算的。最后,我们执行reduce执行操作。当reduce操作返回结果到驱动程序时,spark将这个计算划分为在不同机器上执行的任务,每台机器执行自己的reduce和map任务。


如果你你想在后面再一次使用lineLengths,你能在reduce前加上如下语句

    lineLengths.persist(StorageLevel.MEMORY_ONLY());

这将导致lineLengths在第一次计算后缓存到内存中。


传递函数到spark

Spark的API很多程度依赖传递函数到集群上运行。在java中,函数被实现了包org.apache.spark.api.java.function中接口的类来表示。有两种方法去创建这些函数。

  • 在自己的类中实现Function接口,作为匿名内部类或命名的内部类,然后传递一个类的实例到spark
  • 在jdk8中,使用lambda表达式来简洁地定义一个Function接口实现 使用lambda表达式比较便捷,例如使用lambda表达式
    JavaRDD<String> lines = sc.textFile("data.txt");
    JavaRDD<Integer> lineLengths = lines.map(new Function<String, Integer>() {
        public Integer call(String s) { return s.length(); }
    });
    int totalLength = lineLengths.reduce(new Function2<Integer, Integer, Integer>() {
        public Integer call(Integer a, Integer b) { return a + b; }
    });

但是,如果不使用lambda

    class GetLength implements Function<String, Integer> {
        public Integer call(String s) { return s.length(); }
    }
    class Sum implements Function2<Integer, Integer, Integer> {
        public Integer call(Integer a, Integer b) { return a + b; }
    }
    JavaRDD<String> lines = sc.textFile("data.txt");
    JavaRDD<Integer> lineLengths = lines.map(new GetLength());
    int totalLength = lineLengths.reduce(new Sum());

Java中的匿名内部类也可以访问闭包中的变量,只要它们被标记为final变量。Spark将会将这些变量的副本以与其他语言一样的方式发送到每个工作节点。