|
Oracle AQ Demo 2 |
| Version 10.1 |
| Setup As SYS |
| CONN / AS SYSDBA

CREATE USER aquser IDENTIFIED BY aquser
DEFAULT TABLESPACE uwdata
TEMPORARY TABLESPACE temp
QUOTA 0 ON SYSTEM
QUOTA 10M ON uwdata;

GRANT create session TO aquser;
GRANT create procedure TO aquser;
-- needed for the CREATE SEQUENCE issued in the AQUSER setup
GRANT create sequence TO aquser;
GRANT create table TO aquser;
GRANT create type TO aquser;
GRANT aq_administrator_role TO aquser;
GRANT EXECUTE ON dbms_aq TO aquser;
GRANT EXECUTE ON dbms_aqadm TO aquser; |
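Before connecting as AQUSER it can help to confirm that the grants took effect. The queries below are a minimal sketch, assuming the session is still connected as SYS; they only read the standard data dictionary views `dba_sys_privs`, `dba_role_privs` and `dba_tab_privs`.

```sql
-- System privileges granted directly to AQUSER
SELECT privilege
FROM dba_sys_privs
WHERE grantee = 'AQUSER'
ORDER BY privilege;

-- Roles granted to AQUSER (AQ_ADMINISTRATOR_ROLE is expected here)
SELECT granted_role
FROM dba_role_privs
WHERE grantee = 'AQUSER';

-- Object privileges, such as EXECUTE on DBMS_AQ and DBMS_AQADM
SELECT table_name, privilege
FROM dba_tab_privs
WHERE grantee = 'AQUSER'
ORDER BY table_name;
```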
| Setup As AQUSER |
| CONN aquser/aquser

SELECT object_type, COUNT(*)
FROM user_objects
GROUP BY object_type;

CREATE SEQUENCE message_seq
INCREMENT BY 1
START WITH 1000;

SELECT object_type, COUNT(*)
FROM user_objects
GROUP BY object_type;

CREATE TYPE message_type AS OBJECT (
  message_id    NUMBER(15),
  subject       VARCHAR2(100),
  text          VARCHAR2(100),
  dollar_value  NUMBER(4,2));
/

SELECT object_type, COUNT(*)
FROM user_objects
GROUP BY object_type;

BEGIN
  /*
  ** +----------------------------------------------------------------------------+
  ** | CREATE QUEUE TABLE
  ** | --------------------------------------------------------------------------
  ** | -> (QUEUE_TABLE)        - Name of the queue table to create.
  ** | -> (QUEUE_PAYLOAD_TYPE) - Name of either the object type or RAW.
  ** |
  ** | NOTE: The "create_queue_table" procedure will also create the following
  ** |       items:
  ** |
  ** |   AQ$QUEUE_TABLE_NAME    - A read-only view for information on the queue
  ** |                            table.
  ** |   AQ$_QUEUE_TABLE_NAME_E - A default exception queue.
  ** |   AQ$_QUEUE_TABLE_NAME_T - An index for time manager operations.
  ** |   AQ$_QUEUE_TABLE_NAME_I - An index or index-organized table to handle
  ** |                            dequeueing on queues with multiple consumers.
  ** |
  ** | ADDITIONAL NOTES:
  ** |   - Payload type can be either RAW or a custom object type.
  ** |   - Maximum payload size is 32 KB.
  ** |   - When a user-defined object type is being used as a payload, the
  ** |     maximum number of attributes allowed for the object type is 900.
  ** |   - Messages must be in READY state to be dequeued unless a MSGID value is
  ** |     specified. Messages can be selected for dequeue based on msgid or
  ** |     correlation values.
  ** +----------------------------------------------------------------------------+
  */
  dbms_aqadm.create_queue_table (
    queue_table        => 'aquser.msg_qt',
    queue_payload_type => 'aquser.message_type');

  /*
  ** +----------------------------------------------------------------------------+
  ** | CREATE QUEUE
  ** | --------------------------------------------------------------------------
  ** | -> (QUEUE_NAME)     - Name of the queue to create and place in the
  ** |                       queue table (below).
  ** | -> (QUEUE_TABLE)    - Name of the queue table in which to store the
  ** |                       (above) named queue.
  ** | -> (QUEUE_TYPE)     - Type of queue to create. The types of queues are
  ** |                       NORMAL queues and EXCEPTION queues.
  ** | -> (MAX_RETRIES)    - Used to limit the number of times a dequeue with the
  ** |                       REMOVE mode can be attempted on the message. A
  ** |                       value of 0 allows no retries. When the maximum
  ** |                       number of retries is reached, the message is moved
  ** |                       to the exception queue. The retry count is
  ** |                       incremented when the application issues a rollback
  ** |                       after executing the dequeue.
  ** | -> (RETRY_DELAY)    - Specifies the delay time, in seconds, before the
  ** |                       message is scheduled for processing again after an
  ** |                       application rollback. The default value is 0, which
  ** |                       allows a message to be retried as soon as possible.
  ** |                       If MAX_RETRIES is set to 0, the RETRY_DELAY argument
  ** |                       will have no effect.
  ** | -> (RETENTION_TIME) - Number of seconds a message is retained in the
  ** |                       queue table after being dequeued. The value used
  ** |                       below, 1209600, is 14 days.
  ** | -> (DEPENDENCY_TRACKING) - Reserved for future use; must be FALSE.
  ** | -> (COMMENT)        - Assign a comment to the queue.
  ** | -> (AUTO_COMMIT)    - If you set the AUTO_COMMIT argument to 'TRUE', the
  ** |                       current transaction, if any, will be committed
  ** |                       before the operation is carried out. This is the
  ** |                       default action. If you set the AUTO_COMMIT argument
  ** |                       to 'FALSE', the operation will be part of the
  ** |                       current transaction and will become persistent only
  ** |                       when the user issues a COMMIT.
  ** +----------------------------------------------------------------------------+
  */
  dbms_aqadm.create_queue(
    queue_name          => 'msg_queue',
    queue_table         => 'aquser.msg_qt',
    queue_type          => DBMS_AQADM.NORMAL_QUEUE,
    max_retries         => 0,
    retry_delay         => 0,
    retention_time      => 1209600,
    dependency_tracking => FALSE,
    comment             => 'Test Object Type Queue',
    auto_commit         => FALSE);

  -- start queue
  dbms_aqadm.start_queue('msg_queue');
END;
/

SELECT object_type, COUNT(*)
FROM user_objects
GROUP BY object_type; |
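The object counts above only show that something was created. To see the queue table and queues themselves, the standard `user_queue_tables` and `user_queues` dictionary views can be queried. This is a minimal sketch, assuming it is run as AQUSER after the setup block has completed.

```sql
-- The queue table and its payload type
SELECT queue_table, type, object_type, recipients
FROM user_queue_tables;

-- The queues stored in it. MSG_QUEUE should appear as a NORMAL_QUEUE
-- alongside the automatically created exception queue.
SELECT name, queue_table, queue_type, max_retries, retry_delay,
       enqueue_enabled, dequeue_enabled, retention
FROM user_queues
ORDER BY name;
```

After `start_queue`, both `enqueue_enabled` and `dequeue_enabled` should report that the normal queue is open for use.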
| Dequeue User |
| CONN aquser/aquser

set serveroutput on

DECLARE
  dequeue_options     dbms_aq.dequeue_options_t;
  message_properties  dbms_aq.message_properties_t;
  message_handle      RAW(16);
  message             aquser.message_type;
BEGIN
  /*
  ** +----------------------------------------------------------------------------------+
  ** | DEQUEUE OPTIONS
  ** | --------------------------------------------------------------------------------
  ** | -> (CONSUMER_NAME) - Name of the application, process or user receiving the
  ** |                      message. Should be NULL for queues not set up to handle
  ** |                      more than one consumer.
  ** | -> (DEQUEUE_MODE)  - Specifies locks, if any, to be acquired on the message by
  ** |                      the dequeue() process. Can be:
  ** |
  ** |                      BROWSE : For "read-only" access similar to that used in
  ** |                               "select" statements.
  ** |                      LOCKED : For the ability to write to the message during
  ** |                               the transaction, similar to a share lock
  ** |                               acquired in a "select for update" statement.
  ** |                      REMOVE : For the ability to read the message,
  ** |                               updating it or deleting it. The message is
  ** |                               retained according to the retention
  ** |                               properties set when the queue was created.
  ** |
  ** | -> (NAVIGATION)    - Determines the position of the message to be retrieved,
  ** |                      the first step in retrieving messages. The second step is
  ** |                      applying search criteria. The navigation variable can have
  ** |                      one of three values:
  ** |
  ** |                      NEXT_MESSAGE     : Used to retrieve the next message
  ** |                                         available that matches the search
  ** |                                         criteria.
  ** |                      NEXT_TRANSACTION : Used to skip the remaining messages in
  ** |                                         the current transaction group and
  ** |                                         retrieve the first message of the
  ** |                                         next transaction group.
  ** |                      FIRST_MESSAGE    : Used to retrieve the first message
  ** |                                         that fits the search criteria,
  ** |                                         resetting the position to the
  ** |                                         beginning of the queue.
  ** | -> (VISIBILITY)    - Defines visibility of the message within the transaction
  ** |                      of the application dequeueing it. Values are:
  ** |
  ** |                      ON_COMMIT : The dequeue is part of the current
  ** |                                  transaction.
  ** |                      IMMEDIATE : The dequeue is its own transaction.
  ** |
  ** | -> (WAIT)          - Specifies how long to wait if an attempt is made to
  ** |                      dequeue() a message and there is no message to retrieve.
  ** |                      Values are:
  ** |
  ** |                      FOREVER : Wait forever.
  ** |                      NO_WAIT : Do not wait for any message.
  ** |                      num     : Number of seconds to wait.
  ** |
  ** | -> (MSGID)         - The message identifier for the message to be dequeued. If
  ** |                      specified, the message will be dequeued even if expired.
  ** | -> (CORRELATION)   - The correlation identifier (name) of the message to be
  ** |                      dequeued.
  ** +----------------------------------------------------------------------------------+
  */
  dequeue_options.CONSUMER_NAME := NULL;
  dequeue_options.DEQUEUE_MODE  := DBMS_AQ.REMOVE;
  dequeue_options.NAVIGATION    := DBMS_AQ.NEXT_MESSAGE;
  dequeue_options.VISIBILITY    := DBMS_AQ.IMMEDIATE;
  dequeue_options.WAIT          := DBMS_AQ.FOREVER;
  dequeue_options.MSGID         := null;
  dequeue_options.CORRELATION   := 'Jeff, Melody and Alex Message';

  dbms_output.put_line('+-----------------+');
  dbms_output.put_line('| DEQUEUE OPTIONS |');
  dbms_output.put_line('+-----------------+');
  dbms_output.put_line(' -> CONSUMER_NAME := ' || NVL(dequeue_options.CONSUMER_NAME, '<null>'));
  dbms_output.put_line(' -> DEQUEUE_MODE  := ' || NVL(dequeue_options.DEQUEUE_MODE, -999));
  dbms_output.put_line(' -> NAVIGATION    := ' || NVL(dequeue_options.NAVIGATION, -999));
  dbms_output.put_line(' -> VISIBILITY    := ' || NVL(dequeue_options.VISIBILITY, -999));
  dbms_output.put_line(' -> WAIT          := ' || NVL(dequeue_options.WAIT, -999));
  -- MSGID is RAW, so convert it to hex before substituting a text placeholder
  dbms_output.put_line(' -> MSGID         := ' || NVL(RAWTOHEX(dequeue_options.MSGID), '<null>'));
  dbms_output.put_line(' -> CORRELATION   := ' || NVL(dequeue_options.CORRELATION, '<null>'));

  /*
  ** +------------------------------------+
  ** | DEQUEUE THE MESSAGE                |
  ** +------------------------------------+
  */
  dbms_aq.dequeue(queue_name         => 'msg_queue',
                  dequeue_options    => dequeue_options,
                  message_properties => message_properties,
                  payload            => message,
                  msgid              => message_handle);

  /*
  ** +------------------------------------+
  ** | PRINT MESSAGE PROPERTIES           |
  ** +------------------------------------+
  ** Printed after the dequeue so the values shown are those returned for
  ** the dequeued message rather than the record's defaults.
  */
  dbms_output.put_line('+--------------------+');
  dbms_output.put_line('| MESSAGE PROPERTIES |');
  dbms_output.put_line('+--------------------+');
  dbms_output.put_line(' -> PRIORITY        := ' || NVL(message_properties.PRIORITY, -999));
  dbms_output.put_line(' -> DELAY           := ' || NVL(message_properties.DELAY, -999));
  dbms_output.put_line(' -> EXPIRATION      := ' || NVL(message_properties.EXPIRATION, -999));
  dbms_output.put_line(' -> CORRELATION     := ' || NVL(message_properties.CORRELATION, '<null>'));
  dbms_output.put_line(' -> ATTEMPTS        := ' || NVL(message_properties.ATTEMPTS, -999));
  dbms_output.put_line(' -> EXCEPTION_QUEUE := ' || NVL(message_properties.EXCEPTION_QUEUE, '<null>'));
  dbms_output.put_line(' -> ENQUEUE_TIME    := ' || message_properties.ENQUEUE_TIME);
  dbms_output.put_line(' -> STATE           := ' || NVL(message_properties.STATE, -999));

  /*
  ** +------------------------------------+
  ** | PRINT THE DEQUEUED MESSAGE PAYLOAD |
  ** +------------------------------------+
  */
  dbms_output.put_line('+-----------------+');
  dbms_output.put_line('| MESSAGE PAYLOAD |');
  dbms_output.put_line('+-----------------+');
  dbms_output.put_line('- Message ID   := ' || message.message_id);
  dbms_output.put_line('- Subject      := ' || message.subject);
  dbms_output.put_line('- Message      := ' || message.text);
  dbms_output.put_line('- Dollar Value := ' || message.dollar_value);

  COMMIT;
END;
/ |
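With WAIT set to FOREVER the session blocks until a matching message arrives. As a variation, the sketch below waits a bounded time and handles the timeout. It assumes the same queue and payload type as above. The 10-second wait and the exception name `no_messages` are illustrative choices; ORA-25228 is the error Oracle raises on a dequeue timeout or end-of-fetch.

```sql
set serveroutput on

DECLARE
  dequeue_options     dbms_aq.dequeue_options_t;
  message_properties  dbms_aq.message_properties_t;
  message_handle      RAW(16);
  message             aquser.message_type;

  -- Hypothetical name; maps ORA-25228 (timeout or end-of-fetch during dequeue)
  no_messages         EXCEPTION;
  PRAGMA EXCEPTION_INIT(no_messages, -25228);
BEGIN
  -- Wait at most 10 seconds instead of blocking forever
  dequeue_options.WAIT        := 10;
  dequeue_options.CORRELATION := 'Jeff, Melody and Alex Message';

  dbms_aq.dequeue(queue_name         => 'msg_queue',
                  dequeue_options    => dequeue_options,
                  message_properties => message_properties,
                  payload            => message,
                  msgid              => message_handle);

  dbms_output.put_line('Dequeued message ' || message.message_id ||
                       ': ' || message.subject);

  -- VISIBILITY is left at its default (ON_COMMIT), so the removal
  -- becomes permanent only at this COMMIT.
  COMMIT;
EXCEPTION
  WHEN no_messages THEN
    dbms_output.put_line('No matching message arrived within 10 seconds.');
END;
/
```

Setting `dequeue_options.WAIT := DBMS_AQ.NO_WAIT` instead makes the call return immediately, which suits polling from a scheduled job.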
| Enqueue User |
| CONN aquser/aquser

set serveroutput on

DECLARE
  enqueue_options     dbms_aq.enqueue_options_t;
  message_properties  dbms_aq.message_properties_t;
  -- subscriber       dbms_aq.aq$_recipient_list_t;
  message_handle      RAW(16);
  message             aquser.message_type;
  message_id          NUMBER;
BEGIN
  /*
  ** +----------------------------------------------------------------------------+
  ** | GET NEXT MESSAGE ID                                                        |
  ** +----------------------------------------------------------------------------+
  */
  SELECT message_seq.nextval
  INTO message_id
  FROM dual;

  /*
  ** +----------------------------------------------------------------------------+
  ** | ASSIGN MESSAGE                                                             |
  ** +----------------------------------------------------------------------------+
  */
  message := message_type(message_id,
                          'EXAMPLE MESSAGE',
                          'This is a sample message.',
                          10.2);

  /*
  ** +----------------------------------------------------------------------------------+
  ** | ENQUEUE OPTIONS
  ** | --------------------------------------------------------------------------------
  ** | -> (VISIBILITY)         - Defines transactional behaviour of the queued
  ** |                           request. Can be set to:
  ** |
  ** |                           ON_COMMIT : The enqueued message is part of the
  ** |                                       current transaction, and the
  ** |                                       operation is complete when the
  ** |                                       transaction commits. The default
  ** |                                       value is "ON_COMMIT".
  ** |                           IMMEDIATE : The enqueued message is its own
  ** |                                       transaction, not part of the current
  ** |                                       transaction.
  ** |
  ** | -> (RELATIVE_MSGID)     - Only relevant when "BEFORE" is used in
  ** |                           "sequence_deviation" (below). This variable
  ** |                           defines the message identifier referenced in
  ** |                           "sequence_deviation".
  ** |
  ** | -> (SEQUENCE_DEVIATION) - Identifies whether the message enqueued should be
  ** |                           dequeued before other messages in the queue. Values
  ** |                           permitted:
  ** |
  ** |                           BEFORE : This message should be dequeued
  ** |                                    before the message defined by
  ** |                                    "RELATIVE_MSGID" (above).
  ** |                           TOP    : This message is dequeued before any
  ** |                                    other messages.
  ** |                           null   : This message is dequeued in regular
  ** |                                    order. NULL is the default value.
  ** +----------------------------------------------------------------------------------+
  */
  enqueue_options.VISIBILITY         := DBMS_AQ.ON_COMMIT;
  -- enqueue_options.RELATIVE_MSGID  := '02AB9AD2F4859C5';
  enqueue_options.SEQUENCE_DEVIATION := null;

  dbms_output.put_line('+-----------------+');
  dbms_output.put_line('| ENQUEUE OPTIONS |');
  dbms_output.put_line('+-----------------+');
  dbms_output.put_line(' -> VISIBILITY         := ' || NVL(enqueue_options.VISIBILITY, -999));
  --dbms_output.put_line(' -> RELATIVE_MSGID   := ' || NVL(enqueue_options.RELATIVE_MSGID, '<null>'));
  dbms_output.put_line(' -> SEQUENCE_DEVIATION := ' || NVL(enqueue_options.SEQUENCE_DEVIATION, -999));

  /*
  ** +----------------------------------------------------------------------------------+
  ** | MESSAGE PROPERTIES
  ** | --------------------------------------------------------------------------------
  ** | -> (PRIORITY)        - Specifies the priority of the message numerically. Both
  ** |                        negatives and positives are allowed; the lower the
  ** |                        number, the higher the priority.
  ** |
  ** | -> (DELAY)           - Identifies a delay, in seconds, during which time the
  ** |                        message may not be dequeued. Alternatively, "NO_DELAY"
  ** |                        may be specified for this variable. It relies on the
  ** |                        setting of the time manager.
  ** |
  ** | -> (EXPIRATION)      - Defines how long the message is available for
  ** |                        dequeueing, in seconds, after which time the message
  ** |                        expires. Alternatively, "NEVER" may be specified for
  ** |                        this variable.
  ** |
  ** | -> (CORRELATION)     - Identifies the message with a name.
  ** |
  ** | -> (ATTEMPTS)        - Number of attempts that have been made to dequeue()
  ** |                        the message. This is not set at time of enqueue().
  ** |
  ** | -> (RECIPIENT_LIST)  - Can be used only for queues allowing multiple consumers.
  ** |                        Default recipients are the subscribers to the queue.
  ** |                        Values for this variable cannot be returned in a
  ** |                        dequeue(). Each entry in the recipient list is of
  ** |                        type SYS.AQ$_AGENT, which takes three attributes:
  ** |                        name, address and protocol, of datatypes VARCHAR2,
  ** |                        VARCHAR2 and NUMBER, respectively.
  ** |
  ** | -> (EXCEPTION_QUEUE) - Name of the exception queue the message is moved to
  ** |                        once the "expiration" time has passed, or if
  ** |                        "attempts" exceeds the maximum number of attempts
  ** |                        allowed for the queue.
  ** |
  ** | -> (ENQUEUE_TIME)    - Set internally by the system as the time "enqueue()"
  ** |                        deposited the message.
  ** |
  ** | -> (STATE)           - The current state of the message. This has four
  ** |                        possible values:
  ** |
  ** |                        WAITING   - If the message is still in delay.
  ** |                        READY     - If the message can be obtained via
  ** |                                    dequeue().
  ** |                        PROCESSED - If the message is processed and retained.
  ** |                        EXPIRED   - If the message moved to the location
  ** |                                    defined by exception queue.
  ** +----------------------------------------------------------------------------------+
  */
  message_properties.PRIORITY           := -5;
  message_properties.DELAY              := DBMS_AQ.NO_DELAY;
  message_properties.EXPIRATION         := DBMS_AQ.NEVER;
  message_properties.CORRELATION        := 'Jeff, Melody and Alex Message';
  -- message_properties.ATTEMPTS        := (Not set at time of enqueue);
  -- subscriber(1) := SYS.AQ$_AGENT('JEFF', null, null);
  -- subscriber(2) := SYS.AQ$_AGENT('MELODY', null, null);
  -- subscriber(3) := SYS.AQ$_AGENT('ALEX', null, null);
  -- message_properties.RECIPIENT_LIST  := subscriber;
  -- message_properties.EXCEPTION_QUEUE := 'AQ$_MSG_QT_E';
  -- message_properties.ENQUEUE_TIME    := (Not set by user);
  -- message_properties.STATE           := (Not set by user);

  dbms_output.put_line('+--------------------+');
  dbms_output.put_line('| MESSAGE PROPERTIES |');
  dbms_output.put_line('+--------------------+');
  dbms_output.put_line(' -> PRIORITY        := ' || NVL(message_properties.PRIORITY, -999));
  dbms_output.put_line(' -> DELAY           := ' || NVL(message_properties.DELAY, -999));
  dbms_output.put_line(' -> EXPIRATION      := ' || NVL(message_properties.EXPIRATION, -999));
  dbms_output.put_line(' -> CORRELATION     := ' || NVL(message_properties.CORRELATION, '<null>'));
  dbms_output.put_line(' -> ATTEMPTS        := ' || '(Not set at time of enqueue)');
  dbms_output.put_line(' -> EXCEPTION_QUEUE := ' || NVL(message_properties.EXCEPTION_QUEUE, '<null>'));
  dbms_output.put_line(' -> ENQUEUE_TIME    := ' || '(Not set by user)');
  dbms_output.put_line(' -> STATE           := ' || '(Not set by user)');

  /*
  ** +------------------------------------+
  ** | ENQUEUE THE MESSAGE                |
  ** +------------------------------------+
  */
  dbms_aq.enqueue(queue_name         => 'msg_queue',
                  enqueue_options    => enqueue_options,
                  message_properties => message_properties,
                  payload            => message,
                  msgid              => message_handle);

  COMMIT;
END;
/ |
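Once the enqueue has committed, the message can be inspected without dequeuing it through the read-only view that `create_queue_table` generates for the queue table. The query below is a sketch, assuming the view is named `aq$msg_qt` (following the AQ$QUEUE_TABLE_NAME convention noted in the setup) and that the payload attributes can be reached through a table alias on the view's `user_data` column.

```sql
-- One row per message held in the MSG_QT queue table
SELECT q.queue,
       q.msg_id,
       q.msg_state,
       q.corr_id,
       q.msg_priority,
       q.enq_time,
       q.user_data.message_id AS message_id,
       q.user_data.subject    AS subject
FROM aq$msg_qt q
ORDER BY q.enq_time;
```

Because the queue was created with a retention time of 1209600 seconds, messages that have already been dequeued should still appear here for up to 14 days, with `msg_state` showing PROCESSED rather than READY.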