CDC (Change Data Capture) with Kafka + Debezium (Ubuntu 22.04)
- Murat Can ÇOBAN
- 6 days ago
What is CDC (Change Data Capture)?
CDC lets INSERT, UPDATE, and DELETE activity be monitored and recorded directly by the database. Its most important advantage is efficiency: because it reads the database's own log infrastructure, it avoids the overhead of change-detection algorithms we would otherwise write ourselves. It also keeps the output compact by delivering only the changes that were made instead of rescanning whole tables.

1- Preliminary Preparation Stage
Server IP and hostname information is added to the /etc/hosts file.
# ifconfig
# hostname
# nano /etc/hosts
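Instead of editing the file by hand, the entry can also be appended from the shell. The following is a minimal sketch, not part of the original steps: the function name add_hosts_entry and the IP address 192.168.1.10 are placeholders, and the hostname data4tech is the one used in the rest of this article.

```shell
#!/usr/bin/env bash
# Sketch: append "<ip> <hostname>" to a hosts file only if the hostname
# is not already mapped. add_hosts_entry is a hypothetical helper name.

add_hosts_entry() {
  local file="$1" ip="$2" name="$3"
  # Skip if any non-comment line already maps this hostname.
  if grep -Eq "^[^#]*[[:space:]]${name}([[:space:]]|\$)" "$file" 2>/dev/null; then
    echo "exists"
  else
    printf '%s %s\n' "$ip" "$name" >> "$file"
    echo "added"
  fi
}

# Usage (as root; the IP is a placeholder, use the address shown by ifconfig):
#   add_hosts_entry /etc/hosts 192.168.1.10 data4tech
```

Running it twice leaves a single entry, so it is safe to repeat.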

We turn off the firewall.
# ufw disable
# ufw status

We are installing Java.
# sudo apt update
# sudo apt install openjdk-11-jdk
# java -version

2- Kafka Components Installation
We download our file from the Confluent website. This file contains all the components we will use.
# curl -O https://packages.confluent.io/archive/7.4/confluent-7.4.0.zip
We extract the zip file we downloaded, then create a directory and copy the Kafka files into it.
# unzip confluent-7.4.0.zip
# mkdir -p /apps/confluent-kafka
# cp -rf confluent-7.4.0/* /apps/confluent-kafka/
We create a Kafka user and add it to our data4tech group. Then we recursively define permissions for the directory where we transfer Kafka's files.
# useradd -m kafka
# usermod -aG data4tech kafka
# chown -R kafka:data4tech /apps/confluent-kafka
3- Configuration Settings of Kafka Components and Service Creation
a) Zookeeper
We start with the Zookeeper configurations.
$ nano /apps/confluent-kafka/etc/kafka/zookeeper.properties
dataDir=/tmp/zookeeper
clientPort=2181
maxClientCnxns=0
audit.enable=true
initLimit=5
syncLimit=2
server.1=data4tech:2888:3888
Then we create a service for Zookeeper.
# nano /etc/systemd/system/zookeeper.service
[Unit]
Requires=network.target remote-fs.target
After=network.target remote-fs.target
[Service]
Type=simple
User=kafka
ExecStart=/apps/confluent-kafka/bin/zookeeper-server-start /apps/confluent-kafka/etc/kafka/zookeeper.properties
ExecStop=/apps/confluent-kafka/bin/zookeeper-server-stop
Restart=on-abnormal
[Install]
WantedBy=multi-user.target
b) Kafka
We configure Kafka. Only the parameters that need to change are listed below; the others can stay at their defaults, or be tuned later for performance.
$ nano /apps/confluent-kafka/etc/kafka/server.properties
listeners=PLAINTEXT://data4tech:9092
We are creating a Kafka service.
# nano /etc/systemd/system/kafka.service
[Unit]
Requires=zookeeper.service
After=zookeeper.service
[Service]
Type=simple
User=kafka
ExecStart=/apps/confluent-kafka/bin/kafka-server-start /apps/confluent-kafka/etc/kafka/server.properties
ExecStop=/apps/confluent-kafka/bin/kafka-server-stop
Restart=on-abnormal
[Install]
WantedBy=multi-user.target
c) Kafka Connect
We configure Kafka Connect. Only the parameters that need to change are listed below; the others can stay at their defaults, or be tuned later for performance.
$ nano /apps/confluent-kafka/etc/kafka/connect-distributed.properties
listeners=HTTP://data4tech:8083
bootstrap.servers=data4tech:9092
We create the Kafka Connect service.
# nano /etc/systemd/system/kafka-connect.service
[Unit]
Requires=kafka.service
After=kafka.service
[Service]
Type=simple
User=kafka
ExecStart=/apps/confluent-kafka/bin/connect-distributed /apps/confluent-kafka/etc/kafka/connect-distributed.properties
Restart=on-abnormal
[Install]
WantedBy=multi-user.target
d) Kafka Schema-registry
We configure Kafka schema-registry. Only the parameters that need to change are listed below; the others can stay at their defaults, or be tuned later for performance.
$ nano /apps/confluent-kafka/etc/schema-registry/schema-registry.properties
listeners=http://data4tech:8081
kafkastore.bootstrap.servers=PLAINTEXT://data4tech:9092
We create the Kafka schema-registry service.
# nano /etc/systemd/system/kafka-schema-registry.service
[Unit]
Requires=network.target remote-fs.target
After=network.target remote-fs.target
[Service]
Type=simple
User=kafka
ExecStart=/apps/confluent-kafka/bin/schema-registry-start /apps/confluent-kafka/etc/schema-registry/schema-registry.properties
ExecStop=/apps/confluent-kafka/bin/schema-registry-stop
Restart=on-abnormal
[Install]
WantedBy=multi-user.target
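With the four unit files in place, systemd still has to be reloaded and the services started. A minimal sketch of that step, assuming the unit names used above; the run and start_stack helpers are hypothetical, and by default the script only prints the commands (set DRY_RUN=0 to execute them as root).

```shell
#!/usr/bin/env bash
# Sketch: reload systemd and start the four units in dependency order.
# DRY_RUN=1 (the default here) only prints the commands.

run() {
  if [ "${DRY_RUN:-1}" = "1" ]; then
    echo "$*"
  else
    "$@"
  fi
}

start_stack() {
  run systemctl daemon-reload
  local unit
  # Zookeeper first, then the broker, then the components that need the broker.
  for unit in zookeeper kafka kafka-schema-registry kafka-connect; do
    run systemctl enable --now "$unit"
  done
}

start_stack
```

Afterwards, systemctl status kafka (and the same for the other units) shows whether each service came up.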
4- Debezium Kafka-Connect Connector Installations
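- Download the Debezium PostgreSQL connector (debezium-connector-postgres-2.2.1.Final-plugin.tar.gz) and the Confluent JDBC connector (confluentinc-kafka-connect-jdbc-10.7.2.zip).
We extract the downloaded archives.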
# tar xvf debezium-connector-postgres-2.2.1.Final-plugin.tar.gz
# unzip confluentinc-kafka-connect-jdbc-10.7.2.zip
We create a directory where we will put the unzipped files. We send the unzipped files here and give them permissions.
# mkdir /apps/confluent-kafka/plugins
# mv confluentinc-kafka-connect-jdbc-10.7.2 /apps/confluent-kafka/plugins
# mv debezium-connector-postgres /apps/confluent-kafka/plugins
# chown -R kafka:data4tech /apps/confluent-kafka/plugins
Now we need to point Kafka Connect at the plugins directory we created.
$ nano /apps/confluent-kafka/etc/kafka/connect-distributed.properties
plugin.path=/usr/share/java,/apps/confluent-kafka/plugins
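Kafka Connect reads plugin.path only at startup, so the kafka-connect service has to be restarted after this change. To confirm the plugins were picked up, the worker's REST API can be queried. The sketch below assumes the listener address configured earlier (http://data4tech:8083); has_plugin is a hypothetical helper that searches the JSON returned by GET /connector-plugins.

```shell
#!/usr/bin/env bash
# Sketch: check whether a connector class appears in the output of
# Kafka Connect's GET /connector-plugins endpoint (JSON read from stdin).

has_plugin() {
  grep -q "\"class\"[[:space:]]*:[[:space:]]*\"$1\""
}

# Usage, once kafka-connect has been restarted:
#   curl -s http://data4tech:8083/connector-plugins \
#     | has_plugin io.debezium.connector.postgresql.PostgresConnector \
#     && echo "Debezium plugin loaded"
```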
5- PostgreSQL Installation
We install postgresql using the commands below.
# sudo apt update
# sudo apt install postgresql postgresql-contrib
# sudo systemctl start postgresql.service
# sudo systemctl enable postgresql.service
# sudo -i -u postgres
$ psql
ALTER SYSTEM SET wal_level = logical;
We make the necessary configuration settings as follows. After completing the settings, we restart our postgresql service.
$ nano /etc/postgresql/14/main/postgresql.conf
wal_level = logical
listen_addresses = 'data4tech'
$ nano /etc/postgresql/14/main/pg_hba.conf
# IPv6 local connections:
host all all all md5
# systemctl restart postgresql.service
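After the restart it is worth confirming that the setting took effect, since Debezium's pgoutput plugin needs logical decoding. A minimal sketch, assuming it is run as the postgres OS user; wal_is_logical is a hypothetical helper name.

```shell
#!/usr/bin/env bash
# Sketch: confirm logical decoding is enabled after the restart.

wal_is_logical() {
  # $1 is the value reported by "SHOW wal_level;"
  [ "$1" = "logical" ]
}

# Only query the server when psql is available and we are the postgres user.
if command -v psql >/dev/null 2>&1 && [ "$(id -un)" = "postgres" ]; then
  level=$(psql -Atc "SHOW wal_level;")
  if wal_is_logical "$level"; then
    echo "wal_level is logical"
  else
    echo "wal_level is '$level', expected 'logical'" >&2
  fi
fi
```

The connector settings later in this article log in as postgres with the password test123. If that password has not been set yet, it can be assigned in psql with ALTER USER postgres PASSWORD 'test123';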
5- Kadeck Installation
Kadeck is the web interface we will use for our CDC operations. We start Kadeck with Docker.
docker run -d -p 80:80 -e xeotek_kadeck_free="data4tech" -e xeotek_kadeck_port=80 xeotek/kadeck:4.2.9
Then we log in to the interface via the browser.
http://data4tech:80
We log in with the default username and password, admin / admin.

After logging in, you can turn on dark mode as you wish using the lamp button at the bottom left.
Now, we will add the services we started on the Kafka side by clicking "Add Connection".

We continue by selecting Apache Kafka.
We enter the broker configurations as below and click the "Test Connection" button.

After clicking the "Test Connection" button, we see that we are connected successfully.

Now we make a "Test Connection" by entering our Schema-Registry configurations.

After clicking the "Test Connection" button, we see that we are connected successfully.

Now we click on the "Add Worker" button to enter our Kafka Connect configurations. We make a "Test Connection".

After clicking the "Test Connection" button, we see that we are connected successfully.

After completing all our connections, we create our connection by clicking the "Create" button.
The connection now appears on our home page; we open it by clicking "Connect".

We come to the "Kafka Connect" tab on the left.

Here we will define connectors from the plugin files we added, using the "Add Connection" button on the right.
The plugins we added are here. Now we will enter our settings by selecting "CDC Postgres" to create a source.

We enter the json I shared below and click the "Confirm" button.
{
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.user": "postgres",
"database.dbname": "postgres",
"slot.name": "my_slot",
"tasks.max": "1",
"database.history.kafka.bootstrap.servers": "data4tech:9092",
"database.history.kafka.topic": "schema-changes.bigdata",
"database.server.name": "localhost",
"database.port": "5432",
"plugin.name": "pgoutput",
"topic.prefix": "bigdata",
"database.hostname": "data4tech",
"database.password": "test123",
"name": "debezium_source",
"table.include.list": "public.source_tb",
"database.whitelist": "postgres",
"value.converter.schema.registry.url": "http://data4tech:8081",
"key.converter.schema.registry.url": "http://data4tech:8081",
"value.converter.schemas.enable": "true",
"key.converter.schemas.enable": "true",
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.add.fields": "op",
"transforms.unwrap.delete.handling.mode": "rewrite",
"transforms.unwrap.drop.tombstones": "false"
}
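The same configuration can also be submitted without the Kadeck interface, through Kafka Connect's REST API. A minimal sketch, assuming the JSON above has been saved to a file; debezium_source.json is a hypothetical file name and put_connector_cmd a hypothetical helper. PUT /connectors/<name>/config accepts this flat form of the configuration and creates the connector if it does not exist yet.

```shell
#!/usr/bin/env bash
# Sketch: build the curl command that creates or updates a connector
# through Kafka Connect's REST API.

CONNECT_URL="${CONNECT_URL:-http://data4tech:8083}"

put_connector_cmd() {
  local name="$1" file="$2"
  printf 'curl -s -X PUT -H "Content-Type: application/json" --data @%s %s/connectors/%s/config\n' \
    "$file" "$CONNECT_URL" "$name"
}

# Print the command for review; pipe the output to sh to actually run it.
put_connector_cmd debezium_source debezium_source.json
```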

Now, to create a sink, we will enter our settings by selecting JDBCSinkConnector ("JDBC") in Row 2.
We enter the json I shared below and click the "Confirm" button.
{
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"table.name.format": "public.sink_tb",
"connection.password": "test123",
"tasks.max": "1",
"topics": "bigdata.public.source_tb",
"key.converter.schemas.enable": "true",
"delete.enabled": "true",
"connection.user": "postgres",
"name": "debezium_sink",
"value.converter.schemas.enable": "true",
"auto.create": "true",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"connection.url": "jdbc:postgresql://data4tech:5432/postgres",
"insert.mode": "upsert",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"pk.mode": "record_key",
"pk.fields": "id",
"snapshot.mode": "initial",
"key.converter.schema.registry.url": "http://data4tech:8081",
"value.converter.schema.registry.url": "http://data4tech:8081"
}
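Once both connectors are submitted, their state can be checked from the terminal as well. The sketch below reads the JSON returned by GET /connectors/<name>/status and prints the connector's own state (for example RUNNING or FAILED); connector_state is a hypothetical helper, and it relies on the connector state appearing before the task states in that response.

```shell
#!/usr/bin/env bash
# Sketch: extract the connector-level state from a Kafka Connect
# status response read on stdin.

connector_state() {
  grep -o '"state"[[:space:]]*:[[:space:]]*"[A-Z_]*"' \
    | head -n 1 \
    | sed 's/.*"\([A-Z_]*\)"$/\1/'
}

# Usage:
#   curl -s http://data4tech:8083/connectors/debezium_sink/status | connector_state
```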

You can see the connectors we created as follows.

6- CDC
We switch back to the PostgreSQL side in the terminal and create our source table.
CREATE TABLE public.source_tb (
id serial PRIMARY KEY,
name VARCHAR (50) ,
surname VARCHAR (50) ,
job VARCHAR (50)
);

Since auto.create is enabled on the JDBCSinkConnector side, the sink table is created automatically and we do not need to create it manually.
Let's start with the "INSERT" command.
INSERT INTO public.source_tb VALUES(1,'micheal','dan','doctor');
INSERT INTO public.source_tb VALUES(2,'john','oliver','engineer');
INSERT INTO public.source_tb VALUES(3,'sam','curtis','police');

As you can see, the insert commands we put into the source table are also written into our sink table.
Let's continue with the "UPDATE" command.

Finally, let's use the "DELETE" command.
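The exact statements used for the UPDATE and DELETE steps are not reproduced in the text, so the following is only an illustrative sketch against the three rows inserted above. The new job value 'surgeon' and the choice of rows are assumptions, and cdc_demo_sql is a hypothetical helper name.

```shell
#!/usr/bin/env bash
# Sketch: hypothetical UPDATE and DELETE statements for the rows inserted
# earlier, followed by a query on the source and sink tables.

cdc_demo_sql() {
  cat <<'SQL'
UPDATE public.source_tb SET job = 'surgeon' WHERE id = 1;
DELETE FROM public.source_tb WHERE id = 3;
SELECT * FROM public.source_tb ORDER BY id;
SELECT * FROM public.sink_tb ORDER BY id;
SQL
}

# Print the statements; to run them: cdc_demo_sql | sudo -u postgres psql
cdc_demo_sql
```

The sink is updated asynchronously, so the last query may need to be repeated a moment later before the change shows up.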

Having run all three commands, we saw that our source and sink tables stay in sync.
In the "Data Browser" tab we can find the topic that was created and open it to see the Kafka messages produced by our changes.
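The same messages can be read from the terminal with the console consumer shipped in the Confluent archive. A minimal sketch, assuming the installation directory and broker address used earlier; the topic name follows from topic.prefix plus the schema and table name, and consume_cmd is a hypothetical helper.

```shell
#!/usr/bin/env bash
# Sketch: build the console-consumer command for the CDC topic.

KAFKA_HOME="${KAFKA_HOME:-/apps/confluent-kafka}"

consume_cmd() {
  local topic="$1"
  printf '%s/bin/kafka-console-consumer --bootstrap-server data4tech:9092 --topic %s --from-beginning\n' \
    "$KAFKA_HOME" "$topic"
}

# Print the command for review; pipe the output to sh to actually run it.
consume_cmd bigdata.public.source_tb
```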

In this article, I explained CDC operations with Kafka - Debezium.
See you in our future articles, stay healthy.