Unleash Your Real-Time Data: Discover the YugabyteDB Source Connector for Confluent Cloud

Bridging YugabyteDB and Apache Kafka®
Anand Venkatesh

Real-time data is no longer a luxury but a necessity: businesses need immediate insights and dynamic applications. To support these demands, we’re thrilled to introduce the YugabyteDB Source Connector for Confluent Cloud.

This powerful connector is designed to seamlessly stream change data capture (CDC) events directly from your YugabyteDB database to the Confluent data streaming platform (built by the original co-creators of Kafka), unlocking a world of possibilities for real-time data integration.

At its core, the YugabyteDB Source Connector harnesses YugabyteDB’s robust logical replication capabilities. This allows it to capture row-level changes (every insert, update, and delete) and publish these changes as messages to Kafka topics.

The result? A consistent and reliable data stream that fuels a range of use cases, including:

  • Real-time Analytics: Gain immediate insights into your operational data.
  • Modern Data Warehousing: Keep your data warehouse continuously updated.
  • Event-Driven Applications: Build reactive systems that respond instantly to database modifications.

In this blog, we walk through the configuration required to get you up and running with a CDC pipeline fast.

Key Features

The YugabyteDB Source Connector is packed with features designed to make your data streaming journey efficient and reliable. Highlights include:

  1. Real-Time Data Streaming: Gain immediate access to critical business events as data changes are streamed instantly from YugabyteDB to Apache Kafka.
  2. Logical Replication Foundation: Built upon YugabyteDB’s robust logical replication, this guarantees high fidelity and consistency as your data transitions from the database to Kafka.
  3. Ideal for Event-Driven Architectures: Perfect for constructing modern, agile applications that dynamically react to real-time database modifications.
  4. Effortless Data Integration: Seamlessly integrate your YugabyteDB data with multiple other systems and applications that consume from Kafka topics.
  5. Guaranteed Reliable Data Transfer: Engineered for unwavering reliability, ensuring that data changes are captured and delivered to Kafka without any loss, backed by an at-least-once delivery guarantee.
  6. Full Confluent Cloud Compatibility: Enjoy effortless deployment and management within your existing Confluent ecosystem, thanks to full compatibility with Confluent Cloud.
  7. Simplified Operations: Dramatically reduce the operational overhead of data integration by streamlining the process of extracting and transforming YugabyteDB data.

Step-by-Step Configuration

Getting Started with the YugabyteDB Source Connector

Follow the steps below to set up and run your YugabyteDB Source Connector.

  1. Launch Confluent Cluster

    To begin, ensure you have a Confluent cluster running.

    • Refer to the Confluent Cloud Quick Start for detailed installation instructions if you’re using Confluent Cloud.
    • For this example, we’ll use a local machine with a Confluent Platform setup. Remember to note your machine’s public IP address; you’ll need it for the YugabyteDB Aeon allowlist in the next step (see the one-liner after this list).
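    One way to find the public IP to add to the allowlist (a one-liner sketch; ifconfig.me is a third-party echo service, and this assumes outbound internet access):

     curl -s ifconfig.me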
  2. Launch YugabyteDB Aeon Cluster

    Next, set up your YugabyteDB Aeon cluster.

    • Consult the YugabyteDB Aeon Quick Start for installation guidance.
    • While creating your cluster, make sure you establish a new IP allowlist and add the public IP address of the machine running your Confluent setup.
  3. Create a Replication User

    Log in to the YugabyteDB Aeon cloud shell. The default admin role already has the necessary replication privileges assigned.
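
    If you’d rather not stream as admin, here is a minimal sketch of a dedicated replication user using standard PostgreSQL role syntax (which YSQL follows); the role name and password are placeholders of our own, not part of the product:

     CREATE ROLE cdc_user WITH LOGIN REPLICATION PASSWORD 'change-me';
     -- After creating the table in the next step, also grant access:
     -- GRANT SELECT ON public.users TO cdc_user;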

  4. Create a Table

    Execute the following statement in the YugabyteDB Aeon cloud shell to create a sample users table:

    CREATE TABLE users (id SERIAL PRIMARY KEY, name TEXT, email TEXT);
  5. Create a Publication and Replication Slot

    Using the same cloud shell, create a publication and a logical replication slot:

    CREATE PUBLICATION pub FOR TABLE public.users;
    
    SELECT * FROM pg_create_logical_replication_slot('slot_1', 'yboutput');
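
    You can verify that both objects exist before moving on; these are standard PostgreSQL catalog views, which YugabyteDB’s YSQL API also exposes:

     SELECT * FROM pg_publication;
     SELECT slot_name, plugin FROM pg_replication_slots;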
  6. Create a Topic in the Confluent Cluster

    In your Confluent cluster, use the “Create topic” button to create a topic named dbserver.public.users. Once created, your topic dashboard will display the new topic.
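
    If you prefer a CLI to the UI, the Confluent CLI offers an equivalent (a sketch; it assumes you are already logged in with the right environment and cluster selected):

     confluent kafka topic create dbserver.public.users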

  7. Start Connect Standalone Locally

    Download and extract the Confluent Platform package by following the steps in Confluent’s documentation; you’ll launch Connect in standalone mode from this installation in step 10.
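
    On Linux, this step usually amounts to fetching and unpacking the tarball (the URL below follows Confluent’s published archive pattern for the 8.0 release; confirm the current link in their documentation):

     curl -O https://packages.confluent.io/archive/8.0/confluent-8.0.0.tar.gz
     tar -xzf confluent-8.0.0.tar.gz
     cd confluent-8.0.0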

  8. Add the Connector

    Locate the Kafka Connect YugabyteDB source connector under “connector plugins” and click “Configure” to proceed.

  9. Enter Connector Details

    On the configuration page, you’ll be prompted to download a properties file containing keys and secrets necessary for publishing records to Kafka on Confluent Cloud.

    Click “Generate Config” to download connect-standalone.properties. This file should be stored on the same machine where you installed Confluent Platform. It will resemble the following (with redacted sensitive information):

     bootstrap.servers=<redacted>
     key.converter=org.apache.kafka.connect.storage.StringConverter
     value.converter=io.confluent.connect.avro.AvroConverter
     ssl.endpoint.identification.algorithm=https
     security.protocol=SASL_SSL
     sasl.mechanism=PLAIN
     sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username=<redacted> password=<redacted>;
     request.timeout.ms=20000
     retry.backoff.ms=500
     producer.bootstrap.servers=<redacted>
     producer.ssl.endpoint.identification.algorithm=https
     producer.security.protocol=SASL_SSL
     producer.sasl.mechanism=PLAIN
     producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username=<redacted> password=<redacted>;
     producer.request.timeout.ms=20000
     producer.retry.backoff.ms=500
     consumer.bootstrap.servers=<redacted>
     consumer.ssl.endpoint.identification.algorithm=https
     consumer.security.protocol=SASL_SSL
     consumer.sasl.mechanism=PLAIN
     consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username=<redacted> password=<redacted>;
     consumer.request.timeout.ms=20000
     consumer.retry.backoff.ms=500
     offset.flush.interval.ms=10000
     offset.storage.file.filename=/tmp/connect.offsets
     # Required connection configs for Confluent Cloud Schema Registry
     value.converter.basic.auth.credentials.source=USER_INFO
     value.converter.schema.registry.basic.auth.user.info=<redacted>
     value.converter.schema.registry.url=<redacted>
     database.history.kafka.bootstrap.servers=<redacted>
     database.history.consumer.security.protocol=SASL_SSL
     database.history.consumer.ssl.endpoint.identification.algorithm=https
     database.history.consumer.sasl.mechanism=PLAIN
     database.history.consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username=<redacted> password=<redacted>;
     database.history.producer.security.protocol=SASL_SSL
     database.history.producer.ssl.endpoint.identification.algorithm=https
     database.history.producer.sasl.mechanism=PLAIN
     database.history.producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username=<redacted> password=<redacted>;
     # Update plugin.path to match the location where the
     # YugabyteDB source connector has been installed
     plugin.path=/usr/share/java,/home/ec2-user/confluent-8.0.0/share/confluent-hub-components

    Next, create a source-connector.properties file with the following configuration for your connector deployment:

     name=yb-source-connector
     tasks.max=1
     connector.class=io.debezium.connector.postgresql.YugabyteDBConnector
     topic.prefix=dbserver
     database.hostname=hostNameFromYugabyteDBAeon
     database.port=5433
     database.user=admin
     database.password=passwordFromYugabyteDBAeon
     database.dbname=yugabyte
     snapshot.mode=never
     plugin.name=yboutput
     table.include.list=public.users
     slot.name=slot_1
     publication.name=pub
     database.sslmode=require
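
    A few of these settings tie back to earlier steps: plugin.name, slot.name, and publication.name must match the yboutput plugin, slot_1 slot, and pub publication from step 5, while table.include.list scopes capture to the users table from step 4. Note that snapshot.mode=never streams only changes made after the connector starts; no initial snapshot of existing rows is taken.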
  10. Launch the Connector

    Use the following command to launch the connector locally from the location where Confluent is installed:

     ./bin/connect-standalone <path-to-connect-standalone.properties> <path-to-source-connector.properties>
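
    Once the worker is up, you can confirm the connector’s state through Kafka Connect’s REST API (listening on port 8083 by default):

     curl -s localhost:8083/connectors/yb-source-connector/status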
  11. Perform Operations and Check Messages

    Run the following insert in the YugabyteDB Aeon cloud shell; generate_series expands it into five rows:

     INSERT INTO users VALUES (generate_series(1,5), 'Demo Name', 'demo@example.com');
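
    To read the change events back, you can use Confluent’s Avro console consumer from the same installation (a sketch; supply your Confluent Cloud bootstrap server, credentials, and Schema Registry details, for example by pointing --consumer.config at a properties file with the SASL settings shown earlier — client.properties below is our placeholder):

     ./bin/kafka-avro-console-consumer \
       --bootstrap-server <redacted> \
       --topic dbserver.public.users \
       --from-beginning \
       --consumer.config client.properties \
       --property schema.registry.url=<redacted>

    Each of the five inserted rows should arrive as a change event carrying the new row image.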

Next Steps

We encourage you to try out the new connector and experience it for yourself! This documentation provides more details on how to get started.

We are excited about the possibilities this new connector brings and look forward to seeing how our customers leverage it to streamline their data ingestion processes. You can find out more about YugabyteDB Source Connector for Confluent Cloud on our partner page.

Want to know more about building GenAI apps on a PostgreSQL-compatible database? Download our solution brief to discover basic AI concepts, architectural considerations, and hands-on tutorials that demonstrate how to build your first GenAI applications.

