Apache Spark 的 diff()
函数用于计算两个 DataFrame 或 Dataset 之间的差异
join()
函数将两个 DataFrame 或 Dataset 进行连接,然后使用 withColumn()
函数创建一个新列,该列表示原始 DataFrame 或 Dataset 中的行与另一个 DataFrame 或 Dataset 中的行的差异。from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when
# 创建 Spark 会话
spark = SparkSession.builder \
.appName("Spark Diff Example") \
.getOrCreate()
# 创建第一个 DataFrame
data1 = [("A", 1), ("B", 2), ("C", 3)]
columns1 = ["ID", "Value"]
df1 = spark.createDataFrame(data1, columns1)
# 创建第二个 DataFrame
data2 = [("A", 1), ("B", 2), ("D", 4)]
columns2 = ["ID", "Value"]
df2 = spark.createDataFrame(data2, columns2)
# 使用 join() 函数将两个 DataFrame 连接在一起,然后使用 withColumn() 函数创建一个新列,表示差异
df_diff = df1.join(df2, on=["ID"], how="outer").na.fill({"Value": float("inf")}).withColumn("Diff", when(col("Value1") != col("Value2"), 1).otherwise(0))
# 显示结果
df_diff.show()
在这个例子中,我们首先创建了两个 DataFrame df1
和 df2
。然后,我们使用 join()
函数将它们连接在一起,并通过 na.fill()
函数处理可能出现的空值。接下来,我们使用 withColumn()
函数创建一个新列 “Diff”,该列表示原始 DataFrame 中的行与另一个 DataFrame 中的行的差异。最后,我们使用 show()
函数显示结果。
except()
或 filter()
函数计算两个 DataFrame 或 Dataset 之间的差异。from pyspark.sql import SparkSession
from pyspark.sql.functions import col
# 创建 Spark 会话
spark = SparkSession.builder \
.appName("Spark Diff Example") \
.getOrCreate()
# 创建第一个 DataFrame
data1 = [("A", 1), ("B", 2), ("C", 3)]
columns1 = ["ID", "Value"]
df1 = spark.createDataFrame(data1, columns1)
# 创建第二个 DataFrame
data2 = [("A", 1), ("B", 2), ("D", 4)]
columns2 = ["ID", "Value"]
df2 = spark.createDataFrame(data2, columns2)
# 使用 except() 函数计算两个 DataFrame 之间的差异
df_diff1 = df1.except(df2)
# 或者使用 filter() 函数计算两个 DataFrame 之间的差异
df_diff2 = df1.filter(~col("ID").isin(df2["ID"]))
# 显示结果
df_diff1.show()
df_diff2.show()
在这个例子中,我们首先创建了两个 DataFrame df1
和 df2
。然后,我们使用 except()
函数计算两个 DataFrame 之间的差异,得到一个新的 DataFrame df_diff1
。另外,我们还展示了如何使用 filter()
函数计算两个 DataFrame 之间的差异,得到一个新的 DataFrame df_diff2
。最后,我们使用 show()
函数显示结果。
这两种方法都可以用于处理多种数据,具体取决于你的需求和数据结构。