Processing math: 0%

Friday, 21 October 2011

Oracle AQ Propagation

Refer to the following Metalink documents for troubleshooting issues;
368237.1
382994.1
118884.1
203225.1
1233675.1
Bug 8467005

Ensure the DST (timezone file) version is the same on both databases; see note 977512.1.
-- Timezone file version of the connected database
select  version
from    v$timezone_file;

Execute the following on the DESTINATION schema first;
connect destination_schema/password@destination_db

-- Bind variable holding the session's current schema; it is used to
-- qualify the queue and queue table names.
variable p_Schema varchar2(30)
begin
  :p_Schema := sys_context('userenv', 'current_schema');
end;
/

-- Remove any earlier copy of the queue. Each call is submitted as its
-- own block, so an error raised by one does not stop the next.
begin
  dbms_aqadm.stop_queue(:p_Schema || '.AQ_EMS_QUEUE');
end;
/
begin
  dbms_aqadm.drop_queue(:p_Schema || '.AQ_EMS_QUEUE');
end;
/
-- Drop the queue table last; force => true is passed to the drop.
begin
  dbms_aqadm.drop_queue_table
    (queue_table => :p_Schema || '.T_AQ_EMS_QUEUE'
    ,force       => true);
end;
/
/* The payload type has to exist before the queue table is created */
create or replace type ty_Msg
as object
( id   number
, msg  varchar2(100)
);
/
-- Queue table carrying TY_MSG payloads, created with multiple consumers
begin
  dbms_aqadm.create_queue_table
    (queue_table        => :p_Schema || '.T_AQ_EMS_QUEUE'
    ,queue_payload_type => 'TY_MSG'
    ,multiple_consumers => true);
end;
/

-- Queue built on the queue table created above
begin
  dbms_aqadm.create_queue
    (queue_name  => :p_Schema || '.AQ_EMS_QUEUE'
    ,queue_table => :p_Schema || '.T_AQ_EMS_QUEUE');
end;
/
-- Start the queue
exec dbms_aqadm.start_queue(queue_name => :p_Schema || '.AQ_EMS_QUEUE');
-- Register subscriber SUB2 on the destination queue.
set serveroutput on
declare
  -- ORA-24034: the agent is already a subscriber of the queue.
  -- Bound and handled so that re-running this step reports the
  -- situation instead of ending in an unhandled error.
  already_subscriber  exception;
  pragma              exception_init(already_subscriber, -24034);

begin
  dbms_aqadm.add_subscriber(queue_name=>:p_Schema|| '.AQ_EMS_QUEUE'
                           ,subscriber=> sys.aq$_agent('SUB2'
                                                      ,null
                                                      ,null));
  dbms_output.put_line('Subscriber sucessfully set');
exception
  when already_subscriber then
    dbms_output.put_line('Subscriber already exists');
end;
/

-----------------------------------------------------------------------
Next, execute the following on the SOURCE schema;
connect source_schema/password@source_db

-- Current schema of the source session
variable p_Schema varchar2(30)
begin
  :p_Schema := sys_context('userenv', 'current_schema');
end;
/

-- Destination schema name, taken from the &Destination substitution variable
variable p_Dest_Schema varchar2(30)
begin
  :p_Dest_Schema := '&Destination';
end;
/

-- Destination password, taken from the &Password substitution variable
variable p_Dest_Password varchar2(30)
begin
  :p_Dest_Password := '&Password';
end;
/

-- Destination connect string, taken from the &SID substitution variable
variable p_Dest_SID varchar2(30)
begin
  :p_Dest_SID := '&SID';
end;
/

-- Remove any earlier copy of the source queue. Each call is its own
-- block, so an error raised by one does not stop the next.
begin
  dbms_aqadm.stop_queue(:p_Schema || '.AQ_EMS_QUEUE');
end;
/
begin
  dbms_aqadm.drop_queue(:p_Schema || '.AQ_EMS_QUEUE');
end;
/
-- Drop the queue table last; force => true is passed to the drop.
begin
  dbms_aqadm.drop_queue_table
    (queue_table => :p_Schema || '.T_AQ_EMS_QUEUE'
    ,force       => true);
end;
/
/* User-defined payload type; create it before the queue table */
create or replace type ty_Msg
as object
( id   number
, msg  varchar2(100)
);
/
-- Create database link D8_LINK to the destination schema.
-- The three bind values are concatenated into the DDL text as entered.
declare
  l_Ddl  varchar2(4000);
begin
  l_Ddl := 'create database link d8_link connect to ' || :p_Dest_Schema
        || ' identified by ' || :p_Dest_Password
        || ' using ''' || :p_Dest_SID || '''';

  execute immediate l_Ddl;
end;
/

-- Smoke test for the database link: a single row should come back.
-- Do not go any further until this query succeeds over the link.
select  1 as l_Num
from    dual@d8_link;

-- Queue table carrying TY_MSG payloads, created with multiple consumers
begin
  dbms_aqadm.create_queue_table
    (queue_table        => :p_Schema || '.T_AQ_EMS_QUEUE'
    ,queue_payload_type => 'TY_MSG'
    ,multiple_consumers => true);
end;
/
-- Queue built on the queue table created above
begin
  dbms_aqadm.create_queue
    (queue_name  => :p_Schema || '.AQ_EMS_QUEUE'
    ,queue_table => :p_Schema || '.T_AQ_EMS_QUEUE');
end;
/
-- Associate a subscriber with the source queue. The subscriber address
-- is the destination queue reached through database link D8_LINK.
-- The agent type is SYS.AQ$_AGENT (the "$" is part of the type name).
set serveroutput on
begin
  dbms_aqadm.add_subscriber
    (queue_name      => :p_Schema||'.AQ_EMS_QUEUE'
    ,subscriber      => sys.aq$_agent('SUB1'
                                     ,:p_Dest_Schema||'.AQ_EMS_QUEUE@D8_LINK'
                                     ,0)
    ,queue_to_queue  => true);
  dbms_output.put_line('Subscriber sucessfully set');
end;
/
exec dbms_aqadm.start_queue(queue_name=>:p_Schema||'.AQ_EMS_QUEUE');

-- Run the following script to verify that the SOURCE and
-- DESTINATION queues match.
-- The block should be run on the SOURCE schema.
set serveroutput on
declare
  l_RC        binary_integer := 0;
  l_db_Link   all_db_links.db_link%type;
  l_Continue  boolean := false;
begin
  -- make sure you get the full domain name
  begin
    select  db_link
    into    l_db_Link
    from    all_db_links
    where   owner     = :p_Schema
    and     db_link   like 'D8_LINK%';
  exception
    -- No single link could be identified: clear the variable so that
    -- a partial fetch cannot be mistaken for a valid link name.
    when too_many_rows then
      l_db_Link := null;
    when no_data_found then
      l_db_Link := null;
  end;
  l_Continue := (l_db_Link is not null);
  if (l_Continue) then
    -- The queue names for source and destination are the same,
    -- the only difference being the schema names
    dbms_aqadm.verify_queue_types
      (src_queue_name   => :p_Schema||'.AQ_EMS_QUEUE'
      ,dest_queue_name  => :p_Dest_Schema||'.AQ_EMS_QUEUE'
      ,destination      => l_db_Link -- Database link name
      ,rc               => l_RC);
    dbms_output.put_line(case l_RC
                          when 0 then
                            'Queues do not match'
                          when 1 then
                            'Queues match'
                          else
                            'Error determining status'
                         end);
  else
    dbms_output.put_line('Unknown DB Link,Verification unsuccessful');
  end if;
end;
/

-- Schedule propagation from the source queue to the destination queue.
set serveroutput on
declare
  l_db_Link   all_db_links.db_link%type;
  l_Continue  boolean := false;
  PROPAGATION_EXISTS    exception;
  pragma                exception_init(PROPAGATION_EXISTS, -24041);
begin
  -- make sure you get the full domain name
  begin
    select  db_link
    into    l_db_Link
    from    all_db_links
    where   owner     = :p_Schema
    and     db_link   like 'D8_LINK%';
  exception
    -- No single link could be identified: do not continue
    when too_many_rows then
      l_db_Link := null;
    when no_data_found then
      l_db_Link := null;
  end;
  l_Continue := (l_db_Link is not null);
  if (l_Continue) then
    dbms_aqadm.schedule_propagation
      (queue_name        => :p_Schema||'.AQ_EMS_QUEUE'
      ,destination_queue => :p_Dest_Schema||'.AQ_EMS_QUEUE'
      ,destination       => l_db_Link
      ,start_time        => sysdate -- SYSDATE indicates immediate
      ,duration          => null    -- propagation until stopped
      ,latency           => 0);     -- indicates gap before propagating
    dbms_output.put_line('Propagation successfully scheduled ');
  else
    dbms_output.put_line('Unknown DB Link,Propagation not scheduled');
  end if;
exception
  when PROPAGATION_EXISTS then
    dbms_output.put_line('Propagation schedule already exists');
end;
/

prompt Test enqueue ,but only after successfully starting scheduling propagation
-- Enqueue one TY_MSG test message on the source queue and commit it.
set serveroutput on
declare
  enqueue_options     dbms_aq.enqueue_options_t;
  message_properties  dbms_aq.message_properties_t;
  message_handle      raw(16);
  message             ty_Msg := ty_Msg(1,'test 1 AQ demo 3');
begin
  dbms_aq.enqueue(queue_name        =>:p_Schema||'.AQ_EMS_QUEUE'
                 ,enqueue_options   =>enqueue_options
                 ,message_properties=>message_properties
                 ,payload           =>message
                 ,msgid             =>message_handle);
  commit;
  dbms_output.put_line('Message sucessfully enqueued ' );
exception
  -- Any error is reported through dbms_output rather than re-raised
  when others then
    dbms_output.put_line('Failure ' || sqlerrm);
end;
/

-- This needs to be run when there have been multiple failures with
-- propagation due to incorrect configuration parameters.
-- Oracle automatically disables the propagation in such cases
-- for security reasons.
set serveroutput on
declare
  NO_PROPAGATION_EXISTS exception;
  pragma                exception_init(NO_PROPAGATION_EXISTS, -24042);
  l_db_Link   all_db_links.db_link%type;
  l_Continue  boolean := false;
begin
  -- make sure you get the full domain name
  begin
    select  db_link
    into    l_db_Link
    from    all_db_links
    where   owner     = :p_Schema
    and     db_link   like 'D8_LINK%';
  exception
    -- No single link could be identified: do not continue
    when too_many_rows then
      l_db_Link := null;
    when no_data_found then
      l_db_Link := null;
  end;
  l_Continue := (l_db_Link is not null);
  if (l_Continue) then
    dbms_aqadm.enable_propagation_schedule
      (queue_name         => :p_Schema||'.AQ_EMS_QUEUE'
      ,destination_queue  => :p_Dest_Schema||'.AQ_EMS_QUEUE'
      ,destination        => l_db_Link);
    dbms_output.put_line('Propagation successfully enabled ');
  else
    dbms_output.put_line('Unknown DB Link,Propagation not enabled');
  end if;
exception
  when NO_PROPAGATION_EXISTS then
    dbms_output.put_line('No propagation exists between queues');
end;
/

-- To unschedule propagation
set serveroutput on
declare
  NO_PROPAGATION_EXISTS exception;
  pragma                exception_init(NO_PROPAGATION_EXISTS, -24042);
  l_db_Link   all_db_links.db_link%type;
  l_Continue  boolean := false;
begin
  -- make sure you get the full domain name
  begin
    select  db_link
    into    l_db_Link
    from    all_db_links
    where   owner     = :p_Schema
    and     db_link   like 'D8_LINK%';
  exception
    -- No single link could be identified: do not continue
    when too_many_rows then
      l_db_Link := null;
    when no_data_found then
      l_db_Link := null;
  end;
  l_Continue := (l_db_Link is not null);
  if (l_Continue) then
    dbms_aqadm.unschedule_propagation
      (queue_name         => :p_Schema||'.AQ_EMS_QUEUE'
      ,destination_queue  => :p_Dest_Schema||'.AQ_EMS_QUEUE'
      ,destination        => l_db_Link);
    dbms_output.put_line('Propagation successfully stopped');
  else
    dbms_output.put_line('Unknown DB Link,Propagation not unscheduled');
  end if;
exception
  when NO_PROPAGATION_EXISTS then
    dbms_output.put_line('No propagation exists between queues');
end;
/

-- To stop propagation
set serveroutput on
declare
  NO_PROPAGATION_EXISTS exception;
  pragma                exception_init(NO_PROPAGATION_EXISTS, -24042);
  l_db_Link   all_db_links.db_link%type;
  l_Continue  boolean := false;
begin
  -- make sure you get the full domain name
  begin
    select  db_link
    into    l_db_Link
    from    all_db_links
    where   owner     = :p_Schema
    and     db_link   like 'D8_LINK%';
  exception
    -- No single link could be identified: do not continue
    when too_many_rows then
      l_db_Link := null;
    when no_data_found then
      l_db_Link := null;
  end;
  l_Continue := (l_db_Link is not null);
  if (l_Continue) then
    dbms_aqadm.disable_propagation_schedule
      (queue_name        => :p_Schema||'.AQ_EMS_QUEUE'
      ,destination_queue => :p_Dest_Schema||'.AQ_EMS_QUEUE'
      ,destination       => l_db_Link);
    dbms_output.put_line('Propagation successfully stopped');
  else
    dbms_output.put_line('Unknown DB Link,Propagation not disabled');
  end if;
exception
  when NO_PROPAGATION_EXISTS then
    dbms_output.put_line('No propagation exists between queues');
end;
/

-- To purge a queue table, use the following.
-- purge_queue_table takes the queue TABLE name (T_AQ_EMS_QUEUE),
-- not the name of the queue that lives in it.
set serveroutput on
declare
  po dbms_aqadm.aq_purge_options_t;
begin

  po.block := true;

  dbms_aqadm.purge_queue_table
    (queue_table     =>:p_Schema||'.T_AQ_EMS_QUEUE'
    ,purge_condition =>null   -- no condition supplied
    ,purge_options   =>po);

  dbms_output.put_line('Purge sucessful');
exception
  -- Any error is reported through dbms_output rather than re-raised
  when others then
    dbms_output.put_line('Fail ' || sqlerrm);
end;
/

/*

Troubleshooting propagation
---------------------------
1.  Ensure the job_queue_processes initialization parameter
    is greater than zero. If it is not greater than zero,
    then unschedule propagation, before setting it
    to a value greater than zero.

2.  Ensure database link is active, i.e.
       select 1 from dual@database_link_name
    should return 1

3.  Ensure the payload is the same on both instances

4.  Verify that the queues match using the
    DBMS_AQADM.VERIFY_QUEUE_TYPES built-in procedure

5.  Verify that propagation is currently not disabled,
    using the following SQL;

    select schedule_disabled from  user_queue_schedules;
    should return "N"

6.  Check the NEXT_RUN_DATE, NEXT_RUN_TIME and PROPAGATION_WINDOW
    on USER_QUEUE_SCHEDULES. If the date and time are in the past and
    remain in the past, then the propagation schedule has been
    incorrectly configured.

    If the propagation window is null then the DURATION parameter may
    have been set to NULL, which is correct, but ensure that this is
    in fact the case.

    use the following SQL;
      select next_run_date, next_run_time,PROPAGATION_WINDOW
      from user_queue_schedules;

7.  Ensure JOBNO is not null in SYS.AQ$_SCHEDULES using the
    following SQL;
      select jobno from sys.aq$_schedules;

8.  Verify that the AQ_TM_PROCESSES is zero

9.  Verify that the AQ


Short Script
col next_run_date format a40
col next_run_time format a20
col owner format a20
col object_table format a30
col object_type format a40
col type format a20
col name format a20
col value format a20
set linesize 400
set pages 2000


select jobno
from   sys.aq$_schedules
/
select schedule_disabled
from   user_queue_schedules
/
select next_run_date
      ,next_run_time
      ,propagation_window
from user_queue_schedules
/
select count(*)
from sys.job$
/
select name
       ,value
from   v$parameter
where name in('job_queue_processes','aq_tm_processes')
/
select owner
      ,queue_table
      ,type
      ,object_type
from dba_queue_tables
/


-- To trace for AQ problems
alter session set events '24040 trace name context forever, level 10';

*/


No comments:

Post a Comment