Creating a stored procedure (dare I call it a microservice?) to automatically process events on a queue

In this post I want to look at how to create a stored procedure in the database to automatically process events as they are produced on a Transactional Event Queue (TEQ).

Having a small, discrete piece of code that processes events off a queue is a pretty common use case. You could even call it a microservice I guess 🙂 since it does meet the well-established criteria of having its own code base, being loosely coupled, independently deployable and testable. One thing I find really interesting about writing a “microservice” like this and deploying it in the Oracle Database is that I can essentially scale it to zero instances, and it will only run (and consume resources) when there is actually work for it to do. I could also use the Database Resource Manager to control how many resources it is able to consume if I wanted to 🙂 And it would not be all that hard to instrument it so I could get logs, metrics and even distributed tracing – but that’s another story!

So, let’s go ahead and build this thing!

We’ll start with a new Oracle Database. I am going to run it in a Docker container using the standard image provided on Oracle Container Registry. If you have not used it before, you will need to log in to Oracle Container Registry at https://container-registry.oracle.com, navigate to the “Database” section and then “enterprise”, and read and accept the license.

Make sure you are logged into Oracle Container Registry in your Docker client too:

docker login container-registry.oracle.com -u super.user@someplace.com

Once you have authenticated, you can start up an Oracle 21c Database using this command:

docker run -d \
  --name oracle-db \
  -p 1521:1521 \
  -e ORACLE_PWD=Welcome123## \
  -e ORACLE_SID=ORCL \
  -e ORACLE_PDB=PDB1 \
  container-registry.oracle.com/database/enterprise:21.3.0.0

It will take a few minutes (the first time only) for the database files to be created and the instance to start up. You can watch the logs, or use this little shell command to do the waiting for you:

while ! docker logs oracle-db | grep -q "DATABASE IS READY TO USE!";
do 
  sleep 10
done
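
If you would rather not wait forever when something goes wrong (a typo in the container name, say), here is a slightly more defensive sketch of the same loop. The function name wait_for_db and its defaults (a 15 minute timeout and a 10 second poll) are my own choices for illustration, not something the image provides:

```shell
# Wait for the database container to report readiness, giving up after a timeout.
# Usage: wait_for_db [container-name] [timeout-seconds] [poll-interval-seconds]
wait_for_db() {
  container="${1:-oracle-db}"
  timeout="${2:-900}"
  interval="${3:-10}"
  waited=0
  # 2>&1 so that anything the container wrote to stderr is searched as well
  until docker logs "$container" 2>&1 | grep -q "DATABASE IS READY TO USE!"; do
    if [ "$waited" -ge "$timeout" ]; then
      echo "timed out after ${waited}s waiting for $container" >&2
      return 1
    fi
    sleep "$interval"
    waited=$((waited + interval))
  done
  echo "$container is ready"
}
```

Calling wait_for_db oracle-db 600 would poll for up to ten minutes and return a non-zero status if the ready message never shows up, which is handy if you are scripting the whole setup.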

Great, now that we have a database running, let’s set up the necessary permissions for our user. I am going to use the pdbadmin user in the PDB1 pluggable database, so let’s give that user permissions to use the TEQ packages (I am using the new SQLcl command line tool, but you can use SQL*Plus or SQL Developer, or whatever tool you prefer):

# sql sys/Welcome123##@//localhost:1521/pdb1 as sysdba

SQL> alter session set container = pdb1;

SQL> grant dba to pdbadmin;

SQL> grant execute on dbms_aqadm to pdbadmin;
SQL> grant execute on dbms_aq to pdbadmin;

SQL> commit;
SQL> exit

Ok, now we can connect with our pdbadmin user and start setting up our environment:

# sql pdbadmin/Welcome123##@//localhost:1521/pdb1

First we want to create our (TEQ) queue (or topic) and start it. We’ll call the queue my_teq:

begin
    dbms_aqadm.create_transactional_event_queue(
        queue_name         => 'my_teq',
        multiple_consumers => true
    );
    
    dbms_aqadm.start_queue(
        queue_name         => 'my_teq'
    ); 
end;
/

Since we are using a multi-consumer queue (i.e. a topic) we need to add a subscriber too. Let’s call it my_subscriber:

begin
    dbms_aqadm.add_subscriber(
        queue_name => 'my_teq',
        subscriber => sys.aq$_agent('my_subscriber', null, 0)
    );
end;
/

We’ll keep our microservice super-simple for this demonstration: it will just record the messages it receives in an “output” table. Let’s create that table now:

create table my_log (
    message varchar(256),
    when timestamp(6)
);

Ok, so here is our consumer microservice:

create or replace procedure receiver (
    context in raw,
    reginfo in sys.aq$_reg_info,
    descr in sys.aq$_descriptor,
    payload in varchar2,
    payloadl in number
) as
  dequeue_options dbms_aq.dequeue_options_t;
  message_properties dbms_aq.message_properties_t;
  message_handle raw ( 16 ) ;
  message sys.aq$_jms_text_message;
  no_messages exception;
  pragma exception_init ( no_messages, -25228 ) ;
begin
  dequeue_options.msgid := descr.msg_id;
  dequeue_options.consumer_name := descr.consumer_name;
  dequeue_options.navigation := dbms_aq.first_message;
  loop
    dbms_aq.dequeue (
      queue_name => 'my_teq',
      dequeue_options => dequeue_options,
      message_properties => message_properties,
      payload => message,
      msgid => message_handle
    );
    insert into my_log values ( message.text_vc, sysdate );
    commit;
  end loop;
exception
when no_messages then
  dbms_output.put_line ( 'No more messages for processing' ) ;
  commit;
end;
/

Let’s walk through that and talk about the details. First, the procedure must have this signature/interface:

procedure receiver (
    context in raw,
    reginfo in sys.aq$_reg_info,
    descr in sys.aq$_descriptor,
    payload in varchar2,
    payloadl in number
)

The name of the procedure is up to you, but it must have those exact parameters in that order, since this is a callback, and the TEQ notification is expecting this signature so that it can pass the data about new messages to the consumer.

When we get the callback, we need to perform a dequeue operation to get the actual message/event off the queue/topic. Since it is possible that there is more than one, it’s a good idea to use a loop to read and process all of them before we complete. Here we have a simple loop to dequeue a message and then save the details in our log table:

  loop
    dbms_aq.dequeue (
      queue_name => 'my_teq',
      dequeue_options => dequeue_options,
      message_properties => message_properties,
      payload => message,
      msgid => message_handle
    );
    insert into my_log values ( message.text_vc, sysdate );
    commit;
  end loop;

We’ve also defined an exception handler for when there are no messages available (this should never happen, but it’s still good practice to cater for it):

when no_messages then
  dbms_output.put_line ( 'No more messages for processing' ) ;
  commit;

I used the JMS message format in this example, but of course you could use RAW or JSON or a user-defined type instead.

Ok, so now that our microservice is ready, we need to tell the database to call it when there is a message to process. To do this, we create a notification as follows:

begin
  dbms_aq.register(
      sys.aq$_reg_info_list(
        sys.aq$_reg_info(
            'my_teq:my_subscriber',
            dbms_aq.namespace_aq,
            'plsql://receiver', 
            HEXTORAW('FF')
        )
      ), 1);
  commit;
end;
/

Ok, so let’s talk about what is happening here. The register procedure that we are running sets up the connection between the queue, the subscriber and the consumer. In the aq$_reg_info you can see the first parameter has the queue name followed by a colon and the subscriber name – so this is telling us “when we have a message on my_teq and it is addressed to the subscriber my_subscriber…”

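The next parameter tells us that we are interested in AQ (and TEQ) notifications, and the third parameter tells us the callback address. In this case we are telling it to run the PL/SQL procedure called receiver. The fourth parameter is a context value that gets passed through to the callback, and the final 1 is the number of entries in the registration list.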

Once that is done, you can check on the details with this query:

select r.reg_id, subscription_name, location_name, num_ntfns, num_pending_ntfns
from USER_SUBSCR_REGISTRATIONS r, V$SUBSCR_REGISTRATION_STATS s
where r.reg_id = s.reg_id;

REG_ID                      SUBSCRIPTION_NAME    LOCATION_NAME NUM_NTFNS NUM_PENDING_NTFNS
______ ______________________________________ ________________ _________ _________________
   301 "PDBADMIN"."MY_TEQ":"MY_SUBSCRIBER"    plsql://receiver        40                 0

If you come back and run this again later, you will see the number of notifications sent increase each time we send a message, and the last column shows how many are still pending.
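
For completeness, the reverse operation is dbms_aq.unregister, which takes the same kind of registration list. Here is a minimal sketch of a shell wrapper for it, assuming SQLcl is on your path and accepts the -S (silent) flag; the function name unregister_receiver is just something I made up for this example. Don’t run it yet if you want to follow along with the test below!

```shell
# Remove the notification registration for the receiver callback.
# $1 is the SQLcl connect string, e.g. pdbadmin/<password>@//localhost:1521/pdb1
unregister_receiver() {
  # the quoted 'EOF' stops the shell from touching the $ in aq$_reg_info
  sql -S "$1" <<'EOF'
begin
  -- same entry that was passed to dbms_aq.register
  dbms_aq.unregister(
      sys.aq$_reg_info_list(
        sys.aq$_reg_info(
            'my_teq:my_subscriber',
            dbms_aq.namespace_aq,
            'plsql://receiver',
            HEXTORAW('FF')
        )
      ), 1);
  commit;
end;
/
exit
EOF
}
```

After running it, the registration should no longer show up in the USER_SUBSCR_REGISTRATIONS query above.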

Ok, let’s enqueue a message (publish an event) to test this out!

We can use this command to send a test message. This creates and sends a JMS message on our my_teq queue/topic addressed to our my_subscriber consumer:

declare
  enqueue_options dbms_aq.enqueue_options_t;
  message_properties dbms_aq.message_properties_t;
  message_handle raw(16);
  message sys.aq$_jms_text_message;
begin
  message := sys.aq$_jms_text_message.construct;
  message.set_text('hello from mark');
  message_properties.recipient_list(0) := sys.aq$_agent('my_subscriber', null, null);
  dbms_aq.enqueue(
    queue_name => 'my_teq',
    enqueue_options => enqueue_options,
    message_properties => message_properties,
    payload => message,
    msgid => message_handle);
  commit;
end;
/

Once that is run, the notification will kick in and the callback will run our microservice, which will consume all of the messages and write them into our “output” table. We can check the results with this query:

SQL> select * from my_log;

           MESSAGE                               WHEN
__________________ __________________________________
hello from mark    23-JUN-22 04.44.06.000000000 PM

Feel free to go run that a few more times to see what happens.
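
If you get tired of pasting the block in by hand, here is a rough sketch that sends a handful of messages in one go. It wraps the same enqueue block in a shell function and pipes the result into SQLcl. The function names enqueue_sql and send_test_messages are mine, and I am assuming SQLcl is on your path and accepts the -S (silent) flag:

```shell
# Print a PL/SQL block that enqueues one JMS text message on my_teq.
# $1 is the message text.
enqueue_sql() {
  # double any single quotes so the text is a valid SQL string literal
  text=$(printf '%s' "$1" | sed "s/'/''/g")
  # \$ keeps the shell from expanding the $ in the aq$_ type names
  cat <<EOF
declare
  enqueue_options dbms_aq.enqueue_options_t;
  message_properties dbms_aq.message_properties_t;
  message_handle raw(16);
  message sys.aq\$_jms_text_message;
begin
  message := sys.aq\$_jms_text_message.construct;
  message.set_text('${text}');
  message_properties.recipient_list(0) := sys.aq\$_agent('my_subscriber', null, null);
  dbms_aq.enqueue(
    queue_name => 'my_teq',
    enqueue_options => enqueue_options,
    message_properties => message_properties,
    payload => message,
    msgid => message_handle);
  commit;
end;
/
EOF
}

# Send five numbered messages in a single SQLcl session.
# $1 is the SQLcl connect string.
send_test_messages() {
  {
    for i in 1 2 3 4 5; do
      enqueue_sql "hello from mark, message $i"
    done
    echo "exit"
  } | sql -S "$1"
}
```

You would call it with the same connect string we used earlier, for example send_test_messages 'pdbadmin/Welcome123##@//localhost:1521/pdb1', and then query my_log again to see the new rows.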

So there we go! We created a nice, simple, loosely coupled consumer that will process messages/events as they arrive, and will scale to zero (consume no resources) when there is no work to do. Enjoy!

About Mark Nelson

Mark Nelson is a Developer Evangelist at Oracle, focusing on microservices and messaging. Before this role, Mark was an Architect in the Enterprise Cloud-Native Java Team and on the Verrazzano Enterprise Container Platform project, worked on Wercker and WebLogic, and was a senior member of the A-Team since 2010. He has worked in Sales Consulting at Oracle since 2006 and in various roles at IBM since 1994.