top of page

CDC(Change Data Capture) with Kafka + Debezium (Ubuntu 22.04)

What is CDC (Change Data Capture)?


It allows INSERT, DELETE, UPDATE activities to be monitored and recorded directly by the database. The most important advantage of this feature is that it is more efficient than the algorithms we will create, as it uses the log infrastructure directly. In addition, it provides an ergonomic structure by only taking the columns where the change is made, not the entire row.


1- Preliminary Preparation Stage


Server IP and hostname information is added to the /etc/hosts file.

# ifconfig 
# hostname 
# nano /etc/hosts


We turn off the firewall.

# ufw disable
# ufw status

We are installing Java.

# sudo apt update
# sudo apt install openjdk-11-jdk
# java -version


2- Kafka Components Installation


We download our file from the Confluent website. This file contains all the components we will use.

# curl -O https://packages.confluent.io/archive/7.4/confluent-7.4.0.zip

We extract the file we downloaded from the zip, then we create a directory and transfer the kafka files to this directory.

# unzip unzip confluent-7.4.0.zip
# mkdir -p /apps/confluent-kafka
# cp -rf confluent-7.4.0/* /apps/confluent-kafka/

We create a Kafka user and add it to our data4tech group. Then we recursively define permissions for the directory where we transfer Kafka's files.

# useradd -m kafka
# usermod -aG data4tech kafka
# chown -R kafka:data4tech /apps/confluent-kafka


3- Configuration Settings of Kafka Components and Service Creation


a) Zookeeper

We start with the Zookeeper configurations.

$ nano /apps/confluent-kafka/etc/kafka/zookeeper.properties
dataDir=/tmp/zookeeper
clientPort=2181
maxClientCnxns=0
audit.enable=true
initLimit=5
syncLimit=2
server.1=data4tech:2888:3888

Then we create a service for zookeper.

# nano /etc/systemd/system/zookeeper.service
[Unit]
Requires=network.target remote-fs.target
After=network.target remote-fs.target

[Service]
Type=simple
User=kafka
ExecStart=/apps/confluent-kafka/bin/zookeeper-server-start /apps/confluent-kafka/etc/kafka/zookeeper.properties
ExecStop=/apps/confluent-kafka/bin/zookeeper-server-stop
Restart=on-abnormal

[Install]
WantedBy=multi-user.target

b) Kafka

We are configuring Kafka. Below I am writing only the parameters that need to be changed. You can leave the other parameters as default. You can change them by doing the necessary research for performance.

$ nano /apps/confluent-kafka/etc/kafka/server.properties

listeners=PLAINTEXT://data4tech:9092

We are creating a Kafka service.

# nano /etc/systemd/system/kafka.service
[Unit]
Requires=zookeeper.service
After=zookeeper.service

[Service]
Type=simple
User=kafka
ExecStart=/apps/confluent-kafka/bin/kafka-server-start /apps/confluent-kafka/etc/kafka/server.properties
ExecStop=/apps/confluent-kafka/bin/kafka-server-stop
Restart=on-abnormal

[Install]
WantedBy=multi-user.target

c) Kafka Connect

We are configuring Kafka connect. Below I am writing only the parameters that need to be changed. You can leave the other parameters as default. You can change them by doing the necessary research for performance.


$ nano /apps/confluent-kafka/etc/kafka/connect-distributed.properties

listeners=HTTP://data4tech:8083
bootstrap.servers=data4tech:9092

We are creating the Kafka connect service.

# /etc/systemd/system/kafka-connect.service
[Unit]
Requires=kafka.service
After=kafka.service

[Service]
Type=simple
User=kafka
ExecStart=/apps/confluent-kafka/bin/connect-distributed /apps/confluent-kafka/etc/kafka/connect-distributed.properties
Restart=on-abnormal

[Install]
WantedBy=multi-user.target

d) Kafka Schema-registry

We are configuring Kafka schema-registry. Below I am writing only the parameters that need to be changed. You can leave the other parameters as default. You can change them by doing the necessary research for performance.

$ /apps/confluent-kafka/etc/schema-registry/schema-registry.properties

listeners=http://data4tech:8081
kafkastore.bootstrap.servers=PLAINTEXT://data4tech:9092

We create the Kafka schema-registry service.

# nano /etc/systemd/system/kafka-schema-registry.service
[Unit]
Requires=network.target remote-fs.target
After=network.target remote-fs.target

[Service]
Type=simple
User=kafka
ExecStart=/apps/confluent-kafka/bin/schema-registry-start /apps/confluent-kafka/etc/schema-registry/schema-registry.properties
ExecStop=/apps/confluent-kafka/bin/schema-registry-stop
Restart=on-abnormal

[Install]
WantedBy=multi-user.target


4- Debezium Kafka-Connect Connector Installations


- Click here to download Debezium PostgreSQL connector .


We extract the downloaded files from the zip.

# tar xvf debezium-connector-postgres-2.2.1.Final-plugin.tar.gz
# unzip confluentinc-kafka-connect-jdbc-10.7.2.zip

We create a directory where we will put the unzipped files. We send the unzipped files here and give them permissions.

# mkdir /apps/confluent-kafka/plugins
# mv confluentinc-kafka-connect-jdbc-10.7.2 /apps/confluent-kafka/plugins
# mv debezium-connector-postgres /apps/confluent-kafka/plugins
# chown -R kafka:data4tech /apps/confluent-kafka/plugins

Now we need to enter the necessary configuration settings for Kafka-Connect to see the plugins directory we created.

$ /apps/confluent-kafka/etc/kafka/connect-distributed.properties

plugin.path=/usr/share/java,/apps/confluent-kafka/plugins


5- PostgreSQL Installation


We install postgresql using the commands below.

# sudo apt update
# sudo apt install postgresql postgresql-contrib
# sudo systemctl start postgresql.service
# sudo systemctl enable postgresql.service
# sudo -i -u postgres

$ psql
ALTER SYSTEM SET wal_level = logical;

We make the necessary configuration settings as follows. After completing the settings, we restart our postgresql service.

$ /etc/postgresql/14/main/postgresql.conf

wal_level= logical
listen_addresses = 'data4tech'


$ /etc/postgresql/14/main/pg_hba.conf

# IPv6 local connections:
host    all             all             all                   md5


# systemctl restart postgresql.service

5- Kadeck Installation

Kadeck is an interface that we will use for the CDC operations we will perform. Now we start kadeck with docker.

docker run -d -p 80:80 -e xeotek_kadeck_free="data4tech" -e xeotek_kadeck_port=80 xeotek/kadeck:4.2.9

Then we log in to the interface via the browser.

http://data4tech:80

We enter our username and password as admin - admin by default.


After logging in, you can turn on dark mode as you wish using the lamp button at the bottom left.

Now, we will add the services we started on the Kafka side by clicking "Add Connection".


We continue by selecting Apache Kafka.


We enter the broker configurations as below and click the "Test Connection" button.


After clicking the "Test Connection" button, we see that we are connected successfully.


Now we make a "Test Connection" by entering our Schema-Registry configurations.


After clicking the "Test Connection" button, we see that we are connected successfully.


Now we click on the "Add Worker" button to enter our Kafka Connect configurations. We make a "Test Connection" .


After clicking the "Test Connection" button, we see that we are connected successfully.

After completing all our connections, we create our connection by clicking the "Create" button.


The connection has come to our home page, now we will connect and enter it by clicking "Connect" .


We come to the "Kafka Connect" tab on the left.


Here we will define the plugin files you added with the "Add Connection" button on the right.


The plugins we added are here. Now we will enter our settings by selecting "CDC Postgres" to create a source.


We enter the json I shared below and click the "Confirm" button.


{
  "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
  "database.user": "postgres",
  "database.dbname": "postgres",
  "slot.name": "my_slot",
  "tasks.max": "1",
  "database.history.kafka.bootstrap.servers": "data4tech:9092",
  "database.history.kafka.topic": "schema-changes.bigdata",
  "database.server.name": "localhost",
  "database.port": "5432",
  "plugin.name": "pgoutput",
  "topic.prefix": "bigdata",
  "database.hostname": "data4tech",
  "database.password": "test123",
  "name": "debezium_source",
  "table.include.list": "public.source_tb",
  "database.whitelist": "postgres",
  "value.converter.schema.registry.url": "http://data4tech:8081",
  "key.converter.schema.registry.url": "http://data4tech:8081",
  "value.converter.schemas.enable": "true",
  "key.converter.schemas.enable": "true",
  "transforms":"unwrap", "transforms.unwrap.type":"io.debezium.transforms.ExtractNewRecordState",
  "transforms.unwrap.add.fields":"op",
  "transforms.unwrap.delete.handling.mode":"rewrite",
  "transforms.unwrap.drop.tombstones":"false"
}

Now, to create a sink, we will enter our settings by selecting JDBCSinkConnector ("JDBC") in Row 2.


We enter the json I shared below and click the "Confirm" button.

{
  "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
  "table.name.format": "public.sink_tb",
  "connection.password": "test123",
  "tasks.max": "1",
  "topics": "bigdata.public.source_tb",
  "key.converter.schemas.enable": "true",
  "delete.enabled": "true",
  "connection.user": "postgres",
  "name": "debezium_sink",
  "value.converter.schemas.enable": "true",
  "auto.create": "true",
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "connection.url": "jdbc:postgresql://data4tech:5432/postgres",
  "insert.mode": "upsert",
  "key.converter": "org.apache.kafka.connect.json.JsonConverter",
  "pk.mode": "record_key",
  "pk.fields": "id",
  "snapshot.mode": "initial",
  "key.converter.schema.registry.url": "http://data4tech:8081",
  "value.converter.schema.registry.url": "http://data4tech:8081"
}

You can see the connectors we created as follows.




6- CDC

We will create our tables by switching back to the postgreqsl side from the terminal. We are creating a source table.

CREATE TABLE public.source_tb (
	id serial PRIMARY KEY,
	name VARCHAR (50) ,
	surname VARCHAR (50) ,
	job VARCHAR (50)
);

Since we created our sink table with auto.create command on the JDBCSinkConnector side, we do not need to create it manually.



Let's start with the "INSERT" command.

INSERT INTO public.source_tb VALUES(1,'micheal','dan','doctor');
INSERT INTO public.source_tb VALUES(2,'john','oliver','engineer');
INSERT INTO public.source_tb VALUES(3,'sam','curtis','police');

As you can see, the insert commands we put into the source table are also written into our sink table.


Let's continue with the "UPDATE" command.


Finally, let's use the "DELETE" command.

By using all our commands, we saw the harmony in our source - sink tables.



We look at the Kafka messages in the changes we made in our topics in the "Data Browser" tab and see the topic we created. We can see these messages by entering its content.


In this article, I explained CDC operations with Kafka - Debezium.



See you in our future articles, stay healthy.

Comments

Rated 0 out of 5 stars.
No ratings yet

Add a rating

©2021, Data4Tech 

bottom of page