Friday, 21 October 2011

Oracle AQ Propagation

Refer to the following Metalink documents for troubleshooting issues;
368237.1
382994.1
118884.1
203225.1
1233675.1
Bug 8467005

Ensure DST is the same on both databases, 977512.1
select version from v$timezone_file;

Execute the following on the DESTINATION schema first;
conn destination_schema/password@destination_db

variable p_Schema varchar2(30)
exec :p_Schema := sys_context('userenv','current_schema');

exec dbms_aqadm.stop_queue(:p_Schema||'.AQ_EMS_QUEUE');
exec dbms_aqadm.drop_queue(:p_Schema||'.AQ_EMS_QUEUE');
begin
  dbms_aqadm.drop_queue_table(queue_table=>:p_Schema||'.T_AQ_EMS_QUEUE'
                             ,force=>true);
end;
/
/* Ensure payload type is created prior to queue creation */
create or replace type ty_Msg as object (id number
                                        ,msg varchar2(100));
/
begin
  dbms_aqadm.create_queue_table(queue_table=>:p_Schema||'.T_AQ_EMS_QUEUE'
                               ,queue_payload_type=>'TY_MSG'
                               ,multiple_consumers=>true);
end;
/

begin
  dbms_aqadm.create_queue(queue_name =>:p_Schema||'.AQ_EMS_QUEUE'
                         ,queue_table=>:p_Schema||'.T_AQ_EMS_QUEUE');
end;
/
begin
  dbms_aqadm.start_queue (queue_name => :p_Schema||'.AQ_EMS_QUEUE');
end;
/
set serveroutput on
declare
  already_subscriber  exception;

begin
  dbms_aqadm.add_subscriber(queue_name=>:p_Schema|| '.AQ_EMS_QUEUE'
                           ,subscriber=> sys.aq$_agent('SUB2'
                                                      ,null
                                                      ,null));
  dbms_output.put_line('Subscriber sucessfully set');
end;
/

-----------------------------------------------------------------------
Next ,  execute the following on the SOURCE schema;
conn source_schema/password@source_db
variable p_Schema varchar2(30)
exec :p_Schema := sys_context('userenv','current_schema');

variable p_Dest_Schema varchar2(30)
exec :p_Dest_Schema := '&Destination';

variable p_Dest_Password varchar2(30)
exec :p_Dest_Password := '&Password';

variable p_Dest_SID varchar2(30)
exec :p_Dest_SID := '&SID';

exec dbms_aqadm.stop_queue(:p_Schema||'.AQ_EMS_QUEUE');
exec dbms_aqadm.drop_queue(:p_Schema||'.AQ_EMS_QUEUE');
begin
  dbms_aqadm.drop_queue_table(queue_table=>:p_Schema||'.T_AQ_EMS_QUEUE'
                             ,force=>true);
end;
/
/* Create a user defined data type first */
create or replace type ty_Msg as object
(id number,msg varchar2(100));
/
begin
  execute immediate
    'create database link d8_link connect to ' || :p_Dest_Schema
                                               || ' identified by '
                                               || :p_Dest_Password
                                               || ' using '''
                                               || :p_Dest_SID
                                               || '''';
end;
/

-- Run this to ensure the database link works.
-- If it doesn't then do not continue until the
-- database link can be sucessfully established
select  1 l_Num
from    dual@D8_LINK;

begin
  dbms_aqadm.create_queue_table(queue_table=>:p_Schema||'.T_AQ_EMS_QUEUE'
                               ,queue_payload_type=>'TY_MSG'
                               ,multiple_consumers=>true);
end;
/
begin
  dbms_aqadm.create_queue(queue_name=>:p_Schema||'.AQ_EMS_QUEUE'
                         ,queue_table =>:p_Schema||'.T_AQ_EMS_QUEUE');
end;
/
-- associate a subscriber with the queue
set serveroutput on
begin
  dbms_aqadm.add_subscriber
    (queue_name=>:p_Schema||'.AQ_EMS_QUEUE'
                ,subscriber=>sys.aq$_agent
                                          (
                                           'SUB1'
                                          ,:p_Dest_Schema
                                           ||'.AQ_EMS_QUEUE@D8_LINK'
                                          ,0)
                           ,queue_to_queue  => true);
  dbms_output.put_line('Subscriber sucessfully set');
end;
/
exec dbms_aqadm.start_queue(queue_name=>:p_Schema||'.AQ_EMS_QUEUE');

-- Run the following script to verify that the SOURCE and
-- DESTINATION queue's match
-- the query should be run on the SOURCE schema
set serveroutput on
declare
  l_RC        binary_integer := 0;
  l_db_Link   all_db_links.db_link%type;
  l_Continue  boolean := false;
begin

  -- make sure you get the full domain name
  begin
    select  db_link
    into    l_db_Link
    from    all_db_links
    where   owner     = :p_Schema
    and     db_link   like 'D8_LINK%';

    exception
      when too_many_rows then
        l_Continue := false;
      when no_data_found then
        l_Continue := false;
  end;

  l_Continue := (l_db_Link is not null);

  if (l_Continue) then
    -- The queue names for source and desitination are the same,
    -- the only difference being the schema names
    dbms_aqadm.verify_queue_types
      (src_queue_name   => :p_Schema||'.AQ_EMS_QUEUE'
      ,dest_queue_name  => :p_Dest_Schema||'.AQ_EMS_QUEUE'
      ,destination      => l_db_Link -- Database link name
      ,rc               => l_RC);

    dbms_output.put_line(case l_RC
                          when 0 then
                            'Queues do not match'
                          when 1 then
                            'Queues match'
                          else
                            'Error determining status'
                         end);
  else
    dbms_output.put_line('Unknown DB Link,Verification unsuccessful');
  end if;

end;
/

set serveroutput on
declare
  l_db_Link   all_db_links.db_link%type;
  l_Continue  boolean := false;
  PROPAGATION_EXISTS    exception;
  pragma                exception_init(PROPAGATION_EXISTS, -24041);

begin

  -- make sure you get the full domain name
  begin
    select  db_link
    into    l_db_Link
    from    all_db_links
    where   owner     = :p_Schema
    and     db_link   like 'D8_LINK%';

    exception
      when too_many_rows then
        l_Continue := false;
      when no_data_found then
        l_Continue := false;
  end;

  l_Continue := (l_db_Link is not null);

  if (l_Continue) then
    dbms_aqadm.schedule_propagation
      (queue_name        => :p_Schema||'.AQ_EMS_QUEUE'
      ,destination_queue => :p_Dest_Schema||'.AQ_EMS_QUEUE'
      ,destination       => l_db_Link
      ,start_time        => sysdate--SYSDATE indicate immediate
      ,duration          => null   --propagation until stopped
      ,latency           => 0);    --Indicates gap before propagating

    dbms_output.put_line('Propagation successfully scheduled ');
  else
    dbms_output.put_line('Unknown DB Link,Propagation not scheduled');
  end if;

  exception
    when PROPAGATION_EXISTS then
      dbms_output.put_line('Propagation schedule already exists');

end;
/

prompt Test enqueue ,but only after successfully starting scheduling propagation
set serveroutput on
declare
  enqueue_options     dbms_aq.enqueue_options_t;
  message_properties  dbms_aq.message_properties_t;
  message_handle      raw(16);
  message             ty_Msg := ty_Msg(1,'test 1 AQ demo 3');
begin
  dbms_aq.enqueue(queue_name        =>:p_Schema||'.AQ_EMS_QUEUE'
                 ,enqueue_options   =>enqueue_options
                 ,message_properties=>message_properties
                 ,payload           =>message
                 ,msgid             =>message_handle);
  commit;

  dbms_output.put_line('Message sucessfully enqueued ' );
  exception
    when others then
      dbms_output.put_line('Failure ' || sqlerrm);

end;
/

-- This needs to be run when there have been multiple failures with
-- propogation due to incorect configuration parameters.
-- Oracle automatically disables the propagation in such cases
-- for security reasons.
set serveroutput on
declare

  NO_PROPAGATION_EXISTS exception;
  pragma                exception_init(NO_PROPAGATION_EXISTS, -24042);

  l_db_Link   all_db_links.db_link%type;
  l_Continue  boolean := false;
begin

  -- make sure you get the full domain name
  begin
    select  db_link
    into    l_db_Link
    from    all_db_links
    where   owner     = :p_Schema
    and     db_link   like 'D8_LINK%';

    exception
      when too_many_rows then
        l_Continue := false;
      when no_data_found then
        l_Continue := false;
  end;

  l_Continue := (l_db_Link is not null);

  if (l_Continue) then

    dbms_aqadm.enable_propagation_schedule
      (queue_name         => :p_Schema||'.AQ_EMS_QUEUE'
      ,destination_queue  => :p_Dest_Schema||'.AQ_EMS_QUEUE'
      ,destination        => l_db_Link);

    dbms_output.put_line('Propagation successfully enabled ');
  else
    dbms_output.put_line('Unknown DB Link,Propagation not enabled');
  end if;

  exception
    when NO_PROPAGATION_EXISTS then
      dbms_output.put_line('No propagation exists between queues');
end;
/

-- To unschedule propagation
set serveroutput on
declare

  NO_PROPAGATION_EXISTS exception;
  pragma                exception_init(NO_PROPAGATION_EXISTS, -24042);

  l_db_Link   all_db_links.db_link%type;
  l_Continue  boolean := false;
begin

  -- make sure you get the full domain name
  begin
    select  db_link
    into    l_db_Link
    from    all_db_links
    where   owner     = :p_Schema
    and     db_link   like 'D8_LINK%';

    exception
      when too_many_rows then
        l_Continue := false;
      when no_data_found then
        l_Continue := false;
  end;

  l_Continue := (l_db_Link is not null);

  if (l_Continue) then

    dbms_aqadm.unschedule_propagation
      (queue_name         => :p_Schema||'.AQ_EMS_QUEUE'
      ,destination_queue  => :p_Dest_Schema||'.AQ_EMS_QUEUE'
      ,destination        => l_db_Link);

    dbms_output.put_line('Propagation successfully stopped');
  else
    dbms_output.put_line('Unknown DB Link,Propagation not unscheduled');
  end if;

  exception
    when NO_PROPAGATION_EXISTS then
      dbms_output.put_line('No propagation exists between queues');
end;
/

-- To stop propagation
set serveroutput on
declare

  NO_PROPAGATION_EXISTS exception;
  pragma                exception_init(NO_PROPAGATION_EXISTS, -24042);

  l_db_Link   all_db_links.db_link%type;
  l_Continue  boolean := false;
begin

  -- make sure you get the full domain name
  begin
    select  db_link
    into    l_db_Link
    from    all_db_links
    where   owner     = :p_Schema
    and     db_link   like 'D8_LINK%';

    exception
      when too_many_rows then
        l_Continue := false;
      when no_data_found then
        l_Continue := false;
  end;

  l_Continue := (l_db_Link is not null);

  if (l_Continue) then

    dbms_aqadm.disable_propagation_schedule
      (queue_name        => :p_Schema||'.AQ_EMS_QUEUE'
      ,destination_queue => :p_Dest_Schema||'.AQ_EMS_QUEUE'
      ,destination       => l_db_Link);

    dbms_output.put_line('Propagation successfully stopped');
  else
        dbms_output.put_line('Unknown DB Link,Propagation not disabled');
  end if;


  exception
    when NO_PROPAGATION_EXISTS then
      dbms_output.put_line('No propagation exists between queues');

end;
/

-- to purge a queue table, use the following;
set serveroutput on
declare
  po dbms_aqadm.aq$_purge_options_t;
begin

  po.block := true;

  dbms_aqadm.purge_queue_table
    (queue_table     =>:p_Schema||'.AQ_EMS_QUEUE'
    ,purge_condition =>null
    ,purge_options   =>po);

  dbms_output.put_line('Purge sucessful');
  exception
    when others then
      dbms_output.put_line('Fail ' || sqlerrm);
end;
/

/*

Troubleshooting propogation
---------------------------
1.  Ensure the job_queue_processes initialization parameter
    is greater than zero. If it is not greater than zero,
    then unschedule propagation, before setting it
    to a value greater than zero.

2.  Ensure database link is active, i.e.
       select 1 from dual@database_link_name
    should return 1

3.  Ensure the payload is the same on both instances

4.  Verify that the queues match using the
    DBMS_AQADN.VERIFY_QUEUE_TYPES built-in procedure

5.  Verify that propagation is currently not disabled,
    using the following SQL;

    select schedule_disabled from  user_queue_schedules;
    should return "N"

6.  Check the NEXT_RUN_DATE, NEXT_RUN_TIME and PROPAGATION_WINDOW
    on USER_QUEUE_SCHEDULES. If the date and time are in the past and
    remain in the past, then the propagation schedule has been
    incorrectly configured.

    If the propagation window is null then the DURATION parameter may
    have been set to NULL, which is correct, but ensure that this is
    in fact the case.

    use the following SQL;
      select next_run_date, next_run_time,PROPAGATION_WINDOW
      from user_queue_schedules;

7.  Ensure JOBNO is not null in SYS.AQ$_SCHEDULES using the
    following SQL;
      select jobno from sys.aq$_schedules;

8.  Verify that the AQ_TM_PROCESSES is zero

9.  Verify that the AQ


Short Script
col next_run_date format a40
col next_run_time format a20
col owner format a20
col object_table format a30
col object_type format a40
col type format a20
col name format a20
col value format a20
set linesize 400
set pages 2000


select jobno
from   sys.aq$_schedules
/
select schedule_disabled
from   user_queue_schedules
/
select next_run_date
      ,next_run_time
      ,propagation_window
from user_queue_schedules
/
select count(*) from sys.job$
/
select name
       ,value
from   v$parameter
where name in('job_queue_processes','aq_tm_processes')
/
select owner
      ,queue_table
      ,type
      ,object_type
from dba_queue_tables
/


-- To trace for AQ problems
alter session set events '24040 trace name context forever, level 10';

*/


No comments:

Post a Comment