Spark SQL - 快速指南


Spark - 简介

各行业广泛使用 Hadoop 来分析其数据集。原因是 Hadoop 框架基于简单的编程模型 (MapReduce),它支持可扩展、灵活、容错且经济高效的计算解决方案。在这里,主要关注的是在查询之间的等待时间和运行程序的等待时间方面保持处理大型数据集的速度。

Spark 是由 Apache 软件基金会推出的,用于加速 Hadoop 计算软件进程。

与普遍看法相反,Spark 不是 Hadoop 的修改版本,并且实际上并不依赖 Hadoop,因为它有自己的集群管理。Hadoop只是Spark的实现方式之一。

Spark以两种方式使用Hadoop——一是存储,二是处理。由于 Spark 有自己的集群管理计算,因此它仅使用 Hadoop 进行存储。

阿帕奇火花

Apache Spark 是一种快如闪电的集群计算技术,专为快速计算而设计。它基于 Hadoop MapReduce,并扩展了 MapReduce 模型,以有效地将其用于更多类型的计算,其中包括交互式查询和流处理。Spark 的主要特点是它的内存集群计算,可以提高应用程序的处理速度。

Spark 旨在涵盖广泛的工作负载,例如批处理应用程序、迭代算法、交互式查询和流式处理。除了支持各个系统中的所有这些工作负载之外,它还减轻了维护单独工具的管理负担。

Apache Spark 的演变

Spark 是 Hadoop 的子项目之一,由 Matei Zaharia 于 2009 年在加州大学伯克利分校的 AMPLab 开发。它于 2010 年在 BSD 许可证下开源。它于 2013 年捐赠给 Apache 软件基金会,现在 Apache Spark 从 2014 年 2 月起已成为 Apache 顶级项目。

Apache Spark 的特点

Apache Spark 具有以下功能。

  • 速度- Spark 有助于在 Hadoop 集群中运行应用程序,在内存中运行速度提高 100 倍,在磁盘上运行时速度提高 10 倍。这可以通过减少磁盘读/写操作的数量来实现。它将中间处理数据存储在内存中。

  • 支持多种语言- Spark 提供 Java、Scala 或 Python 内置 API。因此,您可以用不同的语言编写应用程序。Spark 提供了 80 个用于交互式查询的高级运算符。

  • 高级分析- Spark 不仅支持“Map”和“reduce”。它还支持 SQL 查询、流数据、机器学习 (ML) 和图形算法。

基于 Hadoop 构建的 Spark

下图显示了如何使用 Hadoop 组件构建 Spark 的三种方法。

基于 Hadoop 构建的 Spark

Spark 部署有以下三种方式。

  • 独立- Spark 独立部署意味着 Spark 占据 HDFS(Hadoop 分布式文件系统)之上的位置,并且显式地为 HDFS 分配空间。在这里,Spark 和 MapReduce 将并行运行以覆盖集群上的所有 Spark 作业。

  • Hadoop Yarn - Hadoop Yarn 部署简单来说意味着 Spark 在 Yarn 上运行,无需任何预安装或 root 访问权限。它有助于将 Spark 集成到 Hadoop 生态系统或 Hadoop 堆栈中。它允许其他组件在堆栈顶部运行。

  • Spark in MapReduce (SIMR) - MapReduce 中的 Spark 除了独立部署之外还用于启动 Spark 作业。借助 SIMR,用户可以启动 Spark 并使用其 shell,而无需任何管理访问权限。

Spark的组成部分

下图描述了 Spark 的不同组件。

Spark的组成部分

Apache Spark 核心

Spark Core 是 Spark 平台的底层通用执行引擎,所有其他功能都建立在该平台之上。它提供内存计算和引用外部存储系统中的数据集。

星火SQL

Spark SQL 是 Spark Core 之上的一个组件,它引入了一种称为 SchemaRDD 的新数据抽象,它为结构化和半结构化数据提供支持。

火花流

Spark Streaming 利用 Spark Core 的快速调度功能来执行流分析。它以小批量方式摄取数据,并对这些小批量数据执行 RDD(弹性分布式数据集)转换。

MLlib(机器学习库)

由于采用基于分布式内存的Spark架构,MLlib是Spark之上的分布式机器学习框架。根据基准测试,这是由 MLlib 开发人员针对交替最小二乘法 (ALS) 实现完成的。Spark MLlib 的速度是基于 Hadoop 磁盘的Apache Mahout版本(在 Mahout 获得 Spark 接口之前)的九倍。

图X

GraphX 是一个基于 Spark 的分布式图形处理框架。它提供了一个用于表达图计算的API,可以使用Pregel抽象API对用户定义的图进行建模。它还为此抽象提供了优化的运行时。

Spark——RDD

弹性分布式数据集

弹性分布式数据集(RDD)是 Spark 的基本数据结构。它是一个不可变的分布式对象集合。RDD中的每个数据集被划分为逻辑分区,这些分区可以在集群的不同节点上计算。RDD 可以包含任何类型的 Python、Java 或 Scala 对象,包括用户定义的类。

从形式上来说,RDD 是只读的、分区的记录集合。RDD 可以通过对稳定存储上的数据或其他 RDD 上的确定性操作来创建。RDD 是可以并行操作的容错元素集合。

有两种方法可以创建 RDD:并行化驱动程序中的现有集合,或者引用外部存储系统中的数据集,例如共享文件系统、HDFS、HBase 或任何提供 Hadoop 输入格式的数据源。

Spark利用RDD的概念来实现更快、更高效的MapReduce操作。让我们首先讨论 MapReduce 操作是如何发生的以及为什么它们不那么高效。

MapReduce 中数据共享速度慢

MapReduce 被广泛用于在集群上使用并行分布式算法处理和生成大型数据集。它允许用户使用一组高级运算符编写并行计算,而不必担心工作分配和容错。

不幸的是,在大多数当前框架中,在计算之间(例如:两个 MapReduce 作业之间)重用数据的唯一方法是将其写入外部稳定存储系统(例如:HDFS)。尽管该框架提供了许多用于访问集群计算资源的抽象,但用户仍然需要更多。

迭代交互式应用程序都需要在并行作业之间更快地共享数据。由于复制序列化磁盘 IO ,MapReduce 中的数据共享速度很慢。在存储系统方面,大部分Hadoop应用,90%以上的时间都在做HDFS的读写操作。

MapReduce 上的迭代操作

在多阶段应用程序中的多个计算中重用中间结果。下图解释了当前框架在 MapReduce 上执行迭代操作时的工作原理。由于数据复制、磁盘 I/O 和序列化,这会产生大量开销,从而导致系统变慢。

MapReduce 上的迭代操作

MapReduce 上的交互操作

用户对同一数据子集运行即席查询。每个查询都会在稳定存储上执行磁盘 I/O,这可以控制应用程序的执行时间。

下图解释了当前框架在 MapReduce 上进行交互式查询时如何工作。

MapReduce 上的交互操作

使用 Spark RDD 进行数据共享

由于复制序列化磁盘 IO ,MapReduce 中的数据共享速度很慢。大多数Hadoop应用程序,他们90%以上的时间都在做HDFS读写操作。

认识到这个问题,研究人员开发了一个名为 Apache Spark 的专门框架。Spark的核心思想是弹性分布式数据(RDD);它支持内存中处理计算。这意味着,它将内存状态存储为跨作业的对象,并且该对象可以在这些作业之间共享。内存中的数据共享比网络和磁盘快10到100倍。

现在让我们尝试了解一下 Spark RDD 中是如何进行迭代和交互操作的。

Spark RDD 上的迭代操作

下图展示了 Spark RDD 上的迭代操作。它将中间结果存储在分布式内存中而不是稳定存储(磁盘)中,并使系统更快。

注意- 如果分布式内存(RAM)足以存储中间结果(作业的状态),那么它将把这些结果存储在磁盘上

Spark RDD 上的迭代操作

Spark RDD的交互操作

该图显示了 Spark RDD 上的交互式操作。如果对同一组数据重复运行不同的查询,则可以将该特定数据保留在内存中以获得更好的执行时间。

Spark RDD的交互操作

默认情况下,每次对每个转换后的 RDD 运行操作时,都可能会重新计算它。但是,您也可以将 RDD保留在内存中,在这种情况下,Spark 会将元素保留在集群上,以便下次查询时更快地访问。还支持将 RDD 持久保存在磁盘上或跨多个节点复制。

Spark - 安装

Spark是Hadoop的子项目。因此,最好将 Spark 安装到基于 Linux 的系统中。以下步骤显示如何安装 Apache Spark。

第 1 步:验证 Java 安装

Java 安装是安装 Spark 时必须做的事情之一。尝试以下命令来验证 JAVA 版本。

$java -version

如果 Java 已经安装在您的系统上,您将看到以下响应 -

java version "1.7.0_71"
Java(TM) SE Runtime Environment (build 1.7.0_71-b13)
Java HotSpot(TM) Client VM (build 25.0-b02, mixed mode)

如果您的系统上尚未安装 Java,请先安装 Java,然后再继续下一步。

第2步:验证Scala安装

你应该用Scala语言来实现Spark。因此,让我们使用以下命令验证 Scala 安装。

$scala -version

如果您的系统上已经安装了 Scala,您将看到以下响应 -

Scala code runner version 2.11.6 -- Copyright 2002-2013, LAMP/EPFL

如果您的系统上尚未安装 Scala,请继续执行下一步以安装 Scala。

第三步:下载Scala

通过访问以下链接下载Scala 下载最新版本的 Scala 。在本教程中,我们使用 scala-2.11.6 版本。下载后,您将在下载文件夹中找到 Scala tar 文件。

第四步:安装Scala

请按照以下给出的步骤安装 Scala。

提取 Scala tar 文件

键入以下命令以提取 Scala tar 文件。

$ tar xvf scala-2.11.6.tgz

移动 Scala 软件文件

使用以下命令将 Scala 软件文件移动到相应的目录(/usr/local/scala)

$ su –
Password:
# cd /home/Hadoop/Downloads/
# mv scala-2.11.6 /usr/local/scala
# exit

设置 Scala 的路径

使用以下命令设置 Scala 的 PATH。

$ export PATH = $PATH:/usr/local/scala/bin

验证 Scala 安装

安装后最好验证一下。使用以下命令验证 Scala 安装。

$scala -version

如果您的系统上已经安装了 Scala,您将看到以下响应 -

Scala code runner version 2.11.6 -- Copyright 2002-2013, LAMP/EPFL

步骤5:下载Apache Spark

通过访问以下链接下载 Spark下载最新版本的 Spark 。在本教程中,我们使用spark-1.3.1-bin-hadoop2.6版本。下载后,您将在下载文件夹中找到 Spark tar 文件。

第六步:安装Spark

请按照以下步骤安装 Spark。

提取 Spark 焦油

以下命令用于提取 Spark tar 文件。

$ tar xvf spark-1.3.1-bin-hadoop2.6.tgz

移动 Spark 软件文件

以下命令用于将 Spark 软件文件移动到相应目录(/usr/local/spark)

$ su –
Password:
# cd /home/Hadoop/Downloads/
# mv spark-1.3.1-bin-hadoop2.6 /usr/local/spark
# exit

设置 Spark 环境

将以下行添加到 ~ /.bashrc文件中。意思是将spark软件文件所在的位置添加到PATH变量中。

export PATH = $PATH:/usr/local/spark/bin

使用以下命令获取 ~/.bashrc 文件。

$ source ~/.bashrc

Step7:验证Spark安装

编写以下命令来打开 Spark shell。

$spark-shell

如果 Spark 安装成功,您将看到以下输出。

Spark assembly has been built with Hive, including Datanucleus jars on classpath
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
15/06/04 15:25:22 INFO SecurityManager: Changing view acls to: hadoop
15/06/04 15:25:22 INFO SecurityManager: Changing modify acls to: hadoop
disabled; ui acls disabled; users with view permissions: Set(hadoop); users with modify permissions: Set(hadoop)
15/06/04 15:25:22 INFO HttpServer: Starting HTTP Server
15/06/04 15:25:23 INFO Utils: Successfully started service 'HTTP class server' on port 43292.
Welcome to
    ____             __
   / __/__ ___ _____/ /__
   _\ \/ _ \/ _ `/ __/ '_/
   /___/ .__/\_,_/_/ /_/\_\ version 1.4.0
      /_/
Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_71)
Type in expressions to have them evaluated.
Spark context available as sc
scala>

Spark SQL - 简介

Spark 引入了一个用于结构化数据处理的编程模块,称为 Spark SQL。它提供了一个称为 DataFrame 的编程抽象,可以充当分布式 SQL 查询引擎。

Spark SQL 的特点

以下是 Spark SQL 的功能 -

  • 集成- 将 SQL 查询与 Spark 程序无缝混合。Spark SQL 允许您在 Spark 中将结构化数据作为分布式数据集 (RDD) 进行查询,并使用 Python、Scala 和 Java 中的集成 API。这种紧密的集成使得运行 SQL 查询和复杂的分析算法变得容易。

  • 统一数据访问- 从各种来源加载和查询数据。Schema-RDD 提供了一个单一接口,可以有效地处理结构化数据,包括 Apache Hive 表、parquet 文件和 JSON 文件。

  • Hive 兼容性- 在现有仓库上运行未经修改的 Hive 查询。Spark SQL 重用 Hive 前端和 MetaStore,使您能够与现有 Hive 数据、查询和 UDF 完全兼容。只需将其与 Hive 一起安装即可。

  • 标准连接- 通过 JDBC 或 ODBC 连接。Spark SQL 包括具有行业标准 JDBC 和 ODBC 连接的服务器模式。

  • 可扩展性- 对交互式查询和长查询使用相同的引擎。Spark SQL 利用 RDD 模型来支持中间查询容错,使其也可以扩展到大型作业。不必担心对历史数据使用不同的引擎。

Spark SQL架构

下图解释了 Spark SQL 的架构 -

Spark SQL架构

该架构包含三层,即语言 API、模式 RDD 和数据源。

  • 语言 API - Spark 与不同语言和 Spark SQL 兼容。它还受到这些语言 API(python、scala、java、HiveQL)的支持。

  • Schema RDD - Spark Core 设计有称为 RDD 的特殊数据结构。一般来说,Spark SQL 适用于模式、表和记录。因此,我们可以使用Schema RDD作为临时表。我们可以将这个 Schema RDD 称为 Data Frame。

  • 数据源- 通常spark-core的数据源是文本文件、Avro文件等。但是,Spark SQL的数据源是不同的。这些是 Parquet 文件、JSON 文档、HIVE 表和 Cassandra 数据库。

我们将在后续章节中详细讨论这些内容。

Spark SQL - 数据帧

DataFrame 是分布式数据集合,被组织成命名列。从概念上讲,它相当于具有良好优化技术的关系表。

DataFrame 可以从一系列不同的源(例如 Hive 表、结构化数据文件、外部数据库或现有 RDD)构建。该 API 专为现代大数据和数据科学应用程序而设计,其灵感来自R 编程中的 DataFramePython 中的 Pandas

数据框的特点

这是 DataFrame 的一组特征 -

  • 能够在单节点集群到大型集群上处理千字节到拍字节大小的数据。

  • 支持不同的数据格式(Avro、csv、elasticsearch 和 Cassandra)和存储系统(HDFS、HIVE 表、mysql 等)。

  • 通过 Spark SQL Catalyst 优化器(树转换框架)进行最先进的优化和代码生成。

  • 可以通过 Spark-Core 轻松与所有大数据工具和框架集成。

  • 提供用于 Python、Java、Scala 和 R 编程的 API。

SQL上下文

SQLContext是一个类,用于初始化Spark SQL的功能。初始化 SQLContext 类对象需要 SparkContext 类对象 (sc)。

以下命令用于通过spark-shell初始化SparkContext。

$ spark-shell

默认情况下,SparkContext 对象在 Spark-Shell 启动时使用名称sc进行初始化。

使用以下命令创建 SQLContext。

scala> val sqlcontext = new org.apache.spark.sql.SQLContext(sc)

例子

让我们考虑一个名为employee.json的 JSON 文件中的员工记录示例。使用以下命令创建 DataFrame (df) 并读取名为employee.json的 JSON 文档,其中包含以下内容。

employee.json - 将此文件放置在当前scala>指针所在的目录中。

{
   {"id" : "1201", "name" : "satish", "age" : "25"}
   {"id" : "1202", "name" : "krishna", "age" : "28"}
   {"id" : "1203", "name" : "amith", "age" : "39"}
   {"id" : "1204", "name" : "javed", "age" : "23"}
   {"id" : "1205", "name" : "prudvi", "age" : "23"}
}

数据框操作

DataFrame 为结构化数据操作提供了特定于领域的语言。在这里,我们提供了一些使用 DataFrame 进行结构化数据处理的基本示例。

按照下面给出的步骤执行 DataFrame 操作 -

阅读 JSON 文档

首先,我们必须阅读 JSON 文档。基于此,生成一个名为(dfs)的DataFrame。

使用以下命令读取名为employee.json的JSON文档。数据显示为包含字段的表格 - id、name 和age。

scala> val dfs = sqlContext.read.json("employee.json")

输出- 字段名称自动取自employee.json

dfs: org.apache.spark.sql.DataFrame = [age: string, id: string, name: string]

显示数据

如果您想查看 DataFrame 中的数据,请使用以下命令。

scala> dfs.show()

输出- 您可以以表格格式查看员工数据。

<console>:22, took 0.052610 s
+----+------+--------+
|age | id   |  name  |
+----+------+--------+
| 25 | 1201 | satish |
| 28 | 1202 | krishna|
| 39 | 1203 | amith  |
| 23 | 1204 | javed  |
| 23 | 1205 | prudvi |
+----+------+--------+

使用 printSchema 方法

如果您想查看 DataFrame 的结构(架构),请使用以下命令。

scala> dfs.printSchema()

输出

root
   |-- age: string (nullable = true)
   |-- id: string (nullable = true)
   |-- name: string (nullable = true)

使用选择方法

使用以下命令从 DataFrame 中获取三列中的name -column。

scala> dfs.select("name").show()

输出- 您可以看到名称列的值。

<console>:22, took 0.044023 s
+--------+
|  name  |
+--------+
| satish |
| krishna|
| amith  |
| javed  |
| prudvi |
+--------+

使用年龄过滤器

使用以下命令查找年龄大于 23 岁(age > 23)的员工。

scala> dfs.filter(dfs("age") > 23).show()

输出

<console>:22, took 0.078670 s
+----+------+--------+
|age | id   | name   |
+----+------+--------+
| 25 | 1201 | satish |
| 28 | 1202 | krishna|
| 39 | 1203 | amith  |
+----+------+--------+

使用groupBy方法

使用以下命令来统计同龄员工的数量。

scala> dfs.groupBy("age").count().show()

输出- 两名员工的年龄为 23 岁。

<console>:22, took 5.196091 s
+----+-----+
|age |count|
+----+-----+
| 23 |  2  |
| 25 |  1  |
| 28 |  1  |
| 39 |  1  |
+----+-----+

以编程方式运行 SQL 查询

SQLContext 使应用程序能够在运行 SQL 函数时以编程方式运行 SQL 查询,并将结果作为 DataFrame 返回。

一般来说,在后台,SparkSQL 支持两种不同的方法将现有 RDD 转换为 DataFrames -

先生 否 方法与说明
1 使用反射推断模式

此方法使用反射来生成包含特定类型对象的 RDD 的架构。

2 以编程方式指定架构

创建 DataFrame 的第二种方法是通过编程接口,它允许您构建架构,然后将其应用到现有的 RDD。

Spark SQL - 数据源

DataFrame 接口允许不同的数据源在 Spark SQL 上工作。它是一个临时表,可以像普通的RDD一样操作。将 DataFrame 注册为表允许您对其数据运行 SQL 查询。

在本章中,我们将描述使用不同的 Spark DataSource 加载和保存数据的一般方法。此后,我们将详细讨论可用于内置数据源的特定选项。

SparkSQL 中有不同类型的数据源,下面列出了其中一些 -

先生 否 数据源
1 JSON 数据集

Spark SQL 可以自动捕获 JSON 数据集的架构并将其作为 DataFrame 加载。

2 蜂巢表

Hive 作为 HiveContext 与 Spark 库捆绑在一起,它继承自 SQLContext。

3 镶木地板文件

Parquet 是一种柱状格式,许多数据处理系统都支持。