Stream Binary Data with Protobufs and YugabyteDB Change Data Capture

Nishant Mittal

Protocol Buffers are Google’s language-neutral, platform-neutral, extensible mechanism for serializing structured data. One of the key features of protocol buffers (or protobufs) is that they encode messages into a compact binary format.

This blog post explains how the “bytea” column type in YugabyteDB can be leveraged to store the protobuf binary data. We will walk through a specific example to illustrate how to set up a YugabyteDB Change Data Capture (CDC) connector and run a separate microservice that will read the messages from Kafka and re-create the protobuf messages using that binary data.

Steps to run the example

To get started, clone our GitHub cdc-examples repository. It contains all the required configurations and scripts we need for this blog.

Step 1: Set up the CDC environment

Set up your CDC environment by deploying and configuring Kafka, the yugabyte-debezium-connector, and related components. You can use the example docker-compose configuration.

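Now, create a CDC stream using the command below. Note the stream ID that the command returns; the connector configuration in Step 4 takes it as database.streamid.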

./yb-admin --master_addresses <master-addresses> create_change_data_stream ysql.<namespace>

Step 2: Create a table with the bytea column type

Create a table that will store the protobuf-encoded binary data. Use the bytea column type for this. bytea is a generic type supported by Postgres to represent variable-length binary data.

CREATE TABLE users(
    id bigserial PRIMARY KEY,
    data bytea
);
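As a rough illustration of what ends up in the data column, the sketch below formats raw bytes as a Postgres hex-format bytea literal (a \x prefix followed by hex digits). The helper name bytea_hex_literal and the sample payload are ours, not part of the cdc-examples repository. In application code you would normally bind the bytes as a query parameter through your driver rather than build the literal by hand.

```python
def bytea_hex_literal(data: bytes) -> str:
    """Return a SQL string literal in Postgres hex bytea format, e.g. '\\x0a03'."""
    return "'\\x" + data.hex() + "'"


# Hypothetical payload: a protobuf message with field 1 set to the string "Ada".
payload = bytes([0x0A, 0x03]) + b"Ada"

statement = f"INSERT INTO users (data) VALUES ({bytea_hex_literal(payload)});"
print(statement)
```

The hex form is also handy for inspecting stored messages in ysqlsh, since it is a plain text rendering of the binary value.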

Step 3: Create a protobuf message definition

Create a .proto file that defines the protobuf message to be stored in the table we just created.

syntax = "proto3";

enum Source {
  UNKNOWN = 0;
  LINKEDIN = 1;
  TWITTER = 2;
}

message User {
  string name = 1;
  string email = 2;
  string address = 3;
  string birth_date = 4;
  Source source = 5;
}
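To get a feel for how compact the encoding is, here is a minimal sketch that hand-encodes the User message above following the protobuf wire format (varint tags, length-delimited strings, and a varint enum). It uses only the Python standard library and is not the protobuf library; in a real service you would use the classes generated by protoc. The helper names and the sample values are ours.

```python
import json


def encode_varint(n: int) -> bytes:
    """Encode a non-negative integer as a base-128 varint."""
    out = bytearray()
    while True:
        low7 = n & 0x7F
        n >>= 7
        if n:
            out.append(low7 | 0x80)  # high bit set: more bytes follow
        else:
            out.append(low7)
            return bytes(out)


def encode_string(field_number: int, text: str) -> bytes:
    """Length-delimited field (wire type 2): tag, length, UTF-8 payload."""
    if not text:
        return b""  # proto3 leaves default (empty) values off the wire
    payload = text.encode("utf-8")
    tag = encode_varint((field_number << 3) | 2)
    return tag + encode_varint(len(payload)) + payload


def encode_enum(field_number: int, value: int) -> bytes:
    """Varint field (wire type 0), used here for the Source enum."""
    if value == 0:
        return b""  # UNKNOWN = 0 is the default and is omitted
    return encode_varint((field_number << 3) | 0) + encode_varint(value)


def encode_user(name, email, address, birth_date, source) -> bytes:
    """Concatenate the five fields in field-number order."""
    return (
        encode_string(1, name)
        + encode_string(2, email)
        + encode_string(3, address)
        + encode_string(4, birth_date)
        + encode_enum(5, source)
    )


LINKEDIN = 1  # mirrors the Source enum in the .proto file
sample = {
    "name": "Ada",
    "email": "ada@example.com",
    "address": "1 Main St",
    "birth_date": "1990-01-01",
    "source": LINKEDIN,
}
encoded = encode_user(**sample)
as_json = json.dumps(sample).encode("utf-8")
print(f"protobuf: {len(encoded)} bytes, JSON: {len(as_json)} bytes")
```

Field names never appear on the wire, only field numbers, which is where most of the saving over JSON comes from.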

Step 4: Deploy the source connector

Deploy a source connector that will stream changes from the table to a Kafka topic. You can choose any Kafka serializer, such as Avro, JSON, or protobuf.

For this example, we’ll use Avro as the Kafka serialization format.

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{
 "name": "ybconnector1",
 "config": {
   "tasks.max":"2",
   "connector.class": "io.debezium.connector.yugabytedb.YugabyteDBConnector",
   "database.hostname":"'$NODE'",
   "database.master.addresses":"'$MASTERS'",
   "database.port":"5433",
   "database.user": "yugabyte",
   "database.password":"Yugabyte@123",
   "database.dbname":"yugabyte",
   "database.server.name":"ybconnector1",
   "snapshot.mode":"initial",
   "database.streamid":"'$1'",
   "table.include.list":"public.users",
   "new.table.poll.interval.ms":"5000",
   "key.converter":"io.confluent.connect.avro.AvroConverter",
   "key.converter.schema.registry.url":"http://schema-registry:8081",
   "value.converter":"io.confluent.connect.avro.AvroConverter",
   "value.converter.schema.registry.url":"http://schema-registry:8081"
 }
}'

NOTE: By default, the connector will send binary data as bytes. This behavior can be changed by specifying the binary.handling.mode property in the connector configuration. Three valid values can be used for this property:

  • bytes – Sends the binary data as bytes.
  • hex – Sends the binary data as a hexadecimal string.
  • base64 – Sends the binary data as a base64 encoded string.
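Whichever mode you pick, the consumer has to turn the received value back into raw bytes before parsing it as protobuf. A minimal sketch of that normalization, assuming the consumer knows which mode the connector was configured with (the function name normalize_binary is ours):

```python
import base64


def normalize_binary(value, mode: str = "bytes") -> bytes:
    """Convert a value produced under a given binary.handling.mode to bytes."""
    if mode == "bytes":
        return bytes(value)  # accepts bytes, bytearray, or memoryview
    if mode == "hex":
        return bytes.fromhex(value)
    if mode == "base64":
        return base64.b64decode(value)
    raise ValueError(f"unsupported binary.handling.mode: {mode!r}")


raw = b"\x0a\x03Ada"  # hypothetical protobuf payload
print(normalize_binary(raw.hex(), "hex") == raw)
print(normalize_binary(base64.b64encode(raw).decode("ascii"), "base64") == raw)
```

Keeping this in one place means the protobuf parsing code does not need to change if the connector setting does.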

Step 5: Run a microservice to consume messages from Kafka

Create a microservice that reads messages from Kafka and reconstructs the protobuf message from the binary data.

Here’s how it can be done in Java.

// With Avro, the bytea column arrives as a ByteBuffer in the "value" field of the "data" record.
ByteBuffer userData = (ByteBuffer) ((Record) record.get("data")).get("value");
User user = User.parseFrom(userData);

NOTE: If you’re using JSON as the serialization format, the binary data is automatically encoded to a base64 string. The encoded string can still be easily converted back to bytes.

// With JSON, decode the base64 string back to bytes before parsing.
String userData = ((JSONObject) record.get("data")).getString("value");
byte[] bytes = Base64.getDecoder().decode(userData);
User user = User.parseFrom(bytes);

For illustration purposes, we have a consumer microservice that supports both Avro and JSON serialization formats. You can compile and directly run the microservice.

mvn package
java -jar target/protobuftest-1.0-SNAPSHOT-jar-with-dependencies.jar <avro/json>

Step 6: Perform operations on the created table

Now, you can test the setup by doing some CRUD operations on the table. We have a utility script so that you can test it out right away.

# Create a new user
python users.py --create

# Update an existing entry with new user data
python users.py --update

# Delete a user
python users.py --delete

The consumer microservice started in the previous step should log an entry for each change made to the table.
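For reference, here is a rough sketch of how a consumer might tell the three operations apart. It assumes a Debezium-style change event that has already been parsed from JSON into a dict, with an op code (c, u, d, or r for snapshot reads), an after image whose columns are wrapped in a "value" field as in the Java snippets above, and base64-encoded binary data. The function name and the sample event are ours; the exact event shape depends on your connector and converter settings.

```python
import base64
from typing import Optional, Tuple

OP_NAMES = {"c": "create", "u": "update", "r": "snapshot"}


def extract_change(event: dict) -> Tuple[str, Optional[bytes]]:
    """Return (operation name, protobuf bytes or None) for one change event."""
    op = event["op"]
    if op == "d":
        # Deletes carry no new row image, so there is nothing to parse.
        return ("delete", None)
    encoded = event["after"]["data"]["value"]
    return (OP_NAMES[op], base64.b64decode(encoded))


# Hypothetical event for a newly created user.
sample_event = {
    "op": "c",
    "after": {
        "id": {"value": 1},
        "data": {"value": base64.b64encode(b"\x0a\x03Ada").decode("ascii")},
    },
}
operation, payload = extract_change(sample_event)
print(operation, payload)
```

The returned bytes are what you would hand to User.parseFrom (or the equivalent in your language's generated protobuf classes).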

Conclusion

In this post, we walked through the process of storing data as protobuf-encoded binary in a bytea column and processing that data in other microservices using the YugabyteDB CDC connector.
