RDD到底是什么?RDD的API_rdd bank-程序员宅基地

技术标签: Spark  分布式  大数据  

RDD到底是什么?RDD的API

大家好,我是W

今天给大家带来一篇关于Spark和RDD的博客,由于我也是初学者,所以没法带来那么深刻的东西,但是我希望用我的感性认知带给大家一点灵感,毕竟刚开始学习Spark的时候我对RDD概念、Spark流程是有很多困惑的,我觉得大家也可能存在这种问题。OK,接下来我将从以下几个角度来讲RDD和Spark:1、 Spark简介、对比hadoop、生态,2、 RDD概念

1、 Spark简介、对比hadoop、生态

1.1 Spark简介

Spark官网,可以看到官方对Spark的概述:

Spark Overview
Apache Spark is a unified analytics engine for large-scale data processing. It provides high-level APIs in Java, Scala, Python and R, and an optimized engine that supports general execution graphs. It also supports a rich set of higher-level tools including Spark SQL for SQL and structured data processing, MLlib for machine learning, GraphX for graph processing, and Structured Streaming for incremental computation and stream processing.
Apache的Spark是一个用于大规模数据处理的统一分析引擎。它提供了一系列Java、Scaala、Python的高级API以及优化引擎,所以支持统一的操作。它同样的提供了一系列丰富的高阶工具,包括用于SQL查询、结构化数据处理的Spark SQL,用于机器学习的MLlib库,用于图处理的GraphX库,以及用于增量计算和流处理的Streaming库。

可以看到官网对Spark的定义就是一个大一统的框架,其中存在做结构化数据处理的组件Spark SQL,有用于机器学习的MLlib组件等等。在我实际学习的过程中可以感觉到组件间的关系就好像积木一样,需要的时候插上即可。

1.2 Spark对比Hadoop

Spark对比hadoop最大的特点就是快,在官网上第一张图就摆出来Spark比hadoop快了百倍,Spark的运算是基于内存的,而hadoop则需要通过HDFS将数据持久化到磁盘,所以显然是快的,但是快多少还是要看实际生产环境吧。

可是除了这点就没了吗?其实还有的,在《大数据基础:Spark工作原理及基础概念》中给大家罗列出来了:

特点 说明
spark 计算速度快 spark将每个任务构建成DAG进行计算,内部的计算过程通过弹性式分布式数据集RDD在内存在进行计算,相比于hadoop的mapreduce效率提升了100倍。
易于使用 spark 提供了大量的算子,开发只需调用相关api进行实现无法关注底层的实现原理。相较于以前离线任务采用mapreduce实现,实时任务采用storm实现,目前这些都可以通过spark来实现,降低来开发的成本。同时spark 通过spark SQL降低了用户的学习使用门槛,还提供了机器学习,图计算引擎等。
支持多种的资源管理模式 学习使用中可以采用local 模型进行任务的调试,在正式环境中又提供了standalone,yarn等模式,方便用户选择合适的资源管理模式进行适配。
社区支持 spark 生态圈丰富,迭代更新快,成为大数据领域必备的计算引擎。

1.3 Spark生态圈

其实刚刚介绍Spark的时候已经讲了一点了,大家请看图:

在这里插入图片描述

这是我找到比较合理的一张图,它把不同的工作内容分层,结构比较清晰。

说明
资源调度层 因为我们的任务是要提交到集群上运行的,不同的结点有不同的工作,所以需要对计算资源进行调度,而在这一层的资源调度方式就有很多:local模式、StandAlone模式、yarn模式、mesos模式等等。
计算层 计算层主要使用的是spark-core这个spark的核心库,其面向的是离线的计算,而R、Python这些就是所支持的语言。
存储层 存储层包括一系列的存储组件,最常见的比如有hadoop-HDFS、MySQL、HBASE、MongoDB、Redis等等,这些均是spark生态可以对接的存储组件,而右边的sparkSQL显然是支持这些数据源的,而下方的MLlib等等显然需要数据的支持。
数据流 在做实时计算的时候streaming可以对接flume、kafka等组件。

2、 RDD的概念(RDD到底是什么)、Spark的工作流程

这两个话题涉及了很多因素,我感觉这一篇文章还是不可能讲的很清楚,但是我会用我能做到的最朴素的语言给大家感性的讲一讲。同时,我建议大家多做几个小案例来加深认识。

2.1 RDD的概念

2.1.1 官方的定义

RDD是Spark中最重要的概念,其全称叫做Resilient Distributed Dataset (RDD),即弹性分布式数据集,是一种可容错的、可以被并行操作元素集合,是Spark中处理所有数据的一种基本抽象。

光是看这一句还是不够的,我在源码中找来注释给大家看一下,我建议大家仔细看下源码的注释

/**
 * A Resilient Distributed Dataset (RDD), the basic abstraction in Spark. Represents an immutable,
 * partitioned collection of elements that can be operated on in parallel. This class contains the
 * basic operations available on all RDDs, such as `map`, `filter`, and `persist`.
 * 一个弹性分布式数据集(RDD),是Spark里的基本抽象。
 * 它代表了可以被并行操作的不可变的分区元素集合。这个类包含了各种RDD都支持的基本操作,比如map、filter、persist等。
 * 
 * In addition,[[org.apache.spark.rdd.PairRDDFunctions]] contains operations available only on RDDs of key-value pairs, such as `groupByKey` and `join`;
 * 此外,org.apache.spark.rdd.PairRDDFunctions里还包含了只有键值对(key-value)类型RDD可用的操作,比如groupByKey、join等。
 * 
 * [[org.apache.spark.rdd.DoubleRDDFunctions]] contains operations available only on RDDs of Doubles; 
 * org.apache.spark.rdd.DoubleRDDFunctions 里包含了只有Double数据类型的RDD可用的操作。
 * 
 * and [[org.apache.spark.rdd.SequenceFileRDDFunctions]] contains operations available on RDDs that can be saved as SequenceFiles.
 * org.apache.spark.rdd.SequenceFileRDDFunctions 里包含了可以被序列化成文件的RDD所包含的操作。
 * 
 * All operations are automatically available on any RDD of the right type (e.g. RDD[(Int, Int)] through implicit.
 * 所有的操作都可以通过implicit来赋予。
 * 
 * Internally, each RDD is characterized by five main properties:
 * 在RDD内部,每一个RDD都由这五个主要特征来描述:
 * 
 *  - A list of partitions
 *  - 一系列分区
 *  
 *  - A function for computing each split
 *  - 对每一个分片做计算的函数
 *  
 *  - A list of dependencies on other RDDs
 *  - 一系列对其他RDD的依赖
 *  
 *  - Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
 *  - 视情况而定,一个作用于键值对RDD的分区器
 *  
 *  - Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file)
 *  - 视情况而定, 要计算每个分片的首选位置的列表
 *
 * All of the scheduling and execution in Spark is done based on these methods, allowing each RDD
 * to implement its own way of computing itself. Indeed, users can implement custom RDDs (e.g. for
 * reading data from a new storage system) by overriding these functions. Please refer to the
 * <a href="http://people.csail.mit.edu/matei/papers/2012/nsdi_spark.pdf">Spark paper</a>
 * for more details on RDD internals.
 * Spark里的所有scheduling和execution都是基于这些方法(通过赋予RDD操作的方式)来实现其自身的计算方式,当然用户可以通过重写方法自定义RDD。
 */

最后注释中还贴心的给出了RDD的提出的论文:《Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing》

RDD的操作分为两大类,Transformation、Action。

Transformation是对已有的RDD进行转换(记录下一步操作)然后生成新的RDD,采用的是lazy策略,不会立即计算出结果。

Action是让已有的RDD对数据执行它的操作。

表格来自:大数据之Spark简介及RDD说明

Transformation
方法(算子) 说明
map(func) 返回一个新的RDD,该RDD由每一个输入元素经过func函数转换后组成
filter(func) 返回一个新的RDD,该RDD由经过func函数计算后返回值为true的输入元素组成
flatMap(func) 类似于map,但是每一个输入元素可以被映射为0或多个输出元素(所以func应该返回一个序列,而不是单一元素)
mapPartitions(func) 类似于map,但独立地在RDD的每一个分片上运行,因此在类型为T的RDD上运行时,func的函数类型必须是Iterator[T] => Iterator[U]
mapPartitionsWithIndex(func) 类似于mapPartitions,但func带有一个整数参数表示分片的索引值,因此在类型为T的RDD上运行时,func的函数类型必须是(Int, Interator[T]) => Iterator[U]
sample(withReplacement, fraction, seed) 根据fraction指定的比例对数据进行采样,可以选择是否使用随机数进行替换,seed用于指定随机数生成器种子
union(otherDataset) 对源RDD和参数RDD求并集后返回一个新的RDD
intersection(otherDataset) 对源RDD和参数RDD求交集后返回一个新的RDD
distinct([numTasks])) 对源RDD进行去重后返回一个新的RDD
groupByKey([numTasks]) 在一个(K,V)的RDD上调用,返回一个(K, Iterator[V])的RDD
reduceByKey(func, [numTasks]) 在一个(K,V)的RDD上调用,返回一个(K,V)的RDD,使用指定的reduce函数,将相同key的值聚合到一起,与groupByKey类似,reduce任务的个数可以通过第二个可选的参数来设置
aggregateByKey(zeroValue)(seqOp, combOp, [numTasks])
sortByKey([ascending], [numTasks]) 在一个(K,V)的RDD上调用,K必须实现Ordered接口,返回一个按照key进行排序的(K,V)的RDD
sortBy(func,[ascending], [numTasks]) 与sortByKey类似,但是更灵活
join(otherDataset, [numTasks]) 在类型为(K,V)和(K,W)的RDD上调用,返回一个相同key对应的所有元素对在一起的(K,(V,W))的RDD
cogroup(otherDataset, [numTasks]) 在类型为(K,V)和(K,W)的RDD上调用,返回一个(K,(Iterable,Iterable))类型的RDD
cartesian(otherDataset) 笛卡尔积
pipe(command, [envVars])
coalesce(numPartitions)
repartition(numPartitions)
repartitionAndSortWithinPartitions(partitioner)
Action
方法(算子) 说明
reduce(func) 通过func函数聚集RDD中的所有元素,这个功能必须是课交换且可并联的
collect() 在驱动程序中,以数组的形式返回数据集的所有元素
count() 返回RDD的元素个数
first() 返回RDD的第一个元素(类似于take(1))
take(n) 返回一个由数据集的前n个元素组成的数组
takeSample(withReplacement,num, [seed]) 返回一个数组,该数组由从数据集中随机采样的num个元素组成,可以选择是否用随机数替换不足的部分,seed用于指定随机数生成器种子
takeOrdered(n, [ordering])
saveAsTextFile(path) 将数据集的元素以textfile的形式保存到HDFS文件系统或者其他支持的文件系统,对于每个元素,Spark将会调用toString方法,将它装换为文件中的文本
saveAsSequenceFile(path) 将数据集中的元素以Hadoop sequencefile的格式保存到指定的目录下,可以使HDFS或者其他Hadoop支持的文件系统。
saveAsObjectFile(path)
countByKey() 针对(K,V)类型的RDD,返回一个(K,Int)的map,表示每一个key对应的元素个数。
foreach(func) 在数据集的每一个元素上,运行函数func进行更新。
2.1.2 我的感性认识

刚开始我对RDD也是很迷惑,它是在哪里体现了并行化计算的?但是当我真正正正做一个完整的案例时,我才对他有那么一点理解。

大家可以想一个完整的离线计算案例,比如:

我们需要计算美团上外卖的标签,那么我们会有类似以下数据集:

商品ID 用户ID 评价(String)
109283 yyyyxxx 味道还不错,就是有点贵
109283 swssim 虽然有点贵,但是分量足
109284 swssim 好难吃!

我们的目标是针对商品做标签,依据是商品出现最多的5个评价标签。

  • 1、 首先我们通过sparkContext读取数据
  • 2、 因为我们拿到的是评价String,所以做分词,这里假设分词调包成功,评价此时不再是一个长长的话,而是:评价1,评价2
  • 3、 接下来,提取出商品ID,评价
  • 4、 根据商品ID聚类,即groupByKey
  • 5、 对后面标签做操作…

请大家注意第3步,我们的程序放到集群中,而集群中显然不止一台worker,即显然不止一个executor,所以我们整个spark集群中每一个executor拿到的只是整个数据集的一部分(第一台拿0 - n-1行,第二台拿n - 2n-1行类似这样),但是我们的操作是写在一份程序里面,如何对不同机器中的数据集做统一的操作呢?

这显然就是RDD的作用,程序提交时会经过cluster manager分配资源、通过driver提交代码到executor,然后经过各种scheduler把程序进行分析,分成多个stage每一个stage代表了不需要跨机器执行的操作的集合(比如map、filter),而当出现要跨机器操作(比如collect、reduce)时,则会把数据集中到一台机器去操作。

说了那么多,RDD到底是什么呢?

解释1 : 因为每一台机器都知道哪几步本机器不需要依靠别人可以自己做(stage),所以可以先做,不需要看别人脸色,而遇到大家统一的操作时通过网络把数据合并由一台机器做。RDD就是定义这些操作的对象,RDD操作的对象就是分布在不同机器上的同一格式的数据集。

解释2 : 数据集分布在不同机器中,RDD定义了各个机器对这份数据的同一操作(先做什么再做什么)。就好像你安排你的小弟,去不同银行,插入银行卡,输入密码,取5000块钱,然后拿回来,最后给你汇总一样。

参考

总结

Spark毫无疑问是个非常优秀的框架,其中的组件就仿佛积木一般随时插拔。RDD作为Spark的最重要的概念,对Spark整个框架起着至关重要的作用。RDD的操作分为Transoformation和Action两种,其核心理念是定义一个抽象的数据操作,从而方便每个分区针对各自所管理的数据做统一的操作。今天这篇博客可能还有很多没法讲清楚的地方,接下来我会继续把Spark的其他概念、RDD涉及的相关概念更详细的给大家理清楚。

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/Alian_W/article/details/109770644

智能推荐

如何配置DNS服务的正反向解析_dns反向解析-程序员宅基地

文章浏览阅读3k次,点赞3次,收藏13次。root@server ~]# vim /etc/named.rfc1912.zones #添加如下内容,也可直接更改模板。[root@server ~]# vim /etc/named.conf #打开主配置文件,将如下两处地方修改为。注意:ip地址必须反向书写,这里文件名需要和反向解析数据文件名相同。新建或者拷贝一份进行修改。nslookup命令。_dns反向解析

设置PWM占空比中TIM_SetCompare1,TIM_SetCompare2,TIM_SetCompare3,TIM_SetCompare4分别对应引脚和ADC通道对应引脚-程序员宅基地

文章浏览阅读2.5w次,点赞16次,收藏103次。这个函数TIM_SetCompare1,这个函数有四个,分别是TIM_SetCompare1,TIM_SetCompare2,TIM_SetCompare3,TIM_SetCompare4。位于CH1那一行的GPIO口使用TIM_SetCompare1这个函数,位于CH2那一行的GPIO口使用TIM_SetCompare2这个函数。使用stm32f103的除了tim6和tim7没有PWM..._tim_setcompare1

多线程_进程和线程,并发与并行,线程优先级,守护线程,实现线程的四种方式,线程周期;线程同步,线程中的锁,Lock类,死锁,生产者和消费者案例-程序员宅基地

文章浏览阅读950次,点赞33次,收藏19次。多线程_进程和线程,并发与并行,线程优先级,守护线程,实现线程的四种方式,线程周期;线程同步,线程中的锁,Lock类,死锁,生产者和消费者案例

在 Linux 系统的用户目录下安装 ifort 和 MKL 库并配置_在linux系统的用户目录下安装ifort和mkl库并配置-程序员宅基地

文章浏览阅读2.9k次。ifort 编译器的安装ifort 编译器可以在 intel 官网上下载。打开https://software.intel.com/content/www/us/en/develop/tools/oneapi/components/fortran-compiler.html#gs.7iqrsm点击网页中下方处的 Download, 选择 Intel Fortran Compiler Classic and Intel Fortran Compiler(Beta) 下方对应的版本。我选择的是 l_在linux系统的用户目录下安装ifort和mkl库并配置

使用ftl文件生成图片中图片展示无样式,不显示_ftl格式pdf的样式调整-程序员宅基地

文章浏览阅读689次,点赞7次,收藏8次。些项目时需要一个生成图片的方法,我在网上找到比较方便且适合我去设置一些样式的生成方式之一就是使用Freemarker,在对应位置上先写好一个html格式的ftl文件,在对应位置用${参数名}填写上。还记得当时为了解决图片大小设置不上,搜索了好久资料,不记得是在哪看到的需要在里面使用width与height直接设置,而我当时用style去设置,怎么都不对。找不到,自己测试链接,准备将所有含有中文的图片链接复制一份,在服务器上存储一份不带中文的文件。突然发现就算无中文,有的链接也是打不开的。_ftl格式pdf的样式调整

orin Ubuntu 20.04 配置 Realsense-ROS_opt/ros/noetic/lib/nodelet/nodelet: symbol lookup -程序员宅基地

文章浏览阅读1.5k次,点赞6次,收藏12次。拉取librealsense。_opt/ros/noetic/lib/nodelet/nodelet: symbol lookup error: /home/admin07/reals

随便推点

操作系统精选习题——第四章_系统抖动现象的发生由什么引起的-程序员宅基地

文章浏览阅读3.4k次,点赞3次,收藏29次。一.单选题二.填空题三.判断题一.单选题静态链接是在( )进行的。A、编译某段程序时B、装入某段程序时C、紧凑时D、装入程序之前Pentium处理器(32位)最大可寻址的虚拟存储器地址空间为( )。A、由内存的容量而定B、4GC、2GD、1G分页系统中,主存分配的单位是( )。A、字节B、物理块C、作业D、段在段页式存储管理中,当执行一段程序时,至少访问()次内存。A、1B、2C、3D、4在分段管理中,( )。A、以段为单位分配,每._系统抖动现象的发生由什么引起的

UG NX 12零件工程图基础_ug-nx工程图-程序员宅基地

文章浏览阅读2.4k次。在实际的工作生产中,零件的加工制造一般都需要二维工程图来辅助设计。UG NX 的工程图主要是为了满足二维出图需要。在绘制工程图时,需要先确定所绘制图形要表达的内容,然后根据需要并按照视图的选择原则,绘制工程图的主视图、其他视图以及某些特殊视图,最后标注图形的尺寸、技术说明等信息,即可完成工程图的绘制。1.视图选择原则工程图合理的表达方案要综合运用各种表达方法,清晰完整地表达出零件的结构形状,并便于看图。确定工程图表达方案的一般步骤如下:口分析零件结构形状由于零件的结构形状以及加工位置或工作位置的不._ug-nx工程图

智能制造数字化工厂智慧供应链大数据解决方案(PPT)-程序员宅基地

文章浏览阅读920次,点赞29次,收藏18次。原文《智能制造数字化工厂智慧供应链大数据解决方案》PPT格式主要从智能制造数字化工厂智慧供应链大数据解决方案框架图、销量预测+S&OP大数据解决方案、计划统筹大数据解决方案、订单履约大数据解决方案、库存周转大数据解决方案、采购及供应商管理大数据模块、智慧工厂大数据解决方案、设备管理大数据解决方案、质量管理大数据解决方案、仓储物流与网络优化大数据解决方案、供应链决策分析大数据解决方案进行建设。适用于售前项目汇报、项目规划、领导汇报。

网络编程socket accept函数的理解_当在函数 'main' 中调用 'open_socket_accept'时.line: 8. con-程序员宅基地

文章浏览阅读2w次,点赞38次,收藏102次。在服务器端,socket()返回的套接字用于监听(listen)和接受(accept)客户端的连接请求。这个套接字不能用于与客户端之间发送和接收数据。 accept()接受一个客户端的连接请求,并返回一个新的套接字。所谓“新的”就是说这个套接字与socket()返回的用于监听和接受客户端的连接请求的套接字不是同一个套接字。与本次接受的客户端的通信是通过在这个新的套接字上发送和接收数_当在函数 'main' 中调用 'open_socket_accept'时.line: 8. connection request fa

C#对象销毁_c# 销毁对象及其所有引用-程序员宅基地

文章浏览阅读4.3k次。对象销毁对象销毁的标准语法Close和Stop何时销毁对象销毁对象时清除字段对象销毁的标准语法Framework在销毁对象的逻辑方面遵循一套规则,这些规则并不限用于.NET Framework或C#语言;这些规则的目的是定义一套便于使用的协议。这些协议如下:一旦销毁,对象不可恢复。对象不能被再次激活,调用对象的方法或者属性抛出ObjectDisposedException异常重复地调用对象的Disposal方法会导致错误如果一个可销毁对象x 包含或包装或处理另外一个可销毁对象y,那么x的Disp_c# 销毁对象及其所有引用

笔记-中项/高项学习期间的错题笔记1_大型设备可靠性测试可否拆解为几个部分进行测试-程序员宅基地

文章浏览阅读1.1w次。这是记录,在中项、高项过程中的错题笔记;https://www.zenwu.site/post/2b6d.html1. 信息系统的规划工具在制订计划时,可以利用PERT图和甘特图;访谈时,可以应用各种调查表和调查提纲;在确定各部门、各层管理人员的需求,梳理流程时,可以采用会谈和正式会议的方法。为把企业组织结构与企业过程联系起来,说明每个过程与组织的联系,指出过程决策人,可以采用建立过程/组织(Process/Organization,P/O)矩阵的方法。例如,一个简单的P/O矩阵示例,其中._大型设备可靠性测试可否拆解为几个部分进行测试