掌握大数据领域Spark SQL的高级用法:从原理到实战的全面指南

摘要
Apache Spark已成为大数据处理领域的事实标准,而Spark SQL作为其核心组件,为开发者提供了以SQL方式处理结构化数据的强大能力。本文将深入探讨Spark SQL的高级用法,从执行原理到优化技巧,从复杂数据类型处理到性能调优,全方位解析如何充分发挥Spark SQL的潜力。无论你是数据工程师、数据分析师还是大数据平台开发人员,本文都将帮助你掌握Spark SQL的高级特性,构建高效、可扩展的数据处理 pipelines。
关键词:Spark SQL, 大数据, 性能优化, 自定义函数, 窗口函数, 复杂数据类型, 执行计划, Structured Streaming
目录
- 引言:Spark SQL的演进与价值
- Spark SQL核心架构深度解析
- 执行计划与优化器:Catalyst优化器详解
- 高级数据类型操作:JSON、数组与结构体
- 窗口函数与高级聚合
- 自定义函数:UDF、UDAF与UDTF
- 性能优化实战:从理论到实践
- Spark SQL与Hive集成
- 数据源扩展与自定义
- Structured Streaming与实时SQL分析
- 项目实战:电商用户行为分析平台
- Spark SQL最佳实践与陷阱规避
- 未来趋势与挑战
- 总结
- 附录:常用工具与资源
1. 引言:Spark SQL的演进与价值
1.1 从MapReduce到Spark SQL
大数据处理技术经历了从复杂到简单、从低级API到高级API的演进过程。Apache Hadoop的MapReduce框架为批处理奠定了基础,但需要开发者编写大量样板代码。随后出现的Hive通过SQL接口简化了大数据查询,但性能和灵活性仍有不足。
Spark SQL的出现彻底改变了这一局面。它将SQL的易用性与Spark的高性能完美结合,允许用户使用SQL或DataFrame API查询结构化数据,同时支持复杂的数据分析和转换操作。
1.2 Spark SQL的核心价值
Spark SQL提供了以下关键价值:
- 统一数据访问:通过DataFrame API和SQL,统一访问各种数据源(Hive、Avro、Parquet、JSON、JDBC等)
- 高性能:基于Catalyst优化器和Tungsten执行引擎,提供比传统SQL-on-Hadoop解决方案更高的性能
- 兼容性:兼容ANSI SQL和HiveQL,保护现有SQL代码投资
- 扩展性:支持自定义函数、数据源和优化规则
- 集成性:与Spark生态系统无缝集成,包括MLlib(机器学习)、GraphX(图处理)和Structured Streaming(流处理)
1.3 Spark SQL版本演进与重要特性
| 版本 | 发布时间 | 关键特性 |
|---|---|---|
| Spark 1.0 | 2014年 | 引入DataFrame API,支持SQL查询 |
| Spark 1.3 | 2015年 | 引入Catalyst优化器,支持Parquet列式存储 |
| Spark 1.6 | 2016年 | 引入Dataset API,结合DataFrame和RDD的优势 |
| Spark 2.0 | 2016年 | 统一DataFrame和Dataset API,引入SparkSession |
| Spark 2.2 | 2017年 | 增强Structured Streaming,引入水印机制 |
| Spark 2.4 | 2018年 | 引入矢量化Parquet读取,增强SQL功能 |
| Spark 3.0 | 2020年 | 引入自适应查询执行(AQE),动态分区裁剪,显著性能提升 |
| Spark 3.2 | 2021年 | 增强CBO,支持ANSI SQL模式,性能优化 |
| Spark 3.4 | 2023年 | 改进自适应查询执行,增强Python UDF性能 |
本文将基于Spark 3.x版本进行讲解,重点关注最新版本中的高级特性和优化。
1.4 本文学习路径
本文旨在带领读者从Spark SQL基础逐步深入到高级应用。我们将首先解析Spark SQL的核心架构和执行原理,然后详细讲解各种高级功能,最后通过实战项目展示如何综合运用这些技术解决实际问题。建议读者具备基本的Spark和SQL知识,但即使是初学者,通过系统学习也能掌握这些高级概念。
2. Spark SQL核心架构深度解析
2.1 Spark SQL整体架构
Spark SQL构建在Spark Core之上,提供了处理结构化数据的高级抽象。其核心架构如图所示:
Spark SQL架构主要包含以下组件:
- API层:提供SQL、DataFrame和Dataset三种接口
- Catalyst优化器:负责查询解析、优化和代码生成
- Tungsten执行引擎:基于内存管理和二进制处理的高效执行引擎
- 数据源API:统一的数据源访问接口
- Catalog:元数据管理中心,支持Hive Metastore集成
2.2 DataFrame与Dataset
DataFrame和Dataset是Spark SQL提供的两种主要数据抽象,它们构建在RDD之上,但提供了更高效的处理能力:
-
DataFrame:分布式的行数据集,组织成命名列,类似于关系数据库中的表。DataFrame提供了结构化API,支持编译时类型安全检查。
-
Dataset:结合了DataFrame的结构化优势和RDD的类型安全特性,是强类型的分布式数据集。
在Spark 2.0之后,DataFrame被定义为Dataset[Row],即每行数据类型为Row的Dataset。
DataFrame创建示例(Python):
from pyspark.sql import SparkSession
# 创建SparkSession
spark = SparkSession.builder \
.appName("DataFrameExample") \
.master("local[*]") \
.getOrCreate()
# 从列表创建DataFrame
data = [("Alice", 25, "F"), ("Bob", 30, "M"), ("Charlie", 35, "M")]
df = spark.createDataFrame(data, ["name", "age", "gender"])
# 显示DataFrame
df.show()
"""
+-------+---+------+
| name|age|gender|
+-------+---+------+
| Alice| 25| F|
| Bob| 30| M|
|Charlie| 35| M|
+-------+---+------+
"""
# 从CSV文件创建DataFrame
df = spark.read.csv("people.csv", header=True, inferSchema=True)
Dataset创建示例(Scala):
import org.apache.spark.sql.SparkSession
case class Person(name: String, age: Int, gender: String)
val spark = SparkSession.builder()
.appName("DatasetExample")
.master("local[*]")
.getOrCreate()
import spark.implicits._
// 创建Dataset
val data = Seq(Person("Alice", 25, "F"), Person("Bob", 30, "M"), Person("Charlie", 35, "M"))
val ds = data.toDS()
// 显示Dataset
ds.show()
DataFrame/Dataset相比RDD的优势:
- 结构化优化:利用模式信息进行查询优化
- 内存效率:采用列式存储和Tungsten二进制格式
- 查询优化:通过Catalyst优化器自动优化执行计划
- API简洁:提供高级操作,减少样板代码
2.3 Catalyst优化器工作流程
Catalyst是Spark SQL的核心,负责查询优化。其工作流程分为几个阶段:
graph LR
A[解析(Parse)] --> B[分析(Analyze)]
B --> C[逻辑优化(Optimize)]
C --> D[物理计划(Physical Plan)]
D --> E[代码生成(Code Gen)]
- 解析(Parse):将SQL字符串转换为抽象语法树(AST)
- 分析(Analyze):结合Catalog元数据解析AST,生成逻辑计划
- 逻辑优化:应用规则优化逻辑计划(如谓词下推、常量折叠)
- 物理计划:将逻辑计划转换为多个物理计划,选择最优计划
- 代码生成:将物理计划转换为Java字节码执行
Catalyst采用基于规则和基于成本的混合优化策略,既应用预定义规则(如谓词下推),又根据统计信息选择成本最低的执行计划。
2.4 Tungsten执行引擎
Tungsten是Spark的执行引擎,旨在充分利用现代硬件特性,主要优化包括:
- 内存管理:使用自定义内存管理器,减少JVM对象开销和垃圾回收压力
- 二进制处理:数据以二进制格式存储,避免Java对象开销
- 向量化执行:批处理方式处理数据,提高CPU缓存利用率
- 代码生成:使用WHOLESTAGE_CODEGEN将物理计划转换为高效的Java字节码
Tungsten的核心思想是"以数据为中心"的计算,最大限度地减少数据移动和序列化开销,提高执行效率。
3. 执行计划与优化器:Catalyst优化器详解
3.1 查询执行流程
要深入理解Spark SQL的高级用法,首先需要掌握其查询执行流程。一个Spark SQL查询从提交到执行的完整流程如下:
- 用户提交SQL查询或DataFrame操作
- Spark SQL将查询转换为未解析的逻辑计划
- 分析器结合元数据解析逻辑计划
- 优化器应用规则优化逻辑计划
- 规划器生成多个物理计划并选择最优方案
- 执行引擎执行物理计划并返回结果
我们可以通过explain方法查看查询的执行计划,这是优化查询性能的关键工具。
示例:查看DataFrame操作的执行计划
# 创建示例DataFrame
df = spark.createDataFrame([(1, "Alice", 25), (2, "Bob", 30)], ["id", "name", "age"])
# 执行查询并查看执行计划
df.filter(df.age > 28).groupBy("name").count().explain()
执行结果将显示详细的执行计划,包括逻辑计划和物理计划。
3.2 逻辑计划与物理计划
3.2.1 逻辑计划
逻辑计划表示查询要执行的操作,不涉及具体的执行细节。逻辑计划分为三个阶段:
- 未解析的逻辑计划:仅包含语法结构,不验证表和列是否存在
- 解析后的逻辑计划:结合Catalog验证元数据,解析表和列
- 优化后的逻辑计划:应用优化规则后的逻辑计划
Catalyst使用启发式规则优化逻辑计划,主要优化包括:
- 谓词下推(Predicate Pushdown):将过滤操作尽可能下推到数据源,减少数据读取量
- 常量折叠(Constant Folding):预先计算表达式中的常量
- 列裁剪(Column Pruning):只读取查询所需的列
- 聚合重写(Aggregation Rewriting):优化聚合操作,如替换distinct为group by
3.2.2 物理计划
物理计划是逻辑计划的具体实现,考虑了执行细节,如连接算法选择、分区策略等。Spark支持多种物理计划实现,如:
- HashAggregate vs SortAggregate:两种聚合实现方式
- BroadcastHashJoin vs ShuffledHashJoin vs SortMergeJoin:不同的连接算法
- InMemoryTableScan vs FileScan:内存扫描 vs 文件扫描
Catalyst使用基于成本的优化(CBO)选择最优物理计划,考虑因素包括:
- 表的大小和行数
- 列的基数和数据分布
- 可用内存和CPU资源
要启用CBO,需要设置:
spark.conf.set("spark.sql.cbo.enabled", "true")
并确保收集了表的统计信息:
ANALYZE TABLE table_name COMPUTE STATISTICS;
ANALYZE TABLE table_name COMPUTE STATISTICS FOR COLUMNS column1, column2;
3.3 执行计划分析实战
理解执行计划是优化Spark SQL查询的关键。让我们通过一个复杂示例详细分析执行计划:
示例查询:
SELECT
c.category_name,
COUNT(DISTINCT o.order_id) AS order_count,
SUM(oi.quantity * oi.unit_price) AS total_sales
FROM
orders o
JOIN
order_items oi ON o.order_id = oi.order_id
JOIN
products p ON oi.product_id = p.product_id
JOIN
categories c ON p.category_id = c.category_id
WHERE
o.order_date >= '2023-01-01'
AND o.order_status = 'completed'
GROUP BY
c.category_name
ORDER BY
total_sales DESC
LIMIT 10;
使用explain extended命令查看详细执行计划,我们可以分析以下关键信息:
- 连接顺序:Spark会根据表大小和统计信息决定最优连接顺序
- 连接算法:小表可能使用BroadcastHashJoin,大表可能使用SortMergeJoin
- 谓词位置:检查过滤条件是否被下推到数据源或连接之前
- 聚合策略:查看是否使用了HashAggregate或SortAggregate
- 排序操作:ORDER BY操作是否导致了全局排序
通过分析执行计划,我们可以识别查询中的性能瓶颈,如不必要的Shuffle、缺少谓词下推等。
3.4 基于规则的优化(RBO)与基于成本的优化(CBO)
Spark SQL同时支持基于规则的优化(RBO)和基于成本的优化(CBO):
- RBO:应用预定义规则优化查询,如谓词下推、列裁剪等
- CBO:基于表和列的统计信息(行数、基数、数据大小等)选择成本最低的执行计划
启用和配置CBO:
# 启用CBO
spark.conf.set("spark.sql.cbo.enabled", "true")
# 配置统计信息收集
spark.conf.set("spark.sql.statistics.histogram.enabled", "true")
spark.conf.set("spark.sql.cbo.joinReorder.enabled", "true") # 启用连接重排序
收集统计信息:
-- 收集表级统计信息
ANALYZE TABLE orders COMPUTE STATISTICS;
-- 收集列级统计信息
ANALYZE TABLE orders COMPUTE STATISTICS FOR COLUMNS order_id, order_date, order_status;
CBO在处理多表连接、复杂子查询时特别有效,能显著提升查询性能。但CBO的效果依赖于准确的统计信息,因此定期收集统计信息至关重要。
3.5 执行计划分析工具与技巧
分析执行计划是优化Spark SQL查询的关键技能。以下是一些实用技巧:
-
使用不同级别的explain:
explain():简要显示物理计划explain(True)或explain(mode="extended"):显示逻辑计划和物理计划explain(mode="codegen"):显示生成的代码explain(mode="cost"):显示成本信息
-
识别Shuffle操作:执行计划中的
Exchange操作表示Shuffle,通常是性能瓶颈 -
检查谓词下推:寻找
PushedFilters信息,确认过滤条件是否被下推 -
分析连接策略:确认是否使用了最优的连接算法(BroadcastHashJoin通常优于SortMergeJoin)
-
查看数据倾斜:执行计划中的数据大小和行数统计可帮助识别数据倾斜
示例:分析包含Shuffle的执行计划
# 查看可能导致Shuffle的操作的执行计划
df.groupBy("category").agg({"sales": "sum"}).explain(mode="extended")
在执行计划中寻找Exchange节点,这表示发生了Shuffle操作。Shuffle通常是必要的,但应尽量减少其数据量。
4. 高级数据类型操作:JSON、数组与结构体
4.1 复杂数据类型概述
Spark SQL支持多种复杂数据类型,用于处理半结构化和嵌套数据:
- 数组(Array):有序元素集合,如
Array[String] - 结构体(Struct):命名字段的集合,如
Struct(name: String, age: Int) - 映射(Map):键值对集合,如
Map[String, Int] - JSON:可以解析为Struct或Array的JSON字符串
这些复杂类型在处理现实世界数据时非常有用,如用户行为日志、产品属性、嵌套事件数据等。
4.2 JSON数据处理
Spark SQL提供了强大的JSON数据处理功能,可以直接读取JSON文件,或处理字符串类型的JSON数据。
4.2.1 读取JSON文件
# 读取JSON文件,自动推断schema
df = spark.read.json("path/to/json/files", multiLine=True) # multiLine=True支持多行JSON
# 显示schema
df.printSchema()
# 指定schema读取JSON,提高性能
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
schema = StructType([
StructField("id", IntegerType(), True),
StructField("name", StringType(), True),
StructField("address", StructType([
StructField("street", StringType(), True),
StructField("city", StringType(), True)
]))
])
df = spark.read.json("path/to/json/files", schema=schema)
4.2.2 处理JSON字符串
当JSON数据以字符串形式存储在列中时,可以使用from_json函数解析:
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StringType, IntegerType
# 示例数据
data = [(1, '{"name":"Alice","age":25,"address":{"city":"New York","zip":10001}}'),
(2, '{"name":"Bob","age":30,"address":{"city":"Los Angeles","zip":90001}}')]
df = spark.createDataFrame(data, ["id", "json_data"])
# 定义JSON schema
json_schema = StructType([
StructField("name", StringType(), True),
StructField("age", IntegerType(), True),
StructField("address", StructType([
StructField("city", StringType(), True),
StructField("zip", IntegerType(), True)
]))
])
# 解析JSON字符串
df_parsed = df.withColumn("data", from_json(col("json_data"), json_schema)) \
.select("id",
col("data.name").alias("name"),
col("data.age").alias("age"),
col("data.address.city").alias("city"),
col("data.address.zip").alias("zip"))
df_parsed.show()
4.2.3 生成JSON字符串
使用to_json函数可以将Struct类型转换为JSON字符串:
from pyspark.sql.functions import to_json, struct
# 将多列合并为JSON字符串
df_json = df_parsed.withColumn("json_result", to_json(struct("name", "age", "city")))
df_json.select("id", "json_result").show(truncate=False)
4.3 数组操作
数组是Spark SQL中常用的复杂类型,用于表示有序集合。Spark提供了丰富的数组操作函数。
4.3.1 创建数组
from pyspark.sql.functions import array, array_contains
# 创建数组列
df = spark.createDataFrame([(1, "Alice", [25, 30, 35])], ["id", "name", "ages"])
# 或使用array函数
df_with_array = df.withColumn("hobbies", array(lit("reading"), lit("sports"), lit("music")))
4.3.2 数组元素访问与查询
from pyspark.sql.functions import col, size, array_contains, element_at
# 访问数组元素(索引从1开始)
df.select(col("ages")[0].alias("first_age")).show() # Python风格索引(从0开始)
df.select(element_at("ages", 1).alias("first_age")).show() # SQL风格索引(从1开始)
# 数组长度
df.select(size("ages").alias("age_count")).show()
# 检查数组包含元素
df.select(array_contains("ages", 30).alias("has_30")).show()
4.3.3 数组展开与聚合
from pyspark.sql.functions import explode, explode_outer, flatMapGroupsInPandas, collect_list, collect_set
# explode: 将数组元素展开为多行
df_exploded = df.select("id", "name", explode("ages").alias("age"))
# explode_outer: 处理空数组,保留null值
df_exploded_outer = df.select("id", "name", explode_outer("ages").alias("age"))
# collect_list: 将多行聚合为数组
df_aggregated = df_exploded.groupBy("id", "name").agg(collect_list("age").alias("ages_list"))
# collect_set: 聚合为去重数组
df_aggregated_set = df_exploded.groupBy("id", "name").agg(collect_set("age").alias("ages_set"))
4.3.4 高级数组函数
Spark SQL提供了许多高级数组函数,如排序、过滤、转换等:
from pyspark.sql.functions import array_sort, array_filter, transform, aggregate
# 数组排序
df.select(array_sort("ages").alias("sorted_ages")).show()
# 数组过滤
df.select(array_filter("ages", lambda x: x > 28).alias("filtered_ages")).show()
# 数组转换
df.select(transform("ages", lambda x: x * 2).alias("doubled_ages")).show()
# 数组聚合
df.select(aggregate("ages", 0, lambda acc, x: acc + x).alias("total")).show()
4.4 结构体操作
结构体(Struct)用于表示嵌套数据,类似于关系数据库中的对象类型。
4.4.1 创建结构体
from pyspark.sql.functions import struct, col
# 使用struct函数创建结构体列
df = spark.createDataFrame([(1, "Alice", "New York", 25)], ["id", "name", "city", "age"])
df_with_struct = df.withColumn("person_info", struct(col("name"), col("age"), col("city")))
df_with_struct.printSchema()
4.4.2 访问结构体字段
# 访问结构体字段
df_with_struct.select("id", "person_info.name", "person_info.age").show()
# 或使用点表示法
df_with_struct.select(col("id"), col("person_info.name").alias("username")).show()
4.4.3 结构体与关系转换
# 将结构体展开为多个列
df_flattened = df_with_struct.select("id", "person_info.*")
# 嵌套结构体操作
data = [(1, ("Alice", 25), ("New York", "USA")),
(2, ("Bob", 30), ("London", "UK"))]
df_nested = spark.createDataFrame(data, ["id", "person", "address"])
# 访问嵌套字段
df_nested.select("id", "person._1", "address._2").show()
# 重命名字段并创建更清晰的嵌套结构
df_renamed = df_nested.withColumn("name", col("person._1")) \
.withColumn("age", col("person._2")) \
.withColumn("city", col("address._1")) \
.withColumn("country", col("address._2")) \
.drop("person", "address") \
.withColumn("info", struct(col("name"), col("age"))) \
.withColumn("location", struct(col("city"), col("country")))
4.5 复杂类型综合应用
在实际应用中,我们经常需要综合使用各种复杂类型。以下是一个处理电商订单数据的示例:
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, ArrayType, DoubleType
# 定义订单数据schema
order_schema = StructType([
StructField("order_id", StringType(), True),
StructField("customer_id", StringType(), True),
StructField("order_date", StringType(), True),
StructField("items", ArrayType(StructType([
StructField("product_id", StringType(), True),
StructField("quantity", IntegerType(), True),
StructField("price", DoubleType(), True),
StructField("attributes", MapType(StringType(), StringType()))
]))),
StructField("shipping_address", StructType([
StructField("street", StringType(), True),
StructField("city", StringType(), True),
StructField("state", StringType(), True),
StructField("zipcode", StringType(), True)
]))
])
# 读取订单数据(假设为JSON格式)
orders_df = spark.read.schema(order_schema).json("path/to/orders")
# 1. 展开订单商品
order_items_df = orders_df.select(
F.col("order_id"),
F.col("customer_id"),
F.col("order_date"),
F.explode("items").alias("item")
).select(
"order_id", "customer_id", "order_date",
"item.product_id", "item.quantity", "item.price",
F.col("item.attributes").getItem("color").alias("color"),
F.col("item.attributes").getItem("size").alias("size")
)
# 2. 计算订单总金额
order_totals_df = order_items_df.groupBy("order_id", "customer_id") \
.agg(
F.sum(F.col("quantity") * F.col("price")).alias("total_amount"),
F.collect_set("product_id").alias("product_ids"),
F.count("product_id").alias("item_count")
)
# 3. 查找包含特定属性的订单
red_orders_df = orders_df.select(
"order_id", "customer_id",
F.explode("items").alias("item")
).filter(
(F.col("item.attributes.color") == "red") &
(F.col("item.attributes.size") == "L")
).select("order_id", "customer_id", "item.product_id").distinct()
这个示例展示了如何处理包含数组、结构体和映射的复杂订单数据,包括展开数组、访问嵌套字段、聚合计算等操作。
5. 窗口函数与高级聚合
5.1 窗口函数概述
窗口函数(Window Functions)是Spark SQL中最强大的功能之一,允许用户在一组行上执行计算,同时保留原始行数据。窗口函数结合了聚合函数和行级操作的优点,特别适用于排名、移动平均值、累积计算等场景。
窗口函数的基本语法如下:
function_name() OVER (
[PARTITION BY partition_column]
[ORDER BY order_column]
[ROWS BETWEEN frame_start AND frame_end]
)
其中:
- PARTITION BY:将数据划分为多个组(窗口)
- ORDER BY:定义窗口内的排序方式
- ROWS BETWEEN:定义窗口的物理范围(行框架)
在Spark SQL中,窗口函数可以通过DataFrame API或SQL使用。
5.2 窗口函数类型
Spark SQL支持多种窗口函数,主要分为以下几类:
- 排名函数:ROW_NUMBER, RANK, DENSE_RANK, PERCENT_RANK, NTILE
- 分析函数:CUME_DIST, LAG, LEAD, FIRST_VALUE, LAST_VALUE
- 聚合函数:SUM, AVG, COUNT, MAX, MIN等(作为窗口函数使用)
5.3 排名函数实战
5.3.1 ROW_NUMBER, RANK与DENSE_RANK
这三个函数用于为每行分配排名,但处理并列排名的方式不同:
- ROW_NUMBER:为每行分配唯一序号,即使有并列值
- RANK:并列值获得相同排名,下一名次跳过相应数量
- DENSE_RANK:并列值获得相同排名,下一名次连续不跳过
示例:学生成绩排名
from pyspark.sql import Window
from pyspark.sql.functions import row_number, rank, dense_rank, col
# 创建示例数据
data = [("Alice", "Math", 90), ("Alice", "English", 85),
("Bob", "Math", 90), ("Bob", "English", 95),
("Charlie", "Math", 80), ("Charlie", "English", 85)]
df = spark.createDataFrame(data, ["name", "subject", "score"])
# 定义窗口规范
window_spec = Window.partitionBy("subject").orderBy(col("score").desc())
# 应用排名函数
rank_df = df.withColumn("row_num", row_number().over(window_spec)) \
.withColumn("rank", rank().over(window_spec)) \
.withColumn("dense_rank", dense_rank().over(window_spec))
rank_df.orderBy("subject", "row_num").show()
执行结果:
+-------+-------+-----+-------+----+----------+
| name|subject|score|row_num|rank|dense_rank|
+-------+-------+-----+-------+----+----------+
| Bob|English| 95| 1| 1| 1|
| Alice|English| 85| 2| 2| 2|
|Charlie|English| 85| 3| 2| 2|
| Alice| Math| 90| 1| 1| 1|
| Bob| Math| 90| 2| 1| 1|
|Charlie| Math| 80| 3| 3| 2|
+-------+-------+-----+-------+----+----------+
可以看到,在Math科目中,Alice和Bob都是90分:
- ROW_NUMBER给他们分配了1和2
- RANK都给了1,下一名直接是3(跳过了2)
- DENSE_RANK都给了1,下一名是2(连续)
5.3.2 PERCENT_RANK与NTILE
- PERCENT_RANK:计算相对排名百分比,公式为
(rank - 1) / (total_rows - 1) - NTILE:将数据划分为指定数量的桶,为每行分配桶编号
示例:
from pyspark.sql.functions import percent_rank, ntile
percentile_df = df.withColumn("percent_rank", percent_rank().over(window_spec)) \
.withColumn("ntile_3", ntile(3).over(window_spec))
percentile_df.orderBy("subject", "score").show()
5.4 分析函数实战
5.4.1 LAG与LEAD
LAG和LEAD函数用于访问窗口中当前行之前或之后的行数据,无需自连接:
- LAG(col, n): 获取当前行之前第n行的col值
- LEAD(col, n): 获取当前行之后第n行的col值
示例:分析用户连续登录情况
from pyspark.sql.functions import lag, lead, datediff, col
# 创建用户登录数据
login_data = [("user1", "2023-01-01"), ("user1", "2023-01-02"), ("user1", "2023-01-04"),
("user2", "2023-01-01"), ("user2", "2023-01-03"), ("user2", "2023-01-04")]
login_df = spark.createDataFrame(login_data, ["user_id", "login_date"])
login_df = login_df.withColumn("login_date", col("login_date").cast("date"))
# 定义窗口规范
user_window = Window.partitionBy("user_id").orderBy("login_date")
# 计算连续登录天数间隔
login_analysis_df = login_df.withColumn("prev_login", lag("login_date", 1).over(user_window)) \
.withColumn("next_login", lead("login_date", 1).over(user_window)) \
.withColumn("days_since_prev", datediff("login_date", "prev_login"))
login_analysis_df.show()
这个示例可以帮助识别用户是否连续登录,或计算两次登录之间的间隔天数。
5.4.2 FIRST_VALUE与LAST_VALUE
这两个函数用于获取窗口中的第一个或最后一个值:
from pyspark.sql.functions import first_value, last_value
# 定义窗口规范,包含框架定义
window_spec = Window.partitionBy("subject") \
.orderBy(col("score").desc()) \
.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
# 获取最高分和最低分
score_extremes_df = df.withColumn("highest_score", first_value("score").over(window_spec)) \
.withColumn("lowest_score", last_value("score").over(window_spec))
score_extremes_df.show()
注意:使用LAST_VALUE时需要特别注意窗口框架的定义,默认框架是从开始到当前行,可能无法得到预期结果。
5.5 窗口聚合函数
聚合函数(如SUM、AVG、COUNT等)可以作为窗口函数使用,计算窗口内的聚合值,而不减少行数。
5.5.1 移动平均值计算
from pyspark.sql.functions import avg, sum, count
# 创建销售数据
sales_data = [("2023-01-01", 100), ("2023-01-02", 150), ("2023-01-03", 200),
("2023-01-04", 180), ("2023-01-05", 220), ("2023-01-06", 250)]
sales_df = spark.createDataFrame(sales_data, ["date", "amount"])
sales_df = sales_df.withColumn("date", col("date").cast("date"))
# 定义滑动窗口:前2天到当前天
sliding_window = Window.orderBy("date") \
.rowsBetween(-2, Window.currentRow) # 前2行到当前行
# 计算移动平均和总和
moving_avg_df = sales_df.withColumn("3day_avg", avg("amount").over(sliding_window)) \
.withColumn("3day_sum", sum("amount").over(sliding_window)) \
.withColumn("3day_count", count("amount").over(sliding_window))
moving_avg_df.show()
5.5.2 累积聚合
# 定义累积窗口
cumulative_window = Window.orderBy("date") \
.rowsBetween(Window.unboundedPreceding, Window.currentRow)
# 计算累积总和和平均值
cumulative_df = sales_df.withColumn("cumulative_sum", sum("amount").over(cumulative_window)) \
.withColumn("cumulative_avg", avg("amount").over(cumulative_window))
cumulative_df.show()
5.6 窗口框架详解
窗口框架定义了窗口中的具体行范围,是窗口函数的重要组成部分。Spark SQL支持两种类型的窗口框架:
- 行框架(ROWS BETWEEN):基于物理行数定义范围
- 范围框架(RANGE BETWEEN):基于排序键的逻辑范围
框架通过rowsBetween或rangeBetween方法定义,语法如下:
# 行框架示例
window.rowsBetween(Window.unboundedPreceding, Window.currentRow) # 从开始到当前行
window.rowsBetween(-3, 0) # 前3行到当前行(共4行)
# 范围框架示例(适用于数值或日期类型的排序键)
window.rangeBetween(-7, 0) # 排序键值在当前行-7到当前行之间
示例:行框架 vs 范围框架
# 创建包含重复值的数据
data = [(1, "A", 10), (2, "A", 10), (3, "A", 20), (4, "A", 20), (5, "A", 30)]
df = spark.createDataFrame(data, ["id", "category", "value"])
# 行框架:前1行到当前行(物理行)
row_window = Window.partitionBy("category").orderBy("value").rowsBetween(-1, 0)
# 范围框架:值在当前值-5到当前值之间(逻辑范围)
range_window = Window.partitionBy("category").orderBy("value").rangeBetween(-5, 0)
# 添加聚合列
frame_comparison_df = df.withColumn("row_sum", sum("id").over(row_window)) \
.withColumn("range_sum", sum("id").over(range_window))
frame_comparison_df.show()
这个示例将展示行框架和范围框架在处理重复值时的区别:行框架严格基于物理行数,而范围框架基于排序键的逻辑范围。
5.7 高级窗口函数应用
5.7.1 会话化分析
窗口函数可用于将连续事件分组为会话(Sessions):
from pyspark.sql.functions import sum as f_sum, when, col
# 定义会话超时时间(例如30分钟)
SESSION_TIMEOUT = 30 * 60 # 秒
# 假设我们有用户事件数据
event_data = [("user1", "2023-01-01 08:00:00"), ("user1", "2023-01-01 08:15:00"),
("user1", "2023-01-0108:40:00"), ("user1", "2023-01-01 08:45:00")]
event_df = spark.createDataFrame(event_data, ["user_id", "event_time"])
event_df = event_df.withColumn("event_time", col("event_time").cast("timestamp"))
# 计算时间差并判断是否开始新会话
user_window = Window.partitionBy("user_id").orderBy("event_time")
session_df = event_df.withColumn("prev_time", lag("event_time", 1).over(user_window)) \
.withColumn("time_diff",
when(col("prev_time").isNotNull(),
col("event_time").cast("long") - col("prev_time").cast("long"))
.otherwise(SESSION_TIMEOUT + 1)) \
.withColumn("new_session", when(col("time_diff") > SESSION_TIMEOUT, 1).otherwise(0)) \
.withColumn("session_id", f_sum("new_session").over(user_window.rowsBetween(Window.unboundedPreceding, Window.currentRow)))
session_df.show()
这个示例将用户事件按时间间隔分组为不同会话,当两次事件间隔超过30分钟时,创建新会话。
5.7.2 同比环比分析
使用窗口函数可以方便地进行同比(与去年同期比较)和环比(与上一周期比较)分析:
from pyspark.sql.functions import month, year, lag, round
# 创建月度销售数据
sales_data = [("2021-01-01", 100), ("2021-02-01", 120), ("2021-03-01", 130),
("2022-01-01", 110), ("2022-02-01", 125), ("2022-03-01", 140)]
sales_df = spark.createDataFrame(sales_data, ["month", "amount"])
sales_df = sales_df.withColumn("month", col("month").cast("date")) \
.withColumn("year", year("month")) \
.withColumn("month_of_year", month("month"))
# 同比分析(与去年同期比较)
yoy_window = Window.partitionBy("month_of_year").orderBy("year")
yoy_analysis_df = sales_df.withColumn("prev_year_amount", lag("amount", 1).over(yoy_window)) \
.withColumn("yoy_growth", col("amount") - col("prev_year_amount")) \
.withColumn("yoy_growth_rate",
round((col("amount") / col("prev_year_amount") - 1) * 100, 2))
# 环比分析(与上月比较)
mom_window = Window.orderBy("month")
mom_analysis_df = yoy_analysis_df.withColumn("prev_month_amount", lag("amount", 1).over(mom_window)) \
.withColumn("mom_growth", col("amount") - col("prev_month_amount")) \
.withColumn("mom_growth_rate",
round((col("amount") / col("prev_month_amount") - 1) * 100, 2))
mom_analysis_df.select("month", "amount", "yoy_growth_rate", "mom_growth_rate").show()
这个示例展示了如何使用窗口函数计算同比增长率(与去年同期比较)和环比增长率(与上月比较),这在业务分析中非常常见。
6. 自定义函数:UDF、UDAF与UDTF
6.1 自定义函数概述
虽然Spark SQL提供了丰富的内置函数,但在实际应用中,我们经常需要实现特定业务逻辑的函数 Spark SQL支持三种类型的自定义函数:
- UDF(User-Defined Function):自定义标量函数,一行输入一行输出
- UDAF(User-Defined Aggregate Function):自定义聚合函数,多行输入一行输出
- UDTF(User-Defined Table-Generating Function):自定义表生成函数,一行输入多行输出
自定义函数扩展了Spark SQL的能力,允许我们将复杂的业务逻辑封装为可重用的函数。
6.2 用户自定义标量函数(UDF)
UDF是最常用的自定义函数类型,用于对单个或多个列进行转换操作。
6.2.1 Python UDF
在PySpark
9317

被折叠的 条评论
为什么被折叠?



