大家好,我是不温卜火,是一名计算机学院大数据专业大二的学生,昵称来源于成语— 此篇为大家带来的是RDD缓存和设置检查点 RDD通过persist方法或cache方法可以将前面的计算结果缓存,默认情况下 persist() 会把数据以序列化的形式缓存在 JVM 的堆空间中。 Spark 中对于数据的保存除了持久化操作之外,还提供了一种检查点的机制,检查点(本质是通过将RDD写入Disk做检查点)是为了通过 Lineage 做容错的辅助 Lineage 过长会造成容错成本过高,这样就不如在中间阶段做检查点容错,如果之后有节点出现问题而丢失分区,从做检查点的 RDD 开始重做 Lineage,就会减少开销。 检查点通过将数据写入到 HDFS 文件系统实现了 RDD 的检查点功能。 为当前 RDD 设置检查点。该函数将会创建一个二进制的文件,并存储到 checkpoint 目录中,该目录是用 SparkContext.setCheckpointDir()设置的。在 checkpoint 的过程中,该RDD 的所有依赖于父 RDD中 的信息将全部被移除。 对 RDD 进行 checkpoint 操作并不会马上被执行,必须执行 Action 操作才能触发, 在触发的时候需要对这个 RDD 重新计算. 本次的就到这里了, 好书不厌读百回,熟读课思子自知。而我想要成为全场最靓的仔,就必须坚持通过学习来获取更多知识,用知识改变命运,用博客见证成长,用行动证明我在努力。
不温不火
,本意是希望自己性情温和
。作为一名互联网行业的小白,博主写博客一方面是为了记录自己的学习过程,另一方面是总结自己所犯的错误希望能够帮助到很多和自己一样处于起步阶段的萌新。但由于水平有限,博客中难免会有一些错误出现,有纰漏之处恳请各位大佬不吝赐教!暂时只有csdn这一个平台,博客主页:https://buwenbuhuo.blog.csdn.net/
一. RDD缓存
但是并不是这两个方法被调用时立即缓存,而是触发后面的action时,该RDD将会被缓存在计算节点的内存中,并供后面重用。
通过查看源码发现cache最终也是调用了persist方法,默认的存储级别都是仅在内存存储一份,Spark的存储级别还有好多种,存储级别在object StorageLevel中定义的。
在存储级别的末尾加上“_2”来把持久化数据存为两份
缓存有可能丢失,或者存储存储于内存的数据由于内存不足而被删除,RDD的缓存容错机制保证了即使缓存丢失也能保证计算的正确执行。通过基于RDD的一系列转换,丢失的数据会被重算,由于RDD的各个Partition是相对独立的,因此只需要计算丢失的部分即可,并不需要重算全部Partition。// 1.创建一个RDD scala> val rdd = sc.makeRDD(Array("buwenbuhuo")) rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[19] at makeRDD at <console>:25 // 2.将RDD转换为携带当前时间戳不做缓存 scala> val nocache = rdd.map(_.toString+System.currentTimeMillis) nocache: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[20] at map at <console>:27 // 3.多次打印结果 scala> nocache.collect res0: Array[String] = Array(buwenbuhuo1538978275359) scala> nocache.collect res1: Array[String] = Array(buwenbuhuo1538978282416) scala> nocache.collect res2: Array[String] = Array(buwenbuhuo1538978283199) // 4.将RDD转换为携带当前时间戳并做缓存 scala> val cache = rdd.map(_.toString+System.currentTimeMillis).cache cache: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[21] at map at <console>:27 // 5.多次打印做了缓存的结果 scala> cache.collect res3: Array[String] = Array(buwenbuhuo1538978435705) scala> cache.collect res4: Array[String] = Array(buwenbuhuo1538978435705) scala> cache.collect res5: Array[String] = Array(buwenbuhuo1538978435705)
二. 设置检查点(checkpoint)
package Day04 import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} /** ** @author 不温卜火 ** * @create 2020-07-26 15:35 ** * MyImapBox :https://buwenbuhuo.blog.csdn.net/ */ object CheckPointDemo { def main(args: Array[String]): Unit = { // 要在SparkContext初始化之前设置, 都在无效 System.setProperty("HADOOP_USER_NAME", "buwenbuhuo") val conf = new SparkConf().setAppName("Practice").setMaster("local[2]") val sc = new SparkContext(conf) // 设置 checkpoint的目录. 如果spark运行在集群上, 则必须是 hdfs 目录 sc.setCheckpointDir("./ck1") val rdd1 = sc.parallelize(Array("abc")) val rdd2: RDD[String] = rdd1.map(_ + " : " + System.currentTimeMillis()) /* 标记 RDD2的 checkpoint. RDD2会被保存到文件中(文件位于前面设置的目录中), 并且会切断到父RDD的引用, 也就是切断了它向上的血缘关系 该函数必须在job被执行之前调用. 强烈建议把这个RDD序列化到内存中, 否则, 把他保存到文件的时候需要重新计算. */ rdd2.checkpoint() rdd2.collect().foreach(println) rdd2.collect().foreach(println) rdd2.collect().foreach(println) } }
如果我的博客对你有帮助、如果你喜欢我的博客内容,请“” “评论”“”
一键三连哦!听说的人运气不会太差,每一天都会元气满满呦!如果实在要白嫖的话,那祝你开心每一天,欢迎常来我博客看看。
码字不易,大家的支持就是我坚持下去的动力。后不要忘了关注
我哦!
本网页所有视频内容由 imoviebox边看边下-网页视频下载, iurlBox网页地址收藏管理器 下载并得到。
ImovieBox网页视频下载器 下载地址: ImovieBox网页视频下载器-最新版本下载
本文章由: imapbox邮箱云存储,邮箱网盘,ImageBox 图片批量下载器,网页图片批量下载专家,网页图片批量下载器,获取到文章图片,imoviebox网页视频批量下载器,下载视频内容,为您提供.
阅读和此文章类似的: 全球云计算