您好,登录后才能下订单哦!
密码登录
登录注册
点击 登录注册 即表示同意《亿速云用户服务条款》
# Spark MLlib 聚类 KMeans 怎么用
Apache Spark 的 MLlib 是一个强大的机器学习库,提供了多种聚类算法,其中 KMeans 是最常用的无监督学习算法之一。本文将详细介绍如何在 Spark MLlib 中使用 KMeans 进行聚类分析。
## 目录
1. [KMeans 算法简介](#kmeans-算法简介)
2. [环境准备](#环境准备)
3. [数据准备](#数据准备)
4. [KMeans 模型训练](#kmeans-模型训练)
5. [模型评估](#模型评估)
6. [模型保存与加载](#模型保存与加载)
7. [完整代码示例](#完整代码示例)
8. [总结](#总结)
---
## KMeans 算法简介
KMeans 是一种基于距离的聚类算法,通过迭代将数据点划分为 K 个簇,使得每个数据点属于离其最近的簇中心(质心)。算法步骤如下:
1. 随机初始化 K 个质心
2. 将每个数据点分配到最近的质心
3. 重新计算每个簇的质心(取簇内点的均值)
4. 重复步骤 2-3 直到质心不再变化或达到最大迭代次数
Spark MLlib 实现了分布式版本的 KMeans,适合处理大规模数据集。
---
## 环境准备
确保已安装以下环境:
- Java 8+
- Spark 3.x
- Scala 2.12 或 Python(PySpark)
Maven 依赖(Scala):
```xml
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-mllib_2.12</artifactId>
<version>3.2.1</version>
</dependency>
KMeans 要求输入数据为 Vector
类型。假设我们有一个文本文件 data.txt
,每行是用空格分隔的数值:
1.2 3.4 2.1
0.5 1.2 0.9
...
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder().appName("KMeansExample").getOrCreate()
val data = spark.read.textFile("data.txt")
// 转换为Vector RDD
val parsedData = data.map { line =>
Vectors.dense(line.split(" ").map(_.toDouble))
}.toDF("features")
from pyspark.ml.linalg import Vectors
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("KMeansExample").getOrCreate()
data = spark.read.text("data.txt")
def parse_line(line):
return [float(x) for x in line.value.split(" ")]
parsedData = data.rdd.map(parse_line).map(Vectors.dense).toDF(["features"])
k
: 簇数量(必填)maxIter
: 最大迭代次数(默认20)seed
: 随机种子(可选)tol
: 收敛阈值(默认1e-4)import org.apache.spark.ml.clustering.KMeans
val kmeans = new KMeans()
.setK(3)
.setMaxIter(20)
.setSeed(1L)
val model = kmeans.fit(parsedData)
from pyspark.ml.clustering import KMeans
kmeans = KMeans(k=3, maxIter=20, seed=1)
model = kmeans.fit(parsedData)
val wcss = model.computeCost(parsedData)
println(s"Within-Cluster Sum of Squares = $wcss")
val predictions = model.transform(parsedData)
predictions.show()
输出示例:
+-----------------+----------+
| features|prediction|
+-----------------+----------+
|[1.2,3.4,2.1] | 1|
|[0.5,1.2,0.9] | 0|
...
+-----------------+----------+
// 保存
model.save("/path/to/model")
// 加载
import org.apache.spark.ml.clustering.KMeansModel
val sameModel = KMeansModel.load("/path/to/model")
# 保存
model.save("/path/to/model")
# 加载
from pyspark.ml.clustering import KMeansModel
sameModel = KMeansModel.load("/path/to/model")
import org.apache.spark.ml.clustering.KMeans
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.sql.SparkSession
object KMeansExample {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder.appName("KMeansExample").getOrCreate()
val data = spark.read.textFile("data.txt")
val parsedData = data.map { line =>
Vectors.dense(line.split(" ").map(_.toDouble))
}.toDF("features")
val kmeans = new KMeans()
.setK(3)
.setMaxIter(20)
.setSeed(1L)
val model = kmeans.fit(parsedData)
val wcss = model.computeCost(parsedData)
println(s"WCSS: $wcss")
model.transform(parsedData).show()
spark.stop()
}
}
from pyspark.sql import SparkSession
from pyspark.ml.clustering import KMeans
from pyspark.ml.linalg import Vectors
spark = SparkSession.builder.appName("KMeansExample").getOrCreate()
data = spark.read.text("data.txt")
parsedData = data.rdd.map(lambda x: [float(v) for v in x.value.split(" ")])\
.map(Vectors.dense).toDF(["features"])
kmeans = KMeans(k=3, maxIter=20, seed=1)
model = kmeans.fit(parsedData)
wcss = model.computeCost(parsedData)
print(f"WCSS: {wcss}")
model.transform(parsedData).show()
spark.stop()
通过 Spark MLlib 的 KMeans 实现,我们可以轻松处理大规模数据集的聚类任务。关键步骤包括: 1. 准备向量格式的数据 2. 设置合理的 K 值和迭代参数 3. 评估模型质量(如WCSS) 4. 保存模型供后续使用
实际应用中可能需要尝试不同的 K 值,或结合其他技术(如PCA)提高聚类效果。 “`
注:实际字数约1500字,可根据需要调整代码示例的详细程度或增加更多理论解释来扩展内容。
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。