您好,登录后才能下订单哦!
密码登录
登录注册
点击 登录注册 即表示同意《亿速云用户服务条款》
Spark2.2.0中RDD转DataFrame的方式是怎样的,相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通过这篇文章希望你能解决这个问题。
Spark SQL将现有的RDDs转换为数据集。
方法:使用反射来推断包含特定对象类型的RDD的模式。这种基于反射的方法使代码更加简洁,并且当您在编写Spark应用程序时已经了解了模式时,它可以很好地工作。
第一种方法代码实例java版本实现:
数据准备studentDatatxt
1001,20,zhangsan1002,17,lisi1003,24,wangwu1004,16,zhaogang
本地模式代码实现:
package com.unicom.ljs.spark220.study;import org.apache.spark.SparkConf;import org.apache.spark.api.java.JavaRDD;import org.apache.spark.api.java.JavaSparkContext;import org.apache.spark.api.java.function.Function;import org.apache.spark.api.java.function.VoidFunction;import org.apache.spark.sql.Dataset;import org.apache.spark.sql.Row;import org.apache.spark.sql.SQLContext;/*** @author: Created By lujisen* @company ChinaUnicom Software JiNan* @date: 2020-01-20 08:58* @version: v1.0* @description: com.unicom.ljs.spark220.study*/public class RDD2DataFrameReflect {public static void main(String[] args) {SparkConf sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD2DataFrameReflect");JavaSparkContext sc = new JavaSparkContext(sparkConf);SQLContext sqlContext=new SQLContext(sc);JavaRDD<String> lines = sc.textFile("C:\\Users\\Administrator\\Desktop\\studentData.txt");JavaRDD<Student2> studentRDD = lines.map(new Function<String, Student2>() {@Overridepublic Student2 call(String line) throws Exception {String[] split = line.split(",");Student2 student=new Student2();student.setId(Integer.valueOf(split[0]));student.setAge(Integer.valueOf(split[1]));student.setName(split[2]);return student;}});//使用反射方式将RDD转换成dataFrame//将Student.calss传递进去,其实就是利用反射的方式来创建DataFrameDataset<Row> dataFrame = sqlContext.createDataFrame(studentRDD, Student2.class);//拿到DataFrame之后将其注册为临时表,然后针对其中的数据执行SQL语句dataFrame.registerTempTable("studentTable");//针对student临时表,执行sql语句查询年龄小于18岁的学生,/*DataFrame rowDF */Dataset<Row> dataset = sqlContext.sql("select * from studentTable where age < 18");JavaRDD<Row> rowJavaRDD = dataset.toJavaRDD();JavaRDD<Student2> ageRDD = rowJavaRDD.map(new Function<Row, Student2>() {@Overridepublic Student2 call(Row row) throws Exception {Student2 student = new Student2();student.setId(row.getInt(0));student.setAge(row.getInt(1));student.setName(row.getString(2));return student;}});ageRDD.foreach(new VoidFunction<Student2>() {@Overridepublic void call(Student2 student) throws Exception {System.out.println(student.toString());}});}}
Student2类:
package com.unicom.ljs.spark220.study;import java.io.Serializable;/*** @author: Created By lujisen* @company ChinaUnicom Software JiNan* @date: 2020-01-20 08:57* @version: v1.0* @description: com.unicom.ljs.spark220.study*/public class Student2 implements Serializable {int id;int age;String name;public int getId() {return id;}public void setId(int id) {this.id = id;}public int getAge() {return age;}public void setAge(int age) {this.age = age;}public String getName() {return name;}public void setName(String name) {this.name = name;}@Overridepublic String toString() {return "Student2{" +"id=" + id +", age=" + age +", name='" + name + '\'' +'}';}}
pom.xml关键依赖:
<spark.version>2.2.0</spark.version>
<scala.version>2.11.8</scala.version><dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.11</artifactId> <version>${spark.version}</version></dependency><dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.11</artifactId> <version>${spark.version}</version></dependency>
看完上述内容,你们掌握Spark2.2.0中RDD转DataFrame的方式是怎样的的方法了吗?如果还想学到更多技能或想了解更多相关内容,欢迎关注亿速云行业资讯频道,感谢各位的阅读!
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。