Introduction
In modern software applications, developers face challenges such as real-time data synchronization and responsiveness, both of which play an important role in delivering seamless user experiences and keeping data consistent across distributed systems. Whether you are managing a complex microservices architecture, powering live analytics dashboards, or meeting strict compliance requirements, tracking and responding to data changes efficiently is crucial.
This is where Change Data Capture (CDC) comes in: a design pattern that allows you to capture, process, and propagate database changes in real time. This post explores the platform Debezium and provides a step-by-step guide on how to use it locally.
What is Change Data Capture (CDC)?
Before diving into Debezium, it’s important to first set some context for the topic of this post. Change Data Capture is a software design pattern used to track changes in a database, namely insert, update, and delete events. In most cases, these events are forwarded to other systems, such as event streaming platforms (e.g., Kafka), data lakes, or analytics pipelines. This allows for incremental updates instead of bulk loading or batch processing. Typical use cases include the following (a sketch of such a change event follows the list):
- Data Synchronization/Replication/Sharing: CDC enables seamless synchronization of data across multiple databases, systems, or services, ensuring all entities have consistent, up-to-date information.
- Event-driven Architectures/Listening to Changes: By capturing data changes in real-time, CDC allows applications to react to specific events, triggering workflows or microservices whenever data is updated.
- Real-time Analytics: CDC powers real-time analytics by continuously streaming fresh data into analytics platforms, enabling businesses to gain insights and make decisions based on the most current information.
- Audit Logging/Security: CDC provides a reliable way to track and log every database change, helping organizations meet compliance requirements, monitor security, and maintain a detailed history for auditing purposes.
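To make this more tangible, here is a rough, tool-agnostic sketch of what a change event for an update could look like (all field names and values here are illustrative; Debezium’s actual format is shown later in this post). The "before" and "after" objects carry the row state around the change, while "op" encodes the operation type:

{
  "op": "u",
  "before": { "id": 42, "email": "old@example.com" },
  "after":  { "id": 42, "email": "new@example.com" },
  "source": { "db": "shop", "table": "customer" },
  "ts_ms": 1730000000000
}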
What is Debezium?
From the official website:
Debezium is an open source distributed platform for change data capture. Start it up, point it at your databases, and your apps can start responding to all of the inserts, updates, and deletes that other apps commit to your databases. Debezium is durable and fast, so your apps can respond quickly and never miss an event, even when things go wrong.
https://debezium.io/
This suggests that Debezium could be the right tool for CDC, making it worth a closer look!
How does Debezium work?
Initially, when Debezium is connected to a database, it takes a snapshot of all available schemas. Afterwards, the platform constantly monitors the database for changes. It does so by analyzing the write-ahead logs, binlogs, operation logs, or transaction logs, depending on the connector in use. The connectors are built in Java, each targeting a specific database (MySQL, PostgreSQL, MongoDB, SQL Server, Oracle, and Db2, among others).
As Debezium is built on top of Apache Kafka and all database source connectors are compatible with Kafka Connect, each change event is published as a record to a Kafka topic. Debezium produces a change record to Kafka in three steps: first, the configured transformations are applied; the result is then serialized into a record by the Kafka Connect converter; finally, the record is written to the Kafka topic. From there, the notification is available to every consumer.
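To make the transformation and serialization steps concrete, here is a hedged configuration sketch. The ExtractNewRecordState single message transform is a real Debezium transform that unwraps the change envelope down to the new row state, and the converter properties control how Kafka Connect serializes the record; whether you want either of these depends on your consumers (the walkthrough below does not use them):

# optional single message transform: flatten the Debezium envelope to the "after" state
transforms=unwrap
transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
# Kafka Connect converter step: serialize keys and values as JSON
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter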
With a foundational understanding in place, the focus now shifts to the hands-on part.
Step-by-step guide for a local setup of CDC with Debezium
All files needed for the setup can be found in the following GitHub repository. Ensure that the project has been cloned before continuing with the walk-through. The repository provides a docker-compose file, which starts all containers necessary for connecting Debezium to a MySQL database and publishing records to Kafka topics.
Step 1: Prerequisites
This step-by-step guide depends on Docker. The following was tested with Docker Desktop 4.34.3 and WSL in combination with Ubuntu 22.04.2 LTS. As a first step, clone the repository:
git clone https://github.com/ConSol/debezium-quick-start
Step 2: Starting containers (setup)
Inside a terminal, change into the cloned repository and run docker compose to create the containers:
docker compose up
This command creates five containers, each of which is explained in the following.
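Before going through them, a quick check from a second terminal confirms that all five services are up:

# run from the repository root
docker compose ps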
First off, the kafka container from the latest apache/kafka image is the Kafka broker. The broker exposes ports 9092 and 9093 for listeners and controllers. Here is the relevant definition from the "docker-compose.yaml" file:
kafka:
  image: apache/kafka:latest
  container_name: kafka
  ports:
    - "9092:9092" # Exposing the broker's listener port
    - "9093:9093" # Exposing the controller listener port
  environment:
    # Listeners for client and controller connections
    KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093
    KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT
    KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
    # KRaft mode settings
    KAFKA_CONTROLLER_QUORUM_VOTERS: "1@localhost:9093"
    KAFKA_NODE_ID: 1
    KAFKA_PROCESS_ROLES: broker,controller
    KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
    # Storage and replication settings
    KAFKA_LOG_DIRS: /tmp/kafka-logs
    KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
    KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
  networks:
    - kafka_network
In addition, a container from the latest MySQL image is instantiated. This container exposes port 3306, which will be used later on to make changes to the database. Furthermore, a volume containing an SQL script is mounted, which initializes the database with a customer table and some rows. With this configuration a debezium_user is created, which will be used by the Debezium connector. In the initialization script, this user is granted all privileges that are required and recommended; they are mainly needed for reading from the binlog of the database. Here is how the privileges are granted:
GRANT REPLICATION SLAVE, REPLICATION CLIENT, SELECT, RELOAD ON *.* TO 'debezium_user'@'%';
FLUSH PRIVILEGES;
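These are standard MySQL statements, so they can be verified from any MySQL client once the container is running (for instance in the shell opened in Step 4). The binlog_format is expected to be ROW, which is the default in recent MySQL versions and is required by Debezium's MySQL connector:

-- list the privileges granted to the Debezium user
SHOW GRANTS FOR 'debezium_user'@'%';

-- Debezium's MySQL connector relies on row-based binary logging
SHOW VARIABLES LIKE 'binlog_format';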
Following is the mysql service:
mysql:
  image: mysql:latest
  container_name: mysql
  ports:
    - "3306:3306"
  environment:
    MYSQL_ROOT_PASSWORD: root_password
    MYSQL_DATABASE: database
    MYSQL_USER: debezium_user
    MYSQL_PASSWORD: debezium_password
  volumes:
    - ./init_db.sql:/docker-entrypoint-initdb.d/init.sql
  networks:
    - kafka_network
Note: Never set passwords in plain text in real scenarios!
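The actual init_db.sql ships with the repository; a minimal sketch of what such a script contains (the column names match the events shown later, everything else is illustrative, and the GRANT statements from above complete it) could look like this:

CREATE TABLE customer (
    id INT AUTO_INCREMENT PRIMARY KEY,
    first_name VARCHAR(255),
    last_name VARCHAR(255),
    email VARCHAR(255)
);

-- sample rows; the repository script inserts four of them
INSERT INTO customer (first_name, last_name, email)
VALUES ('John', 'Doe', 'john.doe@example.com');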
Moreover, the kafka-connect container is created from a custom Dockerfile. There, the debezium/connect:3.0.0.Final image is used, and the Debezium connector for the MySQL database is downloaded and installed as a plugin. This image packages Debezium with Kafka Connect to provide a platform for capturing and streaming change data (CDC) from databases into Kafka topics. The image also requires a log4j.properties file. In the following configuration for the kafka-connect service, the plugin path and the connection to the Kafka broker are set. In order to interact with this service, port 8083 is exposed; in a later step, it will be used to create the Debezium connector. Here is the kafka-connect service:
kafka-connect:
  build:
    context: .
    dockerfile: ./kafka-connect/Dockerfile
  container_name: kafka-connect
  environment:
    BOOTSTRAP_SERVERS: 'kafka:9092'
    GROUP_ID: '1'
    CONFIG_STORAGE_TOPIC: 'docker-connect-configs'
    OFFSET_STORAGE_TOPIC: 'docker-connect-offsets'
    STATUS_STORAGE_TOPIC: 'docker-connect-status'
    CONNECT_REST_ADVERTISED_HOST_NAME: 'kafka-connect'
    CONNECT_PLUGIN_PATH: '/kafka/connect'
    CONNECT_REST_PORT: '8083'
  ports:
    - 8083:8083
  depends_on:
    - kafka
  networks:
    - kafka_network
  volumes:
    - ./log4j.properties:/kafka/log4j.properties
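The custom Dockerfile itself is part of the repository; its essence can be sketched as follows (the exact download URL and mechanism are assumptions here, with the Maven Central plugin archive being one common way to obtain the connector):

FROM debezium/connect:3.0.0.Final

# install the Debezium MySQL connector into the Kafka Connect plugin path
# (URL pattern from Maven Central; treat as illustrative)
RUN curl -fsSL https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/3.0.0.Final/debezium-connector-mysql-3.0.0.Final-plugin.tar.gz \
    | tar -xz -C /kafka/connect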
Finally, one container which creates Kafka topics for the Debezium connector and one to watch a topic are started. Both are based on the latest apache/kafka image. Beginning with the create-topics service:
create-topics:
  image: apache/kafka:latest
  depends_on:
    - kafka
  entrypoint: >
    /bin/sh -c "
    until nc -z kafka 9092; do
      echo 'Waiting for Kafka to be ready...';
      sleep 5;
    done;
    /opt/kafka/bin/kafka-topics.sh --create --topic dbserver1.database.customer --bootstrap-server kafka:9092 --partitions 1 --replication-factor 1 &&
    /opt/kafka/bin/kafka-topics.sh --create --topic dbserver1 --bootstrap-server kafka:9092 --partitions 1 --replication-factor 1;
    echo 'Topics created successfully.';"
  networks:
    - kafka_network
For the topic creation, it is necessary to wait until the Kafka broker is ready; the command in the entrypoint ensures this. It then creates the topics dbserver1 and dbserver1.database.customer using the scripts provided in the Kafka image. The naming of the topics has to match the Debezium connector's configuration, which will be shown later. Following is the kafka-watch service:
kafka-watch:
  image: apache/kafka:latest
  container_name: kafka-watch
  depends_on:
    - kafka
  networks:
    - kafka_network
  entrypoint: >
    /bin/sh -c "
    echo 'Waiting for topic dbserver1.database.customer to be available...';
    while ! /opt/kafka/bin/kafka-topics.sh --list --bootstrap-server kafka:9092 | grep -q '^dbserver1\.database\.customer$'; do
      echo 'Topic dbserver1.database.customer not found yet. Waiting...';
      sleep 5;
    done;
    echo 'Topic dbserver1.database.customer exists. Starting consumer...';
    /opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic dbserver1.database.customer --from-beginning --property print.key=true;"
Again, a command is issued in the entrypoint. This time, it waits for the creation of the aforementioned topics. Afterwards, a console consumer from the Kafka image is started, which listens on the dbserver1.database.customer topic, where all events for changes in the customer table will be published.
Step 3: Deploying the Debezium Connector
At this stage, all the required containers should be created, and the kafka-connect service is ready to create a Debezium connector. The following curl commands verify that everything works as expected.
curl -H "Accept:application/json" localhost:8083/
curl -H "Accept:application/json" localhost:8083/connectors/
Running these two commands in a new terminal should show a healthy response from the kafka-connect service and an empty list ([]) for the registered connectors.
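The first command returns basic information about the Kafka Connect worker; the response has roughly the following shape (version and IDs are placeholders):

{"version":"3.8.0","commit":"...","kafka_cluster_id":"..."}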
As a next step, the Debezium connector for the MySQL database will be deployed. For that, a connector configuration is required.
{ "name": "debezium-connector", "config": { "connector.class": "io.debezium.connector.mysql.MySqlConnector", "tasks.max": "1", "database.hostname": "mysql", "database.port": "3306", "database.user": "debezium_user", "database.password": "debezium_password", "database.server.id": "184054", "topic.prefix": "dbserver1", "database.include.list": "database", "schema.history.internal.kafka.bootstrap.servers": "kafka:9092", "schema.history.internal.kafka.topic": "schema-changes.database" } }
Inside the configuration, the name of the connector, the connector class, the database connection settings, and which database to monitor are provided. In the last two lines, the Kafka broker and the topic to be used for schema changes are set. The topic prefix "dbserver1" matches the topics which were created earlier on.
NOTE: Again, do not use plain text for passwords in real scenarios! The Debezium documentation provides an example for production environments.
Running the following command creates the Debezium connector from the provided configuration:
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" \
  localhost:8083/connectors/ -d '{
    "name": "debezium-connector",
    "config": {
      "connector.class": "io.debezium.connector.mysql.MySqlConnector",
      "tasks.max": "1",
      "database.hostname": "mysql",
      "database.port": "3306",
      "database.user": "debezium_user",
      "database.password": "debezium_password",
      "database.server.id": "184054",
      "topic.prefix": "dbserver1",
      "database.include.list": "database",
      "schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
      "schema.history.internal.kafka.topic": "schema-changes.database"
    }
  }'
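On success, Kafka Connect answers with HTTP 201 Created and echoes the connector definition back, roughly like this (abbreviated sketch, config omitted):

HTTP/1.1 201 Created
Content-Type: application/json

{"name":"debezium-connector","config":{...},"tasks":[],"type":"source"}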
Checking the registered connectors again should show the one which was just created (["debezium-connector"]).
curl -H "Accept:application/json" localhost:8083/connectors/
With the following curl the connector’s configuration can be seen.
curl -i -X GET -H "Accept:application/json" localhost:8083/connectors/debezium-connector
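Kafka Connect additionally exposes a status endpoint, which is handy for checking that the connector and its task are in the RUNNING state:

curl -H "Accept:application/json" localhost:8083/connectors/debezium-connector/status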
In the logs of the kafka-connect service, the startup of the Debezium connector can be observed. This includes taking a snapshot of the database, visible as MySQL|dbserver1|snapshot, and the transition to reading from the binlog, visible as MySQL|dbserver1|binlog. During the snapshot phase, the already existing customer rows are detected and a record is created for each of them:

Finished exporting 4 records for table 'database.customer' (1 of 1 tables); total duration '00:00:00.015'
Step 4: View Read/Create/Update/Delete events
Entering the following command inside a new terminal establishes a connection to the MySQL database, from which SQL statements can be executed.
docker exec -it mysql mysql -u debezium_user -p
After entering the password for the user specified before, the mysql shell is available. Executing the following two commands should show the customer table, confirming that everything is set up correctly.
use database;
show tables;
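Since SELECT was granted to the debezium_user, the initial rows can also be inspected directly:

SELECT * FROM customer;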
Finally, the value of Debezium for CDC becomes visible in the logs of the kafka-watch service. In a new terminal, the topic's messages can be seen after entering the next command:
docker logs kafka-watch
This should already yield some output, because the database is not empty. Currently, there are 4 rows in the customer table; therefore, the topic should also show 4 events. Each event contains two JSON objects, one for the key and one for the value:
KEY
{ "schema": { "type": "struct", "fields": [ { "type": "int32", "optional": false, "field": "id" } ], "optional": false, "name": "dbserver1.database.customer.Key" }, "payload": { "id": 1 } }
VALUE
{ "schema": {}, "payload": { "before": null, "after": { "id": 1, "first_name": "John", "last_name": "Doe", "email": "john.doe@example.com" }, "source": { "version": "3.0.0.Final", "connector": "mysql", "name": "dbserver1", "ts_ms": 1730368845000, "snapshot": "first", "db": "database", "sequence": null, "ts_us": 1730368845000000, "ts_ns": 1730368845000000000, "table": "customer", "server_id": 0, "gtid": null, "file": "binlog.000002", "pos": 158, "row": 0, "thread": null, "query": null }, "transaction": null, "op": "r", "ts_ms": 1730368845569, "ts_us": 1730368845569376, "ts_ns": 1730368845569376528 } }
This is a reduced output of the value for better readability; only the parts which will be discussed briefly are kept. The JSON object shows the schema's structure and the actual payload of the change event; together they carry the main information of an event. In the creation case, there is no "before" state, which is why it is set to null. "source" contains metadata about where the event originated (such as the table or the binlog file). The "op" field is set to "r", which means the event comes from the read operation of the snapshot. If one were to insert a new customer, the operation should be "c". This can be tried out by going back to the MySQL shell and inserting a new customer:
INSERT INTO customer (first_name, last_name, email) VALUES ('you', 'pick', 'any.com');
{ "schema": {}, "payload": { "before": null, "after": { "id": 5, "first_name": "you", "last_name": "pick", "email": "any.com" }, "source": { "version": "3.0.0.Final", "connector": "mysql", "name": "dbserver1", "ts_ms": 1730375092000, "snapshot": "false", "db": "database", "sequence": null, "ts_us": 1730375092000000, "ts_ns": 1730375092000000000, "table": "customer", "server_id": 1, "gtid": null, "file": "binlog.000002", "pos": 388, "row": 0, "thread": 13, "query": null }, "transaction": null, "op": "c", "ts_ms": 1730375092586, "ts_us": 1730375092586585, "ts_ns": 1730375092586585589 } }
A different message is created when a row is updated. This can be achieved by setting a new first name for one of the customers.
UPDATE customer SET first_name='me' WHERE first_name='you';
{ "schema": {}, "payload": { "before": { "id": 5, "first_name": "you", "last_name": "pick", "email": "any.com" }, "after": { "id": 5, "first_name": "me", "last_name": "pick", "email": "any.com" }, "source": { "version": "3.0.0.Final", "connector": "mysql", "name": "dbserver1", "ts_ms": 1730375401000, "snapshot": "false", "db": "database", "sequence": null, "ts_us": 1730375401000000, "ts_ns": 1730375401000000000, "table": "customer", "server_id": 1, "gtid": null, "file": "binlog.000002", "pos": 716, "row": 0, "thread": 13, "query": null }, "transaction": null, "op": "u", "ts_ms": 1730375401944, "ts_us": 1730375401944871, "ts_ns": 1730375401944871811 } }
As visible in the message, the operation field is now set to "u". Additionally, the "before" and "after" fields are both set in the payload. Comparing the two shows that "first_name" changed from "you" to "me". Again, the schema is omitted from the output for better readability.
As a last step, the row which was just created will be removed again to prove that Debezium also catches deletions.
DELETE FROM customer WHERE first_name = 'me';
{ "schema": {}, "payload": { "before": { "id": 5, "first_name": "me", "last_name": "pick", "email": "any.com" }, "after": null, "source": { "version": "3.0.0.Final", "connector": "mysql", "name": "dbserver1", "ts_ms": 1730375603000, "snapshot": "false", "db": "database", "sequence": null, "ts_us": 1730375603000000, "ts_ns": 1730375603000000000, "table": "customer", "server_id": 1, "gtid": null, "file": "binlog.000002", "pos": 1058, "row": 0, "thread": 13, "query": null }, "transaction": null, "op": "d", "ts_ms": 1730375603567, "ts_us": 1730375603567066, "ts_ns": 1730375603567066446 } }
This time, the operation is set to "d" for deletion, and the "after" field is set to null, showing that the row has been removed from the database. Note that, depending on the connector's tombstones.on.delete setting (enabled by default), the console consumer may additionally show a tombstone record with a null value right after the delete event; it allows Kafka to remove the key during log compaction.
Summary and Conclusion
As presented in the step-by-step guide, Debezium offers a solution for CDC. After setting everything up, the logs showed that a snapshot of the initial database was taken, and for each row of the customer table a record was created and published to the Kafka topic. Additionally, the logs showed how the MySQL connector reacted to changes in the database by creating, updating, and deleting rows.
Debezium has a lot more to offer: It is also possible to embed Debezium into your own Java application instead of relying on Kafka. Furthermore, this blog post did not cover the use of Debezium in a real-world scenario – it only provided a quick setup for local use.
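As a teaser, here is a minimal sketch of that embedded approach using the Debezium Engine API (the connector and offset-storage properties are omitted for brevity and would need to be filled in; see the Debezium documentation for a complete example):

import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.format.Json;

import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class EmbeddedCdcSketch {
    public static void main(String[] args) {
        // connector configuration, analogous to the JSON used earlier in this post
        // (connector.class, database.*, topic.prefix, offset storage, ...)
        Properties props = new Properties();

        // build an engine that emits change events as JSON strings
        DebeziumEngine<ChangeEvent<String, String>> engine =
            DebeziumEngine.create(Json.class)
                .using(props)
                .notifying(event -> System.out.println(event.value())) // react to each change
                .build();

        // the engine is a Runnable; run it on its own thread
        ExecutorService executor = Executors.newSingleThreadExecutor();
        executor.execute(engine);
        // on shutdown: engine.close() and executor.shutdown()
    }
}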