Combine Transactional Integrity and Data Lake Operations with YugabyteDB and Apache Hudi

Balachandar Seetharaman

Apache Hudi is an open storage format that supports data lakehouses, which combine data lakes and data warehouses into a single storage platform for advanced analytics and data science workloads. Developed by Uber and launched in 2016, Hudi was initially created to manage the company's cloud-based data lake, which held over 100PB of data vital to its core operations and supported long-term retention and large-scale analytics queries.

In 2019, Uber open-sourced Hudi by submitting it to the Apache Software Foundation. 

Integrating YugabyteDB with Apache Hudi further enhances these data lakehouse capabilities through real-time data processing, more efficient upserts and deletes, and improved consistency and scalability. Our database’s flexible data models and open-source ecosystem compatibility make it a robust solution for managing and processing large-scale, distributed data efficiently.

What is a Data Lakehouse?

A data lakehouse merges a data lake and a data warehouse into one unified platform. This single-system approach accelerates operations by providing direct access to data without having to toggle between systems. It also guarantees the latest and most comprehensive data for data science, machine learning, and business analytics projects.

Benefits of Apache Hudi Integration with YugabyteDB

YugabyteDB’s strong transactional capabilities and Apache Hudi’s ability to effectively manage large-scale data lakes form a highly effective combination. This synergy is especially beneficial when both transactional integrity and efficient data lake operations are needed. Together, YugabyteDB and Apache Hudi provide the following benefits:

  • Enhanced Data Processing: Apache Hudi’s real-time data processing combines with YugabyteDB’s high-performance SQL queries for faster insights from large datasets.
  • Scalable and Fault-Tolerant Architecture: Hudi leverages YugabyteDB’s distributed database design for efficient data handling while maintaining scalability and reliability in data-intensive environments.
  • Strong Consistency and ACID Compliance: Hudi’s ACID transaction capabilities, integrated with YugabyteDB’s consistency model, ensure data integrity across distributed systems.
  • Optimized Data Pipelines: Hudi’s data lake management merges with YugabyteDB’s SQL interface to streamline data ingestion, storage, and processing operations.
  • Advanced Data Recovery and Replication: Hudi’s incremental processing and rollback features pair with YugabyteDB’s robust replication to enhance data resilience and recovery options.
  • Accelerated Query Performance: Integrating Hudi’s indexing and record-level updates with YugabyteDB’s distributed query processing improves data access and modification speed.
  • Unified Data Governance: Hudi’s data management tools align with YugabyteDB’s governance features for better compliance and data oversight.

Integration Use Cases with Examples

Several compelling use cases exist for integrating Apache Hudi with YugabyteDB, especially in scenarios involving large-scale, distributed, and real-time data. Here are four key examples:

  1. Data Lakehouse Integration:
    • Scenario: Adopting a distributed database with a data lakehouse architecture combines transactional data with data lakehouse capabilities for comprehensive data management.
    • Use Case: Integrating Apache Hudi’s incremental data processing capabilities with Yugabyte’s distributed SQL database creates a powerful data lakehouse. With this architecture, organizations can efficiently manage both batch and streaming data, providing a unified platform for transactional workloads, analytics, reporting, and machine learning.
  2. Change Data Capture (CDC) for Real-time Analytics:
    • Scenario: Capturing and processing real-time data changes for analytics and reporting.
    • Use Case: Integrating YugabyteDB with Apache Hudi enhances change data capture (CDC), enabling real-time tracking and data change analysis. This is crucial to monitor user activities, detect anomalies, and instantly produce reports.
  3. Efficient ETL (Extract, Transform, Load) Pipelines:
    • Scenario: Implementing ETL pipelines to process and transform data before loading it into a data warehouse or analytics system.
    • Use Case: Integrating Apache Hudi’s upsert and delete capabilities with YugabyteDB enhances ETL pipelines by streamlining the processing of source data changes. This leads to quicker, more reliable data loading and reduced latency for downstream analytics.
  4. Data Archiving and Historical Data Management:
    • Scenario: Organizations with regulatory requirements need to retain historical data for compliance or analysis purposes.
    • Use Case: When combined with YugabyteDB’s scalability, Apache Hudi’s ability to manage point-in-time queries and historical data enables efficient data archiving and retrieval. This is especially useful in finance, healthcare, and legal sectors where preserving and accessing historical data is crucial.

Upserts are a default operation where an input record is first tagged as an insert or update by looking up the index. The records are ultimately written after heuristics are run to determine how best to pack them in storage to optimize for things like file sizing. Upserts are recommended for use cases like database change capture (CDC), where the input almost certainly contains updates. (Source: Writing to Hudi Datasets )
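
To make the upsert flow concrete, here is a minimal spark-shell sketch. The demo_upserts table name, its columns, and the base path are hypothetical, and it assumes spark-shell was launched with a Hudi Spark bundle (such as the hudi-spark3.4-bundle used later in this post) on the classpath.

// Hypothetical example: upserting a small batch into a Hudi table from spark-shell.
// Records whose key already exists are tagged as updates via the index; unseen keys
// become inserts, and Hudi packs the results into appropriately sized files.
import org.apache.spark.sql.SaveMode
import spark.implicits._

val changes = Seq(
  (101, "alice", 1700000000L),    // existing key -> treated as an update
  (999, "new-user", 1700000100L)  // unseen key   -> treated as an insert
).toDF("id", "name", "ts")

changes.write.format("hudi").
  option("hoodie.table.name", "demo_upserts").
  option("hoodie.datasource.write.operation", "upsert").
  option("hoodie.datasource.write.recordkey.field", "id").
  option("hoodie.datasource.write.partitionpath.field", "name").
  option("hoodie.datasource.write.precombine.field", "ts").
  mode(SaveMode.Append).
  save("file:///tmp/hoodie/demo_upserts")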

These four use cases demonstrate how Apache Hudi and the YugabyteDB database effectively address data management, analytics, and processing challenges. Their integration offers a flexible solution for organizations handling dynamic, large-scale data needs.


Integration Architecture of YugabyteDB to Apache Hudi

Now let’s walk through two different approaches to loading data from the Yugabyte database to Apache Hudi tables.

Approach 1: HoodieDeltaStreamer with YugabyteDB Debezium CDC Connector

Applicable Use Case: Real-time CDC for analytics, incremental data loading/ETL with micro-batches

The diagram below (Figure 1) shows the integration architecture of YugabyteDB with Apache Hudi using YugabyteDB’s CDC Connector and the HoodieDeltaStreamer with Apache Spark.

Figure 1 – Real-time YugabyteDB CDC Using DeltaStreamer – End-to-End Architecture

The table below shows the data flow sequences with their operations and tasks performed.

Data flow seq# | Operations/Tasks | Component Involved
1 | Enable YugabyteDB CDC and create the stream ID for the specific YSQL database (e.g. your database name) | YugabyteDB
2 and 3 | Install and configure the Kafka server. NOTE: You can install and configure it locally. | Kafka Server, ZooKeeper, and Debezium Kafka Connector for YugabyteDB
4 | Install and configure Apache Spark 3.2/3.3/3.4 | Apache Spark
5 | Create/configure Apache Hudi and run it with a Spark Streaming job | Apache Hudi

Note: Detailed instructions for setting up YugabyteDB and Apache Hudi using HoodieDeltaStreamer with Apache Spark follow below.

Follow the nine steps below to install and test the integration.

Step 1: Install YugabyteDB

You have multiple options to install or deploy YugabyteDB. NOTE: If you’re running Windows, you can use Docker on Windows with YugabyteDB.

Step 2: Install and set up Kafka, Schema Registry

From the CDC examples folder, download the docker-compose.yaml file and run all the included containers. This installs the Confluent Schema Registry, Control Center, ZooKeeper, Kafka, YugabyteDB Debezium Kafka Connector, Grafana, and Prometheus containers and configures the ports as required. In the example below, I am using port 8091 for the Schema Registry.

 schema-registry:
    image: confluentinc/cp-schema-registry:7.2.1
    hostname: schema-registry
    container_name: schema-registry
    depends_on:
      - broker
    ports:
      - "8091:8091"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'broker:29092'
      SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8091

Step 3: Setup and Configure CDC Stream ID in YugabyteDB

Create a database stream ID in YugabyteDB for a specific database (e.g. demo), using the “enable before image” option with implicit all (shown below). You will need this stream ID in Step 4. Assign your current DB node address to the {IP} environment variable (e.g. export IP=10.23.16.6), then run the following command from the bin directory of a YugabyteDB node (or the /home/yugabyte/tserver/bin directory of any DB node in your cluster).

./yb-admin --master_addresses ${IP}:7100 create_change_data_stream ysql.demo implicit all

The generated data stream id is a4f8291c3737419dbe4feee5a1b19aee.

Step 4: Deploy the Kafka source connector

Run the curl command (see below) from the command prompt on the host where your Kafka Connect service is running. In this example, Kafka Connect is running on port 8083, and the {IP} variable is set to 10.23.16.6.

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" \
localhost:8083/connectors/ -d '{
  "name": "cdc-demo",
  "config": {
    "connector.class": "io.debezium.connector.yugabytedb.YugabyteDBConnector",
    "database.hostname": "'$IP'",
    "database.port": "5433",
    "tasks.max": "3",
    "database.master.addresses": "'$IP':7100",
    "database.user": "yugabyte",
    "database.password": "xxxxxxx",
    "database.dbname": "demo",
    "database.server.name": "dbs",
    "table.include.list": "public.cdctest",
    "database.streamid": "a4f8291c3737419dbe4feee5a1b19aee",
    "transforms": "pgcompatible",
    "transforms.pgcompatible.type": "io.debezium.connector.yugabytedb.transforms.PGCompatible",
    "key.converter.schemas.enable": "true",
    "value.converter.schemas.enable": "true",
    "tombstones.on.delete": "false",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://schema-registry:8091",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8091"
  }
}'

NOTE: 

  • For "database.dbname": "demo" — "demo" is the YugabyteDB database name. For "database.user": "yugabyte" — "yugabyte" is the DB user.
  • For "table.include.list": "public.cdctest" — the table we included for the CDC stream is "cdctest".
  • We also used the Schema Registry to preserve the schema in Avro format and set "tombstones.on.delete": "false" to handle DELETE operations.
  • We are using the PGCompatible transformer to comply with the PostgreSQL payload format expected by Apache Hudi.

Step 5: Validate the Schema and Verify the Kafka Topics in Confluent Control Center

Launch the Confluent Control Center to access the topics and their schema details (e.g. http://{your docker container or VM}:9021/clusters/management/topics/dbs.public.cdctest/schema/value). Start populating data from YugabyteDB and make sure you can see the messages under Control Center -> Topics -> Messages (e.g. http://{your docker container IP or VM}:9021/clusters/management/topics/dbs.public.cdctest/message-viewer).

Step 6: Install Apache Hudi

Let’s build Apache Hudi from source (on GitHub). Before running the Maven command, you will need to make a couple of changes:

  • YugabyteDB CDC does not emit an “xmin” field like the Postgres Debezium payload does, so comment out the “xmin” field at lines #49 and #67, and remove xmin as a parameter at line #79, of /your Hudi release folder/hudi-common/src/main/java/org/apache/hudi/common/model/debezium/DebeziumConstants.java
  • Also comment out or remove the xmin parameter in /your Hudi release folder/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/debezium/PostgresDebeziumSource.java

After making the changes above, run the Maven command mvn clean package -DskipTests as mentioned in the same GitHub link above.

Step 7: Install and Configure Apache Spark

Now install and configure Scala and Apache Spark (3.4, 3.3, or 3.2). Ensure you can run the spark-submit or spark-shell command after everything is installed in your environment.

Step 8: Submit Spark Job using Hudi’s DeltaStreamer

Use spark-submit to stream the changes from the YugabyteDB table to the Apache Hudi table using DeltaStreamer.

Create a spark-config.properties file and store it in a folder you can refer back to:

spark.serializer=org.apache.spark.serializer.KryoSerializer
spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog
spark.sql.hive.convertMetastoreParquet=false

Then submit the Spark job with DeltaStreamer:

spark-submit --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
--packages org.apache.hudi:hudi-spark3.4-bundle_2.12:0.14.0 \
--master local[*] \
"/your folder/hudi-release-0.14.0/packaging/hudi-utilities-bundle/target/hudi-utilities-bundle_2.12-0.14.0.jar" \
--table-type MERGE_ON_READ \
--source-class org.apache.hudi.utilities.sources.debezium.PostgresDebeziumSource \
--payload-class org.apache.hudi.common.model.debezium.PostgresDebeziumAvroPayload \
--target-base-path file:///tmp/hoodie/dbs-cdctest \
--target-table dbs_cdctest \
--source-ordering-field _event_origin_ts_ms \
--continuous \
--source-limit 4000000 \
--min-sync-interval-seconds 20 \
--hoodie-conf bootstrap.servers=localhost:9092 \
--hoodie-conf schema.registry.url=http://localhost:8091 \
--hoodie-conf hoodie.deltastreamer.schemaprovider.registry.url=http://localhost:8091/subjects/dbs.public.cdctest-value/versions/latest \
--hoodie-conf hoodie.deltastreamer.source.kafka.value.deserializer.class=io.confluent.kafka.serializers.KafkaAvroDeserializer \
--hoodie-conf hoodie.deltastreamer.source.kafka.topic=dbs.public.cdctest \
--hoodie-conf auto.offset.reset=earliest \
--hoodie-conf hoodie.datasource.write.recordkey.field=sno \
--hoodie-conf hoodie.datasource.write.partitionpath.field=name \
--hoodie-conf hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.SimpleKeyGenerator \
--props /yourfolder/spark-config.properties

Note: Here we used Hudi release 0.14.0 and referred to its JAR files above. The Hudi table name is “dbs_cdctest”, and the Hudi files are stored in the local directory file:///tmp/hoodie/dbs-cdctest. The record key field is “sno” and the partition path field is “name” (just as an example). You can change your table’s key field and partition columns if required.

Step 9: Query the HUDI table

Verify the Hudi table has been created in the /tmp/hoodie/dbs-cdctest folder.

azureuser@bseetharaman-hudi-test:/tmp/hoodie/dbs-cdctest$ ls -atlr
total 24
drwxr-xr-x 53 azureuser azureuser 4096 Dec 16 15:44  ..
drwxr-xr-x  2 azureuser azureuser 4096 Dec 16 15:45  Bseetharaman
drwxr-xr-x  6 azureuser azureuser 4096 Dec 16 15:53  .
drwxr-xr-x  2 azureuser azureuser 4096 Dec 16 15:53 'Bala Seetharaman'
drwxr-xr-x  2 azureuser azureuser 4096 Dec 16 15:59  Demo1
drwxr-xr-x  7 azureuser azureuser 4096 Dec 16 15:59  .hoodie
azureuser@bseetharaman-hudi-test:/tmp/hoodie/dbs-cdctest$ cd .hoodie/
azureuser@bseetharaman-hudi-test:/tmp/hoodie/dbs-cdctest/.hoodie$ ls -altr
total 136
drwxr-xr-x 2 azureuser azureuser 4096 Dec 16 15:44 archived
drwxr-xr-x 2 azureuser azureuser 4096 Dec 16 15:44 .schema
drwxr-xr-x 3 azureuser azureuser 4096 Dec 16 15:44 .aux
-rw-r--r-- 1 azureuser azureuser	0 Dec 16 15:44 20231216154446818.deltacommit.requested
-rw-r--r-- 1 azureuser azureuser	8 Dec 16 15:44 .20231216154446818.deltacommit.requested.crc
drwxr-xr-x 4 azureuser azureuser 4096 Dec 16 15:44 metadata
-rw-r--r-- 1 azureuser azureuser 1014 Dec 16 15:44 hoodie.properties
-rw-r--r-- 1 azureuser azureuser   16 Dec 16 15:44 .hoodie.properties.crc
-rw-r--r-- 1 azureuser azureuser 1499 Dec 16 15:44 20231216154446818.deltacommit.inflight
-rw-r--r-- 1 azureuser azureuser   20 Dec 16 15:44 .20231216154446818.deltacommit.inflight.crc
-rw-r--r-- 1 azureuser azureuser 2858 Dec 16 15:44 20231216154446818.deltacommit
-rw-r--r-- 1 azureuser azureuser   32 Dec 16 15:44 .20231216154446818.deltacommit.crc
-rw-r--r-- 1 azureuser azureuser	0 Dec 16 15:45 20231216154526806.deltacommit.requested
-rw-r--r-- 1 azureuser azureuser	8 Dec 16 15:45 .20231216154526806.deltacommit.requested.crc
-rw-r--r-- 1 azureuser azureuser 1499 Dec 16 15:45 20231216154526806.deltacommit.inflight
-rw-r--r-- 1 azureuser azureuser   20 Dec 16 15:45 .20231216154526806.deltacommit.inflight.crc
-rw-r--r-- 1 azureuser azureuser 2882 Dec 16 15:45 20231216154526806.deltacommit
-rw-r--r-- 1 azureuser azureuser   32 Dec 16 15:45 .20231216154526806.deltacommit.crc
-rw-r--r-- 1 azureuser azureuser	0 Dec 16 15:53 20231216155306810.deltacommit.requested
-rw-r--r-- 1 azureuser azureuser	8 Dec 16 15:53 .20231216155306810.deltacommit.requested.crc
-rw-r--r-- 1 azureuser azureuser  814 Dec 16 15:53 20231216155306810.deltacommit.inflight
-rw-r--r-- 1 azureuser azureuser   16 Dec 16 15:53 .20231216155306810.deltacommit.inflight.crc
drwxr-xr-x 6 azureuser azureuser 4096 Dec 16 15:53 ..
-rw-r--r-- 1 azureuser azureuser 1957 Dec 16 15:53 20231216155306810.deltacommit
-rw-r--r-- 1 azureuser azureuser   24 Dec 16 15:53 .20231216155306810.deltacommit.crc
-rw-r--r-- 1 azureuser azureuser	0 Dec 16 15:53 20231216155326810.deltacommit.requested
-rw-r--r-- 1 azureuser azureuser	8 Dec 16 15:53 .20231216155326810.deltacommit.requested.crc
-rw-r--r-- 1 azureuser azureuser 1503 Dec 16 15:53 20231216155326810.deltacommit.inflight
-rw-r--r-- 1 azureuser azureuser   20 Dec 16 15:53 .20231216155326810.deltacommit.inflight.crc
-rw-r--r-- 1 azureuser azureuser 2898 Dec 16 15:53 20231216155326810.deltacommit
-rw-r--r-- 1 azureuser azureuser   32 Dec 16 15:53 .20231216155326810.deltacommit.crc
-rw-r--r-- 1 azureuser azureuser	0 Dec 16 15:59 20231216155926812.deltacommit.requested
-rw-r--r-- 1 azureuser azureuser	8 Dec 16 15:59 .20231216155926812.deltacommit.requested.crc
-rw-r--r-- 1 azureuser azureuser 1525 Dec 16 15:59 20231216155926812.deltacommit.inflight
-rw-r--r-- 1 azureuser azureuser   20 Dec 16 15:59 .20231216155926812.deltacommit.inflight.crc
drwxr-xr-x 7 azureuser azureuser 4096 Dec 16 15:59 .
-rw-r--r-- 1 azureuser azureuser 2232 Dec 16 15:59 20231216155926812.deltacommit
-rw-r--r-- 1 azureuser azureuser   28 Dec 16 15:59 .20231216155926812.deltacommit.crc
drwxr-xr-x 2 azureuser azureuser 4096 Dec 16 15:59 .temp
azureuser@bseetharaman-hudi-test:/tmp/hoodie/dbs-cdctest/.hoodie$

You can query this table using Spark by following the instructions in the Apache Hudi documentation. The output from the Spark shell (Scala) is shown below:

 
scala> import scala.collection.JavaConversions._
import scala.collection.JavaConversions._
 
scala> import org.apache.spark.sql.SaveMode._
import org.apache.spark.sql.SaveMode._
 
scala> import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceReadOptions._
 
scala> import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.DataSourceWriteOptions._
 
scala> import org.apache.hudi.common.table.HoodieTableConfig._
import org.apache.hudi.common.table.HoodieTableConfig._
 
scala> import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.hudi.config.HoodieWriteConfig._
 
scala> import org.apache.hudi.keygen.constant.KeyGeneratorOptions._
import org.apache.hudi.keygen.constant.KeyGeneratorOptions._
 
scala> import org.apache.hudi.common.model.HoodieRecord
import org.apache.hudi.common.model.HoodieRecord
 
scala> import spark.implicits._
import spark.implicits._
 
scala> val basePath = "file:///tmp/hoodie/dbs-cdctest"
basePath: String = file:///tmp/hoodie/dbs-cdctest
 
scala> val cdcDF = spark.read.format("hudi").load(basePath)
23/12/16 15:48:42 WARN DFSPropertiesConfiguration: Cannot find HUDI_CONF_DIR, please set it as the dir of hudi-defaults.conf
23/12/16 15:48:42 WARN DFSPropertiesConfiguration: Properties file file:/etc/hudi/conf/hudi-defaults.conf not found. Ignoring to load props file
cdcDF: org.apache.spark.sql.DataFrame = [_hoodie_commit_time: string, _hoodie_commit_seqno: string ... 12 more fields]

scala> cdcDF.createOrReplaceTempView("cdcdemo")

scala> spark.sql("SELECT _hoodie_commit_time,sno,name,_change_operation_type from cdcdemo").show();
# WARNING: Unable to attach Serviceability Agent. Unable to attach even with module exceptions: [org.apache.hudi.org.openjdk.jol.vm.sa.SASupportException: Sense failed., org.apache.hudi.org.openjdk.jol.vm.sa.SASupportException: Sense failed., org.apache.hudi.org.openjdk.jol.vm.sa.SASupportException: Sense failed.]
 
Here we inserted and updated records:
+-------------------+---+----------------+----------------------+
|_hoodie_commit_time|sno|        	name|_change_operation_type|
+-------------------+---+----------------+----------------------+
|  20231216155306810|822|Bala Seetharaman|                 	u|
|  20231216155326810|825|Bala Seetharaman|                 	u|
|  20231216154446818|823|	Bseetharaman|                 	c|
|  20231216154526806|824|	Bseetharaman|                 	c|
|  20231216154446818|822|       	Demo1|                 	c|
|  20231216154526806|825|       	Demo1|                 	c|
|  20231216155326810|826|       	Demo1|                 	c|
+-------------------+---+----------------+----------------------+
scala> spark.sql("SELECT _hoodie_commit_time,sno,name,_change_operation_type from cdcdemo").show();
+-------------------+---+----------------+----------------------+
|_hoodie_commit_time|sno|        	name|_change_operation_type|
+-------------------+---+----------------+----------------------+
|  20231216155306810|822|Bala Seetharaman|                 	u|
|  20231216155326810|825|Bala Seetharaman|                 	u|
|  20231216154446818|823|	Bseetharaman|                 	c|
|  20231216154526806|824|	Bseetharaman|                 	c|
|  20231216154446818|822|       	Demo1|                 	c|
|  20231216154526806|825|       	Demo1|                 	c|
|  20231216155326810|826|       	Demo1|                 	c|
+-------------------+---+----------------+----------------------+
After the record with sno 826 was deleted in YugabyteDB, Hudi automatically removed that entry as well:
scala> spark.sql("SELECT _hoodie_commit_time,sno,name,_change_operation_type from cdcdemo").show();
+-------------------+---+----------------+----------------------+
|_hoodie_commit_time|sno|        	name|_change_operation_type|
+-------------------+---+----------------+----------------------+
|  20231216155306810|822|Bala Seetharaman|                 	u|
|  20231216155326810|825|Bala Seetharaman|                 	u|
|  20231216154446818|823|	Bseetharaman|                 	c|
|  20231216154526806|824|	Bseetharaman|                 	c|
|  20231216154446818|822|       	Demo1|                 	c|
|  20231216154526806|825|       	Demo1|                 	c|
+-------------------+---+----------------+----------------------+

We used a MOR (Merge-On-Read) Hudi table for our example. In a MOR table, updates accumulate in row-based log files and must eventually be merged into the corresponding columnar base file. Periodic compaction is necessary to produce a new version of the base file: during compaction, the updates from the log files are merged with the base file to form its new version. Since MOR is designed to be write-optimized, on new writes (after index tagging is complete) Hudi appends the records pertaining to each file group as log blocks in log files. No synchronous merge happens during a write, which results in lower write amplification and better write latency.
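
Because reads of a MOR table can either merge the pending log files on the fly or skip them, one way to see the effect of compaction is to compare Hudi’s snapshot and read-optimized query types. Below is a minimal spark-shell sketch, assuming the same base path as the DeltaStreamer example above.

// Snapshot queries merge base files with the row-based log files at read time, so they
// always reflect the latest writes; read-optimized queries scan only the columnar base
// files, so rows written since the last compaction may not appear yet.
val morBasePath = "file:///tmp/hoodie/dbs-cdctest"

val snapshotDF = spark.read.format("hudi").
  option("hoodie.datasource.query.type", "snapshot").
  load(morBasePath)

val readOptimizedDF = spark.read.format("hudi").
  option("hoodie.datasource.query.type", "read_optimized").
  load(morBasePath)

println(s"snapshot rows: ${snapshotDF.count()}, read-optimized rows: ${readOptimizedDF.count()}")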

Approach 2: HoodieStreamer – Incremental Data Pull from YugabyteDB to Apache Hudi

Applicable Use Case: Incremental data loading/ETL using HoodieStreamer (JDBC option)

The diagram below (Figure 2) shows the integration architecture of YugabyteDB to Apache Hudi using Apache Spark.

Figure 2 – Incremental Data Loading – End-to-End Architecture

The table below shows the data flow sequences with their operations and tasks performed.

Data flow seq# | Operations/Tasks | Component Involved
1 | YugabyteDB – connect to it using the JDBC configuration | YugabyteDB
2 | Install and configure Apache Spark 3.2/3.3/3.4 | Apache Spark
3 | Create/configure Apache HoodieStreamer and run it with a Spark job | Apache Hudi

Note: Detailed instructions for setting up YugabyteDB and Hudi’s HoodieStreamer (with the JDBC connector option) follow below.

The five steps below will help you install and test the incremental loading of data from YugabyteDB to Apache Hudi using Apache HoodieStreamer.

Step 1: Install YugabyteDB

You have multiple options to install or deploy YugabyteDB. NOTE: If you’re running Windows, you can use Docker on Windows with YugabyteDB.

Step 2: Install Apache Hudi

Build Apache Hudi from source (use the GitHub repository) and run the Maven command mvn clean package -DskipTests as described in the same link.

Step 3: Install and Configure Apache Spark

Install and configure Scala and Apache Spark (3.4, 3.3, or 3.2). Ensure you can run the spark-submit or spark-shell command after everything is installed in your environment.

Step 4: Submit Spark Job using HoodieStreamer with Incremental Loading

Use spark-submit to load the incremental changes from the YugabyteDB table, based on a specific column (e.g. update_at), into the Apache Hudi table using HoodieStreamer.

Create a spark-config.properties file and store it in a folder you can refer back to:

spark.serializer=org.apache.spark.serializer.KryoSerializer
spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog
spark.sql.hive.convertMetastoreParquet=false

Create the Hudi table properties file (hudi_tbl.props):

hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.SimpleKeyGenerator
# hoodie.datasource.write.recordkey.field – id is the primary key of hudi_test_table in YugabyteDB
hoodie.datasource.write.recordkey.field=id
# hoodie.datasource.write.partitionpath.field – the partition path field is created_at
hoodie.datasource.write.partitionpath.field=created_at
# hoodie.datasource.write.precombine.field – the precombine field is update_at
hoodie.datasource.write.precombine.field=update_at
# hoodie.streamer.jdbc.url – the YugabyteDB node IP address, port, and database name (yugabyte)
hoodie.streamer.jdbc.url=jdbc:postgresql://10.12.22.168:5433/yugabyte
# hoodie.streamer.jdbc.user – the DB user is yugabyte
hoodie.streamer.jdbc.user=yugabyte
# hoodie.streamer.jdbc.password – the password as per your configuration
hoodie.streamer.jdbc.password=xxxxx
# hoodie.streamer.jdbc.driver.class – we use the PostgreSQL driver to connect to YugabyteDB
hoodie.streamer.jdbc.driver.class=org.postgresql.Driver
# hoodie.streamer.jdbc.table.name – the YugabyteDB table name is hudi_test_table
hoodie.streamer.jdbc.table.name=hudi_test_table
# hoodie.streamer.jdbc.table.incr.column.name – the incremental column; the streamer compares the old checkpoint (e.g. Option{val=2023-12-21 00:00:00}) with the new checkpoint and loads only the fresh data
hoodie.streamer.jdbc.table.incr.column.name=update_at
# When true, the JDBC connection performs an incremental pull
hoodie.streamer.jdbc.incr.pull=true
# When true, the streamer attempts an incremental fetch and falls back to a full fetch if there are errors
hoodie.streamer.jdbc.incr.fallback.to.full.fetch=true
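
Before running the streamer, it can be useful to confirm that Spark can reach YugabyteDB with the same connection details. The sketch below is only a sanity check (it is not part of HoodieStreamer itself); it assumes spark-shell was launched with the PostgreSQL driver on the classpath (e.g. --packages org.postgresql:postgresql:42.5.4) and reuses the values from hudi_tbl.props.

// Read the source table directly over JDBC using the connection details from hudi_tbl.props.
val sourceDF = spark.read.format("jdbc").
  option("url", "jdbc:postgresql://10.12.22.168:5433/yugabyte").
  option("driver", "org.postgresql.Driver").
  option("dbtable", "hudi_test_table").
  option("user", "yugabyte").
  option("password", "xxxxx").
  load()

// The columns match the hudi_test_table schema queried later in this post.
sourceDF.select("id", "name", "qty", "created_at", "update_at").show()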

Now submit the Spark job:

spark-submit \
    --class org.apache.hudi.utilities.streamer.HoodieStreamer \
    --packages org.apache.hudi:hudi-spark3.4-bundle_2.12:0.14.0,org.postgresql:postgresql:42.5.4 \
    --properties-file spark-config.properties \
    --master 'local[*]' \
    --executor-memory 1g \
    "/home/azureuser/hudi-release-0.14.0/packaging/hudi-utilities-bundle/target/hudi-utilities-bundle_2.12-0.14.0.jar" \
    --table-type COPY_ON_WRITE \
    --op UPSERT \
    --source-ordering-field update_at \
    --source-class org.apache.hudi.utilities.sources.JdbcSource \
    --target-base-path file:///tmp/balatest/hudidb/ \
    --target-table hudi_test_table \
    --props hudi_tbl.props

Note: In this example, we used Hudi release 0.14.0 and referred to its JAR files above. The Hudi table name is “hudi_test_table”, and the Hudi files are stored in the local directory (i.e. file:///tmp/balatest/hudidb/).

Step 5: Query the HUDI table

Verify the Hudi table has been created in the /tmp/balatest/hudidb folder.

azureuser@bseetharaman-hudi-test:/tmp/balatest/hudidb$ ls -latrl
total 72
drwxr-xr-x  3 azureuser azureuser 4096 Dec 17 09:04 ..
drwxr-xr-x  2 azureuser azureuser 4096 Dec 17 09:04 1701799492525559
drwxr-xr-x  2 azureuser azureuser 4096 Dec 17 09:04 1701799492217245
drwxr-xr-x  2 azureuser azureuser 4096 Dec 17 09:04 1701799491758567
drwxr-xr-x  2 azureuser azureuser 4096 Dec 17 09:04 1701799493063026
drwxr-xr-x  2 azureuser azureuser 4096 Dec 17 09:04 1701799492802950
drwxr-xr-x  2 azureuser azureuser 4096 Dec 17 09:04 1701799491130855
drwxr-xr-x  2 azureuser azureuser 4096 Dec 17 09:04 1701799233160026
drwxr-xr-x  2 azureuser azureuser 4096 Dec 17 09:04 1701799228692355
drwxr-xr-x  2 azureuser azureuser 4096 Dec 17 09:04 1672531200000000
drwxr-xr-x  2 azureuser azureuser 4096 Dec 17 09:07 1702804051787098
drwxr-xr-x  2 azureuser azureuser 4096 Dec 17 09:10 1702804223869273
drwxr-xr-x  2 azureuser azureuser 4096 Dec 17 09:32 1702804336667807
drwxr-xr-x  2 azureuser azureuser 4096 Dec 17 09:50 1702805076153952
drwxr-xr-x  2 azureuser azureuser 4096 Dec 17 10:04 1702807392023269
drwxr-xr-x 18 azureuser azureuser 4096 Dec 17 10:20 .
drwxr-xr-x  2 azureuser azureuser 4096 Dec 17 10:20 1702808385858829
drwxr-xr-x  7 azureuser azureuser 4096 Dec 17 10:20 .hoodie
azureuser@bseetharaman-hudi-test:/tmp/balatest/hudidb$ cd .hoodie/
azureuser@bseetharaman-hudi-test:/tmp/balatest/hudidb/.hoodie$ ls -latr
total 228
drwxr-xr-x  2 azureuser azureuser 4096 Dec 17 09:04 .schema
drwxr-xr-x  2 azureuser azureuser 4096 Dec 17 09:04 archived
drwxr-xr-x  3 azureuser azureuser 4096 Dec 17 09:04 .aux
-rw-r--r--  1 azureuser azureuser	0 Dec 17 09:04 20231217090400688.commit.requested
-rw-r--r--  1 azureuser azureuser	8 Dec 17 09:04 .20231217090400688.commit.requested.crc
drwxr-xr-x  4 azureuser azureuser 4096 Dec 17 09:04 metadata
-rw-r--r--  1 azureuser azureuser  837 Dec 17 09:04 hoodie.properties
-rw-r--r--  1 azureuser azureuser   16 Dec 17 09:04 .hoodie.properties.crc
-rw-r--r--  1 azureuser azureuser 6414 Dec 17 09:04 20231217090400688.inflight
-rw-r--r--  1 azureuser azureuser   60 Dec 17 09:04 .20231217090400688.inflight.crc
-rw-r--r--  1 azureuser azureuser 9319 Dec 17 09:04 20231217090400688.commit
-rw-r--r--  1 azureuser azureuser   84 Dec 17 09:04 .20231217090400688.commit.crc
-rw-r--r--  1 azureuser azureuser	0 Dec 17 09:07 20231217090731749.commit.requested
-rw-r--r--  1 azureuser azureuser	8 Dec 17 09:07 .20231217090731749.commit.requested.crc
-rw-r--r--  1 azureuser azureuser  814 Dec 17 09:07 20231217090731749.inflight
-rw-r--r--  1 azureuser azureuser   16 Dec 17 09:07 .20231217090731749.inflight.crc
-rw-r--r--  1 azureuser azureuser 1746 Dec 17 09:07 20231217090731749.commit
-rw-r--r--  1 azureuser azureuser   24 Dec 17 09:07 .20231217090731749.commit.crc
-rw-r--r--  1 azureuser azureuser	0 Dec 17 09:10 20231217091023760.commit.requested
-rw-r--r--  1 azureuser azureuser	8 Dec 17 09:10 .20231217091023760.commit.requested.crc
-rw-r--r--  1 azureuser azureuser  814 Dec 17 09:10 20231217091023760.inflight
-rw-r--r--  1 azureuser azureuser   16 Dec 17 09:10 .20231217091023760.inflight.crc
-rw-r--r--  1 azureuser azureuser 1745 Dec 17 09:10 20231217091023760.commit
-rw-r--r--  1 azureuser azureuser   24 Dec 17 09:10 .20231217091023760.commit.crc
-rw-r--r--  1 azureuser azureuser	0 Dec 17 09:12 20231217091216543.commit.requested
-rw-r--r--  1 azureuser azureuser	8 Dec 17 09:12 .20231217091216543.commit.requested.crc
-rw-r--r--  1 azureuser azureuser  814 Dec 17 09:12 20231217091216543.inflight
-rw-r--r--  1 azureuser azureuser   16 Dec 17 09:12 .20231217091216543.inflight.crc
-rw-r--r--  1 azureuser azureuser 1745 Dec 17 09:12 20231217091216543.commit
-rw-r--r--  1 azureuser azureuser   24 Dec 17 09:12 .20231217091216543.commit.crc
-rw-r--r--  1 azureuser azureuser	0 Dec 17 09:24 20231217092436030.commit.requested
-rw-r--r--  1 azureuser azureuser	8 Dec 17 09:24 .20231217092436030.commit.requested.crc
-rw-r--r--  1 azureuser azureuser  814 Dec 17 09:24 20231217092436030.inflight
-rw-r--r--  1 azureuser azureuser   16 Dec 17 09:24 .20231217092436030.inflight.crc
-rw-r--r--  1 azureuser azureuser 1747 Dec 17 09:24 20231217092436030.commit
-rw-r--r--  1 azureuser azureuser   24 Dec 17 09:24 .20231217092436030.commit.crc
-rw-r--r--  1 azureuser azureuser	0 Dec 17 09:32 20231217093200392.commit.requested
-rw-r--r--  1 azureuser azureuser	8 Dec 17 09:32 .20231217093200392.commit.requested.crc
-rw-r--r--  1 azureuser azureuser 1536 Dec 17 09:32 20231217093200392.inflight
-rw-r--r--  1 azureuser azureuser   20 Dec 17 09:32 .20231217093200392.inflight.crc
-rw-r--r--  1 azureuser azureuser 1748 Dec 17 09:32 20231217093200392.commit
-rw-r--r--  1 azureuser azureuser   24 Dec 17 09:32 .20231217093200392.commit.crc
-rw-r--r--  1 azureuser azureuser	0 Dec 17 09:50 20231217095004344.commit.requested
-rw-r--r--  1 azureuser azureuser	8 Dec 17 09:50 .20231217095004344.commit.requested.crc
-rw-r--r--  1 azureuser azureuser 1536 Dec 17 09:50 20231217095004344.inflight
-rw-r--r--  1 azureuser azureuser   20 Dec 17 09:50 .20231217095004344.inflight.crc
-rw-r--r--  1 azureuser azureuser 1752 Dec 17 09:50 20231217095004344.commit
-rw-r--r--  1 azureuser azureuser   24 Dec 17 09:50 .20231217095004344.commit.crc
-rw-r--r--  1 azureuser azureuser	0 Dec 17 10:04 20231217100415790.commit.requested
-rw-r--r--  1 azureuser azureuser	8 Dec 17 10:04 .20231217100415790.commit.requested.crc
-rw-r--r--  1 azureuser azureuser  814 Dec 17 10:04 20231217100415790.inflight
-rw-r--r--  1 azureuser azureuser   16 Dec 17 10:04 .20231217100415790.inflight.crc
-rw-r--r--  1 azureuser azureuser 1735 Dec 17 10:04 20231217100415790.commit
-rw-r--r--  1 azureuser azureuser   24 Dec 17 10:04 .20231217100415790.commit.crc
-rw-r--r--  1 azureuser azureuser	0 Dec 17 10:20 20231217102027307.commit.requested
-rw-r--r--  1 azureuser azureuser	8 Dec 17 10:20 .20231217102027307.commit.requested.crc
-rw-r--r--  1 azureuser azureuser  814 Dec 17 10:20 20231217102027307.inflight
-rw-r--r--  1 azureuser azureuser   16 Dec 17 10:20 .20231217102027307.inflight.crc
drwxr-xr-x 18 azureuser azureuser 4096 Dec 17 10:20 ..
drwxr-xr-x  7 azureuser azureuser 4096 Dec 17 10:20 .
-rw-r--r--  1 azureuser azureuser 1735 Dec 17 10:20 20231217102027307.commit
-rw-r--r--  1 azureuser azureuser   24 Dec 17 10:20 .20231217102027307.commit.crc
drwxr-xr-x  2 azureuser azureuser 4096 Dec 17 10:20 .temp
azureuser@bseetharaman-hudi-test:/tmp/balatest/hudidb/.hoodie$

You can query this table using Spark by following the instructions in the Apache Hudi documentation.

The output is shown below in Spark Shell (Scala):

scala> import scala.collection.JavaConversions._
import scala.collection.JavaConversions._
 
scala> import org.apache.spark.sql.SaveMode._
import org.apache.spark.sql.SaveMode._
 
scala> import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceReadOptions._
 
scala> import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.DataSourceWriteOptions._
 
scala> import org.apache.hudi.common.table.HoodieTableConfig._
import org.apache.hudi.common.table.HoodieTableConfig._
 
scala> import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.hudi.config.HoodieWriteConfig._
 
scala> import org.apache.hudi.keygen.constant.KeyGeneratorOptions._
import org.apache.hudi.keygen.constant.KeyGeneratorOptions._
 
scala> import org.apache.hudi.common.model.HoodieRecord
import org.apache.hudi.common.model.HoodieRecord
 
scala> import spark.implicits._
import spark.implicits._
 
scala> val basePath = "file:///tmp/balatest/hudidb/"
basePath: String = file:///tmp/balatest/hudidb/
 
scala> val ybdbDF = spark.read.format("hudi").load(basePath)
ybdbDF: org.apache.spark.sql.DataFrame = [_hoodie_commit_time: string, _hoodie_commit_seqno: string ... 9 more fields]
 
scala> ybdbDF.createOrReplaceTempView("hudi_test")
scala> spark.sql("SELECT id,name,qty,created_at,update_at FROM hudi_test").show();
+---+--------------------+---+----------+--------------------+
| id|            	name|qty|created_at|       	update_at|
+---+--------------------+---+----------+--------------------+
|101|Seetharaman Balac...|255|  	null|2023-12-17 09:07:...|
|201|Seetharaman Balac...|255|  	null|2023-12-17 09:10:...|
|207|Seetharaman Balac...|255|  	null| 2023-12-21 00:00:00|
|209|Seetharaman Balac...|255|  	null| 2023-12-22 00:00:00|
|203|   	Balachandar S|255|  	null| 2023-12-20 00:00:00|
|202|   	Balachandar S|255|  	null| 2023-12-19 00:00:00|
|  7|          	Bala S|250|  	null|2023-12-05 18:04:...|
|  4|          	Bala S|250|  	null|2023-12-05 18:04:...|
|  5|          	Bala S|250|  	null|2023-12-05 18:04:...|
|  8|          	Bala S|250|  	null|2023-12-05 18:04:...|
|  2|          	Bala S|250|  	null|2023-12-05 18:00:...|
|  3|          	Bala S|250|  	null|2023-12-05 18:04:...|
|  1|          	Bala S|250|  	null|2023-12-05 18:00:...|
|  6|          	Bala S|250|  	null|2023-12-05 18:04:...|
|  9|          	Bala S|250|  	null| 2023-10-01 00:00:00|
+---+--------------------+---+----------+--------------------+
 
 
scala>
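
Because every load above is recorded as a commit instant under .hoodie, you can also pull only the rows written after a given instant with a Hudi incremental query. Below is a minimal spark-shell sketch; the begin instant shown is just an example taken from the commit listing earlier, so substitute one of your own commit timestamps.

// Incremental query: return only records committed after the supplied begin instant.
val incrBasePath = "file:///tmp/balatest/hudidb/"

val incrementalDF = spark.read.format("hudi").
  option("hoodie.datasource.query.type", "incremental").
  option("hoodie.datasource.read.begin.instanttime", "20231217090400688").
  load(incrBasePath)

incrementalDF.select("_hoodie_commit_time", "id", "name", "update_at").show()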

Conclusion and Summary

Integrating YugabyteDB with Apache Hudi creates a robust, scalable, and efficient environment for data management and processing. This integration is particularly advantageous for applications that require real-time processing and transactional consistency and that handle large amounts of data in distributed environments. The synergy between Hudi’s incremental data processing and YugabyteDB’s distributed SQL capabilities offers a compelling solution for modern, data-intensive applications. As data grows in volume and complexity, such integrations are pivotal in driving efficiency and performance in data-driven enterprises.

Additional Resources on YugabyteDB CDC

YugabyteDB CDC Library

What is Change Data Capture?

Additional Resources on Integrations with Other Apache Products

How to Build Applications with YugabyteDB and Apache Spark

Explore and Visualize Open Source Data with YugabyteDB and Apache Superset

Integrate YugabyteDB with Real-Time OLAP DB (Apache Pinot) Using CDC 

Stream Data to Amazon S3 Using YugabyteDB CDC and Apache Iceberg
