Friday, 21 October 2011

PL/SQL AQ Notification


Beware of the following bugs when using PL/SQL AQ Notifications:

Available on Metalink:
11g
1. Bug 2556939: AQ PL/SQL NOTIFICATION DOES
   NOT WORK WHEN JOB_QUEUE_PROCESSES<4

10g and 11g
1. AQ PL/SQL Notification reporting ORA-06502 when schema.queue_name
   length is greater than 30 characters [ID 465220.1]

2. AQ PL/SQL Notification No Longer Work Due To "register_driver()"
   Jobs Not Terminating [ID 331372.1]

3. Procedure invoked by AQ PL/SQL notification is executed as SYS not
   the Enqueuer [ID 552771.1]

10g only
1. Natively Compiled PL/SQL is not executed properly when using Advanced
   Queueing PL/SQL Notification in a RAC environment [ID 434024.1]

2. ORA-25263 produced in Job Queue Process trace files when using AQ
   PL/SQL Notification [ID 302659.1]

Read this for a better understanding of PL/SQL notification.
1. What Is The Advanced Queuing Asynchronous
   Notification Feature[ID 235397.1]


variable queue_user varchar2(30);
-- MODIFY THE USERNAME TO MATCH THE SCHEMA to test on
exec :queue_user := '&Schema_name';


Ensure schema has the AQ_ADMINISTRATOR_ROLE and the AQ_USER_ROLE granted from SYS.

Create Queue first
/* Create a user defined data type first */
create or replace type ty_Msg as object
(
  id     number
 ,msg    varchar2(100)
);
/

set serveroutput on
-- Create the multi-consumer queue table that backs the demo queue.
-- Any error is reported through DBMS_OUTPUT rather than re-raised, so
-- the surrounding script carries on regardless.
declare
  c_Queue_Table   constant varchar2(30) := 't_aq_ems_queue';
  c_Payload_Type  constant varchar2(30) := 'ty_Msg';
begin
  dbms_aqadm.create_queue_table(
    queue_table        => c_Queue_Table,
    queue_payload_type => c_Payload_Type,
    multiple_consumers => true,
    comment            => 'Multiconsumer queue table'
  );
  dbms_output.put_line('Queue table T_AQ_EMS_QUEUE created');
exception
  when others then
    dbms_output.put_line('Failure ' || sqlerrm);
end;
/


set serveroutput on
-- Create the queue inside the queue table built in the previous step.
-- Errors are printed via DBMS_OUTPUT instead of being propagated.
declare
  c_Queue_Name   constant varchar2(30) := 'aq_ems_queue';
  c_Queue_Table  constant varchar2(30) := 't_aq_ems_queue';
begin
  dbms_aqadm.create_queue(
    queue_name  => c_Queue_Name,
    queue_table => c_Queue_Table
  );
  dbms_output.put_line('Queue AQ_EMS_QUEUE sucessfully created');
exception
  when others then
    dbms_output.put_line('Failure ' || sqlerrm);
end;
/


Create the Notification Routines next
-- Create a dummy log table to prove that the notification routines were
-- successfully called: each routine inserts one row into LOG_MSG.
--   msg : the dequeued message text followed by the registration name
--   t   : time of the insert, defaulted to CURRENT_TIMESTAMP
create table log_msg (msg varchar2(200)
                     ,t timestamp default current_timestamp)
/
-- Both of the following two procedures, once registered, will
-- automatically be invoked when a successful en-queue occurs.
create or replace procedure notify_Proc1
  (context   raw
  ,reginfo   sys.aq$_reg_info
  ,descr     sys.aq$_descriptor
  ,payload   raw
  ,payloadl  number)
is
  -- AQ PL/SQL notification callback.
  -- Dequeues the message identified by the notification descriptor
  -- (DESCR) and records its text, together with the registration name
  -- from REGINFO, in LOG_MSG. CONTEXT, PAYLOAD and PAYLOADL belong to
  -- the callback signature but are not used here.
  l_Deq_Opts   dbms_aq.dequeue_options_t;
  l_Msg_Props  dbms_aq.message_properties_t;
  l_Msg_Id     raw(16);
  l_Msg        ty_Msg;
begin
  -- Target exactly the message, and the consumer, we were notified about.
  l_Deq_Opts.consumer_name := descr.consumer_name;
  l_Deq_Opts.msgid         := descr.msg_id;

  dbms_aq.dequeue
    (queue_name         => descr.queue_name
    ,dequeue_options    => l_Deq_Opts
    ,message_properties => l_Msg_Props
    ,payload            => l_Msg
    ,msgid              => l_Msg_Id);

  insert into Log_Msg (Msg)
  values (l_Msg.Msg || ' ' || reginfo.name);

  -- Makes both the dequeue and the log row permanent.
  commit;
end notify_Proc1;
/
create or replace procedure notify_Proc2
  (context   raw
  ,reginfo   sys.aq$_reg_info
  ,descr     sys.aq$_descriptor
  ,payload   raw
  ,payloadl  number)
is
  -- Second AQ PL/SQL notification callback; same logic as notify_Proc1.
  -- Dequeues the message named in the notification descriptor (DESCR)
  -- and logs its text plus the registration name (REGINFO) to LOG_MSG.
  -- CONTEXT, PAYLOAD and PAYLOADL are part of the callback signature
  -- but are not used here.
  l_Deq_Opts   dbms_aq.dequeue_options_t;
  l_Msg_Props  dbms_aq.message_properties_t;
  l_Msg_Id     raw(16);
  l_Msg        ty_Msg;
begin
  -- Target exactly the message, and the consumer, we were notified about.
  l_Deq_Opts.consumer_name := descr.consumer_name;
  l_Deq_Opts.msgid         := descr.msg_id;

  dbms_aq.dequeue
    (queue_name         => descr.queue_name
    ,dequeue_options    => l_Deq_Opts
    ,message_properties => l_Msg_Props
    ,payload            => l_Msg
    ,msgid              => l_Msg_Id);

  insert into Log_Msg (Msg)
  values (l_Msg.Msg || ' ' || reginfo.name);

  -- Makes both the dequeue and the log row permanent.
  commit;
end notify_Proc2;
/

set serveroutput on
-- Register notify_Proc1 for subscriber SUB1 and notify_Proc2 for
-- subscriber SUB2 on <queue_user>.aq_ems_queue.
declare
  c_Proc_1  constant varchar2(30) := 'notify_Proc1';
  c_Proc_2  constant varchar2(30) := 'notify_Proc2';
  l_Regs    sys.aq$_reg_info_list;
begin
  -- One registration entry per subscriber/callback pair.
  l_Regs := sys.aq$_reg_info_list(
              sys.aq$_reg_info(
                :queue_user || '.aq_ems_queue:SUB1',
                dbms_aq.namespace_aq,
                'plsql://' || :queue_user || '.' || c_Proc_1,
                hextoraw('FF')),
              sys.aq$_reg_info(
                :queue_user || '.aq_ems_queue:SUB2',
                dbms_aq.namespace_aq,
                'plsql://' || :queue_user || '.' || c_Proc_2,
                hextoraw('FF')));

  -- Second argument is the number of entries in the list (2 here).
  dbms_aq.register(l_Regs, l_Regs.count);

  dbms_output.put_line('Procedures ' || c_Proc_1 || ' and ' || c_Proc_2
                       || ' have been registered');
end;
/


Finally, the test
Ensure the queue is started first
exec dbms_aqadm.start_queue(queue_name=>:queue_user||'.aq_ems_queue');


SQL> truncate table log_msg;

Table truncated.

SQL> set serveroutput on
SQL> declare
  2    enqueue_options    dbms_aq.enqueue_options_t;
  3    message_properties dbms_aq.message_properties_t;
  4    message_handle     raw(16);
  5    message            ty_Msg :=ty_Msg(1,'test 1 AQ demo 2');
  6  begin
  7 
  8    dbms_aq.enqueue
  9      (queue_name          => :queue_user||'.aq_ems_queue'
 10      ,enqueue_options     => enqueue_options
 11      ,message_properties  => message_properties
 12      ,payload             => message
 13      ,msgid               => message_handle);
 14 
 15    commit;
 16 
 17    dbms_output.put_line('Message sucessfully enqueued ' );
 18 
 19    exception
 20      when others then
 21        dbms_output.put_line('Failure ' || sqlerrm);
 22  end;
 23  /
Message sucessfully enqueued

PL/SQL procedure successfully completed.


SQL> select * from log_msg;

MSG                                               T
------------------------------------------------- -------------------------
test 1 AQ demo 2 "XDB_DEV4"."AQ_EMS_QUEUE":"SUB2" 21-OCT-11 16.44.36.782000
test 1 AQ demo 2 "XDB_DEV4"."AQ_EMS_QUEUE":"SUB1" 21-OCT-11 16.44.36.797000

Oracle AQ Propagation

Refer to the following Metalink documents for troubleshooting issues;
368237.1
382994.1
118884.1
203225.1
1233675.1
Bug 8467005

Ensure DST is the same on both databases, 977512.1
select version from v$timezone_file;

Execute the following on the DESTINATION schema first;
conn destination_schema/password@destination_db

-- Bind variable holding the schema this session is connected to; it is
-- used to qualify every queue and queue-table name below.
variable p_Schema varchar2(30)
exec :p_Schema := sys_context('userenv','current_schema');

-- Tear down any previous copy of the demo queue (stop, drop the queue,
-- then drop its queue table) before re-creating it.
exec dbms_aqadm.stop_queue(:p_Schema||'.AQ_EMS_QUEUE');
exec dbms_aqadm.drop_queue(:p_Schema||'.AQ_EMS_QUEUE');
begin
  dbms_aqadm.drop_queue_table(queue_table=>:p_Schema||'.T_AQ_EMS_QUEUE'
                             ,force=>true);
end;
/
/* Ensure payload type is created prior to queue creation */
create or replace type ty_Msg as object (id number
                                        ,msg varchar2(100));
/
-- Multi-consumer queue table whose payload is the TY_MSG object type.
begin
  dbms_aqadm.create_queue_table(queue_table=>:p_Schema||'.T_AQ_EMS_QUEUE'
                               ,queue_payload_type=>'TY_MSG'
                               ,multiple_consumers=>true);
end;
/

-- Create the queue inside that queue table ...
begin
  dbms_aqadm.create_queue(queue_name =>:p_Schema||'.AQ_EMS_QUEUE'
                         ,queue_table=>:p_Schema||'.T_AQ_EMS_QUEUE');
end;
/
-- ... and start it.
begin
  dbms_aqadm.start_queue (queue_name => :p_Schema||'.AQ_EMS_QUEUE');
end;
/
set serveroutput on
-- Add local subscriber SUB2 to the destination queue.
-- Re-running the script is harmless: an "already a subscriber" error is
-- reported through DBMS_OUTPUT instead of failing the block.
declare
  -- ORA-24034: the agent is already a subscriber for the queue
  already_subscriber  exception;
  pragma              exception_init(already_subscriber, -24034);

begin
  dbms_aqadm.add_subscriber(queue_name=>:p_Schema|| '.AQ_EMS_QUEUE'
                           ,subscriber=> sys.aq$_agent('SUB2'
                                                      ,null
                                                      ,null));
  dbms_output.put_line('Subscriber sucessfully set');

  exception
    when already_subscriber then
      dbms_output.put_line('Subscriber already exists');
end;
/

-----------------------------------------------------------------------
Next, execute the following on the SOURCE schema:
conn source_schema/password@source_db
-- Bind variable holding the schema this session is connected to; it is
-- used to qualify every queue and queue-table name below.
variable p_Schema varchar2(30)
exec :p_Schema := sys_context('userenv','current_schema');

-- Connection details of the DESTINATION schema, prompted for through
-- SQL*Plus substitution variables. They are used further down to build
-- the database link and the remote queue address.
variable p_Dest_Schema varchar2(30)
exec :p_Dest_Schema := '&Destination';

variable p_Dest_Password varchar2(30)
exec :p_Dest_Password := '&Password';

variable p_Dest_SID varchar2(30)
exec :p_Dest_SID := '&SID';

-- Tear down any previous copy of the demo queue (stop, drop the queue,
-- then drop its queue table) before re-creating it.
exec dbms_aqadm.stop_queue(:p_Schema||'.AQ_EMS_QUEUE');
exec dbms_aqadm.drop_queue(:p_Schema||'.AQ_EMS_QUEUE');
begin
  dbms_aqadm.drop_queue_table(queue_table=>:p_Schema||'.T_AQ_EMS_QUEUE'
                             ,force=>true);
end;
/
/* Create a user defined data type first */
-- Same attributes (id, msg) as the TY_MSG type created on the
-- destination schema.
create or replace type ty_Msg as object
(id number,msg varchar2(100));
/
-- Create database link D8_LINK to the destination schema. The DDL is
-- assembled from the bind variables because CREATE DATABASE LINK cannot
-- take bind values directly.
declare
  l_Ddl  varchar2(500);
begin
  l_Ddl := 'create database link d8_link connect to ' || :p_Dest_Schema
        || ' identified by ' || :p_Dest_Password
        || ' using ''' || :p_Dest_SID || '''';

  execute immediate l_Ddl;
end;
/

-- Run this to ensure the database link works.
-- If it doesn't then do not continue until the
-- database link can be sucessfully established
select  1 l_Num
from    dual@D8_LINK;

-- Multi-consumer queue table on the SOURCE schema; payload is TY_MSG.
begin
  dbms_aqadm.create_queue_table(queue_table=>:p_Schema||'.T_AQ_EMS_QUEUE'
                               ,queue_payload_type=>'TY_MSG'
                               ,multiple_consumers=>true);
end;
/
-- Create the source queue inside that queue table.
begin
  dbms_aqadm.create_queue(queue_name=>:p_Schema||'.AQ_EMS_QUEUE'
                         ,queue_table =>:p_Schema||'.T_AQ_EMS_QUEUE');
end;
/
-- associate a subscriber with the queue
-- The agent is named SUB1 and its address is the destination queue
-- reached over the D8_LINK database link
-- (<dest schema>.AQ_EMS_QUEUE@D8_LINK); the agent's protocol is 0.
set serveroutput on
begin
  dbms_aqadm.add_subscriber
    (queue_name=>:p_Schema||'.AQ_EMS_QUEUE'
                ,subscriber=>sys.aq$_agent
                                          (
                                           'SUB1'
                                          ,:p_Dest_Schema
                                           ||'.AQ_EMS_QUEUE@D8_LINK'
                                          ,0)
                           ,queue_to_queue  => true);
  dbms_output.put_line('Subscriber sucessfully set');
end;
/
-- Start the source queue now that it exists and has a subscriber.
exec dbms_aqadm.start_queue(queue_name=>:p_Schema||'.AQ_EMS_QUEUE');

-- Run the following script to verify that the SOURCE and
-- DESTINATION queue's match
-- the query should be run on the SOURCE schema
set serveroutput on
-- Check that the SOURCE and DESTINATION queues have matching types.
-- Prints the outcome through DBMS_OUTPUT; nothing is changed.
declare
  l_RC        binary_integer := 0;
  l_db_Link   all_db_links.db_link%type;
begin

  -- make sure you get the full domain name
  -- ('_' is escaped so that it is matched literally, not as a wildcard)
  begin
    select  db_link
    into    l_db_Link
    from    all_db_links
    where   owner     = :p_Schema
    and     db_link   like 'D8!_LINK%' escape '!';

    exception
      when no_data_found or too_many_rows then
        -- The INTO target is undefined after TOO_MANY_ROWS, so reset it
        -- explicitly; otherwise an ambiguous link could still be used.
        l_db_Link := null;
  end;

  if (l_db_Link is not null) then
    -- The queue names for source and destination are the same,
    -- the only difference being the schema names
    dbms_aqadm.verify_queue_types
      (src_queue_name   => :p_Schema||'.AQ_EMS_QUEUE'
      ,dest_queue_name  => :p_Dest_Schema||'.AQ_EMS_QUEUE'
      ,destination      => l_db_Link -- Database link name
      ,rc               => l_RC);

    dbms_output.put_line(case l_RC
                          when 0 then
                            'Queues do not match'
                          when 1 then
                            'Queues match'
                          else
                            'Error determining status'
                         end);
  else
    dbms_output.put_line('Unknown DB Link,Verification unsuccessful');
  end if;

end;
/

set serveroutput on
-- Schedule propagation from the SOURCE queue to the DESTINATION queue
-- over the D8_LINK database link. An already existing schedule is
-- reported, not treated as a failure.
declare
  l_db_Link   all_db_links.db_link%type;
  PROPAGATION_EXISTS    exception;
  pragma                exception_init(PROPAGATION_EXISTS, -24041);

begin

  -- make sure you get the full domain name
  -- ('_' is escaped so that it is matched literally, not as a wildcard)
  begin
    select  db_link
    into    l_db_Link
    from    all_db_links
    where   owner     = :p_Schema
    and     db_link   like 'D8!_LINK%' escape '!';

    exception
      when no_data_found or too_many_rows then
        -- The INTO target is undefined after TOO_MANY_ROWS, so reset it
        -- explicitly; otherwise an ambiguous link could still be used.
        l_db_Link := null;
  end;

  if (l_db_Link is not null) then
    dbms_aqadm.schedule_propagation
      (queue_name        => :p_Schema||'.AQ_EMS_QUEUE'
      ,destination_queue => :p_Dest_Schema||'.AQ_EMS_QUEUE'
      ,destination       => l_db_Link
      ,start_time        => sysdate--SYSDATE indicate immediate
      ,duration          => null   --propagation until stopped
      ,latency           => 0);    --Indicates gap before propagating

    dbms_output.put_line('Propagation successfully scheduled ');
  else
    dbms_output.put_line('Unknown DB Link,Propagation not scheduled');
  end if;

  exception
    when PROPAGATION_EXISTS then
      dbms_output.put_line('Propagation schedule already exists');

end;
/

prompt Test enqueue ,but only after successfully starting scheduling propagation
set serveroutput on
-- Enqueue one TY_MSG test message on the SOURCE queue and commit it.
-- Any error is printed through DBMS_OUTPUT instead of being re-raised.
declare
  l_Enq_Opts   dbms_aq.enqueue_options_t;
  l_Msg_Props  dbms_aq.message_properties_t;
  l_Msg_Id     raw(16);
  l_Msg        ty_Msg := ty_Msg(1,'test 1 AQ demo 3');
begin
  dbms_aq.enqueue(
    queue_name         => :p_Schema||'.AQ_EMS_QUEUE',
    enqueue_options    => l_Enq_Opts,
    message_properties => l_Msg_Props,
    payload            => l_Msg,
    msgid              => l_Msg_Id
  );
  commit;

  dbms_output.put_line('Message sucessfully enqueued ' );
exception
  when others then
    dbms_output.put_line('Failure ' || sqlerrm);
end;
/

-- This needs to be run when there have been multiple failures with
-- propagation due to incorrect configuration parameters.
-- Oracle automatically disables the propagation in such cases
-- for security reasons.
set serveroutput on
declare

  NO_PROPAGATION_EXISTS exception;
  pragma                exception_init(NO_PROPAGATION_EXISTS, -24042);

  l_db_Link   all_db_links.db_link%type;
begin

  -- make sure you get the full domain name
  -- ('_' is escaped so that it is matched literally, not as a wildcard)
  begin
    select  db_link
    into    l_db_Link
    from    all_db_links
    where   owner     = :p_Schema
    and     db_link   like 'D8!_LINK%' escape '!';

    exception
      when no_data_found or too_many_rows then
        -- The INTO target is undefined after TOO_MANY_ROWS, so reset it
        -- explicitly; otherwise an ambiguous link could still be used.
        l_db_Link := null;
  end;

  if (l_db_Link is not null) then

    dbms_aqadm.enable_propagation_schedule
      (queue_name         => :p_Schema||'.AQ_EMS_QUEUE'
      ,destination_queue  => :p_Dest_Schema||'.AQ_EMS_QUEUE'
      ,destination        => l_db_Link);

    dbms_output.put_line('Propagation successfully enabled ');
  else
    dbms_output.put_line('Unknown DB Link,Propagation not enabled');
  end if;

  exception
    when NO_PROPAGATION_EXISTS then
      dbms_output.put_line('No propagation exists between queues');
end;
/

-- To unschedule propagation
-- (removes the propagation schedule between the SOURCE and DESTINATION
-- queues; a missing schedule is reported, not treated as a failure)
set serveroutput on
declare

  NO_PROPAGATION_EXISTS exception;
  pragma                exception_init(NO_PROPAGATION_EXISTS, -24042);

  l_db_Link   all_db_links.db_link%type;
begin

  -- make sure you get the full domain name
  -- ('_' is escaped so that it is matched literally, not as a wildcard)
  begin
    select  db_link
    into    l_db_Link
    from    all_db_links
    where   owner     = :p_Schema
    and     db_link   like 'D8!_LINK%' escape '!';

    exception
      when no_data_found or too_many_rows then
        -- The INTO target is undefined after TOO_MANY_ROWS, so reset it
        -- explicitly; otherwise an ambiguous link could still be used.
        l_db_Link := null;
  end;

  if (l_db_Link is not null) then

    dbms_aqadm.unschedule_propagation
      (queue_name         => :p_Schema||'.AQ_EMS_QUEUE'
      ,destination_queue  => :p_Dest_Schema||'.AQ_EMS_QUEUE'
      ,destination        => l_db_Link);

    -- Message now names the operation actually performed; it used to
    -- say "stopped", copied from the disable script.
    dbms_output.put_line('Propagation successfully unscheduled');
  else
    dbms_output.put_line('Unknown DB Link,Propagation not unscheduled');
  end if;

  exception
    when NO_PROPAGATION_EXISTS then
      dbms_output.put_line('No propagation exists between queues');
end;
/

-- To stop propagation
-- (disables the propagation schedule without removing it; a missing
-- schedule is reported, not treated as a failure)
set serveroutput on
declare

  NO_PROPAGATION_EXISTS exception;
  pragma                exception_init(NO_PROPAGATION_EXISTS, -24042);

  l_db_Link   all_db_links.db_link%type;
begin

  -- make sure you get the full domain name
  -- ('_' is escaped so that it is matched literally, not as a wildcard)
  begin
    select  db_link
    into    l_db_Link
    from    all_db_links
    where   owner     = :p_Schema
    and     db_link   like 'D8!_LINK%' escape '!';

    exception
      when no_data_found or too_many_rows then
        -- The INTO target is undefined after TOO_MANY_ROWS, so reset it
        -- explicitly; otherwise an ambiguous link could still be used.
        l_db_Link := null;
  end;

  if (l_db_Link is not null) then

    dbms_aqadm.disable_propagation_schedule
      (queue_name        => :p_Schema||'.AQ_EMS_QUEUE'
      ,destination_queue => :p_Dest_Schema||'.AQ_EMS_QUEUE'
      ,destination       => l_db_Link);

    dbms_output.put_line('Propagation successfully stopped');
  else
    dbms_output.put_line('Unknown DB Link,Propagation not disabled');
  end if;

  exception
    when NO_PROPAGATION_EXISTS then
      dbms_output.put_line('No propagation exists between queues');

end;
/

-- to purge a queue table, use the following;
-- (a NULL purge condition is passed, i.e. no filter on the messages)
set serveroutput on
declare
  po dbms_aqadm.aq$_purge_options_t;
begin

  po.block := true;

  -- PURGE_QUEUE_TABLE expects the queue TABLE (T_AQ_EMS_QUEUE), not the
  -- queue (AQ_EMS_QUEUE) that lives inside it.
  dbms_aqadm.purge_queue_table
    (queue_table     =>:p_Schema||'.T_AQ_EMS_QUEUE'
    ,purge_condition =>null
    ,purge_options   =>po);

  dbms_output.put_line('Purge sucessful');
  exception
    when others then
      dbms_output.put_line('Fail ' || sqlerrm);
end;
/

/*

Troubleshooting propagation
---------------------------
1.  Ensure the job_queue_processes initialization parameter
    is greater than zero. If it is not greater than zero,
    then unschedule propagation, before setting it
    to a value greater than zero.

2.  Ensure database link is active, i.e.
       select 1 from dual@database_link_name
    should return 1

3.  Ensure the payload is the same on both instances

4.  Verify that the queues match using the
    DBMS_AQADM.VERIFY_QUEUE_TYPES built-in procedure

5.  Verify that propagation is currently not disabled,
    using the following SQL;

    select schedule_disabled from  user_queue_schedules;
    should return "N"

6.  Check the NEXT_RUN_DATE, NEXT_RUN_TIME and PROPAGATION_WINDOW
    on USER_QUEUE_SCHEDULES. If the date and time are in the past and
    remain in the past, then the propagation schedule has been
    incorrectly configured.

    If the propagation window is null then the DURATION parameter may
    have been set to NULL, which is correct, but ensure that this is
    in fact the case.

    use the following SQL;
      select next_run_date, next_run_time,PROPAGATION_WINDOW
      from user_queue_schedules;

7.  Ensure JOBNO is not null in SYS.AQ$_SCHEDULES using the
    following SQL;
      select jobno from sys.aq$_schedules;

8.  Verify that AQ_TM_PROCESSES is not explicitly set to zero
    (a value of zero disables the AQ queue monitor processes)

9.  Verify that the AQ


Short Script
col next_run_date format a40
col next_run_time format a20
col owner format a20
col object_table format a30
col object_type format a40
col type format a20
col name format a20
col value format a20
set linesize 400
set pages 2000


select jobno
from   sys.aq$_schedules
/
select schedule_disabled
from   user_queue_schedules
/
select next_run_date
      ,next_run_time
      ,propagation_window
from user_queue_schedules
/
select count(*) from sys.job$
/
select name
       ,value
from   v$parameter
where name in('job_queue_processes','aq_tm_processes')
/
select owner
      ,queue_table
      ,type
      ,object_type
from dba_queue_tables
/


-- To trace for AQ problems
alter session set events '24040 trace name context forever, level 10';

*/


Thursday, 20 October 2011

Oracle JMS Dequeue

Read and test the enqueue before attempting the dequeue;
http://bluefrog-oracle.blogspot.com/2011/10/oracle-jms-enqueue.html
 

SQL> create or replace and compile java source named BasicAQDequeue as
  2  import java.sql.Connection;
  3  import java.sql.Driver;
  4  import java.sql.DriverManager;
  5  import javax.jms.Queue;
  6  import javax.jms.QueueConnection;
  7  import javax.jms.QueueConnectionFactory;
  8  import javax.jms.QueueReceiver;
  9  import javax.jms.QueueSession;
 10  import javax.jms.Message;
 11  import javax.jms.TextMessage;
 12  import javax.jms.Session;
 13  import oracle.jms.AQjmsFactory;
 14  import java.sql.PreparedStatement;
 15  import java.sql.ResultSet;
 16  import oracle.jdbc.*;
 17  import oracle.jdbc.aq.*;
 18  import oracle.jms.*;
 19 
 20  public class BasicAQDequeue {
 21 
 22      private QueueReceiver     consumer;
 23      private Queue             queue;
 24      private QueueConnection   consumerConnection;
 25      private QueueSession      consumerSession;
 26      private Connection        conn;
 27      private static final String   QUEUE_NAME  = "JMS_TEXT_QUE";
 28      private static final int      ACK_MODE    = QueueSession.AUTO_ACKNOWLEDGE;
 29      private static final boolean  TRANSACTED  = true;
 30 
 31      /* Creates and initializes the test. */
 32      public BasicAQDequeue() throws Exception {
 33          super();
 34          // init consumer session - Use Oracle default thin driver connection
 35          conn                = DriverManager.getConnection("jdbc:default:connection:");
 36          consumerConnection  = AQjmsQueueConnectionFactory.createQueueConnection(conn);
 37          consumerConnection.start();
 38          consumerSession     = consumerConnection.createQueueSession(TRANSACTED,ACK_MODE);
 39          queue               = consumerSession.createQueue(QUEUE_NAME);
 40          consumer            = consumerSession.createReceiver(queue);
 41      }
 42      public static void main (String args[]) throws Exception
 43        {
 44          BasicAQDequeue qTest  = new BasicAQDequeue();
 45          qTest.run();
 46          qTest.cleanup();
 47        }
 48      /** Closes the JMS connections.throws Exception */
 49      private void cleanup() throws Exception {
 50          // cleanup consumer
 51          System.out.println("Start cleanup.");
 52          consumerConnection.stop();
 53          consumer.close();
 54          consumerSession.close();
 55          consumerConnection.close();
 56          System.out.println("Done");
 57      }
 58      /* The message send. Throws Exception */
 59      private void run() throws Exception {
 60 
 61          TextMessage message;
 62          message = (TextMessage)consumer.receive(1000);
 63          consumerSession.commit();
 64          System.out.println("Message received: " + message.getText());
 65      }
 66  }
 67  /

Java created.

SQL> create or replace procedure BasicAQDequeue as
  2  language java name 'BasicAQDequeue.main(java.lang.String[])';
  3  /

Procedure created.

SQL> exec dbms_java.set_output(10000);

PL/SQL procedure successfully completed.

SQL> set serveroutput on
SQL> begin
  2    BasicAQDequeue;
  3  end;
  4  /
Message received: Test Message 1
Start cleanup.
Done

PL/SQL procedure successfully completed.

Oracle JMS Enqueue

The following performs a test enqueue to an Oracle AQ queue using a JMS session executed within the Oracle database;

First, ensure the schema that the script is executed on has the following privileges granted from SYS;

-- Grant the AQ and Java privileges the JMS demo schema needs.
-- Each statement is terminated by "/" only: a ";" followed by "/" would
-- make SQL*Plus execute the same grant twice.
connect sys/password@orcl as sysdba
grant AQ_ADMINISTRATOR_ROLE to jms_schema
/
grant aq_user_role to jms_schema
/
grant JAVA_ADMIN, JAVAUSERPRIV, JAVASYSPRIV, JAVADEBUGPRIV to jms_schema
/
grant execute on DBMS_AQIN to jms_schema
/
grant execute on DBMS_JMS_PLSQL to jms_schema
/

Next, ensure a queue with a JMS_TEXT_MESSAGE payload exists, use the following

-- login with the schema you wish to run the demo on, for example JMS_SCHEMA
-- 1. queue table with a JMS text-message payload
exec dbms_aqadm.create_queue_table (queue_table=>'jms_qtt_text', queue_payload_type=>'SYS.AQ$_JMS_TEXT_MESSAGE');
-- 2. the queue itself
exec dbms_aqadm.create_queue (Queue_name=>'jms_text_que',Queue_table=>'jms_qtt_text');
-- 3. start the queue so that it accepts enqueues and dequeues
exec dbms_aqadm.start_queue (queue_name=>'jms_text_que');


Ready to create and compile the Java source;

SQL> create or replace and compile java source named BasicAQEnqueue as
  2  import java.sql.Connection;
  3  import java.sql.Driver;
  4  import java.sql.DriverManager;
  5  import javax.jms.Queue;
  6  import javax.jms.QueueConnection;
  7  import javax.jms.QueueSender;
  8  import javax.jms.QueueSession;
  9  import oracle.jms.AQjmsFactory;
 10  import oracle.jdbc.*;
 11  import oracle.jdbc.aq.*;
 12  import oracle.jms.*;
 13 
 14  public class BasicAQEnqueue {
 15 
 16      private QueueSender       producer;
 17      private Queue             queue;
 18      private QueueConnection   producerConnection;
 19      private QueueSession      producerSession;
 20      private static final String   QUEUE_NAME  = "JMS_TEXT_QUE";
 21      private static final int      ACK_MODE    = QueueSession.AUTO_ACKNOWLEDGE;
 22      private static final boolean  TRANSACTED  = true;
 23 
 24      /* Creates and initializes the test. */
 25      public BasicAQEnqueue() throws Exception {
 26          super();
 27          // init producer session - Use Oracle default thin driver connection
 28          Connection conn     = DriverManager.getConnection("jdbc:default:connection:");
 29          producerConnection  = AQjmsQueueConnectionFactory.createQueueConnection(conn);
 30          producerSession     = producerConnection.createQueueSession(TRANSACTED,ACK_MODE);
 31          queue               = producerSession.createQueue(QUEUE_NAME);
 32          producer            = producerSession.createSender(queue);
 33      }
 34 
 35      public static void main (String args[]) throws Exception
 36        {
 37          BasicAQEnqueue qTest  = new BasicAQEnqueue();
 38          qTest.run();
 39          qTest.cleanup();
 40        }
 41 
 42      /** Closes the JMS connections.throws Exception */
 43      private void cleanup() throws Exception {
 44          // cleanup producer
 45          System.out.println("Start cleanup.");
 46          producerConnection.stop();
 47          producer.close();
 48          producerSession.close();
 49          producerConnection.close();
 50          System.out.println("Done");
 51      }
 52 
 53      /* The message send. Throws Exception */
 54      private void run() throws Exception {
 55          String msg = "Test Message 1";
 56          producer.send(producerSession.createTextMessage(msg));
 57          producerSession.commit();
 58          System.out.println("Message sent: " + msg);
 59      }
 60  }
 61  /

Java created.



SQL> create or replace procedure BasicAQEnqueue as
  2  language java name 'BasicAQEnqueue.main(java.lang.String[])';
  3  /

Procedure created.


SQL> set serveroutput on
SQL> begin
  2    BasicAQEnqueue;
  3  end;
  4  /
Message sent: Test Message 1
Start cleanup.
Done


SQL> -- To view the messages in SQL Plus, use the following SQL;
SQL> col Msg format a20
SQL> select msg_id, msg_state, qt.user_data.text_vc Msg
  2  from aq$jms_qtt_text qt
  3  /

MSG_ID                           MSG_STATE        MSG
-------------------------------- ---------------- --------------
E39998B62A5046DCA6F2316D64EF3048 READY            Test Message 1

JMS connection within an Oracle instance


The schema must have the following privileges;
granted from SYS/password@orcl as sysdba to schema

-- Grant the AQ and Java privileges the JMS demo schema needs.
-- Each statement is terminated by "/" only: a ";" followed by "/" would
-- make SQL*Plus execute the same grant twice.
connect sys/password@orcl as sysdba

grant AQ_ADMINISTRATOR_ROLE to jms_schema
/
grant aq_user_role to jms_schema
/
grant JAVA_ADMIN
     ,JAVAUSERPRIV
     ,JAVASYSPRIV
     ,JAVADEBUGPRIV 
to jms_schema
/
grant execute on DBMS_AQIN to jms_schema
/
grant execute on DBMS_JMS_PLSQL to jms_schema
/

-- Reconnect as the demo schema (CONNECT takes user/password@alias
-- directly, with no "as" keyword before the user name).
connect jms_schema/password@orcl

SQL> create or replace and compile java source named BasicAQConnection as
  2  import java.sql.Connection;
  3  import java.sql.Driver;
  4  import java.sql.DriverManager;
  5  import java.sql.PreparedStatement;
  6  import java.sql.ResultSet;
  7  import javax.jms.QueueConnection;
  8  import javax.jms.QueueConnectionFactory;
  9  import javax.jms.QueueSession;
 10  import javax.jms.Session;
 11  import oracle.jms.AQjmsFactory;
 12  import oracle.jdbc.*;
 13  import oracle.jdbc.aq.*;
 14  import oracle.jms.*;
 15 
 16  public class BasicAQConnection
 17    {
 18      public static void main (String args[]) throws Exception
 19        {
 20          Connection conn = DriverManager.getConnection("jdbc:default:connection:");
 21          QueueConnection qc = AQjmsQueueConnectionFactory.createQueueConnection(conn);
 22          QueueSession qs = qc.createQueueSession(true, Session.CLIENT_ACKNOWLEDGE);
 23 
 24          PreparedStatement pStmt = conn.prepareStatement("select count(*) from emp");
 25          ResultSet rSet = pStmt.executeQuery();
 26          rSet.next();
 27          System.out.println("Number of Employees is: " + rSet.getString(1));
 28          rSet.close();
 29 
 30          qc.stop();
 31          conn.close();
 32          qc.close();
 33 
 34        }
 35    }
 36  /


Java created.


SQL>
SQL> create or replace procedure BasicAQConnection as
  2  language java name 'BasicAQConnection.main(java.lang.String[])';
  3  /


Procedure created.


SQL>
SQL> exec dbms_java.set_output(10000);


PL/SQL procedure successfully completed.


SQL>
SQL> select count(*) from emp
  2  /


  COUNT(*)
----------
         1


SQL> set serveroutput on
SQL> begin
  2    BasicAQConnection;
  3  end;
  4  /
Number of Employees is: 1


PL/SQL procedure successfully completed.