在线精品99_中国九九盗摄偷拍偷看_91免费版在线观看_91.app_91高清视频在线_99热最新网站

Spark中两个类似的api是什么

144次阅读
没有评论

共计 4941 个字符,预计需要花费 13 分钟才能阅读完成。

这篇“Spark 中两个类似的 api 是什么”文章的知识点大部分人都不太理解,所以丸趣 TV 小编给大家总结了以下内容,内容详细,步骤清晰,具有一定的借鉴价值,希望大家阅读完这篇文章能有所收获,下面我们一起来看看这篇“Spark 中两个类似的 api 是什么”文章吧。

Spark 中有两个类似的 api,分别是 reduceByKey   和 groupByKey  。这两个的功能类似,但底层实现却有些不同,那么为什么要这样设计呢?我们来从源码的角度分析一下。

先看两者的调用顺序(都是使用默认的 Partitioner,即 defaultPartitioner)

所用 spark 版本:spark 2.1.0

#### 先看 reduceByKey
Step1
“`
  def reduceByKey(func: (V, V) = V): RDD[(K, V)] = self.withScope {
    reduceByKey(defaultPartitioner(self), func)
  }
“`
Setp2
“`
  def reduceByKey(partitioner: Partitioner, func: (V, V) = V): RDD[(K, V)] = self.withScope {
    combineByKeyWithClassTag[V]((v: V) = v, func, func, partitioner)
  }
“`
Setp3
“`
def combineByKeyWithClassTag[C](
      createCombiner: V = C,
      mergeValue: (C, V) = C,
      mergeCombiners: (C, C) = C,
      partitioner: Partitioner,
      mapSideCombine: Boolean = true,
      serializer: Serializer = null)(implicit ct: ClassTag[C]): RDD[(K, C)] = self.withScope {
    require(mergeCombiners != null, mergeCombiners must be defined) // required as of Spark 0.9.0
    if (keyClass.isArray) {
      if (mapSideCombine) {
        throw new SparkException(Cannot use map-side combining with array keys.)
      }
      if (partitioner.isInstanceOf[HashPartitioner]) {
        throw new SparkException(HashPartitioner cannot partition array keys.)
      }
    }
    val aggregator = new Aggregator[K, V, C](
      self.context.clean(createCombiner),
      self.context.clean(mergeValue),
      self.context.clean(mergeCombiners))
    if (self.partitioner == Some(partitioner)) {
      self.mapPartitions(iter = {
        val context = TaskContext.get()
        new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))
      }, preservesPartitioning = true)
    } else {
      new ShuffledRDD[K, V, C](self, partitioner)
        .setSerializer(serializer)
        .setAggregator(aggregator)
        .setMapSideCombine(mapSideCombine)
    }
  }
“`

姑且不去看方法里面的细节,我们会只要知道最后调用的是 combineByKeyWithClassTag 这个方法。这个方法有两个参数我们来重点看一下,
“`
def combineByKeyWithClassTag[C](
      createCombiner: V = C,
      mergeValue: (C, V) = C,
      mergeCombiners: (C, C) = C,
      partitioner: Partitioner,
      mapSideCombine: Boolean = true,
      serializer: Serializer = null)
“`
首先是 **partitioner** 参数,这个即是 RDD 的分区设置。除了默认的 defaultPartitioner,Spark 还提供了 RangePartitioner 和 HashPartitioner 外,此外用户也可以自定义 partitioner。通过源码可以发现如果是 HashPartitioner 的话,那么是会抛出一个错误的。

然后是 **mapSideCombine** 参数,这个参数正是 reduceByKey 和 groupByKey 最大不同的地方,它决定是是否会先在节点上进行一次 Combine 操作,下面会有更具体的例子来介绍。

#### 然后是 groupByKey
Step1
“`
  def groupByKey(): RDD[(K, Iterable[V])] = self.withScope {
    groupByKey(defaultPartitioner(self))
  }
“`
Step2
“`
  def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] = self.withScope {
    // groupByKey shouldn t use map side combine because map side combine does not
    // reduce the amount of data shuffled and requires all map side data be inserted
    // into a hash table, leading to more objects in the old gen.
    val createCombiner = (v: V) = CompactBuffer(v)
    val mergeValue = (buf: CompactBuffer[V], v: V) = buf += v
    val mergeCombiners = (c1: CompactBuffer[V], c2: CompactBuffer[V]) = c1 ++= c2
    val bufs = combineByKeyWithClassTag[CompactBuffer[V]](
      createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine = false)
    bufs.asInstanceOf[RDD[(K, Iterable[V])]]
  }
“`
Setp3
“`
def combineByKeyWithClassTag[C](
      createCombiner: V = C,
      mergeValue: (C, V) = C,
      mergeCombiners: (C, C) = C,
      partitioner: Partitioner,
      mapSideCombine: Boolean = true,
      serializer: Serializer = null)(implicit ct: ClassTag[C]): RDD[(K, C)] = self.withScope {
    require(mergeCombiners != null, mergeCombiners must be defined) // required as of Spark 0.9.0
    if (keyClass.isArray) {
      if (mapSideCombine) {
        throw new SparkException(Cannot use map-side combining with array keys.)
      }
      if (partitioner.isInstanceOf[HashPartitioner]) {
        throw new SparkException(HashPartitioner cannot partition array keys.)
      }
    }
    val aggregator = new Aggregator[K, V, C](
      self.context.clean(createCombiner),
      self.context.clean(mergeValue),
      self.context.clean(mergeCombiners))
    if (self.partitioner == Some(partitioner)) {
      self.mapPartitions(iter = {
        val context = TaskContext.get()
        new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))
      }, preservesPartitioning = true)
    } else {
      new ShuffledRDD[K, V, C](self, partitioner)
        .setSerializer(serializer)
        .setAggregator(aggregator)
        .setMapSideCombine(mapSideCombine)
    }
  }
“`

结合上面 reduceByKey 的调用链,可以发现最终其实都是调用 combineByKeyWithClassTag 这个方法的,但调用的参数不同。
reduceByKey 的调用
“`
combineByKeyWithClassTag[V]((v: V) = v, func, func, partitioner)
“`
groupByKey 的调用
“`
combineByKeyWithClassTag[CompactBuffer[V]](
      createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine = false)
“`
正是两者不同的调用方式导致了两个方法的差别,我们分别来看
– reduceByKey 的泛型参数直接是 [V],而 groupByKey 的泛型参数是 [CompactBuffer[V]]。这直接导致了 reduceByKey 和 groupByKey 的返回值不同,前者是 RDD[(K, V)],而后者是 RDD[(K, Iterable[V])]

– 然后就是 mapSideCombine = false 了,这个 mapSideCombine 参数的默认是 true 的。这个值有什么用呢,上面也说了,这个参数的作用是控制要不要在 map 端进行初步合并(Combine)。可以看看下面具体的例子。

img src= https://cache.yisu.com/upload/information/20210523/355/698556.png width= 65% /

img src= https://cache.yisu.com/upload/information/20210523/355/698557.png width= 65% /

从功能上来说,可以发现 ReduceByKey 其实就是会在每个节点先进行一次 ** 合并 ** 的操作,而 groupByKey 没有。

这么来看 ReduceByKey 的性能会比 groupByKey 好很多,因为有些工作在节点已经处理了。

以上就是关于“Spark 中两个类似的 api 是什么”这篇文章的内容,相信大家都有了一定的了解,希望丸趣 TV 小编分享的内容对大家有帮助,若想了解更多相关的知识内容,请关注丸趣 TV 行业资讯频道。

正文完
 
丸趣
版权声明:本站原创文章,由 丸趣 2023-08-04发表,共计4941字。
转载说明:除特殊说明外本站除技术相关以外文章皆由网络搜集发布,转载请注明出处。
评论(没有评论)
主站蜘蛛池模板: 久久99九九99九九精品 | 国产激情小视频 | 在线精品免费观看综合 | 国产区欧美| 国产日韩欧美不卡www | 午夜成人理论无码电影在线播放 | 久久黄色免费视频 | 免费在线一级毛片 | 边做边流奶水的人妻 | 古代一级毛片 | 亚洲国产香蕉碰碰人人 | 琪琪电影午夜理论片八戒八戒 | 丰满少妇高潮在线播放不卡 | 在线播放无码后入内射少妇 | 人妻精品无码一区二区三区 | 久久―日本道色综合久久 | av网站的免费观看 | 俺去俺来也在线www色官网 | 国产精品手机在线亚洲 | 无码不卡av东京热毛片 | 成人精品一区二区三区电影黑人 | 一级做a爱片就在线看 | 男人女人午夜视频免费 | 日本a毛片在线播放 | 久久91这里精品国产2020 | 欧美啪啪一级毛片 | 黄色在线免费观看 | 成 人 黄 色 视频 免费观看 | 日本大尺度吃奶呻吟视频 | 国产性色强伦免费视频 | 亚欧精品在线观看 | 亚洲一区二区三区高清视频 | 欧美成人18 | 亚洲国产成人手机在线观看 | 91精品欧美一区二区三区 | 在线观看av无需播放器 | 欧美成人一区亚洲一区 | 福利片成人午夜在线 | 精品视频一区二区三区在线播放 | 四虎国产精品免费五月天 | 亚洲人成色7777在线观看不卡 |