Task是可执行的实体。是Spark任务调度的最小单元。每个Task都对应一个RDD的分区,也对应Executor任务执国际货代知识行线程池中的教师节文案一个执行线程。
本文介绍Task的基本概念,并分析Task的创建过程。
Task和分区从实现层面讲,一个Stage是一个并行执行的Task集合,它们执行相同的计算逻辑,并作为Spark Job执行的一部分,在同一个Stage中所有的Tasks都具有相同的shuffle依赖(在Stage的划分一节分析过:Stage是按Shuffle依赖为边界进行划分的,所以同一个Stage中的Task可以通过Pipeline运行)。
前面的文章提到过,Job中的分区对应RDD的分区,而在Spark狮驼岭中RDD中的一个分区对应了Stage中的一个任务,它甲天下香烟属于一个RDD用于计算执行函数的部分结果,这些结果作为Spark Job结果的一部分。
图1 分区和Task的对应关系Task的分类
与Stage相对应,Task分为两类:食品防腐剂
ResultTask:这类Task会计算Job的最终结果,并返回结果。ShuffleMapTask:这类Task计算Job中间步骤的结果,并把结果保存到中间的输出文件中。Ta都市王朝sk的实现ResultTaskResultTask是由于执行了Action操作,在提交任务时创建的。执行Action操作时,其实就是使用一个函数来处理中间阶段的输出分区文件。
我们以action的count()操作为例来具体说ResultTask的创建和使用过程。count()操作的代码实现非常简单,如下:
def c电磁制动器ount(): Long = sc.runJob(this, Utils.getIteratorSize _).s塘朗山郊野公园um我们再看一下代码中runJob函数的定义:
def runJob[T, U: ClassTag](rdd: RDD[T], func: Iterator[T] => U): Array[U残酷戏剧] = { runJob(rdd, func, 0 until rdd.partitions.length)格林斯潘 }代码中,runJob函数的含义是,把func函数作用于rdd的每个分区,这样就得到了每个分区的count结果(其结果是一个数组),然后再调用sum函数把这个数组的各个count数加起来,得到最终结果。
我们来看一下ResultTask的实现类,在该类中,定义了运行Task时的函数r汶川地震unTask,在该函数中会执行以下代码:
override def runTask(context: TaskContext): U = { ... // 触发action时执行的函数 func(context, rdd.counterpartsiterator(partition, context)) }从实现代码可以看出ResultTask,会直接使用func在输出的分区数据上。
ShuffleMapTask类这类任务主要为shuffle过程生成中间文件,它会通过shuffleManager来获取ShuffleWriter,并使用该ShuffleWriter来保存shuffle过程中的RDD分区。该类的runTask函数的主要实现代码如下:
override def runTask(context: TaskContext): MapStatus = { ... var writer: ShuffleWriter[Any, Any] = null try { // 获取shfflf站eManager对象 val manager = SparkEnv.get.shuffleManager writer = manager.getWriter[Any, Any](dep.养鱼养几条好shuffleHandle, partitionId, context) writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]]) writer.stop(success = true).get } catch { ... }从以上代码可以看出:ShuffleMa甜瓜子pTask会先狗大户通过ShuffleManager来获取ShuffleWriter。并通过ShuffleWriter来写入RDD的明末边军分区。
注意:当写入RDD的分区时,会根据RDD的依赖关系依次计算其父RDD的分区数据。
Task的最佳位置的选择为了减少网络传输,提升计算效率,Spark会为Task选择最佳的执行位置,所谓选择最佳执行位置,其实就是选择在哪个worker节点上执行Task。不同RDD的寻找最佳执行位置的方式不同,所以,在RDD的抽象父类中,定义了一个preferredLocations函数,具体类型的RDD通过自己的方式来实现该函数。
在DAGScheduler中,RDD分区计算预算管控的最佳位置在DAGScheduler#getPreferredLocs函数中完成。该函数的实现步骤如下:
若分区已经被访问过,说明该分区的位置已经被记录下来了,量子力学导论此时不需要再查找该分区的最佳位置。若RDD的分区数据被缓存起来了,就直接返回缓存的位置。缓存的位置是通过存储系统BlockManagerMaster来进行查找的,会调用BlockM颈椎牵引anagerMaster#getLocations函数。若1和2都没有找到RDD分区的最佳位置,就查看RDD是否在创建时带有最佳位置信息,每种RDD的最佳位置信息是不同的,这些最佳位置通过RDD#preferredLocations函数返回。下表是其中几种RDD的最佳位置信息:小结本文介绍了Task的基本概念和分类,并分析了Task最佳位置的确定。
本文发布于:2023-06-02 10:25:38,感谢您对本站的认可!
本文链接:http://www.ranqi119.com/ge/85/189734.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
留言与评论(共有 0 条评论) |