Thursday, 20 October 2011

Oracle JMS Dequeue

Read and test the enqueue before attempting the dequeue;
http://bluefrog-oracle.blogspot.com/2011/10/oracle-jms-enqueue.html
 

SQL> create or replace and compile java source named BasicAQDequeue as
  2  import java.sql.Connection;
  3  import java.sql.Driver;
  4  import java.sql.DriverManager;
  5  import javax.jms.Queue;
  6  import javax.jms.QueueConnection;
  7  import javax.jms.QueueConnectionFactory;
  8  import javax.jms.QueueReceiver;
  9  import javax.jms.QueueSession;
 10  import javax.jms.Message;
 11  import javax.jms.TextMessage;
 12  import javax.jms.Session;
 13  import oracle.jms.AQjmsFactory;
 14  import java.sql.PreparedStatement;
 15  import java.sql.ResultSet;
 16  import oracle.jdbc.*;
 17  import oracle.jdbc.aq.*;
 18  import oracle.jms.*;
 19 
 20  public class BasicAQDequeue {
 21 
 22      private QueueReceiver     consumer;
 23      private Queue             queue;
 24      private QueueConnection   consumerConnection;
 25      private QueueSession      consumerSession;
 26      private Connection        conn;
 27      private static final String   QUEUE_NAME  = "JMS_TEXT_QUE";
 28      private static final int      ACK_MODE    = QueueSession.AUTO_ACKNOWLEDGE;
 29      private static final boolean  TRANSACTED  = true;
 30 
 31      /* Creates and initializes the test. */
 32      public BasicAQDequeue() throws Exception {
 33          super();
 34          // init consumer session - Use Oracle default thin driver connection
 35          conn                = DriverManager.getConnection("jdbc:default:connection:");
 36          consumerConnection  = AQjmsQueueConnectionFactory.createQueueConnection(conn);
 37          consumerConnection.start();
 38          consumerSession     = consumerConnection.createQueueSession(TRANSACTED,ACK_MODE);
 39          queue               = consumerSession.createQueue(QUEUE_NAME);
 40          consumer            = consumerSession.createReceiver(queue);
 41      }
 42      public static void main (String args[]) throws Exception
 43        {
 44          BasicAQDequeue qTest  = new BasicAQDequeue();
 45          qTest.run();
 46          qTest.cleanup();
 47        }
 48      /** Closes the JMS connections.throws Exception */
 49      private void cleanup() throws Exception {
 50          // cleanup consumer
 51          System.out.println("Start cleanup.");
 52          consumerConnection.stop();
 53          consumer.close();
 54          consumerSession.close();
 55          consumerConnection.close();
 56          System.out.println("Done");
 57      }
 58      /* The message send. Throws Exception */
 59      private void run() throws Exception {
 60 
 61          TextMessage message;
 62          message = (TextMessage)consumer.receive(1000);
 63          consumerSession.commit();
 64          System.out.println("Message received: " + message.getText());
 65      }
 66  }
 67  /

Java created.

SQL> create or replace procedure BasicAQDequeue as
  2  language java name 'BasicAQDequeue.main(java.lang.String[])';
  3  /

Procedure created.

SQL> exec dbms_java.set_output(10000);

PL/SQL procedure successfully completed.

SQL> set serveroutput on
SQL> begin
  2    BasicAQEnqueue;
  3  end;
  4  /
Message sent: Test Message 1
Start cleanup.
Done

PL/SQL procedure successfully completed.

No comments:

Post a Comment