In this post I want to look at how to create a stored procedure in the database to automatically process events as they are produced on a Transactional Event Queue (TEQ).
Having a small, discrete piece of code that processes events off a queue is a pretty common use case. You could even call it a microservice I guess 🙂 since it does meet the well-established criteria of having its own code base, being loosely coupled, independently deployable and testable. One thing I find really interesting about writing a “microservice” like this and deploying it in the Oracle Database is that I can essentially scale it to zero instances, and it will only run (and consume resources) when there is actually work for it to do. I could also use the Database Resource Manager to control how many resources it is able to consume if I wanted to 🙂 And it would not be all that hard to instrument it so I could get logs, metrics and even distributed tracing – but that’s another story!
So, let’s go ahead and build this thing!
We’ll start with a new Oracle Database. I am going to run it in a Docker container using the standard image provided on Oracle Container Registry. If you have not used it before, you will need to login to Oracle Container Registry at https://container-registry.oracle.com and then navigate to the “Database” section, and then “enterprise” and read and accept the license.
Make sure you are logged into Oracle Container Registry in your Docker client too:
docker login container-registry.oracle.com -u firstname.lastname@example.org
Once you have authenticated, you can start up an Oracle 21c Database using this command:
docker run -d \
  --name oracle-db \
  -p 1521:1521 \
  -e ORACLE_PWD=Welcome123## \
  -e ORACLE_SID=ORCL \
  -e ORACLE_PDB=PDB1 \
  container-registry.oracle.com/database/enterprise:21.3.0.0
It will take a few minutes (the first time only) for the database files to be created and the instance to start up. You can watch the logs, or use this little shell command to do the waiting for you:
while ! docker logs oracle-db | grep -q "DATABASE IS READY TO USE!"; do
  sleep 10
done
Great, now we have a database running, let’s set up the necessary permissions for our user. I am going to use the pdbadmin user in the PDB1 pluggable database. So let’s give that user permissions to use the TEQ packages (I am using the new SQLcl command line tool, but you can use SQL*Plus or SQL Developer, or whatever tool you prefer):
# sql sys/Welcome123##@//localhost:1521/pdb1 as sysdba

SQL> alter session set container = pdb1;
SQL> grant dba to pdbadmin;
SQL> grant execute on dbms_aqadm to pdbadmin;
SQL> grant execute on dbms_aq to pdbadmin;
SQL> commit;
SQL> exit
Ok, now we can connect with our pdbadmin user and start setting up our environment:
# sql pdbadmin/Welcome123##@//localhost:1521/pdb1
First we want to create our (TEQ) queue (or topic) and start it. We’ll call the queue my_teq:
begin
  dbms_aqadm.create_transactional_event_queue(
    queue_name         => 'my_teq',
    multiple_consumers => true
  );
  dbms_aqadm.start_queue(queue_name => 'my_teq');
end;
/
Since we are using a multi-consumer queue (i.e. a topic) we need to add a subscriber too. Let’s call it my_subscriber:
begin
  dbms_aqadm.add_subscriber(
    queue_name => 'my_teq',
    subscriber => sys.aq$_agent('my_subscriber', null, 0)
  );
end;
/
We’ll keep our microservice super-simple for this demonstration, we’ll just have it record the messages it receives in an “output” table – so let’s create that table now:
create table my_log (
  message varchar2(256),
  when    timestamp(6)
);
Ok, so here is our consumer microservice:
create or replace procedure receiver (
  context  in raw,
  reginfo  in sys.aq$_reg_info,
  descr    in sys.aq$_descriptor,
  payload  in varchar2,
  payloadl in number
) as
  dequeue_options    dbms_aq.dequeue_options_t;
  message_properties dbms_aq.message_properties_t;
  message_handle     raw(16);
  message            sys.aq$_jms_text_message;
  no_messages        exception;
  pragma exception_init(no_messages, -25228);
begin
  dequeue_options.msgid         := descr.msg_id;
  dequeue_options.consumer_name := descr.consumer_name;
  dequeue_options.navigation    := dbms_aq.first_message;
  loop
    dbms_aq.dequeue(
      queue_name         => 'my_teq',
      dequeue_options    => dequeue_options,
      message_properties => message_properties,
      payload            => message,
      msgid              => message_handle
    );
    insert into my_log values (message.text_vc, sysdate);
    commit;
  end loop;
exception
  when no_messages then
    dbms_output.put_line('No more messages for processing');
    commit;
end;
/
Let’s walk through that and talk about the details. First, the procedure must have this signature/interface:
procedure receiver (
  context  in raw,
  reginfo  in sys.aq$_reg_info,
  descr    in sys.aq$_descriptor,
  payload  in varchar2,
  payloadl in number
)
The name of the procedure is up to you, but it must have those exact parameters in that order, since this is a callback, and the TEQ notification is expecting this signature so that it can pass the data about new messages to the consumer.
When we get the callback, we need to perform a dequeue operation to get the actual message/event off the queue/topic. Since it is possible that there is more than one, it's a good idea to use a loop to read and process all of them before we complete. Here we have a simple loop to dequeue a message and then save the details in our log table:
loop
  dbms_aq.dequeue(
    queue_name         => 'my_teq',
    dequeue_options    => dequeue_options,
    message_properties => message_properties,
    payload            => message,
    msgid              => message_handle
  );
  insert into my_log values (message.text_vc, sysdate);
  commit;
end loop;
We’ve also defined an exception handler for when there are no messages available (this should rarely, if ever, happen, but it's still good practice to cater for it):
when no_messages then
  dbms_output.put_line('No more messages for processing');
  commit;
I used the JMS message format in this example, but of course you could use RAW or JSON or a user-defined type instead.
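If you wanted to try a different payload type, I believe it is just a matter of passing the queue_payload_type parameter when you create the queue. Here is a sketch (not part of the walkthrough above – the queue name my_json_teq is just for illustration, and you should check the DBMS_AQADM documentation for the payload types supported in your database version):

begin
  dbms_aqadm.create_transactional_event_queue(
    queue_name         => 'my_json_teq',  -- hypothetical queue name
    queue_payload_type => 'JSON',         -- instead of the default JMS type
    multiple_consumers => true
  );
  dbms_aqadm.start_queue(queue_name => 'my_json_teq');
end;
/

Of course, the receiver procedure would then need to declare its message variable with a matching payload type.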
Ok, so now that our microservice is ready, we need to tell the database to call it when there is a message to process. To do this, we create a notification as follows:
begin
  dbms_aq.register(
    sys.aq$_reg_info_list(
      sys.aq$_reg_info(
        'my_teq:my_subscriber',
        dbms_aq.namespace_aq,
        'plsql://receiver',
        HEXTORAW('FF')
      )
    ),
    1
  );
  commit;
end;
/
Ok, so let’s talk about what is happening here. This register procedure that we are running sets up the connection between the queue, the subscriber and the consumer. In the aq$_reg_info you can see the first parameter has the queue name followed by a colon and the subscriber name – so this is saying “when we have a message on my_teq and it is addressed to the subscriber my_subscriber…”
The next parameter tells us that we are interested in AQ (and TEQ) notifications, and the third parameter tells us the callback address. In this case we are telling it to run the PL/SQL procedure called receiver. The fourth parameter is an opaque context value that is handed back to the callback in its context parameter.
Once that is done, you can check on the details with this query:
select r.reg_id, subscription_name, location_name, num_ntfns, num_pending_ntfns
from user_subscr_registrations r, v$subscr_registration_stats s
where r.reg_id = s.reg_id;

   REG_ID SUBSCRIPTION_NAME                       LOCATION_NAME        NUM_NTFNS    NUM_PENDING_NTFNS
_________ _______________________________________ _________________ ____________ ____________________
      301 "PDBADMIN"."MY_TEQ":"MY_SUBSCRIBER"     plsql://receiver            40                    0
If you come back and run this again later, you will see the number of notifications sent, and pending (i.e. the last two columns) will increase each time we send a message.
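By the way, if you ever need to remove the notification (say, to redeploy the procedure), there is a matching unregister call in DBMS_AQ. A sketch, using the same queue, subscriber and callback names as above:

begin
  dbms_aq.unregister(
    sys.aq$_reg_info_list(
      sys.aq$_reg_info(
        'my_teq:my_subscriber',
        dbms_aq.namespace_aq,
        'plsql://receiver',
        HEXTORAW('FF')
      )
    ),
    1
  );
  commit;
end;
/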
Ok, let’s enqueue a message (publish an event) to test this out!
We can use this command to send a test message. This creates and sends a JMS message on our my_teq queue/topic addressed to our my_subscriber consumer:
declare
  enqueue_options    dbms_aq.enqueue_options_t;
  message_properties dbms_aq.message_properties_t;
  message_handle     raw(16);
  message            sys.aq$_jms_text_message;
begin
  message := sys.aq$_jms_text_message.construct;
  message.set_text('hello from mark');
  message_properties.recipient_list(0) := sys.aq$_agent('my_subscriber', null, null);
  dbms_aq.enqueue(
    queue_name         => 'my_teq',
    enqueue_options    => enqueue_options,
    message_properties => message_properties,
    payload            => message,
    msgid              => message_handle
  );
  commit;
end;
/
Once that runs, the notification will fire, the callback will be invoked, and our microservice will consume all of the messages and dump them out into our “output” table. We can check the results with this query:
SQL> select * from my_log;

MESSAGE            WHEN
__________________ __________________________________
hello from mark    23-JUN-22 04.44.06.000000000 PM
Feel free to go run that a few more times to see what happens.
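And when you are finished experimenting, you can tear the queue down again. A sketch of the cleanup, assuming the drop_transactional_event_queue procedure is available in your version of DBMS_AQADM (it should be wherever create_transactional_event_queue is):

begin
  dbms_aqadm.stop_queue(queue_name => 'my_teq');
  dbms_aqadm.drop_transactional_event_queue(queue_name => 'my_teq');
end;
/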
So there we go! We created a nice simple, loosely coupled consumer that will process messages/events as they arrive, and will scale to zero (consume no resources) when there is no work to do. Enjoy!