通过JMS监听Oracle AQ,在数据库变化时触发执行J

本实验环境基于Oracle 12C和JDK1.8,其中Oracle 12C支持多租户特性,相较于之前的Oracle版本,使用‘C##用户名‘表示用户,例如如果数据库用户叫kevin,则登陆时使用C##kevin进行登陆。

一、Oracle高级消息队列AQ

Oracle AQ是Oracle中的消息队列,是Oracle中的一种高级应用,每个版本都在不断的加强,使用DBMS_AQ系统包进行相应的操作,是Oracle的默认组件,只要安装了Oracle数据库就可以使用。使用AQ可以在多个Oracle数据库、Oracle与Java、C等系统中进行数据传输。

下面分步骤说明如何创建Oracle AQ

1. 创建消息负荷payload

Oracle AQ中传递的消息被称为有效负荷(payloads),格式可以是用户自定义对象或XMLType或ANYDATA。本例中我们创建一个简单的对象类型用于传递消息。

create type demo_queue_payload_type as object (message varchar2(4000)); 2. 创建队列表

队列表用于存储消息,在入队时自动存入表中,出队时自动删除。使用DBMS_AQADM包进行数据表的创建,只需要写表名,同时设置相应的属性。对于队列需要设置multiple_consumers为false,如果使用发布/订阅模式需要设置为true。

begin dbms_aqadm.create_queue_table( queue_table => 'demo_queue_table', queue_payload_type => 'demo_queue_payload_type', multiple_consumers => false ); end;

执行完后可以查看oracle表中自动生成了demo_queue_table表,可以查看影响子段(含义比较清晰)。

3. 创建队列并启动

创建队列并启动队列:

begin dbms_aqadm.create_queue ( queue_name => 'demo_queue', queue_table => 'demo_queue_table' ); dbms_aqadm.start_queue( queue_name => 'demo_queue' ); end;

至此,我们已经创建了队列有效负荷,队列表和队列。可以查看以下系统创建了哪些相关的对象:

SELECT object_name, object_type FROM user_objects WHERE object_name != 'DEMO_QUEUE_PAYLOAD_TYPE'; OBJECT_NAME OBJECT_TYPE ------------------------------ --------------- DEMO_QUEUE_TABLE TABLE SYS_C009392 INDEX SYS_LOB0000060502C00030$$ LOB AQ$_DEMO_QUEUE_TABLE_T INDEX AQ$_DEMO_QUEUE_TABLE_I INDEX AQ$_DEMO_QUEUE_TABLE_E QUEUE AQ$DEMO_QUEUE_TABLE VIEW DEMO_QUEUE QUEUE

1

我们看到一个队列带出了一系列自动生成对象,有些是被后面直接用到的。不过有趣的是,创建了第二个队列。这就是所谓的异常队列(exception queue)。如果AQ无法从我们的队列接收消息,将记录在该异常队列中。

消息多次处理出错等情况会自动转移到异常的队列,对于异常队列如何处理目前笔者还没有找到相应的写法,因为我使用的场景并不要求消息必须一对一的被处理,只要起到通知的作用即可。所以如果消息转移到异常队列,可以执行清空队列表中的数据

delete from demo_queue_table; 4. 队列的停止和删除

如果需要删除或重建可以使用下面的方法进行操作:

BEGIN DBMS_AQADM.STOP_QUEUE( queue_name => 'demo_queue' ); DBMS_AQADM.DROP_QUEUE( queue_name => 'demo_queue' ); DBMS_AQADM.DROP_QUEUE_TABLE( queue_table => 'demo_queue_table' ); END; 5. 入队消息

入列操作是一个基本的事务操作(就像往队列表Insert),因此我们需要提交。

declare r_enqueue_options DBMS_AQ.ENQUEUE_OPTIONS_T; r_message_properties DBMS_AQ.MESSAGE_PROPERTIES_T; v_message_handle RAW(16); o_payload demo_queue_payload_type; begin o_payload := demo_queue_payload_type('what is you name ?'); dbms_aq.enqueue( queue_name => 'demo_queue', enqueue_options => r_enqueue_options, message_properties => r_message_properties, payload => o_payload, msgid => v_message_handle ); commit; end;

通过SQL语句查看消息是否正常入队:

select * from aq$demo_queue_table; select user_data from aq$demo_queue_table; 6. 出队消息

使用Oracle进行出队操作,我没有实验成功(不确定是否和DBMS_OUTPUT的执行权限有关),代码如下,读者可以进行调试:

declare r_dequeue_options DBMS_AQ.DEQUEUE_OPTIONS_T; r_message_properties DBMS_AQ.MESSAGE_PROPERTIES_T; v_message_handle RAW(16); o_payload demo_queue_payload_type; begin DBMS_AQ.DEQUEUE( queue_name => 'demo_queue', dequeue_options => r_dequeue_options, message_properties => r_message_properties, payload => o_payload, msgid => v_message_handle ); DBMS_OUTPUT.PUT_LINE( '***** Browse message is [' || o_payload.message || ']****' ); end; 二、Java使用JMS监听并处理Oracle AQ队列

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/9542b293304c08ac14892a3206a0278d.html