Debezium日常分享系列之:Debezium Engine
Debezium日常分享系列之:Debezium Engine
- 依赖
- 打包项目
- 在代码中
- 输出消息格式
- 消息转换
- 消息转换谓词
- 高级记录使用
- 引擎属性
- 异步引擎属性
- 数据库模式历史属性
- 处理故障
Debezium连接器通常通过部署到Kafka Connect服务来运行,并配置一个或多个连接器来监视上游数据库,并为上游数据库中的所有更改生成数据变更事件。这些数据变更事件被写入Kafka,可以由许多不同的应用程序独立消费。Kafka Connect提供了出色的容错性和可伸缩性,因为它作为分布式服务运行,并确保所有注册和配置的连接器始终运行。例如,即使集群中的一个Kafka Connect端点关闭,剩余的Kafka Connect端点也会重新启动之前在已终止端点上运行的任何连接器,从而最大程度地减少停机时间并消除管理活动。
并非每个应用程序都需要这种级别的容错性和可靠性,他们可能不想依赖外部的Kafka代理和Kafka Connect服务。相反,一些应用程序更愿意直接在应用程序空间中嵌入Debezium连接器。它们仍然需要相同的数据变更事件,但更希望连接器直接将其发送到应用程序而不是在Kafka中持久化。
这个debezium-api模块定义了一个小的API,允许应用程序使用Debezium Engine轻松配置和运行Debezium连接器。
从2.6.0版本开始,Debezium提供了两个DebeziumEngine接口的实现。较旧的EmbeddedEngine实现运行一个只使用一个任务的连接器。连接器按顺序发出所有记录。这是默认的实现。
从2.6.0版本开始,还提供了一个新的AsyncEmbeddedEngine实现。这个实现也只运行一个连接器,但它可以在多个线程中处理记录,并运行多个任务,如果连接器支持的话(目前只有SQL Server和MongoDB的连接器支持在一个连接器中运行多个任务)。由于这两个引擎实现了相同的接口并共享相同的API,下面的代码示例对于任何引擎都是有效的。这两个实现支持相同的配置选项。
然而,新的AsyncEmbeddedEngine提供了一些用于设置和优化并行处理的新配置选项。
依赖
要使用Debezium Engine模块,将debezium-api模块添加到应用程序的依赖项中。还应将debezium-embedded模块添加到依赖项中,这是该API的一个开箱即用的实现。对于Maven,这需要将以下内容添加到应用程序的POM文件中:
<dependency><groupId>io.debezium</groupId><artifactId>debezium-api</artifactId><version>${version.debezium}</version>
</dependency>
<dependency><groupId>io.debezium</groupId><artifactId>debezium-embedded</artifactId><version>${version.debezium}</version>
</dependency>
其中${version.debezium}可以是您使用的Debezium版本,也可以是一个包含Debezium版本字符串的Maven属性的值。
同样,为您的应用程序将使用的每个Debezium连接器添加依赖项。例如,可以将以下内容添加到您的应用程序的Maven POM文件中,以便您的应用程序可以使用MySQL连接器:
<dependency><groupId>io.debezium</groupId><artifactId>debezium-connector-mysql</artifactId><version>${version.debezium}</version>
</dependency>
或者对于 MongoDB 连接器:
<dependency><groupId>io.debezium</groupId><artifactId>debezium-connector-mongodb</artifactId><version>${version.debezium}</version>
</dependency>
本文档的其余部分介绍了如何在应用程序中嵌入 MySQL 连接器。其他连接器的使用方式类似,但连接器特定的配置、主题和事件除外。
打包项目
Debezium使用SPI通过ServiceLoader加载实现。实现可以基于连接器类型,也可以是自定义实现。
有些接口有多个实现。例如,io.debezium.snapshot.spi.SnapshotLock在核心中有一个默认实现,并且针对每个连接器有特定的实现。为了确保Debezium可以定位所需的实现,必须显式地配置构建工具以合并META-INF/services文件。
例如,如果使用的是Maven shade插件,请添加ServicesResourceTransformer转换器,如下例所示:
...
<configuration><transformers>...<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />...</transformers>
...
</configuration>
或者,如果您使用 Maven Assembly 插件,则可以使用 metaInf-services 容器描述符处理程序。
在代码中
您的应用程序需要为每个要运行的连接器实例设置一个嵌入式引擎。io.debezium.engine.DebeziumEngine<R>类作为一个易于使用的包装器,完全管理连接器的生命周期。您可以使用它的构建器API创建DebeziumEngine实例,提供以下内容:
- 您希望以哪种格式接收消息,例如JSON、Avro或Kafka Connect SourceRecord(见链接)
- 配置属性(可能从属性文件中加载),用于定义引擎和连接器的环境
- 一个方法,该方法将被调用以处理连接器产生的每个数据变更事件
以下是一个配置和运行嵌入式引擎的示例代码:
// Define the configuration for the Debezium Engine with MySQL connector...
final Properties props = new Properties();
props.setProperty("name", "engine");
props.setProperty("connector.class", "io.debezium.connector.mysql.MySqlConnector");
props.setProperty("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore");
props.setProperty("offset.storage.file.filename", "/path/to/storage/offsets.dat");
props.setProperty("offset.flush.interval.ms", "60000");
/* begin connector properties */
props.setProperty("database.hostname", "localhost");
props.setProperty("database.port", "3306");
props.setProperty("database.user", "mysqluser");
props.setProperty("database.password", "mysqlpw");
props.setProperty("database.server.id", "85744");
props.setProperty("topic.prefix", "my-app-connector");
props.setProperty("schema.history.internal", "io.debezium.storage.file.history.FileSchemaHistory");
props.setProperty("schema.history.internal.file.filename", "/path/to/storage/schemahistory.dat");// Create the engine with this configuration ...
try (DebeziumEngine<ChangeEvent<String, String>> engine = DebeziumEngine.create(Json.class).using(props).notifying(record -> {System.out.println(record);}).build()) {// Run the engine asynchronously ...ExecutorService executor = Executors.newSingleThreadExecutor();executor.execute(engine);// Do something else or wait for a signal or an event
}
// Engine is stopped when the main code is finished
让我们更详细地研究这段代码,从我们在这里重复的前几行开始:
// Define the configuration for the Debezium Engine with MySQL connector...
final Properties props = new Properties();
props.setProperty("name", "engine");
props.setProperty("connector.class", "io.debezium.connector.mysql.MySqlConnector");
props.setProperty("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore");
props.setProperty("offset.storage.file.filename", "/path/to/storage/offsets.dat");
props.setProperty("offset.flush.interval.ms", "60000");
这将创建一个新的标准Properties对象,用于设置引擎所需的几个字段,无论使用哪个连接器。第一个字段是引擎的名称,它将在连接器产生的源记录和内部状态中使用,因此在应用程序中使用一些有意义的名称。
connector.class字段定义了扩展Kafka Connect org.apache.kafka.connect.source.SourceConnector抽象类的类名;在此示例中,我们指定了Debezium的MySqlConnector类。
当Kafka Connect连接器运行时,它会从源中读取信息,并定期记录定义了它已经处理了多少信息的"偏移量"。如果连接器重新启动,它将使用最后记录的偏移量来确定在源信息中应该从哪里恢复读取。由于连接器不知道也不关心偏移量的存储方式,因此引擎需要提供一种存储和恢复这些偏移量的方式。我们的配置的下几个字段指定了我们的引擎应该使用FileOffsetBackingStore类将偏移量存储在本地文件系统上的/path/to/storage/offset.dat文件中(文件可以任意命名和存储在任何位置)。此外,尽管连接器在生成每个源记录时记录偏移量,但引擎会定期将偏移量刷新到后备存储(在我们的示例中,每分钟刷新一次)。这些字段可以根据您的应用程序需要进行调整。
接下来的几行定义了特定于连接器的字段(在每个连接器文档中有记录),在我们的示例中是MySqlConnector连接器:
/* begin connector properties */props.setProperty("database.hostname", "localhost")props.setProperty("database.port", "3306")props.setProperty("database.user", "mysqluser")props.setProperty("database.password", "mysqlpw")props.setProperty("database.server.id", "85744")props.setProperty("topic.prefix", "my-app-connector")props.setProperty("schema.history.internal", "io.debezium.storage.file.history.FileSchemaHistory")props.setProperty("schema.history.internal.file.filename", "/path/to/storage/schemahistory.dat")
在这里,我们设置了MySQL数据库服务器运行的主机机器的名称和端口号,并定义了将用于连接到MySQL数据库的用户名和密码。请注意,对于MySQL,用户名和密码应对应于已被授予以下MySQL权限的MySQL数据库用户:
- SELECT
- RELOAD
- SHOW DATABASES
- REPLICATION SLAVE
- REPLICATION CLIENT
在读取数据库的一致快照时,需要前三个权限。最后两个权限允许数据库读取通常用于MySQL复制的服务器的binlog。
该配置还包括一个用于MySQL的数值标识符。由于MySQL的binlog是MySQL复制机制的一部分,因此为了读取binlog,MySqlConnector实例必须加入MySQL服务器组,这意味着该服务器ID必须是1到232-1之间的任意整数。在我们的代码中,我们将其设置为一个相当大但有些随机的值,仅供我们的应用程序使用。
该配置还指定了MySQL服务器的逻辑名称。连接器将此逻辑名称包含在其生成的每个源记录的主题字段中,使您的应用程序能够区分这些记录的来源。我们的示例使用了一个名为"products"的服务器名称,这可能是因为数据库包含产品信息。当然,您可以为您的应用程序命名任何有意义的名称。
当MySqlConnector类运行时,它会读取MySQL服务器的binlog,其中包括对由服务器托管的数据库所做的所有数据更改和模式更改。由于所有数据更改都是基于拥有表格的模式结构化的,因此连接器需要跟踪所有模式更改,以便可以正确解码更改事件。连接器记录模式信息,以便如果连接器重新启动并恢复从最后记录的偏移量读取,它知道该偏移量时数据库模式的确切外观。连接器如何记录数据库模式历史记录在我们的配置的最后两个字段中定义,即我们的连接器应该使用FileSchemaHistory类将数据库模式历史更改存储在本地文件系统上的/path/to/storage/schemahistory.dat文件中(同样,此文件可以任意命名和存储在任何位置)。
最后,使用build()方法构建不可变配置。(顺便说一下,我们可以使用Configuration.read(…)方法之一从属性文件中读取配置,而不是通过编程方式构建它。)
现在我们有了一个配置,我们可以创建引擎。以下是相关的代码行:
// Create the engine with this configuration ...
try (DebeziumEngine<ChangeEvent<String, String>> engine = DebeziumEngine.create(Json.class).using(props).notifying(record -> {System.out.println(record);}).build()) {
}
所有的更改事件都将传递给给定的处理方法,该方法必须与java.util.function.Consumer<R>函数接口的签名匹配,其中<R>必须与调用create()时指定的格式类型匹配。请注意,您的应用程序的处理函数不应抛出任何异常;如果抛出异常,引擎将记录方法抛出的任何异常,并继续处理下一个源记录,但您的应用程序将没有机会处理导致异常的特定源记录,这意味着您的应用程序可能与数据库不一致。
此时,我们有一个已配置并准备运行的DebeziumEngine对象,但它什么也不做。DebeziumEngine设计为由Executor或ExecutorService异步执行:
// Run the engine asynchronously ...
ExecutorService executor = Executors.newSingleThreadExecutor();
executor.execute(engine);// Do something else or wait for a signal or an event
您的应用程序可以通过调用其 close() 方法来安全、优雅地停止引擎:
// At some later time ...
engine.close();
或者,由于引擎支持Closeable接口,当离开try块时,它将被自动调用。
引擎的连接器将停止从源系统读取信息,将所有剩余的更改事件转发给处理函数,并将最新的偏移量刷新到偏移量存储中。只有在所有这些操作完成后,引擎的run()方法才会返回。如果您的应用程序需要在退出之前等待引擎完全停止,您可以使用ExecutorService的shutdown和awaitTermination方法来实现:
try {executor.shutdown();while (!executor.awaitTermination(5, TimeUnit.SECONDS)) {logger.info("Waiting another 5 seconds for the embedded engine to shut down");}
}
catch ( InterruptedException e ) {Thread.currentThread().interrupt();
}
或者,您可以在创建DebeziumEngine时注册CompletionCallback作为回调函数,以便在引擎终止时得到通知。
请记住,当JVM关闭时,它只会等待非守护线程。因此,当您在守护线程上运行引擎时,如果您的应用程序退出,请确保等待引擎进程完成。
为了确保优雅和完全的关闭,并确保每个源记录仅发送一次到应用程序,您的应用程序应始终正确停止引擎。例如,不要依赖于关闭ExecutorService,因为这会中断运行的线程。虽然当线程被中断时,DebeziumEngine确实会终止,但引擎可能无法完全终止,并且当您的应用程序重新启动时,它可能会看到在关闭之前处理的一些相同的源记录。
正如前面提到的,DebeziumEngine接口有两个实现。这两个实现使用相同的API,前面的代码示例对两个版本都有效。唯一的例外是创建DebeziumEngine实例。正如在介绍中提到的,默认情况下使用EmbeddedEngine实现。因此,DebeziumEngine.create(Json.class)方法在内部使用EmbeddedEngine实例。
如果您想使用新的AsyncEmbeddedEngine实例,可以使用以下方法:DebeziumEngine#create(KeyValueHeaderChangeEventFormat<K, V, H> format, String builderFactory)
例如,要创建一个使用AsyncEmbeddedEngine并以JSON作为其键、值和标头格式的嵌入式引擎,您可以使用以下代码:
try (DebeziumEngine<ChangeEvent<String, String>> engine = DebeziumEngine.create(KeyValueHeaderChangeEventFormat.of(Json.class, Json.class, Json.class),"io.debezium.embedded.async.ConvertingAsyncEngineBuilderFactory").using(props).notifying(record -> {System.out.println(record);}).build()) {// Also run the engine asynchronously ...ExecutorService executor = Executors.newSingleThreadExecutor();executor.execute(engine);// Do something else or wait for a signal or an event
}
输出消息格式
DebeziumEngine#create()可以接受多个不同的参数,这些参数会影响消息被消费者接收的格式。允许的值为:
- Connect.class - 输出值是包装Kafka Connect的SourceRecord的变更事件
- Json.class - 输出值是键和值对,编码为JSON字符串
- JsonByteArray.class - 输出值是键和值对,格式化为JSON并编码为UTF-8字节数组
- Avro.class - 输出值是以Avro序列化记录编码的键和值对
- CloudEvents.class - 输出值是编码为 消息的键和值对
在调用DebeziumEngine#create()时也可以指定标头格式。允许的值为:
- Json.class - 标头值被编码为JSON字符串
- JsonByteArray.class - 标头值被格式化为JSON并编码为UTF-8字节数组
在内部,引擎将数据转换委托给Kafka Connect或Apicurio转换器实现,使用最适合执行转换的算法。可以使用引擎属性对转换器进行参数化以修改其行为。JSON输出格式的示例:
final Properties props = new Properties();
...
props.setProperty("converter.schemas.enable", "false"); // don't include schema in message
...
final DebeziumEngine<ChangeEvent<String, String>> engine = DebeziumEngine.create(Json.class).using(props).notifying((records, committer) -> {for (ChangeEvent<String, String> r : records) {System.out.println("Key = '" + r.key() + "' value = '" + r.value() + "'");committer.markProcessed(r);}
...
其中 ChangeEvent 数据类型是键/值对。
消息转换
在将消息传递给处理程序之前,可以通过Kafka Connect的简单消息转换(SMT)管道运行它们。每个SMT可以将消息保持不变、修改消息或过滤消息。使用属性transforms配置链。属性包含要应用的转换的逗号分隔的逻辑名称列表。然后,属性transforms.<logical_name>.type为每个转换定义了实现类的名称,transforms.<logical_name>.*配置选项将传递给转换。
配置示例:
final Properties props = new Properties();
...
props.setProperty("transforms", "filter, router"); // (1)
props.setProperty("transforms.router.type", "org.apache.kafka.connect.transforms.RegexRouter"); // (2)
props.setProperty("transforms.router.regex", "(.*)"); // (3)
props.setProperty("transforms.router.replacement", "trf$1"); // (3)
props.setProperty("transforms.filter.type", "io.debezium.embedded.ExampleFilterTransform"); // (4)
定义了两个转换 - 过滤器和路由器
路由器转换的实现是 org.apache.kafka.connect.transforms.RegexRouter
路由器转换有两个配置选项 - 正则表达式和替换
过滤器转换的实现是 io.debezium.embedded.ExampleFilterTransform
消息转换谓词
谓词可以应用于转换,以使转换成为可选的。
配置示例如下
final Properties props = new Properties();
...
props.setProperty("transforms", "filter"); // (1)
props.setProperty("predicates", "headerExists"); // (2)
props.setProperty("predicates.headerExists.type", "org.apache.kafka.connect.transforms.predicates.HasHeaderKey"); //(3)
props.setProperty("predicates.headerExists.name", "header.name"); // (4)
props.setProperty("transforms.filter.type", "io.debezium.embedded.ExampleFilterTransform");// (5)
props.setProperty("transforms.filter.predicate", "headerExists"); // (6)
props.setProperty("transforms.filter.negate", "true");
定义了一个转换 - 过滤器
定义了一个谓词 - headerExists
headerExists 谓词的实现是 org.apache.kafka.connect.transforms.predicates.HasHeaderKey
headerExists 谓词有一个配置选项 - name
过滤器转换的实现是 io.debezium.embedded.ExampleFilterTransform
过滤器转换需要谓词 headerExists
过滤器转换期望谓词的值被否定,从而使谓词确定标头是否不存在
高级记录使用
对于某些用例,例如尝试批量写入记录或针对异步 API 时,上面描述的功能接口可能具有挑战性。在这些情况下,使用 io.debezium.engine.DebeziumEngine.ChangeConsumer. 接口可能会更容易。
此接口具有单个函数,其签名如下:
/*** Handles a batch of records, calling the {@link RecordCommitter#markProcessed(Object)}* for each record and {@link RecordCommitter#markBatchFinished()} when this batch is finished.* @param records the records to be processed* @param committer the committer that indicates to the system that we are finished*/void handleBatch(List<R> records, RecordCommitter<R> committer) throws InterruptedException;
如Javadoc中所提到的,RecordCommitter对象将在每个记录和每个批次完成时被调用。RecordCommitter接口是线程安全的,这允许对记录进行灵活的处理。
您可以选择重写已处理的记录的偏移量。这可以通过首先调用RecordCommitter#buildOffsets()构建一个新的Offsets对象,使用Offsets#set(String key, Object value)更新偏移量,然后使用更新后的Offsets调用RecordCommitter#markProcessed(SourceRecord record, Offsets sourceOffsets)来完成。
要使用ChangeConsumer API,您必须将接口的实现传递给通知API,如下所示:
class MyChangeConsumer implements DebeziumEngine.ChangeConsumer<RecordChangeEvent<SourceRecord>> {public void handleBatch(List<RecordChangeEvent<SourceRecord>> records, RecordCommitter<RecordChangeEvent<SourceRecord>> committer) throws InterruptedException {...}
}
// Create the engine with this configuration ...
DebeziumEngine<RecordChangeEvent<SourceRecord>> engine = DebeziumEngine.create(ChangeEventFormat.of(Connect.class)).using(props).notifying(new MyChangeConsumer()).build();
如果使用 JSON 格式(等效格式也适用于其他格式),则代码将如下所示:
class JsonChangeConsumer implements DebeziumEngine.ChangeConsumer<ChangeEvent<String, String>> {public void handleBatch(List<ChangeEvent<String, String>> records,RecordCommitter<ChangeEvent<String, String>> committer) throws InterruptedException {...}
}
// Create the engine with this configuration ...
DebeziumEngine<ChangeEvent<String, String>> engine = DebeziumEngine.create(Json.class).using(props).notifying(new JsonChangeConsumer()).build();
引擎属性
除非有默认值,否则以下配置属性是必需的(为了文本格式化,Java 类的包名称被替换为 <…>)。
属性 | 默认值 | 描述 |
---|---|---|
name | 连接器实例的唯一名称。 | |
connector.class | 连接器的 Java 类的名称 | |
offset.storage | 负责连接器偏移持久性的 Java 类的名称。 | |
offset.storage.file.filename | 存储偏移量的文件的路径。 | |
offset.storage.topic | 要存储偏移量的 Kafka 主题的名称。 | |
offset.storage.partitions | 创建偏移量存储主题时使用的分区数。 | |
offset.storage.replication.factor | 创建偏移存储主题时使用的复制因子。 | |
offset.commit.policy | 提交策略的 Java 类的名称。它根据处理的事件数和自上次提交以来经过的时间定义何时触发偏移提交。默认是基于时间间隔的定期提交策略。 | |
offset.flush.interval.ms | 60000 | 尝试提交偏移的间隔。默认值为 1 分钟。 |
offset.flush.timeout.ms | 5000 | 在取消该过程并恢复将来尝试提交的偏移数据之前,等待记录刷新和分区要提交到偏移存储的最大毫秒数。默认值为 5 秒。 |
errors.max.retries | -1 | 失败前连接错误的最大重试次数(-1 = 无限制,0 = 禁用,> 0 = 重试次数)。 |
errors.retry.delay.initial.ms | 300 | 遇到连接错误时重试的初始延迟(以毫秒为单位)。每次重试时此值将加倍,但不会超过 errors.retry.delay.max.ms。 |
errors.retry.delay.max.ms | 10000 | 遇到连接错误时重试之间的最大延迟(以毫秒为单位)。 |
异步引擎属性
属性 | 默认值 | 描述 |
---|---|---|
record.processing.threads | 根据工作负载和可用 CPU 核心数按需分配线程。 | 可用于处理更改事件记录的线程数。如果未指定任何值(默认值),则引擎将使用 Java ThreadPoolExecutor 根据当前工作负载动态调整线程数。最大线程数是给定计算机上的 CPU 核心数。如果指定了值,则引擎将使用 Java 固定线程池方法创建具有指定线程数的线程池。要使用给定计算机上的所有可用核心,请设置占位符值 AVAILABLE_CORES。 |
record.processing.shutdown.timeout.ms | 1000 | 调用任务关闭后等待处理已提交记录的最长时间(以毫秒为单位)。 |
record.processing.order | ORDERED | 确定应如何生成记录。ORDERED记录按顺序处理;也就是说,它们按从数据库获取的顺序生成。UNORDERED记录按非顺序处理;也就是说,它们可以按与源数据库不同的顺序生成。UNORDERED 选项的非顺序处理可实现更好的吞吐量,因为记录在任何 SMT 处理和消息序列化完成后立即生成,而无需等待其他记录。当向引擎提供 ChangeConsumer 方法时,此选项不起作用。 |
record.processing.with.serial.consumer | false | 指定是否应从提供的 Consumer 创建默认的 ChangeConsumer,从而导致串行 Consumer 处理。如果您在使用 API 创建引擎时指定了 ChangeConsumer 接口,则此选项无效。 |
task.management.timeout.ms | 180,000 (3 min) | 引擎等待任务生命周期管理操作(启动和停止)完成的时间(以毫秒为单位)。 |
数据库模式历史属性
一些连接器还需要一组额外的属性来配置数据库模式历史记录:
- MySQL
- SQL Server
- Oracle
- Db2
如果没有正确配置数据库模式历史记录,则连接器将拒绝启动。默认配置需要可用的Kafka集群。对于其他部署,可使用基于文件的数据库模式历史记录存储实现。
属性 | 默认值 | 描述 |
---|---|---|
schema.history.internal | 负责持久保存数据库模式历史的 Java 类的名称。 | |
schema.history.internal.file.filename | 存储数据库架构历史记录的文件的路径。 | |
schema.history.internal.kafka.topic | 存储数据库架构历史记录的 Kafka 主题。 | |
schema.history.internal.kafka.bootstrap.servers | 要连接的 Kafka 集群服务器的初始列表。集群提供用于存储数据库架构历史记录的主题。 |
处理故障
当引擎执行时,其连接器会主动记录每个源记录中的源偏移,并且引擎会定期将这些偏移刷新到持久存储中。当应用程序和引擎正常关闭或崩溃时,重新启动后,引擎及其连接器将从最后记录的偏移处恢复读取源信息。
那么,当嵌入式引擎正在运行时应用程序发生故障会发生什么?结果是,在重新启动后,应用程序很可能会收到一些之前在崩溃之前已经处理过的源记录。这取决于引擎多久将偏移刷新到其存储中(通过offset.flush.interval.ms属性)以及特定连接器在一个批次中返回多少个源记录。最理想的情况是每次都刷新偏移量(例如,将offset.flush.interval.ms设置为0),但即使这样,嵌入式引擎仍然只会在从连接器接收到每个源记录批次后刷新偏移量。
例如,MySQL连接器使用max.batch.size来指定批次中可能出现的源记录的最大数量。即使将offset.flush.interval.ms设置为0,当应用程序在崩溃后重新启动时,可能会看到最多n个重复记录,其中n是批次的大小。如果将offset.flush.interval.ms属性设置得更高,则应用程序可能会看到最多n * m个重复记录,其中n是批次的最大大小,m是在单个偏移刷新间隔期间可能累积的批次数。(显然,可以将嵌入式连接器配置为不进行批处理并始终刷新偏移量,从而使应用程序永远不会接收到任何重复的源记录。但是,这会大大增加开销并降低连接器的吞吐量。)
总的来说,当使用嵌入式连接器时,应用程序在正常操作期间(包括在正常关闭后重新启动)将仅接收到每个源记录一次,但在崩溃或不正确关闭后重新启动后,需要容忍接收到重复事件。如果应用程序需要更严格的确切一次性行为,那么应该使用完整的Debezium平台,该平台可以提供确切一次性保证(即使在崩溃和重新启动后)。
相关文章:
Debezium日常分享系列之:Debezium Engine
Debezium日常分享系列之:Debezium Engine 依赖打包项目在代码中输出消息格式消息转换消息转换谓词高级记录使用引擎属性异步引擎属性数据库模式历史属性处理故障 Debezium连接器通常通过部署到Kafka Connect服务来运行,并配置一个或多个连接器来监视上游…...
I.MX6U 裸机开发20. DDR3 内存知识
I.MX6U 裸机开发20. DDR3 内存知识 一、DDR3内存简介1. DDR发展历程SRAMSDRAMDDR1DDR2DDR3DDR4DDR5 2. 开发板资源3. DDR3的时间参数1. 传输速率2. tRCD3. CL 参数作用取值范围工作原理4. tRC参数原理单位与取值5. tRAS重要性及作用 二、I.MX6U MMDC 控制器1. MMDC简介…...
【R安装】VSCODE安装及R语言环境配置
目录 VSCODE下载及安装VSCODE上配置R语言环境参考 Visual Studio Code(简称“VSCode” )是Microsoft在2015年4月30日Build开发者大会上正式宣布一个运行于 Mac OS X、Windows和 Linux 之上的,针对于编写现代Web和云应用的跨平台源代码编辑器&…...
ES更新问题 Failed to close the XContentBuilder异常
问题描述 使用RestHighLevelClient对文档进行局部更新的时候报错如下: Suppressed: java.lang.IllegalStateException: Failed to close the XContentBuilderat org.elasticsearch.common.xcontent.XContentBuilder.close(XContentBuilder.java:1011)at org.elast…...
svn-git下载
windows: svn 客户端:-------------- TortoiseSVN 安装 下载地址:https://tortoisesvn.net/downloads.html, 页面里有语言包补丁的下载链接。 目前最新版为 1.11.0 下载地址: https://osdn.net/projects/tortoisesvn/storage/1.…...
10个Word自动化办公脚本
在日常工作和学习中,我们常常需要处理Word文档(.docx)。 Python提供了强大的库,如python-docx,使我们能够轻松地进行文档创建、编辑和格式化等操作。本文将分享10个使用Python编写的Word自动化脚本,帮助新…...
Paddle Inference部署推理(十八)
十八:Paddle Inference推理 (C)API详解 3. 使用 CPU 进行预测 注意: 在 CPU 型号允许的情况下,进行预测库下载或编译试尽量使用带 AVX 和 MKL 的版本 可以尝试使用 Intel 的 MKLDNN 进行 CPU 预测加速,默…...
Redis开发02:redis.windows-service.conf 默认配置文件解析与注解
文件位置:redis安装目录下的 redis.windows-service.conf ,存放了redis服务的相关配置,下面列举出默认配置的含义: 配置项含义bind 127.0.0.1限制 Redis 只监听本地回环地址,意味着只能从本地连接 Redis。protected-m…...
redis大key和热key
redis中大key、热key 什么是大key大key可能产生的原因大key可能会造成什么影响如何检测大key如何优化删除大key时可能的问题删除大key的策略 热key热key可能导致的问题解决热key的方法 什么是大key 大key通常是指占用内存空间过大或包含大量元素的键值对。 数据量大ÿ…...
Dubbo 最基础的 RPC 应用(使用 ZooKeeper)
看国内的一些项目时 Dubbo 这个词经常闪现,一直也不以为然,未作搜索,当然也不知道它是做什么用的。直到最近阅读关于大型网站架构相关的书中反复提到 Dubbo 后,觉得不能再对它视而不见。Google 了一下,它是在阿里巴巴创…...
科技赋能:企业如何通过新技术提升竞争力的策略与实践
引言 在当今瞬息万变的商业环境中,科技的迅猛发展正在重新定义行业的游戏规则。无论是小型企业还是跨国巨头,都感受到数字化转型的迫切需求。过去,企业竞争力更多依赖于成本控制、资源调配或市场覆盖,而如今,新技术的引…...
从0开始深度学习(33)——循环神经网络的简洁实现
本章使用Pytorch的API实现RNN上的语言模型训练 0 导入库 import torch import torch.nn as nn import torch.nn.functional as F from torch.utils.data import Dataset, DataLoader from collections import Counter import re import math from tqdm import tqdm1 准备数据 …...
【FAQ】HarmonyOS SDK 闭源开放能力 — 公共模块
1.问题描述: 文档哪里能找到所有的权限查看该权限是用户级的还是系统级的。 解决方案: 您好,可以看一下下方链接是否可以解决问题: https://developer.huawei.com/consumer/cn/doc/harmonyos-guides-V5/permissions-for-all-V…...
百度 文心一言 vs 阿里 通义千问 哪个好?
背景介绍: 在当前的人工智能领域,随着大模型技术的快速发展,市场上涌现出了众多的大规模语言模型。然而,由于缺乏统一且权威的评估标准,很多关于这些模型能力的文章往往基于主观测试或自行设定的排行榜来评价模型性能…...
内网不出网上线cs
一:本地正向代理目标 如下,本地(10.211.55.2)挂好了基于 reGeorg 的 http 正向代理。代理为: Socks5 10.211.55.2 1080python2 reGeorgSocksProxy.py -l 0.0.0.0 -p 1080 -u http://10.211.55.3:8080/shiro/tunnel.jsp 二:虚拟机配置proxifer 我们是…...
ubuntu22开机自动登陆和开机自动运行google浏览器自动打开网页
一、开机自动登陆 1、打开settings->点击Users 重启系统即可自动登陆桌面 二、开机自动运行google浏览器自动打开网页 1、安装google浏览器 sudo wget https://dl.google.com/linux/direct/google-chrome-stable_current_amd64.deb sudo dpkg -i ./google-chrome-stable…...
企业建站高性能的内容管理系统
AnQiCMS 是一款高性能的内容管理系统,基于Go语言开发。它支持多站点、多语言管理,提供灵活的内容发布和模板管理功能,同时,系统内置丰富的利于SEO操作的功能,支持包括自定义字段、文档分类、批量导入导出等功能 AnQiC…...
【爬虫框架:feapder,管理系统 feaplat】
github:https://github.com/Boris-code/feapder 爬虫管理系统 feaplat:http://feapder.com/#/feapder_platform/feaplat 爬虫在线工具库 :http://www.spidertools.cn :https://www.kgtools.cn/1、feapder 简介 对于学习 Python…...
faiss库中ivf-sq(ScalarQuantizer,标量量化)代码解读-5
训练过程 通过gdb调试得到这个ivfsq的训练过程,我尝试对这个内容具体训练过程进行解析,对每个调用栈里面的逻辑和代码进行解读。 步骤函数名称调用位置说明1faiss::IndexIVF::train/faiss/IndexIVF.cpp:1143开始训练,判断是否需要训练第一级…...
代码随想录算法训练营第六十天|Day60 图论
Bellman_ford 队列优化算法(又名SPFA) https://www.programmercarl.com/kamacoder/0094.%E5%9F%8E%E5%B8%82%E9%97%B4%E8%B4%A7%E7%89%A9%E8%BF%90%E8%BE%93I-SPFA.html 本题我们来系统讲解 Bellman_ford 队列优化算法 ,也叫SPFA算法…...
在嵌入式Linux下如何用QT开发UI
在嵌入式 Linux 环境下使用 Qt 开发用户界面 (UI) 是一个常见的选择。Qt 提供了丰富的功能、跨平台支持以及优秀的图形界面开发能力,非常适合用于嵌入式系统。以下是开发流程的详细步骤: 1. 准备开发环境 硬件环境 一块运行嵌入式 Linux 的开发板&…...
【JavaScript】Promise详解
Promise 是 JavaScript 中处理异步操作的一种强大机制。它提供了一种更清晰、更可控的方式来处理异步代码,避免了回调地狱(callback hell)和复杂的错误处理。 基本概念 状态: Pending:初始状态,既不是成功…...
1062 Talent and Virtue
About 900 years ago, a Chinese philosopher Sima Guang wrote a history book in which he talked about peoples talent and virtue. According to his theory, a man being outstanding in both talent and virtue must be a "sage(圣人)"…...
C++《二叉搜索树》
在初阶数据结构中我学习了树基础的概念以及了解了顺序结构的二叉树——堆和链式结构二叉树该如何实现,那么接下来我们将进一步的学习二叉树,在此会先后学习到二叉搜索树、AVL树、红黑树;通过这些的学习将让我们更易于理解后面set、map、哈希等…...
机器学习-神经网络(BP神经网络前向和反向传播推导)
1.1 神经元模型 神经网络(neural networks)方面的研究很早就已出现,今天“神经网络”已是一个相当大的、多学科交叉的学科领域.各相关学科对神经网络的定义多种多样,本书采用目前使用得最广泛的一种,即“神经网络是由具有适应性的简单单元组成的广泛并行互连的网络,它的组织能够…...
基于智能物联网关的车辆超重AI检测应用
超重超载是严重的交通违法行为,超重超载车辆的交通安全风险极高,像是一颗行走的“不定时炸弹”,威胁着社会公众的安全。但总有一些人受到利益驱使,使超重超载的违法违规行为时有发生。 随着物联网和AI技术的发展,针对预…...
记录pbootcms提示:登录失败:表单提交校验失败,请刷新后重试的解决办法
问题描述 pbootcms后台登录的时候提示“登录失败:表单提交校验失败,请刷新后重试!” 解决办法 删除runtime目录,或尝试切换PHP版本,选择7.3或5.6一般就能解决了。...
【JavaScript】同步异步详解
同步和异步是编程中处理任务执行顺序的两种不同方式。理解这两种概念对于编写高效和响应式的应用程序至关重要。 同步(Synchronous) 定义:同步操作是指一个任务必须在下一个任务开始之前完成。换句话说,代码按顺序执行ÿ…...
vue 使用el-button 如何实现多个button 单选
在 Vue 中,如果你想要实现多个 el-button 按钮的 单选(即只能选择一个按钮),可以通过绑定 v-model 或使用事件来处理按钮的选中状态。 下面是两种实现方式,分别使用 v-model 和事件监听来实现单选按钮效果:…...
HarmonyOS-初级(二)
文章目录 应用程序框架UIAbilityArkUI框架 🏡作者主页:点击! 🤖HarmonyOS专栏:点击! ⏰️创作时间:2024年11月28日13点10分 应用程序框架 应用程序框架可以被看做是应用模型的一种实现方式。 …...
做视频资源网站有哪些/百度卖货平台
转自http://tshfang.nipei.com/ " 如何遍历所有记录,不用每次输入特定的值去查询。那么我们使用Oracle游标游标分为:静态游标和引用游标(动态游标)静态游标:由用户定义(隐式游标、显示游标)…...
陕西网站建设咨询/网站上做推广
WPF loading遮罩层 LoadingMask 原文:WPF loading遮罩层 LoadingMask大家可能很纠结在异步query数据的时候想在wpf程序中显示一个loading的遮罩吧 今天就为大家介绍下遮罩的制作 源码下载 点击此处 先上张效果图看看 如果不如您的法眼 可以移步了 或者有更好的效果 可以留言给我…...
ui设计网站模板/seo策略是什么意思
这里写自定义目录标题欢迎使用Markdown编辑器新的改变功能快捷键合理的创建标题,有助于目录的生成如何改变文本的样式插入链接与图片如何插入一段漂亮的代码片生成一个适合你的列表创建一个表格设定内容居中、居左、居右SmartyPants创建一个自定义列表如何创建一个注…...
总部基地网站建设/关键词排名优化网站
当我们需要在C#中实现视频播放器的时候,可以使用如下几种方法: 一、使用MediaPlayer ActiveX控件 在C#中支持视屏播放器最简单的方式就是插入MediaPlayer控件了,在WPF中还内置了对MediaPlayer的封装MediaElement,可以通过它直接打…...
无锡免费网站制作/石家庄新闻网
数据库(Database)是按照数据结构来组织、存储和管理数据的仓库,数据管理不再仅仅是存储和管理数据,而转变成用户所需要的各种数据管理的方式。在信息化社会,充分有效地管理和利用各类信息资源,是进行科学研究和决策管理的前提条件…...
网站做seo屏蔽搜索/宁波免费建站seo排名
CacheItemPriority 枚举 备注 当承载 ASP.NET 应用程序的 Web 服务器缺少内存时,Cache 将有选择地清除项来释放系统内存。当向缓存添加项时,可以为其分配与缓存中存储的其他项相比较的相对优先级。在服务器处理大量请求时,分配了较高优先级值…...