|
|
|
Search the Reference Library pages: |
|
|
|
Oracle AQ Demo 1
|
Version 11.1 |
|
Setup As SYS |
conn / as sysdba
-- validate Oracle parameters
show parameter aq_tm_processes
show parameter job_queue_processes
ALTER SYSTEM SET job_queue_processes = 10 scope=BOTH;
SELECT owner, queue_name, queue_table, consumer_name
FROM dba_queue_subscribers;
SELECT *
FROM queue_privileges;
-- create AQ administrator
CREATE USER aqadmin
IDENTIFIED BY aqadmin
DEFAULT TABLESPACE uwdata
TEMPORARY TABLESPACE temp
QUOTA 0 ON SYSTEM
QUOTA 20M ON uwdata;
GRANT create session TO aqadmin;
GRANT create procedure TO aqadmin;
GRANT create table TO aqadmin;
GRANT create type TO aqadmin;
GRANT create public synonym TO aqadmin;
GRANT aq_administrator_role TO aqadmin IDENTIFIED BY aqadmin;
GRANT execute ON dbms_aq TO aqadmin;
GRANT execute ON dbms_aqadm TO aqadmin;
GRANT execute ON dbms_lock TO aqadmin; -- required for demo but not for AQ
GRANT execute on dbms_crypto TO aqadmin; -- required for demo but not for AQ
SELECT username, account_status, created
FROM dba_users
ORDER BY 1;
SELECT *
FROM dba_sys_privs
WHERE grantee = 'AQADMIN';
set linesize 131
col privilege format a15
col owner format a15
SELECT role, owner, table_name, privilege
FROM role_tab_privs
WHERE role = 'AQ_ADMINISTRATOR_ROLE'
ORDER BY 4, 2, 3;
-- create AQ users
CREATE USER pharm_a1
IDENTIFIED BY pharm_a1
DEFAULT TABLESPACE uwdata
TEMPORARY TABLESPACE temp
QUOTA 0 ON SYSTEM
QUOTA 0 ON SYSAUX
QUOTA 5M ON uwdata;
GRANT create session TO pharm_a1;
GRANT create synonym TO pharm_a1;
GRANT execute ON dbms_aq TO pharm_a1;
GRANT execute ON dbms_lock TO pharm_a1; -- required for demo but not for AQ
CREATE USER icu_a1
IDENTIFIED BY icu_a1
DEFAULT TABLESPACE uwdata
TEMPORARY TABLESPACE temp
QUOTA 0 ON SYSTEM
QUOTA 0 ON SYSAUX
QUOTA 5M ON uwdata;
GRANT create session TO icu_a1;
GRANT create synonym TO icu_a1;
GRANT execute ON dbms_aq TO icu_a1;
GRANT execute on dbms_crypto TO icu_a1; -- required for demo but not for AQ
GRANT execute ON dbms_lock TO icu_a1; -- required for demo but not for AQ
CREATE USER nurse_a1
IDENTIFIED BY nurse_a1
DEFAULT TABLESPACE uwdata
TEMPORARY TABLESPACE temp
QUOTA 0 ON SYSTEM
QUOTA 0 ON SYSAUX
QUOTA 5M ON uwdata;
GRANT create session TO nurse_a1;
GRANT execute ON dbms_aq TO nurse_a1;
GRANT execute on dbms_crypto TO nurse_a1; -- required for demo but not for AQ
GRANT execute ON dbms_lock TO nurse_a1; -- required for demo but not for AQ
|
|
Setup As AQADMIN |
conn aqadmin/aqadmin
SELECT object_type, COUNT(*)
FROM user_objects
GROUP BY object_type;
-- create message user-defined data type
CREATE OR REPLACE TYPE message_t AS OBJECT (
id NUMBER,
rx VARCHAR2(30),
source VARCHAR2(30));
/
GRANT execute ON message_t TO pharm_a1;
SELECT object_type, COUNT(*)
FROM user_objects
GROUP BY object_type;
desc message_t
GRANT execute ON message_t TO public;
-- examine message for message content demo
SELECT text
FROM user_source
WHERE name = 'MESSAGE_T'
ORDER BY line;
CREATE TABLE test (
reg_col VARCHAR2(11),
see_msg message_t);
desc test
set describe depth all linenum on indent on
desc test
INSERT INTO test
(reg_col, see_msg)
VALUES
('Test Values', message_t(1, 'Thorazine', USER));
SELECT * FROM test;
DROP TABLE test PURGE; |
|
Build As AQADMIN |
-- table to hold dequeued messages
CREATE TABLE rx_processed_data (
id NUMBER,
rx VARCHAR2(30),
source NUMBER,
processed_by VARCHAR2(30),
dt_processed TIMESTAMP);
SELECT object_type, COUNT(*)
FROM user_objects
GROUP BY object_type;
-- create a queue table
/* -- CREATE QUEUE TABLE syntax
dbms_aqadm.create_queue_table(
queue_table IN VARCHAR2,
-- table's name
queue_payload_type IN VARCHAR2, -- user defined data
type's name
storage_clause IN VARCHAR2
DEFAULT NULL, -- define pctfree
sort_list IN VARCHAR2
DEFAULT NULL, -- priority and/or enq_time
multiple_consumers IN BOOLEAN DEFAULT FALSE,
message_grouping IN BINARY_INTEGER DEFAULT NONE,
comment IN VARCHAR2
DEFAULT NULL, -- definer's comments
auto_commit IN BOOLEAN
DEFAULT TRUE, -- user commit not required
primary_instance IN BINARY_INTEGER DEFAULT 0, -- manage
queue in primary
secondary_instance IN BINARY_INTEGER DEFAULT 0, -- RAC
failover if possible
compatible IN VARCHAR2
DEFAULT NULL, -- lowest compatible version
non_repudiation IN BINARY_INTEGER DEFAULT 0,
secure IN BOOLEAN
DEFAULT FALSE);
*/ |
exec dbms_aqadm.create_queue_table(
queue_table =>
'rx_queue_table',
queue_payload_type => 'message_t',
storage_clause => 'PCTFREE 0 PCTUSED 99',
sort_list => 'ENQ_TIME',
multiple_consumers => TRUE,
comment =>
'Pharmacy queue table',
compatible => '10.0',
secure => FALSE);
SELECT object_type, COUNT(*)
FROM user_objects
GROUP BY object_type;
col object_name format a30
SELECT o.object_name, o.object_type, i.table_name
FROM user_objects o,user_indexes i
WHERE o.object_name = i.table_name (+)
ORDER BY 2, 1;
set linesize 121
col evaluation_function format a30
col evaluation_context_comment format a30
col table_name format a30
col user_comment format a30
-- examine evaluation context
SELECT * FROM user_evaluation_contexts;
-- examine evaluation context table
SELECT * FROM user_evaluation_context_tables;
-- examine queues
SELECT name, queue_table, user_comment
FROM user_queues;
-- examine sequence
SELECT sequence_name, min_value, max_value,
increment_by, cycle_flag, order_flag, cache_size
FROM user_sequences;
set describe depth 1
-- examine tables
desc RX_QUEUE_TABLE
set describe depth all
desc RX_QUEUE_TABLE
desc AQ$_RX_QUEUE_TABLE_G
desc AQ$_RX_QUEUE_TABLE_H
desc AQ$_RX_QUEUE_TABLE_I
desc AQ$_RX_QUEUE_TABLE_S
desc AQ$_RX_QUEUE_TABLE_T
desc RX_PROCESSED_DATA
SELECT table_name, iot_name
FROM user_tables;
-- examine views
desc AQ$RX_QUEUE_TABLE
desc AQ$RX_QUEUE_TABLE_S
-- create the rx_queue using the rx_queue_table
/* -- CREATE QUEUE syntax
dbms_aqadm.create_queue (
queue_name IN VARCHAR2,
-- queue's name
queue_table IN VARCHAR2,
-- previously defined queue table
queue_type IN
BINARY_INTEGER DEFAULT NORMAL_QUEUE, -- Normal or Exception
max_retries IN NUMBER
DEFAULT NULL, -- default is 2**31-1
retry_delay IN NUMBER
DEFAULT 0, -- in seconds
retention_time IN NUMBER
DEFAULT 0,
dependency_tracking IN BOOLEAN
DEFAULT FALSE, -- must be FALSE: the default
comment
IN VARCHAR2 DEFAULT NULL, --
definer's comment
auto_commit IN BOOLEAN
DEFAULT TRUE);
*/ |
exec dbms_aqadm.create_queue(
queue_name =>
'rx_queue',
queue_table =>
'rx_queue_table',
queue_type => dbms_aqadm.NORMAL_QUEUE,
max_retries => 3,
retry_delay => 0,
comment =>
'Prescription Queue');
SELECT object_type, COUNT(*)
FROM user_objects
GROUP BY object_type
ORDER BY 1;
-- note QUEUE and RULE SET creation plus new views
SELECT name, queue_table, user_comment
FROM user_queues;
SELECT rule_set_name, rule_set_eval_context_owner, rule_set_eval_context_name
FROM user_rule_sets;
set long 10000
set pagesize 0
SELECT view_name, text
FROM user_views;
SELECT name, enqueue_enabled, dequeue_enabled
FROM user_queues;
set pagesize 20
-- Start rx queue rx_queue
/* -- CREATE START_QUEUE syntax
dbms_aqadm.start_queue (
queue_name IN VARCHAR2,
enqueue IN BOOLEAN DEFAULT TRUE,
dequeue IN BOOLEAN DEFAULT TRUE);
*/ |
SELECT name, enqueue_enabled, dequeue_enabled
FROM user_queues;
exec dbms_aqadm.start_queue(queue_name => 'RX_QUEUE');
SELECT name, enqueue_enabled, dequeue_enabled
FROM user_queues;
-- create queue subscribers
/* -- GRANT QUEUE PRIVILEGE syntax
dbms_aqadm.grant_queue_privilege (
privilege IN VARCHAR2,
queue_name IN VARCHAR2,
grantee IN VARCHAR2,
grant_option IN BOOLEAN DEFAULT FALSE);
*/ |
BEGIN
dbms_aqadm.grant_queue_privilege('DEQUEUE', 'RX_QUEUE', 'pharm_a1', FALSE);
dbms_aqadm.grant_queue_privilege('ENQUEUE', 'RX_QUEUE', 'pharm_a1', FALSE);
dbms_aqadm.grant_queue_privilege('ENQUEUE', 'RX_QUEUE', 'icu_a1', FALSE);
dbms_aqadm.grant_queue_privilege('ENQUEUE', 'RX_QUEUE', 'nurse_a1', FALSE);
END;
/
-- where is this privilege information stored?
-- create queue subscribers
/* -- AQ$_AGENT TYPE definition
TYPE aq$_agent AS OBJECT
(
name VARCHAR2(30), --
name of message producer or consumer
address VARCHAR2(1024), -- Protocol-specific address of the recipient.
-- Must be in the form [schema.]queue[@dblink].
protocol NUMBER DEFAULT 0); -- must be 0, other values for internal use only
*/
/* -- ADD SUBSCRIBER syntax
dbms_aqadm.add_subscriber(
queue_name IN VARCHAR2,
-- name of queue
subscriber IN sys.aq$_agent,
-- name, address and, protocol
rule IN VARCHAR2 DEFAULT NULL,
-- conditional / similar to WHERE clause
transformation IN VARCHAR2 DEFAULT NULL -- message transformation rule
queue_to_queue IN BOOLEAN DEFAULT FALSE, -- TRUE indicates queue-to-queue messaging
delivery_mode IN PLS_INTEGER DEFAULT dbms_aqadm.persistent);
-- BUFFERED,
-- PERSISTENT_OR_BUFFERED, or PERSISTENT:
-- Not modifiable by ALTER_SUBSCRIBER
*/
|
DECLARE
subsc_t sys.aq$_agent;
subsc_addr VARCHAR2(1024) := 'AQADMIN.RX_QUEUE';
BEGIN
subsc_t := sys.aq$_agent('pharm_a1',
subsc_addr, 0);
dbms_aqadm.add_subscriber('rx_queue',
subsc_t);
subsc_t := sys.aq$_agent('icu_a1',
subsc_addr, 0);
dbms_aqadm.add_subscriber('rx_queue',
subsc_t);
subsc_t := sys.aq$_agent('nurse_a1',
subsc_addr, 0);
dbms_aqadm.add_subscriber('rx_queue',
subsc_t, 'priority >
10');
END;
/
-- create propagation schedule
/* -- CREATE SCHEDULE PROPAGATION syntax
dbms_aqadm.schedule_propagation (
queue_name IN VARCHAR2,
-- name of queue
destination IN VARCHAR2 DEFAULT NULL,
-- destination database link
start_time IN DATE DEFAULT
SYSDATE,
-- initial propagation start time
duration IN NUMBER DEFAULT NULL,
-- propagation window in seconds
next_time IN VARCHAR2 DEFAULT NULL,
-- date-time of next window
latency IN NUMBER
DEFAULT 60, -- maximum wait in seconds
destination_queue IN VARCHAR2 DEFAULT NULL); -- target queue name
and db link
*/ |
exec dbms_aqadm.schedule_propagation(queue_name=>'rx_queue', latency =>0);
/* -- DEQUEUE_OPTIONS_T definition
TYPE DEQUEUE_OPTIONS_T IS RECORD (
consumer_name VARCHAR2(30) DEFAULT NULL,
dequeue_mode BINARY_INTEGER DEFAULT REMOVE,
navigation BINARY_INTEGER DEFAULT NEXT_MESSAGE,
visibility BINARY_INTEGER DEFAULT ON_COMMIT,
wait BINARY_INTEGER DEFAULT FOREVER,
msgid RAW(16) DEFAULT NULL,
correlation VARCHAR2(128) DEFAULT NULL,
deq_condition VARCHAR2(4000) DEFAULT NULL,
signature aq$_sig_prop DEFAULT NULL,
transformation VARCHAR2(60) DEFAULT NULL,
delivery_mode PLS_INTEGER DEFAULT PERSISTENT);
*/
/* -- MESSAGE_PROPERTIES_T definition
TYPE MESSAGE_PROPERTIES_T IS RECORD (
priority BINARY_INTEGER DEFAULT 1,
delay BINARY_INTEGER DEFAULT
NO_DELAY,
expiration BINARY_INTEGER DEFAULT NEVER,
correlation VARCHAR2(128) DEFAULT NULL,
attempts BINARY_INTEGER,
recipient_list aq$_recipient_list_t,
exception_queue VARCHAR2(51) DEFAULT NULL,
enqueue_time DATE,
state
BINARY_INTEGER,
sender_id aq$_agent DEFAULT NULL,
original_msgid RAW(16) DEFAULT NULL);
*/
/* -- AQ$_RECIPIENT_LIST_T definition
TYPE SYS.AQ$_RECIPIENT_LIST_T IS TABLE OF sys.aq$_agent
INDEX BY BINARY_INTEGER;
*/
/* -- DEQUEUE syntax
dbms_aq.dequeue(
queue_name IN VARCHAR2,
dequeue_options IN dequeue_options_t,
message_properties OUT message_properties_t,
payload OUT <user_defined_data_type_name>
msgid OUT RAW);
*/ |
|
|
CREATE TABLE rx_inventory (
pharm_id VARCHAR2(3),
rx_name VARCHAR2(30),
rx_quantity NUMBER(5));
INSERT INTO rx_inventory (pharm_id, rx_name, rx_quantity)
VALUES ('GEN', 'Aspirin', 10);
INSERT INTO rx_inventory (pharm_id, rx_name, rx_quantity)
VALUES ('GEN', 'Compazine', 10);
INSERT INTO rx_inventory (pharm_id, rx_name, rx_quantity)
VALUES ('GEN', 'Prozac', 10);
INSERT INTO rx_inventory (pharm_id, rx_name, rx_quantity)
VALUES ('GEN', 'Lipitor', 10);
INSERT INTO rx_inventory (pharm_id, rx_name, rx_quantity)
VALUES ('GEN', 'Oxycontin', 10);
INSERT INTO rx_inventory (pharm_id, rx_name, rx_quantity)
VALUES ('GEN', 'aaa', 10);
INSERT INTO rx_inventory (pharm_id, rx_name, rx_quantity)
VALUES ('GEN', 'bbb', 10);
INSERT INTO rx_inventory (pharm_id, rx_name, rx_quantity)
VALUES ('GEN', 'ccc', 10);
INSERT INTO rx_inventory (pharm_id, rx_name, rx_quantity)
VALUES ('GEN', 'Tetracycline', 10);
INSERT INTO rx_inventory (pharm_id, rx_name, rx_quantity)
VALUES ('GEN', 'Thorazine', 10);
COMMIT;
SELECT *
FROM rx_inventory;
GRANT SELECT, UPDATE ON rx_inventory TO pharm_a1;
|
|
-- create procedure to dequeue messages
CREATE OR REPLACE PROCEDURE demo_dequeue(appname
VARCHAR2)
AUTHID DEFINER IS
deq_msgid RAW(16);
dopt dbms_aq.dequeue_options_t;
mprop dbms_aq.message_properties_t;
payload_t message_t;
q_on_hand PLS_INTEGER;
no_messages EXCEPTION;
pragma exception_init(no_messages, -25228);
pragma autonomous_transaction;
BEGIN
dopt.consumer_name := appname;
dopt.dequeue_mode := dbms_aq.remove;
dopt.navigation := dbms_aq.first_message;
dopt.visibility := dbms_aq.immediate;
dopt.wait := 10;
-- add functionality with mprop.attempts and mprop.enqueue_time
dbms_aq.dequeue('aqadmin.rx_queue', dopt, mprop,
payload_t, deq_msgid);
SELECT rx_quantity
INTO q_on_hand
FROM rx_inventory
WHERE rx_name = payload_t.rx;
IF q_on_hand > 0 THEN
UPDATE rx_inventory
SET rx_quantity = rx_quantity - 1
WHERE rx_name = payload_t.rx;
ELSE
NULL;
-- send back a failure message.
END IF;
INSERT INTO rx_processed_data
(id, rx, source, processed_by, dt_processed)
VALUES
(payload_t.id, payload_t.rx, payload_t.source, APPNAME, SYSTIMESTAMP);
COMMIT;
EXCEPTION
WHEN no_messages THEN
dbms_output.put_line('All queue messages processed');
COMMIT;
END demo_dequeue;
/
CREATE OR REPLACE PUBLIC SYNONYM demo_dequeue FOR aqadmin.demo_dequeue; |
GRANT execute ON demo_dequeue TO pharm_a1;
conn pharm_a1/pharm_a1
CREATE SYNONYM demo_dequeue FOR aqadmin.demo_dequeue;
conn aqadmin/aqadmin
-- create procedure to enqueue messages
/* -- ENQUEUE_OPTIONS_T definition
TYPE ENQUEUE_OPTIONS_T IS RECORD (
visibility BINARY_INTEGER DEFAULT
ON_COMMIT,
relative_msgid RAW(16) DEFAULT NULL,
sequence_deviation BINARY_INTEGER DEFAULT NULL,
transformation VARCHAR2(60) DEFAULT NULL);
*/
/* -- MESSAGE_PROPERTIES_T definition
TYPE MESSAGE_PROPERTIES_T IS RECORD (
priority BINARY_INTEGER DEFAULT 1,
delay BINARY_INTEGER DEFAULT
NO_DELAY,
expiration BINARY_INTEGER DEFAULT NEVER,
correlation VARCHAR2(128) DEFAULT NULL,
attempts BINARY_INTEGER,
recipient_list aq$_recipient_list_t,
exception_queue VARCHAR2(51) DEFAULT NULL,
enqueue_time DATE,
state BINARY_INTEGER,
sender_id aq$_agent DEFAULT NULL,
original_msgid RAW(16) DEFAULT NULL);
*/
/* -- ENQUEUE syntax
dbms_aq.enqueue(
queue_name
IN VARCHAR2,
enqueue_options IN enqueue_options_t,
message_properties IN message_properties_t,
payload IN <user_defined_data_type>,
msgid OUT RAW);
*/
|
CREATE OR REPLACE PROCEDURE demo_enqueue(usermsg MESSAGE_T, urgency NUMBER)
AUTHID DEFINER IS
enq_msgid RAW(16);
eopt dbms_aq.enqueue_options_t;
mprop dbms_aq.message_properties_t;
aprop sys.aq$_agent;
pragma autonomous_transaction;
BEGIN
aprop := sys.aq$_agent(USER, NULL, 0);
IF urgency < 5 THEN
eopt.sequence_deviation := dbms_aq.top;
ELSE
eopt.sequence_deviation := NULL;
END IF;
mprop.priority := urgency;
mprop.sender_id := aprop;
IF urgency < 10 THEN
mprop.delay := dbms_aq.no_delay;
mprop.expiration := 300; -- push to exception
queue in 5 min.
ELSE
mprop.delay :=
1; -- one second delay
before sending
mprop.expiration := 1800; -- push to exception queue
in 30 min.
END IF;
dbms_aq.enqueue('aqadmin.rx_queue', eopt, mprop,
usermsg, enq_msgid);
COMMIT;
END demo_enqueue;
/
CREATE OR REPLACE PUBLIC SYNONYM demo_enqueue FOR aqadmin.demo_enqueue;
|
GRANT execute ON demo_enqueue TO icu_a1;
GRANT execute ON demo_enqueue TO nurse_a1;
conn icu_a1/icu_a1
CREATE SYNONYM demo_enqueue FOR aqadmin.demo_enqueue;
conn nurse_a1/nurse_a1
CREATE SYNONYM demo_enqueue FOR aqadmin.demo_enqueue;
|
-- order_rx MUST be assign priorities based on who ordered
what |
Enqueues Message Procedure |
conn aqadmin/aqadmin
CREATE OR REPLACE PROCEDURE order_rx IS
TYPE ixbb IS TABLE OF VARCHAR2(30)
INDEX BY BINARY_INTEGER;
TYPE ixbi IS TABLE OF NUMBER(2)
INDEX BY BINARY_INTEGER;
rxarray ixbb; -- the name of the rx
rxurgnt ixbi; -- the priority value for an rx
usermsg aqadmin.message_t;
x PLS_INTEGER;
priority PLS_INTEGER;
endcnt PLS_INTEGER;
BEGIN
rxarray(0):='ccc';
rxarray(1):='Aspirin';
rxarray(2):='Prozac';
rxarray(3):='Lipitor';
rxarray(4):='Tetracycline';
rxarray(5):='Thorazine';
rxarray(6):='Oxycontin';
rxarray(7):='Compazine';
rxarray(8):='aaa';
rxarray(9):='bbb';
rxurgnt(0) := 1;
rxurgnt(1) := 3;
rxurgnt(2) := 5;
rxurgnt(3) := 11;
rxurgnt(4) := 19;
rxurgnt(5) := 20;
rxurgnt(6) := 32;
rxurgnt(7) := 89;
rxurgnt(8) := 42;
rxurgnt(9) := 66;
IF USER = 'ICU_A1' THEN
endcnt := 100;
ELSE
endcnt := 30;
END IF;
FOR i IN 1..endcnt
LOOP
SELECT TO_NUMBER(SUBSTR(dbms_crypto.randominteger,3,1))
INTO x
FROM dual;
SELECT DECODE(x, 6, 2,
4, 5, 7,
9, 99)
INTO priority
FROM dual;
usermsg := aqadmin.message_t(i, rxarray(x), priority);
IF priority IN (2,5,9) THEN
demo_enqueue(usermsg, 5);
ELSE
demo_enqueue(usermsg, 11);
END IF;
IF USER = 'ICU_A1' THEN
dbms_lock.sleep(3);
ELSE
dbms_lock.sleep(1);
END IF;
END LOOP;
END order_rx;
/
CREATE PUBLIC SYNONYM order_rx FOR aqadmin.order_rx;
GRANT execute ON order_rx TO icu_a1;
GRANT execute ON order_rx TO nurse_a1;
|
|
Dequeues Message Procedure |
conn aqadmin/aqadmin
CREATE OR REPLACE PROCEDURE get_rx_order AUTHID DEFINER IS
qlist dbms_aq.aq$_agent_list_t;
agent_w_msg sys.aq$_agent;
listen_timeout EXCEPTION;
pragma exception_init(listen_timeout, -25254);
BEGIN
qlist(0) := sys.aq$_agent(USER,
'AQADMIN.RX_QUEUE',
NULL);
/* if retrieving message for multiple users simultaneously
example
qlist(0) := sys.aq$_agent('GenPharm', 'AQADMIN.RX_QUEUE',
NULL);
qlist(1) := sys.aq$_agent('ICUPharm', 'AQADMIN.RX_QUEUE',
NULL);
*/
LOOP
BEGIN
dbms_aq.listen(agent_list
=> qlist, wait => 15, agent => agent_w_msg);
IF agent_w_msg.name = USER THEN
demo_dequeue(USER);
ELSIF agent_w_msg.name = 'ICUPHARM' THEN
demo_dequeue('ICUPharm');
END IF;
EXCEPTION
WHEN listen_timeout THEN
EXIT;
END;
END LOOP;
END get_rx_order;
/
CREATE PUBLIC SYNONYM get_rx_order FOR aqadmin.get_rx_order;
GRANT execute ON get_rx_order TO pharm_a1; |
|
To Run Demo |
The following requires that you
simultaneously open four SQL*Plus sessions. In the first session log on as the ICU
and type step 2 but do not execute it. In the second session do the same thing as
the Nurses Station, and in the third as the Pharmacy. Again not executing the stored
procedure. Then log on as the aqadmin, set up the SQL*Plus environment and execute
one of the two queries then enter a slash (to repeat the query) but do not press the
<Enter> key.
Step |
ICU
order meds |
Nurses
Stn orders meds |
Fill orders |
1 |
conn icu_a1/icu_a1 |
conn
nurse_a1/nurse_a1 |
conn
pharm_a1/pharm_a1 |
2 |
exec order_rx |
exec order_rx |
exec get_rx_order |
conn aqadmin/aqadmin
SELECT COUNT(*) FROM rx_queue_table;
set linesize 131
col rx format a15
col processed_by format a12
SELECT * FROM rx_processed_data ORDER BY id;
SELECT * FROM rx_processed_data ORDER BY dt_processed; |
Then, with everything set up ... go to the first window and hit the <Enter> key, then
the second, then the third and finally the same in the AQADMIN session. Continue to monitor the AQADMIN session while
the ICU and Nurses order medications and the pharmacy fills them.
|
|
ICU Pharmacy - Dequeues
Messages and passes low priority message to the General Pharmacy |
conn icupharm/icupharm
set serveroutput on
DECLARE
qlist dbms_aq.aq$_agent_list_t;
agent_w_msg sys.aq$_agent;
start_tx NUMBER;
finish_tx NUMBER;
listen_timeout EXCEPTION;
pragma exception_init(listen_timeout, -25254);
BEGIN
qlist(0) := sys.aq$_agent('ICUPharm', 'aqadmin.rx_queue', NULL);
qlist(1) := sys.aq$_agent('GenPharm', 'aqadmin.rx_queue', NULL);
start_tx := dbms_utility.get_time;
LOOP
BEGIN
finish_tx := dbms_utility.get_time;
IF finish_tx - start_tx > 1200 THEN
EXIT;
END IF;
dbms_aq.listen(agent_list
=> qlist, wait => 30, agent => agent_w_msg);
-- IF agent_w_msg.name = 'PROG1' THEN
demo_dequeue('ICUPHARM');
-- ELSIF agent_w_msg.name = 'PROG2' THEN
-- demo_dequeue('prog2');
-- END IF;
EXCEPTION
WHEN listen_timeout THEN
NULL;
END;
END LOOP;
END;
/ |
|
Cleanup As AQADMIN |
conn aqadmin/aqadmin
-- add purge queue_table here
exec dbms_aqadm.purge_queue_tables('RX_QUEUE_TABLE',
queue_table IN VARCHAR2,
purge_condition IN VARCHAR2,
purge_options IN aq$_purge_options_t);
/* STOP_QUEUE syntax
dbms_aqadm.stop_queue (
queue_name IN VARCHAR2,
enqueue IN BOOLEAN DEFAULT TRUE,
dequeue IN BOOLEAN DEFAULT TRUE,
wait IN BOOLEAN DEFAULT TRUE);
*/
/* DROP_QUEUE sytnax
dbms_aqadm.drop_queue (
queue_name IN VARCHAR2,
auto_commit IN BOOLEAN DEFAULT TRUE);
*/
/* DROP_QUEUE_TABLE syntax
dbms_aqadm.drop_queue_table (
queue_table IN VARCHAR2,
force IN BOOLEAN DEFAULT FALSE,
auto_commit IN BOOLEAN DEFAULT TRUE);
*/ |
BEGIN
-- Stop Queue rx_queue
dbms_aqadm.stop_queue(queue_name => 'rx_queue',
wait => TRUE);
-- Drop Queue rx_queue
dbms_aqadm.drop_queue(queue_name => 'rx_queue');
-- Drop rx Queue Table
dbms_aqadm.drop_queue_table(queue_table =>
'rx_queue_table', force => TRUE);
END;
/
-- Verify queues is dropped
SELECT queue_name
FROM user_queues;
-- Verify queue table is dropped
SELECT table_name
FROM user_tables;
drop user pharm_a1 cascade;
drop user icu_a1 cascade;
drop user nurse_a1 cascade;
drop user aqadmin cascade;
|
|
|
-----
|