Getting Started with Kafka Connect for YugabyteDB (beta)

Suranjan Kumar

Kafka Connect is a popular tool for scalably and reliably streaming data between Apache Kafka and other data systems. It ships with a JDBC Sink that is used to insert data from Kafka into a database. Although the default JDBC Sink works well for many popular RDBMSs, it isn't optimized for distributed SQL databases like YugabyteDB that provide linear scalability and high availability.

In our earlier blog introducing YugabyteDB 2.7, we announced the YugabyteDB Sink Connector for Apache Kafka, which improves on the default JDBC Sink by delivering better resilience and scale when used with the YugabyteDB distributed SQL database. The connector supports inserting into both SQL (YSQL) and CQL (YCQL, Cassandra-compatible) tables. It is currently in beta but is fully functional and ready for you to use.

YugabyteDB Sink Connector for Apache Kafka

[Diagram: YugabyteDB Sink Connector for Apache Kafka, showing Kafka Connect workers connecting to multiple YugabyteDB TServers]

As seen in the diagram, using the YugabyteDB Sink Connector, multiple Kafka Connect workers can connect to different YugabyteDB TServers. This has two advantages:

  • Higher throughput, because the load is balanced across multiple TServers. (In a follow-up blog post, we will show how this achieves high throughput.)
  • If the node hosting a TServer goes down, the Kafka Connect workers connect to other TServers and avoid downtime.

As you add new TServers to handle increased load, you can modify the YugabyteDB Sink Connector properties file and add the new TServer endpoints. This will allow the Kafka Connect workers to connect to new TServers as you scale out.
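For example, if the connector accepts a comma-separated list of JDBC URLs in its connection.urls property (the property name comes from the sink config shown later in this post; the multi-endpoint format and the hostnames below are assumptions for illustration), the relevant line might look like this:

"connection.urls":"jdbc:postgresql://tserver1.example.com:5433/yugabyte,jdbc:postgresql://tserver2.example.com:5433/yugabyte,jdbc:postgresql://tserver3.example.com:5433/yugabyte"

The Kafka Connect workers can then spread their connections across the listed TServer endpoints.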

Getting Started:

To see the basic functionality of the sink connector, you’ll be copying Avro data from a single Kafka topic to a local YugabyteDB database using the JDBC as well as the CQL Sink connector.

Prerequisites

You will need the following:

  • The Confluent Platform 6.1.1 tarball.
  • A YugabyteDB cluster running locally, with the YSQL endpoint listening on the default port 5433.
  • The YugabyteDB Sink Connector jar (downloaded or built in step 4 below).

Configure Kafka

1. Assuming you have downloaded the Confluent Platform tar file, extract it and set the CONFLUENT_HOME and PATH environment variables.

tar xzf confluent-6.1.1.tar
cd confluent-6.1.1
export CONFLUENT_HOME=/Users/suranjan/confluent-6.1.1
export PATH=$PATH:$CONFLUENT_HOME/bin

2. The `confluent local services start` command starts ZooKeeper, Kafka, Schema Registry, Kafka REST, Kafka Connect (with its REST API), and ksqlDB Server.

confluent local services start
The local commands are intended for a single-node development environment only,
NOT for production usage. https://docs.confluent.io/current/cli/index.html
 
Using CONFLUENT_CURRENT: /var/folders/_1/ltd94t1x2nsdrwj302jl85vc0000gn/T/confluent.127538
Starting ZooKeeper
ZooKeeper is [UP]
Starting Kafka
Kafka is [UP]
Starting Schema Registry
Schema Registry is [UP]
Starting Kafka REST
Kafka REST is [UP]
Starting Connect
Connect is [UP]
Starting ksqlDB Server
ksqlDB Server is [UP]

3. You need to stop the Kafka Connect service, change its configuration to load the YugabyteDB Sink Connector jar, and then restart it.

To stop Kafka Connect, execute the following:

confluent local services connect stop 
The local commands are intended for a single-node development environment only,
NOT for production usage. https://docs.confluent.io/current/cli/index.html
 
Using CONFLUENT_CURRENT: /var/folders/_1/ltd94t1x2nsdrwj302jl85vc0000gn/T/confluent.127538
Stopping Connect
Connect is [DOWN]

4. Download the jar for the YugabyteDB Sink Connector here. Alternatively, build the YugabyteDB Sink Connector jar by following the instructions in the Building from Sources section of the readme.
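If you build from source instead, the readme's steps most likely boil down to a standard Maven build along these lines (a rough sketch; <connector-repo-url> is a placeholder for the repository referenced in the readme):

# Clone the connector sources (replace <connector-repo-url> with the repository from the readme)
git clone <connector-repo-url> kafka-connect-yugabytedb-sink
cd kafka-connect-yugabytedb-sink
# Build the connector jar; skip tests for a faster build
mvn clean package -DskipTests
# The resulting jar, e.g. kafka-connect-yugabytedb-sink-1.4.1-SNAPSHOT.jar, ends up under target/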

5. Save the jar to etc/connectors/ in your Confluent folder.
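For example, assuming the downloaded jar is in your current directory, the copy looks like this:

# Create the connectors directory if it doesn't exist, then copy the sink jar into it
mkdir -p $CONFLUENT_HOME/etc/connectors
cp kafka-connect-yugabytedb-sink-1.4.1-SNAPSHOT.jar $CONFLUENT_HOME/etc/connectors/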

6. In your Confluent folder, modify the file etc/schema-registry/connect-avro-distributed.properties to add the YugabyteDB Sink Connector jar to plugin.path. Set plugin.path to the path of your YugabyteDB Sink jar, and save the file.

...
plugin.path=/Users/suranjan/confluent-6.1.1/etc/connectors/kafka-connect-yugabytedb-sink-1.4.1-SNAPSHOT.jar,share/java

7. You need to start Kafka Connect again.

To start only the Kafka Connect service with the properties file you just modified, execute the following:

connect-distributed etc/schema-registry/connect-avro-distributed.properties

8. To verify that the plugins are loaded correctly, execute `confluent local services connect plugin list`. Verify that the list of plugins has com.yugabyte.jdbc.JdbcSinkConnector and com.datastax.oss.kafka.sink.CassandraSinkConnector.

confluent local services connect plugin list
  ...
  {
    "class": "com.datastax.kafkaconnector.DseSinkConnector",
    "type": "sink",
    "version": "1.4.1-SNAPSHOT"
  },
  {
    "class": "com.datastax.oss.kafka.sink.CassandraSinkConnector",
    "type": "sink",
    "version": "1.4.1-SNAPSHOT"
  },
  {
    "class": "com.yugabyte.jdbc.JdbcSinkConnector",
    "type": "sink",
    "version": "1.4.1-SNAPSHOT"
  },
  {
    "class": "com.yugabyte.jdbc.JdbcSourceConnector",
    "type": "source",
    "version": "1.4.1-SNAPSHOT"
  },
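You can also ask the Kafka Connect REST API directly for the installed plugins; the example below pipes the response through jq for readability (jq is optional and assumed to be installed):

curl -s localhost:8083/connector-plugins | jq .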

Load the sink connector:

1. In your Confluent folder, create a file etc/kafka/yb-jdbc-sink-quickstart.properties with the following contents:

{
  "name": "yb-jdbc-sink",
  "config": {
    "connector.class": "com.yugabyte.jdbc.JdbcSinkConnector",
    "tasks.max": "10",
    "topics": "orders",
    "connection.urls":"jdbc:postgresql://localhost:5433/yugabyte",
    "connection.user":"yugabyte",
    "connection.password":"yugabyte",
    "batch.size":"256",
    "mode":"INSERT",
    "auto.create":"true"
  }
}

2. To load the sink connector, POST the config file to the Kafka Connect REST API:

curl -X POST -H "Content-Type: application/json" -d @/Users/suranjan/confluent-6.1.1/etc/kafka/yb-jdbc-sink-quickstart.properties "localhost:8083/connectors"
{"name":"yb-jdbc-sink","config":{"connector.class":"com.yugabyte.jdbc.JdbcSinkConnector","tasks.max":"10","topics":"orders","connection.urls":"jdbc:postgresql://localhost:5433/yugabyte","connection.user":"yugabyte","connection.password":"yugabyte","batch.size":"256","mode":"INSERT","auto.create":"true","name":"yb-jdbc-sink"},"tasks":[],"type":"sink"}

Verify the integration

1. Produce a record into the orders topic in Kafka using the Avro producer.

./bin/kafka-avro-console-producer \
--broker-list localhost:9092 --topic orders \
--property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"id","type":"int"},{"name":"product","type":"string"},{"name":"quantity","type":"int"},{"name":"price","type":"float"}]}'

2. The console producer waits for input.

Copy and paste the following record into the terminal and press Enter:

{"id": 999, "product": "foo", "quantity": 100, "price": 50}

3. Verify that the message is published to the topic by using an Avro consumer.

./bin/kafka-avro-console-consumer --bootstrap-server localhost:9092 --topic orders --from-beginning
{"id":999,"product":"foo","quantity":100,"price":50.0}

Now you can query the YugabyteDB database. You should see that the orders table was automatically created and contains the record you just published to Kafka.

yugabyte=# select * from orders;
 id  | product | quantity | price
-----+---------+----------+-------
 999 | foo     |      100 |    50
(1 row)
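The CQL (YCQL) path uses the com.datastax.oss.kafka.sink.CassandraSinkConnector plugin you verified earlier. As a hedged sketch only, its config could look like the following, assuming the connector follows the DataStax Kafka sink conventions for contact points and topic-to-table mappings; the keyspace, table, local datacenter name, and column mapping below are hypothetical, and 9042 is YugabyteDB's default YCQL port:

{
  "name": "yb-cql-sink",
  "config": {
    "connector.class": "com.datastax.oss.kafka.sink.CassandraSinkConnector",
    "tasks.max": "10",
    "topics": "orders",
    "contactPoints": "127.0.0.1",
    "port": "9042",
    "loadBalancing.localDc": "datacenter1",
    "topic.orders.demo.orders.mapping": "id=value.id, product=value.product, quantity=value.quantity, price=value.price"
  }
}

You would load it the same way as the JDBC sink, by POSTing this JSON to localhost:8083/connectors.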

Next Steps

Interested in learning more about using Kafka Connect with YugabyteDB? Download the YugabyteDB Sink Connector for Apache Kafka to get started! You can also sign up for the beta of Yugabyte Cloud, a fully managed YugabyteDB-as-a-service. It’s PostgreSQL reimagined for a cloud native world.
