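Debezium captures changes made in a database and streams them in real time to Kafka. This makes real-time data synchronization and stream processing efficient and reliable.

Why did we choose Debezium?

Real-time data synchronization

Debezium is an open-source distributed platform for change data capture (CDC). It captures row-level changes in a database and streams them in real time to Kafka or other messaging systems. This is essential for applications that must keep inventory information up to date in real time.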

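Support for multiple databases

Debezium supports a range of databases. These include relational databases such as MySQL and PostgreSQL, as well as the document database MongoDB. We can therefore choose whichever database fits the business, without having to build change capture ourselves.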

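High performance and low latency

Debezium does not poll tables. Instead, it reads changes from the database's transaction log (for MySQL, the binary log, or binlog). This approach is efficient, adds little load to the database and keeps latency low, so changes are picked up quickly even under high concurrency.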

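Structured data output

Debezium emits each change as a structured event. With Kafka Connect's JSON converter, as used in this article, each event is a JSON document. It contains:

- the row before and after the change (before/after)
- metadata about where the change came from (source)
- the operation type (op)
- a timestamp (ts_ms)

Downstream systems can parse this standardized format easily, which makes integration simpler and more reliable.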
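The op field tells a consumer what kind of change an event describes, and therefore which row image to read. Below is a minimal sketch of the four codes a MySQL consumer typically sees, modelled as a Java enum. ChangeOperation is a hypothetical name of our own, not a Debezium class.

```java
import java.util.Arrays;
import java.util.Optional;

/**
 * Hypothetical enum for the "op" codes in a Debezium change event payload.
 * c = insert, u = update, d = delete, r = row read during the initial snapshot.
 */
enum ChangeOperation {
    CREATE("c"),
    UPDATE("u"),
    DELETE("d"),
    READ("r");

    private final String code;

    ChangeOperation(String code) {
        this.code = code;
    }

    String code() {
        return code;
    }

    /** Inserts, updates and snapshot reads carry the row in "after"; deletes only have "before". */
    String rowImageField() {
        return this == DELETE ? "before" : "after";
    }

    /** Maps the raw "op" string; any other code yields an empty Optional. */
    static Optional<ChangeOperation> fromCode(String code) {
        return Arrays.stream(values())
                .filter(op -> op.code.equals(code))
                .findFirst();
    }
}
```

An enum like this could replace a plain string switch in a consumer. Codes outside these four come back as an empty Optional, so the caller decides how to treat them.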

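Fault tolerance and reliability

Debezium records how far it has read in the log (for MySQL, the binlog position, stored as Kafka Connect offsets). After a network failure or a server restart, it resumes capturing from where it left off, so no committed change is lost.

Delivery is at-least-once. Events processed shortly before a crash may be delivered again after the restart, so consumers should be idempotent.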
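Because a restart can replay events, duplicates have to be absorbed on the consumer side. One common approach is sketched below under our own assumptions. The consumer remembers the binlog coordinate last applied for every table row: the file, pos and row fields in each event's source block. It then skips any event at or before that coordinate.

The assumptions are:

- BinlogPosition and DuplicateEventFilter are hypothetical classes.
- The map lives in memory only.
- Binlog files use the default mysql-bin.NNNNNN naming, and there is no GTID-based failover.

```java
import java.util.HashMap;
import java.util.Map;

/** A MySQL binlog coordinate as reported in a Debezium event's "source" block. */
final class BinlogPosition implements Comparable<BinlogPosition> {
    final String file;
    final long pos;
    final int row;

    BinlogPosition(String file, long pos, int row) {
        this.file = file;
        this.pos = pos;
        this.row = row;
    }

    /** Numeric suffix of e.g. "mysql-bin.000001"; assumes the "<base>.<number>" naming. */
    private long fileIndex() {
        return Long.parseLong(file.substring(file.lastIndexOf('.') + 1));
    }

    @Override
    public int compareTo(BinlogPosition other) {
        int byFile = Long.compare(fileIndex(), other.fileIndex());
        if (byFile != 0) {
            return byFile;
        }
        int byPos = Long.compare(pos, other.pos);
        // Several rows of one binlog event share "pos" and differ in "row"
        return byPos != 0 ? byPos : Integer.compare(row, other.row);
    }
}

/** Hypothetical in-memory guard that drops events already applied to the same row. */
final class DuplicateEventFilter {
    private final Map<String, BinlogPosition> lastApplied = new HashMap<>();

    /** Returns true if the event is newer than anything applied for this table/primary key. */
    boolean shouldApply(String table, String primaryKey, BinlogPosition position) {
        String key = table + ":" + primaryKey;
        BinlogPosition previous = lastApplied.get(key);
        if (previous != null && position.compareTo(previous) <= 0) {
            return false; // replayed after a restart: already processed
        }
        lastApplied.put(key, position);
        return true;
    }
}
```

Tracking positions per primary key relies on Kafka's per-key ordering. Debezium uses the row's primary key as the message key, so Kafka preserves the order of events for each row.

In a real deployment, the last-applied positions would have to be persisted, for example in the same database transaction as the business update. Otherwise the guard forgets everything on restart, which is exactly when replays happen.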

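Easy to configure and deploy

Debezium connectors are configured and managed through the Kafka Connect REST API, which keeps deployment simple. Debezium is built on Kafka Connect, so it integrates tightly with the Kafka ecosystem. This makes the whole data pipeline easy to set up and maintain.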

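Data consistency guarantees

Debezium reads events from the transaction log, so it captures every committed change. It also preserves commit order for each row, because events with the same primary key go to the same Kafka partition. This avoids business problems caused by out-of-sync data.

Consistency matters especially for inventory management, where any inconsistency in stock levels can have serious consequences.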

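Use cases

  • Blog platform: refresh the front-end page in real time when a new post is published or an existing one is updated.

  • Forum: show the latest posts and comments in real time to improve the user experience.

  • Cross-system integration: combine changes from CRM, ERP and other business systems into a unified view of the data.

  • Incremental loading: load only the data that has changed since the last sync, reducing transfer volume and processing time.

  • Inventory monitoring: trigger an alert as soon as stock falls below a threshold so that staff can restock.

  • Transaction monitoring: monitor financial transactions in real time, detect suspicious activity and trigger security measures.

  • Order management: when an order's status changes, send the change event to payment, logistics and other microservices to start the corresponding business processes.

  • User management: when user information is updated, notify permission management, marketing and other microservices so their data stays consistent.

  • Financial auditing: record every change to financial transactions for later audits.

  • Big data analytics: collect changes from different systems into Hadoop or Amazon S3 and run complex analyses with tools such as Spark.

  • Machine learning: collect and process data in real time to train and update machine learning models.

  • Player behavior analysis: collect players' in-game behavior in real time to analyze their preferences and game balance.

  • Dynamic tuning: adjust game difficulty and rewards based on player behavior data.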
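Take the inventory-monitoring scenario as an example. An update event carries both the old and the new quantity. A consumer can therefore alert only when stock actually crosses the threshold, rather than on every update while stock stays low. Below is a minimal sketch, in which LowStockRule and the threshold value are hypothetical.

```java
import java.util.OptionalInt;

/**
 * Hypothetical low-stock rule for inventory change events.
 * Fires only when quantity moves from >= threshold to < threshold,
 * so further updates while stock stays low do not re-alert.
 */
final class LowStockRule {
    private final int threshold;

    LowStockRule(int threshold) {
        this.threshold = threshold;
    }

    /**
     * @param before quantity from the event's "before" image (empty for inserts)
     * @param after  quantity from the event's "after" image (empty for deletes)
     */
    boolean shouldAlert(OptionalInt before, OptionalInt after) {
        if (after.isEmpty()) {
            return false; // row deleted: nothing to restock
        }
        boolean nowLow = after.getAsInt() < threshold;
        boolean wasLow = before.isPresent() && before.getAsInt() < threshold;
        return nowLow && !wasLow;
    }
}
```

In the inventory-sync project below, the two arguments would come from the before.quantity and after.quantity fields of an inventory event.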

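Which companies use Debezium?

Uber

  • Use: Uber uses Debezium to capture changes to data such as orders and driver locations and push them to Kafka.

  • Description: This lets Uber monitor order status and driver locations in real time, optimize its dispatch algorithms and improve operational efficiency.

LinkedIn

  • Use: LinkedIn uses Debezium to capture changes in user activity and social-network data.

  • Description: With Debezium, LinkedIn can update its recommendation system and news feed in real time to deliver a personalized user experience.

Walmart

  • Use: Walmart uses Debezium for data synchronization and real-time analytics in its supply-chain management system.

  • Description: With Debezium, Walmart can monitor inventory levels and order status in real time, improving supply-chain efficiency and customer satisfaction.

IBM

  • Use: IBM uses Debezium for data synchronization and stream processing in its hybrid-cloud environments.

  • Description: Debezium helps IBM move data seamlessly between cloud platforms, ensuring business continuity and data consistency.

eBay

  • Use: eBay uses Debezium for data synchronization and real-time analytics on its e-commerce platform.

  • Description: With Debezium, eBay can update product information and stock status in real time, improving the shopping experience and operational efficiency.

PayPal

  • Use: PayPal uses Debezium to capture changes to payment transaction data for real-time risk management and compliance checks.

  • Description: With Debezium, PayPal can detect suspicious transactions promptly, keeping its payment system secure and reliable.

Airbnb

  • Use: Airbnb uses Debezium for data synchronization and real-time monitoring of its internal systems.

  • Description: With Debezium, Airbnb can update listing information and booking status in real time, improving accommodation arrangements and customer service.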

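Database tables

-- Create the database that the connector configuration below refers to.
CREATE DATABASE IF NOT EXISTS inventorydb;
USE inventorydb;

-- Create the products table to store product information.
CREATE TABLE products (
    id INT AUTO_INCREMENT PRIMARY KEY,
    name VARCHAR(255) NOT NULL COMMENT 'Product name',
    price DECIMAL(10, 2) NOT NULL COMMENT 'Product price'
);

-- Create the inventory table to store stock levels for each product.
CREATE TABLE inventory (
    id INT AUTO_INCREMENT PRIMARY KEY,
    product_id INT COMMENT 'ID of the related product',
    quantity INT NOT NULL COMMENT 'Quantity in stock',
    last_updated TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Last update time',
    FOREIGN KEY (product_id) REFERENCES products(id)
);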

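my.cnf configuration

[mysqld]
# Enable the binary log; Debezium reads changes from it
log-bin=mysql-bin
# Row-based logging is required by Debezium
binlog_format=ROW
# Log full before/after row images (the MySQL default, and what Debezium expects)
binlog_row_image=FULL
# Unique server ID; must differ from database.server.id in the connector config
server-id=1
# Keep binlogs for 10 days (deprecated in MySQL 8.0 in favour of binlog_expire_logs_seconds)
expire_logs_days=10

Make sure binary logging is enabled, and remember to restart the MySQL service after editing my.cnf. Afterwards, check the settings:

- SHOW VARIABLES LIKE 'log_bin' should return ON.
- SHOW VARIABLES LIKE 'binlog_format' should return ROW.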
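The Debezium MySQL connector needs row-based binlogs with full row images. As an optional pre-flight check, you could run the query below over JDBC and read the results into a map:

SHOW GLOBAL VARIABLES WHERE Variable_name IN ('log_bin', 'binlog_format', 'binlog_row_image')

A small helper can then compare that map against the expected values. The sketch below assumes the map has already been loaded; BinlogPrerequisites is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical pre-flight check: compares MySQL server variables (as returned by
 * SHOW GLOBAL VARIABLES) against the binlog settings the Debezium MySQL connector needs.
 */
final class BinlogPrerequisites {
    private static final Map<String, String> REQUIRED = Map.of(
            "log_bin", "ON",
            "binlog_format", "ROW",
            "binlog_row_image", "FULL");

    private BinlogPrerequisites() {
    }

    /** Returns one message per missing or mismatched variable; an empty list means OK. */
    static List<String> problems(Map<String, String> serverVariables) {
        List<String> problems = new ArrayList<>();
        REQUIRED.forEach((name, expected) -> {
            String actual = serverVariables.get(name);
            if (actual == null || !actual.equalsIgnoreCase(expected)) {
                problems.add(name + " should be " + expected + " but is " + actual);
            }
        });
        return problems;
    }
}
```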

Configuring the Debezium Connector

Create a Debezium connector configuration file named register-mysql.json:

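{
  "name": "inventory-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "localhost",
    "database.port": "3306",
    "database.user": "root",
    "database.password": "password",
    "database.server.id": "184054",
    "database.server.name": "dbserver1",
    "database.include.list": "inventorydb",
    "table.include.list": "inventorydb.products,inventorydb.inventory",
    "database.history.kafka.bootstrap.servers": "localhost:9092",
    "database.history.kafka.topic": "schema-changes.inventorydb",
    "decimal.handling.mode": "double",
    "include.schema.changes": "false"
  }
}

Notes on this configuration:

- The Debezium 1.x MySQL connector requires the two database.history.kafka.* settings, because it stores the history of table schemas in that topic.
- "decimal.handling.mode": "double" makes DECIMAL columns such as price arrive as plain numbers. Without it, they are encoded as base64 bytes.
- In Debezium 2.x, database.server.name is replaced by topic.prefix, and database.history.* by schema.history.internal.*.
- Outside a demo, use a dedicated MySQL account with the SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE and REPLICATION CLIENT privileges instead of root.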
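With the Debezium 1.x MySQL connector, each captured table gets its own topic named <database.server.name>.<database>.<table>. This is why the listener later subscribes to dbserver1.inventorydb.products and dbserver1.inventorydb.inventory. A hypothetical helper that derives those names from the configuration values might look like this.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

/**
 * Hypothetical helper: derives the topic names the Debezium 1.x MySQL connector
 * writes to, following the <database.server.name>.<database>.<table> convention.
 */
final class DebeziumTopics {

    private DebeziumTopics() {
    }

    /**
     * @param serverName       value of database.server.name, e.g. "dbserver1"
     * @param tableIncludeList comma-separated "database.table" entries; assumed to be plain
     *                         names, although Debezium treats them as regular expressions
     */
    static List<String> forTables(String serverName, String tableIncludeList) {
        return Arrays.stream(tableIncludeList.split(","))
                .map(String::trim)
                .filter(entry -> !entry.isEmpty())
                .map(entry -> serverName + "." + entry)
                .collect(Collectors.toList());
    }
}
```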

Register the Debezium connector with curl:

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-mysql.json
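If you would rather register the connector from Java than with curl, the JDK 11 HttpClient can send the same request. This sketch is not part of the original project. ConnectorRegistrar is a hypothetical class, and http://localhost:8083 assumes Kafka Connect's default REST port, as in the curl command.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.file.Files;
import java.nio.file.Path;

/**
 * Hypothetical Java equivalent of the curl call: posts a connector
 * definition to Kafka Connect's /connectors endpoint.
 */
final class ConnectorRegistrar {

    private final HttpClient client = HttpClient.newHttpClient();
    private final URI connectorsEndpoint;

    ConnectorRegistrar(String connectBaseUrl) {
        // e.g. "http://localhost:8083" -> "http://localhost:8083/connectors"
        this.connectorsEndpoint = URI.create(connectBaseUrl + "/connectors");
    }

    /** Builds the POST request with the same headers as the curl command; no network I/O here. */
    HttpRequest buildRequest(String connectorJson) {
        return HttpRequest.newBuilder(connectorsEndpoint)
                .header("Accept", "application/json")
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(connectorJson))
                .build();
    }

    /** Reads the JSON file, sends it and returns the HTTP status code (201 means created). */
    int register(Path configFile) throws IOException, InterruptedException {
        String json = Files.readString(configFile);
        HttpResponse<String> response =
                client.send(buildRequest(json), HttpResponse.BodyHandlers.ofString());
        return response.statusCode();
    }
}
```

For example, the following call should return 201 when Kafka Connect creates the connector:

new ConnectorRegistrar("http://localhost:8083").register(Path.of("register-mysql.json"))

Afterwards, GET /connectors/inventory-connector/status reports whether its task is running.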

Hands-on code

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.7.5</version>
        <relativePath/><!-- lookup parent from repository -->
    </parent>
    <groupId>com.example</groupId>
    <artifactId>inventory-sync</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>inventory-sync</name>
    <description>Debezium Demo</description>
    <properties>
        <java.version>11</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>

application.properties

# Kafka broker address
spring.kafka.bootstrap-servers=localhost:9092
# Consumer group ID
spring.kafka.consumer.group-id=inventory-consumer-group
# Where to start reading when the group has no committed offset
spring.kafka.consumer.auto-offset-reset=earliest
# Key deserializer
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
# Value deserializer: the listener receives the raw JSON as a String and parses it with Jackson
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

Debezium message listener

package com.example.inventorysync.listener;

import com.example.inventorysync.handler.DataChangeHandler;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class DebeziumEventListener {

    private static final Logger logger = LoggerFactory.getLogger(DebeziumEventListener.class);

    private final DataChangeHandler dataChangeHandler;

    // Constructor injection; Spring autowires a single constructor implicitly
    public DebeziumEventListener(DataChangeHandler dataChangeHandler) {
        this.dataChangeHandler = dataChangeHandler;
    }

    /**
     * Listens to the Debezium topics, named <server name>.<database>.<table>.
     *
     * @param record the Kafka record received
     */
    @KafkaListener(topics = {"dbserver1.inventorydb.products", "dbserver1.inventorydb.inventory"})
    public void listen(ConsumerRecord<String, String> record) {
        // After each delete, Debezium also emits a tombstone (null value) so that
        // Kafka log compaction can drop the key; there is nothing to process in it
        if (record.value() == null) {
            logger.debug("Skipping tombstone for key {}", record.key());
            return;
        }
        logger.info("Received message: {}", record.value());
        dataChangeHandler.handleChange(record.value());
    }
}

Data change handler

package com.example.inventorysync.handler;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;

@Service
public class DataChangeHandler {

    private static final Logger logger = LoggerFactory.getLogger(DataChangeHandler.class);
    private final ObjectMapper objectMapper = new ObjectMapper();

    /**
     * Handles a data change event.
     *
     * @param jsonData the change event as a JSON string
     */
    public void handleChange(String jsonData) {
        try {
            // Parse the JSON; with converter schemas enabled the event is {"schema": ..., "payload": ...}
            JsonNode rootNode = objectMapper.readTree(jsonData);
            JsonNode payloadNode = rootNode.path("payload");

            if (payloadNode.isMissingNode() || payloadNode.isNull()) {
                logger.error("Payload node is missing in the JSON data");
                return;
            }

            // The table name lives in payload.source.table; before/after hold only the row columns
            String tableName = payloadNode.path("source").path("table").asText();
            String op = payloadNode.path("op").asText();
            switch (op) {
                case "c": // insert
                case "r": // row read during the connector's initial snapshot
                    handleCreate(tableName, payloadNode);
                    break;
                case "u":
                    handleUpdate(tableName, payloadNode);
                    break;
                case "d":
                    handleDelete(tableName, payloadNode);
                    break;
                default:
                    logger.warn("Unsupported operation type: {}", op);
            }
        } catch (Exception e) {
            logger.error("Error processing data change event", e);
        }
    }

    /**
     * Handles an insert (or a snapshot read).
     *
     * @param tableName   name of the source table
     * @param payloadNode payload whose "after" node holds the inserted row
     */
    private void handleCreate(String tableName, JsonNode payloadNode) {
        JsonNode afterNode = payloadNode.path("after");
        logger.info("Handling CREATE event: {}", afterNode);
        logRow("created", tableName, afterNode);
    }

    /**
     * Handles an update.
     *
     * @param tableName   name of the source table
     * @param payloadNode payload with the row before ("before") and after ("after") the update
     */
    private void handleUpdate(String tableName, JsonNode payloadNode) {
        JsonNode beforeNode = payloadNode.path("before");
        JsonNode afterNode = payloadNode.path("after");
        logger.info("Handling UPDATE event: Before - {}, After - {}", beforeNode, afterNode);
        logRow("updated", tableName, afterNode);
    }

    /**
     * Handles a delete.
     *
     * @param tableName   name of the source table
     * @param payloadNode payload whose "before" node holds the deleted row
     */
    private void handleDelete(String tableName, JsonNode payloadNode) {
        JsonNode beforeNode = payloadNode.path("before");
        logger.info("Handling DELETE event: {}", beforeNode);

        if (beforeNode.has("id")) {
            int id = beforeNode.path("id").asInt();
            if ("products".equals(tableName)) {
                logger.info("Product deleted: ID={}", id);
            } else if ("inventory".equals(tableName)) {
                logger.info("Inventory deleted: ID={}", id);
            }
        }
    }

    /**
     * Logs the columns of a products or inventory row.
     *
     * @param action    "created" or "updated", used in the log message
     * @param tableName name of the source table
     * @param row       the row image to log
     */
    private void logRow(String action, String tableName, JsonNode row) {
        if (!row.has("id")) {
            return;
        }
        int id = row.path("id").asInt();
        if ("products".equals(tableName)) {
            String name = row.path("name").asText();
            // Numeric only with "decimal.handling.mode": "double" (or "string") in the connector;
            // the default "precise" mode encodes DECIMAL columns as base64 bytes
            double price = row.path("price").asDouble();
            logger.info("Product {}: ID={}, Name={}, Price={}", action, id, name, price);
        } else if ("inventory".equals(tableName)) {
            int productId = row.path("product_id").asInt();
            int quantity = row.path("quantity").asInt();
            String lastUpdated = row.path("last_updated").asText();
            logger.info("Inventory {}: ID={}, Product ID={}, Quantity={}, Last Updated={}",
                    action, id, productId, quantity, lastUpdated);
        }
    }
}

Application

package com.example.inventorysync;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class InventorySyncApplication {

    public static void main(String[] args) {
        SpringApplication.run(InventorySyncApplication.class, args);
    }
}

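Testing

Inserting data

Run the following SQL to insert a product and its inventory record:

-- Insert a product
INSERT INTO products (name, price) VALUES ('Laptop', 999.99);

-- Insert an inventory record
INSERT INTO inventory (product_id, quantity) VALUES (1, 100);

Log output: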

2025-03-31 21:01:00.000 INFO  [kafka-listener-container-0-C-1] c.e.i.l.DebeziumEventListener : Received message: {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","field":"id"},{"type":"string","field":"name"},{"type":"decimal","field":"price"}],"optional":false,"name":"inventorydb.products.Value"},"optional":true},"payload":{"op":"c","before":null,"after":{"id":1,"name":"Laptop","price":999.99},"source":{"version":"1.9.5.Final","connector":"mysql","name":"dbserver1","ts_ms":1680307260000,"snapshot":"last","db":"inventorydb","sequence":null,"table":"products","server_id":1,"gtid":null,"file":"mysql-bin.000001","pos":1234,"row":0,"thread":2,"query":null},"ts_ms":1680307260000}}
2025-03-31 21:01:00.000 INFO  [kafka-listener-container-0-C-1] c.e.i.h.DataChangeHandler : Handling CREATE event: {"id":1,"name":"Laptop","price":999.99}
2025-03-31 21:01:00.000 INFO  [kafka-listener-container-0-C-1] c.e.i.h.DataChangeHandler : Product created: ID=1, Name=Laptop, Price=999.99

2025-03-31 21:01:01.000 INFO  [kafka-listener-container-0-C-1] c.e.i.l.DebeziumEventListener : Received message: {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","field":"id"},{"type":"int32","field":"product_id"},{"type":"int32","field":"quantity"},{"type":"string","field":"last_updated"}],"optional":false,"name":"inventorydb.inventory.Value"},"optional":true},"payload":{"op":"c","before":null,"after":{"id":1,"product_id":1,"quantity":100,"last_updated":"2025-03-31T21:01:01.000Z"},"source":{"version":"1.9.5.Final","connector":"mysql","name":"dbserver1","ts_ms":1680307261000,"snapshot":"last","db":"inventorydb","sequence":null,"table":"inventory","server_id":1,"gtid":null,"file":"mysql-bin.000001","pos":5678,"row":0,"thread":2,"query":null},"ts_ms":1680307261000}}
2025-03-31 21:01:01.000 INFO  [kafka-listener-container-0-C-1] c.e.i.h.DataChangeHandler : Handling CREATE event: {"id":1,"product_id":1,"quantity":100,"last_updated":"2025-03-31T21:01:01.000Z"}
2025-03-31 21:01:01.000 INFO  [kafka-listener-container-0-C-1] c.e.i.h.DataChangeHandler : Inventory created: ID=1, Product ID=1, Quantity=100, Last Updated=2025-03-31T21:01:01.000Z

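Updating data

Run the following SQL to update the product and its inventory record:

-- Update the product
UPDATE products SET price = 899.99 WHERE id = 1;

-- Update the inventory record
UPDATE inventory SET quantity = 90 WHERE id = 1;

Log output: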

2025-03-31 21:02:00.000 INFO  [kafka-listener-container-0-C-1] c.e.i.l.DebeziumEventListener : Received message: {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","field":"id"},{"type":"string","field":"name"},{"type":"decimal","field":"price"}],"optional":false,"name":"inventorydb.products.Value"},"optional":true},"payload":{"op":"u","before":{"id":1,"name":"Laptop","price":999.99},"after":{"id":1,"name":"Laptop","price":899.99},"source":{"version":"1.9.5.Final","connector":"mysql","name":"dbserver1","ts_ms":1680307320000,"snapshot":"false","db":"inventorydb","sequence":null,"table":"products","server_id":1,"gtid":null,"file":"mysql-bin.000001","pos":9012,"row":0,"thread":2,"query":null},"ts_ms":1680307320000}}
2025-03-31 21:02:00.000 INFO  [kafka-listener-container-0-C-1] c.e.i.h.DataChangeHandler : Handling UPDATE event: Before - {"id":1,"name":"Laptop","price":999.99}, After - {"id":1,"name":"Laptop","price":899.99}
2025-03-31 21:02:00.000 INFO  [kafka-listener-container-0-C-1] c.e.i.h.DataChangeHandler : Product updated: ID=1, Name=Laptop, Price=899.99

2025-03-31 21:02:01.000 INFO  [kafka-listener-container-0-C-1] c.e.i.l.DebeziumEventListener : Received message: {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","field":"id"},{"type":"int32","field":"product_id"},{"type":"int32","field":"quantity"},{"type":"string","field":"last_updated"}],"optional":false,"name":"inventorydb.inventory.Value"},"optional":true},"payload":{"op":"u","before":{"id":1,"product_id":1,"quantity":100,"last_updated":"2025-03-31T21:01:01.000Z"},"after":{"id":1,"product_id":1,"quantity":90,"last_updated":"2025-03-31T21:02:01.000Z"},"source":{"version":"1.9.5.Final","connector":"mysql","name":"dbserver1","ts_ms":1680307321000,"snapshot":"false","db":"inventorydb","sequence":null,"table":"inventory","server_id":1,"gtid":null,"file":"mysql-bin.000001","pos":13456,"row":0,"thread":2,"query":null},"ts_ms":1680307321000}}
2025-03-31 21:02:01.000 INFO  [kafka-listener-container-0-C-1] c.e.i.h.DataChangeHandler : Handling UPDATE event: Before - {"id":1,"product_id":1,"quantity":100,"last_updated":"2025-03-31T21:01:01.000Z"}, After - {"id":1,"product_id":1,"quantity":90,"last_updated":"2025-03-31T21:02:01.000Z"}
2025-03-31 21:02:01.000 INFO  [kafka-listener-container-0-C-1] c.e.i.h.DataChangeHandler : Inventory updated: ID=1, Product ID=1, Quantity=90, Last Updated=2025-03-31T21:02:01.000Z

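Deleting data

Run the following SQL to delete the inventory record and the product. The inventory row is deleted first because of the foreign key.

-- Delete the inventory record
DELETE FROM inventory WHERE id = 1;

-- Delete the product
DELETE FROM products WHERE id = 1;

Log output: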

2025-03-31 21:03:00.000 INFO  [kafka-listener-container-0-C-1] c.e.i.l.DebeziumEventListener : Received message: {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","field":"id"},{"type":"int32","field":"product_id"},{"type":"int32","field":"quantity"},{"type":"string","field":"last_updated"}],"optional":false,"name":"inventorydb.inventory.Value"},"optional":true},"payload":{"op":"d","before":{"id":1,"product_id":1,"quantity":90,"last_updated":"2025-03-31T21:02:01.000Z"},"after":null,"source":{"version":"1.9.5.Final","connector":"mysql","name":"dbserver1","ts_ms":1680307380000,"snapshot":"false","db":"inventorydb","sequence":null,"table":"inventory","server_id":1,"gtid":null,"file":"mysql-bin.000001","pos":17890,"row":0,"thread":2,"query":null},"ts_ms":1680307380000}}
2025-03-31 21:03:00.000 INFO  [kafka-listener-container-0-C-1] c.e.i.h.DataChangeHandler : Handling DELETE event: {"id":1,"product_id":1,"quantity":90,"last_updated":"2025-03-31T21:02:01.000Z"}
2025-03-31 21:03:00.000 INFO  [kafka-listener-container-0-C-1] c.e.i.h.DataChangeHandler : Inventory deleted: ID=1

2025-03-31 21:03:01.000 INFO  [kafka-listener-container-0-C-1] c.e.i.l.DebeziumEventListener : Received message: {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","field":"id"},{"type":"string","field":"name"},{"type":"decimal","field":"price"}],"optional":false,"name":"inventorydb.products.Value"},"optional":true},"payload":{"op":"d","before":{"id":1,"name":"Laptop","price":899.99},"after":null,"source":{"version":"1.9.5.Final","connector":"mysql","name":"dbserver1","ts_ms":1680307381000,"snapshot":"false","db":"inventorydb","sequence":null,"table":"products","server_id":1,"gtid":null,"file":"mysql-bin.000001","pos":22345,"row":0,"thread":2,"query":null},"ts_ms":1680307381000}}
2025-03-31 21:03:01.000 INFO  [kafka-listener-container-0-C-1] c.e.i.h.DataChangeHandler : Handling DELETE event: {"id":1,"name":"Laptop","price":899.99}
2025-03-31 21:03:01.000 INFO  [kafka-listener-container-0-C-1] c.e.i.h.DataChangeHandler : Product deleted: ID=1
