在线精品99_中国九九盗摄偷拍偷看_91免费版在线观看_91.app_91高清视频在线_99热最新网站

Spark RDD的创建方式及算子的使用方法是什么

143次阅读
没有评论

共计 6085 个字符,预计需要花费 16 分钟才能阅读完成。

这篇文章主要介绍“Spark RDD 的创建方式及算子的使用方法是什么”,在日常操作中,相信很多人在 Spark RDD 的创建方式及算子的使用方法是什么问题上存在疑惑,丸趣 TV 小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”Spark RDD 的创建方式及算子的使用方法是什么”的疑惑有所帮助!接下来,请跟着丸趣 TV 小编一起来学习吧!

一:简单了解 RDD 和 RDD 处理数据

 RDD,全称为 Resilient Distributed Datasets,是一个容错的、并行的数据结构,可以让用户显式地将数据存储到磁盘和内存中,并能控制数据的分区。

 RDD:Spark 的核心概念是 RDD (resilientdistributed dataset),指的是一个只读的,可分区的分布式数据集,这个数据集的全部或部分可以缓存在内存中,在多次计算间重用。

 RDD 本质上是一个内存数据集,在访问 RDD 时,指针只会指向与操作相关的部分。例如存在一个面向列的数据结构,其中一个实现为 Int 的数组,另一个实现为 Float 的数组。如果只需要访问 Int 字段,RDD 的指针可以只访问 Int 数组,避免了对整个数据结构的扫描。

 RDD 将操作分为两类:transformation 与 action。无论执行了多少次 transformation 操作,RDD 都不会真正执行运算,只有当 action 操作被执行时,运算才会触发。而在 RDD 的内部实现机制中,底层接口则是基于迭代器的,从而使得数据访问变得更高效,也避免了大量中间结果对内存的消耗。

  在实现时,RDD 针对 transformation 操作,都提供了对应的继承自 RDD 的类型,例如 map 操作会返回 MappedRDD,而 flatMap 则返回 FlatMappedRDD。当我们执行 map 或 flatMap 操作时,不过是将当前 RDD 对象传递给对应的 RDD 对象而已。

注意:创建的 Maven 工程,以下是 pom.xml 中的依赖:

dependencies 
 dependency 
 groupId junit /groupId 
 artifactId junit /artifactId 
 version 4.12 /version 
 /dependency 
 dependency 
 groupId org.apache.spark /groupId 
 artifactId spark-core_2.10 /artifactId 
 version 1.6.1 /version 
 /dependency 
 dependency 
 groupId org.apache.hadoop /groupId 
 artifactId hadoop-client /artifactId 
 version 2.6.4 /version 
 /dependency 
 dependency 
 groupId org.apache.spark /groupId 
 artifactId spark-sql_2.10 /artifactId 
 version 1.6.1 /version 
 /dependency 
 /dependencies

二:从 Hadoop 文件系统(或与 Hadoop 兼容的其他持久化存储系统,如 Hive,HBase)输出(HDFS)创建。

    eg:  求 HDFS 文件中内容所有行数据长度及总长度。

public class TestRDD1 {public static void main(String[] args) {createRDDFromHDFS();
 private static void createRDDFromHDFS(){SparkConf conf = new SparkConf();
 conf.set( spark.testing.memory ,  269522560000 
 JavaSparkContext sc = new JavaSparkContext(local ,  Spark Test , conf);
 System.out.println( sc );
 JavaRDD String  rdd = sc.textFile( hdfs://192.168.226.129:9000/txt/sparkshell/sparkshell.txt 
 JavaRDD Integer  newRDD = rdd.map( new Function String,Integer (){
 private static final long serialVersionUID = 1L;
 public Integer call(String string) throws Exception {System.out.println( string +     + string.length() );
 return string.length();
 System.out.println( newRDD.count() );
 int length = newRDD.reduce( new Function2 Integer, Integer, Integer (){
 private static final long serialVersionUID = 1L;
 public Integer call(Integer int1, Integer int2) throws Exception {
 return int1+int2;
 System.out.println(总和  + length);
}

三:通过 parallelize 或 makeRDD 将单击数据创建为分布式 RDD。

eg:求总和。

public class TestRDD2 {public static void main(String[] args) {createRDDFromSuperRDD();
  * JavaSparkContext(String master, String appName, SparkConf conf)
  * master - Cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4]).
  * appName - A name for your application, to display on the cluster web UI
  * conf - a SparkConf object specifying other Spark parameters
  * */
 private static void createRDDFromSuperRDD(){SparkConf conf = new SparkConf();
 conf.set( spark.testing.memory ,  269522560000 
 JavaSparkContext sc = new JavaSparkContext(local ,  Spark Test , conf);
 System.out.println( sc );
 List Integer  list = new ArrayList Integer 
 for( int i=1;i i++){list.add(i);
 JavaRDD Integer  rdd = sc.parallelize(list);
 JavaRDD Integer  newRDD = rdd.map( new Function Integer,Integer (){
 private static final long serialVersionUID = 1L;
 public Integer call(Integer int1) throws Exception {
 return int1;
 int count = newRDD.reduce( new Function2 Integer, Integer, Integer (){
 private static final long serialVersionUID = 1L;
 public Integer call(Integer int1, Integer int2) throws Exception {
 return int1+int2;
 System.out.println(总和  + count);
}

注意:上述两段代码中,在获取 JavaSparkContext 的时候,是这样写的:

 SparkConf conf = new SparkConf();

     conf.set(spark.testing.memory , 269522560000  // 给 jvm 足够的资源。

     JavaSparkContext sc = new JavaSparkContext(local , Spark Test , conf);

而对于标记的加粗红色部分,参照 API 如下:

 JavaSparkContext(String master, String appName, SparkConf conf)

 -master – Cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4]).
 -appName – A name for your application, to display on the cluster web UI
 -conf – a SparkConf object specifying other Spark parameters

对于 master,官网有详细的介绍:

我这里写的是 local,表示的是:

    对于本地模式测试和单元测试,可以通过 local 在 spark 内运行程序。

******************************

另外写的一段,对算子中一些基本方法的使用

参考学习:

    RDD 算子分类: http://my.oschina.net/gently/blog/686800 (自己的。)

public class TestRDD3 {
 private static String appName =  Test Spark RDD 
 private static String master =  local 
 public static void main(String[] args) {SparkConf conf = new SparkConf();
 conf.set( spark.testing.memory ,  269522560000 
 JavaSparkContext sc = new JavaSparkContext(master, appName, conf);
 System.out.println( sc );
 List String  list = new ArrayList String 
 list.add(  Berg  );
 list.add(  Hadoop  );
 list.add(  HBase  );
 list.add(  Hive  );
 list.add(  Spark  );
 JavaRDD String  rdd = sc.parallelize(list);
 JavaRDD Integer  newrdd = rdd.map( new Function String,Integer (){
 private static final long serialVersionUID = 1L;
 public Integer call(String string) throws Exception {System.out.println( string +  \t  +string.length() );
 return string.length();
 Integer length = newrdd.reduce( new Function2 Integer, Integer, Integer () {
 private static final long serialVersionUID = 1L;
 public Integer call(Integer i1, Integer i2) throws Exception {
 return i1+i2;
 long count = newrdd.count();
 List Integer  listnewrdd = newrdd.collect();
 for (Integer integer : listnewrdd) {System.out.print(integer +   \t  );
 System.out.println(  \nlength --    + length +     + count );
 System.out.println(  \n\n**************************************\n\n 
 List Integer  list1 = new ArrayList Integer 
 for( int i=1; i i++){list1.add( i );
 JavaRDD Integer  rdd1 = sc.parallelize(list1);
 JavaRDD Integer  unionrdd = newrdd.union(rdd1);
 JavaRDD Integer  rdd2 = unionrdd.map( new Function Integer,Integer (){
 private static final long serialVersionUID = 1L;
 public Integer call(Integer i) throws Exception {
 return i;
 long count2 = rdd2.reduce( new Function2 Integer, Integer, Integer () {
 private static final long serialVersionUID = 1L;
 public Integer call(Integer arg0, Integer arg1) throws Exception {
 return arg0 + arg1;
 System.out.println(count2 --    +count2 );
 rdd2.foreach( new VoidFunction Integer (){
 private static final long serialVersionUID = 1L;
 public void call(Integer arg0) throws Exception {System.out.println(  foreach--    + arg0 );
}

到此,关于“Spark RDD 的创建方式及算子的使用方法是什么”的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注丸趣 TV 网站,丸趣 TV 小编会继续努力为大家带来更多实用的文章!

正文完
 
丸趣
版权声明:本站原创文章,由 丸趣 2023-08-16发表,共计6085字。
转载说明:除特殊说明外本站除技术相关以外文章皆由网络搜集发布,转载请注明出处。
评论(没有评论)
主站蜘蛛池模板: 国产福利一区二区三区 | 国产在线精品一区二区不卡麻豆 | 青青青在线观看国产精品 | 国产99热在线观看 | 可以直接看的毛片 | 台湾无码一区二区 | 欧美综合区 | 色综合欧美综合天天综合 | 中国漂亮护士一级毛片 | 四虎4hu亚洲精品 | 中文字幕亚洲激情 | 成人久久网站 | 中国丰满熟妇av | 苍井空张开腿实干12次 | 欧美性a爱片 | 亚洲色图偷 | 久久在线视频免费观看 | 99久久精品免费 | 亚洲国产一区二区三区综合片 | 日韩精品欧美国产精品亚 | 一级片中文字幕 | 中国美女一级毛片 | 少妇性荡欲午夜性开放视频剧场 | 呦交小u女国产秘密入口 | 中文在线免费视频 | 国产成人av三级在线观看按摩 | 欧美一级成人一区二区三区 | 韩国美女一级毛片 | 色偷偷偷久久伊人大杳蕉 | 少妇做爰免费视频了 | 国产成人精品白浆久久69 | 亚洲欧美成人一区二区三区 | 国产一区二区三区四 | 二个人看的www免费视频 | 久久久久久久久性潮 | 色成人综合 | 人人鲁人人莫人人爱精品 | 成人在线综合网 | 亚洲欧美韩国日产综合在线 | 成人免费777777被爆出 | 四虎精品成人免费观看 |