当前位置: 首页 > news >正文

深入解析 Flink CDC 增量快照读取机制

一、Flink-CDC 1.x 痛点

Flink CDC 1.x 使用 Debezium 引擎集成来实现数据采集,支持全量加增量模式,确保数据的一致性。然而,这种集成存在一些痛点需要注意:

  1. 一致性通过加锁保证:在保证数据一致性时,Debezium 需要对读取的库或表加锁。全局锁可能导致数据库出现挂起情况,而表级锁会影响表的写操作。

  2. 只支持单并发读取:Flink CDC 1.x版本只支持单并发读取,对于大表读取非常耗时。如果需要读取的数据量较大,可能会导致性能瓶颈。

  3. 全量读取阶段不支持 checkpoint:CDC 的initial模式下读取分为两个阶段,全量和增量。然而,在全量读取阶段,不支持 checkpoint 的功能。如果出现故障,必须重新进行全量读取操作。

1.1、全局锁

在 Flink CDC 1.x 中,全量读取时的锁机制流程如下:

  1. 开始全量读取:当 Flink CDC 启动全量读取任务时,它会与 MySQL 数据库建立连接,并开始读取源表的数据。

  2. 获取读取的锁:为了保证数据的一致性,Flink CDC 在全量读取过程中需要获取读取的锁。在默认情况下,Flink CDC 使用全局锁(Global Lock)来确保数据的一致性。

  3. 全局锁的获取:Flink CDC 通过向 MySQL 数据库发送命令来获取全局锁。全局锁将阻塞其他对源表进行写操作的事务,确保在全量读取期间不会有数据的变更。

  4. 全量读取数据:一旦获得全局锁,Flink CDC 开始进行全量读取。它会扫描源表的所有数据,并将其传输到目标系统(如 Doris)进行加载和处理。

  5. 释放全局锁:当全量读取完成后,Flink CDC 会释放全局锁,允许其他事务对源表进行写操作。

全局锁的获取可能会导致一些潜在的问题:

  1. 长时间锁定:全局锁通常需要在全量读取过程中长时间持有,这可能会对其他业务操作产生影响。如果全量读取任务的持续时间较长,其他事务可能需要等待较长时间才能执行读写操作。
  2. 性能影响:获取全局锁可能导致性能下降。当全局锁被获取时,其他事务需要等待锁的释放,这可能导致并发性下降,特别是在高负载的情况下。长时间的等待可能会导致数据库挂起(hang),影响整体系统的吞吐量和响应时间。

1.2、表级锁

在 Flink CDC 1.x 中,全量读取表时的表锁机制流程如下:

  1. 开始全量读取:当 Flink CDC 启动全量读取任务时,它会与 MySQL 数据库建立连接,并准备开始读取源表的数据。

  2. 获取表级锁:为了确保数据的一致性,在全量读取期间需要获取源表的表级锁。表级锁将阻塞其他事务对源表进行写操作,以保证读取过程中数据不会发生变化。

  3. 发起锁请求:Flink CDC 向 MySQL 数据库发送请求,尝试获取源表的表级锁。这个请求将被发送到 MySQL 的锁管理器。

  4. 等待锁释放:如果源表的表级锁已经被其他事务占用,Flink CDC 将等待锁释放的信号。在等待期间,Flink CDC 将一直保持连接并监测锁的状态。

  5. 获取锁成功:一旦源表的表级锁被成功获取,Flink CDC 可以开始进行全量数据的读取操作。它会扫描源表的所有数据,并将其传输到目标系统进行加载和处理。

  6. 释放表级锁:当全量读取完成后,Flink CDC 会释放源表的表级锁,允许其他事务对源表进行写操作。

表级锁的获取和释放可能会带来一些潜在的问题:

  1. 数据一致性问题:表级锁在全量读取期间会锁定整张表,以保证数据的一致性。然而,在某些情况下,如果全量读取过程中出现了长时间的阻塞或异常情况,可能会导致数据一致性问题。
  2. 长时间锁定:表级锁通常需要在读取过程中长时间持有,特别是在全量读取时。这可能会对其他事务产生长时间的阻塞,影响系统的响应性能。

二、Flink-CDC 2.x 新特性

Flink 2.x不仅引入了增量快照读取机制,还带来了一些其他功能的改进。以下是对Flink 2.x的主要功能的介绍:

  1. 增量快照读取:Flink 2.x引入了增量快照读取机制,这是一种全新的数据读取方式。该机制支持并发读取和以chunk为粒度进行checkpoint。在增量快照读取过程中,Flink首先根据表的主键将其划分为多个块(chunk),然后将这些块分配给多个读取器并行读取数据。这一机制极大地提高了数据读取的效率。
  2. 精确一次性处理:Flink 2.x引入了Exactly-Once语义,确保数据处理结果的精确一次性。MySQL CDC 连接器是Flink的Source连接器,可以利用Flink的checkpoint机制来确保精确一次性处理。
  3. 动态加表:Flink 2.x支持动态加表,通过使用savepoint来复用之前作业的状态,解决了动态加表的问题。
  4. 无主键表的处理:Flink 2.x对无主键表的读取和处理进行了优化。在无主键表中,Flink可以通过一些额外的字段来识别数据记录的唯一性,从而实现准确的数据读取和处理。

本文主要介绍了Flink 2.x引入的重要特性之一:增量快照读取机制。该机制带来了并发读取、chunk粒度的checkpoint等优势,提升了数据读取的效率。

三、增量快照读取机制

3.1、功能

增量快照读取基本功能:

  1. 并发读取:在增量快照读取期间,源(Source)可以支持并发读取。这意味着多个读取器可以同时读取数据,从而提高读取的速度和效率。
  2. Chunk级别的checkpoint:增量快照读取期间,源可以进行chunk级别的checkpoint。这意味着在读取过程中,可以对数据进行更细粒度的检查点,提高故障恢复的准确性和效率。
  3. 全量增量无锁读取算法:相比于旧的快照机制,全量快照读取不需要源具有数据库锁权限。这降低了对数据库的依赖和权限要求,简化了配置和部署的过程。

3.2、并发读取

增量快照读取的并行读取功能利用了Flink的Source并行度来控制源的并行度。你可以通过设置作业的并行度(parallelism.default)来实现。

在SQL CLI中,可以使用以下命令进行设置:

Flink SQL> SET 'parallelism.default' = 4;

通过将并行度设置为4,Flink CDC Source算子将占用4个slot来并行读取数据。这样可以最大程度地利用系统资源,提高数据读取的效率和速度。

3.3、Chunk级别的checkpoint

3.3.1、Chunk

为了充分利用并行Source,MySQL CDC Source在增量快照读取过程中使用主键列将表划分为多个分片(chunk)。默认情况下,MySQL CDC Source会识别表的主键列,并使用主键中的第一列作为分片列。如果表中没有主键,增量快照读取将失败。你可以通过禁用scan.incremental.snapshot.enabled来回退到旧的快照读取机制。

对于数值和自动增量拆分列,MySQL CDC Source会按照固定步长高效地拆分块。例如,如果你有一个主键列为id的表,类型为自动增量的BIGINT,最小值为0,最大值为100,并设置表选项scan.incremental.snapshot.chunk.size的值为25,那么表将被拆分为以下块:

(-∞, 25),
[25, 50),
[50, 75),
[75, 100),
[100, +∞)

对于其他类型的主键列,MySQL CDC Source执行类似以下形式的语句来获取每个块的低值和高值:SELECT MAX(STR_ID) AS chunk_high FROM (SELECT * FROM TestTable WHERE STR_ID > 'uuid-001' limit 25),然后将块集分割如下:

(-∞, 'uuid-001'),
['uuid-001', 'uuid-009'),
['uuid-009', 'uuid-abc'),
['uuid-abc', 'uuid-def'),
[uuid-def, +∞).

通过这种分片方式,MySQL CDC Source可以高效地划分表数据,以实现并行的增量快照读取。每个读取器将负责读取和处理一个或多个分片的数据,从而提高整体的读取性能和效率。

注意,scan.incremental.snapshot.chunk.size的默认值为8096

3.3.2、原理

在 Flink CDC 中实现 Chunk 级别的 checkpoint 本质是使用 Flink 的 Checkpointing 机制和相应的配置,启用 Chunk 级别的 checkpoint 后,Flink CDC 将在每个 Chunk 完成读取后进行一次 checkpoint,以确保数据的一致性和容错性。

注意,Flink 的 checkpoint 机制包括两种类型的 checkpoint:时间驱动和计数驱动。但Flink CDC 中 Chunk 级别的 checkpoint 并不是直接利用Flink 计数驱动的 checkpoint 来实现的,相反,它是 Flink CDC 根据自身的机制自己实现的。它提供了在每个 Chunk 完成读取时进行一次 checkpoint 的能力,以实现更细粒度的数据一致性和容错性保障。

3.4、全量增量无锁读取算法【重点】

3.4.1、原理

3.4.1.1、全量无锁读取算法流程
  1. 首先,FlinkCDC 会先根据主键和粒度将要读取的表划分为多个分片(chunk)。

  2. 每个 MySQL CDC Source 负责读取一个分片,多个Source 可以并发读取多个chunk,完成当前分片处理后才可以读取下一个分片,直到读取完所有分片。

  3. 在读取每个分片时,FlinkCDC 使用一种名为偏移信号算法的方法来获取快照区块的最终一致输出。以下是该算法的简要步骤:

    • (1) 在读取chunk数据前先记录当前的 binlog 位置,即 LOW 偏移量。

    • (2) 执行语句 SELECT * FROM MyTable WHERE id > chunk_low AND id <= chunk_high,读取chunk分片内的数据并缓存至快照区块。

    • (3) 读取完chunk后再次记录当前的 binlog 位置记录,即 HIGH 偏移量,如下图:
      在这里插入图片描述

    • (4) 读取binlog:从 LOW 偏移量到 HIGH 偏移量之间的 binlog 记录,读取到的数据 append 到上个队列后面,并将此时binlog的最终offset保存至最后,如下图:

    在这里插入图片描述

    • (5) 检查读取到的 binlog 每条记录,如果属于chunk分片范围,则对之前缓存的chunk队列里的数据进行修正,最后将修正后的记录作为快照区块的最终输出,如下图:

    在这里插入图片描述

    • (6) 将此次chunk的元信息【lw,hw等】保存至MySqlSourceReader 进行备份【checkponit阶段也会保存此数据】,为后续增量读取做准备。
  4. 当所有chunk都被消费完毕后,即全量阶段同步完毕,此时将结束Source的并发读取,改为单线程读取binlog日志进行后续同步,此步骤在3.4.1.2、增量无锁读取算法流程。


  • 为了方便理解举例:表当前总数据为9条,Chunk切分粒度scan.incremental.snapshot.chunk.size=5;作业的并发数为2,故Mysql CDC Source 会有两个Task并行读取Chunk01,Chunk02,读取过程如下:

在这里插入图片描述

  • chunk01的数据流转过程如下:由于update#6、update#9 不属于chunk01分片范围故不做处理。

在这里插入图片描述

  • chunk02的数据流转过程如下:update#9、delete#7属于切片范围故修正缓存数据,而 update#4 不属于chunk02分片范围故不做处理。

在这里插入图片描述

FAQ[常见问题]:

  • chunk01 与 chunk02阶段有重叠部分,即 update#9,是否会影响数据准确性?
    • 答:不会,因为chunk只会对属于该分片范围的数据进行处理,故不会重复执行。
  • chunk01 与 chunk02 均未处理 update#4 日志,是否会影响数据准确性?
    • 答:不会,因为当所有chunk阶段结束后,MySqlSourceEnumerator调查员会根据所有chunk中的min(lw) 再次读取binlog,选择性补全数据,具体细节在:3.4.1.2、增量无锁读取算法流程
  • chun02 没有读取update#6的日志,是否会影响数据准确性?
    • 答:不会,因为update#6的日志 < lw,说明chunk02在lw时已经读取到了update#6后的最新数据,故不会影响数据准确性。
3.4.1.2、增量无锁读取算法流程
  1. 当全量阶段同步完毕后, MySqlSourceReader 会将每个 chunk 的 lw,hw等元数据汇报给 MySqlSourceEnumerator调查员,如下图:

在这里插入图片描述

  1. MySqlSourceEnumerator调查员取所有chunk中最小的lw 作为offset 来读取binlog日志,如下图:

    在这里插入图片描述

  2. 当一个 binlog 记录属于一个分片的主键范围内时,如果该记录在这个分片的 hw 之后,则该记录应该发送给下游,如下图:update#6、update#9虽然数据chunk02分片范围但<=hw 故舍弃;而update#4属于chunk01分片范围 且 >hw 代表缺失该条记录故发送至下游。

在这里插入图片描述

  1. 当一个 binlog 记录已经处于所有chunk中最大的hw时,即表示日志记录已经进入 Pure Binlog Phase,对于这样的 binlog 记录,不需进行比较,直接发送给下游,如下图:

在这里插入图片描述

至此增量无锁读取算法流程完毕

3.4.2、源码分析

  • MySql cdc 类图关系如下:

在这里插入图片描述

  • 快照读取chunk分片逻辑:MySqlSnapshotSplitReadTask#doExecute
protected SnapshotResult doExecute(ChangeEventSourceContext context,SnapshotContext snapshotContext,SnapshottingTask snapshottingTask)throws Exception {final RelationalSnapshotChangeEventSource.RelationalSnapshotContext ctx =(RelationalSnapshotChangeEventSource.RelationalSnapshotContext) snapshotContext;ctx.offset = offsetContext;final SignalEventDispatcher signalEventDispatcher =new SignalEventDispatcher(offsetContext.getPartition(),topicSelector.topicNameFor(snapshotSplit.getTableId()),dispatcher.getQueue());final BinlogOffset lowWatermark = currentBinlogOffset(jdbcConnection);LOG.info("Snapshot step 1 - Determining low watermark {} for split {}",lowWatermark,snapshotSplit);((SnapshotSplitReader.SnapshotSplitChangeEventSourceContextImpl) (context)).setLowWatermark(lowWatermark);signalEventDispatcher.dispatchWatermarkEvent(snapshotSplit, lowWatermark, SignalEventDispatcher.WatermarkKind.LOW);LOG.info("Snapshot step 2 - Snapshotting data");createDataEvents(ctx, snapshotSplit.getTableId());final BinlogOffset highWatermark = currentBinlogOffset(jdbcConnection);LOG.info("Snapshot step 3 - Determining high watermark {} for split {}",highWatermark,snapshotSplit);signalEventDispatcher.dispatchWatermarkEvent(snapshotSplit, highWatermark, SignalEventDispatcher.WatermarkKind.HIGH);((SnapshotSplitReader.SnapshotSplitChangeEventSourceContextImpl) (context)).setHighWatermark(highWatermark);return SnapshotResult.completed(ctx.offset);
}
  • chunk分片数据读取后进行格式处理归一逻辑:RecordUtils#normalizedSplitRecords
/*** Normalize the records of snapshot split which represents the split records state on high* watermark. data input: [low watermark event] [snapshot events ] [high watermark event]* [binlog events] [binlog-end event] data output: [low watermark event] [normalized events]* [high watermark event]*/public static List<SourceRecord> normalizedSplitRecords(MySqlSnapshotSplit snapshotSplit,List<SourceRecord> sourceRecords,SchemaNameAdjuster nameAdjuster) {List<SourceRecord> normalizedRecords = new ArrayList<>();Map<Struct, SourceRecord> snapshotRecords = new HashMap<>();List<SourceRecord> binlogRecords = new ArrayList<>();if (!sourceRecords.isEmpty()) {SourceRecord lowWatermark = sourceRecords.get(0);checkState(isLowWatermarkEvent(lowWatermark),String.format("The first record should be low watermark signal event, but is %s",lowWatermark));SourceRecord highWatermark = null;int i = 1;for (; i < sourceRecords.size(); i++) {SourceRecord sourceRecord = sourceRecords.get(i);if (!isHighWatermarkEvent(sourceRecord)) {snapshotRecords.put((Struct) sourceRecord.key(), sourceRecord);} else {highWatermark = sourceRecord;i++;break;}}if (i < sourceRecords.size() - 1) {List<SourceRecord> allBinlogRecords =sourceRecords.subList(i, sourceRecords.size() - 1);for (SourceRecord binlog : allBinlogRecords) {if (isDataChangeRecord(binlog)) {Object[] key =getSplitKey(snapshotSplit.getSplitKeyType(), binlog, nameAdjuster);// 当获取chunk lw hw 的binlog后会先判断是否数据chunk的区间内,只有负责chunk区间内的数据才会被更正if (splitKeyRangeContains(key, snapshotSplit.getSplitStart(), snapshotSplit.getSplitEnd())) {binlogRecords.add(binlog);}}}}checkState(isHighWatermarkEvent(highWatermark),String.format("The last record should be high watermark signal event, but is %s",highWatermark));// chunk数据修正逻辑函数:upsertBinlognormalizedRecords =upsertBinlog(snapshotSplit,lowWatermark,highWatermark,snapshotRecords,binlogRecords);}return normalizedRecords;}
  • chunk数据修正逻辑:RecordUtils#upsertBinlog

    private static List<SourceRecord> upsertBinlog(MySqlSplit split,SourceRecord lowWatermarkEvent,SourceRecord highWatermarkEvent,Map<Struct, SourceRecord> snapshotRecords,List<SourceRecord> binlogRecords) {final List<SourceRecord> normalizedBinlogRecords = new ArrayList<>();normalizedBinlogRecords.add(lowWatermarkEvent);// upsert binlog events to snapshot events of splitif (!binlogRecords.isEmpty()) {for (SourceRecord binlog : binlogRecords) {Struct key = (Struct) binlog.key();Struct value = (Struct) binlog.value();if (value != null) {Envelope.Operation operation =Envelope.Operation.forCode(value.getString(Envelope.FieldName.OPERATION));switch (operation) {case UPDATE:Envelope envelope = Envelope.fromSchema(binlog.valueSchema());Struct source = value.getStruct(Envelope.FieldName.SOURCE);Struct updateAfter = value.getStruct(Envelope.FieldName.AFTER);Instant ts =Instant.ofEpochMilli((Long) source.get(Envelope.FieldName.TIMESTAMP));SourceRecord record =new SourceRecord(binlog.sourcePartition(),binlog.sourceOffset(),binlog.topic(),binlog.kafkaPartition(),binlog.keySchema(),binlog.key(),binlog.valueSchema(),envelope.read(updateAfter, source, ts));snapshotRecords.put(key, record);break;case DELETE:snapshotRecords.remove(key);break;case CREATE:snapshotRecords.put(key, binlog);break;case READ:throw new IllegalStateException(String.format("Binlog record shouldn't use READ operation, the the record is %s.",binlog));}}}}normalizedBinlogRecords.addAll(snapshotRecords.values());normalizedBinlogRecords.add(highWatermarkEvent);return normalizedBinlogRecords;}
    
  • 全量快照结束后MySqlSourceReader 整合各个split,汇报给MySqlSourceEnumerator逻辑:handleSourceEvents

@Overridepublic void handleSourceEvents(SourceEvent sourceEvent) {if (sourceEvent instanceof FinishedSnapshotSplitsAckEvent) {FinishedSnapshotSplitsAckEvent ackEvent = (FinishedSnapshotSplitsAckEvent) sourceEvent;LOG.debug("The subtask {} receives ack event for {} from enumerator.",subtaskId,ackEvent.getFinishedSplits());for (String splitId : ackEvent.getFinishedSplits()) {this.finishedUnackedSplits.remove(splitId);}} else if (sourceEvent instanceof FinishedSnapshotSplitsRequestEvent) {// report finished snapshot splitsLOG.debug("The subtask {} receives request to report finished snapshot splits.",subtaskId);reportFinishedSnapshotSplitsIfNeed();} else if (sourceEvent instanceof BinlogSplitMetaEvent) {LOG.debug("The subtask {} receives binlog meta with group id {}.",subtaskId,((BinlogSplitMetaEvent) sourceEvent).getMetaGroupId());fillMetaDataForBinlogSplit((BinlogSplitMetaEvent) sourceEvent);} else {super.handleSourceEvents(sourceEvent);}}private void reportFinishedSnapshotSplitsIfNeed() {if (!finishedUnackedSplits.isEmpty()) {final Map<String, BinlogOffset> finishedOffsets = new HashMap<>();for (MySqlSnapshotSplit split : finishedUnackedSplits.values()) {finishedOffsets.put(split.splitId(), split.getHighWatermark());}FinishedSnapshotSplitsReportEvent reportEvent =new FinishedSnapshotSplitsReportEvent(finishedOffsets);context.sendSourceEventToCoordinator(reportEvent);LOG.debug("The subtask {} reports offsets of finished snapshot splits {}.",subtaskId,finishedOffsets);}}
  • MySqlSourceEnumerator 收到全量快照结束后处理逻辑:createBinlogSplit

当 MySqlSourceEnumerator 将所有 split 的 hw 收齐之后,会创建一个 binlog split,该分片包含了需要读取 binlog 的起始位置(所有分片 hw 的最小值)和所有分片的 hw 信息。

private MySqlBinlogSplit createBinlogSplit() {final List<MySqlSnapshotSplit> assignedSnapshotSplit =snapshotSplitAssigner.getAssignedSplits().values().stream().sorted(Comparator.comparing(MySqlSplit::splitId)).collect(Collectors.toList());Map<String, BinlogOffset> splitFinishedOffsets =snapshotSplitAssigner.getSplitFinishedOffsets();final List<FinishedSnapshotSplitInfo> finishedSnapshotSplitInfos = new ArrayList<>();BinlogOffset minBinlogOffset = null;for (MySqlSnapshotSplit split : assignedSnapshotSplit) {// find the min binlog offsetBinlogOffset binlogOffset = splitFinishedOffsets.get(split.splitId());if (minBinlogOffset == null || binlogOffset.isBefore(minBinlogOffset)) {minBinlogOffset = binlogOffset;}finishedSnapshotSplitInfos.add(new FinishedSnapshotSplitInfo(split.getTableId(),split.splitId(),split.getSplitStart(),split.getSplitEnd(),binlogOffset));}// the finishedSnapshotSplitInfos is too large for transmission, divide it to groups and// then transfer themboolean divideMetaToGroups = finishedSnapshotSplitInfos.size() > splitMetaGroupSize;return new MySqlBinlogSplit(BINLOG_SPLIT_ID,minBinlogOffset == null ? BinlogOffset.INITIAL_OFFSET : minBinlogOffset,BinlogOffset.NO_STOPPING_OFFSET,divideMetaToGroups ? new ArrayList<>() : finishedSnapshotSplitInfos,new HashMap<>(),finishedSnapshotSplitInfos.size());}
  • 增量阶段逻辑:shouldEmit

MySqlSourceEnumerator 将 binlog 分片分配给 MySqlSourceReader 时,任务从全量阶段转变为增量阶段。MySqlSourceReader 在读取 binlog 数据后,使用 shouldEmit 来判断是否应该将该记录发送给下游。

/*** Returns the record should emit or not.** <p>The watermark signal algorithm is the binlog split reader only sends the binlog event that* belongs to its finished snapshot splits. For each snapshot split, the binlog event is valid* since the offset is after its high watermark.** <pre> E.g: the data input is :*    snapshot-split-0 info : [0,    1024) highWatermark0*    snapshot-split-1 info : [1024, 2048) highWatermark1*  the data output is:*  only the binlog event belong to [0,    1024) and offset is after highWatermark0 should send,*  only the binlog event belong to [1024, 2048) and offset is after highWatermark1 should send.* </pre>*/private boolean shouldEmit(SourceRecord sourceRecord) {if (isDataChangeRecord(sourceRecord)) {TableId tableId = getTableId(sourceRecord);BinlogOffset position = getBinlogPosition(sourceRecord);// 判断是否处于纯净的binlog区域if (hasEnterPureBinlogPhase(tableId, position)) {return true;}// only the table who captured snapshot splits need to filterif (finishedSplitsInfo.containsKey(tableId)) {RowType splitKeyType =ChunkUtils.getSplitType(statefulTaskContext.getDatabaseSchema().tableFor(tableId));Object[] key =getSplitKey(splitKeyType,sourceRecord,statefulTaskContext.getSchemaNameAdjuster());for (FinishedSnapshotSplitInfo splitInfo : finishedSplitsInfo.get(tableId)) {if (RecordUtils.splitKeyRangeContains(key, splitInfo.getSplitStart(), splitInfo.getSplitEnd())&& position.isAfter(splitInfo.getHighWatermark())) { // 判断该binlog是否属于chunk区间且是否>该chunk的hwreturn true;}}}// not in the monitored splits scope, do not emitreturn false;}// always send the schema change event and signal event// we need record them to state of Flinkreturn true;}private boolean hasEnterPureBinlogPhase(TableId tableId, BinlogOffset position) {// the existed tables those have finished snapshot readingif (maxSplitHighWatermarkMap.containsKey(tableId)&& position.isAtOrAfter(maxSplitHighWatermarkMap.get(tableId))) {return true;}// capture dynamically new added tables// TODO: there is still very little chance that we can't capture new added table.//  That the tables dynamically added after discovering captured tables in enumerator//  and before the lowest binlog offset of all table splits. This interval should be//  very short, so we don't support it for now.return !maxSplitHighWatermarkMap.containsKey(tableId)&& capturedTableFilter.isIncluded(tableId);}

四、相关文档

  • 官方文档
  • Flink CDC 设计文档
  • FAQ

相关文章:

深入解析 Flink CDC 增量快照读取机制

一、Flink-CDC 1.x 痛点 Flink CDC 1.x 使用 Debezium 引擎集成来实现数据采集&#xff0c;支持全量加增量模式&#xff0c;确保数据的一致性。然而&#xff0c;这种集成存在一些痛点需要注意&#xff1a; 一致性通过加锁保证&#xff1a;在保证数据一致性时&#xff0c;Debez…...

060:vue中markdown编辑器mavon-editor的应用示例

第060个 查看专栏目录: VUE ------ element UI 专栏目标 在vue和element UI联合技术栈的操控下&#xff0c;本专栏提供行之有效的源代码示例和信息点介绍&#xff0c;做到灵活运用。 &#xff08;1&#xff09;提供vue2的一些基本操作&#xff1a;安装、引用&#xff0c;模板使…...

使用SCP在Linux中安全复制文件:参数详解

SCP&#xff08;Secure Copy&#xff09;是一个在Linux和其他类Unix系统中使用的命令行工具&#xff0c;用于在本地和远程主机之间安全地复制文件和目录。本文将详细介绍SCP的多个常用参数&#xff0c;并通过示例进行说明。 基本语法 scp [options] source destination其中&a…...

【动态规划精选题目】3、简单多状态模型

此动态规划系列主要讲解大约10个系列【后续持续更新】 本篇讲解简单多状态模型中的9道经典题&#xff0c;会在讲解题目同时给出AC代码 目录 1、按摩师 2、力扣198:打家劫舍1 3、打家劫舍II 4、删除并获得点数 5、 粉刷房子 6、力扣309:买卖股票的最佳时机含冷冻期 7、 买…...

软件测试/测试开发丨Python 虚拟环境及pip环境管理

venv 虚拟环境管理 venv 虚拟环境的优点 独立的 Python 环境&#xff0c;不会产生冲突有助于包的管理删除和卸载方便 venv 使用方法 创建虚拟环境 python3 -m venv test 激活虚拟环境 切换指定文件夹Windows&#xff1a;/Scripts/macOS&#xff1a;/bin/ 执行指令&#xff…...

Mybatis SQL构建器类 - SQL类

下面是一些例子&#xff1a; // Anonymous inner class public String deletePersonSql() {return new SQL() {{DELETE_FROM("PERSON");WHERE("ID #{id}");}}.toString(); }// Builder / Fluent style public String insertPersonSql() {String sql new…...

海云安亮相2023北京国际金融安全论坛,助力金融企业数字化转型降本增效

近日&#xff0c;2023北京国际金融安全论坛暨金融科技标准认证生态大会在北京金融安全产业园成功举办。深圳海云安网络安全技术有限公司&#xff08;以下简称“海云安”&#xff09;受邀参展亮相此次大会。海云安作为国内领先的金融科技服务商&#xff0c;展示了开发安全系列产…...

nodeJS搭建免费代理IP池爬取贴吧图片实战

之前用python写过爬虫&#xff0c;这次想试试nodeJS爬虫爬取贴吧图片&#xff0c;话不多说代码如下&#xff0c;爬取制定吧的前十页所有帖子里的图片 爬取贴吧图片脚本 你得提前创建一个images文件夹 const axios require("axios"); const cheerio require("…...

基于图搜索的自动驾驶规划算法 - BFS,Dijstra,A*

本文将讲解BFS&#xff0c;Dijstra&#xff0c;A*&#xff0c;动态规划的算法原理&#xff0c;不正之处望读者指正&#xff0c;希望有兴趣的读者能在评论区提出一些这些算法的面试考点&#xff0c;共同学习&#xff0c;一起进步 0 图论基础 图有三种&#xff1a;无向图、有向…...

Spring系列学习四、Spring数据访问

Spring数据访问 一、Spring中的JDBC模板介绍1、新建SpringBoot应用2、引入依赖&#xff1a;3、配置数据库连接&#xff0c;注入dbcTemplate对象&#xff0c;执行查询&#xff1a;4&#xff0c;测试验证&#xff1a; 二、整合MyBatis Plus1&#xff0c;在你的项目中添加MyBatis …...

HBase 创建不分裂的表 ( 禁止 Table Split )

注意&#xff1a;由于 HBase 版本众多&#xff0c;配置表的语法在不同版本上会有差异&#xff0c;本文介绍的配置方法是在 1.4.9 版本上测试的&#xff0c;使用 HBase 2.0 的版本需要核实并修改相关配置方法&#xff01; 有时候&#xff0c;出于特殊需要&#xff0c;我们希望对…...

docker入门概念详解

本篇文章对docker的一些基础概念和周边概念进行了详细解释。帮助你可以很好的理解docker是用来干什么的&#xff0c;docker是怎么工作的。其中有docker所运用到的技术解释&#xff0c;docker的不同发展版本&#xff0c;dokcer的架构&#xff0c;docker的生态等等详解。希望本片…...

C++程序设计实践报告【格式】

C程序设计实践报告 原XX工业学院 C程序设计实践报告 题目&#xff1a; 专业&#xff1a; 学号&#xff1a; 姓名&#xff1a; 年 月 日 目录 一、绪…...

浅谈数据仓库运营

一、背景 企业每天都会产生大量的数据&#xff0c;随着时间增长&#xff0c;数据会呈现几何增长&#xff0c;尤其在系统基建基础好的公司。好的数据仓库需要提前规划和好的运营&#xff0c;才能支持企业的发展&#xff0c;为企业提供数据分析基础。 二、目标 提高数据仓库存储…...

系列六、Consul

一、Consul 1.1、概述 Consul是一套开源的分布式服务发现和配置管理系统&#xff0c;由HashiCorp公司用Go语言开发。他提供了微服务系统中的服务治理、配置中心、控制总线等功能。这些功能中的每一个功能都可以单独使用&#xff0c;也可以一起使用以构建全方位的服务网格&…...

Java集合/泛型篇----第一篇

系列文章目录 文章目录 系列文章目录前言一、ArrayList和linkedList的区别二、HashMap和HashTable的区别三、Collection包结构,与Collections的区别四、泛型常用特点前言 前些天发现了一个巨牛的人工智能学习网站,通俗易懂,风趣幽默,忍不住分享一下给大家。点击跳转到网站…...

集合使用注意事项

集合使用注意事项总结 集合判空 判断所有集合内部的元素是否为空&#xff0c;使用 isEmpty() 方法&#xff0c;而不是 size()0 的方式 这是因为 isEmpty() 方法的可读性更好&#xff0c;并且时间复杂度为 O(1)。 集合转 Map 在使用 java.util.stream.Collectors 类的 toMap()…...

什么是 JavaScript 中的 WeakMap

在 JavaScript 中&#xff0c;WeakMap 是一种特殊的 Map 数据结构&#xff0c;它允许将对象作为键&#xff0c;而且键值对是弱引用的关系。 与 Map 不同的是&#xff0c;WeakMap 的键只能是对象&#xff0c;不能是其他类型的值。同时&#xff0c;当键对象没有任何引用时&#…...

nodejs+vue+ElementUi农产品团购销售系统zto2c

目标是为了完成小区团购平台的设计和实现&#xff0c;在疫情当下的环境&#xff0c;方便小区业主购入生活所需&#xff0c;减小居民的生活压力 采用B/S模式架构系统&#xff0c;开发简单&#xff0c;只需要连接网络即可登录本系统&#xff0c;不需要安装任何客户端。开发工具采…...

nacos入门篇001-安装与启动

1、下载zip包 我这里下载的是版本2.2.0 Nacos 快速开始 2、修改配置文件 2.1集群模式修改成单例模式 vi startup.sh 2.2 修改数据库配置信息 3、初始化数据库 3.1 创建db名称&#xff1a;db_nacos 3.2 执行mysql-schema.sql 3.3 执行完截图&#xff1a; 4、运行脚本启动 …...

WordPress主题大前端DUX v8.3源码下载

DUX主题8.3版本更新内容&#xff1a; 新增&#xff1a;Cloudflare Turnstile 免费验证功能 新增&#xff1a;子菜单页面模版&#xff0c;支持多级页面 新增&#xff1a;手机端文章内表格自动出现横向滚动条&#xff0c;可集体或单独设置滚动宽度 新增&#xff1a;标签云页面模版…...

RabbitMQ之快速入门、上手

前言 学习一样新技术、新框架&#xff0c;最重要的是学习其思想、原理。即原理性思维。 如果是因为工作原因&#xff0c;需要快速上手RabbitMQ&#xff0c;本篇或许适合你。 核心概念 Connection&#xff1a;publisher&#xff0f;consumer 和 broker 之间的 TCP 连接Channel…...

GBASE南大通用-GBase 8s数据库日志模式及切换

一、 GBase 8s数据库共有以下 4 种日志模式&#xff1a;无日志模式、缓冲日志模式、无缓冲日志模式、ANSI 模式。详细介绍如下&#xff1a; 1、无日志模式&#xff08;Non logging&#xff09;&#xff1a; 采用无日志模式时&#xff0c;所有 DML 操作都不会被记录到日志中&…...

侵入式和非侵入式微服务框架的比较

微服务框架可以分为侵入式和非侵入式两种。侵入式框架需要对现有代码进行改造&#xff0c;而非侵入式框架则无需改造现有代码。 侵入式框架 侵入式框架将微服务治理功能嵌入到应用程序中&#xff0c;需要修改应用程序的代码。这种框架的优点是可以提供更强大的功能&#xff0…...

Go语言程序设计-第5章--函数

Go语言程序设计-第5章–函数 5.1 函数声明 每个函数声明都包含一个名字、一个形参列表、一个可选的返回列表以及函数体: func name(parameter-list) (result-list) {body }func add(x int, y int) int { return x y} func sub(x, y int) (z int) {z x - y; return} func f…...

数据被锁?被.mkp 勒索病毒攻击后的拯救行动

导言&#xff1a; 网络安全面临着越来越多的挑战&#xff0c;而.mallox勒索病毒则成为数字威胁中的一股强大势力。它的威胁不仅体现在其高度复杂的加密算法上&#xff0c;还表现在对受感染系统的深度渗透和数据的极大破坏上。以下是.mallox勒索病毒的主要威胁&#xff1a;如不…...

Fine-Tuning Language Models from Human Preferences

Abstract 奖励学习(reward learning)可以将强化学习(RL)应用到由人类判断定义奖励的任务中,通过询问人类问题来构建奖励模型。奖励学习的大部分工作使用了模拟环境,但是关于价值的复杂信息经常是以自然语言的形式表达的。我们相信语言奖励学习是使强化学习在现实世界任务…...

提升数据库性能的关键指南-Oracle AWR报告

文章目录 一、了解AWR报告&#xff1a;数据库性能的仪表盘二、生成AWR报告三、解读AWR报告的关键部分1.报告开头的系统基础信息2.ADDM发现3.负载概览(Load Profile)4.参数文件5.顶级前台等待事件6.SQL 统计信息-顶级SQL7.SGA Advisory AND PAG Advisory 一、了解AWR报告&#x…...

云计算IaaS、PaaS和SaaS之

提供的服务来比较如下两图 示例图 示例图...

解锁大数据世界的钥匙——Hadoop HDFS安装与使用指南

目录 1、前言 2、Hadoop HDFS简介 3、Hadoop HDFS安装与配置 4、Hadoop HDFS使用 5、结语 1、前言 大数据存储与处理是当今数据科学领域中最重要的任务之一。随着互联网的迅速发展和数据量的爆炸性增长&#xff0c;传统的数据存储和处理方式已经无法满足日益增长的需求。…...

健康私人定制网站怎么做/怎样开网站

作者(Alex Rodriguez, Alessandro Laio)提出了一种很简洁优美的聚类算法, 可以识别各种形状的类簇, 并且其超参数很容易确定. 算法思想 该算法的假设是类簇的中心由一些局部密度比较低的点围绕, 并且这些点距离其他有高局部密度的点的距离都比较大. 首先定义两个值: 局部密度以…...

搜索引擎优化是什么?/营销推广seo

ON DEMAND物化视图的特性及其和ON COMMIT物化视图的区别&#xff0c;即前者不刷新(手工或自动)就不更新物化视图&#xff0c;而后者不刷新也会更新物化视图&#xff0c;——只要基表发生了COMMIT。 创建定时刷新的物化视图(指定物化视图每天刷新一次)&#xff1a; SQL> crea…...

网站流量下降的原因/seo外链发布

使用python中最有用的50个数据可视化图形&#xff0c;并且用代码清晰的演示了使用matplotlib和seaborn库的过程并且展示了最终的结果。 一、简介 下面的图表根据不同的目标被分成了7组。例如&#xff0c;如果你想画出一张两个元素的相互关系图&#xff0c;你可以在关联这一章…...

网站排名突然下降解决/360搜索引擎首页

窗口事件 onload 当网页加载完毕的时候触发 会覆盖 只有一个生效 onscroll 滚动窗口的时候出发 鼠标事件 onclick 点击 ondblclick 双击 onmouseover onmouseleave 鼠标移入 移出 onmouseenter onmouseleave 鼠标移入移出 onmousemove 鼠标移动 onmousedown 鼠标按下 onmouseu…...

怎么看公司网站做的好不好哦/太原做网站推广的公司

一 前言温习python 多进程语法的时候&#xff0c;对 join的理解不是很透彻&#xff0c;本文通过代码实践来加深对 join()的认识。multiprocessing 是python提供的跨平台版本的多进程模块。multiprocessing可以充分利用多核&#xff0c;提升程序运行效率。multiprocessing支持子…...

吴江建设网站/产品推广平台排行榜

我们这次实现的命令行计算器&#xff0c;支持加减乘除、括号、浮点数、负数&#xff0c;以及查看历史和退出功能。 主要的思路&#xff1a;read - parse - print - loop。 read 阶段是指读取用户在提示符(cal> )之后输入的字符串。 parse 阶段包括&#xff1a;将用户输入…...