在线精品99_中国九九盗摄偷拍偷看_91免费版在线观看_91.app_91高清视频在线_99热最新网站

Spark结构化流处理机制之容错机制的示例分析

139次阅读
没有评论

共计 3792 个字符,预计需要花费 10 分钟才能阅读完成。

这篇文章给大家分享的是有关 Spark 结构化流处理机制之容错机制的示例分析的内容。丸趣 TV 小编觉得挺实用的,因此分享给大家做个参考,一起跟随丸趣 TV 小编过来看看吧。

容错机制

端到端的有且仅有一次保证, 是结构化流设计的关键目标之一.

结构化流设计了  Structured Streaming sources,sinks 等等, 来跟踪确切的处理进度, 并让其重启或重运行来处理任何故障

streaming source 是类似 kafka 的偏移量 (offsets) 来跟踪流的读取位置. 执行引擎使用检查点 (checkpoint) 和预写日志 (write ahead logs) 来记录每个执行其的偏移范围值

streaming sinks 是设计用来保证处理的幂等性

这样, 依靠可回放的数据源 (streaming source) 和处理幂等(streaming sinks), 结构流来做到任何故障下的端到端的有且仅有一次保证

val lines = spark.readStream
 .format(socket)
 .option(host ,  localhost)
 .option(port , 9999)
 .load()
// Split the lines into words
val words = lines.as[String].flatMap(_.split(  ))
// Generate running word count
val wordCounts = words.groupBy(value).count()

其中,spark 是 SparkSession,lines 是 DataFrame,DataFrame 就是 Dataset[Row]。

DataSet

看看 Dataset 的触发因子的代码实现,比如 foreach 操作:

def foreach(f: T =  Unit): Unit = withNewRDDExecutionId { rdd.foreach(f)
 }

 private def withNewRDDExecutionId[U](body: =  U): U = { SQLExecution.withNewExecutionId(sparkSession, rddQueryExecution) {  rddQueryExecution.executedPlan.foreach { plan =  plan.resetMetrics()  }  body  }  }

接着看:

 def withNewExecutionId[T](
 sparkSession: SparkSession,
 queryExecution: QueryExecution,
 name: Option[String] = None)(body: =  T): T = {
 val sc = sparkSession.sparkContext
 val oldExecutionId = sc.getLocalProperty(EXECUTION_ID_KEY)
 val executionId = SQLExecution.nextExecutionId
 sc.setLocalProperty(EXECUTION_ID_KEY, executionId.toString)
 executionIdToQueryExecution.put(executionId, queryExecution)
 try { 
 withSQLConfPropagated(sparkSession) { 
 try { 
 body
 } catch { 
 } finally { 
 }
 }
 } finally { executionIdToQueryExecution.remove(executionId)
 sc.setLocalProperty(EXECUTION_ID_KEY, oldExecutionId)
 }
 }

执行的真正代码就是 queryExecution: QueryExecution。 

@transient private lazy val rddQueryExecution: QueryExecution = { val deserialized = CatalystSerde.deserialize[T](logicalPlan)
 sparkSession.sessionState.executePlan(deserialized)
 }

看到了看到了,是 sessionState.executePlan 执行 logicalPlan 而得到了 QueryExecution

这里的 sessionState.executePlan 其实就是创建了一个 QueryExecution 对象。然后执行 QueryExecution 的 executedPlan 方法得到 SparkPlan 这个物理计划。怎么生成的呢?

lazy val sparkPlan: SparkPlan = tracker.measurePhase(QueryPlanningTracker.PLANNING) { SparkSession.setActiveSession(sparkSession) 
 planner.plan(ReturnAnswer(optimizedPlan.clone())).next()
 }

通过 planner.plan 方法生成。

planner 是 SparkPlanner。在 BaseSessionStateBuilder 类中定义。

protected def planner: SparkPlanner = { new SparkPlanner(session.sparkContext, conf, experimentalMethods) { override def extraPlanningStrategies: Seq[Strategy] =
 super.extraPlanningStrategies ++ customPlanningStrategies
 }
 }

SparkPlanner 类

SparkPlanner 对 LogicalPlan 执行各种策略,返回对应的 SparkPlan。比如对于流应用来说,有这样的策略:DataSourceV2Strategy。

典型的几个逻辑计划到物理计划的映射关系如下:

StreamingDataSourceV2Relation-》ContinuousScanExec

StreamingDataSourceV2Relation-》MicroBatchScanExec

前一种对应与 Offset 没有 endOffset 的情况,后一种对应于有 endOffset 的情况。前一种是没有结束的连续流,后一种是有区间的微批处理流。

前一种的时延可以达到 1ms,后一种的时延只能达到 100ms。

【代码】:

case r: StreamingDataSourceV2Relation if r.startOffset.isDefined   r.endOffset.isDefined = 
 val microBatchStream = r.stream.asInstanceOf[MicroBatchStream]
 val scanExec = MicroBatchScanExec( r.output, r.scan, microBatchStream, r.startOffset.get, r.endOffset.get)
 val withProjection = if (scanExec.supportsColumnar) {
 scanExec
 } else {
 // Add a Project here to make sure we produce unsafe rows.
 ProjectExec(r.output, scanExec)
 }
 withProjection :: Nil
 case r: StreamingDataSourceV2Relation if r.startOffset.isDefined   r.endOffset.isEmpty = 
 val continuousStream = r.stream.asInstanceOf[ContinuousStream]
 val scanExec = ContinuousScanExec(r.output, r.scan, continuousStream, r.startOffset.get)
 val withProjection = if (scanExec.supportsColumnar) {
 scanExec
 } else {
 // Add a Project here to make sure we produce unsafe rows.
 ProjectExec(r.output, scanExec)
 }
 withProjection :: Nil

感谢各位的阅读!关于“Spark 结构化流处理机制之容错机制的示例分析”这篇文章就分享到这里了,希望以上内容可以对大家有一定的帮助,让大家可以学到更多知识,如果觉得文章不错,可以把它分享出去让更多的人看到吧!

正文完
 
丸趣
版权声明:本站原创文章,由 丸趣 2023-08-16发表,共计3792字。
转载说明:除特殊说明外本站除技术相关以外文章皆由网络搜集发布,转载请注明出处。
评论(没有评论)
主站蜘蛛池模板: 久久精品国产只有精品66 | 天无日天天射天天视 | 狠狠操.com | 男人天堂色 | 美女性爽视频国产免费 | 欧美色亚洲 | 精品真实国产乱文在线 | 一本丁香综合久久久久不卡网站 | 亚洲男人网站 | 久久精品成人一区二区三区 | 九九99热久久精品在线9 | 国产女人喷潮视频在线观看 | 欧美日韩中文字幕久久伊人 | 国产精品久久人妻互换 | 暗香影院午夜片 | 美女免费毛片 | 女人l8毛片a一级毛片 | 午夜天堂av天堂久久久 | 一级毛片毛片毛片毛毛片 | 国产精品ⅴ无码大片在线看 | 特黄aa级毛片免费视频播放 | 人妻丰满熟妞av无码区 | 在线观看黄色毛片 | 欧美国产亚洲一区二区三区 | 超级香蕉97视频在线观看一区 | 国产福利视频一区二区三区四区 | 不卡一区二区在线观看 | 一级网站在线观看 | 久久99亚洲精品久久久久网站 | 欧美大杳蕉视频在线观看 | 亚洲另类天堂 | 久久久久成人精品无码中文字幕 | 亚洲色图二区 | 欧美一区二区三区电影 | 曰本aaaaa毛片午夜网站 | 秋霞鲁丝片av无码 | 亚洲精品777 | 欧美成人v视频免费看 | 久久精品国产精品亚洲20 | 女人张开腿让男人桶爽 | 视频在线一区二区三区 |