
Debezium: A Quick-start on Change Data Capture (CDC) in 2024

In modern software applications, developers face challenges like real-time data synchronization and responsiveness, which both play an important role in delivering seamless user experiences and ensuring data consistency across distributed systems. Whether you are managing complex microservices architectures, powering live analytics dashboards, or meeting strict compliance requirements, tracking and responding to data changes efficiently is crucial.

This is where Change Data Capture (CDC) comes in: a design pattern that allows you to capture, process, and propagate database changes in real time. This post explores the Debezium platform [1] and provides a step-by-step guide on how to run it locally.

Before diving into Debezium, it is worth setting some context. Change Data Capture is a software design pattern used to track changes in a database. These changes include insert, update, and delete events. In most cases, these events are forwarded to other systems, such as event streaming platforms (e.g., Kafka), data lakes, or analytics pipelines. This allows for incremental updates instead of bulk loading or batch processing. Typical use cases include:

  • Data Synchronization/Replication/Sharing: CDC enables seamless synchronization of data across multiple databases, systems, or services, ensuring all entities have consistent, up-to-date information.
  • Event-driven Architectures/Listening to Changes: By capturing data changes in real-time, CDC allows applications to react to specific events, triggering workflows or microservices whenever data is updated.
  • Real-time Analytics: CDC powers real-time analytics by continuously streaming fresh data into analytics platforms, enabling businesses to gain insights and make decisions based on the most current information.
  • Audit Logging/Security: CDC provides a reliable way to track and log every database change, helping organizations meet compliance requirements, monitor security, and maintain a detailed history for auditing purposes.
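To make the event-driven use case more concrete, here is a minimal Python sketch of routing change events to handlers. The event shape (kind, table, row) and the ChangeDispatcher class are assumptions made up for this illustration; they are not how Debezium represents or delivers events.

```python
from collections import defaultdict
from typing import Callable

# Hypothetical change event: (kind, table, row). Real CDC tools use a
# richer envelope; this shape exists only for the illustration.
ChangeEvent = tuple[str, str, dict]
Handler = Callable[[dict], None]


class ChangeDispatcher:
    """Routes change events to handlers registered per (table, kind)."""

    def __init__(self) -> None:
        self._handlers: dict[tuple[str, str], list[Handler]] = defaultdict(list)

    def on(self, table: str, kind: str, handler: Handler) -> None:
        self._handlers[(table, kind)].append(handler)

    def dispatch(self, event: ChangeEvent) -> int:
        """Call every matching handler and return how many were called."""
        kind, table, row = event
        handlers = self._handlers.get((table, kind), [])
        for handler in handlers:
            handler(row)
        return len(handlers)


welcome_mails: list[str] = []
dispatcher = ChangeDispatcher()
dispatcher.on("customer", "insert", lambda row: welcome_mails.append(row["email"]))

dispatcher.dispatch(("insert", "customer", {"id": 1, "email": "john.doe@example.com"}))
dispatcher.dispatch(("delete", "customer", {"id": 1}))  # no handler registered
print(welcome_mails)
```

Only the insert triggers the workflow here; the delete is ignored because nothing subscribed to it.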

From the official website:

Debezium is an open source distributed platform for change data capture. Start it up, point it at your databases, and your apps can start responding to all of the inserts, updates, and deletes that other apps commit to your databases. Debezium is durable and fast, so your apps can respond quickly and never miss an event, even when things go wrong.

https://debezium.io/

This suggests that Debezium could be the right tool for CDC, making it worth a closer look!

When Debezium first connects to a database, it takes a snapshot of all available schemas. Afterwards, the platform continuously monitors the database for changes. It does so by reading the write-ahead log, binlog, operation log, or transaction log, depending on the connector used. The connectors are written in Java, each targeting a specific database; examples include MySQL, PostgreSQL, MongoDB, Oracle, and SQL Server.

Debezium is built on top of Apache Kafka [2], and all of its database source connectors are compatible with Kafka Connect [3]. For each change event, a record is created and published to a Kafka topic in three steps. First, the configured transformations are applied to the event. Second, the Kafka Connect converter serializes the result into a record. Third, the record is written to the Kafka topic, where it is available to every consumer.
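The three steps can be pictured with a small Python sketch. Everything in it is a stand-in chosen for illustration: mask_email is a hypothetical transformation, json_converter only imitates what a Kafka Connect converter does, and the topic is a plain list rather than a Kafka topic.

```python
import json
from typing import Callable

Record = dict
Transform = Callable[[Record], Record]


def mask_email(record: Record) -> Record:
    """Hypothetical transformation: hide the local part of an email address."""
    masked = dict(record)
    if "email" in masked:
        masked["email"] = "***@" + masked["email"].split("@", 1)[-1]
    return masked


def json_converter(record: Record) -> bytes:
    """Stand-in for a converter: turns a record into bytes."""
    return json.dumps(record, sort_keys=True).encode("utf-8")


def publish(change: Record, transforms: list[Transform], topic: list[bytes]) -> bytes:
    # Step 1: apply the configured transformations in order.
    for transform in transforms:
        change = transform(change)
    # Step 2: serialize the transformed record.
    payload = json_converter(change)
    # Step 3: write the serialized record to the topic (here: a plain list).
    topic.append(payload)
    return payload


topic: list[bytes] = []
publish({"id": 1, "email": "john.doe@example.com"}, [mask_email], topic)
print(topic[0].decode("utf-8"))
```

The ordering matters: consumers only ever see the transformed and serialized form, never the raw change.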


With a foundational understanding in place, the focus now shifts to the hands-on part.

All files needed for the setup can be found in the GitHub repository linked in Step 1. Make sure the project has been cloned before continuing with the walk-through. The repository provides a docker-compose file that starts all containers necessary for connecting Debezium to a MySQL [4] database and publishing records to Kafka topics [5].

Step 1: Prerequisites

This step-by-step guide depends on Docker [6]. The following was tested with Docker Desktop 4.34.3 and WSL with Ubuntu 22.04.2 LTS. As a first step, clone the repository:

git clone https://github.com/ConSol/debezium-quick-start

Step 2: Starting containers (setup)

In a terminal, change into the cloned repository and run Docker Compose to create the containers.

docker compose up

This command creates five containers, each of which is explained below.
First off, the kafka container is created from the latest apache/kafka image and acts as the Kafka broker. It exposes ports 9092 and 9093 for the broker and controller listeners. Here is the relevant definition from the "docker-compose.yaml" file:

  kafka:
    image: apache/kafka:latest
    container_name: kafka
    ports:
      - "9092:9092"  # Exposing the broker's listener port
      - "9093:9093"  # Exposing the controller listener port
    environment:
      # Listeners for client and controller connections
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092

      # KRaft mode settings
      KAFKA_CONTROLLER_QUORUM_VOTERS: "1@localhost:9093"
      KAFKA_NODE_ID: 1
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER

      # Storage and replication settings
      KAFKA_LOG_DIRS: /tmp/kafka-logs
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
    networks:
      - kafka_network
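The listener values above pack several settings into one string. As a reading aid, the following Python sketch splits them into their parts; parse_listeners is a helper made up for this post and is not part of Kafka.

```python
def parse_listeners(value: str) -> dict[str, tuple[str, int]]:
    """Split 'NAME://host:port,NAME://host:port' into {NAME: (host, port)}."""
    listeners = {}
    for entry in value.split(","):
        name, address = entry.strip().split("://", 1)
        host, port = address.rsplit(":", 1)
        listeners[name] = (host, int(port))
    return listeners


# Values copied from the kafka service definition above.
listeners = parse_listeners("PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093")
advertised = parse_listeners("PLAINTEXT://kafka:9092")

for name, (host, port) in listeners.items():
    # In this setup only the PLAINTEXT listener is advertised to clients.
    print(name, "binds", f"{host}:{port}", "advertised as", advertised.get(name))
```

Other containers in the compose network therefore reach the broker as kafka:9092, while the controller listener on port 9093 is not advertised to them.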

In addition, a container is created from the latest MySQL image. It exposes port 3306, which is used later to make changes to the database. Furthermore, a volume is mounted that contains an SQL script initializing the database with a customer table and some rows. The configuration creates a debezium_user, which is used by the Debezium connector. The initialization script grants this user the required and recommended privileges, which are mainly needed for reading the database’s binlog. Here is how the privileges are granted:

GRANT REPLICATION SLAVE, REPLICATION CLIENT, SELECT, RELOAD ON *.* TO 'debezium_user'@'%';
FLUSH PRIVILEGES;

The mysql service is defined as follows:

  mysql:
    image: mysql:latest
    container_name: mysql
    ports:
      - "3306:3306"
    environment:
      MYSQL_ROOT_PASSWORD: root_password
      MYSQL_DATABASE: database
      MYSQL_USER: debezium_user
      MYSQL_PASSWORD: debezium_password
    volumes:
      - ./init_db.sql:/docker-entrypoint-initdb.d/init.sql
    networks:
      - kafka_network

Note: Never set passwords in plain text in real scenarios!

Moreover, the kafka-connect container is built from a custom Dockerfile. It uses debezium/connect:3.0.0.Final as the base image and downloads and installs the Debezium connector for MySQL as a plugin. This image packages Debezium with Kafka Connect to provide a platform for capturing change data and streaming it from databases into Kafka topics. The image also requires a log4j.properties file. The following configuration for the kafka-connect service sets the plugin path and the connection to the Kafka broker. Port 8083 is exposed to interact with the service; a later step uses it to create the Debezium connector. Here is the kafka-connect service:

  kafka-connect:
    build:
      context: .
      dockerfile: ./kafka-connect/Dockerfile
    container_name: kafka-connect
    environment:
      BOOTSTRAP_SERVERS: 'kafka:9092'
      GROUP_ID: '1'
      CONFIG_STORAGE_TOPIC: 'docker-connect-configs'
      OFFSET_STORAGE_TOPIC: 'docker-connect-offsets'
      STATUS_STORAGE_TOPIC: 'docker-connect-status'
      CONNECT_REST_ADVERTISED_HOST_NAME: 'kafka-connect'
      CONNECT_PLUGIN_PATH: '/kafka/connect'
      CONNECT_REST_PORT: '8083'
    ports:
      - 8083:8083
    depends_on:
      - kafka
    networks:
      - kafka_network
    volumes:
      - ./log4j.properties:/kafka/log4j.properties

Finally, two more containers are started: one that creates the Kafka topics for the Debezium connector and one that watches a topic. Both use the latest apache/kafka image. Beginning with the create-topics service:

  create-topics:
    image: apache/kafka:latest
    depends_on:
      - kafka
    entrypoint: >
      /bin/sh -c "
      until nc -z kafka 9092;
      do
        echo 'Waiting for Kafka to be ready...';
        sleep 5;
      done;
      /opt/kafka/bin/kafka-topics.sh --create --topic dbserver1.database.customer --bootstrap-server kafka:9092 --partitions 1 --replication-factor 1 &&
      /opt/kafka/bin/kafka-topics.sh --create --topic dbserver1 --bootstrap-server kafka:9092 --partitions 1 --replication-factor 1;
      echo 'Topics created successfully.';"
    networks:
      - kafka_network

Topic creation has to wait until the Kafka broker is ready, which the loop in the entrypoint ensures. The entrypoint then creates the topics dbserver1 and dbserver1.database.customer using scripts provided in the Kafka image. The topic names have to match the Debezium connector’s configuration, which is shown later. Next is the kafka-watch service:

  kafka-watch:
    image: apache/kafka:latest
    container_name: kafka-watch
    depends_on:
      - kafka
    networks:
      - kafka_network
    entrypoint: >
      /bin/sh -c "
      echo 'Waiting for topic dbserver1.database.customer to be available...';
      while ! /opt/kafka/bin/kafka-topics.sh --list --bootstrap-server kafka:9092 | grep -q '^dbserver1\.database\.customer$'; do
        echo 'Topic dbserver1.database.customer not found yet. Waiting...';
        sleep 5;
      done;
      echo 'Topic dbserver1.database.customer exists. Starting consumer...';
      /opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic dbserver1.database.customer --from-beginning --property print.key=true;"

Again, a command is issued in the entrypoint. This time, it waits for the aforementioned topic to be created. Afterwards, it starts a console consumer from the Kafka image that listens on the dbserver1.database.customer topic, where all events for changes in the customer table are published.
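Both entrypoints follow the same wait-until-ready pattern: poll a check, sleep, and try again. The Python sketch below shows that pattern in isolation. The helper names port_open and wait_until are invented for this example. Unlike the shell loops above, it gives up after a fixed number of attempts, which is a deliberate deviation.

```python
import socket
import time
from typing import Callable


def port_open(host: str, port: int, timeout: float = 1.0) -> bool:
    """Rough equivalent of `nc -z host port`: True if a TCP connect succeeds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


def wait_until(ready: Callable[[], bool], interval: float = 5.0,
               attempts: int = 60,
               sleep: Callable[[float], None] = time.sleep) -> bool:
    """Poll `ready` until it returns True or the attempts are used up."""
    for _ in range(attempts):
        if ready():
            return True
        sleep(interval)
    return False


# Demonstration with a fake check that succeeds on the third poll;
# the sleep is replaced by a no-op so the example finishes immediately.
polls = iter([False, False, True])
print(wait_until(lambda: next(polls), sleep=lambda seconds: None))

# Inside the compose network the real check would look like this:
# wait_until(lambda: port_open("kafka", 9092))
```

Bounding the number of attempts means a broker that never starts produces a clear failure instead of a container that waits forever.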

Step 3: Deploying the Debezium Connector

At this stage, all required containers should be running and the kafka-connect service should be ready to create a Debezium connector. The following curl commands verify that everything works as expected.

curl -H "Accept:application/json" localhost:8083/
curl -H "Accept:application/json" localhost:8083/connectors/

Running these two commands in a new terminal should show a healthy response from the kafka-connect service and an empty list ([]) for the registered connectors.

As a next step, the Debezium connector for the MySQL database is deployed. This requires a connector configuration:

{
  "name": "debezium-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "mysql",
    "database.port": "3306",
    "database.user": "debezium_user",
    "database.password": "debezium_password",
    "database.server.id": "184054",
    "topic.prefix": "dbserver1",
    "database.include.list": "database",
    "schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
    "schema.history.internal.kafka.topic": "schema-changes.database"
  }
}

The configuration provides the name of the connector, its class, the database connection settings, and the database to monitor. The last two entries set the Kafka broker and the topic used for schema changes. The topic prefix "dbserver1" matches the topics that were created earlier.
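The link between the configuration and the topic names can be spelled out in a few lines of Python. The sketch assumes the MySQL connector’s default naming, in which row-level events for a table go to a topic named prefix.database.table. The function table_topic is a made-up helper, not a Debezium API.

```python
def table_topic(topic_prefix: str, database: str, table: str) -> str:
    """Topic that receives row-level change events for one table."""
    return f"{topic_prefix}.{database}.{table}"


# The two entries of the connector configuration that matter here.
connector_config = {
    "topic.prefix": "dbserver1",
    "database.include.list": "database",
}

topic = table_topic(
    connector_config["topic.prefix"],
    connector_config["database.include.list"],  # a single database in this setup
    "customer",
)
print(topic)
```

This yields dbserver1.database.customer, the topic created by the create-topics service and read by kafka-watch. The second pre-created topic, dbserver1, carries the prefix alone.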

NOTE: Again, do not use plain text for passwords in real scenarios! The Debezium documentation [7] provides an example for production environments.

Running the following command creates the Debezium connector from the provided configuration:

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{ "name": "debezium-connector","config": {"connector.class": "io.debezium.connector.mysql.MySqlConnector","tasks.max": "1","database.hostname": "mysql","database.port": "3306","database.user": "debezium_user","database.password": "debezium_password","database.server.id": "184054","topic.prefix": "dbserver1","database.include.list": "database","schema.history.internal.kafka.bootstrap.servers": "kafka:9092","schema.history.internal.kafka.topic": "schema-changes.database"}}'
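The single-line curl command is hard to read, so here is a sketch of the same request built with Python’s standard library. The function registration_request is a name chosen for this example. The request is only constructed and not sent, because sending needs the running containers.

```python
import json
import urllib.request

CONNECT_URL = "http://localhost:8083"  # port exposed by the kafka-connect service


def registration_request(name: str, config: dict) -> urllib.request.Request:
    """Build the POST /connectors/ request without sending it."""
    body = json.dumps({"name": name, "config": config}).encode("utf-8")
    return urllib.request.Request(
        f"{CONNECT_URL}/connectors/",
        data=body,
        method="POST",
        headers={"Accept": "application/json", "Content-Type": "application/json"},
    )


# Same values as in the connector configuration shown earlier.
config = {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "mysql",
    "database.port": "3306",
    "database.user": "debezium_user",
    "database.password": "debezium_password",
    "database.server.id": "184054",
    "topic.prefix": "dbserver1",
    "database.include.list": "database",
    "schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
    "schema.history.internal.kafka.topic": "schema-changes.database",
}

request = registration_request("debezium-connector", config)
print(request.get_method(), request.full_url)

# To actually register the connector against the running setup:
# with urllib.request.urlopen(request) as response:
#     print(response.status, json.load(response))
```

Keeping the configuration as a dictionary also makes it easier to load it from a file or to inject the password from elsewhere.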

Listing the registered connectors again should show the one that was just created (["debezium-connector"]):

curl -H "Accept:application/json" localhost:8083/connectors/

The following curl command shows the connector’s configuration:

curl -i -X GET -H "Accept:application/json" localhost:8083/connectors/debezium-connector

The logs of the kafka-connect service show the startup of the Debezium connector. This includes taking a snapshot of the database, visible as MySQL|dbserver1|snapshot, and the transition to reading from the binlog, visible as MySQL|dbserver1|binlog. During the snapshot phase, the existing customer rows are detected and a record is created for each, as the following log message shows:

Finished exporting 4 records for table 'database.customer' (1 of 1 tables); total duration '00:00:00.015'
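Besides reading the logs, the Kafka Connect REST API offers a status endpoint per connector. The sketch below checks whether the connector and all of its tasks report RUNNING. The function names are invented here, and the sample dictionary only imitates the shape of a status response; its worker_id values are made up.

```python
import json
import urllib.request


def fetch_status(name: str, base_url: str = "http://localhost:8083") -> dict:
    """GET /connectors/<name>/status from the Kafka Connect REST API."""
    with urllib.request.urlopen(f"{base_url}/connectors/{name}/status") as response:
        return json.load(response)


def is_running(status: dict) -> bool:
    """True if the connector and every one of its tasks report RUNNING."""
    states = [status["connector"]["state"]]
    states += [task["state"] for task in status.get("tasks", [])]
    return all(state == "RUNNING" for state in states)


# Sample shaped like a status response; the worker_id values are made up.
sample = {
    "name": "debezium-connector",
    "connector": {"state": "RUNNING", "worker_id": "kafka-connect:8083"},
    "tasks": [{"id": 0, "state": "RUNNING", "worker_id": "kafka-connect:8083"}],
    "type": "source",
}
print(is_running(sample))

# Against the running setup:
# print(is_running(fetch_status("debezium-connector")))
```

A task in the FAILED state makes is_running return False even if the connector itself still reports RUNNING.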

Step 4: View Read/Create/Update/Delete events

Entering the following command in a new terminal opens a connection to the MySQL database, from which SQL statements can be executed.

docker exec -it mysql mysql -u debezium_user -p

After entering the password for the user specified before, the mysql shell is available. Executing the following two commands should list the customer table, confirming that everything is set up correctly.

use database
show tables;

Finally, the usefulness of Debezium for CDC is shown by looking at the kafka-watch service logs.
In a new terminal, the topic’s messages can be displayed with the following command:

docker logs kafka-watch

This should already yield some output, because the database is not empty. Currently, there are 4 rows in the customer table, so the topic should contain 4 events. Each event contains two JSON objects, one for the key and one for the value:

KEY

{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "int32",
        "optional": false,
        "field": "id"
      }
    ],
    "optional": false,
    "name": "dbserver1.database.customer.Key"
  },
  "payload": {
    "id": 1
  }
}

VALUE

{
  "schema": {},
  "payload": {
    "before": null,
    "after": {
      "id": 1,
      "first_name": "John",
      "last_name": "Doe",
      "email": "john.doe@example.com"
    },
    "source": {
      "version": "3.0.0.Final",
      "connector": "mysql",
      "name": "dbserver1",
      "ts_ms": 1730368845000,
      "snapshot": "first",
      "db": "database",
      "sequence": null,
      "ts_us": 1730368845000000,
      "ts_ns": 1730368845000000000,
      "table": "customer",
      "server_id": 0,
      "gtid": null,
      "file": "binlog.000002",
      "pos": 158,
      "row": 0,
      "thread": null,
      "query": null
    },
    "transaction": null,
    "op": "r",
    "ts_ms": 1730368845569,
    "ts_us": 1730368845569376,
    "ts_ns": 1730368845569376528
  }
}

This is a reduced output of the value for better readability; only the parts discussed briefly below are kept. The JSON object contains the schema and the actual payload of the change event, which together make up the main information of an event. For a newly created row there is no "before" state, which is why it is set to null. "source" holds metadata about where the event originated (such as the table or the binlog file). The "op" field is set to "r", which means the event comes from the read operation of the snapshot. If a new customer is inserted, the operation should be "c". This can be tried out by going back to the MySQL shell and entering the following statement:

INSERT INTO customer (first_name, last_name, email) VALUES ('you', 'pick', 'any.com');

The kafka-watch logs then contain the following event:

{
  "schema": {},
  "payload": {
    "before": null,
    "after": {
      "id": 5,
      "first_name": "you",
      "last_name": "pick",
      "email": "any.com"
    },
    "source": {
      "version": "3.0.0.Final",
      "connector": "mysql",
      "name": "dbserver1",
      "ts_ms": 1730375092000,
      "snapshot": "false",
      "db": "database",
      "sequence": null,
      "ts_us": 1730375092000000,
      "ts_ns": 1730375092000000000,
      "table": "customer",
      "server_id": 1,
      "gtid": null,
      "file": "binlog.000002",
      "pos": 388,
      "row": 0,
      "thread": 13,
      "query": null
    },
    "transaction": null,
    "op": "c",
    "ts_ms": 1730375092586,
    "ts_us": 1730375092586585,
    "ts_ns": 1730375092586585589
  }
}

A different message is created when a row is updated. This can be triggered by setting a new first name for one of the customers.

UPDATE customer SET first_name='me' WHERE first_name='you';

The resulting event looks like this:

{
  "schema": {},
  "payload": {
    "before": {
      "id": 5,
      "first_name": "you",
      "last_name": "pick",
      "email": "any.com"
    },
    "after": {
      "id": 5,
      "first_name": "me",
      "last_name": "pick",
      "email": "any.com"
    },
    "source": {
      "version": "3.0.0.Final",
      "connector": "mysql",
      "name": "dbserver1",
      "ts_ms": 1730375401000,
      "snapshot": "false",
      "db": "database",
      "sequence": null,
      "ts_us": 1730375401000000,
      "ts_ns": 1730375401000000000,
      "table": "customer",
      "server_id": 1,
      "gtid": null,
      "file": "binlog.000002",
      "pos": 716,
      "row": 0,
      "thread": 13,
      "query": null
    },
    "transaction": null,
    "op": "u",
    "ts_ms": 1730375401944,
    "ts_us": 1730375401944871,
    "ts_ns": 1730375401944871811
  }
}

As visible in the message, the operation field is now set to "u". Additionally, both the "before" and "after" fields are set in the payload. Comparing them shows that "first_name" was changed from "you" to "me". Again, the schema is omitted from the output for better readability.
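The comparison of "before" and "after" can also be done programmatically. Here is a minimal Python sketch, assuming the payload has already been parsed from JSON into a dictionary; changed_fields is a helper name chosen for this example.

```python
def changed_fields(payload: dict) -> dict:
    """Map each changed column to its (old, new) value pair."""
    before = payload.get("before") or {}
    after = payload.get("after") or {}
    return {
        column: (before.get(column), after.get(column))
        for column in sorted(before.keys() | after.keys())
        if before.get(column) != after.get(column)
    }


# Reduced payload of the update event shown above.
update_payload = {
    "before": {"id": 5, "first_name": "you", "last_name": "pick", "email": "any.com"},
    "after": {"id": 5, "first_name": "me", "last_name": "pick", "email": "any.com"},
    "op": "u",
}
print(changed_fields(update_payload))
```

For the update event above, the only entry in the result is first_name with the pair ('you', 'me').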

As a last step, the row which was just created will be removed again to prove that Debezium also catches deletions.

DELETE FROM customer WHERE first_name = 'me';

The corresponding event looks like this:

{
  "schema": {},
  "payload": {
    "before": {
      "id": 5,
      "first_name": "me",
      "last_name": "pick",
      "email": "any.com"
    },
    "after": null,
    "source": {
      "version": "3.0.0.Final",
      "connector": "mysql",
      "name": "dbserver1",
      "ts_ms": 1730375603000,
      "snapshot": "false",
      "db": "database",
      "sequence": null,
      "ts_us": 1730375603000000,
      "ts_ns": 1730375603000000000,
      "table": "customer",
      "server_id": 1,
      "gtid": null,
      "file": "binlog.000002",
      "pos": 1058,
      "row": 0,
      "thread": 13,
      "query": null
    },
    "transaction": null,
    "op": "d",
    "ts_ms": 1730375603567,
    "ts_us": 1730375603567066,
    "ts_ns": 1730375603567066446
  }
}

This time, the operation is set to "d" for deletion, and the "after" field is set to null, showing that the row has been removed from the database.
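Taken together, the four operation codes seen in this walk-through are enough to keep a copy of the table in sync. The following Python sketch replays reduced versions of the events into a dictionary keyed by id. The function apply_event is an illustration written for this post, and it only handles the operations shown here.

```python
def apply_event(table: dict, payload: dict) -> None:
    """Apply one change-event payload to an in-memory copy keyed by id."""
    op = payload["op"]
    if op in ("r", "c", "u"):  # snapshot read, create, update
        row = payload["after"]
        table[row["id"]] = row
    elif op == "d":  # delete
        table.pop(payload["before"]["id"], None)
    else:
        raise ValueError(f"unexpected op: {op!r}")


customers: dict = {}
events = [
    {"op": "r", "before": None, "after": {"id": 1, "first_name": "John"}},
    {"op": "c", "before": None, "after": {"id": 5, "first_name": "you"}},
    {"op": "u", "before": {"id": 5, "first_name": "you"},
     "after": {"id": 5, "first_name": "me"}},
    {"op": "d", "before": {"id": 5, "first_name": "me"}, "after": None},
]
for event in events:
    apply_event(customers, event)
print(customers)
```

After the replay only the snapshot row with id 1 remains, which mirrors the state of the customer table after the insert, update, and delete from this step.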

As shown in the step-by-step guide, Debezium offers a solution for CDC. After setting everything up, the logs showed that a snapshot of the initial database was taken and that a record was created and published to the Kafka topic for each row of the customer table. Additionally, the logs showed how the MySQL connector reacted when rows were created, updated, and deleted.

Debezium has a lot more to offer: It is also possible to embed Debezium into your own Java application instead of relying on Kafka. Furthermore, this blog post did not cover the use of Debezium in a real-world scenario – it only provided a quick setup for local use.

  1. https://debezium.io/
  2. https://kafka.apache.org/
  3. https://kafka.apache.org/documentation.html#connect
  4. https://www.mysql.com/de/
  5. https://kafka.apache.org/intro
  6. https://www.docker.com/
  7. https://debezium.io/documentation/
