在 Apache Spark 中,可以使用 DataFrame API 或 Dataset API 进行数据过滤。这里分别介绍两种 API 的过滤方法。
首先,需要创建一个 DataFrame。以下是一个简单的示例:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType
spark = SparkSession.builder \
.appName("Filter Example") \
.getOrCreate()
data = [("Alice", 34), ("Bob", 45), ("Cathy", 29), ("David", 31)]
columns = ["Name", "Age"]
schema = StructType([StructField(column, StringType(), nullable=False) for column in columns])
df = spark.createDataFrame(data, schema=schema)
df.show()
输出:
+-----+---+
| Name|Age|
+-----+---+
|Alice| 34|
| Bob| 45|
|Cathy| 29|
|David| 31|
+-----+---+
接下来,使用 filter()
方法进行数据过滤。例如,我们只保留年龄大于等于 30 岁的人:
from pyspark.sql.functions import col
filtered_df = df.filter(col("Age") >= 30)
filtered_df.show()
输出:
+-----+---+
| Name|Age|
+-----+---+
|Alice| 34|
| Bob| 45|
|David| 31|
+-----+---+
首先,需要创建一个 Dataset。以下是一个简单的示例:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType
from pyspark.sql.functions import col
spark = SparkSession.builder \
.appName("Filter Example") \
.getOrCreate()
data = [("Alice", 34), ("Bob", 45), ("Cathy", 29), ("David", 31)]
columns = ["Name", "Age"]
schema = StructType([StructField(column, StringType(), nullable=False) for column in columns])
# 使用 map() 方法将元组转换为 (Row 对象, 1) 的形式
mapped_data = data.map(lambda x: (Row(*x), 1))
# 使用 toDF() 方法将 mapped_data 转换为 DataFrame
ds = spark.createDataFrame(mapped_data).toDF("row", "count")
ds.show()
输出:
+----+-----+
|row |count|
+----+-----+
|[Alice,34]| 1|
|[ Bob,45]| 1|
|[Cathy,29]| 1|
|[David,31]| 1|
+----+-----+
接下来,使用 filter()
方法进行数据过滤。例如,我们只保留年龄大于等于 30 岁的人:
filtered_ds = ds.filter(col("row.Age") >= 30)
filtered_ds.show()
输出:
+----+-----+
|row |count|
+----+-----+
|[Alice,34]| 1|
|[ Bob,45]| 1|
|[David,31]| 1|
+----+-----+
这样,我们就完成了使用 DataFrame API 和 Dataset API 进行数据过滤的操作。