不使用Sqoop流程,利用CacheManager直接完成SparkSQL数据流直接回写Oracle

发布时间:2020-07-28 07:16:19 作者:Rawirm
来源:网络 阅读:6848

以前都是使用Sqoop来完成数据从生成的hdfs数据存储上来抽取至oracle的数据库:sqoop抽取语句:
sqoop export --connect "jdbc:oracle:thin:@ip:port:sid" --username 用户名 --password 密码 --table sid.表名 --export-dir hdfs://nameservice1/user/XXX(hdfs地址) --fields-terminated-by "\001" --null-non-string '' --null-string '' -m 10;

 由于项目需求我们现在要完成在代码中省城所需字段之后,直接回写到oracle中,因为数据量每天都很大,用实例或者List存有很大的局限性,可能会出现内存异常等不可预料的东西,所以我通过缓存器机制来存储数据,然后进行生成结果的临时表直接回写(后面做的hbase接口封装批量提交也比较类似)
 废话不多说直接上代码:
 1、建立缓存实体
 package usi.java.oracle;

/**

2、建立缓存控制器
package usi.java.oracle;

import java.util.Date;
import java.util.HashMap;

/**

}

3、建立需要导出数据对象
package usi.java.oracle;

public class TaskAll {
private String mme_eid;
private String mme_editor;
private String entitytype_eid;
private String project_eid;
private String resource_eid;
public String getMme_eid() {
return mme_eid;
}
public void setMme_eid(String mme_eid) {
this.mme_eid = mme_eid;
}
public String getMme_editor() {
return mme_editor;
}
public void setMme_editor(String mme_editor) {
this.mme_editor = mme_editor;
}
public String getEntitytype_eid() {
return entitytype_eid;
}
public void setEntitytype_eid(String entitytype_eid) {
this.entitytype_eid = entitytype_eid;
}
public String getProject_eid() {
return project_eid;
}
public void setProject_eid(String project_eid) {
this.project_eid = project_eid;
}
public String getResource_eid() {
return resource_eid;
}
public void setResource_eid(String resource_eid) {
this.resource_eid = resource_eid;
}

}
5、执行逻辑主体,回写数据,批量提交

package usi.java.oracle;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
//import java.sql.ResultSet;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.hive.HiveContext;

public class redict_to_171ora {
public static void main(String[] args) {
SparkConf sc = new SparkConf().setAppName("redict_to_171ora");
SparkContext jsc = new SparkContext(sc);
HiveContext hc = new HiveContext(jsc);
String hivesql1="select t.mme_eid,t.mme_editor,t.entitytype_eid,t.project_eid,t.resource_eid from usi_odso.c_taskall t limit 150000";

    DataFrame redict_to_171ora= hc.sql(hivesql1);
    //redict_to_171ora.registerTempTable("hivesql1");       
    List<Row> collect=redict_to_171ora.javaRDD().collect();

    int o=0;
    for (Row lists: collect){
        TaskAll task=new TaskAll();
        task.setMme_eid(lists.getString(0));
        task.setMme_editor(lists.getString(1));
        task.setEntitytype_eid(lists.getString(2));
        task.setProject_eid(lists.getString(3));
        task.setResource_eid(lists.getString(4));
        CacheManager.putContent(o+"", task, 30000000);
        o++;
    /* System.out.println(lists.size());
     System.out.println(lists.getString(0));
     System.out.println(lists.getString(1));
     System.out.println(lists.getString(2));
     System.out.println(lists.getString(3));
     System.out.println(lists.getString(4));*/
      }
    System.out.println(o);

        Connection con = null;// 创建一个数据库连接
        PreparedStatement pre = null;// 创建预编译语句对象,一般都是用这个而不用Statement
        //ResultSet result = null;// 创建一个结果集对象
        try
        {
            Class.forName("oracle.jdbc.driver.OracleDriver");// 加载Oracle驱动程序
            System.out.println("开始尝试连接数据库!");
            String url = "jdbc:oracle:" + "thin:@ip:1521:sid";// 127.0.0.1是本机地址,XE是精简版Oracle的默认数据库名
            String user = "user";// 用户名,系统默认的账户名
            String password = "password";// 你安装时选设置的密码
            con = DriverManager.getConnection(url, user, password);// 获取连接
            System.out.println("连接成功!");
            String sql = "insert into c_taskall_test(mme_eid,mme_editor,entitytype_eid,project_eid,resource_eid) values(?,?,?,?,?)";// 预编译语句,“?”代表参数
            pre = con.prepareStatement(sql);// 实例化预编译语句
            for(int i=0;i<o;i++){
           // for (Row lists: collect){             
           // String sql = "insert into c_taskall_test(mme_eid,mme_editor,entitytype_eid,project_eid,resource_eid) values('"+task.getMme_eid()+"','"+task.getMme_editor()+"','"+task.getEntitytype_eid()+"','"+task.getProject_eid()+"','"+task.getResource_eid()+"')";// 预编译语句,“?”代表参数                                         
           // pre.setString(1, "三星");// 设置参数,前面的1表示参数的索引,而不是表中列名的索引
            TaskAll task=(TaskAll) CacheManager.getContent(""+i).getValue();
            pre.setString(1, task.getMme_eid());
            pre.setString(2, task.getMme_editor());
            pre.setString(3, task.getEntitytype_eid());
            pre.setString(4, task.getProject_eid());
            pre.setString(5, task.getResource_eid());
            pre.addBatch(); 
            if(i%20000==0){//可以设置不同的大小;如50,100,500,1000等等      
            pre.executeBatch();      
            con.commit();      
            pre.clearBatch();      
           // System.out.println("i的值"+i);
            }
           // result = pre.executeQuery();// 执行查询,注意括号中不需要再加参数             
            }
            pre.executeBatch();      
            con.commit();      
            pre.clearBatch();      
           // System.out.println("i的值"+i);
          /*  if (result != null)
                result.close();*/
            if (pre != null)
                pre.close();
           /* while (result.next())
                // 当结果集不为空时
                System.out.println("usernum:" + result.getString("usernum") + "flow:"
                        + result.getString("flow"));*/
           }
        catch (Exception e)
        {
            e.printStackTrace();
        }
        finally
        {
            try
            {
                // 逐一将上面的几个对象关闭,因为不关闭的话会影响性能、并且占用资源
                // 注意关闭的顺序,最后使用的最先关闭
              /*  if (result != null)
                    result.close();*/
                if (pre != null)
                    pre.close();
                if (con != null)
                    con.close();
                //System.out.println("数据库连接已关闭!");
            }
            catch (Exception e)
            {
                e.printStackTrace();
            }
        }

    }

}

推荐阅读:
  1. sparkSQL来完成对Hive的操作
  2. sqoop基本使用

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

hive sparksql sqoop

上一篇:bash的算术运算和条件测试语句基础

下一篇:25G与100G以太网光模块解决方案

相关阅读

您好,登录后才能下订单哦!

密码登录
登录注册
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》