Configure Content-Based Routing to Kafka Topics with YugabyteDB’s Change Data Capture

Nishant Mittal

In today’s data-driven world, companies are constantly looking for ways to process and analyze data in real time to gain insights and make informed decisions. However, as data volumes continue to grow and compliance requirements increase, efficiently managing and routing data to the appropriate systems and applications has become a daunting task.

As a result, content-based routing has become a key messaging pattern. With content-based routing, messages are selectively routed based on their content, providing precise control over where each message goes. This is especially useful when multiple consumers need to process different subsets of the data in a topic.

This blog post will guide you through configuring content-based routing for the YugabyteDB Change Data Capture (CDC) connector. We’ll walk through an example that shows how content-based routing can help with GDPR compliance requirements that mandate geolocating data based on a user’s location. In the example, we reroute database events from users whose country is set to a specific value to a different Kafka Topic that has a partition in that country.

YugabyteDB Change Data Capture and Kafka Topics

To demonstrate content-based routing, we’ll use two key technologies. Apache Kafka will be our streaming platform, and YugabyteDB will be our core relational database. For those not familiar with these products, here’s a quick review, along with key details on how each one relates to streaming and change data capture.

  • YugabyteDB is an open source, distributed SQL database ideally built for distributed and scalable transactional applications. Change data capture (CDC) in YugabyteDB ensures that any changes in data due to operations such as inserts, updates, and deletions are identified, captured, and automatically applied to another data repository instance, or made available for consumption by applications and other tools. To consume the events generated by CDC, Debezium is used as the connector. By default, a connector created on the YugabyteDB Debezium CDC Connector streams all change data capture events from a table into a single Kafka Topic. Later we’ll see how you can push or reroute events to other Kafka Topics based on that event’s content (i.e. content-based routing).
  • Apache Kafka is a powerful open-source distributed streaming platform that excels at handling high volumes of data in real-time. It provides reliable, scalable, and high-performance data processing for mission-critical applications. To efficiently manage data streams between Kafka Topics, the Confluent Platform offers a range of powerful tools and connectors. The YugabyteDB Source Connector will be utilized to move data from YugabyteDB to Kafka. This connector streamlines the process and helps ensure seamless data transfer. Confluent Platform helps you operate efficiently at scale, making it an ideal solution for this use case illustration.

Follow the links above for more details, but now let’s look at how we can bring these key pieces together to build a content-based routing system.

How to Set Up Content-Based Routing

Step 1: Include necessary dependencies in the CDC connector

Content-based routing requires some additional dependencies. These are not included in the official yugabyte-debezium-connector for security reasons. In particular, these dependencies are:

  • Debezium routing SMT (Single Message Transform)
  • Groovy JSR223 implementation (or GraalVM JavaScript JSR 223 implementation)

To include these dependencies in our Kafka Connect environment, we need to rebuild the yugabyte-debezium-connector image with both the plugins mentioned above.

To get started, use the Dockerfile and docker-compose configuration in our GitHub cdc-examples repository.

Here’s what the Dockerfile would look like:

FROM quay.io/yugabyte/debezium-connector:latest

# Add the required jar files for content-based routing.
# -f makes curl fail on HTTP errors instead of saving an error page as a jar.
RUN cd $KAFKA_CONNECT_YB_DIR && curl -fsSLo debezium-scripting-2.1.2.Final.jar https://repo1.maven.org/maven2/io/debezium/debezium-scripting/2.1.2.Final/debezium-scripting-2.1.2.Final.jar
RUN cd $KAFKA_CONNECT_YB_DIR && curl -fsSLo groovy-4.0.9.jar https://repo1.maven.org/maven2/org/apache/groovy/groovy/4.0.9/groovy-4.0.9.jar
RUN cd $KAFKA_CONNECT_YB_DIR && curl -fsSLo groovy-jsr223-4.0.9.jar https://repo1.maven.org/maven2/org/apache/groovy/groovy-jsr223/4.0.9/groovy-jsr223-4.0.9.jar
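
With the Dockerfile in place, the image still has to be built and referenced from the docker-compose file. The following is a minimal sketch, assuming Docker is installed and the commands run from the directory that contains the Dockerfile. The tag yb-debezium-cbr:latest and both function names are hypothetical, not names taken from the cdc-examples repository.

```shell
# Hypothetical image tag; use whatever name your docker-compose file references.
IMAGE_TAG="${IMAGE_TAG:-yb-debezium-cbr:latest}"

# Build the extended connector image from the Dockerfile in the current directory.
build_cbr_image() {
  docker build -t "$IMAGE_TAG" .
}

# List the extra jars inside the built image to confirm the downloads succeeded.
# Assumes the image provides sh and exports KAFKA_CONNECT_YB_DIR at runtime.
list_cbr_jars() {
  docker run --rm --entrypoint sh "$IMAGE_TAG" \
    -c 'ls "$KAFKA_CONNECT_YB_DIR" | grep -E "debezium-scripting|groovy"'
}

# Usage:
#   build_cbr_image && list_cbr_jars
```

If the listing shows all three jar files, the image should be ready to use in Step 2.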

Step 2: Set up the CDC environment

Set up your CDC environment by deploying and configuring Kafka and the yugabyte-debezium-connector, etc. Use the Docker image that was built in Step 1 to ensure content-based routing will work.

Now, create a CDC stream with “before image” enabled using the command below. IMPLICIT sets the checkpointing type, and ALL is the record type that includes the before image.

./yb-admin --master_addresses <master-addresses> create_change_data_stream ysql.<namespace> IMPLICIT ALL
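
The connector configuration in Step 3 needs the ID of the stream created here. As a sketch, assuming yb-admin reports the new stream on a line of the form "CDC Stream ID: <id>" (check this against the output of your version), the ID can be captured in a shell variable. The helper name extract_stream_id and the sample ID are illustrative only.

```shell
# Pull the stream ID out of yb-admin output read from stdin.
# Assumes a line shaped like "CDC Stream ID: <id>".
extract_stream_id() {
  sed -n 's/^CDC Stream ID:[[:space:]]*//p'
}

# Usage (placeholders as in the command above):
#   STREAM_ID=$(./yb-admin --master_addresses <master-addresses> \
#     create_change_data_stream ysql.<namespace> IMPLICIT ALL | extract_stream_id)

# Demonstration with sample output; the ID is made up.
printf 'CDC Stream ID: 0123456789abcdef\n' | extract_stream_id
```

The captured value is what the Step 3 request substitutes into the database.streamid property.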

Step 3: Deploy a connector with content-based routing enabled

Below is an example of a connector configuration with content-based routing enabled. In this example, all the event rows with country set to “UK”, “India” and “USA” on the table “public.users” will be routed to different Kafka Topics called uk_users, india_users and usa_users respectively.

All the other events will still be streamed to the default “ybconnector.public.users” topic.

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{
 "name": "ybconnector",
 "config": {
   "tasks.max":"2",
   "connector.class": "io.debezium.connector.yugabytedb.YugabyteDBConnector",
   "database.hostname":"'$NODE'",
   "database.master.addresses":"'$MASTERS'",
   "database.port":"5433",
   "database.user": "yugabyte",
   "database.password":"Yugabyte@123",
   "database.dbname":"yugabyte",
   "database.server.name":"ybconnector",
   "snapshot.mode":"initial",
   "database.streamid":"'$1'",
   "table.include.list":"public.users",
   "key.converter":"io.confluent.connect.avro.AvroConverter",
   "key.converter.schema.registry.url":"https://schema-registry:8081",
   "value.converter":"io.confluent.connect.avro.AvroConverter",
   "value.converter.schema.registry.url":"https://schema-registry:8081",
   "transforms":"route1,route2,route3",
   "transforms.route1.type":"io.debezium.transforms.ContentBasedRouter",
   "transforms.route1.language":"jsr223.groovy",
   "transforms.route1.topic.expression":"value.after != null ? (value.after?.country?.value == '\''UK'\'' ? '\''uk_users'\'' : null) : (value.before?.country?.value == '\''UK'\'' ? '\''uk_users'\'' : null)",
   "transforms.route2.type":"io.debezium.transforms.ContentBasedRouter",
   "transforms.route2.language":"jsr223.groovy",
   "transforms.route2.topic.expression":"value.after != null ? (value.after?.country?.value == '\''India'\'' ? '\''india_users'\'' : null) : (value.before?.country?.value == '\''India'\'' ? '\''india_users'\'' : null)",
   "transforms.route3.type":"io.debezium.transforms.ContentBasedRouter",
   "transforms.route3.language":"jsr223.groovy",
   "transforms.route3.topic.expression":"value.after != null ? (value.after?.country?.value == '\''USA'\'' ? '\''usa_users'\'' : null) : (value.before?.country?.value == '\''USA'\'' ? '\''usa_users'\'' : null)"
 }
}'

The configuration that controls content re-routing

Look at the value of “transforms.route1.topic.expression” in the code above.

value.after != null ? (value.after?.country?.value == '\''UK'\'' ? '\''uk_users'\'' : null) : (value.before?.country?.value == '\''UK'\'' ? '\''uk_users'\'' : null)

This expression checks whether the row value after the operation has the country set to “UK”. If it does, the expression returns “uk_users”; if not, it returns “null”. If there is no row value after the operation (for example, in a “delete” operation), the expression checks the same condition on the row value before the operation.

The value that is returned determines which new Kafka Topic will receive the re-routed event. If it returns “null”, the event is sent to the default topic.
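
Because the request body sits inside a single-quoted shell string, each single quote in the Groovy expression appears as '\'' in the curl command. The sketch below prints the unescaped expression for a given country and topic, which may make the logic easier to read. The function name topic_expression is hypothetical; its output mirrors the expressions used in the configuration above.

```shell
# Print the Groovy routing expression for one country/topic pair.
# $1 = country value to match, $2 = destination topic.
topic_expression() {
  printf "value.after != null ? (value.after?.country?.value == '%s' ? '%s' : null) : (value.before?.country?.value == '%s' ? '%s' : null)\n" \
    "$1" "$2" "$1" "$2"
}

# Print the expression used by route1.
topic_expression UK uk_users
```

Calling it with India and india_users, or USA and usa_users, gives the expressions for route2 and route3.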

NOTE: In this example, we rerouted the events for three countries (i.e. UK, India and USA). We could have achieved this using a single expression, but for the sake of illustration and readability we created three different routers. For more advanced routing configuration, you can refer to Debezium’s official documentation on content-based routing.
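
For comparison, a single-router variant might use a Groovy map lookup in place of three separate ternary expressions. This is an untested sketch, not the configuration used in this post. It assumes the same event structure as above and relies on a Groovy map returning null for a missing key, which would leave the event on the default topic. The function name single_route_expression is hypothetical.

```shell
# Print one Groovy expression that routes all three countries.
# It picks the after-image when present, otherwise the before-image, then
# looks the country up in a map; unknown countries yield null (default topic).
single_route_expression() {
  printf '%s\n' "['UK':'uk_users','India':'india_users','USA':'usa_users'].get((value.after != null ? value.after : value.before)?.country?.value)"
}

single_route_expression
```

To try it, the connector would declare a single transform and use this string as its topic.expression, with each single quote escaped as '\'' inside the curl request body.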

Step 4: Verify the message routing

You can verify the message routing and connector deployment by performing CRUD operations on the “users” table. You should be able to see new Kafka Topics created specifically for the users who have their country set to “UK”, “India” or “USA”.
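
A sketch of how that check might look from the command line follows. It assumes the Confluent Platform command-line tools are on the PATH, the broker listens on localhost:9092 (an assumption; adjust it to your docker-compose setup), and Kafka Connect is reachable at localhost:8083 as in Step 3. The function names are hypothetical.

```shell
# Keep only the three routed topic names from a topic listing on stdin.
routed_topics() {
  grep -E '^(uk|india|usa)_users$'
}

# Show the state of the connector deployed in Step 3 (Kafka Connect REST API).
connector_status() {
  curl -s localhost:8083/connectors/ybconnector/status
}

# List the routed topics on the broker; the bootstrap address is an assumption.
list_routed_topics() {
  kafka-topics --bootstrap-server "${BOOTSTRAP:-localhost:9092}" --list | routed_topics
}

# Demonstration of the filter on a made-up topic listing:
printf '%s\n' ybconnector.public.users uk_users _schemas india_users | routed_topics
```

Against a live setup, connector_status should report the connector and its tasks as running, and list_routed_topics should show a routed topic once a matching row has been written to the “users” table.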

Kafka Control Center in Confluent Platform

Conclusion

In this post, we walked through the process of rerouting change data capture (CDC) events from a table to different Kafka Topics based on the event content.
