This is just a short post – but all the details are in this post from Rob Van Wijk.
Today I wanted to read the contents of a JMS Text Message sitting in a queue. I wrote a Spring Boot microservice that sends a message, and I have not yet written the one that receives and processes the message, so I wanted to look at the message on the queue to check it was correct.
So I went and did a good old “select user_data from deposits_qt” and stared at the answer: “Object”. Hmmm, not what I wanted.
After a quick bit of Googling, I found Rob’s post which told me exactly what I needed to know. Yay! Thanks Rob!
Then I changed my query to this:
select qt.user_data.text_vc from account.deposits_qt qt;
Vote for my People’s Choice Session “Experiences and lessons learnt building a multi-cloud #SpringBoot backend (ID 2002)” to be featured at #VMwareExplore 2023 Las Vegas! Place your vote by May 26: https://lnkd.in/eiRi-YF7
Register for VMware Explore here. Learn more about Oracle Backend for Spring Boot here.
That will start up pretty quickly too, and you can check with this command:
kubectl -n oracle-database-operator-system get pods
Let’s create a Single Instance Database. The Oracle Database Operator will let you create other types of databases too, including sharded and multitenant databases, and to manage cloud database instances like Autonomous Database and Database Cloud Service. But today, I’m going to stick with a simple single instance.
Here’s the Kubernetes YAML file to describe the database we want – I called this sidb.yaml:
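(A sketch of mine is below – the resource and secret names, storage size, and image tag are my example choices, and the exact fields can vary a little between operator versions, so check the operator’s own samples.)

apiVersion: database.oracle.com/v1alpha1
kind: SingleInstanceDatabase
metadata:
  name: sidb-sample
  namespace: default
spec:
  sid: ORCL1
  pdbName: orclpdb1
  edition: enterprise
  adminPassword:
    secretName: db-admin-secret   # created below
    secretKey: oracle_pwd
  image:
    pullFrom: container-registry.oracle.com/database/enterprise:latest
    pullSecrets: oracle-container-registry-secret
  persistence:
    size: 100Gi
    storageClass: "oci-bv"
    accessMode: "ReadWriteOnce"
  replicas: 1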
If you have not done so before, head over to Oracle Container Registry, go to the Database group, and accept the license agreement for the Enterprise option. You’ll also want to create a Kubernetes secret with your credentials so the cluster can pull the image:
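A command along these lines does the trick – use your own Oracle account credentials, and note that the secret name matches the pullSecrets field in the YAML above:

kubectl create secret docker-registry oracle-container-registry-secret \
  --docker-server=container-registry.oracle.com \
  --docker-username='you@example.com' \
  --docker-password='your-password'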
You will want to change the storageClass to match your cluster. I am using Oracle Container Engine for Kubernetes in this example, so I used the “oci-bv” storage class. If you are using a different flavor of Kubernetes, you should check what storage classes are available and use one of them.
This YAML describes a database with the SID ORCL1 and a PDB called orclpdb1. It will get the password for sys, pdbadmin, etc., from a Kubernetes secret – so let’s create that:
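Something like this works – oracle_pwd is the key that the adminPassword section of the YAML above points at:

kubectl create secret generic db-admin-secret --from-literal=oracle_pwd=Welcome12345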
Now we can create the database by applying that YAML file to our cluster:
kubectl apply -f sidb.yaml
It will take a few minutes to start up fully – it has to pull the image (which took 3m30s on my cluster, for the “enterprise” image, which is the biggest one), create the database instance the first time (mine took 8m), and apply any patches that are required (just over 1m for me). Subsequent startups will be much faster, of course (I stopped it by scaling to zero replicas, then started it again by scaling back to one replica, and it reached ready/healthy status in about 90s). For reference, my cluster had two nodes, each with one OCPU and 16 GB of RAM. You can check on the progress with this command:
kubectl get singleinstancedatabases -o wide -w
As the database starts up, you will see the connection string and other fields populate in the output.
Now, let’s add Oracle REST Data Services. Here’s a Kubernetes YAML file that describes what we want – I called this ords.yaml:
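Here’s a sketch of mine – the names (ords-sample, the db-admin-secret, the mark schema) line up with what I use elsewhere in this post, but as with the database YAML, double-check the field names against your operator version:

apiVersion: database.oracle.com/v1alpha1
kind: OracleRestDataService
metadata:
  name: ords-sample
spec:
  databaseRef: "sidb-sample"
  ordsPassword:
    secretName: db-admin-secret
    secretKey: oracle_pwd
  restEnableSchemas:
    - schemaName: mark
      enable: true
      urlMapping: mark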
You can apply that to your cluster with this command:
kubectl apply -f ords.yaml
And we can check on progress with this command:
kubectl get oraclerestdataservice -w
As it becomes ready, you will see the URLs for the Database API REST endpoint and for Database Actions. Mine took about 2m to reach ready/healthy status.
If your nodes are on a private network, the quickest way to access the REST APIs and Database Actions is to use a port forward. You can get the name of the ORDS pod and start a port forwarding session with commands like this:
kubectl get pods
kubectl port-forward pod/ords-sample-g4wc7 8443
Now you can hit the Database API REST endpoint with curl:
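Something like this should return the metadata catalog – the -k tells curl to accept the self-signed certificate, and the exact path and credentials depend on how your schema is REST-enabled and mapped:

curl -k -u mark:Welcome12345 https://localhost:8443/ords/mark/_/db-api/stable/metadata-catalog/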
You can also open Database Actions in your browser – with the port forward running, it should be at https://localhost:8443/ords/sql-developer. On the login page, enter ORCLPDB1 for the PDB Name and mark for the user. Then on the password page enter Welcome12345, and you are good to go!
While we are at it, let’s also get SQLcl access to the database.
Again, we can use port forwarding to access the database from outside the cluster:
kubectl port-forward svc/sidb-sample 1521 &
And then connect from SQLcl (if you have not checked out SQLcl yet, you should – it’s got cool features like command line completion and history):
sql mark/Welcome12345@//localhost:1521/orclpdb1
SQLcl: Release 22.2 Production on Mon May 01 14:32:57 2023
Copyright (c) 1982, 2023, Oracle. All rights reserved.
Last Successful login time: Mon May 01 2023 14:32:56 -04:00
Connected to:
Oracle Database 21c Enterprise Edition Release 21.0.0.0.0 - Production
Version 21.3.0.0.0
SQL> select * from dual;
DUMMY
________
X
SQL>
There you go! That was super quick and easy! Enjoy!
We just published the new 23c version of the Kafka-compatible Java APIs for Transactional Event Queues in Maven Central, and I wanted to show you how to use them! If you are not familiar with these APIs – they basically allow you to use the standard Kafka Java API with Transactional Event Queues acting as the Kafka broker. The only things you need to change are the broker address, and you need to use the Oracle versions of KafkaProducer and KafkaConsumer – other than that, your existing Kafka Java code should just work!
Let’s build a Kafka producer and consumer using the updated Kafka-compatible APIs.
Prepare the database
The first thing we want to do is start up the Oracle 23c Free Database. This is very easy to do in a container using a command like this:
docker run --name free23c -d -p 1521:1521 -e ORACLE_PWD=Welcome12345 container-registry.oracle.com/database/free:latest
This will pull the image and start up the database with a listener on port 1521. It will also create a pluggable database (PDB) called “FREEPDB1” and set the admin passwords to the password you specified in this command.
You can tail the logs to see when the database is ready to use:
docker logs -f free23c
(look for this message...)
#########################
DATABASE IS READY TO USE!
#########################
Also, grab the IP address of the container – we’ll need that to connect to the database:
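If the container is on the default bridge network, docker inspect will tell you:

docker inspect -f '{{.NetworkSettings.IPAddress}}' free23c
172.17.0.2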
To set up the necessary permissions, you’ll need to connect to the database with a client. If you don’t have one already, I’d recommend trying the new SQLcl CLI which you can download here. Start it up and connect to the database like this (note that your IP address and password may be different):
sql sys/Welcome12345@//172.17.0.2:1521/freepdb1 as sysdba
SQLcl: Release 22.2 Production on Tue Apr 11 12:36:24 2023
Copyright (c) 1982, 2023, Oracle. All rights reserved.
Connected to:
Oracle Database 23c Free, Release 23.0.0.0.0 - Developer-Release
Version 23.2.0.0.0
SQL>
Now, run these commands to create a user called “mark” and give it the necessary privileges:
SQL> create user mark identified by Welcome12345;
User MARK created.
SQL> grant resource, connect, unlimited tablespace to mark;
Grant succeeded.
SQL> grant execute on dbms_aq to mark;
Grant succeeded.
SQL> grant execute on dbms_aqadm to mark;
Grant succeeded.
SQL> grant execute on dbms_aqin to mark;
Grant succeeded.
SQL> grant execute on dbms_aqjms_internal to mark;
Grant succeeded.
SQL> grant execute on dbms_teqk to mark;
Grant succeeded.
SQL> grant execute on DBMS_RESOURCE_MANAGER to mark;
Grant succeeded.
SQL> grant select_catalog_role to mark;
Grant succeeded.
SQL> grant select on sys.aq$_queue_shards to mark;
Grant succeeded.
SQL> grant select on user_queue_partition_assignment_table to mark;
Grant succeeded.
SQL> exec dbms_teqk.AQ$_GRANT_PRIV_FOR_REPL('MARK');
PL/SQL procedure successfully completed.
SQL> commit;
Commit complete.
SQL> quit;
Now, connect again as the new mark user and run this PL/SQL block to create the topic and a consumer group:

begin
    -- Creates a topic named TEQ with 5 partitions and 7 days of retention time
    dbms_teqk.aq$_create_kafka_topic('TEQ', 5);
    -- Creates a Consumer Group CG1 for Topic TEQ
    dbms_aqadm.add_subscriber('TEQ', subscriber => sys.aq$_agent('CG1', null, null));
end;
/
You should note that the dbms_teqk package is likely to be renamed in the GA release of Oracle Database 23c, but for the Oracle Database 23c Free – Developer Release you can use it.
Ok, we are ready to start on our Java code!
Create a Java project
Let’s create a Maven POM file (pom.xml) and add the dependencies we need for this application. I’ve also included some profiles to make it easy to run the two main entry points we will create – the producer and the consumer. Here’s the content for the pom.xml. Note that I have excluded the osdt_core and osdt_cert transitive dependencies; since we are not using a wallet or SSL in this example, we do not need those libraries:
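Here’s a minimal sketch of what that can look like – the okafka version shown (23.2.0.0) and the exec-maven-plugin wiring are my assumptions, so check Maven Central for the current coordinates:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.example</groupId>
  <artifactId>okafka-sample</artifactId>
  <version>1.0-SNAPSHOT</version>
  <properties>
    <maven.compiler.release>17</maven.compiler.release>
  </properties>
  <dependencies>
    <!-- the Kafka-compatible Java APIs for Transactional Event Queues -->
    <dependency>
      <groupId>com.oracle.database.messaging</groupId>
      <artifactId>okafka</artifactId>
      <version>23.2.0.0</version>
      <exclusions>
        <!-- not using a wallet or SSL, so we do not need these -->
        <exclusion>
          <groupId>com.oracle.database.security</groupId>
          <artifactId>osdt_core</artifactId>
        </exclusion>
        <exclusion>
          <groupId>com.oracle.database.security</groupId>
          <artifactId>osdt_cert</artifactId>
        </exclusion>
      </exclusions>
    </dependency>
  </dependencies>
  <profiles>
    <!-- run with: mvn compile exec:exec -P producer -->
    <profile>
      <id>producer</id>
      <build>
        <plugins>
          <plugin>
            <groupId>org.codehaus.mojo</groupId>
            <artifactId>exec-maven-plugin</artifactId>
            <version>3.1.0</version>
            <configuration>
              <executable>java</executable>
              <arguments>
                <argument>-classpath</argument>
                <classpath/>
                <argument>com.example.SimpleProducerOKafka</argument>
              </arguments>
            </configuration>
          </plugin>
        </plugins>
      </build>
    </profile>
    <!-- run with: mvn compile exec:exec -P consumer -->
    <profile>
      <id>consumer</id>
      <build>
        <plugins>
          <plugin>
            <groupId>org.codehaus.mojo</groupId>
            <artifactId>exec-maven-plugin</artifactId>
            <version>3.1.0</version>
            <configuration>
              <executable>java</executable>
              <arguments>
                <argument>-classpath</argument>
                <classpath/>
                <argument>com.example.SimpleConsumerOKafka</argument>
              </arguments>
            </configuration>
          </plugin>
        </plugins>
      </build>
    </profile>
  </profiles>
</project>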
This is a pretty straightforward POM. I just set the project’s coordinates, declared my one dependency, and then created the two profiles so I can run the code easily.
Next, we are going to need a file called ojdbc.properties in the same directory as the POM with this content:
user=mark
password=Welcome12345
The KafkaProducer and KafkaConsumer will use this to connect to the database.
Create the consumer
Ok, now let’s create our consumer. In a directory called src/main/java/com/example, create a new Java file called SimpleConsumerOKafka.java with the following content:
package com.example;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.header.Header;
import org.oracle.okafka.clients.consumer.KafkaConsumer;

public class SimpleConsumerOKafka {

    public static void main(String[] args) {
        // set the required properties
        Properties props = new Properties();
        props.put("bootstrap.servers", "172.17.0.2:1521");
        props.put("group.id", "CG1");
        props.put("enable.auto.commit", "false");
        props.put("max.poll.records", 100);
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("oracle.service.name", "freepdb1");
        props.put("oracle.net.tns_admin", ".");
        props.put("security.protocol", "PLAINTEXT");

        // create the consumer
        Consumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(Arrays.asList("TEQ"));

        int expectedMsgCnt = 4000;
        int msgCnt = 0;
        long startTime = 0;

        // consume messages
        try {
            startTime = System.currentTimeMillis();
            while (true) {
                try {
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(10_000));
                    for (ConsumerRecord<String, String> record : records) {
                        System.out.printf("partition = %d, offset = %d, key = %s, value = %s\n",
                                record.partition(), record.offset(), record.key(), record.value());
                        for (Header h : record.headers()) {
                            System.out.println("Header: " + h.toString());
                        }
                    }
                    // commit the records we received
                    if (records != null && records.count() > 0) {
                        msgCnt += records.count();
                        System.out.println("Committing records " + records.count());
                        try {
                            consumer.commitSync();
                        } catch (Exception e) {
                            System.out.println("Exception in commit " + e.getMessage());
                            continue;
                        }
                        // if we got all the messages we expected, then exit
                        if (msgCnt >= expectedMsgCnt) {
                            System.out.println("Received " + msgCnt + ". Expected " +
                                    expectedMsgCnt + ". Exiting Now.");
                            break;
                        }
                    } else {
                        System.out.println("No records fetched. Retrying...");
                        Thread.sleep(1000);
                    }
                } catch (Exception e) {
                    System.out.println("Inner Exception " + e.getMessage());
                    throw e;
                }
            }
        } catch (Exception e) {
            System.out.println("Exception from consumer " + e);
            e.printStackTrace();
        } finally {
            long runDuration = System.currentTimeMillis() - startTime;
            System.out.println("Application closing Consumer. Run duration " + runDuration + " ms");
            consumer.close();
        }
    }
}
Let’s walk through this code together.
The first thing we do is prepare the properties for the KafkaConsumer. This is fairly standard, though notice that the bootstrap.servers property contains the address of your database listener, not a Kafka broker.
Then, we add some Oracle-specific properties – oracle.service.name is the name of the database service we are connecting to, in our case freepdb1; oracle.net.tns_admin needs to point to the directory where we put our ojdbc.properties file; and security.protocol controls whether we are using SSL or not, as in this case.
With that done, we can create the KafkaConsumer and subscribe to a topic. Note that we use the Oracle version of KafkaConsumer, which is basically just a wrapper that understands those extra Oracle-specific properties.
The rest of the code is standard Kafka code that polls for records, prints out any it finds, commits them, and loops until it has received the number of records it expected, then exits.
Run the consumer
We can build and run the consumer with this command:
mvn exec:exec -P consumer
It will connect to the database and start polling for records; of course, there won’t be any yet because we have not created the producer. It should output a message like this about every ten seconds:
No records fetched. Retrying...
Let’s write that producer!
Create the producer
In a directory called src/main/java/com/example, create a new Java file called SimpleProducerOKafka.java with the following content:
package com.example;

import java.util.Properties;
import java.util.concurrent.Future;

import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.oracle.okafka.clients.producer.KafkaProducer;

public class SimpleProducerOKafka {

    public static void main(String[] args) {
        long startTime = 0;
        try {
            // set the required properties
            Properties props = new Properties();
            props.put("bootstrap.servers", "172.17.0.2:1521");
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("batch.size", "5000");
            props.put("linger.ms", "500");
            props.put("oracle.service.name", "freepdb1");
            props.put("oracle.net.tns_admin", ".");
            props.put("security.protocol", "PLAINTEXT");

            // create the producer
            Producer<String, String> producer = new KafkaProducer<String, String>(props);
            Future<RecordMetadata> lastFuture = null;
            int msgCnt = 4000;
            startTime = System.currentTimeMillis();

            // send the messages
            for (int i = 0; i < msgCnt; i++) {
                RecordHeader rH1 = new RecordHeader("CLIENT_ID", "FIRST_CLIENT".getBytes());
                RecordHeader rH2 = new RecordHeader("REPLY_TO", "TOPIC_M5".getBytes());
                ProducerRecord<String, String> producerRecord =
                        new ProducerRecord<String, String>("TEQ", String.valueOf(i), "Test message " + i);
                producerRecord.headers().add(rH1).add(rH2);
                lastFuture = producer.send(producerRecord);
            }

            // wait for the last one to finish
            lastFuture.get();

            // print summary
            long runTime = System.currentTimeMillis() - startTime;
            System.out.println("Produced " + msgCnt + " messages in " + runTime + "ms.");
            producer.close();
        } catch (Exception e) {
            System.out.println("Caught exception: " + e);
            e.printStackTrace();
        }
    }
}
This code is quite similar to the consumer. We first set up the Kafka properties, including the Oracle-specific ones. Then we create a KafkaProducer, again using the Oracle version which understands those extra properties. After that we just loop and produce the desired number of records.
Make sure your consumer is still running (or restart it) and then build and run the producer with this command:
mvn exec:exec -P producer
When you do this, it will run for a short time and then print a message like this to let you know it is done:
Produced 4000 messages in 1955ms.
Now take a look at the output in the consumer window. You should see quite a lot of output there. Here’s a short snippet from the end:
partition = 0, offset = 23047, key = 3998, value = Test message 3998
Header: RecordHeader(key = CLIENT_ID, value = [70, 73, 82, 83, 84, 95, 67, 76, 73, 69, 78, 84])
Header: RecordHeader(key = REPLY_TO, value = [84, 79, 80, 73, 67, 95, 77, 53])
Committing records 27
Received 4000. Expected 4000. Exiting Now.
Application closing Consumer. Run duration 510201 ms
It prints out a message for each record it finds, including the partition ID, the offset, and the key and value. It then prints out the headers. You will also see commit messages, and at the end it prints out how many records it found and how long it was running for. I left mine running while I got the producer ready to go, so it shows a fairly long duration 🙂 But you can run it again and start the producer immediately after it and you will see a much shorter run duration.
Well, there you go! That’s a Kafka producer and consumer using the new updated 23c version of the Kafka-compatible Java API for Transactional Event Queues. Stay tuned for more!
Hi everyone. We have just published some updates to the Spring Boot Starters for Oracle Database – we added a starter for UCP (Universal Connection Pool) for Spring Boot 3.0.2. This makes it easy to access the Oracle Database from a Spring Boot application – just two steps!
Add a dependency to your Maven POM file (or equivalent)
Here’s the dependency to add:
<dependency>
    <groupId>com.oracle.database.spring</groupId>
    <artifactId>oracle-spring-boot-starter-ucp</artifactId>
    <version>3.0.2</version>  <!-- or 2.7.7 for Spring Boot 2.x -->
    <type>pom</type>
</dependency>
Add the datasource properties to your Spring Boot application.yaml
Here’s an example, assuming you are also using Spring Data JPA:
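Here’s a sketch – the UCP-specific settings live under spring.datasource.oracleucp, and the URL, credentials and pool sizes are just examples to adapt:

spring:
  datasource:
    url: jdbc:oracle:thin:@//172.17.0.2:1521/freepdb1
    username: mark
    password: Welcome12345
    type: oracle.ucp.jdbc.PoolDataSourceImpl
    oracleucp:
      connection-factory-class-name: oracle.jdbc.pool.OracleDataSource
      connection-pool-name: UCPPool
      initial-pool-size: 5
      min-pool-size: 5
      max-pool-size: 20
  jpa:
    hibernate:
      ddl-auto: update
    database-platform: org.hibernate.dialect.OracleDialect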
We are working to add more Spring Boot Starters for Oracle Database to make it even easier to use, and to make sure we cover all the versions you need! Stay tuned for more updates!
We just published a short YouTube video that introduces the Oracle Backend for Spring Boot (and Parse Platform) which makes it super easy to develop, run and manage Spring Boot microservices and mobile applications leveraging all the power of Oracle’s converged database.
In the Transactional Outbox pattern, we have a microservice that needs to perform a database operation (like an insert) and send a message, and either both or neither of these need to happen.
Unlike other messaging providers, Transactional Event Queues is built into the Oracle Database and has the unique advantage of being able to expose the underlying database transaction to your application. This allows us to perform database and messaging operations in the same transaction – which is exactly what we need to implement this pattern.
Prepare the database
The first thing we want to do is start up the Oracle 23c Free Database. This is very easy to do in a container using a command like this:
docker run --name free23c -d -p 1521:1521 -e ORACLE_PWD=Welcome12345 container-registry.oracle.com/database/free:latest
This will pull the image and start up the database with a listener on port 1521. It will also create a pluggable database (PDB) called “FREEPDB1” and set the admin passwords to the password you specified in this command.
You can tail the logs to see when the database is ready to use:
docker logs -f free23c
(look for this message...)
#########################
DATABASE IS READY TO USE!
#########################
Also, grab the IP address of the container – we’ll need that to connect to the database:
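If the container is on the default bridge network, docker inspect will tell you:

docker inspect -f '{{.NetworkSettings.IPAddress}}' free23c
172.17.0.2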
To set up the necessary permissions, you’ll need to connect to the database with a client. If you don’t have one already, I’d recommend trying the new SQLcl CLI which you can download here. Start it up and connect to the database like this (note that your IP address and password may be different):
sql sys/Welcome12345@//172.17.0.2:1521/freepdb1 as sysdba
SQLcl: Release 22.2 Production on Tue Apr 11 12:36:24 2023
Copyright (c) 1982, 2023, Oracle. All rights reserved.
Connected to:
Oracle Database 23c Free, Release 23.0.0.0.0 - Developer-Release
Version 23.2.0.0.0
SQL>
Now, run these commands to create a user called “mark” and give it the necessary privileges:
SQL> create user mark identified by Welcome12345;
User MARK created.
SQL> grant resource, connect, unlimited tablespace to mark;
Grant succeeded.
SQL> grant execute on dbms_aq to mark;
Grant succeeded.
SQL> grant execute on dbms_aqadm to mark;
Grant succeeded.
SQL> grant execute on dbms_aqin to mark;
Grant succeeded.
SQL> grant execute on dbms_aqjms_internal to mark;
Grant succeeded.
SQL> grant execute on dbms_teqk to mark;
Grant succeeded.
SQL> grant execute on DBMS_RESOURCE_MANAGER to mark;
Grant succeeded.
SQL> grant select_catalog_role to mark;
Grant succeeded.
SQL> grant select on sys.aq$_queue_shards to mark;
Grant succeeded.
SQL> grant select on user_queue_partition_assignment_table to mark;
Grant succeeded.
SQL> exec dbms_teqk.AQ$_GRANT_PRIV_FOR_REPL('MARK');
PL/SQL procedure successfully completed.
SQL> commit;
Commit complete.
SQL> quit;
Ok, we are ready to start on our Java code!
Create the Java project
If you have read my posts before, you’ll know I like to use Maven for my Java projects. Let’s create a Maven POM file (pom.xml) and add the dependencies we need for this application. I’ve also included some profiles to make it easy to run the three main entry points we will create – one to create a queue, one to consume messages, and finally the transactional outbox implementation. Here’s the content for the pom.xml:
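Here’s a sketch of what it can look like – the dependency versions and the exec-maven-plugin wiring are my assumptions, so check Maven Central for current versions of ojdbc11, ucp and aqapi:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.example</groupId>
  <artifactId>txoutbox-sample</artifactId>
  <version>1.0-SNAPSHOT</version>
  <properties>
    <maven.compiler.release>17</maven.compiler.release>
  </properties>
  <dependencies>
    <dependency>
      <groupId>com.oracle.database.jdbc</groupId>
      <artifactId>ojdbc11</artifactId>
      <version>23.2.0.0</version>
    </dependency>
    <dependency>
      <groupId>com.oracle.database.jdbc</groupId>
      <artifactId>ucp</artifactId>
      <version>23.2.0.0</version>
    </dependency>
    <dependency>
      <groupId>com.oracle.database.messaging</groupId>
      <artifactId>aqapi</artifactId>
      <version>23.2.0.0</version>
    </dependency>
    <dependency>
      <groupId>javax.jms</groupId>
      <artifactId>javax.jms-api</artifactId>
      <version>2.0.1</version>
    </dependency>
    <dependency>
      <groupId>javax.transaction</groupId>
      <artifactId>jta</artifactId>
      <version>1.1</version>
    </dependency>
  </dependencies>
  <profiles>
    <profile>
      <id>createq</id>
      <build>
        <plugins>
          <plugin>
            <groupId>org.codehaus.mojo</groupId>
            <artifactId>exec-maven-plugin</artifactId>
            <version>3.1.0</version>
            <configuration>
              <executable>java</executable>
              <arguments>
                <argument>-classpath</argument>
                <classpath/>
                <argument>com.example.CreateTxEventQ</argument>
              </arguments>
            </configuration>
          </plugin>
        </plugins>
      </build>
    </profile>
    <profile>
      <id>consume</id>
      <build>
        <plugins>
          <plugin>
            <groupId>org.codehaus.mojo</groupId>
            <artifactId>exec-maven-plugin</artifactId>
            <version>3.1.0</version>
            <configuration>
              <executable>java</executable>
              <arguments>
                <argument>-classpath</argument>
                <classpath/>
                <argument>com.example.Consume</argument>
              </arguments>
            </configuration>
          </plugin>
        </plugins>
      </build>
    </profile>
    <profile>
      <id>publish</id>
      <build>
        <plugins>
          <plugin>
            <groupId>org.codehaus.mojo</groupId>
            <artifactId>exec-maven-plugin</artifactId>
            <version>3.1.0</version>
            <configuration>
              <executable>java</executable>
              <arguments>
                <argument>-classpath</argument>
                <classpath/>
                <argument>com.example.Publish</argument>
                <argument>mark</argument>
                <argument>mark@example.com</argument>
                <argument>0</argument>
              </arguments>
            </configuration>
          </plugin>
        </plugins>
      </build>
    </profile>
  </profiles>
</project>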
I won’t go into a heap of detail on this or the first two Java classes, since they are fairly standard and I have talked about very similar things before in older posts, including this one for example. I will go into detail on the transactional outbox implementation though, don’t worry!
Create a Java class to create the queue
We are going to need a queue to put messages on, so let me show you how to do that in Java. Transactional Event Queues support various types of queues and payloads. This example shows how to create a queue that uses the JMS format. Create a file called src/main/java/com/example/CreateTxEventQ.java with this content:
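What follows is a sketch using the classic AQ JMS admin API – the queue table name (my_txeventq_table) is my choice, and as with everything in this post the credentials are hardcoded just for the demo:

package com.example;

import javax.jms.Session;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicSession;

import oracle.AQ.AQQueueTable;
import oracle.AQ.AQQueueTableProperty;
import oracle.jms.AQjmsDestination;
import oracle.jms.AQjmsDestinationProperty;
import oracle.jms.AQjmsFactory;
import oracle.jms.AQjmsSession;
import oracle.ucp.jdbc.PoolDataSource;
import oracle.ucp.jdbc.PoolDataSourceFactory;

public class CreateTxEventQ {

    private static String username = "mark";
    private static String password = "Welcome12345";
    private static String url = "jdbc:oracle:thin:@//172.17.0.2:1521/freepdb1";
    private static String topicName = "my_txeventq";

    public static void main(String[] args) throws Exception {
        // get a connection from a UCP pool, as in the other classes
        PoolDataSource ds = PoolDataSourceFactory.getPoolDataSource();
        ds.setConnectionFactoryClassName("oracle.jdbc.pool.OracleDataSource");
        ds.setURL(url);
        ds.setUser(username);
        ds.setPassword(password);

        TopicConnectionFactory tcf = AQjmsFactory.getTopicConnectionFactory(ds);
        TopicConnection conn = tcf.createTopicConnection();
        conn.start();
        TopicSession session = (AQjmsSession) conn.createSession(true, Session.AUTO_ACKNOWLEDGE);

        // create the queue table first - multiple consumers (pub/sub, i.e., a JMS
        // Topic) with the JMS text message payload format
        AQQueueTableProperty qtProps = new AQQueueTableProperty("SYS.AQ$_JMS_TEXT_MESSAGE");
        qtProps.setMultiConsumer(true);
        AQQueueTable qt = ((AQjmsSession) session).createQueueTable(username, "my_txeventq_table", qtProps);

        // then create the queue (topic) itself, and start it up
        AQjmsDestinationProperty dProps = new AQjmsDestinationProperty();
        Topic topic = ((AQjmsSession) session).createTopic(qt, topicName, dProps);
        ((AQjmsDestination) topic).start(session, true, true);

        System.out.println("created topic " + topicName);
        session.close();
        conn.close();
    }
}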
As you read through this, you’ll see I’ve just hardcoded the username, password and URL for convenience in this file (and the others in this post) – of course, we’d never do that in real life, would we 🙂 You should also notice that we get a connection, then create the queue table first – setting the consumer type (multiple, i.e., pub/sub – so a JMS Topic) and the format (JMS) – then create the queue itself, and then start it up. Easy, right?
You can run this and create the queue with this command:
mvn exec:exec -Pcreateq
If you want to see the queue in the database, you can log in using that mark user you created and run a query:
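For example, this will show the queue and its queue table:

select name, queue_table from user_queues;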
While you’re there, let’s also create a table so we have somewhere to perform the database insert operation:
create table customer ( name varchar2(256), email varchar2(256) );
Create the consumer
Let’s create the consumer next. This will be a new Java file in the same directory called Consume.java. Here’s the content:
package com.example;

import java.sql.SQLException;

import javax.jms.JMSException;
import javax.jms.Session;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicSession;

import oracle.AQ.AQException;
import oracle.jms.AQjmsFactory;
import oracle.jms.AQjmsSession;
import oracle.jms.AQjmsTextMessage;
import oracle.jms.AQjmsTopicSubscriber;
import oracle.ucp.jdbc.PoolDataSource;
import oracle.ucp.jdbc.PoolDataSourceFactory;

public class Consume {

    private static String username = "mark";
    private static String password = "Welcome12345";
    private static String url = "jdbc:oracle:thin:@//172.17.0.2:1521/freepdb1";
    private static String topicName = "my_txeventq";

    public static void main(String[] args) throws AQException, SQLException, JMSException {
        // create a data source for the database connection
        PoolDataSource ds = PoolDataSourceFactory.getPoolDataSource();
        ds.setConnectionFactoryClassName("oracle.jdbc.pool.OracleDataSource");
        ds.setURL(url);
        ds.setUser(username);
        ds.setPassword(password);

        // create a JMS topic connection and session
        TopicConnectionFactory tcf = AQjmsFactory.getTopicConnectionFactory(ds);
        TopicConnection conn = tcf.createTopicConnection();
        conn.start();
        TopicSession session = (AQjmsSession) conn.createSession(true, Session.AUTO_ACKNOWLEDGE);

        // create a durable subscriber on the topic
        Topic topic = ((AQjmsSession) session).getTopic(username, topicName);
        AQjmsTopicSubscriber subscriber =
                (AQjmsTopicSubscriber) session.createDurableSubscriber(topic, "my_subscriber");
        System.out.println("Waiting for messages...");

        // wait forever for messages to arrive and print them out
        while (true) {
            // the 1_000 is a one second timeout
            AQjmsTextMessage message = (AQjmsTextMessage) subscriber.receive(1_000);
            if (message != null) {
                if (message.getText() != null) {
                    System.out.println(message.getText());
                } else {
                    System.out.println();
                }
            }
            session.commit();
        }
    }
}
This one is a fairly standard JMS consumer. It creates a durable subscription on that topic we just created, waits for messages to arrive, and then just prints the content on the screen. Nice and simple. You can run it with this command:
mvn exec:exec -Pconsume
Leave that running so that you see messages as they are produced. Later, when you run the transactional outbox producer, run it in a different window so that you can see what happens in the consumer.
Implement the Transactional Outbox pattern
Yay! The fun part! Here’s the code for this class, which will go into a new Java file in the same directory called Publish.java. I’ll walk through this code step by step.
package com.example;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;

import javax.jms.JMSException;
import javax.jms.Session;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicSession;

import oracle.jms.AQjmsAgent;
import oracle.jms.AQjmsFactory;
import oracle.jms.AQjmsSession;
import oracle.jms.AQjmsTextMessage;
import oracle.jms.AQjmsTopicPublisher;
import oracle.ucp.jdbc.PoolDataSource;
import oracle.ucp.jdbc.PoolDataSourceFactory;

public class Publish {

    private static String username = "mark";
    private static String password = "Welcome12345";
    private static String url = "jdbc:oracle:thin:@//172.17.0.2:1521/freepdb1";
    private static String topicName = "my_txeventq";

    public static void main(String[] args) throws JMSException, SQLException {
        AQjmsTopicPublisher publisher = null;
        TopicSession session = null;
        TopicConnection tconn = null;
        Connection conn = null;

        if (args.length != 3) {
            System.err.println("""
                    You must provide 3 arguments - name, email and failure mode
                    failure mode:
                      0  do not fail
                      1  fail before insert and publish
                      2  fail after insert, before publish
                      3  fail after insert and publish
                    """);
            System.exit(1);
        }
        String name = args[0];
        String email = args[1];
        int failMode = Integer.parseInt(args[2]);

        try {
            // create a data source
            PoolDataSource ds = PoolDataSourceFactory.getPoolDataSource();
            ds.setConnectionFactoryClassName("oracle.jdbc.pool.OracleDataSource");
            ds.setURL(url);
            ds.setUser(username);
            ds.setPassword(password);

            // create a JMS topic connection and session
            TopicConnectionFactory tcf = AQjmsFactory.getTopicConnectionFactory(ds);
            tconn = tcf.createTopicConnection();
            tconn.start();

            // open a Transactional session
            session = (AQjmsSession) tconn.createSession(true, Session.AUTO_ACKNOWLEDGE);

            // also get the JDBC connection
            conn = ((AQjmsSession) session).getDBConnection();
            conn.setAutoCommit(false);

            // if failMode = 1, fail here
            if (failMode == 1) throw new Exception();

            // first, perform the database operation
            PreparedStatement stmt = conn.prepareStatement("insert into customer (name, email) values (?, ?)");
            stmt.setString(1, name);
            stmt.setString(2, email);
            stmt.executeUpdate();
            System.out.println("row inserted");

            // if failMode = 2, fail here
            if (failMode == 2) throw new Exception();

            // second, publish the message
            Topic topic = ((AQjmsSession) session).getTopic(username, topicName);
            publisher = (AQjmsTopicPublisher) session.createPublisher(topic);
            AQjmsTextMessage message = (AQjmsTextMessage) session.createTextMessage(
                    "new customer with name=" + name + " and email=" + email);
            // the recipient name must match the durable subscriber created in Consume.java
            publisher.publish(message, new AQjmsAgent[] { new AQjmsAgent("my_subscriber", null) });
            System.out.println("message sent");

            // if failMode = 3, fail here
            if (failMode == 3) throw new Exception();

            // we didn't fail - so commit the transaction
            if (failMode == 0) session.commit();
        } catch (Exception e) {
            System.err.println("rolling back");
            if (conn != null) conn.rollback();
        } finally {
            // clean up
            if (publisher != null) publisher.close();
            if (session != null) session.close();
            if (tconn != null) tconn.close();
        }
    }
}
Ok, so the overall structure of the code is as follows:
First, we are going to start a transaction. Then we will perform two operations – insert a record into the customer table, and send a message on a topic. If everything works as expected, we will commit the transaction. Of course, if there is a failure at any point, we will roll back instead. In the code I have included numbered failure points at each of these steps.
At the start of the main method, we are going to check we have the expected arguments – the name and email, and the point at which to fail, i.e., which of those failure points to simulate. A “0” indicates that no failure should be simulated. So if we run the code with “mark mark@example.com 2” as the input, we expect it to fail at failure point “2” – after it inserted the row in the table and before it sent the message on the topic.
Next we get both a JMS Connection and a JDBC Connection. This is important because it allows us to have a single transaction. Note the following lines:
// open a Transactional session
session = (AQjmsSession) tconn.createSession(true, Session.AUTO_ACKNOWLEDGE);

// also get the JDBC connection
conn = ((AQjmsSession) session).getDBConnection();
conn.setAutoCommit(false);
We explicitly set “auto commit” to false on the JDBC connection – we want to control exactly if and when work is committed; we do not want any automatic commits to occur. And on the JMS session we set the “transacted” parameter to true – that’s the first parameter in the createSession() call. This tells it to use the same database transaction.
Next, you will notice that we simulate a failure if the failure point was “1”:
if (failMode == 1) throw new Exception();
If an exception is thrown at this point (or any point), we’d expect to see no new rows in the database and no messages received by the consumer. We can check the table with this query:
select * from customer;
And you will see output like this in the consumer window every time a message is produced – so if you do not see this output, no messages were delivered:
new customer with name=jack and email=jack@jack.com
You can also check directly in the database with this query:
select * from my_txeventq;
The next thing you will see is a standard JDBC Prepared Statement to insert a row into the customer table. Notice that I don’t commit yet.
And next, we have the code to publish a message on the topic:
AQjmsTextMessage message = (AQjmsTextMessage) session.createTextMessage(
        "new customer with name=" + name + " and email=" + email);
publisher.publish(message, new AQjmsAgent[] { new AQjmsAgent("my_subscriber", null) });
System.out.println("message sent");
Then you’ll see failure point “3” and then finally the commit!
Next, notice that the catch block contains a rollback on the database connection. You don’t have to roll back the JMS session as well – since they are in the same transaction, this one rollback call is enough to roll back all of the operations.
Run the Transactional Outbox code
Now we’re ready to run the code! First, notice in the POM file we created a profile called “publish” which contains the following configuration:
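In the POM sketch earlier in this post, that profile looks like this:

<profile>
  <id>publish</id>
  <build>
    <plugins>
      <plugin>
        <groupId>org.codehaus.mojo</groupId>
        <artifactId>exec-maven-plugin</artifactId>
        <version>3.1.0</version>
        <configuration>
          <executable>java</executable>
          <arguments>
            <argument>-classpath</argument>
            <classpath/>
            <argument>com.example.Publish</argument>
            <argument>mark</argument>
            <argument>mark@example.com</argument>
            <argument>0</argument>
          </arguments>
        </configuration>
      </plugin>
    </plugins>
  </build>
</profile>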
The last three arguments are the name, email and the failure point. If you go ahead and run it as is (with failure point 0, meaning no failure) then it should actually get all the way through to the commit. You should see output in the consumer window to let you know the message was produced, and you can check the table in the database to see the new record in there. Run the code like this:
mvn exec:exec -Ppublish
Of course, you’ll see a record in the table and the message.
If you now edit the POM file and change that last argument from 0 to any of the other options and run it again, you’ll notice that it rolls back and you do not get a new record in the table or a message produced on the topic.
How do I know it really worked?
If you’d like to experiment and convince yourself it really is working, try something like commenting out failure point 2 like this:
// if (failMode == 2) throw new Exception();
When you run the code again, you will now see that there is a row in the database that was not rolled back (because the failure never occurred and the exception was never thrown) but the message was never sent (because the commit was never run, due to failMode being 2, not 0).
If you tweak the failure points you can easily convince yourself that it is in fact working just as expected 🙂
So there you go, that’s the Transactional Outbox pattern implemented using Transactional Event Queues with Oracle Database 23c Free – that was pretty easy, right? Hope you enjoyed it, and see you soon!
Hi everyone! Big news today, just announced at Oracle CloudWorld in Singapore!
The new Oracle Database 23c Free – Developer Release is now available.
Oracle Database 23c Free – Developer Release is the first release of the next-generation Oracle Database, allowing developers a head-start on building applications with innovative 23c features that simplify development of modern data-driven apps. The entire feature set of Oracle Database 23c is planned to be generally available within the next 12 months.
It has heaps of new developer-focused features and it’s completely free! And easy to download and use!
My two favorite features are:
the new JSON Relational Duality Views which allow you to create a JSON document representation from a number of existing tables, and they are read/write! So you can use JSON in your applications and have the underlying data stored in relational tables. Of course you can store it in JSON too if you want to!
JavaScript Stored Procedures, or as I like to think of them – in-database microservices which can scale to zero, with fast startup and scaling, and resource management to prevent noisy neighbors!
I look forward to writing posts about those, and some other exciting new features, really soon.
Hi again! In this earlier post, I mentioned that I am speaking at Level Up 2023. The session catalog has just been released on the event website. You can find my sessions in this stream:
Hi! I am going to be speaking at the Level Up 2023 event at Oracle Redwood Shores in March. I will be talking about our new Developer Previews for both Oracle Backend for Spring Boot and Oracle Backend for Parse Platform, and running a hands-on lab where we will use those to build a “Cloud Banking” application in Spring Boot, complete with a web and mobile front end user interface. In the lab we’ll explore topics like service discovery, external configuration, workflow, API management, fault tolerance and observability.
If you’re in the Bay Area and you’d like to attend in person – or if you’d like to attend from anywhere digitally – you can find more information and register here: