在线精品99_中国九九盗摄偷拍偷看_91免费版在线观看_91.app_91高清视频在线_99热最新网站

如何进行JobScheduler内幕实现和深度思考

181次阅读
没有评论

共计 3982 个字符,预计需要花费 10 分钟才能阅读完成。

本篇文章为大家展示了如何进行 JobScheduler 内幕实现和深度思考,内容简明扼要并且容易理解,绝对能使你眼前一亮,通过这篇文章的详细介绍希望你能有所收获。

DStream 的 foreachRDD 方法,实例化 ForEachDStream 对象,并将用户定义的函数 foreachFunc 传入到该对象中。foreachRDD 方法是输出操作,foreachFunc 方法会作用到这个 DStream 中的每个 RDD。

/**
 * Apply a function to each RDD in this DStream. This is an output operator, so
 * this DStream will be registered as an output stream and therefore materialized.
 * @param foreachFunc foreachRDD function
 * @param displayInnerRDDOps Whether the detailed callsites and scopes of the RDDs generated
 *  in the `foreachFunc` to be displayed in the UI. If `false`, then
 *  only the scopes and callsites of `foreachRDD` will override those
 *  of the RDDs on the display.
 */
private def foreachRDD(
 foreachFunc: (RDD[T], Time) = Unit,
 displayInnerRDDOps: Boolean): Unit = {
 new ForEachDStream(this,
 context.sparkContext.clean(foreachFunc, false), displayInnerRDDOps).register()
}

ForEachDStream 对象中重写了 generateJob 方法,调用父 DStream 的 getOrCompute 方法来生成 RDD 并封装 Job,传入对该 RDD 的操作函数 foreachFunc 和 time。dependencies 方法定义为父 DStream 的集合。

/**
 * An internal DStream used to represent output operations like DStream.foreachRDD.
 * @param parent  Parent DStream
 * @param foreachFunc  Function to apply on each RDD generated by the parent DStream
 * @param displayInnerRDDOps Whether the detailed callsites and scopes of the RDDs generated
 *  by `foreachFunc` will be displayed in the UI; only the scope and
 *  callsite of `DStream.foreachRDD` will be displayed.
 */
private[streaming]
class ForEachDStream[T: ClassTag] (
 parent: DStream[T],
 foreachFunc: (RDD[T], Time) = Unit,
 displayInnerRDDOps: Boolean
 ) extends DStream[Unit](parent.ssc) {

 override def dependencies: List[DStream[_]] = List(parent)

 override def slideDuration: Duration = parent.slideDuration

 override def compute(validTime: Time): Option[RDD[Unit]] = None

 override def generateJob(time: Time): Option[Job] = {
 parent.getOrCompute(time) match {
 case Some(rdd) =
 val jobFunc = () = createRDDWithLocalProperties(time, displayInnerRDDOps) {
 foreachFunc(rdd, time)
 }
 Some(new Job(time, jobFunc))
 case None = None
 }
 }
}

DStreamGraph 的 generateJobs 方法中会调用 outputStream 的 generateJob 方法,就是调用 ForEachDStream 的 generateJob 方法。

def generateJobs(time: Time): Seq[Job] = {
 logDebug(Generating jobs for time + time)
 val jobs = this.synchronized {
 outputStreams.flatMap {outputStream =
 val jobOption = outputStream.generateJob(time)
 jobOption.foreach(_.setCallSite(outputStream.creationSite))
 jobOption
 }
 }
 logDebug(Generated + jobs.length + jobs for time + time)
 jobs
}

DStream 的 generateJob 定义如下,其子类中只有 ForEachDStream 重写了 generateJob 方法。

/**
 * Generate a SparkStreaming job for the given time. This is an internal method that
 * should not be called directly. This default implementation creates a job
 * that materializes the corresponding RDD. Subclasses of DStream may override this
 * to generate their own jobs.
 */
private[streaming] def generateJob(time: Time): Option[Job] = {
 getOrCompute(time) match {
 case Some(rdd) = {
 val jobFunc = () = {
 val emptyFunc = {(iterator: Iterator[T]) = {}}
 context.sparkContext.runJob(rdd, emptyFunc)
 }
 Some(new Job(time, jobFunc))
 }
 case None = None
 }
}

DStream 的 print 方法内部还是调用 foreachRDD 来实现,传入了内部方法 foreachFunc,来取出 num+ 1 个数后打印输出。

/**
 * Print the first num elements of each RDD generated in this DStream. This is an output
 * operator, so this DStream will be registered as an output stream and there materialized.
 */
def print(num: Int): Unit = ssc.withScope {
 def foreachFunc: (RDD[T], Time) = Unit = {
 (rdd: RDD[T], time: Time) = {
 val firstNum = rdd.take(num + 1)
 // scalastyle:off println
 println(——————————————-)
 println(Time: + time)
 println(——————————————-)
 firstNum.take(num).foreach(println)
 if (firstNum.length num) println(…)
 println()
 // scalastyle:on println
 }
 }
 foreachRDD(context.sparkContext.clean(foreachFunc), displayInnerRDDOps = false)
}

总结:JobScheduler 是 SparkStreaming 所有 Job 调度的中心,内部有两个重要的成员:

JobGenerator 负责 Job 的生成,ReceiverTracker 负责记录输入的数据源信息。

JobScheduler 的启动会导致 ReceiverTracker 和 JobGenerator 的启动。ReceiverTracker 的启动导致运行在 Executor 端的 Receiver 启动并且接收数据,ReceiverTracker 会记录 Receiver 接收到的数据 meta 信息。JobGenerator 的启动导致每隔 BatchDuration,就调用 DStreamGraph 生成 RDD Graph,并生成 Job。JobScheduler 中的线程池来提交封装的 JobSet 对象 (时间值,Job,数据源的 meta)。Job 中封装了业务逻辑,导致最后一个 RDD 的 action 被触发,被 DAGScheduler 真正调度在 Spark 集群上执行该 Job。

上述内容就是如何进行 JobScheduler 内幕实现和深度思考,你们学到知识或技能了吗?如果还想学到更多技能或者丰富自己的知识储备,欢迎关注丸趣 TV 行业资讯频道。

正文完
 
丸趣
版权声明:本站原创文章,由 丸趣 2023-08-17发表,共计3982字。
转载说明:除特殊说明外本站除技术相关以外文章皆由网络搜集发布,转载请注明出处。
评论(没有评论)
主站蜘蛛池模板: 1024你懂的国产日韩欧美 | 日本高清2019免费视频 | 熟女少妇丰满一区二区 | 日韩中文字幕电影 | 国产偷国产偷亚洲清高动态图 | 伊人网视频在线观看 | 国产成人做受免费视频 | 99久久精品免费看国产麻豆 | 精品国内视频 | 国产精品久久成人影院 | 亚洲va国产va欧美va综合 | 四虎免费在线 | 强开小婷嫩苞又嫩又紧视频 | av一本久道久久波多野结衣 | 久久久久成人精品免费播放动漫 | 欧美一级毛片高清毛片 | 久久久久亚洲av成人无码 | 欧美色成人综合 | 天天操天天摸天天舔 | 欧美色欧美亚洲高清在线视频 | 九九九国产视频 | 第一福利在线观看永久视频 | 国产精品丝袜黑色高跟鞋 | 国产人妻黑人一区二区三区 | 欧美精品黑人粗大 | 色噜噜的亚洲男人的天堂 | 国产精品无码一区二区三区免费 | 在线精品一区二区三区电影 | 国内精品一区二区三区最新 | 蒂法本子h精品一区二区 | 国产日韩欧美一区二区三区综合 | 成人麻豆日韩在无码视频 | 亚洲欧美日韩一区高清中文字幕 | 国产香蕉91tv永久在线 | 亚洲不卡av不卡一区二区 | 日韩欧美中文在线 | 国产aa视频 | 青青草色久综合网 | 男人不识本网站 | 亚洲无av码一区二区三区 | 欧美成人久久久免费播放 |