Today I was playing with the Kafka Java Client for TEQ, which allows you to use Oracle Transactional Event Queues (formerly known as Sharded Queues) in the Oracle Database just like Kafka.
Kafka Java Client for TEQ is available as a preview on GitHub here: http://github.com/oracle/okafka
This preview version has some limitations, which are documented in the repository. The main one to be aware of is that you need to use the okafka library, not the regular kafka one, so you would need to change existing Kafka client code in order to try out the preview.
Preparing the database
To get started, I grabbed a new Oracle Autonomous Database instance on Oracle Cloud, and I opened up the SQL Worksheet in Database Actions and created myself a user. As the ADMIN
user, I ran the following commands:
create user mark identified by SomePassword; -- that's not the real password!
grant connect, resource to mark;
grant create session to mark;
grant unlimited tablespace to mark;
grant execute on dbms_aqadm to mark;
grant execute on dbms_aqin to mark;
grant execute on dbms_aqjms to mark;
grant select_catalog_role to mark;
grant select on gv$instance to mark;
grant select on gv$listener_network to mark;
commit;
And of course, I needed a topic to work with, so I logged on to SQL Worksheet as my new MARK
user and created a topic called topic1
with these commands:
begin
    sys.dbms_aqadm.create_sharded_queue(queue_name => 'topic1', multiple_consumers => TRUE);
    sys.dbms_aqadm.set_queue_parameter('topic1', 'SHARD_NUM', 1);
    sys.dbms_aqadm.set_queue_parameter('topic1', 'STICKY_DEQUEUE', 1);
    sys.dbms_aqadm.set_queue_parameter('topic1', 'KEY_BASED_ENQUEUE', 1);
    sys.dbms_aqadm.start_queue('topic1');
end;
Note that this is for Oracle Database 19c. If you are using 21c, create_sharded_queue has been renamed to create_transactional_event_queue, so you will need to update that line.
The topic is empty right now, since we just created it, but here are a couple of queries that will be useful later. We can see the messages in the topic, with details including the enqueue time, status, etc., using this query:
select * from topic1;
This is a useful query to see a count of messages in each status:
select msg_state, count(*)
from aq$topic1
group by msg_state;
Building the OKafka library
Since the preview is not currently available in Maven Central, we need to build the OKafka library and install it in our local Maven repository so that it is available to use as a dependency.
First, clone the repository:
git clone https://github.com/oracle/okafka
Now we can build the uberjar with the included Gradle wrapper:
cd okafka
./gradlew fullJar
This will put the JAR file in clients/build/libs
and we can install this into our local Maven repository using this command:
mvn install:install-file \
    -DgroupId=org.oracle.okafka \
    -DartifactId=okafka \
    -Dversion=0.8 \
    -Dfile=clients/build/libs/okafka-0.8-full.jar \
    -DpomFile=clients/okafka-0.8.pom
Now we are ready to start writing our code!
Creating the Producer
Let’s start by creating our Maven POM file. In a new directory, called okafka
, I created a file called pom.xml
with the following content:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
                             https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.example</groupId>
    <artifactId>okafka</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>okafka</name>
    <properties>
        <java.version>17</java.version>
        <maven.compiler.source>17</maven.compiler.source>
        <maven.compiler.target>17</maven.compiler.target>
        <okafka.version>0.8</okafka.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.oracle.okafka</groupId>
            <artifactId>okafka</artifactId>
            <version>${okafka.version}</version>
        </dependency>
    </dependencies>
</project>
I am using Java 17 for this example, but you could use anything from 1.8 onwards; just update the versions in the properties if you are using an earlier version.
Now let’s create our producer class:
mkdir -p src/main/java/com/example/okafka
touch src/main/java/com/example/okafka/Producer.java
Here’s the content for Producer.java
:
package com.example.okafka;
import java.io.FileNotFoundException;
import java.io.InputStream;
import java.util.Properties;
import org.oracle.okafka.clients.producer.KafkaProducer;
import org.oracle.okafka.clients.producer.ProducerRecord;
public class Producer {

    private static final String propertiesFilename = "producer.properties";

    public static void main(String[] args) {
        // configure logging level
        System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", "INFO");

        // load props
        Properties props = getProperties();
        String topicName = props.getProperty("topic.name", "TOPIC1");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < 100; i++) {
                producer.send(new ProducerRecord<String, String>(
                        topicName, 0, "key", "value " + i));
            }
            System.out.println("sent 100 messages");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private static Properties getProperties() {
        Properties props = new Properties();
        try (InputStream inputStream = Producer.class
                .getClassLoader()
                .getResourceAsStream(propertiesFilename)) {
            if (inputStream != null) {
                props.load(inputStream);
            } else {
                throw new FileNotFoundException(
                        "could not find properties file: " + propertiesFilename);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        return props;
    }
}
Let’s walk through this code and talk about what it does.
First, let’s notice the imports. We are importing the OKafka versions of the familiar Kafka classes. These have the same interfaces as the standard Kafka ones, but they work with Oracle TEQ instead:
import org.oracle.okafka.clients.producer.KafkaProducer;
import org.oracle.okafka.clients.producer.ProducerRecord;
In the main()
method we first set the log level and then we load some properties from our producer.properties
config file. The getProperties() method at the end of the file is fairly standard; it just reads the file from the classpath and returns the contents as a new Properties object.
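Properties.load() simply parses key=value lines. Here is a quick, self-contained sketch of that behavior, with an inline string standing in for the file contents (the class name is mine, for illustration only):

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

public class PropsLoadDemo {
    public static void main(String[] args) throws IOException {
        // an inline string standing in for the producer.properties file
        String content = "topic.name=TOPIC1\nlinger.ms=100\n";

        Properties props = new Properties();
        props.load(new StringReader(content));

        // every value comes back as a String
        System.out.println(props.getProperty("topic.name")); // TOPIC1
        System.out.println(props.getProperty("linger.ms"));  // 100
    }
}
```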
Let’s see what’s in that producer.properties
file, which is located in the src/main/resources
directory:
oracle.service.name=xxxxx_prod_high.adb.oraclecloud.com
oracle.instance.name=prod_high
oracle.net.tns_admin=/home/mark/src/okafka/wallet
security.protocol=SSL
tns.alias=prod_high
bootstrap.servers=adb.us-ashburn-1.oraclecloud.com:1522
batch.size=200
linger.ms=100
buffer.memory=326760
key.serializer=org.oracle.okafka.common.serialization.StringSerializer
value.serializer=org.oracle.okafka.common.serialization.StringSerializer
topic.name=TOPIC1
There are two groups of properties in there. The first group provides details about my Oracle Autonomous Database instance, including the location of the wallet file – we’ll get that and set it up in a moment.
The second group are the normal Kafka properties that you might expect to see, assuming you are familiar with Kafka. Notice that the bootstrap.servers
lists the address of my Oracle Autonomous Database, not a Kafka broker! Also notice that we are using the serializers (and later, deserializers) provided in the OKafka library, not the standard Kafka ones.
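For reference, a string serializer of this kind does little more than convert between strings and UTF-8 bytes. Here is a minimal sketch of what one does; the class and method names are illustrative, not the actual OKafka implementation:

```java
import java.nio.charset.StandardCharsets;

public class StringSerializerSketch {
    // serialize: String -> UTF-8 bytes (null passes through as null)
    static byte[] serialize(String data) {
        return data == null ? null : data.getBytes(StandardCharsets.UTF_8);
    }

    // deserialize: UTF-8 bytes -> String, the inverse operation
    static String deserialize(byte[] data) {
        return data == null ? null : new String(data, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] bytes = serialize("value 1");
        System.out.println(deserialize(bytes)); // value 1
    }
}
```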
Next, we set the topic name by reading it from the properties file. If it is not there, the second argument provides a default/fallback value:
String topicName = props.getProperty("topic.name", "TOPIC1");
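The two-argument form of getProperty() is what provides the fallback. A quick sketch of the behavior:

```java
import java.util.Properties;

public class PropsDefaultDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("topic.name", "TOPIC1");

        // key present: the value from the properties wins
        System.out.println(props.getProperty("topic.name", "FALLBACK")); // TOPIC1

        // key missing: the second argument is returned instead
        System.out.println(props.getProperty("group.id", "default-group")); // default-group
    }
}
```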
And now we are ready to create the producer and send some messages:
try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
    for (int i = 0; i < 100; i++) {
        producer.send(new ProducerRecord<String, String>(
                topicName, 0, "key", "value " + i));
    }
    System.out.println("sent 100 messages");
} catch (Exception e) {
    e.printStackTrace();
}
We created the KafkaProducer
and for this example, we are using String
for both the key and the value.
We have a loop to send 100 messages, which we create with the ProducerRecord
class. We are just setting them to some placeholder data.
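The loop sends 100 records that share one key and carry values "value 0" through "value 99". A sketch of just the payload generation, pulled out into a helper (the helper name is mine, for illustration):

```java
import java.util.ArrayList;
import java.util.List;

public class PayloadSketch {
    // builds the n placeholder values the producer loop sends: "value 0" .. "value n-1"
    static List<String> buildValues(int n) {
        List<String> values = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            values.add("value " + i);
        }
        return values;
    }

    public static void main(String[] args) {
        List<String> values = buildValues(100);
        System.out.println(values.size());   // 100
        System.out.println(values.get(99));  // value 99
    }
}
```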
Ok, that’s all we need in the code. But we will need to get the wallet and set it up so Java programs can use it to authenticate. Have a look at this post for details on how to do that! You just need to download the wallet from the OCI console, unzip it into a directory called wallet in the same directory as the pom.xml, edit the sqlnet.ora to set the DIRECTORY to the right location (e.g. /home/mark/src/okafka/wallet for me), and then add your credentials using the setup_wallet.sh I showed in that post.
Finally, you need to add these lines to the ojdbc.properties
file in the wallet directory to tell OKafka the user to connect to the database with:
user=mark
password=SomePassword
oracle.net.ssl_server_dn_match=true
With the wallet set up, we are ready to build and run our code!
mvn clean package
CLASSPATH=target/okafka-0.0.1-SNAPSHOT.jar
CLASSPATH=$CLASSPATH:$HOME/.m2/repository/org/oracle/okafka/okafka/0.8/okafka-0.8.jar
java -classpath $CLASSPATH com.example.okafka.Producer
The output should look like this:
[main] INFO org.oracle.okafka.clients.producer.ProducerConfig - ProducerConfig values:
acks = 1
batch.size = 200
bootstrap.servers = [adb.us-ashburn-1.oraclecloud.com:1522]
buffer.memory = 326760
client.id =
compression.type = none
connections.max.idle.ms = 540000
enable.idempotence = false
interceptor.classes = []
key.serializer = class org.oracle.okafka.common.serialization.StringSerializer
linger.ms = 100
max.block.ms = 60000
max.in.flight.requests.per.connection = 5
max.request.size = 1048576
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
oracle.instance.name = prod_high
oracle.net.tns_admin = /home/mark/src/okafka/wallet
oracle.service.name = xxxxx_prod_high.adb.oraclecloud.com
partitioner.class = class org.oracle.okafka.clients.producer.internals.DefaultPartitioner
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retries = 0
retry.backoff.ms = 100
security.protocol = SSL
send.buffer.bytes = 131072
tns.alias = prod_high
transaction.timeout.ms = 60000
transactional.id = null
value.serializer = class org.oracle.okafka.common.serialization.StringSerializer
[main] WARN org.oracle.okafka.common.utils.AppInfoParser - Error while loading kafka-version.properties :inStream parameter is null
[main] INFO org.oracle.okafka.common.utils.AppInfoParser - Kafka version : unknown
[main] INFO org.oracle.okafka.common.utils.AppInfoParser - Kafka commitId : unknown
[kafka-producer-network-thread | producer-1] INFO org.oracle.okafka.clients.Metadata - Cluster ID:
sent 100 messages
[main] INFO org.oracle.okafka.clients.producer.KafkaProducer - [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
You can see it dumps out the properties, and then after some informational messages you see the “sent 100 messages” output. Now you might want to go and run that query to look at the messages in the database!
Now, let’s move on to creating a consumer, so we can read those messages back.
Creating the Consumer
The consumer is going to look very similar to the producer, and it will also have its own properties file. Here’s the contents of the properties file first – put this in src/main/resources/consumer.properties
:
oracle.service.name=xxxxx_prod_high.adb.oraclecloud.com
oracle.instance.name=prod_high
oracle.net.tns_admin=/home/mark/src/okafka/wallet
security.protocol=SSL
tns.alias=prod_high
bootstrap.servers=adb.us-ashburn-1.oraclecloud.com:1522
group.id=bob
enable.auto.commit=true
auto.commit.interval.ms=10000
key.deserializer=org.oracle.okafka.common.serialization.StringDeserializer
value.deserializer=org.oracle.okafka.common.serialization.StringDeserializer
max.poll.records=100
And here is the content for Consumer.java
which you create in src/main/java/com/example/okafka
:
package com.example.okafka;
import java.io.FileNotFoundException;
import java.io.InputStream;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import org.oracle.okafka.clients.consumer.ConsumerRecord;
import org.oracle.okafka.clients.consumer.ConsumerRecords;
import org.oracle.okafka.clients.consumer.KafkaConsumer;
public class Consumer {

    private static final String propertiesFilename = "consumer.properties";

    public static void main(String[] args) {
        // configure logging level
        System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", "INFO");

        // load props
        Properties props = getProperties();
        String topicName = props.getProperty("topic.name", "TOPIC1");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList(topicName));

        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(30_000));
        for (ConsumerRecord<String, String> record : records) {
            System.out.println(
                    record.topic() + " " +
                    record.partition() + " " +
                    record.key() + " " +
                    record.value());
        }
        consumer.close();
    }

    private static Properties getProperties() {
        Properties props = new Properties();
        // note: Consumer.class here, not Producer.class
        try (InputStream inputStream = Consumer.class
                .getClassLoader()
                .getResourceAsStream(propertiesFilename)) {
            if (inputStream != null) {
                props.load(inputStream);
            } else {
                throw new FileNotFoundException(
                        "could not find properties file: " + propertiesFilename);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        return props;
    }
}
A lot of this is the same as the producer, so let’s walk through the parts that are different.
First, we load a different properties file, the consumer one, which has a few properties that are specific to consumers. In particular, we are setting max.poll.records to 100, so we will read at most 100 messages off the topic at a time.
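Conceptually, max.poll.records just caps how many records a single poll hands back, leaving the rest for later polls. A pure-Java sketch of that batching behavior (illustrative only, not how OKafka implements polling internally):

```java
import java.util.ArrayList;
import java.util.List;

public class PollBatchSketch {
    // returns at most maxPollRecords items per "poll", removing them from the backlog
    static List<Integer> poll(List<Integer> backlog, int maxPollRecords) {
        int n = Math.min(maxPollRecords, backlog.size());
        List<Integer> batch = new ArrayList<>(backlog.subList(0, n));
        backlog.subList(0, n).clear(); // consumed records leave the backlog
        return batch;
    }

    public static void main(String[] args) {
        List<Integer> backlog = new ArrayList<>();
        for (int i = 0; i < 250; i++) backlog.add(i);

        // 250 pending records drain in batches of 100, 100, then 50
        System.out.println(poll(backlog, 100).size()); // 100
        System.out.println(poll(backlog, 100).size()); // 100
        System.out.println(poll(backlog, 100).size()); // 50
    }
}
```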
Here’s how we create the consumer:
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList(topicName));
Again, you may notice that this is very similar to Kafka. We are using String
as the type for both the key and value. Notice that we provided the appropriate deserializers in the properties file: the ones from the OKafka library, not the standard Kafka ones.
Here’s the actual consumer code:
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(30_000));
for (ConsumerRecord<String, String> record : records) {
    System.out.println(
            record.topic() + " " +
            record.partition() + " " +
            record.key() + " " +
            record.value());
}
consumer.close();
We open our consumer, poll for messages (for up to 30 seconds), print out some information about each message, and then close our consumer. Again, this is very simple, but it’s enough to test consuming messages.
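The println in the loop just concatenates the topic, partition, key, and value with spaces. Pulled out as a small helper (hypothetical, for illustration), it is easy to see the shape of each output line:

```java
public class RecordFormatSketch {
    // mirrors the consumer's println: topic, partition, key, value separated by spaces
    static String format(String topic, int partition, String key, String value) {
        return topic + " " + partition + " " + key + " " + value;
    }

    public static void main(String[] args) {
        System.out.println(format("TOPIC1", 0, "key", "value 0")); // TOPIC1 0 key value 0
    }
}
```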
We can run this and we should see all of the message data in the output. Here’s how to run it, along with an excerpt of the output:
mvn clean package
CLASSPATH=target/okafka-0.0.1-SNAPSHOT.jar
CLASSPATH=$CLASSPATH:$HOME/.m2/repository/org/oracle/okafka/okafka/0.8/okafka-0.8.jar
java -classpath $CLASSPATH com.example.okafka.Consumer
[main] INFO org.oracle.okafka.clients.consumer.ConsumerConfig - ConsumerConfig values:
auto.commit.interval.ms = 10000
auto.offset.reset = latest
bootstrap.servers = [adb.us-ashburn-1.oraclecloud.com:1522]
check.crcs = true
client.id =
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = true
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = bob
heartbeat.interval.ms = 3000
interceptor.classes = []
internal.leave.group.on.close = true
isolation.level = read_uncommitted
key.deserializer = class org.oracle.okafka.common.serialization.StringDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 100
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
oracle.instance.name = prod_high
oracle.net.tns_admin = /home/mark/src/okafka/wallet
oracle.service.name = xxxxx_prod_high.adb.oraclecloud.com
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retry.backoff.ms = 100
security.protocol = SSL
send.buffer.bytes = 131072
session.timeout.ms = 10000
tns.alias = prod_high
value.deserializer = class org.oracle.okafka.common.serialization.StringDeserializer
[main] WARN org.oracle.okafka.common.utils.AppInfoParser - Error while loading kafka-version.properties :inStream parameter is null
[main] INFO org.oracle.okafka.common.utils.AppInfoParser - Kafka version : unknown
[main] INFO org.oracle.okafka.common.utils.AppInfoParser - Kafka commitId : unknown
TOPIC1 0 key value 0
TOPIC1 0 key value 1
TOPIC1 0 key value 2
...
So there you go! We successfully created a very simple producer and consumer and we sent and received messages from a topic using the OKafka library and Oracle Transactional Event Queues!