Spark学习笔记

Published: by Creative Commons Licence

  • Tags:

Spark基础

Spark是一种软件框架,用于管理和协调多台计算机上的计算任务。

我们通过向集群管理器提交Spark应用,而集群管理器会为应用分配足够的计算资源。

一个Spark应用由一个驱动器进程和一组执行器进程组成。驱动器进程运行应用的main函数,负责:

  • 维护Spark应用的信息
  • 回应用户的程序或输入
  • 分析任务并分发给若干执行器进行处理

执行器只负责执行由驱动器分配给它的代码,并向驱动器报告计算状态。

SparkSession

驱动器对应的类型为SparkSession,每个Spark应用都需要创建一个SparkSession。在交互模式中会自动创建一个名为spark的SparkSession对象。

结构化数据

DataFrame和Dateset数据类型代表一张结构化的分布式表,有若干个命名列以及若干条记录,每个列都有相同的数据类型。DataFrame是无类型的,它的记录类型都是Row类型,它的模式校验发生在运行期;Dataset仅在JVM语言上被支持,它允许编译期的类型校验,它的记录类型都是特定的Java Bean类型。

数据转换和动作

为了让多个执行器并发执行,Spark会将数据分解为多个数据块,每个数据块称为分区。

Spark中的数据在计算过程中是不变的,你可以通过转换操作从某个数据集生成一个新的数据集。

需要注意的是转换操作是惰性的,只有在你执行动作操作的时候,才会发生真正转换。对于用户表达的一系列转换操作,Spark会在真正需要的时候,建立从原始数据到目标数据的一套转换计划,并将其编译为可以在集群中运行的流水线式的物理执行计划,这时候由于信息充分,所以Spark可以加入很多优化,比如把过滤器提前执行从而减少处理的数据量。

如果一个转换仅生成一个新的分区,那么称为窄转换。如果转换生成多个新的分区,那么称为宽转换,也称为洗牌(shuffle)。窄转换的时候得到的数据会保留在同一个执行器的内存中,而宽转换得到的分区会写入到磁盘上并在多个执行器之间交换。

动作操作会触发真正的计算过程,动作操作会根据转换后的数据计算最终结果。比如count统计数据的行数。

SQL

由于Spark会先将转换流程编译成物理执行计划,之后进行执行,因此很自然地Spark可以支持SQL查询。 由于最终都是编译后执行,因此SQL查询和指定转换计划拥有相同的性能。

要进行SQL查询,需要将DataFrame注册成为临时视图。

flightData2015.createOrReplaceTempView("flight_data_2015")

之后就可以执行sql查询。

sqlWay = spark.sql("""
select DEST_COUNTRY_NAME, count(1) 
from flight_data_2015
group by DESCT_COUNTRY_NAME
""")

执行流程

我们首先通过控制台或以Spark作业的方式提交我们的应用程序。

第一阶段,我们的用户代码会被转换为逻辑计划。

  1. 用户代码会被转换为未解析的逻辑计划。
  2. Spark利用catalog在分析器中解析逻辑计划出现的表名和列名。
  3. Catalyst优化器优化逻辑计划。

第二阶段,会选择合适的物理执行计划,以最小代价在集群上执行我们的逻辑计划。

  1. 生成多个不同的物理执行计划
  2. 通过代价模型对物理执行计划进行比较
  3. 选择合适的物理执行计划

第三阶段,Spark将所有代码运行在Spark的底层编程接口RDD上。最终结果会被返回给用户。

模式

Schema代表DataFrame的模式(元数据),Schema定义了表的列名,列的类型。

模式可能来自数据源,这称为读时模式(schema-on-read),也可以我们自行定义模式。

Spark SQL

Spark SQL中最高级别的抽象是Catalog。Catalog负责存储用户数据中的元数据。

在编程的时候,可以通过spark.sql方法执行具体的sql代码。

数据表

数据表在逻辑上与DataFrame是相同的,只不过DataFrame是定义在编程语言中,而数据表定义在数据库中。

数据表是实际存储数据的,而视图则是不存储数据的。

如果表的实际数据由Spark管理,并且数据会在表删除的时候被清除,那么这种表称为托管表,否则称为非托管表。比如你定义一组文件为一个数据表的时候,这个表就是非托管表;在DataFrame上使用saveAsTable函数来创建一个数据表的时候,它实际上是托管表。

你需要在数据库上下文操作表数据。默认情况下使用的数据库是default。

Spark组件

driver

客户端使用driver提交spark应用。spark driver负责创建spark context,并将所有转换操作编译为一个DAG,DAG中包含tasks和stages,tasks是spark程序的最小可调度工作单位,而stages表示一组可以一起运行的tasks,不同的stages之间往往有相互依赖关系。

executor

executor是用于执行tasks的宿主,它负责保留需要的CPU和内存资源。executor会在spark应用开始时被创建,并在结束后被销毁,一个executor可以用于执行数以百计的tasks。

一个worker结点的资源是有限的,因此一个结点上只能运行固定数量的executor,而一个集群中的worker结点也是有限的,因此一个集群能运行的executor也是有限的。

master

master是一个进程,负责为driver从集群中请求资源,并且跟踪集群中结点的状态和进度。

Cluster Manager

cluster manager是一个进程,负责管理、监控工作结点,并按照master的要求保留资源。

流处理

设计权衡

记录级别API与声明式API

记录级别API是指将每个事件传递给应用程序处理。早期的流处理系统一般会使用这种方式。这种方式的好处是给了应用程序很大的灵活性,并且流处理系统的逻辑比较简单。缺点是应用程序需要管理复杂的状态,开发难度较大。

而声明式API是指应用程序只指定具体的计算步骤,而不需要考虑中间的流程,也不需要考虑如何从失败中恢复。比如仅提供map、reduce接口。流处理系统会自动保存相关状态并从失败中恢复。

连续处理与微批次处理

连续处理模式是指数据一旦处理完成就直接发送给下游,这样做的优点是拥有很低的延时,但是吞吐量会比较低。而微批次处理是指处理完的数据会等待一段时间(比如1s),之后会同一批次发送这个时间段所有加工完成的数据,这种模式拥有适当的延迟,但是会大幅提高吞吐量。

Spark目前选择支持微批次处理模式。

事件时间与处理时间

事件时间是指根据数据源插入记录时间来处理数据,而不是根据流处理应用程序接受数据的时间(处理时间)。

如果选择使用事件时间,那么由于延迟的存在,记录可能会以乱序到达我们的流处理系统。这时候需要指定一个时间窗口,以及时间窗口的最大超时时间(水位)。

结构化流处理

结构化流处理是指以流处理方式处理DataFrame。其背后的思想是将数据流视为连续追加的数据表,然后作业定期检查新的输入数据,对其进行处理,在需要的时候更新内部状态。

参考资料