在将oracle数据库迁移至mysql数据库时,除了要把oracle的数据库模型移到mysql外,还有一个重要环节就是将oracle数据库中的数据移到mysql数据库。本人尝试过多款数据迁移程序,性能都不是很好,于是自己动手写了一个将oracle数据库数据迁移到mysql数据库的程序,其具体过程如下:
-- P_ETL_ORA_DATA
--
-- Exports Oracle table data to CSV files and generates a MySQL script that
-- loads those files with "load data infile".
--
-- Parameters:
--   P_ORA_DIR    Name of the Oracle directory object used with UTL_FILE. The
--                configuration file ETL_TABS.CNF is read from it, and the CSV
--                files and load_data.sql are written to it.
--   P_DATA_PATH  Directory path written into the generated "load data infile"
--                statements (where MySQL will find the CSV files). A trailing
--                backslash is appended when missing.
--
-- Configuration: every line of ETL_TABS.CNF has the form
--     select <col> <col> ... from <table> [where <condition>]
--   Column names may be separated by white space and/or commas. Requested
--   columns that do not exist in USER_TAB_COLUMNS for the table are exported
--   as NULL.
--
-- Side effects:
--   * Creates (if missing) and truncates the work table mysql_etl_tbs; the
--     DDL and the COMMITs below end the caller's current transaction.
--   * Writes <table>_<n>.csv (C_ROWS_PER_FILE rows per file) and load_data.sql.
--   * Problems are reported through DBMS_OUTPUT. A failure while exporting one
--     table is logged and the remaining tables are still processed.
--
-- NOTE(review): the generated statements declare lines terminated by "\r\n",
-- which presumably matches UTL_FILE output only when the database server
-- writes CRLF line endings (Windows) -- confirm for other platforms.
-- NOTE(review): ETL_TABS.CNF is treated as trusted input; its column names,
-- table names and WHERE clauses are concatenated into dynamic SQL.
CREATE OR REPLACE PROCEDURE P_ETL_ORA_DATA
(
  P_ORA_DIR   VARCHAR2,
  P_DATA_PATH VARCHAR2
) IS
  TYPE T_REC IS RECORD(
    TBN VARCHAR2(40),
    WHR VARCHAR2(4000));
  TYPE T_TABS IS TABLE OF T_REC;

  -- A new CSV file is started after this many rows.
  C_ROWS_PER_FILE  CONSTANT PLS_INTEGER := 200000;
  -- The CSV buffer is flushed to disk after this many rows.
  C_ROWS_PER_FLUSH CONSTANT PLS_INTEGER := 2000;

  V_TABS      T_TABS := T_TABS();
  V_ETL_DIR   VARCHAR2(128) := P_ORA_DIR;
  V_LOAD_FILE UTL_FILE.FILE_TYPE;

  -- Runs P_SQL_STMT and writes its result set to P_TB_NAME_<n>.csv files in
  -- V_ETL_DIR, appending one "load data infile" statement per finished file to
  -- V_LOAD_FILE. Errors are logged to DBMS_OUTPUT and not re-raised, so one
  -- failing table does not stop the export of the others.
  PROCEDURE ETL_DATA
  (
    P_SQL_STMT  VARCHAR2,
    P_DATA_PATH VARCHAR2,
    P_TB_NAME   VARCHAR2
  ) IS
    V_VAR_COL      VARCHAR2(32767);
    V_NUM_COL      NUMBER;
    V_DATE_COL     DATE;
    V_TMZ          TIMESTAMP;
    V_COLS         NUMBER;
    V_COLS_DESC    DBMS_SQL.DESC_TAB;
    V_ROW_STR      VARCHAR2(32767);
    V_COL_STR      VARCHAR2(32767);
    V_SQL_ID       NUMBER;
    V_SQL_REF      SYS_REFCURSOR;
    V_EXP_FILE     UTL_FILE.FILE_TYPE;
    V_DATA_PATH    VARCHAR2(4000);
    V_FILE_NAME    VARCHAR2(200);
    V_FILE_NO      PLS_INTEGER := 1;
    V_ROWS_IN_FILE PLS_INTEGER := 0;

    -- Escapes a text value for MySQL LOAD DATA with the default escape
    -- character "\": backslashes first, then the "^" enclosure character and
    -- line breaks, so embedded delimiters cannot split a field or a record.
    FUNCTION ESCAPE_TEXT(P_TEXT VARCHAR2) RETURN VARCHAR2 IS
    BEGIN
      RETURN REPLACE(REPLACE(REPLACE(REPLACE(P_TEXT, '\', '\\'), '^', '\^'), CHR(13), '\r'), CHR(10), '\n');
    END ESCAPE_TEXT;

    -- Closes the current CSV file and records its load statement.
    PROCEDURE FINISH_FILE IS
      V_LINE VARCHAR2(4000);
    BEGIN
      UTL_FILE.FCLOSE(V_EXP_FILE);
      V_LINE := 'load data infile "' || V_DATA_PATH || V_FILE_NAME || '" into table ' || P_TB_NAME;
      V_LINE := V_LINE || ' fields terminated by "," enclosed by "^" lines terminated by "\r\n";';
      UTL_FILE.PUT_LINE(V_LOAD_FILE, V_LINE);
      UTL_FILE.FFLUSH(V_LOAD_FILE);
    END FINISH_FILE;
  BEGIN
    V_DATA_PATH := P_DATA_PATH;
    IF V_DATA_PATH IS NULL OR SUBSTR(V_DATA_PATH, -1) <> '\'
    THEN
      V_DATA_PATH := V_DATA_PATH || '\';
    END IF;
    -- The path is embedded in a double-quoted MySQL string literal, where "\"
    -- is the escape character, so every backslash has to be doubled.
    V_DATA_PATH := REPLACE(V_DATA_PATH, '\', '\\');

    OPEN V_SQL_REF FOR P_SQL_STMT;
    V_SQL_ID := DBMS_SQL.TO_CURSOR_NUMBER(V_SQL_REF);
    DBMS_SQL.DESCRIBE_COLUMNS(V_SQL_ID, V_COLS, V_COLS_DESC);

    -- Bind an output buffer to every column. The type lists here must stay in
    -- step with the ones in the fetch loop below.
    FOR I IN 1 .. V_COLS
    LOOP
      CASE
        WHEN V_COLS_DESC(I).COL_TYPE IN (1, 9, 96) THEN
          DBMS_SQL.DEFINE_COLUMN(V_SQL_ID, I, V_VAR_COL, 32767);
        WHEN V_COLS_DESC(I).COL_TYPE = 2 THEN
          DBMS_SQL.DEFINE_COLUMN(V_SQL_ID, I, V_NUM_COL);
        WHEN V_COLS_DESC(I).COL_TYPE = 12 THEN
          DBMS_SQL.DEFINE_COLUMN(V_SQL_ID, I, V_DATE_COL);
        WHEN V_COLS_DESC(I).COL_TYPE IN (180, 181, 231) THEN
          -- Time-zone variants are read into a plain TIMESTAMP buffer, so the
          -- zone information is not part of the exported value.
          DBMS_SQL.DEFINE_COLUMN(V_SQL_ID, I, V_TMZ);
        ELSE
          -- Fail with an actionable message instead of CASE_NOT_FOUND.
          RAISE_APPLICATION_ERROR(-20001,
                                  'Unsupported column type ' || V_COLS_DESC(I).COL_TYPE ||
                                  ' for column ' || V_COLS_DESC(I).COL_NAME ||
                                  ' of ' || P_TB_NAME);
      END CASE;
    END LOOP;

    WHILE DBMS_SQL.FETCH_ROWS(V_SQL_ID) > 0
    LOOP
      IF V_ROWS_IN_FILE = 0
      THEN
        V_FILE_NAME := P_TB_NAME || '_' || V_FILE_NO || '.csv';
        V_EXP_FILE  := UTL_FILE.FOPEN(V_ETL_DIR, V_FILE_NAME, OPEN_MODE => 'w', MAX_LINESIZE => 32767);
      END IF;

      -- Build one CSV record; \N is the MySQL marker for NULL.
      -- A record longer than 32767 bytes raises VALUE_ERROR (logged below).
      V_ROW_STR := NULL;
      FOR I IN 1 .. V_COLS
      LOOP
        V_COL_STR := '\N';
        CASE
          WHEN V_COLS_DESC(I).COL_TYPE IN (1, 9, 96) THEN
            DBMS_SQL.COLUMN_VALUE(V_SQL_ID, I, V_VAR_COL);
            IF V_VAR_COL IS NOT NULL
            THEN
              V_COL_STR := '^' || ESCAPE_TEXT(V_VAR_COL) || '^';
            END IF;
          WHEN V_COLS_DESC(I).COL_TYPE = 2 THEN
            DBMS_SQL.COLUMN_VALUE(V_SQL_ID, I, V_NUM_COL);
            IF V_NUM_COL IS NOT NULL
            THEN
              -- Force "." as decimal separator regardless of session NLS
              -- settings; a "," would be read as a field separator.
              V_COL_STR := TO_CHAR(V_NUM_COL, 'TM9', 'NLS_NUMERIC_CHARACTERS=''.,''');
            END IF;
          WHEN V_COLS_DESC(I).COL_TYPE = 12 THEN
            DBMS_SQL.COLUMN_VALUE(V_SQL_ID, I, V_DATE_COL);
            IF V_DATE_COL IS NOT NULL
            THEN
              V_COL_STR := '^' || TO_CHAR(V_DATE_COL, 'yyyy-mm-dd hh24:mi:ss') || '^';
            END IF;
          WHEN V_COLS_DESC(I).COL_TYPE IN (180, 181, 231) THEN
            DBMS_SQL.COLUMN_VALUE(V_SQL_ID, I, V_TMZ);
            IF V_TMZ IS NOT NULL
            THEN
              V_COL_STR := '^' || TO_CHAR(V_TMZ, 'yyyy-mm-dd hh24:mi:ss.ff6') || '^';
            END IF;
          ELSE
            -- Not reachable: unsupported types are rejected when the columns
            -- are defined above.
            RAISE PROGRAM_ERROR;
        END CASE;
        IF I > 1
        THEN
          V_ROW_STR := V_ROW_STR || ',';
        END IF;
        V_ROW_STR := V_ROW_STR || V_COL_STR;
      END LOOP;

      UTL_FILE.PUT_LINE(V_EXP_FILE, CONVERT(V_ROW_STR, 'UTF8'));
      V_ROWS_IN_FILE := V_ROWS_IN_FILE + 1;

      IF V_ROWS_IN_FILE >= C_ROWS_PER_FILE
      THEN
        -- File is full: close it, register it, and start a new one on the
        -- next fetched row.
        FINISH_FILE;
        V_FILE_NO      := V_FILE_NO + 1;
        V_ROWS_IN_FILE := 0;
      ELSIF MOD(V_ROWS_IN_FILE, C_ROWS_PER_FLUSH) = 0
      THEN
        UTL_FILE.FFLUSH(V_EXP_FILE);
      END IF;
    END LOOP;

    DBMS_SQL.CLOSE_CURSOR(V_SQL_ID);
    -- Register the last, partially filled file (if any rows are pending).
    IF UTL_FILE.IS_OPEN(V_EXP_FILE)
    THEN
      FINISH_FILE;
    END IF;
  EXCEPTION
    WHEN OTHERS THEN
      -- Best effort by design: report the failure and release resources so
      -- the caller can continue with the next table.
      DBMS_OUTPUT.PUT_LINE(SQLERRM);
      DBMS_OUTPUT.PUT_LINE(P_SQL_STMT);
      IF V_SQL_ID IS NOT NULL
      THEN
        IF DBMS_SQL.IS_OPEN(V_SQL_ID)
        THEN
          DBMS_SQL.CLOSE_CURSOR(V_SQL_ID);
        END IF;
      ELSIF V_SQL_REF%ISOPEN
      THEN
        -- The cursor was opened but never handed over to DBMS_SQL.
        CLOSE V_SQL_REF;
      END IF;
      IF UTL_FILE.IS_OPEN(V_EXP_FILE)
      THEN
        UTL_FILE.FCLOSE(V_EXP_FILE);
      END IF;
  END ETL_DATA;
BEGIN
  -- Step 1: make sure the (empty) work table exists. It is only referenced
  -- through dynamic SQL because it may not exist when this procedure compiles.
  BEGIN
    EXECUTE IMMEDIATE 'create table mysql_etl_tbs(tn varchar2(40),cn varchar2(40),ci number) ';
  EXCEPTION
    WHEN OTHERS THEN
      -- ORA-00955 (name already used) just means the table is already there;
      -- anything else is a real problem and must not be hidden.
      IF SQLCODE <> -955
      THEN
        RAISE;
      END IF;
  END;
  EXECUTE IMMEDIATE 'truncate table mysql_etl_tbs';

  -- Step 2: parse ETL_TABS.CNF, remembering each table with its WHERE clause
  -- and storing the requested columns (with their position) in mysql_etl_tbs.
  DECLARE
    V_CI       PLS_INTEGER;
    V_CN       VARCHAR2(40);
    V_ETL_COLS VARCHAR2(32767);
    V_TBN      VARCHAR2(40);
    V_ETL_CFG  VARCHAR2(32767);
    V_CNF_FILE UTL_FILE.FILE_TYPE;
    V_FROM_POS PLS_INTEGER;
  BEGIN
    V_CNF_FILE := UTL_FILE.FOPEN(V_ETL_DIR, 'ETL_TABS.CNF', 'r', 32767);
    LOOP
      BEGIN
        UTL_FILE.GET_LINE(V_CNF_FILE, V_ETL_CFG, 32767);
      EXCEPTION
        WHEN NO_DATA_FOUND THEN
          EXIT; -- end of file
      END;
      -- Drop a trailing line break left over from files with CRLF endings.
      V_ETL_CFG := RTRIM(V_ETL_CFG, CHR(13) || CHR(10));

      -- Locate the FROM keyword as a separate word so that column names that
      -- merely contain "from" are not mistaken for it.
      V_FROM_POS := REGEXP_INSTR(V_ETL_CFG, '\s+from\s+', 1, 1, 0, 'i');
      V_ETL_COLS := SUBSTR(V_ETL_CFG, 1, V_FROM_POS - 1);
      V_ETL_COLS := REGEXP_SUBSTR(V_ETL_COLS, '(select)(.+)', 1, 1, 'i', 2);
      V_TBN      := UPPER(REGEXP_SUBSTR(V_ETL_CFG, '(\s+from\s+)(\w+)(\s*)', 1, 1, 'i', 2));

      IF V_TBN IS NULL OR V_ETL_COLS IS NULL
      THEN
        -- Blank lines are skipped silently, malformed ones are reported.
        IF TRIM(V_ETL_CFG) IS NOT NULL
        THEN
          DBMS_OUTPUT.PUT_LINE('Skipping unparsable ETL_TABS.CNF line: ' || SUBSTR(V_ETL_CFG, 1, 200));
        END IF;
      ELSE
        V_TABS.EXTEND();
        V_TABS(V_TABS.LAST).TBN := V_TBN;
        V_TABS(V_TABS.LAST).WHR := REGEXP_SUBSTR(V_ETL_CFG, '\s+where .+', 1, 1, 'i');

        -- One row per requested column; CI keeps the column order.
        V_CI := 1;
        LOOP
          V_CN := REGEXP_SUBSTR(V_ETL_COLS, '[^,[:space:]]+', 1, V_CI);
          EXIT WHEN V_CN IS NULL;
          EXECUTE IMMEDIATE 'insert into mysql_etl_tbs(tn,cn,ci) values(:1,:2,:3)'
            USING V_TBN, UPPER(V_CN), V_CI;
          V_CI := V_CI + 1;
        END LOOP;
        COMMIT;
      END IF;
    END LOOP;
    UTL_FILE.FCLOSE(V_CNF_FILE);
  EXCEPTION
    WHEN UTL_FILE.INVALID_PATH THEN
      DBMS_OUTPUT.PUT_LINE('指定的目录:"' || V_ETL_DIR || '"无效!');
      RETURN;
    WHEN UTL_FILE.INVALID_FILENAME THEN
      DBMS_OUTPUT.PUT_LINE('指定的文件:" ETL_TABS.CNF' || '"无效!');
      RETURN;
    WHEN OTHERS THEN
      DBMS_OUTPUT.PUT_LINE(SQLERRM);
      IF UTL_FILE.IS_OPEN(V_CNF_FILE)
      THEN
        UTL_FILE.FCLOSE(V_CNF_FILE);
      END IF;
      RETURN;
  END;

  -- Step 3: for every configured table build the SELECT statement and export
  -- it, collecting the load statements in load_data.sql.
  DECLARE
    V_CUR_MATCH   SYS_REFCURSOR;
    V_CN          VARCHAR2(40);
    V_COLUMN_NAME VARCHAR2(40);
    V_ETL_COLS    VARCHAR2(32767);
    V_TBN         VARCHAR2(40);
  BEGIN
    V_LOAD_FILE := UTL_FILE.FOPEN(V_ETL_DIR, 'load_data.sql', OPEN_MODE => 'w', MAX_LINESIZE => 32767);
    -- 1 .. COUNT is safe for an empty collection (FIRST/LAST would be NULL).
    FOR T_IX IN 1 .. V_TABS.COUNT
    LOOP
      V_TBN      := V_TABS(T_IX).TBN;
      V_ETL_COLS := NULL;
      -- COLUMN_NAME comes back NULL for requested columns that the table does
      -- not have.
      OPEN V_CUR_MATCH FOR 'select l.cn, r.column_name'
                        || ' from mysql_etl_tbs l'
                        || ' left join user_tab_columns r'
                        || ' on l.tn = r.table_name and l.cn = r.column_name'
                        || ' where l.tn = :tbn'
                        || ' order by l.ci'
        USING V_TBN;
      LOOP
        FETCH V_CUR_MATCH
          INTO V_CN, V_COLUMN_NAME;
        EXIT WHEN V_CUR_MATCH%NOTFOUND;
        IF V_ETL_COLS IS NOT NULL
        THEN
          V_ETL_COLS := V_ETL_COLS || ' , ';
        END IF;
        IF V_COLUMN_NAME IS NULL
        THEN
          -- Missing column: export a NULL placeholder under the same name.
          V_ETL_COLS := V_ETL_COLS || ' cast(null as number) ' || V_CN;
        ELSE
          V_ETL_COLS := V_ETL_COLS || V_CN;
        END IF;
      END LOOP;
      CLOSE V_CUR_MATCH;

      V_TBN := LOWER(V_TBN);
      ETL_DATA('select ' || V_ETL_COLS || ' from ' || V_TBN || V_TABS(T_IX).WHR, P_DATA_PATH, V_TBN);
    END LOOP;
    UTL_FILE.FCLOSE(V_LOAD_FILE);
  EXCEPTION
    WHEN OTHERS THEN
      -- Release the cursor and the script file, then let the error propagate.
      IF V_CUR_MATCH%ISOPEN
      THEN
        CLOSE V_CUR_MATCH;
      END IF;
      IF UTL_FILE.IS_OPEN(V_LOAD_FILE)
      THEN
        UTL_FILE.FCLOSE(V_LOAD_FILE);
      END IF;
      RAISE;
  END;
END P_ETL_ORA_DATA;