14.spark mllib之快速入门

发布时间:2020-08-02 22:14:07 作者:菲立思教育
来源:网络 阅读:2726

简介

  MLlib是Spark提供提供机器学习的库,专为在集群上并行运行的情况而设计。
MLlib包含很多机器学习算法,可在Spark支持的所有编程语言中使用。

  MLlib设计理念是将数据以RDD的形式表示,然后在分布式数据集上调用各种算法。其实,MLlib就是RDD上一系列可供调用的函数的集合。

数据类型

  MLlib包含一些特有的数据类型,位于org.apache.spark.mllib包(Java/Scala)或pyspark.mllib(Python)中。主要的几个类有:

统计

  不论是在即时的探索中,还是在机器学习的数据理解中,基本的统计都是数据分析的重要部分。MLlib 通过mllib.stat.Statistics 类中的方法提供了几种广泛使用的统计函数,这些函数可以直接在RDD 上使用。一些常用的函数如下所列。

Statistics.colStats(rdd)

  计算由向量组成的RDD 的汇总统计,保存着向量集合中每列的最小值、最大值、平均值和方差。这可以用来在一次执行中获取丰富的统计信息。

Statistics.corr(rdd, method)

 &esmp;计算由向量组成的RDD 中的列间的相关矩阵,使用皮尔森相关(Pearson correlation)或斯皮尔曼相关(Spearman correlation)中的一种(method 必须是pearson 或spearman中的一个)。

Statistics.corr(rdd1, rdd2, method)

  计算两个由浮点值组成的RDD 的相关矩阵,使用皮尔森相关或斯皮尔曼相关中的一种(method 必须是pearson 或spearman 中的一个)。

Statistics.chiSqTest(rdd)

  计算由LabeledPoint 对象组成的RDD 中每个特征与标签的皮尔森独立性测试
(Pearson’s independence test) 结果。返回一个ChiSqTestResult 对象, 其中有p 值、(p-value)、测试统计及每个特征的自由度。标签和特征值必须是分类的(即离散值)。

  下面举个例子:使用三个学生的成绩Vector来构建所需的RDD Vector,这个矩阵里的每个Vector都代表一个学生在四门课程里的分数:

python

from pyspark.mllib.stat import Statistics
from pyspark.mllib.linalg import Vectors
//构建RDD
basicTestRDD = sc.parallelize([Vectors.dense([60, 70, 80, 0]),
                       Vectors.dense([80, 50, 0,  90]),
                       Vectors.dense([60, 70, 80,  0])])

//查看summary里的成员,这个对象中包含了大量的统计内容
>>> print summary.mean()
[ 66.66666667  63.33333333  53.33333333  30.        ]
>>> print summary.variance()
[  133.33333333   133.33333333  2133.33333333  2700.        ]
>>> print summary.numNonzeros()
[ 3.  3.  2.  1.]

scala

import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.rdd.RDD

val array1: Array[Double] = Array[Double](60, 70, 80, 0)
val array2: Array[Double] = Array[Double](80, 50, 0, 90)
val array3: Array[Double] = Array[Double](60, 70, 80, 0)
val denseArray1 = Vectors.dense(array1)
val denseArray2 = Vectors.dense(array2)
val denseArray3 = Vectors.dense(array3)

val seqDenseArray: Seq[Vector] = Seq(denseArray1, denseArray2, denseArray3)

val basicTestRDD: RDD[Vector] = sc.parallelize[Vector](seqDenseArray)

val summary: MultivariateStatisticalSummary = Statistics.colStats(basicTestRDD)

算法

特征提取

降维

分类与回归

聚类

协同过滤与推荐

实例

使用逻辑回归算法实现垃圾邮件分类处理

def testLogisticRegressionWithSGD = {
    val spam = sc.textFile("src/main/resources/mllib/spam.txt", 1)
    val normal = sc.textFile("src/main/resources/mllib/normal.txt", 1)

    //创建一个HashingTF实例来把邮件文本映射为包含一个10000个特征的向量
    val tf = new HashingTF(numFeatures = 10000)
    //各邮件都被切分为单词,每个单词被映射为一个特征
    val spamFeatures = spam.map { email => tf.transform(email.split(" ")) }
    val normalFeatures = normal.map { email => tf.transform(email.split(" ")) }

    //创建LabeledPoint数据集分别存放阳性(垃圾邮件)和阴性(正常邮件)的例子
    val positiveExamples = spamFeatures.map { features => LabeledPoint(1, features) }
    val negativeExamples = normalFeatures.map { features => LabeledPoint(0, features) }
    val trainingData = positiveExamples.union(negativeExamples)
    trainingData.cache()
    println(trainingData.toDebugString)

    //使用SGD算法运行逻辑回归
    val model = new LogisticRegressionWithSGD().run(trainingData)
    //以阳性(垃圾邮件)和阴性(正常邮件)的例子分别进行测试
    val posTest = tf.transform("O M G get cheap stuff by sending money to .".split(" "))
    val negTest = tf.transform("hello, i started studying Spark ".split(" "))
    println(s"prediction for positive tset example: ${model.predict(posTest)}")
    println(s"prediction for negitive tset example: ${model.predict(negTest)}")

    Thread.sleep(Int.MaxValue)
  }

svm分类算法

# 加载模块
from pyspark.mllib.util import MLUtils
from pyspark.mllib.classification import SVMWithSGD

# 读取数据
dataFile = '/opt/spark-1.6.1-bin-hadoop2.6/data/mllib/sample_libsvm_data.txt'
data = MLUtils.loadLibSVMFile(sc, dataFile)

splits = data.randomSplit([0.8, 0.2], seed = 9L)
training = splits[0].cache()
test = splits[1]

# 打印分割后的数据量
print "TrainingCount:[%d]" % training.count();
print "TestingCount:[%d]" % test.count();

model = SVMWithSGD.train(training, 100)

scoreAndLabels = test.map(lambda point : (model.predict(point.features), point.label))

#输出结果,包含预测的数字结果和0/1结果:
for score, label in scoreAndLabels.collect():
    print score, label

k-means聚类算法

# 读取数据文件,创建RDD
dataFile = "/opt/spark-1.6.1-bin-hadoop2.6/data/mllib/kmeans_data.txt"
lines = sc.textFile(dataFile)

# 创建Vector,将每行的数据用空格分隔后转成浮点值返回numpy的array
data = lines.map(lambda line: np.array([float(x) for x in line.split(' ')]))

# 其中2是簇的个数
model = KMeans.train(data, 2)

print("Final centers: " + str(model.clusterCenters))
print("Total Cost: " + str(model.computeCost(data)))

忠于技术,热爱分享。欢迎关注公众号:java大数据编程,了解更多技术内容。

14.spark mllib之快速入门

推荐阅读:
  1. JavaEE基础(04):会话跟踪技术,Session和Cookie详解
  2. 关于c语言学生宿舍管理系统

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

sparkmllib vector labeledpoint

上一篇:free命令查看内存

下一篇:go语言碎片整理之strconv

相关阅读

您好,登录后才能下订单哦!

密码登录
登录注册
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》