掌握大数据领域Spark SQL的高级用法

掌握大数据领域Spark SQL的高级用法:从原理到实战的全面指南

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

摘要

Apache Spark已成为大数据处理领域的事实标准,而Spark SQL作为其核心组件,为开发者提供了以SQL方式处理结构化数据的强大能力。本文将深入探讨Spark SQL的高级用法,从执行原理到优化技巧,从复杂数据类型处理到性能调优,全方位解析如何充分发挥Spark SQL的潜力。无论你是数据工程师、数据分析师还是大数据平台开发人员,本文都将帮助你掌握Spark SQL的高级特性,构建高效、可扩展的数据处理 pipelines。

关键词:Spark SQL, 大数据, 性能优化, 自定义函数, 窗口函数, 复杂数据类型, 执行计划, Structured Streaming

目录

  1. 引言:Spark SQL的演进与价值
  2. Spark SQL核心架构深度解析
  3. 执行计划与优化器:Catalyst优化器详解
  4. 高级数据类型操作:JSON、数组与结构体
  5. 窗口函数与高级聚合
  6. 自定义函数:UDF、UDAF与UDTF
  7. 性能优化实战:从理论到实践
  8. Spark SQL与Hive集成
  9. 数据源扩展与自定义
  10. Structured Streaming与实时SQL分析
  11. 项目实战:电商用户行为分析平台
  12. Spark SQL最佳实践与陷阱规避
  13. 未来趋势与挑战
  14. 总结
  15. 附录:常用工具与资源

1. 引言:Spark SQL的演进与价值

1.1 从MapReduce到Spark SQL

大数据处理技术经历了从复杂到简单、从低级API到高级API的演进过程。Apache Hadoop的MapReduce框架为批处理奠定了基础,但需要开发者编写大量样板代码。随后出现的Hive通过SQL接口简化了大数据查询,但性能和灵活性仍有不足。

Spark SQL的出现彻底改变了这一局面。它将SQL的易用性与Spark的高性能完美结合,允许用户使用SQL或DataFrame API查询结构化数据,同时支持复杂的数据分析和转换操作。

1.2 Spark SQL的核心价值

Spark SQL提供了以下关键价值:

  • 统一数据访问:通过DataFrame API和SQL,统一访问各种数据源(Hive、Avro、Parquet、JSON、JDBC等)
  • 高性能:基于Catalyst优化器和Tungsten执行引擎,提供比传统SQL-on-Hadoop解决方案更高的性能
  • 兼容性:兼容ANSI SQL和HiveQL,保护现有SQL代码投资
  • 扩展性:支持自定义函数、数据源和优化规则
  • 集成性:与Spark生态系统无缝集成,包括MLlib(机器学习)、GraphX(图处理)和Structured Streaming(流处理)

1.3 Spark SQL版本演进与重要特性

版本发布时间关键特性
Spark 1.02014年引入DataFrame API,支持SQL查询
Spark 1.32015年引入Catalyst优化器,支持Parquet列式存储
Spark 1.62016年引入Dataset API,结合DataFrame和RDD的优势
Spark 2.02016年统一DataFrame和Dataset API,引入SparkSession
Spark 2.22017年增强Structured Streaming,引入水印机制
Spark 2.42018年引入矢量化Parquet读取,增强SQL功能
Spark 3.02020年引入自适应查询执行(AQE),动态分区裁剪,显著性能提升
Spark 3.22021年增强CBO,支持ANSI SQL模式,性能优化
Spark 3.42023年改进自适应查询执行,增强Python UDF性能

本文将基于Spark 3.x版本进行讲解,重点关注最新版本中的高级特性和优化。

1.4 本文学习路径

本文旨在带领读者从Spark SQL基础逐步深入到高级应用。我们将首先解析Spark SQL的核心架构和执行原理,然后详细讲解各种高级功能,最后通过实战项目展示如何综合运用这些技术解决实际问题。建议读者具备基本的Spark和SQL知识,但即使是初学者,通过系统学习也能掌握这些高级概念。

2. Spark SQL核心架构深度解析

2.1 Spark SQL整体架构

Spark SQL构建在Spark Core之上,提供了处理结构化数据的高级抽象。其核心架构如图所示:

SQL, DataFrame, Dataset
用户API
Catalyst优化器
逻辑计划
物理计划
Tungsten执行引擎
结果输出
数据源API
Catalog元数据
Hive支持

Spark SQL架构主要包含以下组件:

  • API层:提供SQL、DataFrame和Dataset三种接口
  • Catalyst优化器:负责查询解析、优化和代码生成
  • Tungsten执行引擎:基于内存管理和二进制处理的高效执行引擎
  • 数据源API:统一的数据源访问接口
  • Catalog:元数据管理中心,支持Hive Metastore集成

2.2 DataFrame与Dataset

DataFrame和Dataset是Spark SQL提供的两种主要数据抽象,它们构建在RDD之上,但提供了更高效的处理能力:

  • DataFrame:分布式的行数据集,组织成命名列,类似于关系数据库中的表。DataFrame提供了结构化API,支持编译时类型安全检查。

  • Dataset:结合了DataFrame的结构化优势和RDD的类型安全特性,是强类型的分布式数据集。

在Spark 2.0之后,DataFrame被定义为Dataset[Row],即每行数据类型为Row的Dataset。

DataFrame创建示例(Python):

from pyspark.sql import SparkSession

# 创建SparkSession
spark = SparkSession.builder \
    .appName("DataFrameExample") \
    .master("local[*]") \
    .getOrCreate()

# 从列表创建DataFrame
data = [("Alice", 25, "F"), ("Bob", 30, "M"), ("Charlie", 35, "M")]
df = spark.createDataFrame(data, ["name", "age", "gender"])

# 显示DataFrame
df.show()
"""
+-------+---+------+
|   name|age|gender|
+-------+---+------+
|  Alice| 25|     F|
|    Bob| 30|     M|
|Charlie| 35|     M|
+-------+---+------+
"""

# 从CSV文件创建DataFrame
df = spark.read.csv("people.csv", header=True, inferSchema=True)

Dataset创建示例(Scala):

import org.apache.spark.sql.SparkSession

case class Person(name: String, age: Int, gender: String)

val spark = SparkSession.builder()
  .appName("DatasetExample")
  .master("local[*]")
  .getOrCreate()
  
import spark.implicits._

// 创建Dataset
val data = Seq(Person("Alice", 25, "F"), Person("Bob", 30, "M"), Person("Charlie", 35, "M"))
val ds = data.toDS()

// 显示Dataset
ds.show()

DataFrame/Dataset相比RDD的优势:

  1. 结构化优化:利用模式信息进行查询优化
  2. 内存效率:采用列式存储和Tungsten二进制格式
  3. 查询优化:通过Catalyst优化器自动优化执行计划
  4. API简洁:提供高级操作,减少样板代码

2.3 Catalyst优化器工作流程

Catalyst是Spark SQL的核心,负责查询优化。其工作流程分为几个阶段:

graph LR
    A[解析(Parse)] --> B[分析(Analyze)]
    B --> C[逻辑优化(Optimize)]
    C --> D[物理计划(Physical Plan)]
    D --> E[代码生成(Code Gen)]
  1. 解析(Parse):将SQL字符串转换为抽象语法树(AST)
  2. 分析(Analyze):结合Catalog元数据解析AST,生成逻辑计划
  3. 逻辑优化:应用规则优化逻辑计划(如谓词下推、常量折叠)
  4. 物理计划:将逻辑计划转换为多个物理计划,选择最优计划
  5. 代码生成:将物理计划转换为Java字节码执行

Catalyst采用基于规则和基于成本的混合优化策略,既应用预定义规则(如谓词下推),又根据统计信息选择成本最低的执行计划。

2.4 Tungsten执行引擎

Tungsten是Spark的执行引擎,旨在充分利用现代硬件特性,主要优化包括:

  • 内存管理:使用自定义内存管理器,减少JVM对象开销和垃圾回收压力
  • 二进制处理:数据以二进制格式存储,避免Java对象开销
  • 向量化执行:批处理方式处理数据,提高CPU缓存利用率
  • 代码生成:使用WHOLESTAGE_CODEGEN将物理计划转换为高效的Java字节码

Tungsten的核心思想是"以数据为中心"的计算,最大限度地减少数据移动和序列化开销,提高执行效率。

3. 执行计划与优化器:Catalyst优化器详解

3.1 查询执行流程

要深入理解Spark SQL的高级用法,首先需要掌握其查询执行流程。一个Spark SQL查询从提交到执行的完整流程如下:

  1. 用户提交SQL查询或DataFrame操作
  2. Spark SQL将查询转换为未解析的逻辑计划
  3. 分析器结合元数据解析逻辑计划
  4. 优化器应用规则优化逻辑计划
  5. 规划器生成多个物理计划并选择最优方案
  6. 执行引擎执行物理计划并返回结果

我们可以通过explain方法查看查询的执行计划,这是优化查询性能的关键工具。

示例:查看DataFrame操作的执行计划

# 创建示例DataFrame
df = spark.createDataFrame([(1, "Alice", 25), (2, "Bob", 30)], ["id", "name", "age"])

# 执行查询并查看执行计划
df.filter(df.age > 28).groupBy("name").count().explain()

执行结果将显示详细的执行计划,包括逻辑计划和物理计划。

3.2 逻辑计划与物理计划

3.2.1 逻辑计划

逻辑计划表示查询要执行的操作,不涉及具体的执行细节。逻辑计划分为三个阶段:

  1. 未解析的逻辑计划:仅包含语法结构,不验证表和列是否存在
  2. 解析后的逻辑计划:结合Catalog验证元数据,解析表和列
  3. 优化后的逻辑计划:应用优化规则后的逻辑计划

Catalyst使用启发式规则优化逻辑计划,主要优化包括:

  • 谓词下推(Predicate Pushdown):将过滤操作尽可能下推到数据源,减少数据读取量
  • 常量折叠(Constant Folding):预先计算表达式中的常量
  • 列裁剪(Column Pruning):只读取查询所需的列
  • 聚合重写(Aggregation Rewriting):优化聚合操作,如替换distinct为group by
3.2.2 物理计划

物理计划是逻辑计划的具体实现,考虑了执行细节,如连接算法选择、分区策略等。Spark支持多种物理计划实现,如:

  • HashAggregate vs SortAggregate:两种聚合实现方式
  • BroadcastHashJoin vs ShuffledHashJoin vs SortMergeJoin:不同的连接算法
  • InMemoryTableScan vs FileScan:内存扫描 vs 文件扫描

Catalyst使用基于成本的优化(CBO)选择最优物理计划,考虑因素包括:

  • 表的大小和行数
  • 列的基数和数据分布
  • 可用内存和CPU资源

要启用CBO,需要设置:

spark.conf.set("spark.sql.cbo.enabled", "true")

并确保收集了表的统计信息:

ANALYZE TABLE table_name COMPUTE STATISTICS;
ANALYZE TABLE table_name COMPUTE STATISTICS FOR COLUMNS column1, column2;

3.3 执行计划分析实战

理解执行计划是优化Spark SQL查询的关键。让我们通过一个复杂示例详细分析执行计划:

示例查询

SELECT 
  c.category_name,
  COUNT(DISTINCT o.order_id) AS order_count,
  SUM(oi.quantity * oi.unit_price) AS total_sales
FROM 
  orders o
JOIN 
  order_items oi ON o.order_id = oi.order_id
JOIN 
  products p ON oi.product_id = p.product_id
JOIN 
  categories c ON p.category_id = c.category_id
WHERE 
  o.order_date >= '2023-01-01'
  AND o.order_status = 'completed'
GROUP BY 
  c.category_name
ORDER BY 
  total_sales DESC
LIMIT 10;

使用explain extended命令查看详细执行计划,我们可以分析以下关键信息:

  1. 连接顺序:Spark会根据表大小和统计信息决定最优连接顺序
  2. 连接算法:小表可能使用BroadcastHashJoin,大表可能使用SortMergeJoin
  3. 谓词位置:检查过滤条件是否被下推到数据源或连接之前
  4. 聚合策略:查看是否使用了HashAggregate或SortAggregate
  5. 排序操作:ORDER BY操作是否导致了全局排序

通过分析执行计划,我们可以识别查询中的性能瓶颈,如不必要的Shuffle、缺少谓词下推等。

3.4 基于规则的优化(RBO)与基于成本的优化(CBO)

Spark SQL同时支持基于规则的优化(RBO)和基于成本的优化(CBO):

  • RBO:应用预定义规则优化查询,如谓词下推、列裁剪等
  • CBO:基于表和列的统计信息(行数、基数、数据大小等)选择成本最低的执行计划

启用和配置CBO

# 启用CBO
spark.conf.set("spark.sql.cbo.enabled", "true")

# 配置统计信息收集
spark.conf.set("spark.sql.statistics.histogram.enabled", "true")
spark.conf.set("spark.sql.cbo.joinReorder.enabled", "true")  # 启用连接重排序

收集统计信息

-- 收集表级统计信息
ANALYZE TABLE orders COMPUTE STATISTICS;

-- 收集列级统计信息
ANALYZE TABLE orders COMPUTE STATISTICS FOR COLUMNS order_id, order_date, order_status;

CBO在处理多表连接、复杂子查询时特别有效,能显著提升查询性能。但CBO的效果依赖于准确的统计信息,因此定期收集统计信息至关重要。

3.5 执行计划分析工具与技巧

分析执行计划是优化Spark SQL查询的关键技能。以下是一些实用技巧:

  1. 使用不同级别的explain

    • explain():简要显示物理计划
    • explain(True)explain(mode="extended"):显示逻辑计划和物理计划
    • explain(mode="codegen"):显示生成的代码
    • explain(mode="cost"):显示成本信息
  2. 识别Shuffle操作:执行计划中的Exchange操作表示Shuffle,通常是性能瓶颈

  3. 检查谓词下推:寻找PushedFilters信息,确认过滤条件是否被下推

  4. 分析连接策略:确认是否使用了最优的连接算法(BroadcastHashJoin通常优于SortMergeJoin)

  5. 查看数据倾斜:执行计划中的数据大小和行数统计可帮助识别数据倾斜

示例:分析包含Shuffle的执行计划

# 查看可能导致Shuffle的操作的执行计划
df.groupBy("category").agg({"sales": "sum"}).explain(mode="extended")

在执行计划中寻找Exchange节点,这表示发生了Shuffle操作。Shuffle通常是必要的,但应尽量减少其数据量。

4. 高级数据类型操作:JSON、数组与结构体

4.1 复杂数据类型概述

Spark SQL支持多种复杂数据类型,用于处理半结构化和嵌套数据:

  • 数组(Array):有序元素集合,如Array[String]
  • 结构体(Struct):命名字段的集合,如Struct(name: String, age: Int)
  • 映射(Map):键值对集合,如Map[String, Int]
  • JSON:可以解析为Struct或Array的JSON字符串

这些复杂类型在处理现实世界数据时非常有用,如用户行为日志、产品属性、嵌套事件数据等。

4.2 JSON数据处理

Spark SQL提供了强大的JSON数据处理功能,可以直接读取JSON文件,或处理字符串类型的JSON数据。

4.2.1 读取JSON文件
# 读取JSON文件,自动推断schema
df = spark.read.json("path/to/json/files", multiLine=True)  # multiLine=True支持多行JSON

# 显示schema
df.printSchema()

# 指定schema读取JSON,提高性能
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("name", StringType(), True),
    StructField("address", StructType([
        StructField("street", StringType(), True),
        StructField("city", StringType(), True)
    ]))
])

df = spark.read.json("path/to/json/files", schema=schema)
4.2.2 处理JSON字符串

当JSON数据以字符串形式存储在列中时,可以使用from_json函数解析:

from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StringType, IntegerType

# 示例数据
data = [(1, '{"name":"Alice","age":25,"address":{"city":"New York","zip":10001}}'),
        (2, '{"name":"Bob","age":30,"address":{"city":"Los Angeles","zip":90001}}')]

df = spark.createDataFrame(data, ["id", "json_data"])

# 定义JSON schema
json_schema = StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True),
    StructField("address", StructType([
        StructField("city", StringType(), True),
        StructField("zip", IntegerType(), True)
    ]))
])

# 解析JSON字符串
df_parsed = df.withColumn("data", from_json(col("json_data"), json_schema)) \
              .select("id", 
                      col("data.name").alias("name"), 
                      col("data.age").alias("age"),
                      col("data.address.city").alias("city"),
                      col("data.address.zip").alias("zip"))

df_parsed.show()
4.2.3 生成JSON字符串

使用to_json函数可以将Struct类型转换为JSON字符串:

from pyspark.sql.functions import to_json, struct

# 将多列合并为JSON字符串
df_json = df_parsed.withColumn("json_result", to_json(struct("name", "age", "city")))
df_json.select("id", "json_result").show(truncate=False)

4.3 数组操作

数组是Spark SQL中常用的复杂类型,用于表示有序集合。Spark提供了丰富的数组操作函数。

4.3.1 创建数组
from pyspark.sql.functions import array, array_contains

# 创建数组列
df = spark.createDataFrame([(1, "Alice", [25, 30, 35])], ["id", "name", "ages"])

# 或使用array函数
df_with_array = df.withColumn("hobbies", array(lit("reading"), lit("sports"), lit("music")))
4.3.2 数组元素访问与查询
from pyspark.sql.functions import col, size, array_contains, element_at

# 访问数组元素(索引从1开始)
df.select(col("ages")[0].alias("first_age")).show()  # Python风格索引(从0开始)
df.select(element_at("ages", 1).alias("first_age")).show()  # SQL风格索引(从1开始)

# 数组长度
df.select(size("ages").alias("age_count")).show()

# 检查数组包含元素
df.select(array_contains("ages", 30).alias("has_30")).show()
4.3.3 数组展开与聚合
from pyspark.sql.functions import explode, explode_outer, flatMapGroupsInPandas, collect_list, collect_set

# explode: 将数组元素展开为多行
df_exploded = df.select("id", "name", explode("ages").alias("age"))

# explode_outer: 处理空数组,保留null值
df_exploded_outer = df.select("id", "name", explode_outer("ages").alias("age"))

# collect_list: 将多行聚合为数组
df_aggregated = df_exploded.groupBy("id", "name").agg(collect_list("age").alias("ages_list"))

# collect_set: 聚合为去重数组
df_aggregated_set = df_exploded.groupBy("id", "name").agg(collect_set("age").alias("ages_set"))
4.3.4 高级数组函数

Spark SQL提供了许多高级数组函数,如排序、过滤、转换等:

from pyspark.sql.functions import array_sort, array_filter, transform, aggregate

# 数组排序
df.select(array_sort("ages").alias("sorted_ages")).show()

# 数组过滤
df.select(array_filter("ages", lambda x: x > 28).alias("filtered_ages")).show()

# 数组转换
df.select(transform("ages", lambda x: x * 2).alias("doubled_ages")).show()

# 数组聚合
df.select(aggregate("ages", 0, lambda acc, x: acc + x).alias("total")).show()

4.4 结构体操作

结构体(Struct)用于表示嵌套数据,类似于关系数据库中的对象类型。

4.4.1 创建结构体
from pyspark.sql.functions import struct, col

# 使用struct函数创建结构体列
df = spark.createDataFrame([(1, "Alice", "New York", 25)], ["id", "name", "city", "age"])
df_with_struct = df.withColumn("person_info", struct(col("name"), col("age"), col("city")))
df_with_struct.printSchema()
4.4.2 访问结构体字段
# 访问结构体字段
df_with_struct.select("id", "person_info.name", "person_info.age").show()

# 或使用点表示法
df_with_struct.select(col("id"), col("person_info.name").alias("username")).show()
4.4.3 结构体与关系转换
# 将结构体展开为多个列
df_flattened = df_with_struct.select("id", "person_info.*")

# 嵌套结构体操作
data = [(1, ("Alice", 25), ("New York", "USA")), 
        (2, ("Bob", 30), ("London", "UK"))]
df_nested = spark.createDataFrame(data, ["id", "person", "address"])

# 访问嵌套字段
df_nested.select("id", "person._1", "address._2").show()

# 重命名字段并创建更清晰的嵌套结构
df_renamed = df_nested.withColumn("name", col("person._1")) \
                      .withColumn("age", col("person._2")) \
                      .withColumn("city", col("address._1")) \
                      .withColumn("country", col("address._2")) \
                      .drop("person", "address") \
                      .withColumn("info", struct(col("name"), col("age"))) \
                      .withColumn("location", struct(col("city"), col("country")))

4.5 复杂类型综合应用

在实际应用中,我们经常需要综合使用各种复杂类型。以下是一个处理电商订单数据的示例:

from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, ArrayType, DoubleType

# 定义订单数据schema
order_schema = StructType([
    StructField("order_id", StringType(), True),
    StructField("customer_id", StringType(), True),
    StructField("order_date", StringType(), True),
    StructField("items", ArrayType(StructType([
        StructField("product_id", StringType(), True),
        StructField("quantity", IntegerType(), True),
        StructField("price", DoubleType(), True),
        StructField("attributes", MapType(StringType(), StringType()))
    ]))),
    StructField("shipping_address", StructType([
        StructField("street", StringType(), True),
        StructField("city", StringType(), True),
        StructField("state", StringType(), True),
        StructField("zipcode", StringType(), True)
    ]))
])

# 读取订单数据(假设为JSON格式)
orders_df = spark.read.schema(order_schema).json("path/to/orders")

# 1. 展开订单商品
order_items_df = orders_df.select(
    F.col("order_id"),
    F.col("customer_id"),
    F.col("order_date"),
    F.explode("items").alias("item")
).select(
    "order_id", "customer_id", "order_date",
    "item.product_id", "item.quantity", "item.price",
    F.col("item.attributes").getItem("color").alias("color"),
    F.col("item.attributes").getItem("size").alias("size")
)

# 2. 计算订单总金额
order_totals_df = order_items_df.groupBy("order_id", "customer_id") \
    .agg(
        F.sum(F.col("quantity") * F.col("price")).alias("total_amount"),
        F.collect_set("product_id").alias("product_ids"),
        F.count("product_id").alias("item_count")
    )

# 3. 查找包含特定属性的订单
red_orders_df = orders_df.select(
    "order_id", "customer_id",
    F.explode("items").alias("item")
).filter(
    (F.col("item.attributes.color") == "red") & 
    (F.col("item.attributes.size") == "L")
).select("order_id", "customer_id", "item.product_id").distinct()

这个示例展示了如何处理包含数组、结构体和映射的复杂订单数据,包括展开数组、访问嵌套字段、聚合计算等操作。

5. 窗口函数与高级聚合

5.1 窗口函数概述

窗口函数(Window Functions)是Spark SQL中最强大的功能之一,允许用户在一组行上执行计算,同时保留原始行数据。窗口函数结合了聚合函数和行级操作的优点,特别适用于排名、移动平均值、累积计算等场景。

窗口函数的基本语法如下:

function_name() OVER (
    [PARTITION BY partition_column]
    [ORDER BY order_column]
    [ROWS BETWEEN frame_start AND frame_end]
)

其中:

  • PARTITION BY:将数据划分为多个组(窗口)
  • ORDER BY:定义窗口内的排序方式
  • ROWS BETWEEN:定义窗口的物理范围(行框架)

在Spark SQL中,窗口函数可以通过DataFrame API或SQL使用。

5.2 窗口函数类型

Spark SQL支持多种窗口函数,主要分为以下几类:

  1. 排名函数:ROW_NUMBER, RANK, DENSE_RANK, PERCENT_RANK, NTILE
  2. 分析函数:CUME_DIST, LAG, LEAD, FIRST_VALUE, LAST_VALUE
  3. 聚合函数:SUM, AVG, COUNT, MAX, MIN等(作为窗口函数使用)

5.3 排名函数实战

5.3.1 ROW_NUMBER, RANK与DENSE_RANK

这三个函数用于为每行分配排名,但处理并列排名的方式不同:

  • ROW_NUMBER:为每行分配唯一序号,即使有并列值
  • RANK:并列值获得相同排名,下一名次跳过相应数量
  • DENSE_RANK:并列值获得相同排名,下一名次连续不跳过

示例:学生成绩排名

from pyspark.sql import Window
from pyspark.sql.functions import row_number, rank, dense_rank, col

# 创建示例数据
data = [("Alice", "Math", 90), ("Alice", "English", 85), 
        ("Bob", "Math", 90), ("Bob", "English", 95),
        ("Charlie", "Math", 80), ("Charlie", "English", 85)]
df = spark.createDataFrame(data, ["name", "subject", "score"])

# 定义窗口规范
window_spec = Window.partitionBy("subject").orderBy(col("score").desc())

# 应用排名函数
rank_df = df.withColumn("row_num", row_number().over(window_spec)) \
            .withColumn("rank", rank().over(window_spec)) \
            .withColumn("dense_rank", dense_rank().over(window_spec))

rank_df.orderBy("subject", "row_num").show()

执行结果:

+-------+-------+-----+-------+----+----------+
|   name|subject|score|row_num|rank|dense_rank|
+-------+-------+-----+-------+----+----------+
|    Bob|English|   95|      1|   1|         1|
|   Alice|English|   85|      2|   2|         2|
|Charlie|English|   85|      3|   2|         2|
|   Alice|   Math|   90|      1|   1|         1|
|    Bob|   Math|   90|      2|   1|         1|
|Charlie|   Math|   80|      3|   3|         2|
+-------+-------+-----+-------+----+----------+

可以看到,在Math科目中,Alice和Bob都是90分:

  • ROW_NUMBER给他们分配了1和2
  • RANK都给了1,下一名直接是3(跳过了2)
  • DENSE_RANK都给了1,下一名是2(连续)
5.3.2 PERCENT_RANK与NTILE
  • PERCENT_RANK:计算相对排名百分比,公式为(rank - 1) / (total_rows - 1)
  • NTILE:将数据划分为指定数量的桶,为每行分配桶编号

示例

from pyspark.sql.functions import percent_rank, ntile

percentile_df = df.withColumn("percent_rank", percent_rank().over(window_spec)) \
                  .withColumn("ntile_3", ntile(3).over(window_spec))

percentile_df.orderBy("subject", "score").show()

5.4 分析函数实战

5.4.1 LAG与LEAD

LAG和LEAD函数用于访问窗口中当前行之前或之后的行数据,无需自连接:

  • LAG(col, n): 获取当前行之前第n行的col值
  • LEAD(col, n): 获取当前行之后第n行的col值

示例:分析用户连续登录情况

from pyspark.sql.functions import lag, lead, datediff, col

# 创建用户登录数据
login_data = [("user1", "2023-01-01"), ("user1", "2023-01-02"), ("user1", "2023-01-04"),
              ("user2", "2023-01-01"), ("user2", "2023-01-03"), ("user2", "2023-01-04")]
login_df = spark.createDataFrame(login_data, ["user_id", "login_date"])
login_df = login_df.withColumn("login_date", col("login_date").cast("date"))

# 定义窗口规范
user_window = Window.partitionBy("user_id").orderBy("login_date")

# 计算连续登录天数间隔
login_analysis_df = login_df.withColumn("prev_login", lag("login_date", 1).over(user_window)) \
                            .withColumn("next_login", lead("login_date", 1).over(user_window)) \
                            .withColumn("days_since_prev", datediff("login_date", "prev_login"))

login_analysis_df.show()

这个示例可以帮助识别用户是否连续登录,或计算两次登录之间的间隔天数。

5.4.2 FIRST_VALUE与LAST_VALUE

这两个函数用于获取窗口中的第一个或最后一个值:

from pyspark.sql.functions import first_value, last_value

# 定义窗口规范,包含框架定义
window_spec = Window.partitionBy("subject") \
                    .orderBy(col("score").desc()) \
                    .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

# 获取最高分和最低分
score_extremes_df = df.withColumn("highest_score", first_value("score").over(window_spec)) \
                      .withColumn("lowest_score", last_value("score").over(window_spec))

score_extremes_df.show()

注意:使用LAST_VALUE时需要特别注意窗口框架的定义,默认框架是从开始到当前行,可能无法得到预期结果。

5.5 窗口聚合函数

聚合函数(如SUM、AVG、COUNT等)可以作为窗口函数使用,计算窗口内的聚合值,而不减少行数。

5.5.1 移动平均值计算
from pyspark.sql.functions import avg, sum, count

# 创建销售数据
sales_data = [("2023-01-01", 100), ("2023-01-02", 150), ("2023-01-03", 200),
              ("2023-01-04", 180), ("2023-01-05", 220), ("2023-01-06", 250)]
sales_df = spark.createDataFrame(sales_data, ["date", "amount"])
sales_df = sales_df.withColumn("date", col("date").cast("date"))

# 定义滑动窗口:前2天到当前天
sliding_window = Window.orderBy("date") \
                       .rowsBetween(-2, Window.currentRow)  # 前2行到当前行

# 计算移动平均和总和
moving_avg_df = sales_df.withColumn("3day_avg", avg("amount").over(sliding_window)) \
                        .withColumn("3day_sum", sum("amount").over(sliding_window)) \
                        .withColumn("3day_count", count("amount").over(sliding_window))

moving_avg_df.show()
5.5.2 累积聚合
# 定义累积窗口
cumulative_window = Window.orderBy("date") \
                          .rowsBetween(Window.unboundedPreceding, Window.currentRow)

# 计算累积总和和平均值
cumulative_df = sales_df.withColumn("cumulative_sum", sum("amount").over(cumulative_window)) \
                        .withColumn("cumulative_avg", avg("amount").over(cumulative_window))

cumulative_df.show()

5.6 窗口框架详解

窗口框架定义了窗口中的具体行范围,是窗口函数的重要组成部分。Spark SQL支持两种类型的窗口框架:

  1. 行框架(ROWS BETWEEN):基于物理行数定义范围
  2. 范围框架(RANGE BETWEEN):基于排序键的逻辑范围

框架通过rowsBetweenrangeBetween方法定义,语法如下:

# 行框架示例
window.rowsBetween(Window.unboundedPreceding, Window.currentRow)  # 从开始到当前行
window.rowsBetween(-3, 0)  # 前3行到当前行(共4行)

# 范围框架示例(适用于数值或日期类型的排序键)
window.rangeBetween(-7, 0)  # 排序键值在当前行-7到当前行之间

示例:行框架 vs 范围框架

# 创建包含重复值的数据
data = [(1, "A", 10), (2, "A", 10), (3, "A", 20), (4, "A", 20), (5, "A", 30)]
df = spark.createDataFrame(data, ["id", "category", "value"])

# 行框架:前1行到当前行(物理行)
row_window = Window.partitionBy("category").orderBy("value").rowsBetween(-1, 0)

# 范围框架:值在当前值-5到当前值之间(逻辑范围)
range_window = Window.partitionBy("category").orderBy("value").rangeBetween(-5, 0)

# 添加聚合列
frame_comparison_df = df.withColumn("row_sum", sum("id").over(row_window)) \
                        .withColumn("range_sum", sum("id").over(range_window))

frame_comparison_df.show()

这个示例将展示行框架和范围框架在处理重复值时的区别:行框架严格基于物理行数,而范围框架基于排序键的逻辑范围。

5.7 高级窗口函数应用

5.7.1 会话化分析

窗口函数可用于将连续事件分组为会话(Sessions):

from pyspark.sql.functions import sum as f_sum, when, col

# 定义会话超时时间(例如30分钟)
SESSION_TIMEOUT = 30 * 60  # 秒

# 假设我们有用户事件数据
event_data = [("user1", "2023-01-01 08:00:00"), ("user1", "2023-01-01 08:15:00"), 
              ("user1", "2023-01-0108:40:00"), ("user1", "2023-01-01 08:45:00")]
event_df = spark.createDataFrame(event_data, ["user_id", "event_time"])
event_df = event_df.withColumn("event_time", col("event_time").cast("timestamp"))

# 计算时间差并判断是否开始新会话
user_window = Window.partitionBy("user_id").orderBy("event_time")
session_df = event_df.withColumn("prev_time", lag("event_time", 1).over(user_window)) \
                     .withColumn("time_diff", 
                                 when(col("prev_time").isNotNull(), 
                                      col("event_time").cast("long") - col("prev_time").cast("long"))
                                 .otherwise(SESSION_TIMEOUT + 1)) \
                     .withColumn("new_session", when(col("time_diff") > SESSION_TIMEOUT, 1).otherwise(0)) \
                     .withColumn("session_id", f_sum("new_session").over(user_window.rowsBetween(Window.unboundedPreceding, Window.currentRow)))

session_df.show()

这个示例将用户事件按时间间隔分组为不同会话,当两次事件间隔超过30分钟时,创建新会话。

5.7.2 同比环比分析

使用窗口函数可以方便地进行同比(与去年同期比较)和环比(与上一周期比较)分析:

from pyspark.sql.functions import month, year, lag, round

# 创建月度销售数据
sales_data = [("2021-01-01", 100), ("2021-02-01", 120), ("2021-03-01", 130),
              ("2022-01-01", 110), ("2022-02-01", 125), ("2022-03-01", 140)]
sales_df = spark.createDataFrame(sales_data, ["month", "amount"])
sales_df = sales_df.withColumn("month", col("month").cast("date")) \
                   .withColumn("year", year("month")) \
                   .withColumn("month_of_year", month("month"))

# 同比分析(与去年同期比较)
yoy_window = Window.partitionBy("month_of_year").orderBy("year")
yoy_analysis_df = sales_df.withColumn("prev_year_amount", lag("amount", 1).over(yoy_window)) \
                          .withColumn("yoy_growth", col("amount") - col("prev_year_amount")) \
                          .withColumn("yoy_growth_rate", 
                                      round((col("amount") / col("prev_year_amount") - 1) * 100, 2))

# 环比分析(与上月比较)
mom_window = Window.orderBy("month")
mom_analysis_df = yoy_analysis_df.withColumn("prev_month_amount", lag("amount", 1).over(mom_window)) \
                                 .withColumn("mom_growth", col("amount") - col("prev_month_amount")) \
                                 .withColumn("mom_growth_rate", 
                                             round((col("amount") / col("prev_month_amount") - 1) * 100, 2))

mom_analysis_df.select("month", "amount", "yoy_growth_rate", "mom_growth_rate").show()

这个示例展示了如何使用窗口函数计算同比增长率(与去年同期比较)和环比增长率(与上月比较),这在业务分析中非常常见。

6. 自定义函数:UDF、UDAF与UDTF

6.1 自定义函数概述

虽然Spark SQL提供了丰富的内置函数,但在实际应用中,我们经常需要实现特定业务逻辑的函数 Spark SQL支持三种类型的自定义函数:

  • UDF(User-Defined Function):自定义标量函数,一行输入一行输出
  • UDAF(User-Defined Aggregate Function):自定义聚合函数,多行输入一行输出
  • UDTF(User-Defined Table-Generating Function):自定义表生成函数,一行输入多行输出

自定义函数扩展了Spark SQL的能力,允许我们将复杂的业务逻辑封装为可重用的函数。

6.2 用户自定义标量函数(UDF)

UDF是最常用的自定义函数类型,用于对单个或多个列进行转换操作。

6.2.1 Python UDF

在PySpark

评论
添加红包

请填写红包祝福语或标题

红包个数最小为10个

红包金额最低5元

当前余额3.43前往充值 >
需支付:10.00
成就一亿技术人!
领取后你会自动成为博主和红包主的粉丝 规则
hope_wisdom
发出的红包
实付
使用余额支付
点击重新获取
扫码支付
钱包余额 0

抵扣说明:

1.余额是钱包充值的虚拟货币,按照1:1的比例进行支付金额的抵扣。
2.余额无法直接购买下载,可以购买VIP、付费专栏及课程。

余额充值