Oracle REST Data Services 22.3 brings new REST APIs for Transactional Event Queueing

Oracle REST Data Services 22.3 was released a couple of weeks ago, and it is now available on Oracle Autonomous Database as well! This release has a slew of new REST APIs for Oracle Transactional Event Queueing (or TxEventQ). If you have not heard of it, TxEventQ is essentially a new, faster implementation of Oracle Advanced Queueing which has been in the database for around twenty years.

Many of these new REST APIs are very similar to the Kafka REST APIs, since TxEventQ provides Kafka compatibility as one of its features.

In this post, I want to show you how to use a few of the APIs, and then I’ll give you an idea of what kinds of APIs are available and where to find more information.

The first thing to do is to grab an Autonomous Database instance. It’s available in the Always Free tier, so you can try this for free! If you are not familiar with creating one and accessing the SQL worksheet, check out this free LiveLab for details.

Make sure you get a 21c database – you may need to toggle the “Always Free” option to see 21c. The APIs described in this post are supported in Oracle Database 21c (and later).

When you get into your SQL worksheet, grab the URL from the browser; it will look something like this:

https://xyzabc-red1.adb.us-phoenix-1.oraclecloudapps.com/ords/admin/_sdw/?nav=worksheet

Now, chop off the end of the URL and replace it with the base URL for the TxEventQ REST APIs, and save that in an environment variable to save us some typing!

export ADDR="https://xyzabc-red1.adb.us-phoenix-1.oraclecloudapps.com/ords/admin/_/db-api/stable/database/teq"

And let’s create another environment variable with the authentication details. You can encode them using base64 like this, assuming your userid is admin and your password is your_password:

$ echo -n "admin:your_password" | base64
YWRtaW46eW91cl9wYXNzd29yZA==

Then we can use that value to create the authentication header details:

export AUTH="Authorization: Basic YWRtaW46eW91cl9wYXNzd29yZA=="

Great, that will save us from typing those each time!

Create a topic

Let’s start by creating a topic. We are going to need to know the database name for this – you can find that by running this query in your SQL worksheet:

select sys_context('userenv','db_name') from dual

You’ll need to put that database name into the URL below after “clusters” and before “topics”. In this example my database name is “XYZABC_RED1”:

curl -X POST -H "$AUTH" -H "Content-Type: application/json" -d '{"topic_name": "mark1", "partitions_count": "6"}' "$ADDR/clusters/XYZABC_RED1/topics/"

In the body we specified the name of the topic (“mark1” in this case) and how many partitions we want the topic to have. When you run this request, you’ll see output something like this:

{"name":"MARK1","partitions":[{"partition":0,"leader":1,"replicas":1}]}

It created our topic for us!

List topics

Let’s list the topics now. Try this request:

curl -X GET -H "$AUTH" "$ADDR/topics/"

The output will be a JSON list of topic names, like this (you might want to create a few more topics to make it more interesting):

["MARK1"]

Get a topic

We can also get details of a single topic like this; the topic name is in the last part of the URL:

curl -X GET -H "$AUTH" "$ADDR/topics/mark1/"

The output looks like this:

{"name":"MARK1","partitions":[{"partition":0,"leader":1,"replicas":1}]}

Create a consumer group

Now let’s create a consumer group. Here’s the request; notice the topic name is in the body, and the name of the consumer group is the last part of the URL (“sub1” in this case):

curl -X POST -H "$AUTH" \
     -H "Content-Type: application/json" \
     -d '{"topic_name": "mark1"}' \
     "$ADDR/clusters/XYZABC_RED1/consumer-groups/sub1/"

The output from this is empty (unless you specify verbose, or get an error), but we can easily check the result in the database by running this query:

select * from user_queue_subscribers

Publish messages

Ok, I think we’re ready to publish a message! Here’s the request:

curl -X POST -H "$AUTH" \
     -H "Content-Type: application/json" \
     -d '{"records": [{"key":1,"value":"bob"}]}' \
     "$ADDR/topics/mark1/"

The output will look something like this:

{"Offsets":[{"partition":0,"offset":0}]}

You can put multiple records in the body to publish more than one message on the topic at once.
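For example, a request like this (the second record is just an illustrative extra) should put two messages on the topic:

curl -X POST -H "$AUTH" \
     -H "Content-Type: application/json" \
     -d '{"records": [{"key":1,"value":"bob"},{"key":2,"value":"alice"}]}' \
     "$ADDR/topics/mark1/"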

Consume messages

Now, let’s consume the messages off that topic with our consumer sub1. Here’s the request; notice the topic name is in the body, and the consumer name is in the URL after “consumers”:

curl -X GET -H "$AUTH" \
     -H "Content-Type: application/json" \
     -d '{ "partitions": [ { "topic": "mark1", "partition": 0 } ] }' \
     "$ADDR/consumers/sub1/instances/1/records"

The output from this one (not surprisingly) looks like this:

[{"topic":"MARK1","key":"1","value":"bob","partition":0,"offset":0}]

Great, hopefully that gives you a feel for how these REST APIs for TxEventQ work!

But wait, there’s more!

Of course there are a lot more APIs available than the few I have shown you so far. They all follow a fairly similar pattern. Let’s take a look at a list of what’s available:

Topics APIs

  • Create topic, optionally with partition count
  • List topics
  • Get a topic
  • Create a consumer group
  • Publish message(s)
  • List topics in a specific cluster
  • Get a topic in a specific cluster
  • Delete a topic

Partitions APIs

  • List partitions in a topic
  • Get details of one partition in a topic
  • Get partition message offsets
  • List partitions in a topic in a cluster
  • Get details of one partition in a topic in a cluster
  • List consumer lags for a partition
  • Publish message(s) in a particular partition

Consumer Group APIs

  • List consumer groups
  • Details of one consumer group
  • Get consumer group lag summary
  • Get consumer group lags for all partitions
  • Get consumer group lags for a given partition
  • Delete consumer group
  • Create consumer group

Consumer APIs

  • Create consumer instance on consumer group
  • Delete consumer instance on consumer group
  • List topics that a consumer is subscribed to
  • Subscribe to topics
  • Unsubscribe from topics
  • Send message to given partition

Move Offset APIs

  • Fetch messages
  • Fetch messages from offset in specified partition
  • Move to beginning of partition
  • Move to end of partition
  • Get offsets in specific topics and partitions
  • Commit (set) offsets in specific partitions

Where to find more information

You can find more information about the TxEventQ REST APIs in the documentation, here: https://docs.oracle.com/en/database/oracle/oracle-rest-data-services/22.3/orrst/api-oracle-transactional-event-queues.html

Or if you prefer, you can open the OpenAPI specification on your database instance. The URL will be something like this, and you can search the output for “teq” to find the APIs:

https://xyzabc-red1.adb.us-phoenix-1.oraclecloudapps.com/ords/admin/_/db-api/stable/metadata-catalog/openapi.json

I hope you enjoyed this quick introduction to the new REST APIs for Transactional Event Queueing! Of course, this is available in any Oracle database, not just Autonomous Database. If you want to use Oracle REST Data Services with your own database, you might find this post about installing a standalone version interesting too!


Getting started with the new observability exporter for Oracle database

My colleague Paul Parkinson recently published our new unified observability exporter for Oracle Database on GitHub; you can read about it here. I wanted to start playing around with it to see what we can do with it.

In this post I will start with a really simple example that just gets the exporter up and running and collects a few simple metrics from the database into Prometheus. In subsequent posts, I’ll go further and look at dashboards in Grafana, and also cover the logging and tracing capabilities! But you have to start somewhere, right?

The first thing we need is a database, of course! I just fired one up in a container like this:

docker run -d \
       --name oracle-db \
       -p 1521:1521 \
       -e ORACLE_PWD=Welcome123 \
       -e ORACLE_SID=ORCL \
       -e ORACLE_PDB=PDB1 \
       container-registry.oracle.com/database/enterprise:21.3.0.0

If you have not used this image before, you will first need to go to Oracle Container Registry at https://container-registry.oracle.com, log in, navigate to the Database category and then the “enterprise” image, and accept the license agreement. You will also need to log in with your Docker client so you can pull the image:

docker login container-registry.oracle.com
# this will prompt you for your username and password

The image will take a short time to pull the first time, and the first startup will actually create the database instance, and that takes a few minutes too. You can watch the logs to see when the database is ready:

docker logs -f oracle-db

You only incur these delays the first time you start the image. After that you can stop and start the container as needed and it will retain the data and start up very quickly.

# to stop the container:
docker stop oracle-db

# to start the container:
docker start oracle-db

Ok, so now we have a database available. Let’s connect to it and create some data to play with. You can use your favorite client – there’s SQL*Plus in that image if you don’t have anything else available. You can start it and connect to the database like this:

docker exec -ti oracle-db sqlplus pdbadmin/Welcome123@//localhost:1521/pdb1

Note: If you have not already, you might want to check out the new SQLcl command line tool, which offers command line completion and many other nice features: https://www.oracle.com/database/sqldeveloper/technologies/sqlcl/

Let’s create a “customer” table and add a record:

create table customer (id number, name varchar2(256));
insert into customer (id, name) values (1, 'mark');
commit;

Great, and let’s just leave that session connected – that will come in handy later!

Now, let’s get the observability exporter and set it up.

First, you’ll need to clone the project from GitHub:

git clone https://github.com/oracle/oracle-db-appdev-monitoring
cd oracle-db-appdev-monitoring

You can build the project and create a container image (assuming you have Maven, Java and Docker installed) like this:

mvn clean package -DskipTests
docker build -t observability-exporter:0.1.0 .

If you don’t have those installed and you don’t want to – you can skip this step and just grab a pre-built container image from Oracle Container Registry:

docker pull container-registry.oracle.com/database/observability-exporter:0.1.0	

If you do it this way, make sure to use the full name later when we start the exporter, not the short version!

Now we need to create a configuration file and define our metrics. I called mine mark-metrics.toml and here’s the content:

[[metric]]
context = "customers"
request = "SELECT count(*) as num_custs FROM customer"
metricsdesc = { num_custs = "Number of customers." }

[[metric]]
context = "system"
request = "select count(*) as session_count from v$session where username is not null and type = 'USER' and con_id = sys_context('userenv','con_id')"
metricsdesc = { session_count = "Current session count." }

[[metric]]
context = "system"
request = "select count(*) as active_sessions from v$session where username is not null and type = 'USER' and status = 'ACTIVE' and con_id = sys_context('userenv','con_id')"
metricsdesc = { active_sessions = "Active sessions." }

[[metric]]
context = "system"
request = "select (c.session_count - a.active_sessions) as inactive_sessions from (select count(*) as session_count from v$session where username is not null and type = 'USER' and con_id = sys_context('userenv','con_id')) c, (select count(*) as active_sessions from v$session where username is not null and type = 'USER' and status = 'ACTIVE' and con_id = sys_context('userenv','con_id')) a"
metricsdesc = { inactive_sessions = "Inactive sessions." }

[[metric]]
context = "system"
request = "select b.session_count as blocked_sessions from (select count(*) as session_count from v$session where username is not null and type = 'USER' and blocking_session_status = 'VALID' and con_id = sys_context('userenv','con_id')) b"
metricsdesc = { blocked_sessions = "Blocked sessions." }

I defined five metrics in this file. Each metric starts with the [[metric]] heading and can have several fields. You can see more information in the documentation here. In the spirit of keeping this first post simple, I just created basic metrics with no labels or anything fancy 🙂

Let’s take a close look at the first metric, here it is again:

[[metric]]
context = "customers"
request = "SELECT count(*) as num_custs FROM customer"
metricsdesc = { num_custs = "Number of customers." }

It is in the context (or group) called customers. The metric itself is called num_custs. You can see how we use the metricsdesc to create a human-readable documentation/description for the metric. And the metric itself is defined with an SQL statement. Wow! That’s pretty cool, right? That means that anything I can write an SQL statement to get from the database can be exported as a metric! In this one I just count the number of entries in that customer table we just created.

The other four metrics are some simple queries that get the number of sessions in the database as well as how many are active, inactive and blocked. These are all in the system context. You can define whatever contexts you like.

When you later look at a metric in Prometheus its name will be something like this:

oracledb_customers_num_custs

Notice how the context (customers) and the metric name (num_custs) are in there.

Ok, now that we have defined our metrics, we can start up the exporter. Let’s run it in another container, alongside the database. We can start it like this:

docker run -d \
       -v /home/mark/oracle-db-appdev-monitoring/mark-metrics.toml:/metrics.toml \
       -p 9161:9161 \
       -e DEFAULT_METRICS=/metrics.toml \
       -e DATA_SOURCE_NAME=pdbadmin/Welcome123@172.17.0.3:1521/pdb1 \
       --name exporter \
       observability-exporter:0.1.0

There are a couple of things to note here. First, I am providing the configuration file we just created using the -v mount. This gives the exporter access to the metric definitions. Second, we need to tell it how to connect to the database. You’ll need to get the IP address of the database container using this command:

docker inspect oracle-db | grep IPAddress

Yours will probably be different from mine, so you’ll need to update the value of DATA_SOURCE_NAME to match your environment. And finally, a reminder – if you pulled the pre-built image down from Oracle Container Registry, you’ll need to use the fully qualified name on the last line.
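In other words, with the pre-built image the last line of the docker run command above becomes:

       container-registry.oracle.com/database/observability-exporter:0.1.0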

Once this container starts up, grab its IP address too; we’ll need that in a minute:

docker inspect exporter | grep IPAddress

The exporter should start right up, and assuming we got the address right and no typos, it should be working and we can get metrics like this:

$ curl localhost:9161/metrics
# HELP oracledb_system_inactive_sessions Inactive sessions.
# TYPE oracledb_system_inactive_sessions gauge
oracledb_system_inactive_sessions 1.0
# HELP oracledb_up Whether the Oracle database server is up.
# TYPE oracledb_up gauge
oracledb_up 1.0
# HELP oracledb_system_blocked_sessions Blocked sessions.
# TYPE oracledb_system_blocked_sessions gauge
oracledb_system_blocked_sessions 0.0
# HELP oracledb_customers_num_custs Number of customers.
# TYPE oracledb_customers_num_custs gauge
oracledb_customers_num_custs 2.0
# HELP oracledb_system_active_sessions Active sessions.
# TYPE oracledb_system_active_sessions gauge
oracledb_system_active_sessions 1.0
# HELP oracledb_system_session_count Current session count.
# TYPE oracledb_system_session_count gauge
oracledb_system_session_count 2.0

If you don’t see this, check the container logs to see what the error was:

docker logs exporter

Assuming everything is working now, let’s start up Prometheus and configure it to scrape these metrics.

First, let’s create a configuration file called prometheus.yml with this content:

global:
  scrape_interval:     10s
  evaluation_interval: 10s

scrape_configs:
  - job_name: 'prometheus'
    static_configs:
    - targets: ['127.0.0.1:9090']

  - job_name: 'oracle-exporter'
    metrics_path: '/metrics'
    scrape_interval: 10s
    scrape_timeout: 8s
    static_configs:
    - targets: ['172.17.0.4:9161']

The only thing you’ll need to change here is the very last line – you need to put the IP address of your exporter container in there.

Then you can start up Prometheus using this configuration like this:

docker run -d \
       --name prometheus \
       -p 9090:9090 \
       -v /home/mark/prometheus.yml:/etc/prometheus/prometheus.yml \
       prom/prometheus --config.file=/etc/prometheus/prometheus.yml

It should start right up and you can access it at http://localhost:9090

The user interface has a search field at the top that you can type into to find a metric. If you start typing “num_custs” it should find our metric. Then hit Enter, or click on the Execute button, to see the value of the metric. It might take up to 10 seconds for data to be available, since we configured the scrape interval as 10 seconds in our configuration file. In the table view you should see something like this – yours will probably say 1, not 2:
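oracledb_customers_num_custs    2

(The Prometheus table view adds some decoration around this, but the value should match what we saw from the /metrics endpoint earlier.)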

If you go insert some more records into that table and then check again, you’ll see the value is updated. You can also click on the Graph tab to view that as a time series graph. Try adding and removing records to see what happens. Remember to wait a little while between each update so that new metrics are collected.
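For example, to add another record to the customer table we created earlier (the values here are just illustrative):

insert into customer (id, name) values (2, 'jane');
commit;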

You can also try the other metrics we created! So there we go, that’s covered the very basic starting steps of defining some metrics, running the exporter and scraping the metrics into Prometheus! Stay tuned for some follow up posts where I will build dashboards in Grafana, and also look at exporting logs and distributed tracing!

Bonus info: If you use WSL2 like I do, you might see a warning on the Prometheus web user interface about clock skew. If you do, you can fix that by updating the time in WSL like this:

sudo hwclock -s

New web page for Oracle Transactional Event Queueing

The new web page for Oracle Transactional Event Queueing is live and has lots of great information including sample code, links to hands-on labs, documentation and some user stories! Hope you can check it out!


Creating a stored procedure (dare I call it a microservice?) to automatically process events on a queue

In this post I want to look at how to create a stored procedure in the database to automatically process events as they are produced on a Transactional Event Queue (TEQ).

Having a small, discrete piece of code that processes events off a queue is a pretty common use case. You could even call it a microservice I guess 🙂 since it does meet the well-established criteria of having its own code base, being loosely coupled, independently deployable and testable. One thing I find really interesting about writing a “microservice” like this and deploying it in the Oracle Database is that I can essentially scale it to zero instances, and it will only run (and consume resources) when there is actually work for it to do. I could also use the Database Resource Manager to control how many resources it is able to consume if I wanted to 🙂 And it would not be all that hard to instrument it so I could get logs, metrics and even distributed tracing – but that’s another story!

So, let’s go ahead and build this thing!

We’ll start with a new Oracle Database. I am going to run it in a Docker container using the standard image provided on Oracle Container Registry. If you have not used it before, you will need to log in to Oracle Container Registry at https://container-registry.oracle.com, then navigate to the “Database” section and then “enterprise”, and read and accept the license.

Make sure you are logged into Oracle Container Registry in your Docker client too:

docker login container-registry.oracle.com -u super.user@someplace.com

Once you have authenticated, you can start up an Oracle 21c Database using this command:

docker run -d \
  --name oracle-db \
  -p 1521:1521 \
  -e ORACLE_PWD=Welcome123## \
  -e ORACLE_SID=ORCL \
  -e ORACLE_PDB=PDB1 \
  container-registry.oracle.com/database/enterprise:21.3.0.0

It will take a few minutes (the first time only) for the database files to be created and the instance to start up. You can watch the logs, or use this little shell command to do the waiting for you:

while ! docker logs oracle-db | grep -q "DATABASE IS READY TO USE!";
do 
  sleep 10
done

Great, now we have a database running, let’s set up the necessary permissions for our user. I am going to use the pdbadmin user in the PDB1 pluggable database. So let’s give that user permissions to use the TEQ packages (I am using the new SQLcl command line tool, but you can use SQL*Plus or SQL Developer, or whatever tool you prefer):

# sql sys/Welcome123##@//localhost:1521/pdb1 as sysdba

SQL> alter session set container = pdb1;

SQL> grant dba to pdbadmin;

SQL> grant execute on dbms_aqadm to pdbadmin;
SQL> grant execute on dbms_aq to pdbadmin;

SQL> commit;
SQL> exit

Ok, now we can connect with our pdbadmin user and start setting up our environment:

# sql pdbadmin/Welcome123##@//localhost:1521/pdb1

First we want to create our (TEQ) queue (or topic) and start it. We’ll call the queue my_teq:

begin
    dbms_aqadm.create_transactional_event_queue(
        queue_name         => 'my_teq',
        multiple_consumers => true
    );
    
    dbms_aqadm.start_queue(
        queue_name         => 'my_teq'
    ); 
end;
/

Since we are using a multi-consumer queue (i.e. a topic) we need to add a subscriber too. Let’s call it my_subscriber:

declare
    subscriber sys.aq$_agent;
begin
    dbms_aqadm.add_subscriber(
        queue_name => 'my_teq',
        subscriber => sys.aq$_agent('my_subscriber', null, 0)
    );
end;
/

We’ll keep our microservice super-simple for this demonstration; we’ll just have it record the messages it receives in an “output” table – so let’s create that table now:

create table my_log (
    message varchar(256),
    when timestamp(6)
);

Ok, so here is our consumer microservice:

create or replace procedure receiver (
    context in raw,
    reginfo in sys.aq$_reg_info,
    descr in sys.aq$_descriptor,
    payload in varchar2,
    payloadl in number
) as
  dequeue_options dbms_aq.dequeue_options_t;
  message_properties dbms_aq.message_properties_t;
  message_handle raw ( 16 ) ;
  message sys.aq$_jms_text_message;
  no_messages exception;
  pragma exception_init ( no_messages, -25228 ) ;
begin
  dequeue_options.msgid := descr.msg_id;
  dequeue_options.consumer_name := descr.consumer_name;
  dequeue_options.navigation := dbms_aq.first_message;
  loop
    dbms_aq.dequeue (
      queue_name => 'my_teq',
      dequeue_options => dequeue_options,
      message_properties => message_properties,
      payload => message,
      msgid => message_handle
    );
    insert into my_log values ( message.text_vc, sysdate );
    commit;
  end loop;
exception
when no_messages then
  dbms_output.put_line ( 'No more messages for processing' ) ;
  commit;
end;
/

Let’s walk through that and talk about the details. First, the procedure must have this signature/interface:

procedure receiver (
    context in raw,
    reginfo in sys.aq$_reg_info,
    descr in sys.aq$_descriptor,
    payload in varchar2,
    payloadl in number
)

The name of the procedure is up to you, but it must have those exact parameters in that order, since this is a callback, and the TEQ notification is expecting this signature so that it can pass the data about new messages to the consumer.

When we get the callback, we need to perform a dequeue operation to get the actual message/event off the queue/topic. Since it is possible that there is more than one, it’s a good idea to use a loop to read and process all of them before we complete. Here we have a simple loop to dequeue a message and then save the details in our log table:

  loop
    dbms_aq.dequeue (
      queue_name => 'my_teq',
      dequeue_options => dequeue_options,
      message_properties => message_properties,
      payload => message,
      msgid => message_handle
    );
    insert into my_log values ( message.text_vc, sysdate );
    commit;
  end loop;

We’ve also defined an exception handler for when there are no messages available (this should not ever happen, but it’s still good practice to cater for it anyway):

when no_messages then
  dbms_output.put_line ( 'No more messages for processing' ) ;
  commit;

I used the JMS message format in this example, but of course you could use RAW or JSON or a user-defined type instead.
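As a rough sketch (the queue name here is just illustrative), a RAW-payload version of the queue would be created with the queue_payload_type parameter, something like this:

begin
    dbms_aqadm.create_transactional_event_queue(
        queue_name         => 'my_raw_teq',
        queue_payload_type => 'RAW',
        multiple_consumers => true
    );

    dbms_aqadm.start_queue(
        queue_name         => 'my_raw_teq'
    );
end;
/

In the receiver, the dequeued message would then be declared as a RAW (for example raw(2000)) instead of sys.aq$_jms_text_message, and converted for logging with something like utl_raw.cast_to_varchar2.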

Ok, so now that our microservice is ready, we need to tell the database to call it when there is a message to process. To do this, we create a notification as follows:

begin
  dbms_aq.register(
      sys.aq$_reg_info_list(
        sys.aq$_reg_info(
            'my_teq:my_subscriber',
            dbms_aq.namespace_aq,
            'plsql://receiver', 
            HEXTORAW('FF')
        )
      ), 1);
  commit;
end;
/

Ok, so let’s talk about what is happening here. This register function that we are running will set up the connection between the queue, the subscriber and the consumer. In the aq$_reg_info you can see the first parameter has the queue name followed by a colon and the subscriber name – so this is telling us “when we have a message on my_teq and it is addressed to the subscriber my_subscriber…”

The next parameter tells us that we are interested in AQ (and TEQ) notifications, and the third parameter tells us the callback address. In this case we are telling it to run the PL/SQL procedure called receiver.

Once that is done, you can check on the details with this query:

select r.reg_id, subscription_name, location_name, num_ntfns, num_pending_ntfns
from USER_SUBSCR_REGISTRATIONS r, V$SUBSCR_REGISTRATION_STATS s
where r.reg_id = s.reg_id;

REG_ID                      SUBSCRIPTION_NAME    LOCATION_NAME NUM_NTFNS NUM_PENDING_NTFNS
______ ______________________________________ ________________ _________ _________________
   301 "PDBADMIN"."MY_TEQ":"MY_SUBSCRIBER"    plsql://receiver        40                 0

If you come back and run this again later, you will see the number of notifications sent, and pending (i.e. the last two columns) will increase each time we send a message.

Ok, let’s enqueue a message (publish an event) to test this out!

We can use this command to send a test message. This creates and sends a JMS message on our my_teq queue/topic addressed to our my_subscriber consumer:

declare
  enqueue_options dbms_aq.enqueue_options_t;
  message_properties dbms_aq.message_properties_t;
  message_handle raw(16);
  message sys.aq$_jms_text_message;
begin
  message := sys.aq$_jms_text_message.construct;
  message.set_text('hello from mark');
  message_properties.recipient_list(0) := sys.aq$_agent('my_subscriber', null, null);
  dbms_aq.enqueue(
    queue_name => 'my_teq',
    enqueue_options => enqueue_options,
    message_properties => message_properties,
    payload => message,
    msgid => message_handle);
  commit;
end;
/

Once that is run, the notification will kick in, and the callback will occur and our microservice will run, and consume all of the messages and dump them out into our “output table.” We can check the results with this query:

SQL> select * from my_log;

           MESSAGE                               WHEN
__________________ __________________________________
hello from mark    23-JUN-22 04.44.06.000000000 PM

Feel free to go run that a few more times to see what happens.

So there we go! We created a nice simple, loosely coupled consumer that will process messages/events as they arrive, and will scale to zero (consume no resources) when there is no work to do. Enjoy!


Java to get go-routine-like virtual threads!

Yay! Java is finally going to get some lightweight threads, a bit like goroutines, which allow you to create threads in the JVM without each one consuming an OS thread!

I’m looking forward to trying it out in Java 19.
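Here’s a minimal sketch of the kind of thing this enables (this uses the Java 19 preview API, so it needs --enable-preview to compile and run):

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.IntStream;

public class VirtualThreadsDemo {
    public static void main(String[] args) {
        // each submitted task gets its own virtual thread, not its own OS thread,
        // so creating tens of thousands of them is cheap
        try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
            IntStream.range(0, 10_000).forEach(i ->
                executor.submit(() -> {
                    Thread.sleep(1000);   // blocking is fine; the carrier OS thread is released
                    return i;
                }));
        }   // close() waits for the submitted tasks to finish
    }
}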

Read all the details in this article by Nicolai Parlog: https://blogs.oracle.com/javamagazine/post/java-loom-virtual-threads-platform-threads


Some big updates for the Python Oracle library (cx_Oracle)

There are some really interesting updates for the open source Python Oracle library (known as cx_Oracle, though a name change is part of the update) – check it out here:

https://cjones-oracle.medium.com/open-source-python-thin-driver-for-oracle-database-e82aac7ecf5a


Cross-region event propagation with Oracle Transactional Event Queues

In this post I want to demonstrate how to use Oracle Transactional Event Queues (TEQ) to propagate messages/events across regions. I will use two Oracle Autonomous Databases running in Oracle Cloud, one in Ashburn, VA and one in Phoenix, AZ (about 2,000 miles apart).

Of course, there are a lot of reasons why you might want to propagate events like this, and you don’t necessarily have to do it across geographic regions; you might just want to do it across two database instances in the same data center, or even between two topics in the same database!

Here’s a quick diagram of what we are going to build. We are going to use the JMS Pub/Sub model. Our producer will connect to the ASHPROD1 instance and put messages onto the topic ASH_TOPIC. Messages will be propagated from this topic to the topic PHX_TOPIC in the PHXPROD1 instance. Our consumer will connect to PHXPROD1 and consume messages from there.

To get started, let’s create two databases. To follow along, you’ll need an Oracle Cloud account – you can do this with the “Always Free” account using the 21c version of the Autonomous Database, so you can try this without spending any money 🙂 You can also use 19c if you prefer.

Creating the databases

First we log into the Oracle Cloud Infrastructure (OCI) Console at https://cloud.oracle.com. Enter your cloud account name and hit the “Next” button.

After you log in, click on the “hamburger” (three lines) menu (1) and go to “Oracle Database” (2) and then “Autonomous Database” (3) as shown:

Choose your compartment (1), and the region (2) (I’ll use Ashburn and Phoenix – use any two regions you like, or two in the same region will work too), then click on the “Create Autonomous Database” (3) button:

In the dialog, we need to give the database a name, I used ASHPROD1. Choose “Transaction Processing” as the workload type and “Shared Infrastructure” as the deployment type:

You can accept the default 19c database (or toggle that “Always Free” switch to use 21c). The default 1 OCPU, 1 TB is fine for this exercise. Also provide a password for the administrator (don’t forget it!):

In the “Choose network access” section, choose the option for secure access and click on the “Add My IP Address” button. Choose the “Bring Your Own License (BYOL)” option and provide an email address for the administrator:

Then click on the “Create Autonomous Database” button to create the database.

Now choose the second region, e.g. Phoenix, in the top right corner of the OCI Console and repeat this same process to create a second database, for example called PHXPROD1. This time though, choose the “secure access from anywhere” option, since we are going to need to be able to have ASHPROD1 connect to this instance too.

Obtain Database Wallets

So now we have our two databases. Let’s download the wallets so that we can connect to them. The database wallets contain the necessary information to connect to, and authenticate with, the database.

In the OCI Console, click on the database name to see the details of the database:

Next, click on the “DB Connection” button:

You will click on the “Download wallet” button (1) to get the wallet file, but while you are here, notice the connection strings (2) – we’ll use one of those later.

After you click on the button, provide a password for the wallet, and then click on the “Download” button:

Repeat this for the other database.

Creating our consumer

Let’s create a new project and write our consumer code. We’ll use Maven to simplify the dependency management and to make it easy to run our consumer. Let’s create a new directory and unzip our two wallets into this directory. So we should see something like this:


/home/mark/src/redstack
├── ASHPROD1
│   ├── README
│   ├── cwallet.sso
│   ├── ewallet.p12
│   ├── ewallet.pem
│   ├── keystore.jks
│   ├── ojdbc.properties
│   ├── sqlnet.ora
│   ├── tnsnames.ora
│   └── truststore.jks
└── PHXPROD1
    ├── README
    ├── cwallet.sso
    ├── ewallet.p12
    ├── ewallet.pem
    ├── keystore.jks
    ├── ojdbc.properties
    ├── sqlnet.ora
    ├── tnsnames.ora
    └── truststore.jks

You will need to edit both of those sqlnet.ora files and update the DIRECTORY so that it is correct. For example, we would change this:

WALLET_LOCATION = (SOURCE = (METHOD = file) (METHOD_DATA = (DIRECTORY="?/network/admin")))
SSL_SERVER_DN_MATCH=yes

To this;

WALLET_LOCATION = (SOURCE = (METHOD = file) (METHOD_DATA =
  (DIRECTORY="/home/mark/src/redstack/PHXPROD1")))
SSL_SERVER_DN_MATCH=yes

Let’s add a Maven POM file to set up our project. I am assuming you have Maven and a JDK installed. If not – go get those now 🙂 I am using Maven 3.8.4 and Java 17.0.3. Create a file called pom.xml with this content:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.wordpress.redstack</groupId>
    <artifactId>propagation</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>propagation</name>
    <description>Demo of TEQ propagation</description>

    <properties>
        <maven.compiler.target>17</maven.compiler.target>
        <maven.compiler.source>17</maven.compiler.source>
    </properties>

    <dependencies>
        <dependency>
            <groupId>javax.transaction</groupId>
            <artifactId>javax.transaction-api</artifactId>
            <version>1.2</version>
        </dependency>
        <dependency>
            <groupId>com.oracle.database.jdbc</groupId>
            <artifactId>ojdbc8</artifactId>
            <version>19.3.0.0</version>
        </dependency>
        <dependency>
            <groupId>com.oracle.database.messaging</groupId>
            <artifactId>aqapi</artifactId>
            <version>19.3.0.0</version>
        </dependency>
        <dependency>
            <groupId>com.oracle.database.security</groupId>
            <artifactId>oraclepki</artifactId>
            <version>19.3.0.0</version>
        </dependency>
        <dependency>
            <groupId>com.oracle.database.security</groupId>
            <artifactId>osdt_core</artifactId>
            <version>19.3.0.0</version>
        </dependency>
        <dependency>
            <groupId>com.oracle.database.security</groupId>
            <artifactId>osdt_cert</artifactId>
            <version>19.3.0.0</version>
        </dependency>
        <dependency>
            <groupId>javax.jms</groupId>
            <artifactId>javax.jms-api</artifactId>
            <version>2.0.1</version>
        </dependency>
        <dependency>
            <groupId>javax.transaction</groupId>
            <artifactId>jta</artifactId>
            <version>1.1</version>
        </dependency>
    </dependencies>

    <profiles>
        <profile>
            <id>consumer</id>
            <build>
                <plugins>
                    <plugin>
                        <groupId>org.codehaus.mojo</groupId>
                        <artifactId>exec-maven-plugin</artifactId>
                        <version>3.0.0</version>
                        <executions>
                            <execution>
                                <goals>
                                    <goal>exec</goal>
                                </goals>
                            </execution>
                        </executions>
                        <configuration>
                            <executable>java</executable>
                            <arguments>
                                <argument>-Doracle.jdbc.fanEnabled=false</argument>
                                <argument>-classpath</argument>
                                <classpath/>
                                <argument>com.wordpress.redstack.Consumer</argument>
                            </arguments>
                        </configuration>
                    </plugin>
                </plugins>
            </build>
        </profile>
        <profile>
            <id>producer</id>
            <build>
                <plugins>
                    <plugin>
                        <groupId>org.codehaus.mojo</groupId>
                        <artifactId>exec-maven-plugin</artifactId>
                        <version>3.0.0</version>
                        <executions>
                            <execution>
                                <goals>
                                    <goal>exec</goal>
                                </goals>
                            </execution>
                        </executions>
                        <configuration>
                            <executable>java</executable>
                            <arguments>
                                <argument>-Doracle.jdbc.fanEnabled=false</argument>
                                <argument>-classpath</argument>
                                <classpath/>
                                <argument>com.wordpress.redstack.Producer</argument>
                            </arguments>
                        </configuration>
                    </plugin>
                </plugins>
            </build>
        </profile>
    </profiles>
</project>

This defines the Maven coordinates for our project, the dependencies we need to compile and run our code, and also a convenience goal to run the consumer (or producer) directly from Maven so that we don’t have to worry about constructing the class path manually; the run commands are shown right after the directory setup below. Let’s also create some directories to store our code:

mkdir -p src/main/java/com/wordpress/redstack
mkdir -p src/main/resources
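With those profiles in place, running either program later is just a matter of one of these commands (we’ll use them further below):

mvn clean compile exec:exec -P consumer
mvn clean compile exec:exec -P producer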

Now let’s create a Java class in src/main/java/com/wordpress/redstack/Consumer.java with the following content:

package com.wordpress.redstack;

import java.sql.SQLException;

import javax.jms.JMSException;
import javax.jms.Session;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicSession;

import oracle.AQ.AQException;
import oracle.jms.AQjmsFactory;
import oracle.jms.AQjmsSession;
import oracle.jms.AQjmsTextMessage;
import oracle.jms.AQjmsTopicSubscriber;
import oracle.ucp.jdbc.PoolDataSource;
import oracle.ucp.jdbc.PoolDataSourceFactory;

public class Consumer {

    private static String username = "admin";
    private static String url = "jdbc:oracle:thin:@phxprod1_high?TNS_ADMIN=/home/mark/src/redstack/PHXPROD1";
    private static String topicName = "phx_topic";

    public static void main(String[] args) throws AQException, SQLException, JMSException {

        // create a topic session
        PoolDataSource ds = PoolDataSourceFactory.getPoolDataSource();
        ds.setConnectionFactoryClassName("oracle.jdbc.pool.OracleDataSource");
        ds.setURL(url);
        ds.setUser(username);
        ds.setPassword(System.getenv("DB_PASSWORD"));

        // create a JMS topic connection and session
        TopicConnectionFactory tcf = AQjmsFactory.getTopicConnectionFactory(ds);
        TopicConnection conn = tcf.createTopicConnection();
        conn.start();
        TopicSession session = 
           (AQjmsSession) conn.createSession(true, Session.AUTO_ACKNOWLEDGE);

        // create a subscriber on the topic
        Topic topic = ((AQjmsSession) session).getTopic(username, topicName);
        AQjmsTopicSubscriber subscriber = 
           (AQjmsTopicSubscriber) session.createDurableSubscriber(topic, "BOOK");

        System.out.println("Waiting for messages...");

        // wait forever for messages to arrive and print them out
        while (true) {

            // the 1_000 is a one second timeout
            AQjmsTextMessage message = (AQjmsTextMessage) subscriber.receive(1_000); 
            if (message != null) {
                if (message.getText() != null) {
                    System.out.println(message.getText());
                } else {
                    System.out.println();
                }
            }
            session.commit();
        }
    }

}

Let’s take a look at the interesting parts of that code.

    private static String url = "jdbc:oracle:thin:@phxprod1_high?TNS_ADMIN=/home/mark/src/redstack/PHXPROD1";

This defines the URL that we will use to connect to the database. Notice that it is using an alias (phxprod1_high) – that might look familiar, remember we saw those on the OCI Console when we were downloading the wallet. If you take a look at the tnsnames.ora file in the PHXPROD1 wallet you will see how this is defined, something like this:

phxprod1_high = (description= (retry_count=20)(retry_delay=3)(address=(protocol=tcps)(port=1522)(host=adb.us-phoenix-1.oraclecloud.com))(connect_data=(service_name=xxx_phxprod1_high.adb.oraclecloud.com))(security=(ssl_server_cert_dn="CN=adwc.uscom-east-1.oraclecloud.com, OU=Oracle BMCS US, O=Oracle Corporation, L=Redwood City, ST=California, C=US")))

In our main() method, we start by connecting to the database instance:

        // create a topic session
        PoolDataSource ds = PoolDataSourceFactory.getPoolDataSource();
        ds.setConnectionFactoryClassName("oracle.jdbc.pool.OracleDataSource");
        ds.setURL(url);
        ds.setUser(username);
        ds.setPassword(System.getenv("DB_PASSWORD"));

Notice that we are reading the password from an environment variable – so you’ll need to set that variable wherever you are going to run this (note – this is not my real password, just an example):

export DB_PASSWORD=Welcome123##

Next we set up a TopicConnection, start a JMS Session, look up our Topic and create a Subscriber. This is all fairly standard JMS stuff 🙂

        // create a JMS topic connection and session
        TopicConnectionFactory tcf = AQjmsFactory.getTopicConnectionFactory(ds);
        TopicConnection conn = tcf.createTopicConnection();
        conn.start();
        TopicSession session = (AQjmsSession) 
           conn.createSession(true, Session.AUTO_ACKNOWLEDGE);

        // create a subscriber on the topic
        Topic topic = ((AQjmsSession) session).getTopic(username, topicName);
        AQjmsTopicSubscriber subscriber =
           (AQjmsTopicSubscriber) session.createDurableSubscriber(topic, "BOOK");

        System.out.println("Waiting for messages...");

I created a Durable Subscriber and named it BOOK. We’ll see that name again later, remember that!

Finally, we are going to just wait for messages forever and print them out.

        // wait forever for messages to arrive and print them out
        while (true) {
            AQjmsTextMessage message = (AQjmsTextMessage) subscriber.receive(1_000);
            if (message != null) {
                if (message.getText() != null) {
                    System.out.println(message.getText());
                } else {
                    System.out.println();
                }
            }
            session.commit();
        }

Normally, we would not wait forever, and we’d clean up our resources, but since this is just a small example consumer, we’ll make some allowances 🙂

Ok, that takes care of our consumer. We won’t run it yet, since we have not created the topics. Let’s do that now!

Create the topics

We are going to create two topics, one in each database instance/region, and configure propagation between them. Let’s review what we want:

Ashburn (producer side)          Phoenix (consumer side)
ASHPROD1 database instance       PHXPROD1 database instance
ASH_TOPIC topic                  PHX_TOPIC topic

Navigate back to your ASHPROD1 Autonomous Database in the OCI Console and click on the “Database Actions” button:

Note that your browser might think this is a pop-up and block it. If so, clicking on the button again usually lets the browser know you really meant to open it 🙂

In the Database Actions page, click on the “SQL” card to open the SQL Worksheet:

If you get the tour, you can click on “Next” or the “X” to close it.

We are just going to create our topics in the ADMIN schema. In real life, you would probably create a new user/schema to keep your topics in, perhaps several so that you can group them for easier administration. You can create topics with Java or PL/SQL. For this example, we will use PL/SQL.

Here’s the commands to create and start our new topic, ASH_TOPIC:

begin
    dbms_aqadm.create_sharded_queue(
        queue_name => 'ash_topic',
        multiple_consumers => TRUE
    ); 
    dbms_aqadm.start_queue('ash_topic');
end;

If you are using 21c, instead of create_sharded_queue, you should use create_transactional_event_queue – that procedure was renamed in 21c.
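On 21c, the same block would look like this:

begin
    dbms_aqadm.create_transactional_event_queue(
        queue_name => 'ash_topic',
        multiple_consumers => TRUE
    );
    dbms_aqadm.start_queue('ash_topic');
end;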

You can put these commands into the worksheet at the top (1), then click on the “Run Statement” button (2). You will see the result in the “Script Output” window (3) as shown below:

If you want to check, you can run this query to see details of the queues and topics in your schema:

select * from user_queues;

Now, we need to go to our PHXPROD1 database and create the PHX_TOPIC there. Just repeat what you just did for ASHPROD1 on the PHXPROD1 database and remember to change the name of the topic in the commands that you run!

Create the Database Link

Great, our topics are ready to go! Next, we need to create a Database Link from the ASHPROD1 database to the PHXPROD1 database. The Database Link will allow us to perform actions against the remote database, in this case, to enqueue messages on the remote topic.

Since our databases are using TLS, we need to make the remote database (PHXPROD1) wallet available to the ASHPROD1 database, so that it can authenticate. The easiest way to do this is to upload the files we need into an Object Store bucket.

Let’s create the bucket. In the OCI Console, make sure you are in the Ashburn region and then click on the “hamburger” menu (the three lines at the top left), then “Storage”, and then “Buckets”:

Then click on the “Create Bucket” button. Give your bucket a name (I used dblinks) and click on the “Create” button. All the defaults are fine for what we need:

Notice that your bucket is private:

Click on the “Upload” button to upload a file:

Then click on the “select files” link to choose the file. We need the file called cwallet.sso in the wallet we downloaded for the PHXPROD1 database (the remote database):

Once the upload completes you can close that dialog and then click on the “three dots” (1) next to the file we just uploaded and choose the “Create Pre-Authenticated Request” (2) option:

The defaults are what we want here – we want to be able to read this one object only. If you want to change the expiration to something like 2 days, just to be on the safe side, that’s not a bad idea at all! Click on the “Create Pre-Authenticated Request” button:

Make sure you take a copy of the URL, you won’t be able to get it again!

Ok, now we are ready to create the link. Open the SQL Worksheet for the ASHPROD1 database (the local/source database) and run these commands. You will need to get the right values for several fields before you run this, I’ll tell you where to get them next:

create or replace directory AQ_DBLINK_CREDENTIALS
as 'aq_dblink_credentials';

BEGIN
  DBMS_CLOUD.GET_OBJECT(
    object_uri => 'https://objectstorage.us-ashburn-1.oraclecloud.com/p/xxxx/n/xxxx/b/dblinks/o/cwallet.sso',
    directory_name => 'AQ_DBLINK_CREDENTIALS',
    file_name => 'cwallet.sso');

  DBMS_CLOUD.CREATE_CREDENTIAL(
    credential_name => 'CRED',
    username => 'ADMIN', -- remote db has case-sensitive login enabled, must be uppercase
    password => 'Welcome123##');

  DBMS_CLOUD_ADMIN.CREATE_DATABASE_LINK(
    db_link_name => 'PHXPROD1',
    hostname => 'adb.us-phoenix-1.oraclecloud.com',
    port => '1522',
    service_name => 'xxxxx.adb.oraclecloud.com',
    ssl_server_cert_dn => 'CN=adwc.uscom-east-1.oraclecloud.com, OU=Oracle BMCS US, O=Oracle Corporation, L=Redwood City, ST=California, C=US',
    credential_name => 'CRED',
    directory_name => 'AQ_DBLINK_CREDENTIALS');
END;

In the GET_OBJECT call, the object_uri needs to be that URL that you just copied from the Pre-Authenticated Request.

In the CREATE_CREDENTIAL call, the username should be the user for the remote (PHXPROD1) database – we can just use ADMIN. Note that this must be in upper case since Autonomous Database is configured for case-sensitive login by default. The password should be the password for that user.

In the CREATE_DATABASE_LINK call, the db_link_name is what we are going to use to refer to the remote database. I just used the name of the database – you’ll see later why that makes things more intuitive. You can get the values for the hostname, port, service_name and ssl_server_cert_dn fields from the wallet you downloaded. Make sure you use the wallet for the PHXPROD1 database. You will find the right values in the tnsnames.ora file, and you can just copy them in here. Here’s an example entry; the host, port, service_name and ssl_server_cert_dn are the values we need:

phxprod1_high = (description= (retry_count=20)(retry_delay=3)(address=(protocol=tcps)(port=1522)(host=adb.us-phoenix-1.oraclecloud.com))(connect_data=(service_name=xxxx_phxprod1_high.adb.oraclecloud.com))(security=(ssl_server_cert_dn="CN=adwc.uscom-east-1.oraclecloud.com, OU=Oracle BMCS US, O=Oracle Corporation, L=Redwood City, ST=California, C=US")))

Once you have all the right values, paste this into the SQL Worksheet and click on the “Run Script” button:

You can check it worked by doing a query through the database link. For example, let’s get a list of the queues/topics on the remote database. We are entering this query on the ASHPROD1 instance, using the database link (“@PHXPROD1“) to have it run on the other database, notice that the output shows the topic PHX_TOPIC we created in the PHXPROD1 database:
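select * from user_queues@PHXPROD1;

(Any query over the link works here; this one just lists the queues and topics in the remote ADMIN schema.)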

Start message propagation

Ok, now we are ready to start propagating messages! (Yay!)

We want to run these commands in the SQL Worksheet on the ASHPROD1 database (the source/local database):

BEGIN
   dbms_aqadm.schedule_propagation(
      queue_name         => 'ash_topic', 
      destination        => 'phxprod1',
      destination_queue  => 'phx_topic');
   dbms_aqadm.enable_propagation_schedule(
      queue_name         => 'ash_topic',
      destination        => 'phxprod1',
      destination_queue  => 'phx_topic');
end;

You can view the schedule you just created with this query:

select destination, LAST_RUN_TIME, NEXT_RUN_TIME, LAST_ERROR_TIME, LAST_ERROR_MSG 
from dba_queue_schedules;

Start the consumer

Now we can start up our consumer! Back in our directory with our code (the one with the pom.xml in it) run this command to start the consumer:

export DB_PASSWORD=Welcome123##          <-- use your real password!
mvn clean compile exec:exec -P consumer

After a few moments, the consumer will start up and we will see this message indicating that it is connected and waiting for messages:

[INFO] 
[INFO] --- exec-maven-plugin:3.0.0:exec (default-cli) @ propagation ---
Waiting for messages...

So now, we need to send some messages to it!

Creating a Producer

Let’s create another Java file called src/main/java/com/wordpress/redstack/Producer.java with this content:

package com.wordpress.redstack;

import java.sql.SQLException;

import javax.jms.JMSException;
import javax.jms.Session;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicSession;

import oracle.AQ.AQException;
import oracle.jms.AQjmsAgent;
import oracle.jms.AQjmsFactory;
import oracle.jms.AQjmsSession;
import oracle.jms.AQjmsTextMessage;
import oracle.jms.AQjmsTopicPublisher;
import oracle.ucp.jdbc.PoolDataSource;
import oracle.ucp.jdbc.PoolDataSourceFactory;

public class Producer {

    private static String username = "admin";
    private static String url = "jdbc:oracle:thin:@ashprod1_high?TNS_ADMIN=/home/mark/src/redstack/ASHPROD1";
    private static String topicName = "ash_topic";

    public static void main(String[] args) throws AQException, SQLException, JMSException {

        // create a topic session
        PoolDataSource ds = PoolDataSourceFactory.getPoolDataSource();
        ds.setConnectionFactoryClassName("oracle.jdbc.pool.OracleDataSource");
        ds.setURL(url);
        ds.setUser(username);
        ds.setPassword(System.getenv("DB_PASSWORD"));

        TopicConnectionFactory tcf = AQjmsFactory.getTopicConnectionFactory(ds);
        TopicConnection conn = tcf.createTopicConnection();
        conn.start();
        TopicSession session = (AQjmsSession) 
           conn.createSession(true, Session.AUTO_ACKNOWLEDGE);

        // publish message
        Topic topic = ((AQjmsSession) session).getTopic(username, topicName);
        AQjmsTopicPublisher publisher = (AQjmsTopicPublisher) session.createPublisher(topic);

        AQjmsTextMessage message = (AQjmsTextMessage) 
           session.createTextMessage("hello from ashburn, virginia!");
        publisher.publish(message, new AQjmsAgent[] { new AQjmsAgent("bob", null) });
        session.commit();

        // clean up
        publisher.close();
        session.close();
        conn.close();
    }

}

Let’s walk through this code. It’s very similar to the consumer, so I’ll just point out the important differences.

    private static String username = "admin";
    private static String url = "jdbc:oracle:thin:@ashprod1_high?TNS_ADMIN=/home/mark/src/redstack/ASHPROD1";
    private static String topicName = "ash_topic";

Notice that we are using the ASHPROD1 instance in the producer and the ASH_TOPIC.

        // publish message
        Topic topic = ((AQjmsSession) session).getTopic(username, topicName);
        AQjmsTopicPublisher publisher = (AQjmsTopicPublisher) session.createPublisher(topic);

        AQjmsTextMessage message = (AQjmsTextMessage) 
           session.createTextMessage("hello from ashburn, virginia!");
        publisher.publish(message, new AQjmsAgent[] { new AQjmsAgent("bob", null) });
        session.commit();

We create a TopicPublisher, and we send a simple JMS text message to the topic.

Let’s run our producer now:

export DB_PASSWORD=Welcome123##          <-- use your real password!
mvn clean compile exec:exec -P producer

When that finishes (you’ll see a “BUILD SUCCESS” message) go and have a look at your consumer; you should see something like this:

[INFO] --- exec-maven-plugin:3.0.0:exec (default-cli) @ propagation ---
Waiting for messages...
hello from ashburn, virginia!

Yay! It worked! We just published a message on the ASH_TOPIC in the ASHPROD1 instance and it was propagated to PHXPROD1 for us and our consumer read it off the PHX_TOPIC in PHXPROD1.

Here’s an interesting query we can run to see what happened:

select queue, msg_id, msg_state, enq_timestamp, deq_timestamp, deq_user_id, user_data, consumer_name from aq$ash_topic;

You can also run that on the remote database like this:

select queue, msg_id, msg_state, enq_timestamp, deq_timestamp, deq_user_id, user_data, consumer_name
 from aq$phx_topic@phxprod1;

Notice the consumer names – in the local ASHPROD1 instance, the consumer is AQ$_P_106126_92PHXPROD1 (yours will probably be slightly different.) That’s the propagation consumer that is running to propagate the messages to PHXPROD1.

But in the PHXPROD1 instance, the consumer is BOOK! That’s the name we gave to our consumer:

        AQjmsTopicSubscriber subscriber = (AQjmsTopicSubscriber) 
           session.createDurableSubscriber(topic, "BOOK");
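If you want to check on the propagation schedule itself, a query along these lines (run as the user that owns the queue; the exact column names can vary a little between database versions) will show when it last ran and whether there have been any failures:

select qname, destination, last_run_date, failures, last_error_msg
 from user_queue_schedules;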

Go ahead and send some more messages with the producer! Enjoy!


Installing Oracle REST Data Services (standalone)

I have been using Oracle REST Data Services (you might know it as “Database Actions”) with my Oracle Autonomous Database for a while now, and I wanted to play with some new features, which led me to want to install my own (“customer managed” or “standalone”) ORDS instance. It took me a few goes, and some help from Jeff Smith (yes, that Jeff Smith) to get it right, so I thought it would be good to document how I got it working!

In this example, I am going to use an Oracle 21c database, and I will set up ORDS 22.1 in one of the pluggable databases. Once we have it up and running, we will use Database Actions and look at some of the services in the REST service catalog.

Setting up a database

First, of course I needed a database to play with. I fired up a 21c database in a container for this exercise. You will need to go accept the license agreement before you can pull the container image.

Go to https://container-registry.oracle.com and navigate to “Database” and then “enterprise” and click on the button to accept the agreement. You may have to log in if you are not already.

You will also need to log in to Oracle Container Registry with your container runtime, in this post I am using Docker. This will prompt for your username and password:

docker login container-registry.oracle.com

Now we can start up a database in a container. Here is the command I used. I set a password and the SID and PDB names, and made sure to expose the database port so we can access the database from outside the container:

docker run -d \
  --name oracle-db \
  -p 1521:1521 \
  -e ORACLE_PWD=Welcome123## \
  -e ORACLE_SID=ORCL \
  -e ORACLE_PDB=PDB1 \
  container-registry.oracle.com/database/enterprise:21.3.0.0

Note: If you use different names, you will need to adjust the example commands appropriately! Also, if you want to be able to restart this database without losing all your data, you’ll want to mount a volume – the OCR page has details on how to do that.
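For example, a sketch of the same command with a volume mounted might look like this. The host path is just an example, and it needs to be writable by the oracle user inside the container (the OCR page explains the ownership requirements):

docker run -d \
  --name oracle-db \
  -p 1521:1521 \
  -e ORACLE_PWD=Welcome123## \
  -e ORACLE_SID=ORCL \
  -e ORACLE_PDB=PDB1 \
  -v /home/mark/oradata:/opt/oracle/oradata \
  container-registry.oracle.com/database/enterprise:21.3.0.0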

It takes a few minutes to start up. You can watch the logs using this command, and you will need to wait until you see the message indicating it is ready to use:

docker logs -f oracle-db

When the database is ready, we can log on and give the necessary privileges to our PDB admin user.

sqlplus sys/Welcome123##@//localhost:1521/orcl as sysdba
SQL> alter session set container = PDB1;
SQL> grant dba to pdbadmin;

Ok, now we are ready to install ORDS!

Installing ORDS

First step is to download it, of course. Here is the site to get the latest version of ORDS:

https://www.oracle.com/database/technologies/appdev/rest-data-services-downloads.html

There is also a direct link to the latest version: https://download.oracle.com/otn_software/java/ords/ords-latest.zip

Once you have it downloaded, just unzip it into a new directory. I unzipped it into /home/mark/ords.

The steps I am going to walk through here are covered in more detail in the documentation.

Now we want to run the pre-install script to set up the necessary privileges. I am using the pdbadmin user, the admin user in my PDB. This script will take just a few moments to run:

sqlplus sys/Welcome123##@//localhost:1521/pdb1 as sysdba \
        @scripts/installer/ords_installer_privileges.sql pdbadmin

Great, now we can run the installer. I used the interactive installer, which will ask you for the necessary information and let you type it in as you go. It is also possible to do a “silent” install by providing all of the information on the command line – the documentation explains how to do this.

Create a directory to hold the configuration and start the interactive installer:

cd /home/mark/ords
export PATH=/home/mark/ords/bin:$PATH
mkdir config
ords --config /home/mark/ords/config install

Here’s what the interactive install dialog looks like. I have highlighted the data I entered in bold; mostly I just took the defaults:

Oracle REST Data Services - Interactive Install

  Enter a number to select the type of installation
    [1] Install or upgrade ORDS in the database only
    [2] Create or update a database pool and install/upgrade ORDS in the database
    [3] Create or update a database pool only
  Choose [2]:
  Enter a number to select the database connection type to use
    [1] Basic (host name, port, service name)
    [2] TNS (TNS alias, TNS directory)
    [3] Custom database URL
  Choose [1]:
  Enter the database host name [localhost]:
  Enter the database listen port [1521]:
  Enter the database service name [orcl]: pdb1

  Provide database user name with administrator privileges.
    Enter the administrator username: pdbadmin

  Enter the database password for pdbadmin:
Connecting to database user: pdbadmin url: jdbc:oracle:thin:@//localhost:1521/pdb1

Retrieving information.
  Enter the default tablespace for ORDS_METADATA and ORDS_PUBLIC_USER [SYSAUX]:
  Enter the temporary tablespace for ORDS_METADATA and ORDS_PUBLIC_USER [TEMP]:
  Enter a number to select additional feature(s) to enable:
    [1] Database Actions  (Enables all features)
    [2] REST Enabled SQL and Database API
    [3] REST Enabled SQL
    [4] Database API
    [5] None
  Choose [1]:
  Enter a number to configure and start ORDS in standalone mode
    [1] Configure and start ORDS in standalone mode
    [2] Skip
  Choose [1]:
  Enter a number to use HTTP or HTTPS protocol
    [1] HTTP
    [2] HTTPS
  Choose [1]:
  Enter the HTTP port [8080]:

Note: I just used HTTP, but if you want to use HTTPS, you will probably want to create some certificates and configure them in the installer. Here are some commands to create a self-signed certificate and convert the key to the DER format ORDS requires:

# these are optional - only required if you want to use HTTPS

# generate a self-signed certificate (ords.pem) and private key (ords.key.pem)
openssl req -new -x509 -sha256 -newkey rsa:2048 -nodes -keyout ords.key.pem \
            -days 365 -out ords.pem
# (optional) inspect the certificate
openssl x509 -in ords.pem -text -noout
# convert the private key to the DER format that ORDS requires
openssl pkcs8 -topk8 -inform PEM -outform DER -in ords.key.pem -out ords.der -nocrypt
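If you skip HTTPS during the install and want to add it later, I believe you can point ORDS at the certificate and key using the config set command. Something like this sketch should work, but check the setting names against the documentation for your ORDS version (the file paths here are just examples):

ords --config /home/mark/ords/config config set standalone.https.cert /home/mark/ords/ords.pem
ords --config /home/mark/ords/config config set standalone.https.cert.key /home/mark/ords/ords.der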

Once you complete the interview, the installer will perform the installation. It takes just a couple of minutes, and it will start up the ORDS standalone server for you. If you need to stop it (with Ctrl-C) you can restart it with this command:

ords --config /home/mark/ords/config serve

Ok, now we have ORDS up and running, we are going to need a user!

Preparing an ORDS user

Let’s create a regular database user and give them access to ORDS.

Using the PDB admin user, we can create a new user and give them the necessary permissions to use ORDS:

sqlplus pdbadmin/Welcome123##@//localhost:1521/pdb1

SQL> create user mark identified by Welcome123##;
SQL> grant connect, resource to mark;
SQL> grant unlimited tablespace to mark;
SQL> begin
    ords.enable_schema(
        p_enabled => true,
        p_schema => 'mark',
        p_url_mapping_type => 'BASE_PATH',
        p_url_mapping_pattern => 'mark',
        p_auto_rest_auth => false
    );
    commit;
end;
/

Great, now we are ready to use ORDS!

Log in to ORDS Database Actions

To log into ORDS, open a browser and go to this URL: http://localhost:8080/ords/sql-developer

You may need to change the hostname or port if you used something different.

You should see the login page:

Enter your username (I used mark), then press Next and enter the password (I used Welcome123##). This will take you to the main “Database Actions” page:

Let’s create a table and enter some data 🙂

Click on the SQL card (top left) to open the SQL worksheet, and enter these statements:

create table city (
    name varchar2(64),
    population number
);

insert into city values ('Tokyo', 37000000);
insert into city values ('Delhi', 29000000);
insert into city values ('Shanghai', 26000000);
insert into city values ('Sao Paulo', 21000000);
insert into city values ('Values', 21000000);

Click on the “Run Script” icon to execute these statements – it’s the one the red arrow is highlighting:

Let’s expose that table as a REST service!

Creating a REST service

ORDS allows us to easily expose an SQL statement, or a PL/SQL block as a REST service. Let’s navigate to the REST page – click on the “hamburger menu” (1) and then the REST page (2):

The basic structure is that we create a “module” which contains “templates” which in turn contain “handlers.” So let’s start by creating a module. Click on the “Modules” option in the top menu.

Then click on the “Create Module” button in the top right corner:

Give the module a name (I used mark) and a base path (I used /api/). Since we are just playing here, set “Protected by Privilege” to Not protected. Obviously, in real life, you’d set up authentication, for example using OAuth, which ORDS provides out of the box – but that’s another post 🙂 Finally, click on the “Create” button to create the module. It will now appear in the modules page:

Click on the module name (“mark” in the example above) to open the module, and click on the “Create Template” button on the right hand side:

Enter a URI Template for this service, I used cities for mine, then click on the “Create” button:

Now you will see the template page. Click on the “Create Handler” button on the right:

In the “Create Handler” dialog, we provide details of the service. Notice that you can choose the HTTP Method (GET, POST, DELETE, etc.) and you can control paging. For this service, we want to create a GET handler and we want the “Source Type” to be Collection Query which lets us enter an SQL statement. While you are here, have a look in the pull down and notice that you can also use PL/SQL! You can also use bind variables in here, so we can accept parameters and use them in the query or PL/SQL code.

For now, enter this simple query, then click on the “Create” button:

select * from city

Note: You should not include the semi-colon!

Once you have created the handler, you will see the details view, where you can test it by clicking on the “Run” button (1):

Notice that you see the results in the bottom half of the screen. You also have the URL for the service provided (2) and there is a “copy” button on the right hand side. Let’s test our service using cURL:

$ curl http://localhost:8080/ords/mark/api/cities
{"items":[{"name":"Tokyo","population":37000000},{"name":"Dehli","population":29000000},{"name":"Shanghai","population":26000000},{"name":"Sao Paulo","population":21000000},{"name":"Values","population":21000000}],"hasMore":false,"limit":25,"offset":0,"count":5,"links":[{"rel":"self","href":"http://localhost:8080/ords/mark/api/cities"},{"rel":"describedby","href":"http://localhost:8080/ords/mark/metadata-catalog/api/item"},{"rel":"first","href":"http://localhost:8080/ords/mark/api/cities"}]}

Great! We made a service!
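By the way, everything we just did in the user interface can also be scripted with the ORDS PL/SQL API, and that is also a nice way to see how the bind variables mentioned earlier work. Here is a sketch (run as the mark user) that adds a second template to the same module, taking the city name as a parameter. Treat it as illustrative rather than definitive, and check the ORDS PL/SQL API documentation for the details:

begin
    -- add a template with a bind variable in the URI pattern
    ords.define_template(
        p_module_name => 'mark',
        p_pattern     => 'cities/:name');
    -- add a GET handler that uses the bind variable in the query
    ords.define_handler(
        p_module_name => 'mark',
        p_pattern     => 'cities/:name',
        p_method      => 'GET',
        p_source_type => ords.source_type_collection_feed,
        p_source      => 'select * from city where name = :name');
    commit;
end;
/

With that in place, a request like curl http://localhost:8080/ords/mark/api/cities/Tokyo should return just the matching row.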

Explore the out of the box services

ORDS also provides a heap of out of the box services for us automatically. To explore these, let’s use Postman, which is a very popular tool for REST testing. You can download it from the Postman website.

Jeff Smith has a great post here that explains how to import all the ORDS REST APIs into Postman.

When you open Postman, click on the “Import” button:

Now you need the right URL! If you have been following along and using the same names as me your URL will be:

http://localhost:8080/ords/mark/_/db-api/latest/metadata-catalog/openapi.json

If you used a different user, you will need to change “mark” to your username in that URL. After you click on “Import”, choose “Link” as the type and enter your URL:

One good thing about Postman is that we can set the authentication parameters at the top level, on that “ORDS Database API” folder that you just created. Open that and click on the “Authorization” tab, choose “Basic Auth” as the type and enter the database user and password:

In this folder you will see a whole collection of services for all kinds of things. Let’s try a simple one! Navigate to the “Get Database version” service and click on the “Send” button in the top right corner. You’ll see the result data in the bottom pane:

Well, there you go! We installed ORDS, used the Database Actions and REST interfaces, created a service, and explored the out of the box services! I hope you enjoyed it!


The OCI Service Mesh is now available!

Dusko Vukmanovic just announced the general availability of OCI Service Mesh in this blog post.

It provides security, observability, and network traffic management for cloud native applications without requiring any changes to the applications.

It’s a free managed service, and it’s available in all commercial regions today. Check it out!


Playing with Kafka Java Client for TEQ – creating the simplest of producers and consumers

Today I was playing with Kafka Java Client for TEQ, which allows you to use Oracle Transactional Event Queues (formerly known as Sharded Queues) in the Oracle Database just like Kafka.

Kafka Java Client for TEQ is available as a preview in GitHub here: http://github.com/oracle/okafka

In this preview version, there are some limitations documented in the repository, but the main one to be aware of is that you need to use the okafka library, not the regular kafka one. That means you would need to change existing Kafka client code if you wanted to try out the preview.

Preparing the database

To get started, I grabbed a new Oracle Autonomous Database instance on Oracle Cloud, and I opened up the SQL Worksheet in Database Actions and created myself a user. As the ADMIN user, I ran the following commands:

create user mark identified by SomePassword;  -- that's not the real password!
grant connect, resource to mark;
grant create session to mark;
grant unlimited tablespace to mark;
grant execute on dbms_aqadm to mark;
grant execute on dbms_aqin to mark;
grant execute on dbms_aqjms to mark;
grant select_catalog_role to mark;
grant select on gv$instance to mark;
grant select on gv$listener_network to mark;
commit;

And of course, I needed a topic to work with, so I logged on to SQL Worksheet as my new MARK user and created a topic called topic1 with these commands:

begin
    sys.dbms_aqadm.create_sharded_queue(queue_name => 'topic1', multiple_consumers => TRUE); 
    sys.dbms_aqadm.set_queue_parameter('topic1', 'SHARD_NUM', 1);
    sys.dbms_aqadm.set_queue_parameter('topic1', 'STICKY_DEQUEUE', 1);
    sys.dbms_aqadm.set_queue_parameter('topic1', 'KEY_BASED_ENQUEUE', 1);
    sys.dbms_aqadm.start_queue('topic1');
end;

Note that this is for Oracle Database 19c. If you are using 21c, create_sharded_queue is renamed to create_transactional_event_queue, so you will have to update that line.
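For reference, on 21c the same block would look something like this (a sketch based on that rename – check the DBMS_AQADM documentation for your exact version):

begin
    sys.dbms_aqadm.create_transactional_event_queue(queue_name => 'topic1', multiple_consumers => TRUE);
    sys.dbms_aqadm.set_queue_parameter('topic1', 'SHARD_NUM', 1);
    sys.dbms_aqadm.set_queue_parameter('topic1', 'STICKY_DEQUEUE', 1);
    sys.dbms_aqadm.set_queue_parameter('topic1', 'KEY_BASED_ENQUEUE', 1);
    sys.dbms_aqadm.start_queue('topic1');
end;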

The topic is empty right now, since we just created it, but here are a couple of queries that will be useful later. We can see the messages in the topic, with details including the enqueue time, status, etc., using this query:

select * from topic1;

This is a useful query to see a count of messages in each status:

select msg_state, count(*)
from aq$topic1
group by msg_state;

Building the OKafka library

We need to build the OKafka library and install it in our local Maven repository so that it will be available to use as a dependency since the preview is not currently available in Maven Central.

First, clone the repository:

git clone https://github.com/oracle/okafka

Now we can build the uberjar with the included Gradle wrapper:

cd okafka
./gradlew fullJar

This will put the JAR file in clients/build/libs, and we can install it into our local Maven repository using this command:

mvn install:install-file \
    -DgroupId=org.oracle.okafka \
    -DartifactId=okafka \
    -Dversion=0.8 \
    -Dfile=clients/build/libs/okafka-0.8-full.jar \
    -DpomFile=clients/okafka-0.8.pom 

Now we are ready to start writing our code!

Creating the Producer

Let’s start by creating our Maven POM file. In a new directory, called okafka, I created a file called pom.xml with the following content:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" 
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
         https://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>

	<groupId>com.example</groupId>
	<artifactId>okafka</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<name>okafka</name>

	<properties>
		<java.version>17</java.version>
		<maven.compiler.source>17</maven.compiler.source>
		<maven.compiler.target>17</maven.compiler.target>
		<okafka.version>0.8</okafka.version>
	</properties>

	<dependencies>
		<dependency>
			<groupId>org.oracle.okafka</groupId>
			<artifactId>okafka</artifactId>
			<version>${okafka.version}</version>
		</dependency>
	</dependencies>
</project>

I am using Java 17 for this example, but you could use anything from 1.8 onwards – just update the version in the properties if you are using an earlier version.

Now let’s create our producer class:

mkdir -p src/main/java/com/example/okafka
touch src/main/java/com/example/okafka/Producer.java

Here’s the content for Producer.java:

package com.example.okafka;

import java.io.FileNotFoundException;
import java.io.InputStream;
import java.util.Properties;

import org.oracle.okafka.clients.producer.KafkaProducer;
import org.oracle.okafka.clients.producer.ProducerRecord;

public class Producer {

    private static final String propertiesFilename = "producer.properties";

    public static void main(String[] args) {
        // configure logging level
        System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", "INFO");

        // load props
        Properties props = getProperties();

        String topicName = props.getProperty("topic.name", "TOPIC1");

        try(KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < 100; i++) {
                producer.send(new ProducerRecord<String, String>(
                    topicName, 0, "key", "value " + i));
            }
            System.out.println("sent 100 messages");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private static Properties getProperties() {
        Properties props = new Properties();

        try (
                InputStream inputStream = Producer.class
                    .getClassLoader()
                    .getResourceAsStream(propertiesFilename);
        ) {
            if (inputStream != null) {
                props.load(inputStream);
            } else {
                throw new FileNotFoundException(
                     "could not find properties file: " + propertiesFilename);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        return props;
    }
}

Let’s walk through this code and talk about what it does.

First, let’s notice the imports. We are importing the OKafka versions of the familiar Kafka classes. These have the same interfaces as the standard Kafka ones, but they work with Oracle TEQ instead:

import org.oracle.okafka.clients.producer.KafkaProducer;
import org.oracle.okafka.clients.producer.ProducerRecord;

In the main() method we first set the log level and then we load some properties from our producer.properties config file. The getProperties() method at the end of the file is fairly standard: it just reads the file from the classpath and returns the contents as a new Properties object.

Let’s see what’s in that producer.properties file, which is located in the src/main/resources directory:

oracle.service.name=xxxxx_prod_high.adb.oraclecloud.com
oracle.instance.name=prod_high
oracle.net.tns_admin=/home/mark/src/okafka/wallet
security.protocol=SSL
tns.alias=prod_high

bootstrap.servers=adb.us-ashburn-1.oraclecloud.com:1522
batch.size=200
linger.ms=100
buffer.memory=326760
key.serializer=org.oracle.okafka.common.serialization.StringSerializer
value.serializer=org.oracle.okafka.common.serialization.StringSerializer
topic.name=TOPIC1

There are two groups of properties in there. The first group provides details about my Oracle Autonomous Database instance, including the location of the wallet file – we’ll get that and set it up in a moment.

The second group are the normal Kafka properties that you might expect to see, assuming you are familiar with Kafka. Notice that the bootstrap.servers lists the address of my Oracle Autonomous Database, not a Kafka broker! Also notice that we are using the serializers (and later, deserializers) provided in the OKafka library, not the standard Kafka ones.
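If you are wondering where the bootstrap.servers value comes from, it is just the host and port from the tnsnames.ora entry in the wallet. An abbreviated sketch of what such an entry looks like (the real one also includes retry settings and the server certificate DN):

prod_high = (description=
  (address=(protocol=tcps)(port=1522)(host=adb.us-ashburn-1.oraclecloud.com))
  (connect_data=(service_name=xxxxx_prod_high.adb.oraclecloud.com))
  (security=(ssl_server_dn_match=yes)))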

Next, we set the topic name by reading it from the properties file. If it is not there, the second argument provides a default/fallback value:

String topicName = props.getProperty("topic.name", "TOPIC1");

And now we are ready to create the producer and send some messages:

try(KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
    for (int i = 0; i < 100; i++) {
        producer.send(new ProducerRecord<String, String>(
            topicName, 0, "key", "value " + i));
    }
    System.out.println("sent 100 messages");
} catch (Exception e) {
    e.printStackTrace();
}

We created the KafkaProducer and for this example, we are using String for both the key and the value.

We have a loop to send 100 messages, which we create with the ProducerRecord class. We are just setting them to some placeholder data.

Ok, that’s all we need in the code. But we will need to get the wallet and set it up so Java programs can use it to authenticate – have a look at this post for details on how to do that! You just need to download the wallet from the OCI console, unzip it into a directory called wallet (put that in the same directory as the pom.xml), edit the sqlnet.ora to set the DIRECTORY to the right location (e.g. /home/mark/src/okafka/wallet for me), and then add your credentials using the setup_wallet.sh I showed in that post.
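For reference, after editing, the sqlnet.ora in the wallet directory should look something like this (a sketch – use whatever DIRECTORY you actually unzipped the wallet into):

WALLET_LOCATION = (SOURCE = (METHOD = file) (METHOD_DATA = (DIRECTORY="/home/mark/src/okafka/wallet")))
SSL_SERVER_DN_MATCH=yes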

Finally, you need to add these lines to the ojdbc.properties file in the wallet directory to tell OKafka the user to connect to the database with:

user=mark
password=SomePassword
oracle.net.ssl_server_dn_match=true

With the wallet set up, we are ready to build and run our code!

mvn clean package
CLASSPATH=target/okafka-0.0.1-SNAPSHOT.jar
CLASSPATH=$CLASSPATH:$HOME/.m2/repository/org/oracle/okafka/okafka/0.8/okafka-0.8.jar
java -classpath $CLASSPATH com.example.okafka.Producer

The output should look like this:

[main] INFO org.oracle.okafka.clients.producer.ProducerConfig - ProducerConfig values: 
        acks = 1
        batch.size = 200
        bootstrap.servers = [adb.us-ashburn-1.oraclecloud.com:1522]
        buffer.memory = 326760
        client.id = 
        compression.type = none
        connections.max.idle.ms = 540000
        enable.idempotence = false
        interceptor.classes = []
        key.serializer = class org.oracle.okafka.common.serialization.StringSerializer
        linger.ms = 100
        max.block.ms = 60000
        max.in.flight.requests.per.connection = 5
        max.request.size = 1048576
        metadata.max.age.ms = 300000
        metric.reporters = []
        metrics.num.samples = 2
        metrics.recording.level = INFO
        metrics.sample.window.ms = 30000
        oracle.instance.name = prod_high
        oracle.net.tns_admin = /home/mark/src/okafka/wallet
        oracle.service.name = xxxxx_prod_high.adb.oraclecloud.com
        partitioner.class = class org.oracle.okafka.clients.producer.internals.DefaultPartitioner
        receive.buffer.bytes = 32768
        reconnect.backoff.max.ms = 1000
        reconnect.backoff.ms = 50
        request.timeout.ms = 30000
        retries = 0
        retry.backoff.ms = 100
        security.protocol = SSL
        send.buffer.bytes = 131072
        tns.alias = prod_high
        transaction.timeout.ms = 60000
        transactional.id = null
        value.serializer = class org.oracle.okafka.common.serialization.StringSerializer

[main] WARN org.oracle.okafka.common.utils.AppInfoParser - Error while loading kafka-version.properties :inStream parameter is null
[main] INFO org.oracle.okafka.common.utils.AppInfoParser - Kafka version : unknown
[main] INFO org.oracle.okafka.common.utils.AppInfoParser - Kafka commitId : unknown
[kafka-producer-network-thread | producer-1] INFO org.oracle.okafka.clients.Metadata - Cluster ID: 
sent 100 messages
[main] INFO org.oracle.okafka.clients.producer.KafkaProducer - [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.

You can see it dumps out the properties, and then after some informational messages you see the “sent 100 messages” output. Now you might want to go and run that query to look at the messages in the database!
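Here is that status query again for convenience – it should now show the 100 messages we just sent:

select msg_state, count(*)
from aq$topic1
group by msg_state;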

Now, let’s move on to creating a consumer, so we can read those messages back.

Creating the Consumer

The consumer is going to look very similar to the producer, and it will also have its own properties file. Here’s the contents of the properties file first – put this in src/main/resources/consumer.properties:

oracle.service.name=xxxxx_prod_high.adb.oraclecloud.com
oracle.instance.name=prod_high
oracle.net.tns_admin=/home/mark/src/okafka/wallet
security.protocol=SSL
tns.alias=prod_high

bootstrap.servers=adb.us-ashburn-1.oraclecloud.com:1522
group.id=bob
enable.auto.commit=true
auto.commit.interval.ms=10000
key.deserializer=org.oracle.okafka.common.serialization.StringDeserializer
value.deserializer=org.oracle.okafka.common.serialization.StringDeserializer
max.poll.records=100

And here is the content for Consumer.java which you create in src/main/java/com/example/okafka:

package com.example.okafka;

import java.io.FileNotFoundException;
import java.io.InputStream;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

import org.oracle.okafka.clients.consumer.ConsumerRecord;
import org.oracle.okafka.clients.consumer.ConsumerRecords;
import org.oracle.okafka.clients.consumer.KafkaConsumer;

public class Consumer {
    private static final String propertiesFilename = "consumer.properties";

    public static void main(String[] args) {
        // logging level
        System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", "INFO");


        // load props
        Properties props = getProperties();

        String topicName = props.getProperty("topic.name", "TOPIC1");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList(topicName));

        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(30_000));
        for (ConsumerRecord<String, String> record : records) {
            System.out.println(
                record.topic() + " " + 
                record.partition() + " " + 
                record.key() + " " + 
                record.value());
        }
        consumer.close();
    }

    private static Properties getProperties() {
        Properties props = new Properties();

        try (
                InputStream inputStream = Consumer.class
                    .getClassLoader()
                    .getResourceAsStream(propertiesFilename);
        ) {
            if (inputStream != null) {
                props.load(inputStream);
            } else {
                throw new FileNotFoundException(
                    "could not find properties file: " + propertiesFilename);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        return props;
    }
}

A lot of this is the same as the producer, so let’s walk through the parts that are different.

First, we load a different properties file, the consumer one, which has a few properties that are relevant for consumers. In particular, we are setting max.poll.records to 100, so we’ll read at most 100 messages off the topic in each poll.

Here’s how we create the consumer:

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList(topicName));

Again, you may notice that this is very similar to Kafka. We are using String as the type for both the key and value. Notice we provided the appropriate deserializers in the property file, the ones from the OKafka library, not the standard Kafka ones.

Here’s the actual consumer code:

        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(30_000));
        for (ConsumerRecord<String, String> record : records) {
            System.out.println(
                record.topic() + " " + 
                record.partition() + " " + 
                record.key() + " " + 
                record.value());
        }
        consumer.close();

We open our consumer, poll for messages (for up to 30 seconds), print out some information about each message, and then close our consumer. Again, this is very simple, but it’s enough to test consuming messages.
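In a real application you would normally poll in a loop rather than doing a single poll like our simple example does. Here is a minimal sketch using only the classes we already imported (offsets are committed automatically because we set enable.auto.commit=true):

        // a sketch: keep polling for new messages instead of doing a single poll
        while (true) {
            ConsumerRecords<String, String> batch = consumer.poll(Duration.ofMillis(1_000));
            for (ConsumerRecord<String, String> r : batch) {
                System.out.println(r.topic() + " " + r.partition() + " " + r.key() + " " + r.value());
            }
        }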

We can run our Consumer class now, and we should see all of the message data in the output. Here’s how to run it, along with an excerpt of the output:

mvn clean package
CLASSPATH=target/okafka-0.0.1-SNAPSHOT.jar
CLASSPATH=$CLASSPATH:$HOME/.m2/repository/org/oracle/okafka/okafka/0.8/okafka-0.8.jar
java -classpath $CLASSPATH com.example.okafka.Consumer

[main] INFO org.oracle.okafka.clients.consumer.ConsumerConfig - ConsumerConfig values: 
        auto.commit.interval.ms = 10000
        auto.offset.reset = latest
        bootstrap.servers = [adb.us-ashburn-1.oraclecloud.com:1522]
        check.crcs = true
        client.id = 
        connections.max.idle.ms = 540000
        default.api.timeout.ms = 60000
        enable.auto.commit = true
        exclude.internal.topics = true
        fetch.max.bytes = 52428800
        fetch.max.wait.ms = 500
        fetch.min.bytes = 1
        group.id = bob
        heartbeat.interval.ms = 3000
        interceptor.classes = []
        internal.leave.group.on.close = true
        isolation.level = read_uncommitted
        key.deserializer = class org.oracle.okafka.common.serialization.StringDeserializer
        max.partition.fetch.bytes = 1048576
        max.poll.interval.ms = 300000
        max.poll.records = 100
        metadata.max.age.ms = 300000
        metric.reporters = []
        metrics.num.samples = 2
        metrics.recording.level = INFO
        metrics.sample.window.ms = 30000
        oracle.instance.name = prod_high
        oracle.net.tns_admin = /home/mark/src/okafka/wallet
        oracle.service.name = xxxxx_prod_high.adb.oraclecloud.com
        receive.buffer.bytes = 65536
        reconnect.backoff.max.ms = 1000
        reconnect.backoff.ms = 50
        request.timeout.ms = 30000
        retry.backoff.ms = 100
        security.protocol = SSL
        send.buffer.bytes = 131072
        session.timeout.ms = 10000
        tns.alias = prod_high
        value.deserializer = class org.oracle.okafka.common.serialization.StringDeserializer

[main] WARN org.oracle.okafka.common.utils.AppInfoParser - Error while loading kafka-version.properties :inStream parameter is null
[main] INFO org.oracle.okafka.common.utils.AppInfoParser - Kafka version : unknown
[main] INFO org.oracle.okafka.common.utils.AppInfoParser - Kafka commitId : unknown
TOPIC1 0 key value 0
TOPIC1 0 key value 1
TOPIC1 0 key value 2
...

So there you go! We successfully created a very simple producer and consumer and we sent and received messages from a topic using the OKafka library and Oracle Transactional Event Queues!
