Using the Doris Connector with Flink CDC to Sync Sharded MySQL Databases and Tables
1. Overview
In real-world business systems, we usually shard databases and tables to cope with the problems caused by very large single tables and to increase system throughput.
However, this complicates downstream data analysis. When syncing the sharded business databases into the data warehouse, we therefore typically want to merge all the shards into a single database and a single table to simplify later analysis.
This article demonstrates how to use Flink CDC together with the Apache Doris Flink Connector and the two-phase commit of Doris Stream Load to ingest sharded MySQL data into the Apache Doris data warehouse in real time and with high efficiency for analysis.
1.1 What is CDC
CDC is short for Change Data Capture.
Its core idea is to monitor and capture changes to a database (INSERTs, UPDATEs, and DELETEs of rows or tables), record these changes completely and in the order they occur, and write them to a message middleware for other services to subscribe to and consume.
CDC has a wide range of application scenarios, including:
● Data distribution: fan out one data source to multiple downstream consumers, commonly used for decoupling business systems and for microservices.
● Data integration: integrate scattered, heterogeneous data sources into a data warehouse, eliminating data silos and enabling later analysis.
● Data migration: commonly used for database backup and disaster recovery.
1.2 Why Flink CDC
Flink CDC is based on database-log Change Data Capture technology and implements unified full and incremental reads. Combined with Flink's excellent pipeline capability and rich up- and downstream ecosystem, it can capture changes from many databases and sync them to downstream storage in real time.
Upstream, Flink CDC currently supports databases such as MySQL, MariaDB, PG (PostgreSQL), Oracle, MongoDB, OceanBase, TiDB, and SQLServer.
Downstream, the ecosystem is even richer: it can write to message queues such as Kafka and Pulsar, as well as to Hudi, Iceberg, Doris, and various other data warehouses and data lakes.
At the same time, the Changelog mechanism natively supported by Flink SQL makes processing CDC data very simple. With SQL alone, users can cleanse, widen, and aggregate the full and incremental data of a database, greatly lowering the barrier to entry. In addition, the Flink DataStream API lets users implement custom logic in code, giving them the freedom to deeply customize their business.
At the core of Flink CDC is the ability to sync and process a table's full and incremental data with real-time consistency, so users can easily obtain a real-time, consistent snapshot of every table. For example, a table may contain full historical business data while incremental business data keeps being written and updated. Flink CDC captures the incremental change records in real time and continuously provides a snapshot consistent with the database: an update record updates existing data, while an insert record is appended to it. Throughout the process, Flink CDC guarantees consistency, i.e. no duplicates and no data loss.
Flink CDC offers the following advantages:
- Flink's operators and SQL module are mature and easy to use
- Flink jobs can scale their processing capacity simply by adjusting operator parallelism
- Flink supports advanced state backends, allowing access to massive amounts of state data
- Flink provides broader ecosystem support for sources and sinks
- Flink has a larger user base and an active support community, so problems are easier to solve
Moreover, the Flink Table / SQL module treats database tables and change streams (such as CDC data streams) as two sides of the same thing. Its internal upsert message structure (+I for insert, -U for the value before an update, +U for the value after an update, -D for delete) therefore maps one-to-one onto the change records produced by Debezium and similar tools.
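For example, a single UPDATE on the MySQL side becomes a -U/+U pair in the changelog. The row values below are made up purely for illustration:

-- MySQL side:
UPDATE employees_1 SET first_name = 'George' WHERE emp_no = 10001;

-- Changelog rows derived from the corresponding binlog event (RowKind | row):
-- -U | (10001, 'Georgi', ...)   value before the update
-- +U | (10001, 'George', ...)   value after the update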
1.3 What is Apache Doris
Apache Doris is a modern MPP analytical database. It returns query results within sub-second response times and effectively supports real-time data analysis. Its distributed architecture is very simple, easy to operate, and can support datasets of more than 10 PB.
Apache Doris can satisfy many kinds of data analysis needs, such as fixed historical reports, real-time data analysis, interactive analysis, and exploratory analysis, making your data analysis work simpler and more efficient!
1.4 Two-phase commit
1.4.1 What is two-phase commit (2PC)
In a distributed system, so that every node can learn the transaction execution status of the other nodes, a central node is introduced to orchestrate the execution logic of all nodes. This central node is called the coordinator, and the nodes it schedules are called participants.
2PC splits a distributed transaction into two phases: the commit request (voting) phase and the commit (execution) phase. The coordinator decides whether to actually execute the transaction based on the participants' responses. The flow is as follows.
Commit request (voting) phase
- The coordinator sends a prepare request with the transaction contents to all participants, asks whether the transaction can be committed, and waits for their responses.
- Each participant executes the operations in the transaction and records an undo log (for rollback) and a redo log (for replay), but does not actually commit.
- Each participant returns the execution result to the coordinator: yes on success, otherwise no.
Commit (execution) phase
There are two cases, success and failure.
- If all participants return yes, the transaction can be committed:
  - The coordinator sends a commit request to all participants.
  - On receiving it, each participant actually commits the transaction, releases the resources it held, and returns an ack to the coordinator.
  - Once the coordinator has received acks from all participants, the transaction completes successfully.
- If any participant returns no or times out, the transaction is interrupted and must be rolled back:
  - The coordinator sends a rollback request to all participants.
  - On receiving it, each participant rolls back to the pre-transaction state using the undo log, releases the held resources, and returns an ack.
  - Once the coordinator has received acks from all participants, the rollback is complete.
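The coordinator's decision logic can be summarized in a minimal Java sketch; Participant is a hypothetical interface here, and networking, timeouts, and crash-recovery logging are all omitted:

import java.util.List;

interface Participant {
    boolean prepare(String txnId);  // phase 1: execute, write undo/redo logs, vote
    void commit(String txnId);      // phase 2: really commit, release resources
    void rollback(String txnId);    // phase 2: undo using the undo log
}

class Coordinator {
    // Returns true if the transaction committed, false if it was rolled back.
    // (A real coordinator would send prepare to all participants in parallel
    // rather than short-circuiting on the first "no" as allMatch does.)
    boolean runTwoPhaseCommit(String txnId, List<Participant> participants) {
        // Phase 1 (voting): every participant must answer yes.
        boolean allYes = participants.stream().allMatch(p -> p.prepare(txnId));
        // Phase 2 (execution): commit only on a unanimous yes, else roll back.
        if (allYes) {
            participants.forEach(p -> p.commit(txnId));
        } else {
            participants.forEach(p -> p.rollback(txnId));
        }
        return allYes;
    }
}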
1.4.2 Flink 2PC
As a streaming engine, Flink naturally provides exactly-once guarantees. End-to-end exactly-once semantics is the joint result of the input, the processing logic, and the output. Internally, Flink relies on the checkpoint mechanism and the lightweight distributed snapshot algorithm ABS (asynchronous barrier snapshotting) to guarantee exactly once. To achieve exactly-once output, one of two constraints must additionally hold: idempotent writes or transactional writes.
Pre-commit phase
Whenever a checkpoint is due, the JobManager injects a barrier into the data stream as the checkpoint boundary. The barrier travels downstream along the operator chain, and each operator it reaches snapshots its state to the state backend. When the barrier reaches the Kafka sink, the buffered messages are flushed via KafkaProducer.flush(), but not yet actually committed. The commit phase is then triggered by the checkpoint as well.
Commit phase
The write only succeeds once all checkpoints complete successfully. This matches the 2PC flow described above, with the JobManager acting as coordinator and the operators as participants (though only the sink participant performs the commit). If a checkpoint fails, notifyCheckpointComplete() is not invoked; and if retries also fail, abort() is eventually called to roll back the transaction.
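In code, this contract surfaces as Flink's TwoPhaseCommitSinkFunction. The skeleton below is a minimal sketch against the Flink 1.14 API: MyTxn is a hypothetical transaction handle, and the real I/O is left as comments:

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.base.VoidSerializer;
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
import org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction;

public class SketchTwoPhaseSink
        extends TwoPhaseCommitSinkFunction<String, SketchTwoPhaseSink.MyTxn, Void> {

    public static class MyTxn { public String txnId; }  // hypothetical handle

    public SketchTwoPhaseSink() {
        super(new KryoSerializer<>(MyTxn.class, new ExecutionConfig()),
              VoidSerializer.INSTANCE);
    }

    @Override
    protected MyTxn beginTransaction() {
        return new MyTxn();  // open a fresh transaction for each checkpoint cycle
    }

    @Override
    protected void invoke(MyTxn txn, String value, Context ctx) {
        // write/buffer the record under txn (not yet visible downstream)
    }

    @Override
    protected void preCommit(MyTxn txn) {
        // phase 1: flush everything written so far (e.g. the flush() above)
    }

    @Override
    protected void commit(MyTxn txn) {
        // phase 2: runs after the checkpoint completes; make the data visible
    }

    @Override
    protected void abort(MyTxn txn) {
        // roll the transaction back when the checkpoint fails
    }
}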
1.5 Doris Stream Load 2PC
1.5.1 Stream load
Stream Load is a synchronous import method provided by Apache Doris: the user sends an HTTP request to import a local file or a data stream into Doris. Stream Load executes the import synchronously and returns the result, so the caller can tell directly from the response body whether the import succeeded.
Stream Load is mainly suited to importing local files, or data from a stream produced by a program.
Usage: you can drive it from any HTTP client, or simply with the curl command:
curl --location-trusted -u user:passwd [-H ""...] -T data.file -H "label:label" -XPUT http://fe_host:http_port/api/{db}/{table}/_stream_load
To prevent users from importing the same data twice, Stream Load uses a label to identify each import job. It is strongly recommended to use the same label for the same batch of data: duplicate requests for that batch are then accepted only once, guaranteeing At-Most-Once semantics.
1.5.2 Stream load 2PC
The earliest Stream Load in Apache Doris had no two-phase commit: data was imported directly through the Stream Load HTTP interface, with only success or failure as outcomes.
- This is fine under normal conditions, but in a distributed environment a failed import task can leave the two sides with inconsistent data. In particular, with the previous Doris Flink Connector, users had to handle import failures themselves, e.g. save the failed data somewhere (such as Kafka) and then process it manually.
- If the Flink job suddenly died for some other reason, part of the data would have succeeded and part failed; and because the failed data had no checkpoint, restarting the job could not re-consume it, leaving the two sides inconsistent.
To solve these problems and guarantee consistency on both ends, we implemented Doris Stream Load 2PC. The principle is as follows:
- The commit is split into two phases.
- In the first phase, the data-write job is submitted. When the write succeeds, the data is still invisible, and the transaction state is PRECOMMITTED.
- After the data is written, the user triggers a Commit, which switches the transaction state to VISIBLE; only then can the data be queried.
- If the user wants to discard this batch of data, they trigger an abort on the transaction by its transaction ID, and the batch is deleted automatically.
1.5.3 How to use Stream Load 2PC
- Configure disable_stream_load_2pc=false in be.conf (takes effect after a restart).
- Declare two_phase_commit=true in the HTTP HEADER.
Initiate the pre-commit:
curl --location-trusted -u user:passwd -H "two_phase_commit:true" -T test.txt http://fe_host:http_port/api/{db}/{table}/_stream_load
Trigger the transaction commit:
curl -X PUT --location-trusted -u user:passwd -H "txn_id:18036" -H "txn_operation:commit" http://fe_host:http_port/api/{db}/_stream_load_2pc
Trigger an abort on the transaction:
curl -X PUT --location-trusted -u user:passwd -H "txn_id:18037" -H "txn_operation:abort" http://fe_host:http_port/api/{db}/_stream_load_2pc
1.6 Doris Flink Connector 2PC
We previously provided the Doris Flink Connector, which supports reading Doris tables as well as upsert and delete (on the Unique Key model), but a job failure or other exception could still leave the two ends with inconsistent data.
To solve these problems, we reworked the Doris Connector on top of Flink 2PC and Doris Stream Load 2PC to guarantee exactly once on both ends. It works as follows (see the sketch after this list):
- We maintain a read/write buffer in memory. On startup, writing is opened and submitted asynchronously; data is continuously streamed to the BE via HTTP chunked transfer until the checkpoint, at which point writing stops. This avoids the overhead of frequent HTTP submissions. After the checkpoint completes, the next round of writing begins.
- During one checkpoint interval, multiple tasks may be writing data to the same table; they are all tied to one global label for that checkpoint. At checkpoint time, the write transactions under that label are committed together in a single commit, making the data visible.
- If this fails, Flink replays the data from the checkpoint when it restarts.
- This guarantees data consistency between the two ends of Doris.
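Below is a much-simplified Java sketch of that cycle. It is not the connector's actual source code: the HTTP calls are left as placeholder comments mirroring the Stream Load 2PC requests shown in section 1.5.3, and the label/txn handling follows the description above:

// Simplified sketch of the checkpoint-aligned write/commit cycle; all HTTP
// calls are placeholders, and error handling is omitted.
class DorisTwoPhaseWriterSketch {

    // Called at the start of a checkpoint cycle: open a chunked Stream Load
    // whose label is derived from sink.label-prefix, the subtask id, and the
    // checkpoint id, so retries of the same checkpoint reuse the same label.
    void startLoad(String labelPrefix, int subtask, long checkpointId) {
        String label = labelPrefix + "_" + subtask + "_" + checkpointId;
        // placeholder: PUT /api/{db}/{table}/_stream_load with headers
        // label:<label> and two_phase_commit:true, body sent HTTP chunked
    }

    // Called per record between checkpoints.
    void write(byte[] record) {
        // placeholder: append the record to the open chunked request body
    }

    // Called at checkpoint time: stop writing; the pre-commit response
    // returns the TxnId, which is stored in checkpoint state.
    long preCommit() {
        // placeholder: close the chunked body, parse TxnId from the response
        return -1L;
    }

    // Called after the checkpoint succeeds: make the data visible.
    void commit(long txnId) {
        // placeholder: PUT /api/{db}/_stream_load_2pc with headers
        // txn_id:<txnId> and txn_operation:commit
    }

    // Called when recovering leftover transactions after a failure.
    void abort(long txnId) {
        // placeholder: PUT /api/{db}/_stream_load_2pc with txn_operation:abort
    }
}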
2. System Architecture
Let's walk through a complete example of using the latest version of the Doris Flink Connector (with two-phase commit support) together with Flink CDC to ingest sharded MySQL databases and tables in real time.
- Flink CDC captures the sharded MySQL data
- The Doris Flink Connector loads the data into Doris
- Finally, Doris's high-concurrency, high-performance OLAP capabilities serve the data for analysis
3. MySQL Installation and Configuration
3.1 Install MySQL
Quickly install and configure MySQL using Docker.
3.2 Enable MySQL binlog
Enter the Docker container, edit /etc/my.cnf, and add the following under [mysqld]:

log_bin=mysql_bin
binlog-format=Row
server-id=1

Then restart MySQL:
systemctl restart mysqld
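As a quick sanity check (an extra step, not in the original instructions), you can confirm inside MySQL that the binlog settings took effect:

SHOW VARIABLES LIKE 'log_bin';        -- expect: ON
SHOW VARIABLES LIKE 'binlog_format';  -- expect: ROW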
3.3 Prepare the data
For this demo we prepared two databases, emp_1 and emp_2, each containing two tables, employees_1 and employees_2, with some initial data:
CREATE DATABASE emp_1;
USE emp_1;
CREATE TABLE employees_1 (
    emp_no INT NOT NULL,
    birth_date DATE NOT NULL,
    first_name VARCHAR(14) NOT NULL,
    last_name VARCHAR(16) NOT NULL,
    gender ENUM ('M','F') NOT NULL,
    hire_date DATE NOT NULL,
    PRIMARY KEY (emp_no)
);

INSERT INTO `employees_1` VALUES (10001,'1953-09-02','Georgi','Facello','M','1986-06-26'),
(10002,'1964-06-02','Bezalel','Simmel','F','1985-11-21'),
(10003,'1959-12-03','Parto','Bamford','M','1986-08-28'),
(10004,'1954-05-01','Chirstian','Koblick','M','1986-12-01'),
(10005,'1955-01-21','Kyoichi','Maliniak','M','1989-09-12'),
(10006,'1953-04-20','Anneke','Preusig','F','1989-06-02'),
(10007,'1957-05-23','Tzvetan','Zielinski','F','1989-02-10'),
(10008,'1958-02-19','Saniya','Kalloufi','M','1994-09-15'),
(10009,'1952-04-19','Sumant','Peac','F','1985-02-18'),
(10010,'1963-06-01','Duangkaew','Piveteau','F','1989-08-24'),
(10011,'1953-11-07','Mary','Sluis','F','1990-01-22'),
(10012,'1960-10-04','Patricio','Bridgland','M','1992-12-18'),
(10013,'1963-06-07','Eberhardt','Terkki','M','1985-10-20'),
(10014,'1956-02-12','Berni','Genin','M','1987-03-11'),
(10015,'1959-08-19','Guoxiang','Nooteboom','M','1987-07-02'),
(10016,'1961-05-02','Kazuhito','Cappelletti','M','1995-01-27'),
(10017,'1958-07-06','Cristinel','Bouloucos','F','1993-08-03'),
(10018,'1954-06-19','Kazuhide','Peha','F','1987-04-03'),
(10019,'1953-01-23','Lillian','Haddadi','M','1999-04-30'),
(10020,'1952-12-24','Mayuko','Warwick','M','1991-01-26'),
(10021,'1960-02-20','Ramzi','Erde','M','1988-02-10'),
(10022,'1952-07-08','Shahaf','Famili','M','1995-08-22'),
(10023,'1953-09-29','Bojan','Montemayor','F','1989-12-17'),
(10024,'1958-09-05','Suzette','Pettey','F','1997-05-19'),
(10025,'1958-10-31','Prasadram','Heyers','M','1987-08-17'),
(10026,'1953-04-03','Yongqiao','Berztiss','M','1995-03-20'),
(10027,'1962-07-10','Divier','Reistad','F','1989-07-07'),
(10028,'1963-11-26','Domenick','Tempesti','M','1991-10-22'),
(10029,'1956-12-13','Otmar','Herbst','M','1985-11-20'),
(10030,'1958-07-14','Elvis','Demeyer','M','1994-02-17'),
(10031,'1959-01-27','Karsten','Joslin','M','1991-09-01'),
(10032,'1960-08-09','Jeong','Reistad','F','1990-06-20'),
(10033,'1956-11-14','Arif','Merlo','M','1987-03-18'),
(10034,'1962-12-29','Bader','Swan','M','1988-09-21'),
(10035,'1953-02-08','Alain','Chappelet','M','1988-09-05'),
(10036,'1959-08-10','Adamantios','Portugali','M','1992-01-03');

CREATE TABLE employees_2 (
    emp_no INT NOT NULL,
    birth_date DATE NOT NULL,
    first_name VARCHAR(14) NOT NULL,
    last_name VARCHAR(16) NOT NULL,
    gender ENUM ('M','F') NOT NULL,
    hire_date DATE NOT NULL,
    PRIMARY KEY (emp_no)
);

INSERT INTO `employees_2` VALUES (10037,'1963-07-22','Pradeep','Makrucki','M','1990-12-05'),
(10038,'1960-07-20','Huan','Lortz','M','1989-09-20'),
(10039,'1959-10-01','Alejandro','Brender','M','1988-01-19'),
(10040,'1959-09-13','Weiyi','Meriste','F','1993-02-14'),
(10041,'1959-08-27','Uri','Lenart','F','1989-11-12'),
(10042,'1956-02-26','Magy','Stamatiou','F','1993-03-21'),
(10043,'1960-09-19','Yishay','Tzvieli','M','1990-10-20'),
(10044,'1961-09-21','Mingsen','Casley','F','1994-05-21'),
(10045,'1957-08-14','Moss','Shanbhogue','M','1989-09-02'),
(10046,'1960-07-23','Lucien','Rosenbaum','M','1992-06-20'),
(10047,'1952-06-29','Zvonko','Nyanchama','M','1989-03-31'),
(10048,'1963-07-11','Florian','Syrotiuk','M','1985-02-24'),
(10049,'1961-04-24','Basil','Tramer','F','1992-05-04'),
(10050,'1958-05-21','Yinghua','Dredge','M','1990-12-25'),
(10051,'1953-07-28','Hidefumi','Caine','M','1992-10-15'),
(10052,'1961-02-26','Heping','Nitsch','M','1988-05-21'),
(10053,'1954-09-13','Sanjiv','Zschoche','F','1986-02-04'),
(10054,'1957-04-04','Mayumi','Schueller','M','1995-03-13');

CREATE DATABASE emp_2;

USE emp_2;

CREATE TABLE employees_1 (
    emp_no INT NOT NULL,
    birth_date DATE NOT NULL,
    first_name VARCHAR(14) NOT NULL,
    last_name VARCHAR(16) NOT NULL,
    gender ENUM ('M','F') NOT NULL,
    hire_date DATE NOT NULL,
    PRIMARY KEY (emp_no)
);

INSERT INTO `employees_1` VALUES (10055,'1956-06-06','Georgy','Dredge','M','1992-04-27'),
(10056,'1961-09-01','Brendon','Bernini','F','1990-02-01'),
(10057,'1954-05-30','Ebbe','Callaway','F','1992-01-15'),
(10058,'1954-10-01','Berhard','McFarlin','M','1987-04-13'),
(10059,'1953-09-19','Alejandro','McAlpine','F','1991-06-26'),
(10060,'1961-10-15','Breannda','Billingsley','M','1987-11-02'),
(10061,'1962-10-19','Tse','Herber','M','1985-09-17'),
(10062,'1961-11-02','Anoosh','Peyn','M','1991-08-30'),
(10063,'1952-08-06','Gino','Leonhardt','F','1989-04-08'),
(10064,'1959-04-07','Udi','Jansch','M','1985-11-20'),
(10065,'1963-04-14','Satosi','Awdeh','M','1988-05-18'),
(10066,'1952-11-13','Kwee','Schusler','M','1986-02-26'),
(10067,'1953-01-07','Claudi','Stavenow','M','1987-03-04'),
(10068,'1962-11-26','Charlene','Brattka','M','1987-08-07'),
(10069,'1960-09-06','Margareta','Bierman','F','1989-11-05'),
(10070,'1955-08-20','Reuven','Garigliano','M','1985-10-14'),
(10071,'1958-01-21','Hisao','Lipner','M','1987-10-01'),
(10072,'1952-05-15','Hironoby','Sidou','F','1988-07-21'),
(10073,'1954-02-23','Shir','McClurg','M','1991-12-01'),
(10074,'1955-08-28','Mokhtar','Bernatsky','F','1990-08-13'),
(10075,'1960-03-09','Gao','Dolinsky','F','1987-03-19'),
(10076,'1952-06-13','Erez','Ritzmann','F','1985-07-09'),
(10077,'1964-04-18','Mona','Azuma','M','1990-03-02'),
(10078,'1959-12-25','Danel','Mondadori','F','1987-05-26'),
(10079,'1961-10-05','Kshitij','Gils','F','1986-03-27'),
(10080,'1957-12-03','Premal','Baek','M','1985-11-19'),
(10081,'1960-12-17','Zhongwei','Rosen','M','1986-10-30'),
(10082,'1963-09-09','Parviz','Lortz','M','1990-01-03'),
(10083,'1959-07-23','Vishv','Zockler','M','1987-03-31'),
(10084,'1960-05-25','Tuval','Kalloufi','M','1995-12-15');

CREATE TABLE employees_2 (
    emp_no INT NOT NULL,
    birth_date DATE NOT NULL,
    first_name VARCHAR(14) NOT NULL,
    last_name VARCHAR(16) NOT NULL,
    gender ENUM ('M','F') NOT NULL,
    hire_date DATE NOT NULL,
    PRIMARY KEY (emp_no)
);

INSERT INTO `employees_2` VALUES (10085,'1962-11-07','Kenroku','Malabarba','M','1994-04-09'),
(10086,'1962-11-19','Somnath','Foote','M','1990-02-16'),
(10087,'1959-07-23','Xinglin','Eugenio','F','1986-09-08'),
(10088,'1954-02-25','Jungsoon','Syrzycki','F','1988-09-02'),
(10089,'1963-03-21','Sudharsan','Flasterstein','F','1986-08-12'),
(10090,'1961-05-30','Kendra','Hofting','M','1986-03-14'),
(10091,'1955-10-04','Amabile','Gomatam','M','1992-11-18'),
(10092,'1964-10-18','Valdiodio','Niizuma','F','1989-09-22'),
(10093,'1964-06-11','Sailaja','Desikan','M','1996-11-05'),
(10094,'1957-05-25','Arumugam','Ossenbruggen','F','1987-04-18'),
(10095,'1965-01-03','Hilari','Morton','M','1986-07-15'),
(10096,'1954-09-16','Jayson','Mandell','M','1990-01-14'),
(10097,'1952-02-27','Remzi','Waschkowski','M','1990-09-15'),
(10098,'1961-09-23','Sreekrishna','Servieres','F','1985-05-13'),
(10099,'1956-05-25','Valter','Sullins','F','1988-10-18'),
(10100,'1953-04-21','Hironobu','Haraldson','F','1987-09-21'),
(10101,'1952-04-15','Perla','Heyers','F','1992-12-28'),
(10102,'1959-11-04','Paraskevi','Luby','F','1994-01-26'),
(10103,'1953-11-26','Akemi','Birch','M','1986-12-02'),
(10104,'1961-11-19','Xinyu','Warwick','M','1987-04-16'),
(10105,'1962-02-05','Hironoby','Piveteau','M','1999-03-23'),
(10106,'1952-08-29','Eben','Aingworth','M','1990-12-19'),
(10107,'1956-06-13','Dung','Baca','F','1994-03-22'),
(10108,'1952-04-07','Lunjin','Giveon','M','1986-10-02'),
(10109,'1958-11-25','Mariusz','Prampolini','F','1993-06-16'),
(10110,'1957-03-07','Xuejia','Ullian','F','1986-08-22'),
(10111,'1963-08-29','Hugo','Rosis','F','1988-06-19'),
(10112,'1963-08-13','Yuichiro','Swick','F','1985-10-08'),
(10113,'1963-11-13','Jaewon','Syrzycki','M','1989-12-24'),
(10114,'1957-02-16','Munir','Demeyer','F','1992-07-17'),
(10115,'1964-12-25','Chikara','Rissland','M','1986-01-23'),
(10116,'1955-08-26','Dayanand','Czap','F','1985-05-28');
4. Doris Installation and Configuration
Here we use a single-node deployment as an example.
First, download the Doris 1.1 release: https://doris.apache.org/downloads/downloads.html
Extract it to a target directory:
tar zxvf apache-doris-1.1.0-bin.tar.gz -C doris-1.1
The extracted directory structure looks like this:
.
├── apache_hdfs_broker
│   ├── bin
│   ├── conf
│   └── lib
├── be
│   ├── bin
│   ├── conf
│   ├── lib
│   ├── log
│   ├── minidump
│   ├── storage
│   └── www
├── derby.log
├── fe
│   ├── bin
│   ├── conf
│   ├── doris-meta
│   ├── lib
│   ├── log
│   ├── plugins
│   ├── spark-dpp
│   ├── temp_dir
│   └── webroot
└── udf
    ├── include
    └── lib
Configure the FE and BE
cd doris-1.1
# Edit fe.conf and be.conf; they live in the conf directories of fe and be respectively.
# Uncomment priority_networks and set it to your own IP address; note that it
# is configured in CIDR notation. For example, my local IP is 172.19.0.12,
# so my configuration is:
priority_networks = 172.19.0.0/24

######
# Add the following line at the end of be.conf:
disable_stream_load_2pc=false

- Note: by default, the only change needed is the priority_networks setting above, applied identically in fe.conf and be.conf.
- By default, FE metadata is kept in the fe/doris-meta directory.
- BE data is stored in the be/storage directory.
Start the FE:
sh fe/bin/start_fe.sh --daemon
Start the BE:
sh be/bin/start_be.sh --daemon
Connect to the FE with the MySQL command line. A freshly installed Doris cluster has the default users root and admin, with an empty password:
mysql -uroot -P9030 -h127.0.0.1
Welcome to the MySQL monitor. Commands end with ; or \g.
Your MySQL connection id is 41
Server version: 5.7.37 Doris version trunk-440ad03

Copyright (c) 2000, 2022, Oracle and/or its affiliates.

Oracle is a registered trademark of Oracle Corporation and/or its
affiliates. Other names may be trademarks of their respective
owners.

Type 'help;' or '\h' for help. Type '\c' to clear the current input statement.

mysql> show frontends;
+--------------------------------+-------------+-------------+----------+-----------+---------+----------+----------+------------+------+-------+-------------------+---------------------+----------+--------+---------------+------------------+
| Name | IP | EditLogPort | HttpPort | QueryPort | RpcPort | Role | IsMaster | ClusterId | Join | Alive | ReplayedJournalId | LastHeartbeat | IsHelper | ErrMsg | Version | CurrentConnected |
+--------------------------------+-------------+-------------+----------+-----------+---------+----------+----------+------------+------+-------+-------------------+---------------------+----------+--------+---------------+------------------+
| 172.19.0.12_9010_1654681464955 | 172.19.0.12 | 9010 | 8030 | 9030 | 9020 | FOLLOWER | true | 1690644599 | true | true | 381106 | 2022-06-22 18:13:34 | true | | trunk-440ad03 | Yes |
+--------------------------------+-------------+-------------+----------+-----------+---------+----------+----------+------------+------+-------+-------------------+---------------------+----------+--------+---------------+------------------+
1 row in set (0.01 sec)
Add the BE node to the cluster:
mysql> alter system add backend "172.19.0.12:9050";
(Use your own IP address here.)
Check the BE:
mysql> show backends;
+-----------+-----------------+-------------+---------------+--------+----------+----------+---------------------+---------------------+-------+----------------------+-----------------------+-----------+------------------+---------------+---------------+---------+----------------+--------------------------+--------+---------------+-------------------------------------------------------------------------------------------------------------------------------+
| BackendId | Cluster | IP | HeartbeatPort | BePort | HttpPort | BrpcPort | LastStartTime | LastHeartbeat | Alive | SystemDecommissioned | ClusterDecommissioned | TabletNum | DataUsedCapacity | AvailCapacity | TotalCapacity | UsedPct | MaxDiskUsedPct | Tag | ErrMsg | Version | Status |
+-----------+-----------------+-------------+---------------+--------+----------+----------+---------------------+---------------------+-------+----------------------+-----------------------+-----------+------------------+---------------+---------------+---------+----------------+--------------------------+--------+---------------+-------------------------------------------------------------------------------------------------------------------------------+
| 10002 | default_cluster | 172.19.0.12 | 9050 | 9060 | 8040 | 8060 | 2022-06-22 12:51:58 | 2022-06-22 18:15:34 | true | false | false | 4369 | 328.686 MB | 144.083 GB | 196.735 GB | 26.76 % | 26.76 % | {"location" : "default"} | | trunk-440ad03 | {"lastSuccessReportTabletsTime":"2022-06-22 18:15:05","lastStreamLoadTime":-1,"isQueryDisabled":false,"isLoadDisabled":false} |
+-----------+-----------------+-------------+---------------+--------+----------+----------+---------------------+---------------------+-------+----------------------+-----------------------+-----------+------------------+---------------+---------------+---------+----------------+--------------------------+--------+---------------+-------------------------------------------------------------------------------------------------------------------------------+
1 row in set (0.00 sec)
The single-node Doris installation is now complete.
5. Flink Installation and Configuration
5.1 Download and install Flink 1.14.4
wget https://dlcdn.apache.org/flink/flink-1.14.4/flink-1.14.4-bin-scala_2.12.tgz
tar zxvf flink-1.14.4-bin-scala_2.12.tgz
Then copy the following dependencies into the lib directory under the Flink installation directory:
wget https://jiafeng-1308700295.cos.ap-hongkong.myqcloud.com/flink-doris-connector-1.14_2.12-1.0.0-SNAPSHOT.jar
wget https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/2.2.1/flink-sql-connector-mysql-cdc-2.2.1.jar
Start Flink:
bin/start-cluster.sh
After startup, you can check in the Flink Web UI that the cluster is running.
6. Start Syncing Data to Doris
6.1 Create the Doris database and table
create database demo;
use demo;
CREATE TABLE all_employees_info (
    emp_no int NOT NULL,
    birth_date date,
    first_name varchar(20),
    last_name varchar(20),
    gender char(2),
    hire_date date,
    database_name varchar(50),
    table_name varchar(200)
)
UNIQUE KEY(`emp_no`, `birth_date`)
DISTRIBUTED BY HASH(`birth_date`) BUCKETS 1
PROPERTIES (
    "replication_allocation" = "tag.location.default: 1"
);
6.2 Enter the Flink SQL Client
bin/sql-client.sh embedded
Enable checkpointing, taking a checkpoint every 10 seconds.
Checkpointing is disabled by default, and we need it enabled so that transactions get committed.
On startup, the source scans the whole table and splits it into multiple chunks by primary key, then reads each chunk with the incremental snapshot algorithm. The job checkpoints periodically and records which chunks are finished, so after a failover only the unfinished chunks need to be re-read. Once all chunks are read, the source switches to reading incremental change records from the previously captured binlog position. The job keeps checkpointing periodically and records the binlog position, so after a failover it resumes from the recorded position, achieving exactly-once semantics.
SET execution.checkpointing.interval = 10s;
Note: this is only a demo; in production a checkpoint interval of 60 seconds is recommended.
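As an aside, the chunk splitting used by the incremental snapshot described above can be illustrated with a minimal Java sketch. This is an illustration only, assuming a numeric primary key; the real mysql-cdc source uses a configurable chunk size and also handles non-numeric keys:

import java.util.ArrayList;
import java.util.List;

class ChunkSplitterSketch {

    // A chunk covers the primary-key range [start, end).
    static final class Chunk {
        final long start, end;
        Chunk(long start, long end) { this.start = start; this.end = end; }
    }

    // Split [minPk, maxPk] into fixed-size chunks; the job snapshots the
    // chunks one by one and records the finished ones in checkpoint state,
    // so a failover only re-reads the unfinished chunks.
    static List<Chunk> split(long minPk, long maxPk, long chunkSize) {
        List<Chunk> chunks = new ArrayList<>();
        for (long s = minPk; s <= maxPk; s += chunkSize) {
            chunks.add(new Chunk(s, Math.min(s + chunkSize, maxPk + 1)));
        }
        return chunks;
    }
}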
6.3 Create the MySQL CDC table
Execute the following SQL in the Flink SQL Client:
CREATE TABLE employees_source (
    database_name STRING METADATA VIRTUAL,
    table_name STRING METADATA VIRTUAL,
    emp_no int NOT NULL,
    birth_date date,
    first_name STRING,
    last_name STRING,
    gender STRING,
    hire_date date,
    PRIMARY KEY (`emp_no`) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = 'localhost',
    'port' = '3306',
    'username' = 'root',
    'password' = 'MyNewPass4!',
    'database-name' = 'emp_[0-9]+',
    'table-name' = 'employees_[0-9]+'
);
- 'database-name' = 'emp_[0-9]+': a regular expression that connects to multiple databases at once
- 'table-name' = 'employees_[0-9]+': a regular expression that connects to multiple tables at once
Query the CDC table; seeing rows like the following means everything is working:
select * from employees_source limit 10;
6.4 Create the Doris sink table
CREATE TABLE cdc_doris_sink (
    emp_no int,
    birth_date STRING,
    first_name STRING,
    last_name STRING,
    gender STRING,
    hire_date STRING,
    database_name STRING,
    table_name STRING
)
WITH (
    'connector' = 'doris',
    'fenodes' = '172.19.0.12:8030',
    'table.identifier' = 'demo.all_employees_info',
    'username' = 'root',
    'password' = '',
    'sink.properties.two_phase_commit'='true',
    'sink.label-prefix'='doris_demo_emp_001'
);
Parameter notes:
- connector: specifies the doris connector
- fenodes: the Doris FE node's IP address and HTTP port
- table.identifier: the corresponding Doris database and table name
- username: Doris username
- password: Doris password
- sink.properties.two_phase_commit: enables two-phase commit, so that two_phase_commit:true is added to the HTTP header during Stream Load; without it the load fails
- sink.label-prefix: a mandatory parameter for two-phase commit, required to guarantee consistency between the two ends; without it the job fails
- For other parameters, see the official documentation: https://doris.apache.org/zh-CN/docs/ecosystem/flink-doris-connector.html
At this point, querying the Doris sink table returns no data yet:
select * from cdc_doris_sink;
6.5 Insert data into the Doris table
Execute the following SQL:
insert into cdc_doris_sink (emp_no,birth_date,first_name,last_name,gender,hire_date,database_name,table_name)
select emp_no,cast(birth_date as string) as birth_date,first_name,last_name,gender,cast(hire_date as string) as hire_date,database_name,table_name from employees_source;
We can then see the job's runtime information in the Flink Web UI.
Looking at the TaskManager log, we can see that two-phase commit is in use and that data is continuously pushed to the BE via HTTP chunked transfer until the checkpoint, when writing stops. After the checkpoint completes, the next load is started.
2022-06-22 19:04:01,350 INFO io.debezium.relational.history.DatabaseHistoryMetrics [] - Started database history recovery
2022-06-22 19:04:01,350 INFO io.debezium.relational.history.DatabaseHistoryMetrics [] - Finished database history recovery of 0 change(s) in 0 ms
2022-06-22 19:04:01,351 INFO io.debezium.util.Threads [] - Requested thread factory for connector MySqlConnector, id = mysql_binlog_source named = binlog-client
2022-06-22 19:04:01,352 INFO io.debezium.connector.mysql.MySqlStreamingChangeEventSource [] - Skip 0 events on streaming start
2022-06-22 19:04:01,352 INFO io.debezium.connector.mysql.MySqlStreamingChangeEventSource [] - Skip 0 rows on streaming start
2022-06-22 19:04:01,352 INFO io.debezium.util.Threads [] - Creating thread debezium-mysqlconnector-mysql_binlog_source-binlog-client
2022-06-22 19:04:01,374 INFO io.debezium.util.Threads [] - Creating thread debezium-mysqlconnector-mysql_binlog_source-binlog-client
2022-06-22 19:04:01,381 INFO io.debezium.connector.mysql.MySqlStreamingChangeEventSource [] - Connected to MySQL binlog at localhost:3306, starting at MySqlOffsetContext [sourceInfoSchema=Schema{io.debezium.connector.mysql.Source:STRUCT}, sourceInfo=SourceInfo [currentGtid=null, currentBinlogFilename=mysql_bin.000005, currentBinlogPosition=211725, currentRowNumber=0, serverId=0, sourceTime=null, threadId=-1, currentQuery=null, tableIds=[], databaseName=null], partition={server=mysql_binlog_source}, snapshotCompleted=false, transactionContext=TransactionContext [currentTransactionId=null, perTableEventCount={}, totalEventCount=0], restartGtidSet=null, currentGtidSet=null, restartBinlogFilename=mysql_bin.000005, restartBinlogPosition=211725, restartRowsToSkip=0, restartEventsToSkip=0, currentEventLengthInBytes=0, inTransaction=false, transactionId=null]
2022-06-22 19:04:01,381 INFO io.debezium.util.Threads [] - Creating thread debezium-mysqlconnector-mysql_binlog_source-binlog-client
2022-06-22 19:04:01,381 INFO io.debezium.connector.mysql.MySqlStreamingChangeEventSource [] - Waiting for keepalive thread to start
2022-06-22 19:04:01,497 INFO io.debezium.connector.mysql.MySqlStreamingChangeEventSource [] - Keepalive thread is running
2022-06-22 19:04:08,303 INFO org.apache.doris.flink.sink.writer.DorisStreamLoad [] - stream load stopped.
2022-06-22 19:04:08,321 INFO org.apache.doris.flink.sink.writer.DorisStreamLoad [] - load Result {
    "TxnId": 6963,
    "Label": "doris_demo_001_0_1",
    "TwoPhaseCommit": "true",
    "Status": "Success",
    "Message": "OK",
    "NumberTotalRows": 634,
    "NumberLoadedRows": 634,
    "NumberFilteredRows": 0,
    "NumberUnselectedRows": 0,
    "LoadBytes": 35721,
    "LoadTimeMs": 9046,
    "BeginTxnTimeMs": 0,
    "StreamLoadPutTimeMs": 0,
    "ReadDataTimeMs": 0,
    "WriteDataTimeMs": 9041,
    "CommitAndPublishTimeMs": 0
}

2022-06-22 19:04:08,321 INFO org.apache.doris.flink.sink.writer.RecordBuffer [] - start buffer data, read queue size 0, write queue size 3
2022-06-22 19:04:08,321 INFO org.apache.doris.flink.sink.writer.DorisStreamLoad [] - stream load started for doris_demo_001_0_2
2022-06-22 19:04:08,321 INFO org.apache.doris.flink.sink.writer.DorisStreamLoad [] - start execute load
2022-06-22 19:04:08,325 INFO org.apache.flink.streaming.runtime.operators.sink.AbstractStreamingCommitterHandler [] - Committing the state for checkpoint 1
2022-06-22 19:04:08,329 INFO org.apache.doris.flink.sink.committer.DorisCommitter [] - load result {
    "status": "Success",
    "msg": "transaction [6963] commit successfully."
}
2022-06-22 19:04:18,303 INFO org.apache.doris.flink.sink.writer.DorisStreamLoad [] - stream load stopped.
2022-06-22 19:04:18,310 INFO org.apache.doris.flink.sink.writer.DorisStreamLoad [] - load Result {
    "TxnId": 6964,
    "Label": "doris_demo_001_0_2",
    "TwoPhaseCommit": "true",
    "Status": "Success",
    "Message": "OK",
    "NumberTotalRows": 0,
    "NumberLoadedRows": 0,
    "NumberFilteredRows": 0,
    "NumberUnselectedRows": 0,
    "LoadBytes": 0,
    "LoadTimeMs": 9988,
    "BeginTxnTimeMs": 0,
    "StreamLoadPutTimeMs": 0,
    "ReadDataTimeMs": 0,
    "WriteDataTimeMs": 9983,
    "CommitAndPublishTimeMs": 0
}

2022-06-22 19:04:18,310 INFO org.apache.doris.flink.sink.writer.RecordBuffer [] - start buffer data, read queue size 0, write queue size 3
2022-06-22 19:04:18,310 INFO org.apache.doris.flink.sink.writer.DorisStreamLoad [] - stream load started for doris_demo_001_0_3
2022-06-22 19:04:18,310 INFO org.apache.doris.flink.sink.writer.DorisStreamLoad [] - start execute load
2022-06-22 19:04:18,312 INFO org.apache.flink.streaming.runtime.operators.sink.AbstractStreamingCommitterHandler [] - Committing the state for checkpoint 2
2022-06-22 19:04:18,317 INFO org.apache.doris.flink.sink.committer.DorisCommitter [] - load result {
    "status": "Success",
    "msg": "transaction [6964] commit successfully."
}
2022-06-22 19:04:28,303 INFO org.apache.doris.flink.sink.writer.DorisStreamLoad [] - stream load stopped.
2022-06-22 19:04:28,308 INFO org.apache.doris.flink.sink.writer.DorisStreamLoad [] - load Result {
    "TxnId": 6965,
    "Label": "doris_demo_001_0_3",
    "TwoPhaseCommit": "true",
    "Status": "Success",
    "Message": "OK",
    "NumberTotalRows": 0,
    "NumberLoadedRows": 0,
    "NumberFilteredRows": 0,
    "NumberUnselectedRows": 0,
    "LoadBytes": 0,
    "LoadTimeMs": 9998,
    "BeginTxnTimeMs": 0,
    "StreamLoadPutTimeMs": 0,
    "ReadDataTimeMs": 0,
    "WriteDataTimeMs": 9993,
    "CommitAndPublishTimeMs": 0
}

2022-06-22 19:04:28,308 INFO org.apache.doris.flink.sink.writer.RecordBuffer [] - start buffer data, read queue size 0, write queue size 3
2022-06-22 19:04:28,308 INFO org.apache.doris.flink.sink.writer.DorisStreamLoad [] - stream load started for doris_demo_001_0_4
2022-06-22 19:04:28,308 INFO org.apache.doris.flink.sink.writer.DorisStreamLoad [] - start execute load
2022-06-22 19:04:28,311 INFO org.apache.flink.streaming.runtime.operators.sink.AbstractStreamingCommitterHandler [] - Committing the state for checkpoint 3
2022-06-22 19:04:28,316 INFO org.apache.doris.flink.sink.committer.DorisCommitter [] - load result {
    "status": "Success",
    "msg": "transaction [6965] commit successfully."
}
2022-06-22 19:04:38,303 INFO org.apache.doris.flink.sink.writer.DorisStreamLoad [] - stream load stopped.
2022-06-22 19:04:38,308 INFO org.apache.doris.flink.sink.writer.DorisStreamLoad [] - load Result {
    "TxnId": 6966,
    "Label": "doris_demo_001_0_4",
    "TwoPhaseCommit": "true",
    "Status": "Success",
    "Message": "OK",
    "NumberTotalRows": 0,
    "NumberLoadedRows": 0,
    "NumberFilteredRows": 0,
    "NumberUnselectedRows": 0,
    "LoadBytes": 0,
    "LoadTimeMs": 9999,
    "BeginTxnTimeMs": 0,
    "StreamLoadPutTimeMs": 0,
    "ReadDataTimeMs": 0,
    "WriteDataTimeMs": 9994,
    "CommitAndPublishTimeMs": 0
}

2022-06-22 19:04:38,308 INFO org.apache.doris.flink.sink.writer.RecordBuffer [] - start buffer data, read queue size 0, write queue size 3
2022-06-22 19:04:38,308 INFO org.apache.doris.flink.sink.writer.DorisStreamLoad [] - stream load started for doris_demo_001_0_5
2022-06-22 19:04:38,308 INFO org.apache.doris.flink.sink.writer.DorisStreamLoad [] - start execute load
2022-06-22 19:04:38,311 INFO org.apache.flink.streaming.runtime.operators.sink.AbstractStreamingCommitterHandler [] - Committing the state for checkpoint 4
2022-06-22 19:04:38,317 INFO org.apache.doris.flink.sink.committer.DorisCommitter [] - load result {
    "status": "Success",
    "msg": "transaction [6966] commit successfully."
}
2022-06-22 19:04:48,303 INFO org.apache.doris.flink.sink.writer.DorisStreamLoad [] - stream load stopped.
2022-06-22 19:04:48,310 INFO org.apache.doris.flink.sink.writer.DorisStreamLoad [] - load Result {
    "TxnId": 6967,
    "Label": "doris_demo_001_0_5",
    "TwoPhaseCommit": "true",
    "Status": "Success",
    "Message": "OK",
    "NumberTotalRows": 0,
    "NumberLoadedRows": 0,
    "NumberFilteredRows": 0,
    "NumberUnselectedRows": 0,
    "LoadBytes": 0,
    "LoadTimeMs": 10000,
    "BeginTxnTimeMs": 0,
    "StreamLoadPutTimeMs": 0,
    "ReadDataTimeMs": 0,
    "WriteDataTimeMs": 9996,
    "CommitAndPublishTimeMs": 0
}

2022-06-22 19:04:48,310 INFO org.apache.doris.flink.sink.writer.RecordBuffer [] - start buffer data, read queue size 0, write queue size 3
2022-06-22 19:04:48,310 INFO org.apache.doris.flink.sink.writer.DorisStreamLoad [] - stream load started for doris_demo_001_0_6
2022-06-22 19:04:48,310 INFO org.apache.doris.flink.sink.writer.DorisStreamLoad [] - start execute load
2022-06-22 19:04:48,312 INFO org.apache.flink.streaming.runtime.operators.sink.AbstractStreamingCommitterHandler [] - Committing the state for checkpoint 5
2022-06-22 19:04:48,317 INFO org.apache.doris.flink.sink.committer.DorisCommitter [] - load result {
    "status": "Success",
    "msg": "transaction [6967] commit successfully."
}
2022-06-22 19:04:58,303 INFO org.apache.doris.flink.sink.writer.DorisStreamLoad [] - stream load stopped.
2022-06-22 19:04:58,308 INFO org.apache.doris.flink.sink.writer.DorisStreamLoad [] - load Result {
    "TxnId": 6968,
    "Label": "doris_demo_001_0_6",
    "TwoPhaseCommit": "true",
    "Status": "Success",
    "Message": "OK",
    "NumberTotalRows": 0,
    "NumberLoadedRows": 0,
    "NumberFilteredRows": 0,
    "NumberUnselectedRows": 0,
    "LoadBytes": 0,
    "LoadTimeMs": 9998,
    "BeginTxnTimeMs": 0,
    "StreamLoadPutTimeMs": 0,
    "ReadDataTimeMs": 0,
    "WriteDataTimeMs": 9993,
    "CommitAndPublishTimeMs": 0
}

2022-06-22 19:04:58,308 INFO org.apache.doris.flink.sink.writer.RecordBuffer [] - start buffer data, read queue size 0, write queue size 3
2022-06-22 19:04:58,308 INFO org.apache.doris.flink.sink.writer.DorisStreamLoad [] - stream load started for doris_demo_001_0_7
2022-06-22 19:04:58,308 INFO org.apache.doris.flink.sink.writer.DorisStreamLoad [] - start execute load
2022-06-22 19:04:58,311 INFO org.apache.flink.streaming.runtime.operators.sink.AbstractStreamingCommitterHandler [] - Committing the state for checkpoint 6
2022-06-22 19:04:58,316 INFO org.apache.doris.flink.sink.committer.DorisCommitter [] - load result {
    "status": "Success",
    "msg": "transaction [6968] commit successfully."
}
2022-06-22 19:05:08,303 INFO org.apache.doris.flink.sink.writer.DorisStreamLoad [] - stream load stopped.
2022-06-22 19:05:08,309 INFO org.apache.doris.flink.sink.writer.DorisStreamLoad [] - load Result {
    "TxnId": 6969,
    "Label": "doris_demo_001_0_7",
    "TwoPhaseCommit": "true",
    "Status": "Success",
    "Message": "OK",
    "NumberTotalRows": 0,
    "NumberLoadedRows": 0,
    "NumberFilteredRows": 0,
    "NumberUnselectedRows": 0,
    "LoadBytes": 0,
    "LoadTimeMs": 9999,
    "BeginTxnTimeMs": 0,
    "StreamLoadPutTimeMs": 0,
    "ReadDataTimeMs": 0,
    "WriteDataTimeMs": 9995,
    "CommitAndPublishTimeMs": 0
}

2022-06-22 19:05:08,309 INFO org.apache.doris.flink.sink.writer.RecordBuffer [] - start buffer data, read queue size 0, write queue size 3
2022-06-22 19:05:08,309 INFO org.apache.doris.flink.sink.writer.DorisStreamLoad [] - stream load started for doris_demo_001_0_8
2022-06-22 19:05:08,309 INFO org.apache.doris.flink.sink.writer.DorisStreamLoad [] - start execute load
2022-06-22 19:05:08,311 INFO org.apache.flink.streaming.runtime.operators.sink.AbstractStreamingCommitterHandler [] - Committing the state for checkpoint 7
2022-06-22 19:05:08,316 INFO org.apache.doris.flink.sink.committer.DorisCommitter [] - load result {
    "status": "Success",
    "msg": "transaction [6969] commit successfully."
}
2022-06-22 19:05:18,303 INFO org.apache.doris.flink.sink.writer.DorisStreamLoad [] - stream load stopped.
2022-06-22 19:05:18,308 INFO org.apache.doris.flink.sink.writer.DorisStreamLoad [] - load Result {
    "TxnId": 6970,
    "Label": "doris_demo_001_0_8",
    "TwoPhaseCommit": "true",
    "Status": "Success",
    "Message": "OK",
    "NumberTotalRows": 0,
    "NumberLoadedRows": 0,
    "NumberFilteredRows": 0,
    "NumberUnselectedRows": 0,
    "LoadBytes": 0,
    "LoadTimeMs": 9999,
    "BeginTxnTimeMs": 0,
    "StreamLoadPutTimeMs": 0,
    "ReadDataTimeMs": 0,
    "WriteDataTimeMs": 9993,
    "CommitAndPublishTimeMs": 0
}

2022-06-22 19:05:18,308 INFO org.apache.doris.flink.sink.writer.RecordBuffer [] - start buffer data, read queue size 0, write queue size 3
2022-06-22 19:05:18,308 INFO org.apache.doris.flink.sink.writer.DorisStreamLoad [] - stream load started for doris_demo_001_0_9
2022-06-22 19:05:18,308 INFO org.apache.doris.flink.sink.writer.DorisStreamLoad [] - start execute load
2022-06-22 19:05:18,311 INFO org.apache.flink.streaming.runtime.operators.sink.AbstractStreamingCommitterHandler [] - Committing the state for checkpoint 8
2022-06-22 19:05:18,317 INFO org.apache.doris.flink.sink.committer.DorisCommitter [] - load result {
    "status": "Success",
    "msg": "transaction [6970] commit successfully."
}
2022-06-22 19:05:28,303 INFO org.apache.doris.flink.sink.writer.DorisStreamLoad [] - stream load stopped.
2022-06-22 19:05:28,310 INFO org.apache.doris.flink.sink.writer.DorisStreamLoad [] - load Result {
    "TxnId": 6971,
    "Label": "doris_demo_001_0_9",
    "TwoPhaseCommit": "true",
    "Status": "Success",
    "Message": "OK",
    "NumberTotalRows": 0,
    "NumberLoadedRows": 0,
    "NumberFilteredRows": 0,
    "NumberUnselectedRows": 0,
    "LoadBytes": 0,
    "LoadTimeMs": 10000,
    "BeginTxnTimeMs": 0,
    "StreamLoadPutTimeMs": 0,
    "ReadDataTimeMs": 0,
    "WriteDataTimeMs": 9996,
    "CommitAndPublishTimeMs": 0
}

2022-06-22 19:05:28,310 INFO org.apache.doris.flink.sink.writer.RecordBuffer [] - start buffer data, read queue size 0, write queue size 3
2022-06-22 19:05:28,310 INFO org.apache.doris.flink.sink.writer.DorisStreamLoad [] - stream load started for doris_demo_001_0_10
2022-06-22 19:05:28,310 INFO org.apache.doris.flink.sink.writer.DorisStreamLoad [] - start execute load
2022-06-22 19:05:28,315 INFO org.apache.flink.streaming.runtime.operators.sink.AbstractStreamingCommitterHandler [] - Committing the state for checkpoint 9
2022-06-22 19:05:28,320 INFO org.apache.doris.flink.sink.committer.DorisCommitter [] - load result {
    "status": "Success",
    "msg": "transaction [6971] commit successfully."
}
2022-06-22 19:05:38,303 INFO org.apache.doris.flink.sink.writer.DorisStreamLoad [] - stream load stopped.
2022-06-22 19:05:38,308 INFO org.apache.doris.flink.sink.writer.DorisStreamLoad [] - load Result {
    "TxnId": 6972,
    "Label": "doris_demo_001_0_10",
    "TwoPhaseCommit": "true",
    "Status": "Success",
    "Message": "OK",
    "NumberTotalRows": 0,
    "NumberLoadedRows": 0,
    "NumberFilteredRows": 0,
    "NumberUnselectedRows": 0,
    "LoadBytes": 0,
    "LoadTimeMs": 9998,
    "BeginTxnTimeMs": 0,
    "StreamLoadPutTimeMs": 0,
    "ReadDataTimeMs": 0,
    "WriteDataTimeMs": 9992,
    "CommitAndPublishTimeMs": 0
}

2022-06-22 19:05:38,308 INFO org.apache.doris.flink.sink.writer.RecordBuffer [] - start buffer data, read queue size 0, write queue size 3
2022-06-22 19:05:38,308 INFO org.apache.doris.flink.sink.writer.DorisStreamLoad [] - stream load started for doris_demo_001_0_11
2022-06-22 19:05:38,308 INFO org.apache.doris.flink.sink.writer.DorisStreamLoad [] - start execute load
2022-06-22 19:05:38,311 INFO org.apache.flink.streaming.runtime.operators.sink.AbstractStreamingCommitterHandler [] - Committing the state for checkpoint 10
2022-06-22 19:05:38,316 INFO org.apache.doris.flink.sink.committer.DorisCommitter [] - load result {
    "status": "Success",
    "msg": "transaction [6972] commit successfully."
}
2022-06-22 19:05:48,303 INFO org.apache.doris.flink.sink.writer.DorisStreamLoad [] - stream load stopped.
2022-06-22 19:05:48,315 INFO org.apache.doris.flink.sink.writer.DorisStreamLoad [] - load Result {
    "TxnId": 6973,
    "Label": "doris_demo_001_0_11",
    "TwoPhaseCommit": "true",
    "Status": "Success",
    "Message": "OK",
    "NumberTotalRows": 520,
    "NumberLoadedRows": 520,
    "NumberFilteredRows": 0,
    "NumberUnselectedRows": 0,
    "LoadBytes": 29293,
    "LoadTimeMs": 10005,
    "BeginTxnTimeMs": 0,
    "StreamLoadPutTimeMs": 0,
    "ReadDataTimeMs": 0,
    "WriteDataTimeMs": 10001,
    "CommitAndPublishTimeMs": 0
}

2022-06-22 19:05:48,315 INFO org.apache.doris.flink.sink.writer.RecordBuffer [] - start buffer data, read queue size 0, write queue size 3
2022-06-22 19:05:48,315 INFO org.apache.doris.flink.sink.writer.DorisStreamLoad [] - stream load started for doris_demo_001_0_12
2022-06-22 19:05:48,315 INFO org.apache.doris.flink.sink.writer.DorisStreamLoad [] - start execute load
2022-06-22 19:05:48,322 INFO org.apache.flink.streaming.runtime.operators.sink.AbstractStreamingCommitterHandler [] - Committing the state for checkpoint 11
2022-06-22 19:05:48,327 INFO org.apache.doris.flink.sink.committer.DorisCommitter [] - load result {
    "status": "Success",
    "msg": "transaction [6973] commit successfully."
}
6.6 Query the Doris data
Here I inserted 636 rows in total:
mysql> select count(1) from all_employees_info ;
+----------+
| count(1) |
+----------+
|      634 |
+----------+
1 row in set (0.01 sec)

mysql> select * from all_employees_info limit 20;
+--------+------------+------------+-------------+--------+------------+---------------+-------------+
| emp_no | birth_date | first_name | last_name   | gender | hire_date  | database_name | table_name  |
+--------+------------+------------+-------------+--------+------------+---------------+-------------+
|  10001 | 1953-09-02 | Georgi     | Facello     | M      | 1986-06-26 | emp_1         | employees_1 |
|  10002 | 1964-06-02 | Bezalel    | Simmel      | F      | 1985-11-21 | emp_1         | employees_1 |
|  10003 | 1959-12-03 | Parto      | Bamford     | M      | 1986-08-28 | emp_1         | employees_1 |
|  10004 | 1954-05-01 | Chirstian  | Koblick     | M      | 1986-12-01 | emp_1         | employees_1 |
|  10005 | 1955-01-21 | Kyoichi    | Maliniak    | M      | 1989-09-12 | emp_1         | employees_1 |
|  10006 | 1953-04-20 | Anneke     | Preusig     | F      | 1989-06-02 | emp_1         | employees_1 |
|  10007 | 1957-05-23 | Tzvetan    | Zielinski   | F      | 1989-02-10 | emp_1         | employees_1 |
|  10008 | 1958-02-19 | Saniya     | Kalloufi    | M      | 1994-09-15 | emp_1         | employees_1 |
|  10009 | 1952-04-19 | Sumant     | Peac        | F      | 1985-02-18 | emp_1         | employees_1 |
|  10010 | 1963-06-01 | Duangkaew  | Piveteau    | F      | 1989-08-24 | emp_1         | employees_1 |
|  10011 | 1953-11-07 | Mary       | Sluis       | F      | 1990-01-22 | emp_1         | employees_1 |
|  10012 | 1960-10-04 | Patricio   | Bridgland   | M      | 1992-12-18 | emp_1         | employees_1 |
|  10013 | 1963-06-07 | Eberhardt  | Terkki      | M      | 1985-10-20 | emp_1         | employees_1 |
|  10014 | 1956-02-12 | Berni      | Genin       | M      | 1987-03-11 | emp_1         | employees_1 |
|  10015 | 1959-08-19 | Guoxiang   | Nooteboom   | M      | 1987-07-02 | emp_1         | employees_1 |
|  10016 | 1961-05-02 | Kazuhito   | Cappelletti | M      | 1995-01-27 | emp_1         | employees_1 |
|  10017 | 1958-07-06 | Cristinel  | Bouloucos   | F      | 1993-08-03 | emp_1         | employees_1 |
|  10018 | 1954-06-19 | Kazuhide   | Peha        | F      | 1987-04-03 | emp_1         | employees_1 |
|  10019 | 1953-01-23 | Lillian    | Haddadi     | M      | 1999-04-30 | emp_1         | employees_1 |
|  10020 | 1952-12-24 | Mayuko     | Warwick     | M      | 1991-01-26 | emp_1         | employees_1 |
+--------+------------+------------+-------------+--------+------------+---------------+-------------+
20 rows in set (0.00 sec)
6.7 Test deletion
mysql> use emp_2;
Reading table information for completion of table and column names
You can turn off this feature to get a quicker startup with -A

Database changed
mysql> show tables;
+-----------------+
| Tables_in_emp_2 |
+-----------------+
| employees_1     |
| employees_2     |
+-----------------+
2 rows in set (0.00 sec)

mysql> delete from employees_2 where emp_no in (12013,12014,12015);
Query OK, 3 rows affected (0.01 sec)
Verify that the data was deleted in Doris:
mysql> select count(1) from all_employees_info ;
+----------+
| count(1) |
+----------+
|      631 |
+----------+
1 row in set (0.01 sec)
7. Summary
This article showed how to sync sharded MySQL databases and tables in real time with Flink CDC, and explained how the latest version of the Apache Doris Flink Connector integrates Flink 2PC with Doris Stream Load 2PC, covering the underlying mechanism, the integration principle, and how to use it.