记录一次冷门技术 Oracle AQ(Advanced Queuing,高级队列)的使用
版本
oracle 11g
创建用户
-- Create the demo schema that will own the AQ queue table and queue.
-- NOTE(review): presumably run from a DBA account (e.g. SYS/SYSTEM) — confirm.
-- The password is double-quoted because an unquoted Oracle password may not
-- start with a digit (ORA-00988). Replace this sample password outside a sandbox.
create user testaq identified by "123456";

-- Basic session and object-creation roles.
grant connect, resource to testaq;

-- Roles and package privileges needed for Advanced Queuing.
-- (The original listed several of these grants more than once; each is kept once.)
grant aq_administrator_role to testaq;
grant aq_user_role to testaq;
grant execute on dbms_aq to testaq;
grant execute on dbms_aqadm to testaq;
grant execute on dbms_aqin to testaq;

-- AQ system privileges; the third argument (admin_option) is false, so testaq
-- cannot pass these privileges on to other users.
begin
    dbms_aqadm.grant_system_privilege('enqueue_any', 'testaq', false);
    dbms_aqadm.grant_system_privilege('dequeue_any', 'testaq', false);
end;
/

-- Procedure-related privileges.
grant execute any procedure to testaq;
-- "with admin option" already includes the plain CREATE PROCEDURE grant.
grant create procedure to testaq with admin option;
创建队列表
-- Create the queue table that backs the queue.
-- The payload type is SYS.XMLTYPE and the table is single-consumer
-- (multiple_consumers => false).
begin
    dbms_aqadm.create_queue_table(
        queue_table        => 'testaq.xml_queue_table',
        queue_payload_type => 'SYS.XMLTYPE',
        multiple_consumers => false
    );
end;
/
创建队列及启动队列
-- Create the queue on top of testaq.xml_queue_table, then start it.
begin
    dbms_aqadm.create_queue(
        queue_name  => 'testaq.xml_queue',
        queue_table => 'testaq.xml_queue_table'
    );
    dbms_aqadm.start_queue(queue_name => 'testaq.xml_queue');
end;
/
停止及删除队列
-- Tear-down, in order: stop the queue, drop the queue, then drop its queue table.
begin
    dbms_aqadm.stop_queue(queue_name => 'testaq.xml_queue');
    dbms_aqadm.drop_queue(queue_name => 'testaq.xml_queue');
    dbms_aqadm.drop_queue_table(queue_table => 'testaq.xml_queue_table');
end;
/
发送消息
-- Enqueue one sample XML message.
-- The original targeted testaq.test_queue with payload type testaq.test_queue_type,
-- neither of which is created in this note. This version uses the queue created
-- by the setup steps (testaq.xml_queue) and its declared payload type (SYS.XMLTYPE).
declare
    r_enqueue_options    DBMS_AQ.ENQUEUE_OPTIONS_T;
    r_message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
    v_message_handle     RAW(16);  -- receives the id of the enqueued message
    o_payload            SYS.XMLTYPE;
begin
    o_payload := SYS.XMLTYPE('<ROOT><ROWSET><ROW><APPLYNO>test</APPLYNO></ROW></ROWSET></ROOT>');

    dbms_aq.enqueue(
        queue_name         => 'testaq.xml_queue',
        enqueue_options    => r_enqueue_options,
        message_properties => r_message_properties,
        payload            => o_payload,
        msgid              => v_message_handle
    );

    -- The enqueue takes part in the current transaction; commit so the message
    -- is persisted and visible to other sessions.
    commit;
end;
/
Java接收消息
# Settings bound to OracleAqJmsConfig (prefix "oracle-aq").
oracle-aq:
  # JDBC thin URL of the database that hosts the queue.
  jdbcUrl: jdbc:oracle:thin:@localhost:1521:testaq
  # Credentials used to open the JMS queue connection.
  # Sample values only; do not commit real passwords in plain text.
  username: testaq
  password: 123456
  # Schema that owns the queue, and the queue name within that schema.
  queueNameUser: testaq
  queueName: xml_queue
/**
 * Connection settings for the Oracle AQ JMS consumer, bound from configuration
 * properties under the {@code oracle-aq} prefix. Accessors are generated by
 * Lombok's {@code @Data}.
 */
@Data
@Component
@ConfigurationProperties(prefix = "oracle-aq")
public class OracleAqJmsConfig {

    /** JDBC URL of the database hosting the queue. */
    private String jdbcUrl;

    /** User name used to open the queue connection. */
    private String username;

    /** Password used to open the queue connection. */
    private String password;

    /** Owner (schema) of the queue. */
    private String queueNameUser;

    /** Name of the queue inside the owner's schema. */
    private String queueName;
}
import lombok.extern.slf4j.Slf4j;
import oracle.jms.AQjmsAdtMessage;
import oracle.jms.AQjmsDestination;
import oracle.jms.AQjmsFactory;
import oracle.jms.AQjmsSession;
import oracle.xdb.XMLType;
import oracle.xdb.XMLTypeFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import javax.annotation.PostConstruct;
import javax.jms.*;
import javax.xml.bind.JAXBException;
import java.util.Properties;@Service
@Slf4j
public class TestOracleAq {@Autowiredprivate OracleAqJmsConfig config;@PostConstructpublic void messageListener() throws JMSException {QueueConnectionFactory queueConnectionFactory = AQjmsFactory.getQueueConnectionFactory(config.getJdbcUrl(), new Properties());QueueConnection conn = queueConnectionFactory.createQueueConnection(config.getUsername(), config.getPassword());AQjmsSession session = (AQjmsSession)conn.createQueueSession(false, Session.CLIENT_ACKNOWLEDGE);conn.start();Queue queue = (AQjmsDestination)session.getQueue(config.getQueueNameUser(), config.getQueueName());XMLTypeFactory factory = new XMLTypeFactory();MessageConsumer consumer = session.createConsumer(queue, null, factory, null, false);consumer.setMessageListener(new MessageListener() {@Overridepublic void onMessage(Message message) {AQjmsAdtMessage adtMessage = (AQjmsAdtMessage) message;try {Object adtPayload = adtMessage.getAdtPayload();XMLType xmlType = (XMLType)adtPayload;saveXml(xmlType.getStringVal());log.info("接收到oracle aq数据:{}", xmlType.getStringVal());} catch (Exception e) {log.error("", e);}}});}public void saveXml(String xml) throws JAXBException {// todo ...}}
依赖
以下依赖的 jar 包需要在 Oracle 安装目录中查找,并放到项目的 libs 目录下(对应下面各依赖的 systemPath)。
<!-- oracle aq: begin (each jar is resolved from ${project.basedir}/libs via system scope) -->
<dependency>
    <groupId>com.oracle</groupId>
    <artifactId>ojdbc6</artifactId>
    <version>11.1.0.7.0</version>
    <scope>system</scope>
    <systemPath>${project.basedir}/libs/ojdbc6.jar</systemPath>
</dependency>
<dependency>
    <groupId>com.oracle</groupId>
    <artifactId>jmscommon</artifactId>
    <version>1.0</version>
    <scope>system</scope>
    <systemPath>${project.basedir}/libs/jmscommon.jar</systemPath>
</dependency>
<dependency>
    <groupId>com.oracle</groupId>
    <artifactId>orai18n</artifactId>
    <version>11.1.0.7.0</version>
    <scope>system</scope>
    <systemPath>${project.basedir}/libs/orai18n.jar</systemPath>
</dependency>
<dependency>
    <groupId>com.oracle</groupId>
    <artifactId>jta</artifactId>
    <version>1.0</version>
    <scope>system</scope>
    <systemPath>${project.basedir}/libs/jta.jar</systemPath>
</dependency>
<dependency>
    <groupId>com.oracle</groupId>
    <artifactId>aqapi_g</artifactId>
    <version>1.0</version>
    <scope>system</scope>
    <systemPath>${project.basedir}/libs/aqapi_g.jar</systemPath>
</dependency>
<dependency>
    <groupId>oracle.xdb</groupId>
    <artifactId>xdb</artifactId>
    <version>21.9.0.0</version>
    <scope>system</scope>
    <systemPath>${project.basedir}/libs/xdb.jar</systemPath>
</dependency>
<!-- oracle aq: end -->