Playing with Kafka Java Client for TEQ – creating the simplest of producers and consumers

Today I was playing with Kafka Java Client for TEQ, that allows you to use Oracle Transactional Event Queues (formerly known as Sharded Queues) in the Oracle Database just like Kafka.

Kafka Java Client for TEQ is available as a preview in GitHub here: http://github.com/oracle/okafka

In this preview version, there are some limitations documented in the repository, but the main one to be aware of is that you need to use the okafka library, not the regular kafka one, so you would need to change existing kafka client code if you wanted to try out the preview.

Preparing the database

To get started, I grabbed a new Oracle Autonomous Database instance on Oracle Cloud, and I opened up the SQL Worksheet in Database Actions and created myself a user. As the ADMIN user, I ran the following commands:

create user mark identified by SomePassword;  -- that's not the real password!
grant connect, resource to mark;
grant create session to mark;
grant unlimited tablespace to mark;
grant execute on dbms_aqadm to mark;
grant execute on dbms_aqin to mark;
grant execute on dbms_aqjms to mark;
grant select_catalog_role to mark;
grant select on gv$instance to mark;
grant select on gv$listener_network to mark;
commit;

And of course, I needed a topic to work with, so I logged on to SQL Worksheet as my new MARK user and created a topic called topic1 with these commands:

begin
    sys.dbms_aqadm.create_sharded_queue(queue_name => 'topic1', multiple_consumers => TRUE); 
    sys.dbms_aqadm.set_queue_parameter('topic1', 'SHARD_NUM', 1);
    sys.dbms_aqadm.set_queue_parameter('topic1', 'STICKY_DEQUEUE', 1);
    sys.dbms_aqadm.set_queue_parameter('topic1', 'KEY_BASED_ENQUEUE', 1);
    sys.dbms_aqadm.start_queue('topic1');
end;

Note that this is for Oracle Database 19c. If you are using 21c, create_sharded_queue is renamed to create_transactional_event_queue, so you will have to update that line.

The topic is empty right now, since we just created it, but here are a couple of queries that will be useful later. We can see the messages in the topic, with details including the enqueue time, status, etc., using this query:

select * from topic1;

This is a useful query to see a count of messages in each status:

select msg_state, count(*)
from aq$topic1
group by msg_state;

Building the OKafka library

We need to build the OKafka library and install it in our local Maven repository so that it will be available to use as a dependency since the preview is not currently available in Maven Central.

First, clone the repository:

git clone https://github.com/oracle/okafka

Now we can build the uberjar with the included Gradle wrapper:

cd okafka
./gradlew fullJar

This will put the JAR file in gradle/build/libs and we can install this into our local Maven repository using this command:

mvn install:install-file \
    -DgroupId=org.oracle.okafka \
    -DartifactId=okafka \
    -Dversion=0.8 \
    -Dfile=clients/build/libs/okafka-0.8-full.jar \
    -DpomFile=clients/okafka-0.8.pom 

Now we are ready to start writing our code!

Creating the Producer

Let’s start by creating our Maven POM file. In a new directory, called okafka, I created a file called pom.xml with the following content:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" 
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
         https://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>

	<groupId>com.example</groupId>
	<artifactId>okafka</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<name>okafka</name>

	<properties>
		<java.version>17</java.version>
		<maven.compiler.source>17</maven.compiler.source>
		<maven.compiler.target>17</maven.compiler.target>
		<okafka.version>0.8</okafka.version>
	</properties>

	<dependencies>
		<dependency>
			<groupId>org.oracle.okafka</groupId>
			<artifactId>okafka</artifactId>
			<version>${okafka.version}</version>
		</dependency>
	</dependencies>
</project>

I am using Java 17 for this example. But you could use anything from 1.8 onwards, just update the version in the properties if you are using an earlier version.

Now let’s create our producer class:

mkdir -p src/main/java/com/example/okafka
touch src/main/java/com/example/okafka/Producer.java

Here’s the content for Producer.java:

package com.example.okafka;

import java.io.FileNotFoundException;
import java.io.InputStream;
import java.util.Properties;

import org.oracle.okafka.clients.producer.KafkaProducer;
import org.oracle.okafka.clients.producer.ProducerRecord;

public class Producer {

    private static final String propertiesFilename = "producer.properties";

    public static void main(String[] args) {
        // configure logging level
        System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", "INFO");

        // load props
        Properties props = getProperties();

        String topicName = props.getProperty("topic.name", "TOPIC1");

        try(KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < 100; i++) {
                producer.send(new ProducerRecord<String, String>(
                    topicName, 0, "key", "value " + i));
            }
            System.out.println("sent 100 messages");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private static Properties getProperties() {
        Properties props = new Properties();

        try (
                InputStream inputStream = Producer.class
                    .getClassLoader()
                    .getResourceAsStream(propertiesFilename);
        ) {
            if (inputStream != null) {
                props.load(inputStream);
            } else {
                throw new FileNotFoundException(
                     "could not find properties file: " + propertiesFilename);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        return props;
    }
}

Let’s walk through this code and talk about what it does.

First, let’s notice the imports. We are importing the OKafka versions of the familiar Kafka classes. These have the same interfaces as the standard Kafka ones, but they work with Oracle TEQ instead:

import org.oracle.okafka.clients.producer.KafkaProducer;
import org.oracle.okafka.clients.producer.ProducerRecord;

In the main() method we first set the log level and then we load some properties from our producer.properties config file. You will see the getProperties() method at the end of the file is a fairly standard, it is just reading the file and returning the contents as a new Properties object.

Let’s see what’s in that producer.properties file, which is located in the src/main/resources directory:

oracle.service.name=xxxxx_prod_high.adb.oraclecloud.com
oracle.instance.name=prod_high
oracle.net.tns_admin=/home/mark/src/okafka/wallet
security.protocol=SSL
tns.alias=prod_high

bootstrap.servers=adb.us-ashburn-1.oraclecloud.com:1522
batch.size=200
linger.ms=100
buffer.memory=326760
key.serializer=org.oracle.okafka.common.serialization.StringSerializer
value.serializer=org.oracle.okafka.common.serialization.StringSerializer
topic.name=TOPIC1

There are two groups of properties in there. The first group provide details about my Oracle Autonomous Database instance, including the location of the wallet file – we’ll get that and set it up in a moment.

The second group are the normal Kafka properties that you might expect to see, assuming you are familiar with Kafka. Notice that the bootstrap.servers lists the address of my Oracle Autonomous Database, not a Kafka broker! Also notice that we are using the serializers (and later, deserializers) provided in the OKafka library, not the standard Kafka ones.

Next, we set the topic name by reading it from the properties file. If it is not there, the second argument provides a default/fallback value:

String topicName = props.getProperty("topic.name", "TOPIC1");

And now we are ready to create the producer and send some messages:

try(KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
    for (int i = 0; i < 100; i++) {
        producer.send(new ProducerRecord<String, String>(
            topicName, 0, "key", "value " + i));
    }
    System.out.println("sent 100 messages");
} catch (Exception e) {
    e.printStackTrace();
}

We created the KafkaProducer and for this example, we are using String for both the key and the value.

We have a loop to send 100 messages, which we create with the ProducerRecord class. We are just setting them to some placeholder data.

Ok, that’s all we need in the code. But we will need to get the wallet and set it up so Java programs can use it to authenticate. Have a look at this post for details on how to do that! You just need to download the wallet from the OCI console, unzip it into a directory called wallet – put that in the same directory as the pom.xml, and then edit the sqlnet.ora to set the DIRECTORY to the right location, e.g. /home/mark/src/okafka/wallet for me, and then add your credentials using the setup_wallet.sh I showed in that post.

Finally, you need to add these lines to the ojdbc.properties file in the wallet directory to tell OKafka the user to connect to the database with:

user=mark
password=SomePassword
oracle.net.ssl_server_dn_match=true

With the wallet set up, we are ready to build and run our code!

mvn clean package
CLASSPATH=target/okafka-0.0.1-SNAPSHOT.jar
CLASSPATH=$CLASSPATH:$HOME/.m2/repository/org/oracle/okafka/okafka/0.8/okafka-0.8.jar
java -classpath $CLASSAPTH com.example.okafka.Producer

The output should look like this:

[main] INFO org.oracle.okafka.clients.producer.ProducerConfig - ProducerConfig values: 
        acks = 1
        batch.size = 200
        bootstrap.servers = [adb.us-ashburn-1.oraclecloud.com:1522]
        buffer.memory = 326760
        client.id = 
        compression.type = none
        connections.max.idle.ms = 540000
        enable.idempotence = false
        interceptor.classes = []
        key.serializer = class org.oracle.okafka.common.serialization.StringSerializer
        linger.ms = 100
        max.block.ms = 60000
        max.in.flight.requests.per.connection = 5
        max.request.size = 1048576
        metadata.max.age.ms = 300000
        metric.reporters = []
        metrics.num.samples = 2
        metrics.recording.level = INFO
        metrics.sample.window.ms = 30000
        oracle.instance.name = prod_high
        oracle.net.tns_admin = /home/mark/src/okafka/wallet
        oracle.service.name = xxxxx_prod_high.adb.oraclecloud.com
        partitioner.class = class org.oracle.okafka.clients.producer.internals.DefaultPartitioner
        receive.buffer.bytes = 32768
        reconnect.backoff.max.ms = 1000
        reconnect.backoff.ms = 50
        request.timeout.ms = 30000
        retries = 0
        retry.backoff.ms = 100
        security.protocol = SSL
        send.buffer.bytes = 131072
        tns.alias = prod_high
        transaction.timeout.ms = 60000
        transactional.id = null
        value.serializer = class org.oracle.okafka.common.serialization.StringSerializer

[main] WARN org.oracle.okafka.common.utils.AppInfoParser - Error while loading kafka-version.properties :inStream parameter is null
[main] INFO org.oracle.okafka.common.utils.AppInfoParser - Kafka version : unknown
[main] INFO org.oracle.okafka.common.utils.AppInfoParser - Kafka commitId : unknown
[kafka-producer-network-thread | producer-1] INFO org.oracle.okafka.clients.Metadata - Cluster ID: 
sent 100 messages
[main] INFO org.oracle.okafka.clients.producer.KafkaProducer - [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.

You can see it dumps out the properties, and then after some informational messages you see the “sent 100 messages” output. Now you might want to go and run that query to look at the messages in the database!

Now, lets move on to creating a consumer, so we can read those messages back.

Creating the Consumer

The consumer is going to look very similar to the producer, and it will also have its own properties file. Here’s the contents of the properties file first – put this in src/main/resources/consumer.properties:

oracle.service.name=xxxxx_prod_high.adb.oraclecloud.com
oracle.instance.name=prod_high
oracle.net.tns_admin=/home/mark/src/testing-okafka/okafka/wallet
security.protocol=SSL
tns.alias=prod_high

bootstrap.servers=adb.us-ashburn-1.oraclecloud.com:1522
group.id=bob
enable.auto.commit=true
auto.commit.interval.ms=10000
key.deserializer=org.oracle.okafka.common.serialization.StringDeserializer
value.deserializer=org.oracle.okafka.common.serialization.StringDeserializer
max.poll.records=100

And here is the content for Consumer.java which you create in src/main/java/com/example/okafka:

package com.example.okafka;

import java.io.FileNotFoundException;
import java.io.InputStream;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

import org.oracle.okafka.clients.consumer.ConsumerRecord;
import org.oracle.okafka.clients.consumer.ConsumerRecords;
import org.oracle.okafka.clients.consumer.KafkaConsumer;

public class Consumer {
    private static final String propertiesFilename = "consumer.properties";

    public static void main(String[] args) {
        // logging level
        System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", "INFO");


        // load props
        Properties props = getProperties();

        String topicName = props.getProperty("topic.name", "TOPIC1");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList(topicName));

        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(30_000));
        for (ConsumerRecord<String, String> record : records) {
            System.out.println(
                record.topic() + " " + 
                record.partition() + " " + 
                record.key() + " " + 
                record.value());
        }
        consumer.close();
    }

    private static Properties getProperties() {
        Properties props = new Properties();

        try (
                InputStream inputStream = Producer.class
                    .getClassLoader()
                    .getResourceAsStream(propertiesFilename);
        ) {
            if (inputStream != null) {
                props.load(inputStream);
            } else {
                throw new FileNotFoundException(
                    "could not find properties file: " + propertiesFilename);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        return props;
    }
}

A lot of this is the same as the producer, so let’s walk through the parts that are different.

First, we load a the different properties file, the consumer one, it has a few different properties that are relevant for consumers. In particular, we are setting the max.poll.records to 100 – so we’ll only be reading at most 100 messages off the topic at a time.

Here’s how we create the consumer:

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList(topicName));

Again, you may notice that this is very similar to Kafka. We are using String as the type for both the key and value. Notice we provided the appropriate deserializers in the property file, the ones from the OKafka library, not the standard Kafka ones.

Here’s the actual consumer code:

        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(30_000));
        for (ConsumerRecord<String, String> record : records) {
            System.out.println(
                record.topic() + " " + 
                record.partition() + " " + 
                record.key() + " " + 
                record.value());
        }
        consumer.close();

We open our consumer and poll for messages (for 30 seconds) and then we just print out some information about each message, and then close out consumer! Again, this is very simple, but its enough to test consuming messages.

We can run this and we should see all of the message data in the output, here’s how to run it, and an excerpt of the output:

mvn clean package
CLASSPATH=target/okafka-0.0.1-SNAPSHOT.jar
CLASSPATH=$CLASSPATH:$HOME/.m2/repository/org/oracle/okafka/okafka/0.8/okafka-0.8.jar
java -classpath $CLASSAPTH com.example.okafka.Consumer

[main] INFO org.oracle.okafka.clients.consumer.ConsumerConfig - ConsumerConfig values: 
        auto.commit.interval.ms = 10000
        auto.offset.reset = latest
        bootstrap.servers = [adb.us-ashburn-1.oraclecloud.com:1522]
        check.crcs = true
        client.id = 
        connections.max.idle.ms = 540000
        default.api.timeout.ms = 60000
        enable.auto.commit = true
        exclude.internal.topics = true
        fetch.max.bytes = 52428800
        fetch.max.wait.ms = 500
        fetch.min.bytes = 1
        group.id = bob
        heartbeat.interval.ms = 3000
        interceptor.classes = []
        internal.leave.group.on.close = true
        isolation.level = read_uncommitted
        key.deserializer = class org.oracle.okafka.common.serialization.StringDeserializer
        max.partition.fetch.bytes = 1048576
        max.poll.interval.ms = 300000
        max.poll.records = 100
        metadata.max.age.ms = 300000
        metric.reporters = []
        metrics.num.samples = 2
        metrics.recording.level = INFO
        metrics.sample.window.ms = 30000
        oracle.instance.name = prod_high
        oracle.net.tns_admin = /home/mark/src/okafka/wallet
        oracle.service.name = xxxxx_prod_high.adb.oraclecloud.com
        receive.buffer.bytes = 65536
        reconnect.backoff.max.ms = 1000
        reconnect.backoff.ms = 50
        request.timeout.ms = 30000
        retry.backoff.ms = 100
        security.protocol = SSL
        send.buffer.bytes = 131072
        session.timeout.ms = 10000
        tns.alias = prod_high
        value.deserializer = class org.oracle.okafka.common.serialization.StringDeserializer

[main] WARN org.oracle.okafka.common.utils.AppInfoParser - Error while loading kafka-version.properties :inStream parameter is null
[main] INFO org.oracle.okafka.common.utils.AppInfoParser - Kafka version : unknown
[main] INFO org.oracle.okafka.common.utils.AppInfoParser - Kafka commitId : unknown
TOPIC1 0 key value 0
TOPIC1 0 key value 1
TOPIC1 0 key value 2
...

So there you go! We successfully created a very simple producer and consumer and we sent and received messages from a topic using the OKafka library and Oracle Transactional Event Queues!

About Mark Nelson

Mark Nelson is a Developer Evangelist at Oracle, focusing on microservices and messaging. Before this role, Mark was an Architect in the Enterprise Cloud-Native Java Team, the Verrazzano Enterprise Container Platform project, worked on Wercker, WebLogic and was a senior member of the A-Team since 2010, and worked in Sales Consulting at Oracle since 2006 and various roles at IBM since 1994.
This entry was posted in Uncategorized and tagged , , . Bookmark the permalink.

1 Response to Playing with Kafka Java Client for TEQ – creating the simplest of producers and consumers

  1. Pingback: Last Week in Stream Data Integration & Stream Analytics – 27.04.2022 | Enjoy IT - SOA, Java, Event-Driven Computing and Integration

Leave a comment