spark

spark函数如何进行数据过滤

小樊
93
2024-12-13 21:44:33
栏目: 大数据

在 Apache Spark 中,可以使用 DataFrame API 或 Dataset API 进行数据过滤。这里分别介绍两种 API 的过滤方法。

  1. 使用 DataFrame API 进行数据过滤:

首先,需要创建一个 DataFrame。以下是一个简单的示例:

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType

spark = SparkSession.builder \
    .appName("Filter Example") \
    .getOrCreate()

data = [("Alice", 34), ("Bob", 45), ("Cathy", 29), ("David", 31)]
columns = ["Name", "Age"]

schema = StructType([StructField(column, StringType(), nullable=False) for column in columns])

df = spark.createDataFrame(data, schema=schema)
df.show()

输出:

+-----+---+
| Name|Age|
+-----+---+
|Alice| 34|
|  Bob| 45|
|Cathy| 29|
|David| 31|
+-----+---+

接下来,使用 filter() 方法进行数据过滤。例如,我们只保留年龄大于等于 30 岁的人:

from pyspark.sql.functions import col

filtered_df = df.filter(col("Age") >= 30)
filtered_df.show()

输出:

+-----+---+
| Name|Age|
+-----+---+
|Alice| 34|
|  Bob| 45|
|David| 31|
+-----+---+
  1. 使用 Dataset API 进行数据过滤:

首先,需要创建一个 Dataset。以下是一个简单的示例:

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType
from pyspark.sql.functions import col

spark = SparkSession.builder \
    .appName("Filter Example") \
    .getOrCreate()

data = [("Alice", 34), ("Bob", 45), ("Cathy", 29), ("David", 31)]
columns = ["Name", "Age"]

schema = StructType([StructField(column, StringType(), nullable=False) for column in columns])

# 使用 map() 方法将元组转换为 (Row 对象, 1) 的形式
mapped_data = data.map(lambda x: (Row(*x), 1))

# 使用 toDF() 方法将 mapped_data 转换为 DataFrame
ds = spark.createDataFrame(mapped_data).toDF("row", "count")
ds.show()

输出:

+----+-----+
|row |count|
+----+-----+
|[Alice,34]|    1|
|[  Bob,45]|    1|
|[Cathy,29]|    1|
|[David,31]|    1|
+----+-----+

接下来,使用 filter() 方法进行数据过滤。例如,我们只保留年龄大于等于 30 岁的人:

filtered_ds = ds.filter(col("row.Age") >= 30)
filtered_ds.show()

输出:

+----+-----+
|row |count|
+----+-----+
|[Alice,34]|    1|
|[  Bob,45]|    1|
|[David,31]|    1|
+----+-----+

这样,我们就完成了使用 DataFrame API 和 Dataset API 进行数据过滤的操作。

0
看了该问题的人还看了