通过JMS监听Oracle AQ,在数据库变化时触发执行J(2)

Java使用JMS进行相应的处理,需要使用Oracle提供的jar,在Oracle安装目录可以找到:在linux中可以使用find命令进行查找,例如

find `pwd` -name 'jmscommon.jar'

需要的jar为:

app/oracle/product/12.1.0/dbhome_1/rdbms/jlib/jmscommon.jar

app/oracle/product/12.1.0/dbhome_1/jdbc/lib/ojdbc7.jar

app/oracle/product/12.1.0/dbhome_1/jlib/orai18n.jar

app/oracle/product/12.1.0/dbhome_1/jlib/jta.jar

app/oracle/product/12.1.0/dbhome_1/rdbms/jlib/aqapi_g.jar

1. 创建连接参数类

实际使用时可以把参数信息配置在properties文件中,使用Spring进行注入。

package org.kevin.jms; /** * * @author 李文锴 * 连接参数信息 * */ public class JmsConfig { public String username = "c##kevin"; public String password = "a111111111"; public String jdbcUrl = "jdbc:oracle:thin:@127.0.0.1:1521:orcl"; public String queueName = "demo_queue"; }

1

2. 创建消息转换类

因为消息载荷是Oracle数据类型,需要提供一个转换工厂类将Oracle类型转换为Java类型。

package org.kevin.jms; import java.sql.SQLException; import oracle.jdbc.driver.OracleConnection; import oracle.jdbc.internal.OracleTypes; import oracle.jpub.runtime.MutableStruct; import oracle.sql.CustomDatum; import oracle.sql.CustomDatumFactory; import oracle.sql.Datum; import oracle.sql.STRUCT; /** * * @author 李文锴 * 数据类型转换类 * */ @SuppressWarnings("deprecation") public class QUEUE_MESSAGE_TYPE implements CustomDatum, CustomDatumFactory { public static final String _SQL_NAME = "QUEUE_MESSAGE_TYPE"; public static final int _SQL_TYPECODE = OracleTypes.STRUCT; MutableStruct _struct; // 12表示字符串 static int[] _sqlType = { 12 }; static CustomDatumFactory[] _factory = new CustomDatumFactory[1]; static final QUEUE_MESSAGE_TYPE _MessageFactory = new QUEUE_MESSAGE_TYPE(); public static CustomDatumFactory getFactory() { return _MessageFactory; } public QUEUE_MESSAGE_TYPE() { _struct = new MutableStruct(new Object[1], _sqlType, _factory); } public Datum toDatum(OracleConnection c) throws SQLException { return _struct.toDatum(c, _SQL_NAME); } public CustomDatum create(Datum d, int sqlType) throws SQLException { if (d == null) return null; QUEUE_MESSAGE_TYPE o = new QUEUE_MESSAGE_TYPE(); o._struct = new MutableStruct((STRUCT) d, _sqlType, _factory); return o; } public String getContent() throws SQLException { return (String) _struct.getAttribute(0); } } 3. 主类进行消息处理 package org.kevin.jms; import java.util.Properties; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.Queue; import javax.jms.QueueConnection; import javax.jms.QueueConnectionFactory; import javax.jms.Session; import oracle.jms.AQjmsAdtMessage; import oracle.jms.AQjmsDestination; import oracle.jms.AQjmsFactory; import oracle.jms.AQjmsSession; /** * * @author 李文锴 消息处理类 * */ public class Main { public static void main(String[] args) throws Exception { JmsConfig config = new JmsConfig(); QueueConnectionFactory queueConnectionFactory = AQjmsFactory.getQueueConnectionFactory(config.jdbcUrl, new Properties()); QueueConnection conn = queueConnectionFactory.createQueueConnection(config.username, config.password); AQjmsSession session = (AQjmsSession) conn.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); conn.start(); Queue queue = (AQjmsDestination) session.getQueue(config.username, config.queueName); MessageConsumer consumer = session.createConsumer(queue, null, QUEUE_MESSAGE_TYPE.getFactory(), null, false); consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { System.out.println("ok"); AQjmsAdtMessage adtMessage = (AQjmsAdtMessage) message; try { QUEUE_MESSAGE_TYPE payload = (QUEUE_MESSAGE_TYPE) adtMessage.getAdtPayload(); System.out.println(payload.getContent()); } catch (Exception e) { e.printStackTrace(); } } }); Thread.sleep(1000000); } }

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/9542b293304c08ac14892a3206a0278d.html