| Related Data Dictionary Objects |
| cdc_change_sets$ |
dba_queue_publishers |
| change_sets |
wrh$_streams_pool_advice |
|
| |
| Setup As SYS - Prepare Database and Instance |
-- Connect as SYS to validate and adjust the instance settings used by this
-- asynchronous (HotLog) Change Data Capture demo.
conn / as sysdba

-- *NIX only
define _editor=vi

-- validate database parameters
archive log list
-- the database log mode should report Archive Mode

-- min 3
show parameter aq_tm_processes
-- must be 10.1.0 or above
show parameter compatible
-- must be TRUE
show parameter global_names
-- min 2, recommended 4-6
show parameter job_queue_processes
-- not less than the default 4
show parameter open_links
-- must be 0 or at least 200MB
show parameter shared_pool_size
-- min. 480MB (10MB/capture 1MB/apply)
show parameter streams_pool_size
-- min. 3600 (1 hr.); "(900)" is presumably the default value - TODO confirm
show parameter undo_retention

-- Examples of altering initialization parameters
alter system set aq_tm_processes=3 scope=BOTH;
alter system set compatible='10.2.0.1.0' scope=SPFILE;
alter system set global_names=TRUE scope=BOTH;
alter system set job_queue_processes=6 scope=BOTH;
alter system set open_links=4 scope=SPFILE;
-- very slow if making smaller
-- NOTE(review): 200M is below the 480MB minimum quoted above - confirm the intended value
alter system set streams_pool_size=200M scope=BOTH;
alter system set undo_retention=3600 scope=BOTH;

/*
Sizing formulas for related parameters
(NOTE(review): presumably the recommended increases for CDC - confirm):
JOB_QUEUE_PROCESSES  (current value) + 2
PARALLEL_MAX_SERVERS (current value) + (5 * (the number of change sets planned))
PROCESSES            (current value) + (7 * (the number of change sets planned))
SESSIONS             (current value) + (2 * (the number of change sets planned))
*/

-- Retest parameter after modification
-- Restart in MOUNT state so the log mode can be changed.
shutdown immediate;
startup mount;
alter database archivelog;
-- important
alter database force logging;
-- one option among several
alter database add supplemental log data;
alter database open;

-- validate archivelogging
archive log list
alter system switch logfile;
archive log list

-- validate force and supplemental logging
SELECT
    supplemental_log_data_min,
    supplemental_log_data_pk,
    supplemental_log_data_ui,
    supplemental_log_data_fk,
    supplemental_log_data_all,
    force_logging
FROM gv$database;

-- tablespace_name is included so each force_logging value can be attributed
SELECT
    tablespace_name,
    force_logging
FROM dba_tablespaces
ORDER BY tablespace_name;

-- examine existing queues
desc dba_queues
set linesize 121
col owner format a6
col queue_table format a25
col user_comment format a31
SELECT
    owner,
    name,
    queue_table,
    queue_type,
    user_comment
FROM dba_queues
ORDER BY owner, queue_type, name;

-- examine existing streams
desc dba_hist_streams_capture
SELECT
    capture_name,
    total_messages_captured,
    total_messages_enqueued
FROM dba_hist_streams_capture;

desc dba_hist_streams_apply_sum
SELECT
    apply_name,
    reader_total_messages_dequeued,
    reader_lag,
    server_total_messages_applied
FROM dba_hist_streams_apply_sum;

-- examine CDC related data dictionary objects
SELECT table_name
FROM dba_tables
WHERE owner = 'SYS'
AND table_name LIKE 'CDC%$';

desc cdc_system$
-- exploratory look at every column, hence SELECT *
SELECT * FROM cdc_system$;
| |
| Setup As SYS - Create Streams Administrators |
-- Create the CDCADMIN account that publishes the change set and change table
-- in this demo, and grant it Streams administrator privileges.
conn / as sysdba

-- Streams administrators before the new account exists
SELECT *
FROM dba_streams_administrator;

-- Demo-only credentials: the password equals the user name.
CREATE USER cdcadmin
IDENTIFIED BY cdcadmin
DEFAULT TABLESPACE users
TEMPORARY TABLESPACE temp
QUOTA 0 ON system
QUOTA 10M ON sysaux
QUOTA 20M ON users;

-- system privs
GRANT create session TO cdcadmin;
GRANT create table TO cdcadmin;
GRANT create sequence TO cdcadmin;
GRANT create procedure TO cdcadmin;

-- role privs (DBA is a role, so it is listed here rather than with the system privs)
GRANT dba TO cdcadmin;
GRANT execute_catalog_role TO cdcadmin;
GRANT select_catalog_role TO cdcadmin;

-- object privileges
GRANT execute ON dbms_cdc_publish TO cdcadmin;
GRANT execute ON dbms_cdc_subscribe TO cdcadmin;
-- required for this demo but not by CDC
GRANT execute ON dbms_lock TO cdcadmin;

-- streams specific priv
execute dbms_streams_auth.grant_admin_privilege('CDCADMIN');

-- verify the account and its privileges
SELECT account_status, created
FROM dba_users
WHERE username = 'CDCADMIN';

SELECT *
FROM dba_sys_privs
WHERE grantee = 'CDCADMIN';

SELECT u.username
FROM dba_users u
INNER JOIN streams$_privileged_user s
    ON u.user_id = s.user#;

-- Streams administrators after the grant
SELECT *
FROM dba_streams_administrator;
|
|
| Prepare Schema Tables for CDC Replication |
-- Unlock the HR sample schema, then build the source table (CDC_DEMO) and the
-- target table (SALARY_HISTORY) used by the rest of the demo.
conn / as sysdba
alter user hr account unlock identified by hr;

connect hr/hr
desc employees
SELECT *
FROM employees;

-- create CDC demo table
CREATE TABLE cdc_demo AS
SELECT * FROM employees;

-- a second way to implement supplemental logging
ALTER TABLE cdc_demo
ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;

-- table to track salary history changes originating in cdc_demo
CREATE TABLE salary_history (
    employee_id        NUMBER(6)    NOT NULL,
    job_id             VARCHAR2(10) NOT NULL,
    department_id      NUMBER(4),
    old_salary         NUMBER(8,2),
    new_salary         NUMBER(8,2),
    -- widened from NUMBER(4,2), which cannot hold a change of 100% or more
    percent_change     NUMBER(7,2),
    salary_action_date DATE);

SELECT table_name
FROM user_tables;
|
|
| Instantiate Source Table |
-- Prepare HR.CDC_DEMO for instantiation and compare the prepared-table list
-- before and after the call.
conn / as sysdba
desc dba_capture_prepared_tables

-- before
SELECT table_name, scn, supplemental_log_data_pk, supplemental_log_data_ui,
       supplemental_log_data_fk, supplemental_log_data_all
FROM dba_capture_prepared_tables;

-- Syntax reference (not executable):
-- dbms_capture_adm.prepare_table_instantiation(
--   table_name           IN VARCHAR2,
--   supplemental_logging IN VARCHAR2 DEFAULT 'keys');

-- SQL*Plus EXEC must fit on one line
exec dbms_capture_adm.prepare_table_instantiation(table_name => 'HR.CDC_DEMO');

-- after
SELECT table_name, scn, supplemental_log_data_pk, supplemental_log_data_ui,
       supplemental_log_data_fk, supplemental_log_data_all
FROM dba_capture_prepared_tables;
|
|
| Create Asynchronous HotLog Change Set |
-- Create the CDC_DEMO_SET change set on the HOTLOG_SOURCE change source and
-- inspect the objects and dictionary rows that result.
conn cdcadmin/cdcadmin
col object_name format a30

-- objects owned by CDCADMIN before the change set exists
SELECT object_name, object_type
FROM user_objects
ORDER BY object_type, object_name;

-- Syntax reference (not executable):
-- dbms_cdc_publish.create_change_set(
--   change_set_name    IN VARCHAR2,
--   description        IN VARCHAR2 DEFAULT NULL,
--   change_source_name IN VARCHAR2,
--   stop_on_ddl        IN CHAR DEFAULT 'N',
--   begin_date         IN DATE DEFAULT NULL,
--   end_date           IN DATE DEFAULT NULL);

-- this may take a while; don't be impatient
BEGIN
  dbms_cdc_publish.create_change_set(
    change_set_name    => 'CDC_DEMO_SET',
    description        => 'CDC Demo 2 Change Set',
    change_source_name => 'HOTLOG_SOURCE',
    stop_on_ddl        => 'Y',
    begin_date         => NULL,
    end_date           => NULL);
END;
/

-- here is why
SELECT object_name, object_type
FROM user_objects
ORDER BY object_type, object_name;

SELECT table_name, tablespace_name, iot_type
FROM user_tables;

conn / as sysdba
desc cdc_change_sets$
set linesize 121
col set_name format a20
col capture_name format a20
col queue_name format a20
col queue_table_name format a20

SELECT set_name, capture_name, queue_name, queue_table_name
FROM cdc_change_sets$;

SELECT set_name, change_source_name, capture_enabled, stop_on_ddl, publisher
FROM change_sets;

SELECT process_type, name
FROM streams$_process_params;
|
| |
| Create Change Table |
-- Create change table CDCADMIN.CDC_DEMO_CT for HR.CDC_DEMO inside CDC_DEMO_SET,
-- enable its rs_id option, and let HR read it.
conn cdcadmin/cdcadmin

-- Syntax reference (not executable):
-- dbms_cdc_publish.create_change_table(
--   owner             IN VARCHAR2,
--   change_table_name IN VARCHAR2,
--   change_set_name   IN VARCHAR2,
--   source_schema     IN VARCHAR2,
--   source_table      IN VARCHAR2,
--   column_type_list  IN VARCHAR2,
--   capture_values    IN VARCHAR2, -- BOTH, NEW, OLD
--   rs_id             IN CHAR,
--   row_id            IN CHAR,
--   user_id           IN CHAR,
--   timestamp         IN CHAR,
--   object_id         IN CHAR,
--   source_colmap     IN CHAR,
--   target_colmap     IN CHAR,
--   options_string    IN VARCHAR2);

BEGIN
  dbms_cdc_publish.create_change_table(
    'CDCADMIN',      -- owner
    'CDC_DEMO_CT',   -- change_table_name
    'CDC_DEMO_SET',  -- change_set_name
    'HR',            -- source_schema
    'CDC_DEMO',      -- source_table
    -- column_type_list, concatenated so no line break ends up inside the literal
    'EMPLOYEE_ID NUMBER(6), FIRST_NAME VARCHAR2(20), ' ||
    'LAST_NAME VARCHAR2(25), EMAIL VARCHAR2(25), ' ||
    'PHONE_NUMBER VARCHAR2(20), HIRE_DATE DATE, ' ||
    'JOB_ID VARCHAR2(10), SALARY NUMBER, COMMISSION_PCT NUMBER, ' ||
    'MANAGER_ID NUMBER, DEPARTMENT_ID NUMBER',
    'BOTH',          -- capture_values
    'N',             -- rs_id
    'N',             -- row_id
    'N',             -- user_id
    'N',             -- timestamp
    'N',             -- object_id
    'N',             -- source_colmap
    'Y',             -- target_colmap
    NULL);           -- options_string
END;
/

-- Turn rs_id on after creation; update_salary_history later reads rsid$
-- from the subscriber view.
BEGIN
  dbms_cdc_publish.alter_change_table('CDCADMIN', 'CDC_DEMO_CT', rs_id => 'Y');
END;
/

GRANT select ON cdc_demo_ct TO hr;

conn / as sysdba
SELECT set_name, change_source_name, queue_name, queue_table_name
FROM cdc_change_sets$;

desc cdc_change_tables$
SELECT change_set_name, source_schema_name, source_table_name
FROM cdc_change_tables$;
|
|
| Enable Capture |
-- Enable capture on CDC_DEMO_SET and compare capture_enabled before and after.
conn / as sysdba

-- before
SELECT set_name, change_source_name, capture_enabled
FROM cdc_change_sets$;

conn cdcadmin/cdcadmin

-- Syntax reference (not executable):
-- dbms_cdc_publish.alter_change_set(
--   change_set_name     IN VARCHAR2,
--   description         IN VARCHAR2 DEFAULT NULL,
--   remove_description  IN CHAR DEFAULT 'N',
--   enable_capture      IN CHAR DEFAULT NULL,
--   recover_after_error IN CHAR DEFAULT NULL,
--   remove_ddl          IN CHAR DEFAULT NULL,
--   stop_on_ddl         IN CHAR DEFAULT NULL);

BEGIN
  dbms_cdc_publish.alter_change_set(
    change_set_name => 'CDC_DEMO_SET',
    enable_capture  => 'Y');
END;
/

conn / as sysdba

-- after
SELECT set_name, change_source_name, capture_enabled
FROM cdc_change_sets$;
|
| |
| Create Subscription |
-- As the subscriber (HR), create subscription CDC_DEMO_SUB on CDC_DEMO_SET,
-- then check it in the dictionary as SYS.
conn hr/hr

-- Syntax reference (not executable):
-- dbms_cdc_subscribe.create_subscription(
--   change_set_name   IN VARCHAR2,
--   description       IN VARCHAR2,
--   subscription_name IN VARCHAR2);

-- The original EXEC was split in the middle of the description literal;
-- the line break is replaced by a single space.
BEGIN
  dbms_cdc_subscribe.create_subscription(
    change_set_name   => 'CDC_DEMO_SET',
    description       => 'cdc_demo subx',
    subscription_name => 'CDC_DEMO_SUB');
END;
/

conn / as sysdba
set linesize 121
col description format a30
col subscription_name format a20
col username format a10

SELECT subscription_name, handle, set_name, username, earliest_scn, description
FROM cdc_subscribers$;
|
|
| Subscribe to and Activate Subscription |
-- Subscribe CDC_DEMO_SUB to the HR.CDC_DEMO columns through subscriber view
-- CDC_DEMO_SUB_VIEW, then activate the subscription.
conn hr/hr

-- Syntax reference (not executable):
-- dbms_cdc_subscribe.subscribe(
--   subscription_name IN VARCHAR2,
--   source_schema     IN VARCHAR2,
--   source_table      IN VARCHAR2,
--   column_list       IN VARCHAR2,
--   subscriber_view   IN VARCHAR2);

BEGIN
  dbms_cdc_subscribe.subscribe(
    'CDC_DEMO_SUB',       -- subscription_name
    'HR',                 -- source_schema
    'CDC_DEMO',           -- source_table
    -- column_list, concatenated so no line break ends up inside the literal
    'EMPLOYEE_ID, FIRST_NAME, LAST_NAME, EMAIL, PHONE_NUMBER, HIRE_DATE, ' ||
    'JOB_ID, SALARY, COMMISSION_PCT, MANAGER_ID, DEPARTMENT_ID',
    'CDC_DEMO_SUB_VIEW'); -- subscriber_view
END;
/

desc user_subscriptions

-- status before activation
SELECT set_name, subscription_name, status
FROM user_subscriptions;

-- Syntax reference (not executable):
-- dbms_cdc_subscribe.activate_subscription(
--   subscription_name IN VARCHAR2);

exec dbms_cdc_subscribe.activate_subscription('CDC_DEMO_SUB');

-- status after activation
SELECT set_name, subscription_name, status
FROM user_subscriptions;
|
|
| Create Procedure To Populate Salary History Table |
conn hr/hr

/* Create a stored procedure to populate the HR.SALARY_HISTORY table. The procedure
   extends the subscription window of the CDC_DEMO_SUB subscription to get the most
   recent set of source table changes. It uses the subscriber view CDC_DEMO_SUB_VIEW
   to scan the changes and insert them into the SALARY_HISTORY table. It then purges
   the subscription window to indicate that it is finished with that set of changes.

   Rows written per change type:
     insert ('I ')     old_salary = 0, new_salary = inserted salary, percent_change NULL
     delete ('D ')     old_salary = deleted salary, new_salary = 0, percent_change NULL
     update (UO + UN)  old and new salary; percent_change computed. Updates that leave
                       the salary unchanged are skipped.

   The procedure does not COMMIT; the caller controls the transaction. */
CREATE OR REPLACE PROCEDURE update_salary_history IS
  CURSOR cur IS
    SELECT opt, cscn$, rsid$, employee_id, job_id, department_id,
           old_salary, new_salary, commit_timestamp$
    FROM (
      SELECT 'I' opt, cscn$, rsid$, employee_id, job_id, department_id,
             0 old_salary, salary new_salary, commit_timestamp$
      FROM cdc_demo_sub_view
      WHERE operation$ = 'I '
      UNION ALL
      SELECT 'D' opt, cscn$, rsid$, employee_id, job_id, department_id,
             salary old_salary, 0 new_salary, commit_timestamp$
      FROM cdc_demo_sub_view
      WHERE operation$ = 'D '
      UNION ALL
      -- Pair each update's old-value row (UO) with its new-value row (UN)
      -- on the same cscn$ and rsid$.
      SELECT 'U' opt, v1.cscn$, v1.rsid$, v1.employee_id, v1.job_id, v1.department_id,
             v1.salary old_salary, v2.salary new_salary, v1.commit_timestamp$
      FROM cdc_demo_sub_view v1
      INNER JOIN cdc_demo_sub_view v2
         ON v1.cscn$ = v2.cscn$
        AND v1.rsid$ = v2.rsid$
      WHERE v1.operation$ = 'UO'
        AND v2.operation$ = 'UN'
        AND ABS(v1.salary - v2.salary) > 0)
    ORDER BY cscn$, rsid$;

  l_pct_change NUMBER;
BEGIN
  -- Get the next set of changes to the HR.CDC_DEMO source table
  dbms_cdc_subscribe.extend_window('CDC_DEMO_SUB');

  -- Process each change
  FOR rec IN cur
  LOOP
    IF rec.opt = 'U' THEN
      -- NULLIF avoids ORA-01476 when the old salary is 0; the result is then NULL
      l_pct_change := (rec.new_salary - rec.old_salary)
                      / NULLIF(rec.old_salary, 0) * 100;
    ELSE
      -- inserts and deletes carry no percentage
      l_pct_change := NULL;
    END IF;

    -- The cursor already supplies 0 for the missing side of inserts and deletes.
    INSERT INTO salary_history
      (employee_id, job_id, department_id,
       old_salary, new_salary, percent_change, salary_action_date)
    VALUES
      (rec.employee_id, rec.job_id, rec.department_id,
       rec.old_salary, rec.new_salary, l_pct_change, rec.commit_timestamp$);
  END LOOP;

  -- Indicate subscriber is finished with this set of changes
  dbms_cdc_subscribe.purge_window('CDC_DEMO_SUB');
END update_salary_history;
/
|
|
| Create Function To Wait For Changes |
/* Create function CDCADMIN.WAIT_FOR_CHANGES to enable this demo to run predictably.
   The asynchronous nature of CDC HotLog mode means that there is a delay for source
   table changes to appear in the CDC change table and the subscriber view.

   The function polls change table CDC_DEMO_CT (the name is fixed in the code) every
   3 seconds until it holds at least ROWCOUNT rows or MAXWAIT_SECONDS have elapsed.
   It then always sleeps 1 additional minute for the subscriber view.

   Parameters:
     rowcount         number of rows expected in the change table
     maxwait_seconds  optional maximum wait; defaults to 300 seconds (5 minutes).
                      The calls in this demo pass 180 (3 minutes). NULL is treated
                      as the default.
   Returns an informational message saying whether the row count was reached or the
   wait timed out. */
conn cdcadmin/cdcadmin

CREATE OR REPLACE FUNCTION wait_for_changes (
  rowcount        NUMBER,          -- number of rows to wait for
  maxwait_seconds NUMBER := 300)   -- maximum time to wait, in seconds
RETURN VARCHAR2 AUTHID CURRENT_USER AS
  sleep_time CONSTANT NUMBER := 3;                          -- seconds between polls
  max_wait   CONSTANT NUMBER := NVL(maxwait_seconds, 300);  -- NULL would never time out
  numrows    NUMBER := 0;                                   -- rows currently in change table
  slept      NUMBER := 0;                                   -- total time slept
  return_msg VARCHAR2(100);                                 -- informational message
BEGIN
  LOOP
    SELECT COUNT(*)
    INTO numrows
    FROM cdc_demo_ct;

    IF numrows >= rowcount THEN
      -- Got expected number of rows
      return_msg := 'Change table contains at least ' ||
                    TO_CHAR(rowcount) || ' rows';
      EXIT;
    ELSIF slept >= max_wait THEN
      -- Reached maximum number of seconds to wait (>= so the limit is not overshot)
      return_msg := ' - Timed out while waiting for the change table to reach ' ||
                    TO_CHAR(rowcount) || ' rows';
      EXIT;
    END IF;

    dbms_lock.sleep(sleep_time);
    slept := slept + sleep_time;
  END LOOP;

  -- additional wait time for changes to become available to subscriber view
  dbms_lock.sleep(60);
  RETURN return_msg;
END wait_for_changes;
/
|
|
| Preparation for DML |
-- In a separate terminal window (operating-system shell, not SQL*Plus):
--   cd $ORACLE_BASE/admin/ORCL/bdump
--   tail -f alert_orcl.log
-- tailing the alert log allows us to watch log miner at work
-- NOTE(review): the source listed the file as "alertorcl.log"; alert logs are
-- normally named alert_<SID>.log - confirm the name on your system.

-- open a SQL*Plus session as SYS
desc gv$streams_capture
set linesize 121
col state format a20
SELECT capture_name, logminer_id, state, total_messages_captured
FROM gv$streams_capture;

-- open a SQL*Plus session as SYS
desc gv$streams_apply_reader
set linesize 121
col state format a20
SELECT apply_name, state, total_messages_dequeued
FROM gv$streams_apply_reader;
|
|
| DML On Source Table |
-- First batch of source-table changes: three salary updates in one transaction,
-- then four inserts in a second transaction.
conn hr/hr

UPDATE cdc_demo SET salary = salary + 500 WHERE job_id = 'SH_CLERK';
UPDATE cdc_demo SET salary = salary + 1000 WHERE job_id = 'ST_CLERK';
UPDATE cdc_demo SET salary = salary + 1500 WHERE job_id = 'PU_CLERK';
COMMIT;

-- Hire dates use ANSI DATE literals so they do not depend on the session's
-- NLS_DATE_FORMAT / NLS_DATE_LANGUAGE, as TO_DATE without a format mask does.
INSERT INTO cdc_demo
  (employee_id, first_name, last_name, email, phone_number, hire_date,
   job_id, salary, commission_pct, manager_id, department_id)
VALUES
  (207, 'Mary', 'Lee', 'MLEE', '310.234.4590', DATE '2003-01-10',
   'SH_CLERK', 4000, NULL, 121, 50);

INSERT INTO cdc_demo
  (employee_id, first_name, last_name, email, phone_number, hire_date,
   job_id, salary, commission_pct, manager_id, department_id)
VALUES
  (208, 'Karen', 'Prince', 'KPRINCE', '345.444.6756', DATE '2003-11-10',
   'SH_CLERK', 3000, NULL, 111, 50);

INSERT INTO cdc_demo
  (employee_id, first_name, last_name, email, phone_number, hire_date,
   job_id, salary, commission_pct, manager_id, department_id)
VALUES
  (209, 'Frank', 'Gate', 'FGATE', '451.445.5678', DATE '2003-11-13',
   'IT_PROG', 8000, NULL, 101, 50);

INSERT INTO cdc_demo
  (employee_id, first_name, last_name, email, phone_number, hire_date,
   job_id, salary, commission_pct, manager_id, department_id)
VALUES
  (210, 'Paul', 'Jeep', 'PJEEP', '607.345.1112', DATE '2003-05-28',
   'IT_PROG', 8000, NULL, 101, 50);
COMMIT;
|
|
| Validate Capture |
-- The change table CDCADMIN.CDC_DEMO_CT should end up holding 94 rows.
-- The first capture can take a few minutes; subsequent captures should be
-- considerably quicker.
conn cdcadmin/cdcadmin

-- Wait for 94 rows, polling for at most 180 seconds.
SELECT wait_for_changes(94, 180) AS message
FROM dual;
|
|
| Another Test |
conn hr/hr

/* Once wait_for_changes has reported that the changes have been populated,
   apply them to the salary_history table. */
exec update_salary_history;

SELECT employee_id, job_id, department_id, old_salary, new_salary, percent_change
FROM salary_history
ORDER BY employee_id, old_salary, new_salary;

-- Second batch: remove the four rows inserted earlier, then adjust three job groups.
DELETE FROM cdc_demo WHERE first_name = 'Mary' AND last_name = 'Lee';
DELETE FROM cdc_demo WHERE first_name = 'Karen' AND last_name = 'Prince';
DELETE FROM cdc_demo WHERE first_name = 'Frank' AND last_name = 'Gate';
DELETE FROM cdc_demo WHERE first_name = 'Paul' AND last_name = 'Jeep';
COMMIT;

UPDATE cdc_demo SET salary = salary + 5000 WHERE job_id = 'AD_VP';
UPDATE cdc_demo SET salary = salary - 1000 WHERE job_id = 'ST_MAN';
UPDATE cdc_demo SET salary = salary - 500 WHERE job_id = 'FI_ACCOUNT';
COMMIT;

-- Expecting 122 rows to appear in the change table CDCADMIN.CDC_DEMO_CT.
-- (94 rows from the first set of DMLs and 28 from the second set)
conn cdcadmin/cdcadmin
SELECT wait_for_changes(122, 180) AS message
FROM dual;

conn hr/hr
exec update_salary_history

SELECT employee_id, job_id, department_id, old_salary, new_salary, percent_change
FROM salary_history
ORDER BY employee_id, old_salary, new_salary;
|
|
| Capture Cleanup |
-- Tear the demo down: subscription first, then the publisher objects, then the
-- demo schema objects, and finally the CDCADMIN account.
conn hr/hr
exec dbms_cdc_subscribe.drop_subscription('CDC_DEMO_SUB');

conn / as sysdba
-- reverse prepare table instantiation
exec dbms_capture_adm.abort_table_instantiation('HR.CDC_DEMO');

-- drop the change table (SQL*Plus EXEC must fit on one line)
exec dbms_cdc_publish.drop_change_table('CDCADMIN', 'CDC_DEMO_CT', 'Y');

-- drop the change set
exec dbms_cdc_publish.drop_change_set('CDC_DEMO_SET');

conn cdcadmin/cdcadmin
drop function wait_for_changes;

-- how many objects CDCADMIN still owns
SELECT COUNT(*)
FROM user_objects;

conn hr/hr
drop table salary_history purge;
drop table cdc_demo purge;
drop procedure update_salary_history;

conn / as sysdba
-- CASCADE so the drop still succeeds if the count above was not zero
drop user cdcadmin cascade;
|
|