Oracle 流stream将删除的数据保存
--实验的目的是捕获hr.employees表的删除行,将删除行插入到emp_del表中。
--设置初始化参数
AQ_TM_PROCESSES=1
COMPATIBLE=9.2.0
LOG_PARALLELISM=1
--查看数据库的名称,我的为ora9,将以下的ora9全部替换为你的数据库名称
--数据库为归档模式
--建立表emp_del,用于存放EMPLOYEES的删除数据
conn hr/hr
CREATE TABLE emp_del( employee_id NUMBER(6), first_name VARCHAR2(20), last_name VARCHAR2(25), email VARCHAR2(25), phone_number VARCHAR2(20), hire_date DATE, job_id VARCHAR2(10), salary NUMBER(8,2), commission_pct NUMBER(2,2), manager_id NUMBER(6), department_id NUMBER(4),timestamp DATE);CREATE UNIQUE INDEX emp_del_id_pk ON emp_del (employee_id);ALTER TABLE emp_del ADD (CONSTRAINT emp_del_id_pk PRIMARY KEY (employee_id));
--建立管理用户,设定默认表空间,授权
conn / as sysdba
drop user strmadmin cascade;
GRANT CONNECT, RESOURCE, SELECT_CATALOG_ROLE TO strmadmin IDENTIFIED BY strmadmin;
ALTER USER strmadmin DEFAULT TABLESPACE users;GRANT ALL ON hr.emp_del TO strmadmin;GRANT EXECUTE ON DBMS_APPLY_ADM TO strmadmin;
GRANT EXECUTE ON DBMS_AQ TO strmadmin;
GRANT EXECUTE ON DBMS_AQADM TO strmadmin;
GRANT EXECUTE ON DBMS_CAPTURE_ADM TO strmadmin;
GRANT EXECUTE ON DBMS_FLASHBACK TO strmadmin;
GRANT EXECUTE ON DBMS_STREAMS_ADM TO strmadmin;BEGIN DBMS_RULE_ADM.GRANT_SYSTEM_PRIVILEGE(privilege => DBMS_RULE_ADM.CREATE_RULE_SET_OBJ, grantee => 'strmadmin', grant_option => FALSE);
END;
/BEGIN DBMS_RULE_ADM.GRANT_SYSTEM_PRIVILEGE(privilege => DBMS_RULE_ADM.CREATE_RULE_OBJ, grantee => 'strmadmin', grant_option => FALSE);
END;
/
--建立流队列,名称叫streams_queue ,用于存储捕获的变化
CONNECT strmadmin/strmadmin
EXEC DBMS_STREAMS_ADM.SET_UP_QUEUE();
--配置logmnr使用的表空间,我们就用tools
conn / as sysdba
EXECUTE DBMS_LOGMNR_D.SET_TABLESPACE('TOOLS');
--增强日志的模式
ALTER TABLE hr.employees ADD SUPPLEMENTAL LOG GROUP log_group_employees_pk(employee_id) ALWAYS;
--配置捕获程序
CONNECT strmadmin/strmadmin
BEGINDBMS_STREAMS_ADM.ADD_TABLE_RULES(table_name => 'hr.employees', streams_type => 'capture',streams_name => 'capture_emp',queue_name => 'strmadmin.streams_queue',include_dml => true,include_ddl => false);
END;
/
--设置scn
DECLAREiscn NUMBER; -- Variable to hold instantiation SCN value
BEGINiscn := DBMS_FLASHBACK.GET_SYSTEM_CHANGE_NUMBER();DBMS_APPLY_ADM.SET_TABLE_INSTANTIATION_SCN(source_object_name => 'hr.employees',source_database_name => 'ora9',instantiation_scn => iscn);
END;
/
--配置叫emp_agent的代理程序
BEGINDBMS_AQADM.DROP_AQ_AGENT(agent_name => 'emp_agent');
END;
/BEGINDBMS_AQADM.CREATE_AQ_AGENT(agent_name => 'emp_agent');DBMS_AQADM.ENABLE_DB_ACCESS(agent_name => 'emp_agent',db_username => 'strmadmin');
END;
/
--建立队列订户
DECLAREsubscriber SYS.AQ$_AGENT;
BEGINsubscriber := SYS.AQ$_AGENT('emp_agent', NULL, NULL); SYS.DBMS_AQADM.ADD_SUBSCRIBER(queue_name => 'strmadmin.streams_queue',subscriber => subscriber,rule => NULL,transformation => NULL);
END;
/
--建立存储过程enq_row_lcr
CREATE OR REPLACE PROCEDURE enq_row_lcr(in_any IN SYS.ANYDATA) ISenqopt DBMS_AQ.ENQUEUE_OPTIONS_T;mprop DBMS_AQ.MESSAGE_PROPERTIES_T;recipients DBMS_AQ.AQ$_RECIPIENT_LIST_T;enq_eventid RAW(16);
BEGINmprop.SENDER_ID := SYS.AQ$_AGENT(name => 'emp_agent',address => NULL,protocol => NULL);recipients(1) := SYS.AQ$_AGENT(name => 'emp_agent',address => NULL,protocol => NULL);mprop.RECIPIENT_LIST := recipients;DBMS_AQ.ENQUEUE(queue_name => 'strmadmin.streams_queue',enqueue_options => enqopt,message_properties => mprop,payload => in_any,msgid => enq_eventid);
END;
/
--建立DML处理存储过程
CREATE OR REPLACE PROCEDURE emp_dml_handler(in_any IN SYS.ANYDATA) ISlcr SYS.LCR$_ROW_RECORD;rc PLS_INTEGER;command VARCHAR2(10);old_values SYS.LCR$_ROW_LIST;
BEGIN-- Re-enqueue the row LCR for explicit dequeue by another applicationenq_row_lcr(in_any);-- Access the LCRrc := in_any.GETOBJECT(lcr);-- Get the object command typecommand := lcr.GET_COMMAND_TYPE();-- Check for DELETE command on the hr.employees tableIF command = 'DELETE' THEN-- Set the command_type in the row LCR to INSERTlcr.SET_COMMAND_TYPE('INSERT');-- Set the object_name in the row LCR to EMP_DELlcr.SET_OBJECT_NAME('EMP_DEL');-- Get the old values in the row LCRold_values := lcr.GET_VALUES('old');-- Set the old values in the row LCR to the new values in the row LCRlcr.SET_VALUES('new', old_values);-- Set the old values in the row LCR to NULLlcr.SET_VALUES('old', NULL);-- Add a SYSDATE value for the timestamp columnlcr.ADD_COLUMN('new', 'TIMESTAMP', SYS.AnyData.ConvertDate(SYSDATE));-- Apply the row LCR as an INSERT into the EMP_DEL tablelcr.EXECUTE(true);END IF;
END;
/
--配置DML管理者,为hr.employees
BEGINDBMS_APPLY_ADM.SET_DML_HANDLER(object_name => 'hr.employees',object_type => 'TABLE',operation_name => 'INSERT',error_handler => false,user_procedure => 'strmadmin.emp_dml_handler',apply_database_link => NULL);
END;
/BEGINDBMS_APPLY_ADM.SET_DML_HANDLER(object_name => 'hr.employees',object_type => 'TABLE',operation_name => 'UPDATE',error_handler => false,user_procedure => 'strmadmin.emp_dml_handler',apply_database_link => NULL);
END;
/BEGINDBMS_APPLY_ADM.SET_DML_HANDLER(object_name => 'hr.employees',object_type => 'TABLE',operation_name => 'DELETE',error_handler => false,user_procedure => 'strmadmin.emp_dml_handler',apply_database_link => NULL);
END;
/
--建立存储过程为出列和再入列事件
CREATE OR REPLACE PROCEDURE emp_dq (consumer IN VARCHAR2) ASdeqopt DBMS_AQ.DEQUEUE_OPTIONS_T;mprop DBMS_AQ.MESSAGE_PROPERTIES_T;msgid RAW(16);payload SYS.AnyData;new_messages BOOLEAN := TRUE;row_lcr SYS.LCR$_ROW_RECORD;tc pls_integer;next_trans EXCEPTION;no_messages EXCEPTION; pragma exception_init (next_trans, -25235);pragma exception_init (no_messages, -25228);
BEGINdeqopt.consumer_name := consumer;deqopt.wait := 1;WHILE (new_messages) LOOPBEGINDBMS_AQ.DEQUEUE(queue_name => 'strmadmin.streams_queue',dequeue_options => deqopt,message_properties => mprop,payload => payload,msgid => msgid);COMMIT;deqopt.navigation := DBMS_AQ.NEXT;IF (payload.GetTypeName = 'SYS.LCR$_ROW_RECORD') THENtc := payload.GetObject(row_lcr); DBMS_OUTPUT.PUT_LINE(row_lcr.GET_COMMAND_TYPE || ' row LCR dequeued');END IF; EXCEPTIONWHEN next_trans THENdeqopt.navigation := DBMS_AQ.NEXT_TRANSACTION;WHEN no_messages THENnew_messages := FALSE;DBMS_OUTPUT.PUT_LINE('No more events');END;END LOOP;
END;
/
--配置应用程序
BEGINDBMS_STREAMS_ADM.ADD_TABLE_RULES(table_name => 'hr.employees',streams_type => 'apply', streams_name => 'apply_emp',queue_name => 'strmadmin.streams_queue',include_dml => true,include_ddl => false,source_database => 'ora9');
END;
/
--启动应用程序
BEGINDBMS_APPLY_ADM.SET_PARAMETER(apply_name => 'apply_emp', parameter => 'disable_on_error', value => 'n');
END;
/BEGINDBMS_APPLY_ADM.START_APPLY(apply_name => 'apply_emp');
END;
/
--启动捕获程序
BEGINDBMS_CAPTURE_ADM.START_CAPTURE(capture_name => 'capture_emp');
END;
/--对hr.employees进行插入,删除和修改
conn hr/hr
INSERT INTO hr.employees values(207, 'JOHN', 'SMITH', 'JSMITH@MYCOMPANY.COM', NULL, '07-JUN-94', 'AC_ACCOUNT', 777, NULL, NULL, 110);
COMMIT;UPDATE hr.employees SET salary=5999 WHERE employee_id=206;
COMMIT;DELETE FROM hr.employees WHERE employee_id=207;
COMMIT;CONNECT strmadmin/strmadmin
SELECT * FROM hr.emp_del;SELECT MSG_ID, MSG_STATE, CONSUMER_NAME FROM AQ$STREAMS_QUEUE_TABLE;EXEC emp_dq('emp_agent');SELECT MSG_ID, MSG_STATE, CONSUMER_NAME FROM AQ$STREAMS_QUEUE_TABLE;
--显示应用程序的错误
COLUMN APPLY_NAME HEADING 'Apply|Process|Name' FORMAT A8
COLUMN SOURCE_DATABASE HEADING 'Source|Database' FORMAT A8
COLUMN LOCAL_TRANSACTION_ID HEADING 'Local|Transaction|ID' FORMAT A11
COLUMN ERROR_MESSAGE HEADING 'Error Message' FORMAT A50SELECT APPLY_NAME, SOURCE_DATABASE, LOCAL_TRANSACTION_ID, ERROR_MESSAGEFROM DBA_APPLY_ERROR;