您好,登录后才能下订单哦!
这篇文章主要介绍DataFrame怎么用,文中介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们一定要看完!
DataFrame是一个分布式数据集,可以理解为关系型数据库一张表,由字段和字段类型、字段值按列组织,且支持四种语言,在Scala API中可以理解为: FataFrame=Dataset[ROW]
注:DataFrame产生于V1.3之后,在V1.3前为SchemaRDD,在V1.6以后又添加了Dataset
概念 : 两个都是分布式容器,DF理解是一个表格除了RDD数据以外还有Schema,也支持复杂数据类型(map..) API : DataFrame提供的API比RDD丰富 支持map filter flatMap ..... 数据结构:RDD知道类型没有结构, DF提供Schema信息 有利于优化,性能上好 底层 :基于运行环境不一样,RDD开发的Java/Scala API运行底层环境JVM, DF在SparkSQL中转换成逻辑执行计划(locaical Plan)和物理执行计划(Physical Plan)中间自身优化功能,性能差异大
[hadoop@hadoop001 bin]$./spark-shell --master local[2] --jars ~/software/mysql-connector-java-5.1.34-bin.jar
-- 读取json文件
scala>val df = spark.read.json("file:///home/hadoop/data/people.json")
18/09/02 11:47:20 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
-- 打印schema信息
scala> df.printSchema
root |-- age: long (nullable = true) -- 字段 类型 允许为空 |-- name: string (nullable = true)
-- 打印字段内容
scala> df.show
+----+-------+ | age| name| +----+-------+ |null|Michael| | 30| Andy| | 19| Justin| +----+-------+
-- 打印查询字段
scala> df.select("name").show
+-------+ | name| +-------+ |Michael| | Andy| | Justin| +-------+
-- 单引号,存在隐式转换
scala> df.select('name).show
+-------+ | name| +-------+ |Michael| | Andy| | Justin| +-------+
-- 双引号隐式转换不识别
scala> df.select("name).show
<console>:1: error: unclosed string literal
df.select("name).show
^
-- 年龄计算,NULL无法计算
scala> df.select($"name",$"age" + 1).show
+-------+---------+ | name|(age + 1)| +-------+---------+ |Michael| null| | Andy| 31| | Justin| 20| +-------+---------+
-- 年龄过滤
scala> df.filter($"age" > 21).show
+---+----+ |age|name| +---+----+ | 30|Andy| +---+----+
-- 年龄分组 汇总
scala> df.groupBy("age").count.show
+----+-----+ | age|count| +----+-----+ | 19| 1| |null| 1| | 30| 1| +----+-----+
-- 创建一个临时视图
scala> df.createOrReplaceTempView("people")
scala>spark.sql("select * from people").show
+----+-------+ | age| name| +----+-------+ |null|Michael| | 30| Andy| | 19| Justin| +----+-------+
-- 定义case class 用来创建Schema
case class Student(id:String,name:String,phone:String,Email:String)
-- RDD与DF反射方式实现
val students = sc.textFile("file:///home/hadoop/data/student.data").map(_.split("\\|")).map(x=>Student(x(0),x(1),x(2),x(3))).toDF()
-- 打印DF信息
students.printSchema
-- show(numRows: Int, truncate: Boolean)
-- numRows截取前20行和truncate读取前20字符串
-- students.show(5,false) 读取前五行和所有字符串
scala> students.show
+---+--------+--------------+--------------------+ | id| name| phone| Email| +---+--------+--------------+--------------------+ | 1| Burke|1-300-746-8446|ullamcorper.velit...| | 2| Kamal|1-668-571-5046|pede.Suspendisse@...| | 3| Olga|1-956-311-1686|Aenean.eget.metus...| | 4| Belle|1-246-894-6340|vitae.aliquet.nec...| | 5| Trevor|1-300-527-4967|dapibus.id@acturp...| | 6| Laurel|1-691-379-9921|adipiscing@consec...| | 7| Sara|1-608-140-1995|Donec.nibh@enimEt...| | 8| Kaseem|1-881-586-2689|cursus.et.magna@e...| | 9| Lev|1-916-367-5608|Vivamus.nisi@ipsu...| | 10| Maya|1-271-683-2698|accumsan.convalli...| | 11| Emi|1-467-270-1337|est@nunc.com|.......| | 12| Caleb|1-683-212-0896|Suspendisse@Quisq...| | 13|Florence|1-603-575-2444|sit.amet.dapibus@...| | 14| Anika|1-856-828-7883|euismod@ligulaeli...| | 15| Tarik|1-398-171-2268|turpis@felisorci.com| | 16| Amena|1-878-250-3129|lorem.luctus.ut@s...| | 17| Blossom|1-154-406-9596|Nunc.commodo.auct...| | 18| Guy|1-869-521-3230|senectus.et.netus...| | 19| Malachi|1-608-637-2772|Proin.mi.Aliquam@...| | 20| Edward|1-711-710-6552|lectus@aliquetlib...| +---+--------+--------------+--------------------+ only showing top 20 rows
-- students.head(5) 返回前几行数据
scala> students.head(5).foreach(println) [1,Burke,1-300-746-8446,ullamcorper.velit.in@ametnullaDonec.co.uk] [2,Kamal,1-668-571-5046,pede.Suspendisse@interdumenim.edu] [3,Olga,1-956-311-1686,Aenean.eget.metus@dictumcursusNunc.edu] [4,Belle,1-246-894-6340,vitae.aliquet.nec@neque.co.uk] [5,Trevor,1-300-527-4967,dapibus.id@acturpisegestas.net]
-- 查询具体字段
scala> students.select("id","name").show(5) +---+------+ | id| name| +---+------+ | 1| Burke| | 2| Kamal| | 3| Olga| | 4| Belle| | 5|Trevor| +---+------+
-- 修改字段取别名
scala> students.select($"name".as("new_name")).show(5)
+--------+ |new_name| +--------+ | Burke| | Kamal| | Olga| | Belle| | Trevor| +--------+
--查询id大于五
scala> students.filter("id>5").show(5)
+---+------+--------------+--------------------+ | id| name| phone| Email| +---+------+--------------+--------------------+ | 6|Laurel|1-691-379-9921|adipiscing@consec...| | 7| Sara|1-608-140-1995|Donec.nibh@enimEt...| | 8|Kaseem|1-881-586-2689|cursus.et.magna@e...| | 9| Lev|1-916-367-5608|Vivamus.nisi@ipsu...| | 10| Maya|1-271-683-2698|accumsan.convalli...| +---+------+--------------+--------------------+
-- 查询名称为空或者名称为NULL(filter=where)
scala> students.filter("name=''or name='NULL'").show(false)
+---+----+--------------+--------------------------+ |id |name|phone |Email | +---+----+--------------+--------------------------+ |21 | |1-711-710-6552|lectus@aliquetlibero.co.uk| |22 | |1-711-710-6552|lectus@aliquetlibero.co.uk| |23 |NULL|1-711-710-6552|lectus@aliquetlibero.co.uk| +---+----+--------------+--------------------------+
-- 查询ID大于5且名称模糊查询
scala> students.filter("id>5 and name like 'M%'").show(5)
+---+-------+--------------+--------------------+ | id| name| phone| Email| +---+-------+--------------+--------------------+ | 10| Maya|1-271-683-2698|accumsan.convalli...| | 19|Malachi|1-608-637-2772|Proin.mi.Aliquam@...| +---+-------+--------------+--------------------+
-- 按照名称升序排序且不等于空
scala> students.sort($"name").select("id","name").filter("name <> ''").show(3)
+---+-----+ | id| name| +---+-----+ | 16|Amena| | 14|Anika| | 4|Belle| +---+-----+
-- 按照名称倒叙排序(sort = orderBy)
scala> students.sort($"name".desc).select("name").show(5)
+------+ | name| +------+ |Trevor| | Tarik| | Sara| | Olga| | NULL| +------+
-- 年龄分组 汇总
scala> students.groupBy("age").count().show
+----+-----+ | age|count| +----+-----+ | 19| 1| |null| 1| | 30| 1| +----+-----+
-- 聚合函数使用
scala> students.agg("id" -> "max", "id" -> "sum").show(false)
+-------+-------+ |max(id)|sum(id)| +-------+-------+ |9 |276.0 | +-------+-------+
-- join操作,using模式seq指定多个字段
students.join(students2, Seq("id", "name"), "inner")
-- DataFrame的join操作有inner, outer, left_outer, right_outer, leftsemi类型
-- 指定类型,指定join的类型
students.join(students2 , students("id" ) === students2( "t1_id"), "inner")
1.maven依赖下载
<spark.version>2.3.1</spark.version> <!-- 添加Spark Core的dependency --> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.11</artifactId> <version>${spark.version}</version> </dependency> <!-- 添加Spark SQL的dependency --> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.11</artifactId> <version>${spark.version}</version> </dependency>
2、IDEA实现方式:
package com.zrc.ruozedata.sparkSQL import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType} import org.apache.spark.sql.{Row, SparkSession} object SparkSQL001 extends App { /* * RDD与DataFrame反射方式实现(一) * 创建RDD --> DataFrema * 利用case class创建Schema,来解析输出文本每一行信息 */ val spark = SparkSession.builder() .master("local[2]") .appName("SparkSQL001") .getOrCreate() // 操作hive添加 val infos = spark.sparkContext.textFile("file:///F:/infos.txt") /* import spark.implicits._ val infoDF = infos.map(_.split(",")).map(x=>Info(x(0).toInt,x(1),x(2).toInt)).toDF() infoDF.show() */ /* * RDD与DataFrame使用StructType方式实现(二) * StructType构造了StructField方法传入name和dataType * 每一个字段就是为一个StructField * Schema和RDD通过createDataFrame方法作用起来 */ // 注意通过ROW获取的需要转换对应类型 val infoss = infos.map(_.split(",")).map(x=>Row(x(0).trim.toInt,x(1),x(2).trim.toInt)) val fields = StructType( Array( StructField("id",IntegerType,true), StructField("name",StringType,true), StructField("age",IntegerType,true) ) ) val schema = StructType(fields) val infoDF = spark.createDataFrame(infoss,schema) infoDF.show() spark.stop() } // case class Info (id:Int,name:String,age:Int)
以上是“DataFrame怎么用”这篇文章的所有内容,感谢各位的阅读!希望分享的内容对大家有帮助,更多相关知识,欢迎关注亿速云行业资讯频道!
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。