[生产库实战] 采用存储过程对生产线历史表数据进行归档

发布时间:2020-07-13 07:07:13 作者:s_o_m
来源:网络 阅读:1666

    生产线历史数据归档是数据库运维的一项日常基本工作。在建表设计时,通常都将数据流水表(如:日志、用户登录历史,软件下载记录,用户属性更改历史表等)设计为范围分区表、间隔分区表(11G),当超过业务要求的保留范围时,此数据基本是静态数据且应用程序再不对其进行访问,但有可能会由于某些特殊要求需要手动查询。在这个情况下,都将其数据从生产库归档至历史库,并对其进行压缩保存,如果超出特殊要求查询的范围,那就直接将其导出压缩备份到磁带。

   在Oracle数据库中,用做表数据归档操作的方法很多,如:exp/imp、expdp/impdp、传输表空间等方法,这些都是日常数据库维护可能使用到的,这些工具的使用方法这里不展开了,下面进入今天的主题,使用存储过程归档生产线历史表数据,先对其简单做下总结:


   1、简单,不容易出错;

   2、对其源库(这里就是生产库)来说,就是一条select查询语句,性能影响小;

   3、数据从源库到目标库不需要落地,和dblink+impdp迁移数据的方法类似,节约导出数据所需空间(上百GB的表)及归档时间;

   4、可监控其归档进度及归档记录数;

   5、如果是跨机房传输,请监控好网络带宽流量。

   6、......


   操作流程:

   1、在生产库维护用户的Schema下创建一张视图,视图中包含需要归档的表的分区:


create view log_table_p201209
as
select * from user01.log_table partition(P201209);

   注:为什么要建视图? 因为通过dblink查询不能用 select * from table partition (partition_name).....这样的语句。


   2、在历史库放归档数据的用户下(历史库操作都下面都在此用户下操作)创建数据归档状态表及序列

-- Create table
create table data_archive_status
(
  id   NUMBER,
  threadno   NUMBER,
  table_name VARCHAR2(60),
  syncnum    NUMBER,
  state      NUMBER,
  starttime  DATE,
  synctime   DATE,
  remark     VARCHAR2(2000)
);
-- Add comments to the columns
comment on column data_archive_progress.state
  is '0:开始,1:打开、解析游标,2:提取数据,3:某个表同步完成,4:所有表全部完成,其他负数:错误编码';

-- Create sequence
create sequence seq_id
minvalue 1
maxvalue 9999999999999
start with 1
increment by 1
cache 20;


  3、在历史库创建一个可以通过只读权限连接生产库的dblink,示例:


-- Create database link
create database link XXDB.LOCALDOMAIN
  connect to readonly
  identified by ""
  using '(DESCRIPTION=
    (LOAD_BALANCE=no)
    (ADDRESS_LIST=
      (ADDRESS=
        (PROTOCOL=TCP)
        (HOST=172.16.XX.XX)
        (PORT=1521)
      )
      (ADDRESS=
        (PROTOCOL=TCP)
        (HOST=172.16.XX.XX)
        (PORT=1521)
      )
    )
    (CONNECT_DATA=
      (FAILOVER_MODE=
        (TYPE=select)
        (METHOD=basic)
        (RETRIES=180)
        (DELAY=5)
      )
      (SERVER=dedicated)
      (SERVICE_NAME=XX_service)
    )
  )';


  4、历史库创建一张与生产库相同表结构的表,表名建议改为带上归档数据标识


create tabel log_table_p201209(......);


  5、 创建用于数据归档的存储过程:


 create procedure p_log_table_p201209 as
  --索引表
  type u_type is table of log_table_p201209%rowtype index by pls_integer;
  v_list u_type;
  --定义数组,存放待同步的视图名称。
  type varchar_arrary is table of varchar2(60) index by pls_integer;
  v_remoteview_list varchar_arrary;
  --定义一个引用索引
  type cur_ref is ref cursor;
  cur_data cur_ref;
  --本地变量,记录SQL%ROWCOUNT
  v_counter  number := 0;
  v_rowid    rowid;
  v_sqlcode  varchar2(300) := null;
  v_querystr varchar(1000) := null;
  v_parse_elapsed_s date := null;
  v_parse_elapsed_e date := null;
  v_fetch_elapsed_s date := null;
  v_fetch_elapsed_e date := null;
begin
  --初始化数组(第1步中创建的视图
  v_remoteview_list(1) := 'zhanghui.log_table_p201209';
  --循环同步每个分区表
  for k in 1 .. v_remoteview_list.count loop
    --添加一个同步任务记录
    insert into data_archive_status
    values
      (seq_id.nextval,
       k,
       v_remoteview_list(k),
       0,
       0,
       sysdate,
       sysdate,
       null)
    returning rowid into v_rowid;
    commit;
    v_querystr := 'select /*+ rowid(t) */ * from ' || v_remoteview_list(k) ||
                  '@XXDB.LOCALDOMAIN t';
    update data_archive_status t
       set t.synctime = sysdate, t.state = 1
     where rowid = v_rowid;
    commit;
    --记录打开、解析游标的时间长度。
    v_parse_elapsed_s := sysdate;
    open cur_data for v_querystr;
    v_parse_elapsed_e := sysdate;
    update data_archive_status
       set synctime = sysdate,
           state    = 2,
           remark   = remark || '[' || v_remoteview_list(k) ||
                      ':parse_elapsed=' ||
                      (v_parse_elapsed_e - v_parse_elapsed_s) || 'sec,'
     where rowid = v_rowid;
    commit;
    v_counter         := 0;
    v_fetch_elapsed_s := sysdate;
    --对打开的游标,进行循环同步。
    loop
      --使用Bulk Binding,一次处理10000条记录
      fetch cur_data bulk collect
        into v_list limit 10000;
      forall i in 1 .. v_list.last
        insert into log_table_p201209
        values v_list
          (i);
      --记录当前同步的记录数
      v_counter := v_counter + sql%rowcount;
      update data_archive_status t
         set t.syncnum = v_counter, t.synctime = sysdate
       where rowid = v_rowid;
      commit;
      exit when cur_data%notfound;
    end loop;
    v_fetch_elapsed_e := sysdate;
    --更新进度表,将当前分区完成时间记录到备注中。
    update data_archive_status
       set state    = 3,
           synctime = sysdate,
           remark   = remark || 'fetch_elapsed=' ||
                      round((v_fetch_elapsed_e - v_fetch_elapsed_s) * 24 * 60,
                            4) || 'min,syncnum=' || v_counter ||
                      ',endtime= ' || to_char(sysdate, 'yyyymmddhh34miss') || ']'
     where rowid = v_rowid;
    commit;
    close cur_data;
    --更新进度表
    update data_archive_status t set t.state = 4 where rowid = v_rowid;
    commit;
  end loop;
exception
  when others then
    v_sqlcode := sqlcode;
    update data_archive_status
       set synctime = sysdate, state = v_sqlcode
     where rowid = v_rowid;
    commit;
    raise;
end;


  6、创建压缩对象存储过程,由于move操作需要接近双倍的存储空间,所以压缩前请提前评估空间需求


create procedure p_compress_object(vObject_name    varchar2, --对象
                                   vPartition_name varchar2 default null, --分区名
                                   vParallel       int default 0, --并行度
                                   vPctfree        int default 0, --存储参数pctfree 不再考虑DML操作的设置为0
                                   vTablespace     varchar2 default null, --表空间
                                   vOwner          varchar2 default user, --对象拥有者
                                   vType           number --类型:0、table 1、index 2、 partition table 3、index partition
                                              ) Authid Current_User is
  vSql     varchar2(4000);
  vSqlerrm varchar2(256);
  v_sqlstring  varchar2(4000);
begin
  v_sqlstring := 'alter session set db_file_multiblock_read_count=128';
    execute immediate v_sqlstring;
  if vType = 0 then
    begin
      vSql := 'alter table ' || vOwner || '.' || vObject_name || ' move ' || case when vTablespace is null then null else 'tablespace ' || vTablespace end || ' pctfree ' || vPctfree || ' compress nologging ' || case when vParallel in (0, 1) then null else 'parallel ' || vParallel end;
      execute immediate vSql;
    end;
  elsif vType = 1 then
    begin
      vSql := 'alter index ' || vOwner || '.' || vObject_name ||
              ' rebuild  ' || case when vTablespace is null then null else 'tablespace ' || vTablespace end || ' pctfree ' || vPctfree || ' compress nologging ' || case when vParallel in (0, 1) then null else 'parallel ' || vParallel end;
      execute immediate vSql;
    end;
  elsif vType = 2 then
    begin
      vSql := 'alter table ' || vOwner || '.' || vObject_name ||
              ' move partition ' || vPartition_name || case when vTablespace is null then null else ' tablespace ' || vTablespace end || ' pctfree ' || vPctfree || ' compress nologging ' || case when vParallel in (0, 1) then null else 'parallel ' || vParallel end;
      execute immediate vSql;
    end;
  elsif vType = 3 then
    begin
      vSql := 'alter index ' || vOwner || '.' || vObject_name ||
              ' rebuild partition ' || vPartition_name || case when vTablespace is null then null else ' tablespace ' || vTablespace end || ' pctfree ' || vPctfree || ' compress nologging ' || case when vParallel in (0, 1) then null else 'parallel ' || vParallel end;
      execute immediate vSql;
    end;
  end if;
exception
  when others then
    vSqlerrm := sqlerrm;
    dbms_output.put_line(vSqlerrm||'|'||vSql);
end;


  7、上述工作准备完成,确认历史库表空间情况,调用数据归档存储过程 p_log_table_p201209 ,处理完成后对数据进行压缩,调用存储过程 p_compress_object(....);


  8、确认数据无误,drop掉生产库维护用户对应的视图及业务表的分区,释放对象占用空间(注意:检查分区表的索引是否为local,否则就.....).


  以上......完!


推荐阅读:
  1. 如何搭建PHP+Oracle本地开发环境
  2. 利用java怎么对oracle或mysql数据库进行连接

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

oracle 迁移 数据库

上一篇:java单例模式的实现方式以及差异

下一篇:redis重做从库时报Connection with master lost错误

相关阅读

您好,登录后才能下订单哦!

密码登录
登录注册
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》