最近用Apache Spark 处理一些大数据,学了spark官方英文文档,顺便翻译了方便学习。

spark 版本 2.2.0.

翻译官方文档原地址:
http://spark.apache.org/docs/latest/rdd-programming-guide.html

概述

在较高层次上,每个Spark应用程序都包含一个驱动程序,该程序运行用户的main方法,并在集群上执行各种并行操作。 Spark提供的主要抽象是一个弹性分布式数据集(RDD),它是在集群节点间进行分区的元素集合,可以并行操作。 RDD是通过从Hadoop文件系统(或任何其他Hadoop支持的文件系统)中的文件或驱动程序中现有的Scala集合开始创建的,并对其进行转换。用户也可以要求Spark将RDD保存在内存中,以便在并行操作中有效地重用它。最后,RDD自动从节点故障中恢复。

Spark中的第二个抽象是可用于并行操作的共享变量。默认情况下,Spark在不同节点上并行执行一组任务时,会将该函数中使用的每个变量的副本传送给每个任务。有时候,变量需要在任务之间,或任务与驱动程序之间共享。

Spark支持两种类型的共享变量:广播变量,可用于在所有节点上缓存内存中的值,以及累加器,这些变量只是“添加”到的变量,如计数器和总和。

本指南显示了Spark支持的各种语言中的每个功能。如果您启动Spark的交互式shell(最好是Scala shell的bin / spark-shell或Python的bin / pyspark),最简单的方法就是跟随它。

本翻译文中的代码实现部分仅展示了java部分,scala和python自行官网。

导入 Spark

Spark 2.2.0支持简洁地编写函数的lambda表达式,否则您可以使用org.apache.spark.api.java.function包中的类。

请注意,在Spark 2.2.0中删除了对Java 7的支持。

要用Java编写Spark应用程序,您需要在Spark上添加一个依赖项。 Spark可以通过Maven Central获得:

1
2
3
groupId = org.apache.spark
artifactId = spark-core_2.11
version = 2.2.0

另外,如果您想访问一个HDFS集群,您需要为您的HDFS版本添加对hadoop-client的依赖。

1
2
3
groupId = org.apache.hadoop
artifactId = hadoop-client
version = <your-hdfs-version>

最后您需要导入一些spark的类到您的程序:

1
2
3
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.SparkConf;

安装 spark

Spark程序必须做的第一件事是创建一个JavaSparkContext对象,它告诉Spark如何访问一个集群。 要创建一个SparkContext,首先需要构建一个包含有关应用程序信息的SparkConf对象。

1
2
SparkConf conf = new SparkConf().setAppName(appName).setMaster(master);
JavaSparkContext sc = new JavaSparkContext(conf);

appName参数是您的应用程序在集群UI上显示的名称。 master是Spark,Mesos或YARN群集URL,或者是以本地模式运行的特殊“本地”字符串。 实际上,在群集上运行时,您不希望在程序中硬编码master,而是使用spark-submit启动应用程序,并在那里接收它。 但是,对于本地测试和单元测试,您可以通过“本地”来运行进程中的Spark。

弹性分布式数据集(RDDs)

Spark围绕弹性分布式数据集(RDD)的概念展开,RDD是可以并行操作的容错元素集合。 有两种方法可以创建RDD:并行化驱动程序中的现有集合,或 外部存储系统(如共享文件系统,HDFS,HBase或提供Hadoop InputFormat的任何数据源)中引用数据集。

并行集合

并行化集合是通过在驱动程序中的现有集合上调用JavaSparkContext的并行化方法来创建的。 集合的元素被复制,以形成可以并行操作的分布式数据集。 例如,下面是如何创建一个包含数字1到5的并行化集合:

1
2
List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
JavaRDD<Integer> distData = sc.parallelize(data);

一旦创建,分布式数据集(distData)可以并行操作。 例如,我们可以调用distData.reduce((a,b) - > a + b)将列表中的元素相加。 我们稍后介绍分布式数据集上的操作。

并行收集的一个重要参数是要将数据集剪切成的分区数量。 Spark将为群集的每个分区运行一个任务。 通常情况下,您需要为群集中的每个CPU分配2-4个分区。 通常情况下,Spark会根据您的群集自动设置分区数量。 但是,也可以通过将其作为并行化的第二个参数(例如sc.parallelize(data,10))手动设置。 注意:代码中的一些地方使用术语切片(分区的同义词)来维持向后兼容性。

外部数据集

Spark可以从Hadoop支持的任何存储源(包括本地文件系统,HDFS,Cassandra,HBase,Amazon S3等)创建分布式数据集.Spark支持文本文件,SequenceFile和任何其他Hadoop InputFormat。

文本文件RDD可以使用SparkContext的textFile方法创建。 这个方法接受一个文件的URI(机器上的一个本地路径,或者一个hdfs://,s3n://等URI),并把它作为一个行集合来读取。 这是一个示例调用:

1
JavaRDD<String> distFile = sc.textFile("data.txt");

一旦创建,distFile可以通过数据集操作进行操作。 例如,我们可以使用map来缩小所有行的大小,并reduce操作,如下所示:distFile.map(s - > s.length()).reduce((a,b) - > a + b)。

使用Spark读取文件的一些注意事项:

  • 如果在本地文件系统上使用路径,则该文件也必须可以在工作节点上的相同路径上访问。 将文件复制到所有工作人员或使用网络安装的共享文件系统。
  • Spark的所有基于文件的输入方法(包括textFile)都支持在目录,压缩文件和通配符上运行。 例如,您可以使用textFile(“/ my / directory”),textFile(“/ my / directory / *.txt”)和textFile(“/ my / directory / *.gz”)。
  • textFile方法还使用可选的第二个参数来控制文件的分区数量。 默认情况下,Spark为文件的每个块创建一个分区(HDFS中的块默认为128MB),但是您也可以通过传递更大的值来请求更多的分区。 请注意,您不能有比块更少的分区

除了文本文件外,Spark的Java API还支持其他几种数据格式:

  • JavaSparkContext.wholeTextFiles允许您读取包含多个小文本文件的目录,并将它们中的每一个都作为(文件名,内容)对返回。这与textFile相反,它将在每个文件中每行返回一个记录。
  • 对于SequenceFiles,使用SparkContext的sequenceFile [K,V]方法,其中K和V是文件中的键和值的类型。这些应该是Hadoop的Writable接口的子类,如IntWritable和Text。
  • 对于其他Hadoop InputFormats,可以使用JavaSparkContext.hadoopRDD方法,该方法采用任意的JobConf和输入格式类,关键类和值类。将它们设置为您使用输入源进行Hadoop作业的方式相同。您还可以使用基于“新”MapReduce API(org.apache.hadoop.mapreduce)的InputSamples的JavaSparkContext.newAPIHadoopRDD。
  • JavaRDD.saveAsObjectFile和JavaSparkContext.objectFile支持以包含序列化Java对象的简单格式保存RDD。虽然这不像Avro这样的专业格式,但它提供了一种简单的方法来保存任何RDD。

RDD 操作

RDD支持两种类型的操作:转换(transformations)从现有数据集创建新数据集 和 行动(actions)在数据集上运行计算后将值返回给驱动程序。例如,map是一个通过函数传递每个数据集元素的转换(transformations),并返回一个代表结果的新RDD。另一方面,reduce是一个行动(actions),它使用某个函数聚合RDD的所有元素,并将最终结果返回给驱动程序(尽管还有一个并行reduceByKey返回一个分布式数据集)。

Spark中的所有转换(transformations)都是懒惰的,因为它们不会马上计算结果。相反,他们只记得应用于某些基础数据集(例如文件)的转换。只有在行动(actions)需要将结果返回给驱动程序时才会计算转换。这种设计使Spark能够更高效地运行。例如,我们可以认识到通过map创建的数据集将被用于reduce,并且只将reduce的结果返回给驱动程序,而不是返回更大的映射数据集。

默认情况下,每次对其执行行动(actions)时,每个已转换的RDD都可能重新计算。但是,您也可以使用持久化(或缓存)方法将RDD保留在内存中,在这种情况下,Spark将保留群集中的元素,以便在下次查询时快速访问。还支持在磁盘上持久化RDD,或在多个节点上复制RDD。

基本

为了说明RDD基础知识,请考虑下面的简单程序:

1
2
3
JavaRDD<String> lines = sc.textFile("data.txt");
JavaRDD<Integer> lineLengths = lines.map(s -> s.length());
int totalLength = lineLengths.reduce((a, b) -> a + b);

第一行定义了来自外部文件的基本RDD。 这个数据集不会被加载到内存中,或者作用于其他行上:lines只是一个指向文件的指针。 第二行将lineLengths定义为map 转换(transformations)的结果。 同样,lineLengths由于懒惰而没有立即计算。 最后,我们运行reduce,这是一个行动(actions)。 在这一点上,Spark将计算分解为在不同机器上运行的任务,每台机器既运行其map部分又运行reduce,之后返回驱动程序的答案。

如果我们还想稍后再使用lineLengths,我们可以添加:

1
lineLengths.persist(StorageLevel.MEMORY_ONLY());

reduce之前,这将导致lineLengths被保存在第一次计算后的内存。

将函数传递给Spark

Spark的API在很大程度上依赖于将驱动程序中的函数传递到集群上运行。 在Java中,函数由实现org.apache.spark.api.java.function包中的接口的类来表示。 有两种方法来创建这样的功能:

  • 在您自己的类中实现函数接口,或者作为一个匿名的内部类或者一个命名接口,并且将它的一个实例传递给Spark。
  • 使用lambda表达式来简洁地定义一个实现。

虽然本指南的大部分内容都使用lambda语法进行简洁说明,但很容易以长格式使用所有相同的API。 例如,我们可以写上面的代码如下:

1
2
3
4
5
6
7
JavaRDD<String> lines = sc.textFile("data.txt");
JavaRDD<Integer> lineLengths = lines.map(new Function<String, Integer>() {
public Integer call(String s) { return s.length(); }
});
int totalLength = lineLengths.reduce(new Function2<Integer, Integer, Integer>() {
public Integer call(Integer a, Integer b) { return a + b; }
});

或者

1
2
3
4
5
6
7
8
9
10
class GetLength implements Function<String, Integer> {
public Integer call(String s) { return s.length(); }
}
class Sum implements Function2<Integer, Integer, Integer> {
public Integer call(Integer a, Integer b) { return a + b; }
}

JavaRDD<String> lines = sc.textFile("data.txt");
JavaRDD<Integer> lineLengths = lines.map(new GetLength());
int totalLength = lineLengths.reduce(new Sum());

了解关闭

Spark的难点之一是在集群中执行代码时理解变量和方法的范围和生命周期。 修改范围之外的变量的RDD操作可能是混淆的常见来源。 在下面的例子中,我们将看看使用foreach()来增加计数器的代码,但其他操作也会出现类似的问题。

示例

考虑下面的naive的RDD元素总和,根据执行是否发生在同一个JVM中,这可能会有不同的表现。 一个常见的例子就是在本地模式下运行Spark(–master = local [n])与将Spark应用程序部署到集群(例如,通过spark-submit to YARN):

1
2
3
4
5
6
7
int counter = 0;
JavaRDD<Integer> rdd = sc.parallelize(data);

// Wrong: Don't do this!!
rdd.foreach(x -> counter += x);

println("Counter value: " + counter);

本地或集群模式

上面的代码的行为(actions)是未定义的,并可能无法正常工作。为了执行作业,Spark将RDD操作的处理分解为任务,每个任务由执行者执行。在执行之前,Spark计算任务的关闭。闭包是执行者在RDD上执行计算(在本例中为foreach()))时必须可见的那些变量和方法。这个封闭序列化并发送给每个执行者。

发送给每个执行程序的闭包中的变量现在是副本,因此,当在foreach函数中引用计数器时,它不再是驱动程序节点上的计数器。驱动程序节点的内存中还有一个计数器,但执行程序不再可见!执行者只能看到序列化闭包的副本。因此,计数器的最终值仍然是零,因为计数器上的所有操作都引用了序列化闭包内的值。

在本地模式下,在某些情况下,foreach函数实际上将在与驱动程序相同的JVM内执行,并将引用相同的原始计数器,并可能实际更新它。

为了确保在这种情况下明确的行为,应该使用累加器。 Spark中的累加器专门用于提供一种在集群中的工作节点之间执行拆分时安全地更新变量的机制。本指南的“累加器”部分更详细地讨论了这些内容。

一般来说,闭包 - 像循环或本地定义的方法这样的构造不应该被用来改变一些全局状态。 Spark并没有定义或保证对从封闭外引用的对象的突变行为。这样做的一些代码可能在本地模式下工作,但这是偶然的,这样的代码不会按预期在分布式模式下运行。如果需要全局聚合,请使用累加器。

打印RDD的元素

另一个常见的习惯是试图使用rdd.foreach(println)或rdd.map(println)打印RDD的元素。 在单台机器上,这将生成预期的输出并打印所有RDD的元素。 但是,在集群模式下,执行程序调用的stdout输出现在写入执行程序的stdout,而不是驱动程序的stdout,因此驱动程序上的stdout不会显示这些! 要打印驱动程序中的所有元素,可以使用collect()方法首先将RDD带到驱动程序节点:rdd.collect()。foreach(println)。 但是,这可能会导致驱动程序内存不足,因为collect()会将整个RDD提取到一台计算机; 如果您只需要打印RDD的一些元素,则更安全的方法是使用take():rdd.take(100).foreach(println)。

使用键值对

尽管大多数Spark操作在包含任何类型对象的RDD上工作,但是一些特殊操作仅在键 - 值对的RDD上可用。 最常见的是分布式的“随机”操作,如按键分组或聚合元素。

在Java中,键值对使用Scala标准库中的scala.Tuple2类来表示。 您可以简单地调用新的Tuple2(a,b)来创建一个元组,然后使用tuple._1()tuple._2()来访问它的字段。

键值对的RDD由JavaPairRDD类表示。 您可以使用mapToPair和flatMapToPair等特殊版本的map操作从JavaRDDs构建JavaPairRDD。 JavaPairRDD将同时具有标准的RDD功能和特殊的键值。

例如,以下代码使用键值对上的reduceByKey操作来计算文本中每行文本的出现次数:

1
2
3
JavaRDD<String> lines = sc.textFile("data.txt");
JavaPairRDD<String, Integer> pairs = lines.mapToPair(s -> new Tuple2(s, 1));
JavaPairRDD<String, Integer> counts = pairs.reduceByKey((a, b) -> a + b);

例如,我们也可以使用counts.sortByKey()来按字母顺序对这些对进行排序,最后count.collect()将它们作为一个对象数组返回给驱动程序。

注意:在使用自定义对象作为键值对操作中的键时,必须确保自定义equals()方法附带有匹配的hashCode()方法。 有关完整的详细信息,请参阅Object.hashCode()文档中概述的合同。

转换(Transformations)

下表列出了Spark支持的一些常见转换。 有关详细信息,请参阅RDD API文档(Scala,Java,Python,R)和RDD函数doc(Scala,Java)。

转换 含义
map(func) 通过函数func传递源的每个元素来形成一个新的分布式数据集。
filter(func) 通过选择func返回true的源的元素返回一个新的数据集。
flatMap(func) 类似于map,但是每个输入项可以映射到0个或更多个输出项(所以func应该返回一个Seq而不是单个项)。
mapPartitions(func) 与map类似,但是在RDD的每个分区(块)上分别运行,所以当在T型RDD上运行时,func必须是Iterator => Iterator 类型。
mapPartitionsWithIndex(func) 类似于mapPartitions,但也提供了一个表示分区索引的整数值的func,所以在T类型的RDD上运行时,func的类型必须是(Int,Iterator )=> Iterator
sample(withReplacement, fraction, seed) 使用给定的随机数发生器种子,对数据的一小部分进行采样,有或没有替换。
union(otherDataset) 返回包含源数据集中的元素和参数的联合的新数据集。
intersection(otherDataset) 返回一个新的RDD,其中包含源数据集中的元素和参数的交集。
distinct([numTasks])) 返回包含源数据集的不同元素的新数据集。
groupByKey([numTasks]) 当调用(K,V)对的数据集时,返回(K,Iterable )对的数据集。 注意:如果您正在对每个键执行聚合(例如总和或平均),则使用reduceByKey或aggregateByKey将会产生更好的性能。 注:默认情况下,输出中的并行级别取决于父RDD的分区数量。 您可以传递一个可选的numTasks参数来设置不同数量的任务。
reduceByKey(func, [numTasks]) 当调用(K,V)对的数据集时,返回(K,V)对的数据集,其中每个键的值使用给定的reduce函数func进行聚合,函数func必须是(V,V)=> V.和groupByKey一样,reduce任务的数量可以通过可选的第二个参数来配置。
aggregateByKey(zeroValue)(seqOp, combOp, [numTasks]) 当调用(K,V)对的数据集时,返回(K,U)对的数据集,其中使用给定的组合函数和中性的“零”值来汇总每个键的值。 允许与输入值类型不同的聚合值类型,同时避免不必要的分配。 就像在groupByKey中一样,reduce任务的数量可以通过可选的第二个参数来配置。
sortByKey([ascending], [numTasks]) 当调用K实现Ordered的(K,V)对的数据集时,按照布尔上升参数的指定,按照升序或降序返回按键排序的(K,V)对的数据集。
join(otherDataset, [numTasks]) 当(K,V)和(K,W)类型的数据集被调用时,返回每个键的所有元素对的(K,(V,W))对的数据集。 外连接通过leftOuterJoin,rightOuterJoin和fullOuterJoin来支持。
cogroup(otherDataset, [numTasks]) 当(K,V)和(K,W)类型的数据集被调用时,返回(K,(Iterable ,Iterable ))元组的数据集。 这个操作也被称为groupWith。
cartesian(otherDataset) 当调用类型T和U的数据集时,返回(T,U)对(所有元素对)的数据集。
pipe(command, [envVars]) 通过shell命令管理RDD的每个分区,例如, 一个Perl或bash脚本。 RDD元素被写入进程的stdin,输出到stdout的行被作为字符串的RDD返回。
coalesce(numPartitions) 减少RDD中的分区数量为numPartitions。 用于在过滤大型数据集后更高效地运行操作。
repartition(numPartitions) 随机调整RDD中的数据以创建更多或更少的分区并在其间进行平衡。 这总是通过网络混洗所有数据。
repartitionAndSortWithinPartitions(partitioner) 根据给定的分区程序对RDD进行重新分区,并在每个结果分区中按键分类记录。 这比调用重新分区,然后在每个分区内进行排序更有效率,因为它可以将排序压入洗牌机器。

行动(actions)

下表列出了Spark支持的一些常用行动(actions)。 有关详细信息,请参阅RDD API文档(Scala,Java,Python,R)和RDD函数doc(Scala,Java)。
| 操作 | 含义 |
|———|——-|
| reduce(func) | 使用函数func(它接受两个参数并返回一个)聚合数据集的元素。 该函数应该是可交换和关联的,以便它可以被正确地并行计算。|
| collect() | 在驱动程序中将数据集的所有元素作为数组返回。 在过滤器或其他操作返回足够小的数据子集之后,这通常很有用。 |
| count() | 返回数据集中元素的数量。 |
| first() | 返回数据集的第一个元素(类似于take(1))。 |
| take(n) | 用数据集的前n个元素返回一个数组。 |
| takeSample(withReplacement, num, [seed]) | 返回一个数组的随机样本数组,有或没有替换,可以预先指定一个随机数发生器种子。 |
| takeOrdered(n, [ordering]) | 使用自然顺序或自定义比较器返回RDD的前n个元素。 |
| saveAsTextFile(path) | 将数据集的元素作为文本文件(或文本文件集)写入本地文件系统,HDFS或任何其他Hadoop支持的文件系统的给定目录中。 Spark将在每个元素上调用toString将其转换为文件中的一行文本。 |
| saveAsSequenceFile(path) (Java and Scala) | 将数据集的元素作为Hadoop SequenceFile写入本地文件系统,HDFS或任何其他Hadoop支持的文件系统的给定路径中。 这在实现Hadoop的Writable接口的键值对的RDD上是可用的。 在Scala中,它也可用于可隐式转换为Writable的类型(Spark包含Int,Double,String等基本类型的转换)。 |
| foreach(func) | 在数据集的每个元素上运行函数func。 这通常用于副作用,如更新累加器或与外部存储系统交互。 注意:修改foreach()之外的累加器以外的变量可能会导致未定义的行为。 请参阅了解更多细节。 |
| countByKey() | 仅适用于类型(K,V)的RDD。 返回(K,Int)对的hashmap和每个键的计数。 |
| saveAsObjectFile(path) (Java and Scala) | 使用Java序列化以简单的格式写入数据集的元素,然后可以使用SparkContext.objectFile()加载。 |

Spark RDD API还公开了一些动作的异步版本,例如foreach的foreachAsync,它立即将FutureAction返回给调用者,而不是在完成动作时阻塞。 这可以用来管理或等待操作的异步执行。

洗牌操作(shuffle operations)

Spark中的某些操作会触发一个称为shuffle的事件。 洗牌(shuffle)是Spark重新分配数据的机制,以便在不同分区之间进行分组。 这通常涉及在执行者和机器之间复制数据,使得洗牌成为复杂而昂贵的操作。

背景

为了理解shuffle过程中发生了什么,我们可以考虑reduceByKey操作的例子。 reduceByKey操作将生成一个新的RDD,其中单个键的所有值都组合为一个元组 - 键和对与该键相关的所有值执行reduce函数的结果。面临的挑战是,并不是所有的单个密钥的值都必须位于同一个分区,甚至是同一个机器上,但是它们必须位于同一地点才能计算出结果。

在Spark中,数据通常不是跨分区分布,而是在特定操作的必要位置。在计算过程中,单个任务将在单个分区上运行 - 因此,要组织单个reduceByKey reduce任务执行的所有数据,Spark需要执行全部操作。它必须从所有分区中读取所有键的值,然后将各个分区上的值汇总在一起,以计算每个键的最终结果 - 这就是所谓的混洗(shuffle)。

虽然新洗牌数据的每个分区中的元素集合是确定性的,分区本身的排序也是确定性的,但是这些元素的排序并不是这样。如果一个人在随机播放之后需要可预测的有序数据,那么可以使用:

  • mapPartitions使用例如.sorted
  • repartitionAndSortWithinPartitions对每个分区进行排序,从而有效地对分区进行排序
  • 同时对sortBy进行重新分区以制作全局排序的RDD

可能导致混洗的操作包括重新分配操作(如重新分区和合并),“像groupByKey和reduceByKey一样的ByKey操作(除计数),以及像cogroup和join一样的连接操作。

性能影响

Shuffle是一个昂贵的操作,因为它牵涉到磁盘I / O,数据序列化和网络I / O。为了组织数据,Spark生成一组任务 - 映射任务来组织数据,以及一组reduce任务来聚合它。这个术语来自MapReduce,并不直接与Spark的map和reduce操作有关。

在内部,来自个别map任务的结果被保存在内存中,直到它们不适合为止。然后,这些将根据目标分区进行排序并写入单个文件。在reduce方面,任务读取相关的排序块。

某些随机操作会消耗大量的堆内存,因为它们使用内存中的数据结构在传输之前或之后组织记录。具体来说,reduceByKey和aggregateByKey在地图上创建这些结构,ByKey操作在reduce方面生成这些结构。当数据不适合存储在内存中时,Spark会将这些表泄露到磁盘,导致额外的磁盘I / O开销和增加的垃圾回收。

Shuffle也会在磁盘上生成大量的中间文件。从Spark 1.3开始,这些文件将被保留,直到相应的RDD不再使用并被垃圾回收。这样做是为了在重新计算谱系时不需要重新创建洗牌文件。如果应用程序保留对这些RDD的引用,或者GC不经常引入,垃圾收集可能会在很长一段时间后才会发生。这意味着长时间运行的Spark作业可能会消耗大量的磁盘空间。在配置Spark上下文时,临时存储目录由spark.local.dir配置参数指定。

随机行为可以通过调整各种配置参数来调整。请参阅“Spark配置指南”中的“Shuffle Behavior”部分。

RDD 持久化

Spark中最重要的功能之一就是在内存中持续(或缓存)一个数据集。当持久化RDD时,每个节点存储它在内存中计算的所有分区,并在该数据集的其他操作(或从中派生的数据集)中重用它们。这使未来的行动(action)更快(通常超过10倍)。缓存是迭代算法和快速交互式使用的关键工具。

您可以使用persist()或cache()方法将RDD标记为持久化。第一次在动作(action)中计算时,它将被保存在节点的内存中。 Spark的缓存是容错的 - 如果RDD的任何分区丢失,它将自动使用最初创建它的转换重新计算。

另外,每个持久RDD可以使用不同的存储级别进行存储,例如,允许您将数据集保存在磁盘上,将其保存在内存中,但是作为序列化的Java对象(以节省空间)将其复制到节点上。这些级别通过传递一个StorageLevel对象(Scala,Java,Python)来持久化()来设置。 cache()方法是使用默认存储级别的简写,即StorageLevel.MEMORY_ONLY(将反序列化的对象存储在内存中)。全套存储级别是:
| 存储级别 | 含义 |
|———|——-|
| MEMORY_ONLY | 将RDD作为反序列化的Java对象存储在JVM中。 如果RDD不适合内存,某些分区将不会被缓存,并且每次需要时都会重新进行计算。 这是默认级别。 |
| MEMORY_AND_DISK | 将RDD作为反序列化的Java对象存储在JVM中。 如果RDD不适合内存,请存储不适合磁盘的分区,并在需要时从中读取。 |
| MEMORY_ONLY_SER (Java and Scala) | 将RDD存储为序列化的Java对象(每个分区一个字节的数组)。 这通常比反序列化的对象更节省空间,特别是在使用快速序列化器的情况下,但需要消耗更多的CPU资源。 |
| MEMORY_AND_DISK_SER (Java and Scala) | 与MEMORY_ONLY_SER类似,但是将不适合内存的分区溢出到磁盘上,而不是在每次需要时重新计算它们。 |
| DISK_ONLY | 将RDD分区仅存储在磁盘上。 |
| MEMORY_ONLY_2, MEMORY_AND_DISK_2, etc. | 与上面的级别相同,但复制两个群集节点上的每个分区。 |
| OFF_HEAP (experimental) | 与MEMORY_ONLY_SER类似,但将数据存储在堆内存中。 这需要启用堆堆内存。 |

注意:在Python中,存储对象将始终与Pickle库序列化,所以选择序列化级别无关紧要。 Python中的可用存储级别包括MEMORY_ONLY,MEMORY_ONLY_2,MEMORY_AND_DISK,MEMORY_AND_DISK_2,DISK_ONLY和DISK_ONLY_2。 Spark也会在shuffle操作(例如reduceByKey)中自动保留一些中间数据,即使没有用户调用persist。 这样做是为了避免在洗牌过程中节点失败时重新输入整个输入。 我们仍然建议用户如果打算重复使用RDD,则调用坚持的RDD。

该选择哪个存储级别?

Spark的存储级别旨在提供内存使用和CPU效率之间的不同折衷。我们建议通过以下过程来选择一个:

  • 如果您的RDD适合默认的存储级别(MEMORY_ONLY),请将其留在原来的位置。这是CPU效率最高的选项,允许RDD上的操作尽可能快地运行。
  • 如果没有,请尝试使用MEMORY_ONLY_SER并选择一个快速序列化库,以使对象更加节省空间,但是访问速度仍然相当快。 (Java和Scala)
  • 除非计算您的数据集的函数是昂贵的,否则它们不会溢出到磁盘上,或者它们会过滤大量的数据。否则,重新计算分区可能与从磁盘读取分区一样快。
  • 如果要快速恢复故障(例如,如果使用Spark来为Web应用程序提供请求),请使用复制的存储级别。所有的存储级别通过重新计算丢失的数据来提供完全的容错能力,但是复制的容量可以让您继续在RDD上运行任务,而无需等待重新计算丢失的分区。

删除数据

Spark会自动监视每个节点上的高速缓存使用情况,并以最近最少使用(LRU)方式删除旧的数据分区。 如果您想要手动删除RDD而不是等待其从缓存中删除,请使用RDD.unpersist()方法。

共享变量

通常,在远程集群节点上执行传递给Spark操作(如map或reduce)的函数时,它将在函数中使用的所有变量的单独副本上运行。 这些变量被复制到每台机器上,远程机器上的变量没有更新传播到驱动程序。 支持通用的,可读写的共享变量将是低效的。 但是,Spark为两种常见的使用模式提供了两种有限类型的共享变量:广播变量和累加器。

广播变量

广播变量允许程序员在每台机器上保存一个只读变量,而不是用任务发送一个只读变量的副本。例如,可以使用它们以有效的方式为每个节点提供大型输入数据集的副本。 Spark还试图使用高效的广播算法来分发广播变量,以降低通信成本。

Spark动作(action)是通过一系列阶段执行的,由分散的“随机”操作分开。 Spark会自动播放每个阶段中任务所需的通用数据。以这种方式广播的数据以序列化形式缓存,并在运行每个任务之前反序列化。这意味着只有跨多个阶段的任务需要相同的数据或以反序列化的形式缓存数据时,显式创建广播变量才是有用的。

广播变量是通过调用SparkContext.broadcast(v)从变量v创建的。广播变量是v的一个包装,它的值可以通过调用value方法来访问。下面的代码显示了这一点:

1
2
3
4
Broadcast<int[]> broadcastVar = sc.broadcast(new int[] {1, 2, 3});

broadcastVar.value();
// returns [1, 2, 3]

在创建广播变量之后,应该在群集上运行的任何函数中使用值而不是值v,以便v不会多次传送到节点。 另外,对象v在广播之后不应被修改,以确保所有节点获得广播变量的相同值(例如,如果变量稍后被运送到新节点)。

累加器

累加器是只能通过关联和交换操作“添加”的变量,因此可以并行有效地支持。 它们可以用来实现计数器(如在MapReduce中)或者和。 Spark本身支持数字类型的累加器,程序员可以添加对新类型的支持。

作为用户,您可以创建名称或未命名的累加器。 如下图所示,一个命名的累加器(在这种情况下计数器)将显示在修改该累加器的阶段的Web UI中。 Spark显示由“任务”表中的任务修改的每个累加器的值。

累加器

跟踪用户界面中的累加器对于理解运行阶段的进度非常有用(注意:Python尚不支持)。

可以通过调用SparkContext.longAccumulator()或SparkContext.doubleAccumulator()来分别累积Long或Double类型的值来创建数字累加器。 在群集上运行的任务可以使用add方法添加到它。 但是,他们无法读懂它的价值。 只有驱动程序可以使用其值方法读取累加器的值。 下面的代码显示了一个累加器被用来加总一个数组的元素:

1
2
3
4
5
6
7
8
LongAccumulator accum = jsc.sc().longAccumulator();

sc.parallelize(Arrays.asList(1, 2, 3, 4)).foreach(x -> accum.add(x));
// ...
// 10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s

accum.value();
// returns 10

虽然这段代码使用了对Long类型的累加器的内置支持,程序员也可以通过继承AccumulatorV2来创建它们自己的类型。 AccumulatorV2抽象类有几个方法必须重写:复位重置累加器为零,添加用于向累加器中添加另一个值,合并合并另一个相同类型的累加器到这个累加器中。 其他必须被覆盖的方法包含在API文档中。

例如,假设我们有一个表示数学向量的MyVector类,我们可以这样写:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
class VectorAccumulatorV2 implements AccumulatorV2<MyVector, MyVector> {

private MyVector myVector = MyVector.createZeroVector();

public void reset() {
myVector.reset();
}

public void add(MyVector v) {
myVector.add(v);
}
...
}

// Then, create an Accumulator of this type:
VectorAccumulatorV2 myVectorAcc = new VectorAccumulatorV2();
// Then, register it into spark context:
jsc.sc().register(myVectorAcc, "MyVectorAcc1");

请注意,当程序员定义自己的AccumulatorV2类型时,结果类型可能与添加元素的类型不同。

对于仅在动作内执行的累加器更新,Spark保证每个任务对累加器的更新只会应用一次,即重新启动的任务不会更新该值。 在转换中,用户应该意识到如果任务或作业阶段被重新执行,每个任务的更新可能会被应用多次。

累加器不会改变Spark的懒惰评估模型。 如果它们在RDD上的操作中被更新,则其值仅在RDD作为动作的一部分计算之后才被更新。 因此,在像map()这样的惰性转换中进行累加器更新并不能保证执行。

下面的代码片段演示了这个属性:

1
2
3
LongAccumulator accum = jsc.sc().longAccumulator();
data.map(x -> { accum.add(x); return f(x); });
// Here, accum is still 0 because no actions have caused the `map` to be computed.

部署到群集

应用程序提交指南介绍了如何将应用程序提交到集群。 简而言之,一旦将应用程序打包成JAR(用于Java / Scala)或一组.py或.zip文件(用于Python),bin / spark-submit脚本就可以将其提交给任何受支持的集群管理器。

从Java / Scala启动Spark作业

org.apache.spark.launcher包提供了使用简单的Java API作为子进程启动Spark作业的类。

单元测试

Spark对任何流行的单元测试框架的单元测试都很友好。 只需在主URL设置为本地的情况下在测试中创建一个SparkContext,运行您的操作,然后调用SparkContext.stop()将其拆除。 确保停止finally块或测试框架的tearDown方法中的上下文,因为Spark不支持在同一个程序中同时运行的两个上下文。

下一步该怎么办

您可以在Spark网站上看到一些Spark程序示例。 另外,Spark在示例目录(Scala,Java,Python,R)中包含了几个示例。

您可以通过将类名传递给Spark的bin / run-example脚本来运行Java和Scala示例; 例如:

1
./bin/run-example SparkPi

对于Python示例,请使用spark-submit代替:

1
./bin/spark-submit examples / src / main / python / pi.py

对于R示例,请使用spark-submit代替:

1
./ bin / spark-submit examples / src / main / r / dataframe.R

有关优化程序的帮助,配置和调优指南提供有关最佳做法的信息。 它们对于确保您的数据以有效的格式存储在内存中尤其重要。 有关部署的帮助,群集模式概述描述了分布式操作中涉及的组件以及支持的群集管理器。

最后,完整的API文档可以在Scala,Java,Python和R.

完结,希望能帮助您!