Stream 同步错误之解决方案 ORA

stream是 Oracle 11g  支持的数据同步技术,虽然该技术已经不是什么新技术,但目前国内采用该技术开发的软件不多见。 stream 同步软件项目参与近一年,近期软件上线实施,效果不是很理想。  同步过程中会偶尔出现  ORA-00001,ORA-26786,ORA-26787 等常见错误。 经过几天的的研究,开发了一个守护平台,采用 Java 平台和存储过程相结合的方式自动处理以上错误。 我写的这些算法具有通用性,不需要手工指定删除表的列,通过实践具有较好的效果。 相对于set_update_conflict_handler这个方法,简单易用。 java 平台代码便不开源了,其实核心的思想还是在存储过程中,特分享给大家,共同进步。

自定义 type

create or replace type myvarray_list as varray(300) of varchar2(50)

CREATE OR REPLACE PROCEDURE EXECUTE_TRANSACTION_1(applyname IN VARCHAR2,ltxnid IN VARCHAR2) IS
    i      NUMBER;                                                                               
    x      NUMBER;
    loopdog NUMBER;
    txnid  VARCHAR2(30);                                                                         
    source VARCHAR2(128);                                                                         
    msgno  NUMBER;                                                                               
    msgcnt NUMBER;                                                                               
    errno  NUMBER;                                                                               
    errmsg VARCHAR2(2000);                                                                       
    lcr    ANYDATA;                                                                               
    rowlcr    SYS.LCR$_ROW_RECORD;                                                               
    typenm    VARCHAR2(61);                                                                       
    res      NUMBER;                                                                             
    command      VARCHAR2(10);                                                                   
    old_values  SYS.LCR$_ROW_LIST;                                                             
    new_values  SYS.LCR$_ROW_LIST;                                                                                                                           
    v_code  NUMBER;
    v_errm  VARCHAR2(1024);
    object_owner    VARCHAR2(30);
      object_name      VARCHAR2(40);
      key_column      myvarray_list;
      remove_column  myvarray_list;
      remove_flag    NUMBER;
      remove_count    NUMBER;
BEGIN
    SELECT LOCAL_TRANSACTION_ID,                                                                 
        SOURCE_DATABASE,                                                                       
        MESSAGE_NUMBER,                                                                       
        MESSAGE_COUNT,                                                                         
        ERROR_NUMBER,                                                                         
        ERROR_MESSAGE                                                                         
          INTO txnid, source, msgno, msgcnt, errno, errmsg                                         
          FROM DBA_APPLY_ERROR                                                                     
          WHERE LOCAL_TRANSACTION_ID =  ltxnid;
    DBMS_OUTPUT.PUT_LINE(' --- Local Transaction ID: ' || txnid);                               
      DBMS_OUTPUT.PUT_LINE(' --- Source Database: ' || source);                                   
      DBMS_OUTPUT.PUT_LINE(' ---Error in Message: '|| msgno);                                       
      DBMS_OUTPUT.PUT_LINE(' ---Error Number: '||errno);                                           
      DBMS_OUTPUT.PUT_LINE(' ---Message Text: '||errmsg);
    i := msgno;
    loopdog :=0;
      WHILE i <= msgcnt LOOP
          loopdog :=loopdog+1;
        DBMS_OUTPUT.PUT_LINE('---message: ' || i);                                                     
        lcr := DBMS_APPLY_ADM.GET_ERROR_MESSAGE(i, txnid); -- gets the LCR                                                                                                       
        typenm := lcr.GETTYPENAME();                                                               
        DBMS_OUTPUT.PUT_LINE('type name: ' || typenm);                                             
        IF (typenm = 'SYS.LCR$_ROW_RECORD') THEN
            res := lcr.GETOBJECT(rowlcr);                                                           
            command := rowlcr.GET_COMMAND_TYPE();                                                 
            DBMS_OUTPUT.PUT_LINE('command type name: ' || command);                               
            IF command = 'INSERT' THEN
                rowlcr.SET_COMMAND_TYPE('DELETE');                                                                                           
                new_values := rowlcr.GET_VALUES('new');                                             
                -- Set the old values in the row LCR to the new values in the row LCR               
                rowlcr.SET_VALUES('old', new_values);                                               
                -- Set the old values in the row LCR to NULL                                       
                rowlcr.SET_VALUES('new', NULL);                                                           
                old_values := rowlcr.GET_VALUES('old');           
                -- Apply the row LCR as an DELETE FROM  the  table                                     
                object_name :=rowlcr.GET_OBJECT_NAME();
                object_owner :=rowlcr.GET_OBJECT_OWNER();
                key_column := myvarray_list();  --init array.   
                i :=1;
                FOR emm IN (select DISTINCT CO.COLUMN_NAME from
                                                  DBA_cons_columns CO, dba_constraints PK
                                                      where CO.constraint_name = PK.constraint_name 
                                                          AND CO.OWNER = PK.OWNER
                                                          AND PK.constraint_type IN ('P','C','U')
                                                          and PK.table_name = object_name
                                                          AND PK.OWNER=object_owner) LOOP                                           
                    DBMS_OUTPUT.PUT_LINE('key column ' || emm.COLUMN_NAME);   
                    key_column.extend;
                    key_column(i) := emm.COLUMN_NAME;
                    i :=i+1;
                END LOOP;                   
                remove_column := myvarray_list();  --init array.
                  remove_count := 1;   
                FOR i in 1..old_values.count LOOP
                      IF old_values(i) IS NOT NULL THEN
                        --DBMS_OUTPUT.PUT_LINE('old(' || i || '): ' || old_values(i).column_name);
                        x :=1;
                        remove_flag :=1;
                          WHILE x <= key_column.count AND remove_flag=1 loop
                            --dbms_output.put_line('key_column('||x||')='||key_column(x));
                            IF old_values(i).column_name = key_column(x) THEN
                                remove_flag :=0;
                            END IF;
                            x :=x +1;
                        END loop;
                        IF remove_flag = 1 then
                            remove_column.extend;
                            remove_column(remove_count) := old_values(i).column_name;
                            remove_count :=remove_count+1;
                        END IF;
                      END IF;
                END LOOP;   
                FOR x in 1..remove_column.count loop
                    dbms_output.put_line('remove_column('||x||')='||remove_column(x));
                    rowlcr.DELETE_COLUMN(remove_column(x),'old');
                END loop;
                rowlcr.EXECUTE(true);                                                               
            END IF;
            BEGIN                                                                                 
            --dbms_apply_adm.execute_all_errors(applyname);
            dbms_apply_adm.EXECUTE_ERROR(ltxnid);                                     
            return;                                                                           
            EXCEPTION when OTHERS then                                                       
                SELECT MESSAGE_NUMBER  INTO  i FROM DBA_APPLY_ERROR  WHERE LOCAL_TRANSACTION_ID =  ltxnid;
                v_code := SQLCODE;
                v_errm := SUBSTR(SQLERRM, 1, 1024);
                DBMS_OUTPUT.PUT_LINE('Error message: ' || v_errm);
                IF loopdog > msgcnt then
                    RAISE_APPLICATION_ERROR(-20002,'Insert or Delete error. please check your procedure.');
                ELSIF v_code = -1 then
                    DBMS_OUTPUT.PUT_LINE('Error code(-1): ' || v_code);
                    --null;
                ELSIF v_code = -26786 then
                    DBMS_OUTPUT.PUT_LINE('Error code(-26786): ' || v_code);
                    RAISE_APPLICATION_ERROR(-20786,v_errm);
                ELSIF v_code = -26787 then
                    DBMS_OUTPUT.PUT_LINE('Error code(-26787): ' || v_code);
                    RAISE_APPLICATION_ERROR(-20787,v_errm);
                ELSE
                    RAISE_APPLICATION_ERROR(-20000,v_errm);
                END IF;                                                                             
            END;   
        END IF;
    END LOOP;      --loop

END EXECUTE_TRANSACTION_1;
--需要这两个 权限
--grant select on dba_constraints to DWESBSTREAMUSER;
--granT select on DBA_cons_columns to DWESBSTREAMUSER;

ORA-00001 错误是目标端存在数据,但是存在冲突列, 这里储存储过程的思想是根据唯一列删除目标端的对应行, 并将LCR 是old 值插入到目标端.

-----------------------------------------------------------------------------------------------------------------------------------

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/4cc37d12dfad7cbfa2a2f4125eb8a4f8.html