Debezium 特性深入介绍

0
0
{"value":"数据湖理念逐渐深入人心。我们经常需要从业务数据库中实时同步数据到数据湖以便进一步的统计和处理。在这个过程中,我们通常会利用数据库的「变更数据捕捉」机制( Change Data Capture ,简称 CDC)。\n\n然而不同的数据库会有不同的 CDC 处理方式,给数据消费方造成了诸多不便。我们需要一个通用的数据抽取及转换层,把这些数据统一转换成目标格式(比如 JSON),再发送给下游进行处理。\n\nDebezium 可以完美地解决增量的数据抽取及转换工作。它最早是 Red Hat 的开源项目,是基于 Kafka Connect 框架的 CDC 工具。它可以对接 MySQL、PostgreSQL、SQL Server、Oracle、MongoDB 等多种SQL 及 NoSQL 数据库,把这些数据库的数据持续以统一的格式发送到 Kafka 的主题,供下游进行实时消费。\n\n此前,我已经有多篇文章介绍了 Debezium 的基础使用,包括实时[抽取 MySQL 数据到 Amazon S3 桶](https://aws.amazon.com/cn/blogs/china/use-debezium-export-mysql-data-to-amazon-s3/),以及[使用 Flink 实时消费 SQL Server 的数据](https://aws.amazon.com/cn/blogs/china/unified-batch-stream-architectures/)。不过,在生产环境中使用 Debezium,我们还需要了解它的一些生产相关的特性,包括:\n\n- 高可用\n- 快照机制\n- 配置变更\n- 数据处理\n- 排错\n\n虽然 Debezium 提供了所有的这些机制,但它们往往分散在文档各处,给使用者造成了困扰。在这篇文章中,我将介绍并演示 Debezium 的相关特性以及配置方式,让大家可以在生产环境中更放心地使用。\n\n### **高可用**\n要在生产环境中使用 Debezium,我们需要问的第一个问题就是:**Debezium 能不能做到高可用?**\n\n我们把高可用分为两个部分:\n\n- 单机高可用:如果 Debezium 应用崩溃,那我是否能重启,并且接续此前的处理?\n- 集群高可用:我是否能以集群形式运行 Debezium,并且避免单点失效?故障迁移的时效是多少?\n\n要回答这些问题,我们需要先了解 Debezium 的工作机制。\n\nDebezium 并不是一个独立的软件,而是很多个 Kafka 连接器的总称。这些 Kafka 连接器分别对应不同的数据库,比如 MySQL、Oracle 等。按 Kafka 连接器的常见命名规则,可能我们会把它们叫做 MySQL Kafka Source Connector 之类。\n\n![image.png](https://dev-media.amazoncloud.cn/45f80a8f829e4503b109a79be745e0a1_image.png)\n\nKafka 连接器基于 Kafka Connect 框架来编写。这个框架主要的目的是提供一套统一的接口和机制,帮助大家把数据写入(Source)和读出(Sink)Kafka 流。使用 Kafka Connect 框架的连接器,默认就获得在 Kafka 流中存储配置、偏移量(Offset)、状态的功能,并且也具备了高可用的属性。\n\n这个框架与 Kafka 打包安装,也就是说当你安装 Kafka 的时候,就已经安装了 Kafka 本体、Kafka 控制台脚本工具,以及 Kafka Connect 框架。\n\n![image.png](https://dev-media.amazoncloud.cn/09f93df02a3d46f48950807ce077dc6e_image.png)\n\nKafka 连接器通常是已经打包好的 JAR 文件,此时,我们只需要下载这些文件到 Kafka Connect 指定的插件目录,做好配置,就可以直接用一条命令启动 Kafka Connect。比如,下图就是 Debezium 的 MySQL Connector。\n\n![image.png](https://dev-media.amazoncloud.cn/37faca1b8f204487a0d15d8b97fcb4b9_image.png)\n#### **单机模式和分布式模式**\n在单机模式(Standalone Mode)下,Kafka Connect 仅跑在单台机器上,并且仅在这台机器上用文件来存储偏移,所以可以做到在这一台机器上重启时可以接续。\n\n此外,Kafka Connect 本身提供了分布式模式(Distributed Mode)。这个模式会把连接器任务分布到不同的节点上,并且在某个节点失效时自动把任务重新协调到其他节点上。所以,如果使用分布式模式,则不会出现单点失效的问题。\n\n![image.png](https://dev-media.amazoncloud.cn/bc6a080baede45469bf6e3d5fa7b7ff7_image.png)\n\n故障转移的时效取决于节点之间的「心跳」(Heartbeat)间隔。心跳指的是节点之间定期发送的,确认节点状态的消息。Kafka Connect 心跳间隔默认是 3 秒,所以通常来说数秒内,集群即可识别到错误并自动转移。\n\n在分布式模式下,Kafka Connect 会提供三个 Kafka 主题,让连接器存储状态、偏移和配置。连接器启动时会到这几个主题中去找对应自己名字的偏移量,所以我们可以随时删除某个连接器,重启它,或者在别的地方再启动,它都能接续之前的位置继续处理。\n\n不过需要注意的是,偏移的提交并不是实时进行,而是按间隔。默认的间隔是 10 秒(也有的是 60 秒),这意味着如果已经处理到偏移 550,但是只提交到了 500,而此时任务失败重启,任务就会从偏移 500 处开始重新处理。也就是说,使用 Kafka Connect 我们只能确保流处理中的「至少一次」(At Least Once)逻辑,而不是「正好一次」(Exactly Once)。\n\n当然,分布式模式使用也非常简单只需要使用 connect-distribute.properties 来启动 Kafka Connect 即可。对连接器的操作使用其暴露的 REST API 接口。\n\nREST API 通常使用 Kafka Connect 的标准端口 8083。在使用 Shell 时,我们可以用 curl 来进行操作。比如,下面的语句用于创建一个连接器(假设已登录到 Kafka Connect 节点)。\n```\ncurl -X POST -sH \"Accept:application/json\" -H \"Content-Type:application/json\" localhost:8083/connectors/ -d @/tmp/config.json\n```\n### **快照机制**\nDebezium 常常被用作一个持续捕捉数据变更(插入、更新、删除)的工具,但很多时候我们既需要捕捉新的变更,也需要提取和处理存量的数据。\n\n比如,我们可能会对业务数据进行瘦身和现代化改造,把三年前的数据全部都从数据库转移到数据湖内,然后持续把更新也复制到湖内。又比如,财务审计部分可能会要求把有记录以来的交易数据全部做一次统计,并且有新数据来的时候,更新我的统计数字。\n\n这时候,我们就需要借助 Debezium 的快照(Snapshotting)机制。\n\n快照,指的是在任务启动时,读取整个数据库的内容持续注入 Kafka 流中,并且记录下当时的 Binlog 节点。在读取完成后,从当时记录下的 Binlog 节点开始,继续把新的数据库变更注入 Kafka 流之中。\n\nDebezium 的快照机制并不神秘,其实就是做了一次锁表,以及 SELECT * 的操作。这里的值得注意的点有两个。\n\n- 锁表方式。根据不同的数据库引擎,Debezium 提供不同的锁表方式,比如,MySQL 连接器支持 minimal 即最小化锁表方式(snapshot.locking.mode),只在读取数据库表元数据的时候进行全局锁表避免更新操作(通常耗时不到 1 秒),然后使用 REPEATABLE READS 来进行单表锁定,这使得其他用户的更新操作不受影响。\n- **局部快照**。有时候数据库里面可能有十年的数据,但我实际上只需要提取 1 个月前开始的数据,这个就需要对快照语句进行定制了。Debezium 支持自定义快照语句(select.statement.overrides),比如加上 WHERE 条件限制,就能只做局部快照。\n\n为了区别于普通的插入操作,快照读取的消息其 op 字段会是 r。我们也可以通过配置把它修改成和插入一样的 c,方便做统一处理。\n\n配置方法是在连接器配置中增加一个数据变换:\n```\ntransforms=snapshotasinsert\ntransforms.snapshotasinsert.type=io.debezium.connector.mysql.transforms.ReadToInsertEvent\n```\n#### **增量快照机制**\nDebezium 的原始快照机制看上去不错,但它有几个问题。\n\n- 不能在不终止连接器的情况下,增加新表。\n- 快照语句(SELECT *)必须持续运行并且不能被打断,无法暂停和恢复。\n- 在做快照的过程中,变更不能及时打入 Kafka 流之中。\n\n这几个问题似乎是逻辑限制,很难解决。直到 2019 年,Netflix 开源了一套基于数据水位的数据变更捕捉框架。它使用了流式处理中「水位」(Watermark)的思路,用数据中的顺序字段(比如时间戳)作为高低水位,同时读取存量数据和新的数据变更,并且在二者发生在同一水位区间时做合并。\n\nDebezium 采取了这个思路,实现了一套增量快照机制。新的增量快照一次只读取部分数据,不需要从头到尾、持续运行,并且支持随时增加新表,还可以随时触发快照,而不是只在任务开始时执行。更重要的是,快照过程中有数据变更,它也可以近乎实时地把变更也打入 Kafka 流之中。\n\n这又引发了另一个问题,就是 Kafka Connect 框架其实并没有提供一个机制来向正在运行中的连接器发送一条指令。为此,Debezium 提供了一个「信号表」(Signal Table)的新设定。\n\n用户可以在数据源中,创建一个 dbz_signal 的表,使用特定的格式。通过朝这个表里面插入数据,就可以触发增量快照。\n\n增量快照的完整使用方式超出了本文设定的范围,感兴趣的读者阅读官方指南或期待我们的下篇文章。\n### **配置变更**\n在单机模式下,Kafka Connect 无法变更配置,而在分布式模式下,Kafka Connect 会暴露 REST API 方便我们查看连接器状态以及修改连接器配置。\n\n比如,修改配置的方式如下。\n```\ncurl -X PUT -sH \"Accept:application/json\" -H \"Content-Type:application/json\" localhost:8083/connectors/{connector-name}/config -d @/tmp/new-config.json\n```\n这里需要注意的是,并不是所有的配置都可以在线修改。如果修改之后没有生效,那么我们就只能删除并重建这个连接器。\n```\ncurl -X DELETE -sH \"Accept:application/json\" localhost:8083/connectors/{connector-name}\n\ncurl -X POST -sH \"Accept:application/json\" -H \"Content-Type:application/json\" localhost:8083/connectors/ -d @/tmp/config.json\n```\n这里又有一个问题。如果我们删除并重建这个连接器,会发现它并不会像一个全新的连接器一样,从头开始读取数据变更,而是会接上原来删除的连接器。\n\n这是因为虽然连接器的进程删除了,但是它所对应的 Kafka 偏移(Offset)仍然保存在 connect-offsets 主题中。当我们新建一个连接器,却使用了原来的名字,则它会从这个主题中找到原来的偏移并接续读取数据。\n\n这其实也是 Kafka Connect 实现高可用的机制——因为 Kafka 主题本身是高可用的,所以即便一个连接器失效,我们可以立即以同样名字、同样配置起一个新的连接器,自动接续。\n\n不过,在开发和测试过程中,我们可能需要频繁从头测试全新的连接器,这个机制就很恼人了。解决这个问题的办法有两个。\n\n- 创建连接器的时候使用不同的名字。这会让 Kafka Connect 把它当成一个全新的连接器对待,从头开始执行整个逻辑。\n- 在 connect-offset 主题里面添加一条记录,记录的键是连接器名字,而值则是 null。因为 Kafka 是流,所以这里面的配置也是新的配置覆盖旧的配置,null 覆盖原有配置后,连接器失去了偏移,就会从头开始读取数据了。\n### **数据处理**\nDebezium 通常用于做数据入流,以及异构的数据复制和迁移,所以数据会在不同的系统、语言之间流转。在这个过程中,我们经常会遇到数据处理相关的问题。这里,我们来看两个数据处理问题。\n\n#### **特殊格式**\n在这里,特殊格式指的是不同系统中使用不同表现方式的数据格式。\n\n最常见的特殊格式是数字格式。因为每个数据库对数字的定义方式、精度都有不同,数字经常会造成一些麻烦。比如,原来在 SQL Server 中明明是普通的 DECIMAL 类型,但读取到流中,却变成了一串二进制值。又或者,本来是一个 TIMESTAMP 格式,读取的时候却报告数据溢出。\n\n这些都是因为数据表现方式的不同造成的问题。在 Debezium 中,提供了 decimal.handling.mode 这样的参数帮助我们解决这个问题。它的几个值解读如下。\n\n- precise,即精确读取,这也是默认的形式,如果使用 JSON,就会把 DECIMAL 字段变成一个 Base64 编码的字符串;它实际上是使用了 Java 的 BigDecimal 类的格式来精确存储了数据,但需要我们在消费侧使用 BigDecimal 来读取这个原始值。\n- double,即转换成双精度浮点数,可能会损失精度,但是消费侧可直接使用。\n- string,即转换成字符串,这也是我个人比较常用的方式,在消费侧通常都可以用很简单的形式来把字符串转换回原格式\n\n除了数字格式之外,还有各种二进制格式,以及诸如 DATETIME 实际上底层存的是 BIGINT,以及部分语言、数据库没有 UNSIGNED BIGINT 这样的格式而导致的溢出问题等等。这些通常都可以通过设置其各自的 handling.mode 来解决。\n\n#### **主题路由**\n之前有说到 Debezium 的两个场景是异构数据复制和数据入流做流式处理。前者通常是数据库到数据湖的复制,而后者则经常是为了实现某些对实时性要求很高的业务。\n\n比如,一个生产生鲜食品的客户,可能会从 ERP 中的上百个业务表中读取数据更新,并且实时形成一个对产品进行全链路追踪,对生产、流转、分发环节中出现的异常情况进行告警。\n\n这时候,如果我们还使用 Debezium 默认的配置,就会产生数百个 Kafka 主题,这给消费侧使用这些数据造成了麻烦。由于每条数据上都带了源表信息,我们其实可以把所有的变更都打入到同一个主题里,而消费者只需要订阅这一个主题即可。\n\n这就需要用到 Debezium 的主题路由功能了。在 Debezium 中,主题路由是由数据变换实现,它的配置方式如下。\n```\ntransforms=Reroute\ntransforms.Reroute.type=io.debezium.transforms.ByLogicalTableRouter\ntransforms.Reroute.topic.regex=(.*)customers_shard(.*)\ntransforms.Reroute.topic.replacement=$1customers_all_shards\n```\n其中 transforms.Reroute.topic.regex 是匹配原 Kafka 主题名字的正则表达式,而 transforms.Reroute.topic.replacement 则是替换之后的实际主题名字。如果我们将后者设定为一个固定的主题(customers),则所有匹配的消息都会被路由到这个主题。\n\n### **排错**\n再好的软件,也不可能用得一帆风顺,所以我们也需要知道当它遇到问题的时候如何来排错。\n\n首先,我们来明确日志位置。如果在运行 Kafka Connect 的时候指定了 LOG_DIR,则日志会输出到你指定的位置,但是要注意运行 Kafka Connect 进程的用户必须有写入权限,否则不会报错,但也不会有日志。如果没有配置,则日志会写入 Kafka 安装目录下的 logs 文件夹。\n\n接下来,我们来看一些常见的问题。\n\n- 权限问题。对于不同的数据库,Debezium 都会有权限要求,如果创建的用户没有赋予所需的权限,就会造成错误。注意缺乏权限不一定会立刻出错,而有可能是要用到该权限的时候出错(比如打快照到最后一张表时出错),所以最好能在创建时就确认使用了正确权限。\n- Binlog 失效和清除问题。如果在连接器持续工作的过程中,Binlog 失效或者被清除,而连接器还没有读到该位置,就会导致连接器直接终止。用户需要注意配置好 Binlog 的失效时间并不要在任务运行时清除 Binlog。\n- 集群节点配置错误问题。因为 Debezium 的节点都是自己手动配置,所以 A 节点正确配置,但 B 节点缺少某些依赖,而在启动的时候,我们又设置了 max 为 1。此时,A 节点将可以正常启动,但是在故障切换到 B 节点时,因为缺少依赖而无法启动。建议尽量使用自动化或者脚本方式配置每个节点。\n- dbhistory 错误问题。因为主题配置错误,可能会导致 dbhistory 中没有包含数据库变更记录,导致消息无法通过数据结构检查。此时只能删除重启任务,让它重新去读取最新的数据表结构。\n\n注意大部分出错的任务都是无法自行恢复的,这也是目前 Kafka Connect 的限制。此时我们只能手动对连接器进行重启。如果需要自动重启功能,则需要借助脚本或者服务来实现。\n\n### **总结**\nDebezium 是目前最成熟和完善的数据采集入 Kafka 的开源组件。它支持的数据库数量,以及整体的成熟程度都远超同类软件。Flink CDC 等数据采集软件也是对其进行了封装。\n\n使用 Debezium,我们可以快速把各种数据库中的数据录入到流之中,再进行落盘、入湖,或者进行流批一体的处理。由于数据在 Kafka 之中,我们也可以借助 Kafka 的目标连接器,无需一行代码,将数据注入到其他系统之中,或者做异构的数据同步,比如,把数据从 MySQL 同步到 MongoDB。\n\n它本身是代码开源,社区也非常活跃。在遇到问题时,因为其基于 Kafka Connect 的机制,整体完全透明,所以排查也比较容易,在很多场合相比商业黑盒产品有不可替代的优势。\n\n本篇文章着重介绍了在生产中使用 Debezium 时可能会遇到的一些顾虑、问题,并且介绍了应对方式和对应的底层原理。希望对读者有所帮助,高效、稳健、开放地运行自己的数据采集业务。\n\n### **本篇作者**\n\n![image.png](https://dev-media.amazoncloud.cn/524cc3177ae74d7a8430683e8741019e_image.png)\n\n**张玳**\nAmazon 解决方案架构师。十余年企业软件研发、设计和咨询经验,专注企业业务与 Amazon 服务的有机结合。译有《软件之道》《精益创业实战》《精益设计》《互联网思维的企业》,著有《体验设计白书》等书籍。","render":"<p>数据湖理念逐渐深入人心。我们经常需要从业务数据库中实时同步数据到数据湖以便进一步的统计和处理。在这个过程中,我们通常会利用数据库的「变更数据捕捉」机制( Change Data Capture ,简称 CDC)。</p>\n<p>然而不同的数据库会有不同的 CDC 处理方式,给数据消费方造成了诸多不便。我们需要一个通用的数据抽取及转换层,把这些数据统一转换成目标格式(比如 JSON),再发送给下游进行处理。</p>\n<p>Debezium 可以完美地解决增量的数据抽取及转换工作。它最早是 Red Hat 的开源项目,是基于 Kafka Connect 框架的 CDC 工具。它可以对接 MySQL、PostgreSQL、SQL Server、Oracle、MongoDB 等多种SQL 及 NoSQL 数据库,把这些数据库的数据持续以统一的格式发送到 Kafka 的主题,供下游进行实时消费。</p>\n<p>此前,我已经有多篇文章介绍了 Debezium 的基础使用,包括实时<a href=\"https://aws.amazon.com/cn/blogs/china/use-debezium-export-mysql-data-to-amazon-s3/\" target=\"_blank\">抽取 MySQL 数据到 Amazon S3 桶</a>,以及<a href=\"https://aws.amazon.com/cn/blogs/china/unified-batch-stream-architectures/\" target=\"_blank\">使用 Flink 实时消费 SQL Server 的数据</a>。不过,在生产环境中使用 Debezium,我们还需要了解它的一些生产相关的特性,包括:</p>\n<ul>\n<li>高可用</li>\n<li>快照机制</li>\n<li>配置变更</li>\n<li>数据处理</li>\n<li>排错</li>\n</ul>\n<p>虽然 Debezium 提供了所有的这些机制,但它们往往分散在文档各处,给使用者造成了困扰。在这篇文章中,我将介绍并演示 Debezium 的相关特性以及配置方式,让大家可以在生产环境中更放心地使用。</p>\n<h3><a id=\"_16\"></a><strong>高可用</strong></h3>\n<p>要在生产环境中使用 Debezium,我们需要问的第一个问题就是:<strong>Debezium 能不能做到高可用?</strong></p>\n<p>我们把高可用分为两个部分:</p>\n<ul>\n<li>单机高可用:如果 Debezium 应用崩溃,那我是否能重启,并且接续此前的处理?</li>\n<li>集群高可用:我是否能以集群形式运行 Debezium,并且避免单点失效?故障迁移的时效是多少?</li>\n</ul>\n<p>要回答这些问题,我们需要先了解 Debezium 的工作机制。</p>\n<p>Debezium 并不是一个独立的软件,而是很多个 Kafka 连接器的总称。这些 Kafka 连接器分别对应不同的数据库,比如 MySQL、Oracle 等。按 Kafka 连接器的常见命名规则,可能我们会把它们叫做 MySQL Kafka Source Connector 之类。</p>\n<p><img src=\"https://dev-media.amazoncloud.cn/45f80a8f829e4503b109a79be745e0a1_image.png\" alt=\"image.png\" /></p>\n<p>Kafka 连接器基于 Kafka Connect 框架来编写。这个框架主要的目的是提供一套统一的接口和机制,帮助大家把数据写入(Source)和读出(Sink)Kafka 流。使用 Kafka Connect 框架的连接器,默认就获得在 Kafka 流中存储配置、偏移量(Offset)、状态的功能,并且也具备了高可用的属性。</p>\n<p>这个框架与 Kafka 打包安装,也就是说当你安装 Kafka 的时候,就已经安装了 Kafka 本体、Kafka 控制台脚本工具,以及 Kafka Connect 框架。</p>\n<p><img src=\"https://dev-media.amazoncloud.cn/09f93df02a3d46f48950807ce077dc6e_image.png\" alt=\"image.png\" /></p>\n<p>Kafka 连接器通常是已经打包好的 JAR 文件,此时,我们只需要下载这些文件到 Kafka Connect 指定的插件目录,做好配置,就可以直接用一条命令启动 Kafka Connect。比如,下图就是 Debezium 的 MySQL Connector。</p>\n<p><img src=\"https://dev-media.amazoncloud.cn/37faca1b8f204487a0d15d8b97fcb4b9_image.png\" alt=\"image.png\" /></p>\n<h4><a id=\"_39\"></a><strong>单机模式和分布式模式</strong></h4>\n<p>在单机模式(Standalone Mode)下,Kafka Connect 仅跑在单台机器上,并且仅在这台机器上用文件来存储偏移,所以可以做到在这一台机器上重启时可以接续。</p>\n<p>此外,Kafka Connect 本身提供了分布式模式(Distributed Mode)。这个模式会把连接器任务分布到不同的节点上,并且在某个节点失效时自动把任务重新协调到其他节点上。所以,如果使用分布式模式,则不会出现单点失效的问题。</p>\n<p><img src=\"https://dev-media.amazoncloud.cn/bc6a080baede45469bf6e3d5fa7b7ff7_image.png\" alt=\"image.png\" /></p>\n<p>故障转移的时效取决于节点之间的「心跳」(Heartbeat)间隔。心跳指的是节点之间定期发送的,确认节点状态的消息。Kafka Connect 心跳间隔默认是 3 秒,所以通常来说数秒内,集群即可识别到错误并自动转移。</p>\n<p>在分布式模式下,Kafka Connect 会提供三个 Kafka 主题,让连接器存储状态、偏移和配置。连接器启动时会到这几个主题中去找对应自己名字的偏移量,所以我们可以随时删除某个连接器,重启它,或者在别的地方再启动,它都能接续之前的位置继续处理。</p>\n<p>不过需要注意的是,偏移的提交并不是实时进行,而是按间隔。默认的间隔是 10 秒(也有的是 60 秒),这意味着如果已经处理到偏移 550,但是只提交到了 500,而此时任务失败重启,任务就会从偏移 500 处开始重新处理。也就是说,使用 Kafka Connect 我们只能确保流处理中的「至少一次」(At Least Once)逻辑,而不是「正好一次」(Exactly Once)。</p>\n<p>当然,分布式模式使用也非常简单只需要使用 connect-distribute.properties 来启动 Kafka Connect 即可。对连接器的操作使用其暴露的 REST API 接口。</p>\n<p>REST API 通常使用 Kafka Connect 的标准端口 8083。在使用 Shell 时,我们可以用 curl 来进行操作。比如,下面的语句用于创建一个连接器(假设已登录到 Kafka Connect 节点)。</p>\n<pre><code class=\"lang-\">curl -X POST -sH &quot;Accept:application/json&quot; -H &quot;Content-Type:application/json&quot; localhost:8083/connectors/ -d @/tmp/config.json\n</code></pre>\n<h3><a id=\"_58\"></a><strong>快照机制</strong></h3>\n<p>Debezium 常常被用作一个持续捕捉数据变更(插入、更新、删除)的工具,但很多时候我们既需要捕捉新的变更,也需要提取和处理存量的数据。</p>\n<p>比如,我们可能会对业务数据进行瘦身和现代化改造,把三年前的数据全部都从数据库转移到数据湖内,然后持续把更新也复制到湖内。又比如,财务审计部分可能会要求把有记录以来的交易数据全部做一次统计,并且有新数据来的时候,更新我的统计数字。</p>\n<p>这时候,我们就需要借助 Debezium 的快照(Snapshotting)机制。</p>\n<p>快照,指的是在任务启动时,读取整个数据库的内容持续注入 Kafka 流中,并且记录下当时的 Binlog 节点。在读取完成后,从当时记录下的 Binlog 节点开始,继续把新的数据库变更注入 Kafka 流之中。</p>\n<p>Debezium 的快照机制并不神秘,其实就是做了一次锁表,以及 SELECT * 的操作。这里的值得注意的点有两个。</p>\n<ul>\n<li>锁表方式。根据不同的数据库引擎,Debezium 提供不同的锁表方式,比如,MySQL 连接器支持 minimal 即最小化锁表方式(snapshot.locking.mode),只在读取数据库表元数据的时候进行全局锁表避免更新操作(通常耗时不到 1 秒),然后使用 REPEATABLE READS 来进行单表锁定,这使得其他用户的更新操作不受影响。</li>\n<li><strong>局部快照</strong>。有时候数据库里面可能有十年的数据,但我实际上只需要提取 1 个月前开始的数据,这个就需要对快照语句进行定制了。Debezium 支持自定义快照语句(select.statement.overrides),比如加上 WHERE 条件限制,就能只做局部快照。</li>\n</ul>\n<p>为了区别于普通的插入操作,快照读取的消息其 op 字段会是 r。我们也可以通过配置把它修改成和插入一样的 c,方便做统一处理。</p>\n<p>配置方法是在连接器配置中增加一个数据变换:</p>\n<pre><code class=\"lang-\">transforms=snapshotasinsert\ntransforms.snapshotasinsert.type=io.debezium.connector.mysql.transforms.ReadToInsertEvent\n</code></pre>\n<h4><a id=\"_79\"></a><strong>增量快照机制</strong></h4>\n<p>Debezium 的原始快照机制看上去不错,但它有几个问题。</p>\n<ul>\n<li>不能在不终止连接器的情况下,增加新表。</li>\n<li>快照语句(SELECT *)必须持续运行并且不能被打断,无法暂停和恢复。</li>\n<li>在做快照的过程中,变更不能及时打入 Kafka 流之中。</li>\n</ul>\n<p>这几个问题似乎是逻辑限制,很难解决。直到 2019 年,Netflix 开源了一套基于数据水位的数据变更捕捉框架。它使用了流式处理中「水位」(Watermark)的思路,用数据中的顺序字段(比如时间戳)作为高低水位,同时读取存量数据和新的数据变更,并且在二者发生在同一水位区间时做合并。</p>\n<p>Debezium 采取了这个思路,实现了一套增量快照机制。新的增量快照一次只读取部分数据,不需要从头到尾、持续运行,并且支持随时增加新表,还可以随时触发快照,而不是只在任务开始时执行。更重要的是,快照过程中有数据变更,它也可以近乎实时地把变更也打入 Kafka 流之中。</p>\n<p>这又引发了另一个问题,就是 Kafka Connect 框架其实并没有提供一个机制来向正在运行中的连接器发送一条指令。为此,Debezium 提供了一个「信号表」(Signal Table)的新设定。</p>\n<p>用户可以在数据源中,创建一个 dbz_signal 的表,使用特定的格式。通过朝这个表里面插入数据,就可以触发增量快照。</p>\n<p>增量快照的完整使用方式超出了本文设定的范围,感兴趣的读者阅读官方指南或期待我们的下篇文章。</p>\n<h3><a id=\"_95\"></a><strong>配置变更</strong></h3>\n<p>在单机模式下,Kafka Connect 无法变更配置,而在分布式模式下,Kafka Connect 会暴露 REST API 方便我们查看连接器状态以及修改连接器配置。</p>\n<p>比如,修改配置的方式如下。</p>\n<pre><code class=\"lang-\">curl -X PUT -sH &quot;Accept:application/json&quot; -H &quot;Content-Type:application/json&quot; localhost:8083/connectors/{connector-name}/config -d @/tmp/new-config.json\n</code></pre>\n<p>这里需要注意的是,并不是所有的配置都可以在线修改。如果修改之后没有生效,那么我们就只能删除并重建这个连接器。</p>\n<pre><code class=\"lang-\">curl -X DELETE -sH &quot;Accept:application/json&quot; localhost:8083/connectors/{connector-name}\n\ncurl -X POST -sH &quot;Accept:application/json&quot; -H &quot;Content-Type:application/json&quot; localhost:8083/connectors/ -d @/tmp/config.json\n</code></pre>\n<p>这里又有一个问题。如果我们删除并重建这个连接器,会发现它并不会像一个全新的连接器一样,从头开始读取数据变更,而是会接上原来删除的连接器。</p>\n<p>这是因为虽然连接器的进程删除了,但是它所对应的 Kafka 偏移(Offset)仍然保存在 connect-offsets 主题中。当我们新建一个连接器,却使用了原来的名字,则它会从这个主题中找到原来的偏移并接续读取数据。</p>\n<p>这其实也是 Kafka Connect 实现高可用的机制——因为 Kafka 主题本身是高可用的,所以即便一个连接器失效,我们可以立即以同样名字、同样配置起一个新的连接器,自动接续。</p>\n<p>不过,在开发和测试过程中,我们可能需要频繁从头测试全新的连接器,这个机制就很恼人了。解决这个问题的办法有两个。</p>\n<ul>\n<li>创建连接器的时候使用不同的名字。这会让 Kafka Connect 把它当成一个全新的连接器对待,从头开始执行整个逻辑。</li>\n<li>在 connect-offset 主题里面添加一条记录,记录的键是连接器名字,而值则是 null。因为 Kafka 是流,所以这里面的配置也是新的配置覆盖旧的配置,null 覆盖原有配置后,连接器失去了偏移,就会从头开始读取数据了。</li>\n</ul>\n<h3><a id=\"_118\"></a><strong>数据处理</strong></h3>\n<p>Debezium 通常用于做数据入流,以及异构的数据复制和迁移,所以数据会在不同的系统、语言之间流转。在这个过程中,我们经常会遇到数据处理相关的问题。这里,我们来看两个数据处理问题。</p>\n<h4><a id=\"_121\"></a><strong>特殊格式</strong></h4>\n<p>在这里,特殊格式指的是不同系统中使用不同表现方式的数据格式。</p>\n<p>最常见的特殊格式是数字格式。因为每个数据库对数字的定义方式、精度都有不同,数字经常会造成一些麻烦。比如,原来在 SQL Server 中明明是普通的 DECIMAL 类型,但读取到流中,却变成了一串二进制值。又或者,本来是一个 TIMESTAMP 格式,读取的时候却报告数据溢出。</p>\n<p>这些都是因为数据表现方式的不同造成的问题。在 Debezium 中,提供了 decimal.handling.mode 这样的参数帮助我们解决这个问题。它的几个值解读如下。</p>\n<ul>\n<li>precise,即精确读取,这也是默认的形式,如果使用 JSON,就会把 DECIMAL 字段变成一个 Base64 编码的字符串;它实际上是使用了 Java 的 BigDecimal 类的格式来精确存储了数据,但需要我们在消费侧使用 BigDecimal 来读取这个原始值。</li>\n<li>double,即转换成双精度浮点数,可能会损失精度,但是消费侧可直接使用。</li>\n<li>string,即转换成字符串,这也是我个人比较常用的方式,在消费侧通常都可以用很简单的形式来把字符串转换回原格式</li>\n</ul>\n<p>除了数字格式之外,还有各种二进制格式,以及诸如 DATETIME 实际上底层存的是 BIGINT,以及部分语言、数据库没有 UNSIGNED BIGINT 这样的格式而导致的溢出问题等等。这些通常都可以通过设置其各自的 handling.mode 来解决。</p>\n<h4><a id=\"_134\"></a><strong>主题路由</strong></h4>\n<p>之前有说到 Debezium 的两个场景是异构数据复制和数据入流做流式处理。前者通常是数据库到数据湖的复制,而后者则经常是为了实现某些对实时性要求很高的业务。</p>\n<p>比如,一个生产生鲜食品的客户,可能会从 ERP 中的上百个业务表中读取数据更新,并且实时形成一个对产品进行全链路追踪,对生产、流转、分发环节中出现的异常情况进行告警。</p>\n<p>这时候,如果我们还使用 Debezium 默认的配置,就会产生数百个 Kafka 主题,这给消费侧使用这些数据造成了麻烦。由于每条数据上都带了源表信息,我们其实可以把所有的变更都打入到同一个主题里,而消费者只需要订阅这一个主题即可。</p>\n<p>这就需要用到 Debezium 的主题路由功能了。在 Debezium 中,主题路由是由数据变换实现,它的配置方式如下。</p>\n<pre><code class=\"lang-\">transforms=Reroute\ntransforms.Reroute.type=io.debezium.transforms.ByLogicalTableRouter\ntransforms.Reroute.topic.regex=(.*)customers_shard(.*)\ntransforms.Reroute.topic.replacement=$1customers_all_shards\n</code></pre>\n<p>其中 transforms.Reroute.topic.regex 是匹配原 Kafka 主题名字的正则表达式,而 transforms.Reroute.topic.replacement 则是替换之后的实际主题名字。如果我们将后者设定为一个固定的主题(customers),则所有匹配的消息都会被路由到这个主题。</p>\n<h3><a id=\"_150\"></a><strong>排错</strong></h3>\n<p>再好的软件,也不可能用得一帆风顺,所以我们也需要知道当它遇到问题的时候如何来排错。</p>\n<p>首先,我们来明确日志位置。如果在运行 Kafka Connect 的时候指定了 LOG_DIR,则日志会输出到你指定的位置,但是要注意运行 Kafka Connect 进程的用户必须有写入权限,否则不会报错,但也不会有日志。如果没有配置,则日志会写入 Kafka 安装目录下的 logs 文件夹。</p>\n<p>接下来,我们来看一些常见的问题。</p>\n<ul>\n<li>权限问题。对于不同的数据库,Debezium 都会有权限要求,如果创建的用户没有赋予所需的权限,就会造成错误。注意缺乏权限不一定会立刻出错,而有可能是要用到该权限的时候出错(比如打快照到最后一张表时出错),所以最好能在创建时就确认使用了正确权限。</li>\n<li>Binlog 失效和清除问题。如果在连接器持续工作的过程中,Binlog 失效或者被清除,而连接器还没有读到该位置,就会导致连接器直接终止。用户需要注意配置好 Binlog 的失效时间并不要在任务运行时清除 Binlog。</li>\n<li>集群节点配置错误问题。因为 Debezium 的节点都是自己手动配置,所以 A 节点正确配置,但 B 节点缺少某些依赖,而在启动的时候,我们又设置了 max 为 1。此时,A 节点将可以正常启动,但是在故障切换到 B 节点时,因为缺少依赖而无法启动。建议尽量使用自动化或者脚本方式配置每个节点。</li>\n<li>dbhistory 错误问题。因为主题配置错误,可能会导致 dbhistory 中没有包含数据库变更记录,导致消息无法通过数据结构检查。此时只能删除重启任务,让它重新去读取最新的数据表结构。</li>\n</ul>\n<p>注意大部分出错的任务都是无法自行恢复的,这也是目前 Kafka Connect 的限制。此时我们只能手动对连接器进行重启。如果需要自动重启功能,则需要借助脚本或者服务来实现。</p>\n<h3><a id=\"_164\"></a><strong>总结</strong></h3>\n<p>Debezium 是目前最成熟和完善的数据采集入 Kafka 的开源组件。它支持的数据库数量,以及整体的成熟程度都远超同类软件。Flink CDC 等数据采集软件也是对其进行了封装。</p>\n<p>使用 Debezium,我们可以快速把各种数据库中的数据录入到流之中,再进行落盘、入湖,或者进行流批一体的处理。由于数据在 Kafka 之中,我们也可以借助 Kafka 的目标连接器,无需一行代码,将数据注入到其他系统之中,或者做异构的数据同步,比如,把数据从 MySQL 同步到 MongoDB。</p>\n<p>它本身是代码开源,社区也非常活跃。在遇到问题时,因为其基于 Kafka Connect 的机制,整体完全透明,所以排查也比较容易,在很多场合相比商业黑盒产品有不可替代的优势。</p>\n<p>本篇文章着重介绍了在生产中使用 Debezium 时可能会遇到的一些顾虑、问题,并且介绍了应对方式和对应的底层原理。希望对读者有所帮助,高效、稳健、开放地运行自己的数据采集业务。</p>\n<h3><a id=\"_173\"></a><strong>本篇作者</strong></h3>\n<p><img src=\"https://dev-media.amazoncloud.cn/524cc3177ae74d7a8430683e8741019e_image.png\" alt=\"image.png\" /></p>\n<p><strong>张玳</strong><br />\nAmazon 解决方案架构师。十余年企业软件研发、设计和咨询经验,专注企业业务与 Amazon 服务的有机结合。译有《软件之道》《精益创业实战》《精益设计》《互联网思维的企业》,著有《体验设计白书》等书籍。</p>\n"}
目录
亚马逊云科技解决方案 基于行业客户应用场景及技术领域的解决方案
联系亚马逊云科技专家
亚马逊云科技解决方案
基于行业客户应用场景及技术领域的解决方案
联系专家
0
目录
关闭
contact-us