在线精品99_中国九九盗摄偷拍偷看_91免费版在线观看_91.app_91高清视频在线_99热最新网站

结构化处理之Spark Session的示例分析

145次阅读
没有评论

共计 3460 个字符,预计需要花费 9 分钟才能阅读完成。

丸趣 TV 小编给大家分享一下结构化处理之 Spark Session 的示例分析,相信大部分人都还不怎么了解,因此分享这篇文章给大家参考一下,希望大家阅读完这篇文章后大有收获,下面让我们一起去了解一下吧!

创建 DataFrame,有三种模式,一种是 sql() 主要是访问 Hive 表;一种是从 RDD 生成 DataFrame,主要从 ExistingRDD 开始创建;还有一种是 read/format 格式,从 json/txt/csv 等数据源格式创建。

先看看第三种方式的创建流程。

1、read/format

def read: DataFrameReader = new DataFrameReader(self)

SparkSession.read() 方法直接创建 DataFrameReader,然后再 DataFrameReader 的 load() 方法来导入外部数据源。load() 方法主要逻辑如下:
 

def load(paths: String*): DataFrame = {
 sparkSession.baseRelationToDataFrame(
 DataSource.apply(
 sparkSession,
 paths = paths,
 userSpecifiedSchema = userSpecifiedSchema,
 className = source,
 options = extraOptions.toMap).resolveRelation())
 }

创建对应数据源类型的 DataSource,DataSource 解析成 BaseRelation, 然后通过 SparkSession 的 baseRelationToDataFrame 方法从 BaseRelation 映射生成 DataFrame。从 BaseRelation 创建 LogicalRelation,然后调用 Dataset.ofRows 方法从 LogicalRelation 创建 DataFrame。DataFrame 实际就是 Dataset。

type DataFrame = Dataset[Row]

baseRelationToDataFrame 的定义:

def baseRelationToDataFrame(baseRelation: BaseRelation): DataFrame = { Dataset.ofRows(self, LogicalRelation(baseRelation))
 }

Dataset.ofRows 方法主要是将逻辑计划转换成物理计划,然后生成新的 Dataset。

2、执行

SparkSession 的执行关键是如何从 LogicalPlan 生成物理计划。我们试试跟踪这部分逻辑。

def count(): Long = withAction( count , groupBy().count().queryExecution) {plan =

  plan.executeCollect().head.getLong(0)

  }

Dataset 的 count() 动作触发物理计划的执行,调用物理计划 plan 的 executeCollect 方法,该方法实际上会调用 doExecute() 方法生成 Array[InternalRow] 格式。executeCollect 方法在 SparkPlan 中定义。

3、HadoopFsRelation

需要跟踪下如何从 HadoopFsRelation 生成物理计划(也就是 SparkPlan)

通过 FileSourceStrategy 来解析。它在 FileSourceScanExec 上叠加 Filter 和 Projection 等操作,看看 FileSourceScanExec 的定义:

case class FileSourceScanExec(
 @transient relation: HadoopFsRelation,
 output: Seq[Attribute],
 requiredSchema: StructType,
 partitionFilters: Seq[Expression],
 dataFilters: Seq[Expression],
 override val metastoreTableIdentifier: Option[TableIdentifier])
 extends DataSourceScanExec with ColumnarBatchScan {}

它的主要执行代码 doExecute() 的功能逻辑如下:

protected override def doExecute(): RDD[InternalRow] = { if (supportsBatch) {
 // in the case of fallback, this batched scan should never fail because of:
 // 1) only primitive types are supported
 // 2) the number of columns should be smaller than spark.sql.codegen.maxFields
 WholeStageCodegenExec(this).execute()
 } else {
 val unsafeRows = {
 val scan = inputRDD
 if (needsUnsafeRowConversion) { scan.mapPartitionsWithIndexInternal { (index, iter) = 
 val proj = UnsafeProjection.create(schema)
 proj.initialize(index)
 iter.map(proj)
 }
 } else {
 scan
 }
 }
 val numOutputRows = longMetric(numOutputRows)
 unsafeRows.map { r = 
 numOutputRows += 1
 r
 }
 }
 }

inputRDD 有两种方式创建,一是 createBucketedReadRDD,二是 createNonBucketedReadRDD。两者没有本质的区别,仅仅是文件分区规则的不同。

private lazy val inputRDD: RDD[InternalRow] = { val readFile: (PartitionedFile) =  Iterator[InternalRow] =
 relation.fileFormat.buildReaderWithPartitionValues(
 sparkSession = relation.sparkSession,
 dataSchema = relation.dataSchema,
 partitionSchema = relation.partitionSchema,
 requiredSchema = requiredSchema,
 filters = pushedDownFilters,
 options = relation.options,
 hadoopConf = relation.sparkSession.sessionState.newHadoopConfWithOptions(relation.options))

 case Some(bucketing) if relation.sparkSession.sessionState.conf.bucketingEnabled =  createBucketedReadRDD(bucketing, readFile, selectedPartitions, relation)  case _ =  createNonBucketedReadRDD(readFile, selectedPartitions, relation)  }  } createNonBucketedReadRDD 调用 FileScanRDD :new FileScanRDD(fsRelation.sparkSession, readFile, partitions)

以上是“结构化处理之 Spark Session 的示例分析”这篇文章的所有内容,感谢各位的阅读!相信大家都有了一定的了解,希望分享的内容对大家有所帮助,如果还想学习更多知识,欢迎关注丸趣 TV 行业资讯频道!

正文完
 
丸趣
版权声明:本站原创文章,由 丸趣 2023-08-16发表,共计3460字。
转载说明:除特殊说明外本站除技术相关以外文章皆由网络搜集发布,转载请注明出处。
评论(没有评论)
主站蜘蛛池模板: 亚洲精品无码高潮喷水在线 | 四虎永久在线精品免费一区二区 | 亚洲国产精品久久一线不卡 | 国产精品美女一区二区三区 | 亚洲欧美日本另类 | 久久久高清免费视频 | 欧美成在线观看 | 高清性色生活片久久久 | 亚洲∧v久久久无码精品 | 羞羞草影院 | 一级待一黄aaa大片在线还看 | 国产亚洲福利一区二区免费看 | 亚洲一区二区综合 | 日本一卡二卡不卡视频查询 | 91久久偷偷做嫩草影院免 | 国产极品美女高潮抽搐免费网站 | 精品一区二区高清在线观看 | 91久久国产成人免费观看资源 | 中文字幕乱偷无码av先锋蜜桃 | 91国语精品自产拍在线观看一 | 久久久久国产亚洲日本 | 亚洲天天做日日做天天欢毛片 | 麻豆国产人妻欲求不满 | 少妇高清精品毛片在线视频 | 中文字幕99在线精品视频免费看 | 中文无码日韩欧 | 亚洲av丰满熟妇在线播放 | 国产xxx在线 | 一区二区三区在线免费观看视频 | 九九在线精品 | 国产线播放免费人成视频播放 | 亚洲欧美一级久久精品 | 福利国模私拍视频在线观看 | 色久综合网精品一区二区 | 国产精品无码av一区二区三区 | 亚洲大片免费看 | 日本不卡一区二区三区在线观看 | 亚洲黄色一级毛片 | 一级特黄aaa大片在 一级特黄aaa大片在线观看 | 亚洲精品无码av人在线观看 | 久久精品免视着国产成人 |