大数据系列之:Flink Doris Connector,实时同步数据到Doris数据库
大数据系列之:Flink Doris Connector,实时同步数据到Doris数据库
- 一、版本兼容性
- 二、使用
- 三、Flink SQL
- 四、DataStream
- 五、Lookup Join
- 六、配置
- 通用配置项
- 接收器配置项
- 查找Join配置项
- 七、Doris 和 Flink 列类型映射
- 八、使用Flink CDC访问Doris的示例
- 九、使用FlinkSQL通过CDC访问并实现部分列更新的示例
- 十、使用FlinkCDC访问多个表或整个数据库(支持MySQL、Oracle、PostgreSQL、SQLServer)
- 十一、使用FlinkCDC更新Key列
- 十二、使用Flink根据指定的列删除数据
- 十三、最佳实践应用场景
- 十四、常见问题解答
可以通过Flink操作(读取、插入、修改、删除)支持存储在Doris中的数据。本文介绍了如何通过Datastream和Flink操作Doris。
注意:
- 修改和删除仅支持唯一键模型。
- 当前的删除是为了支持Flink CDC访问数据以实现自动删除。如果要删除其他数据访问方法,您需要自行实现。
一、版本兼容性
二、使用
Maven
添加 flink-doris-connector
<!-- flink-doris-connector -->
<dependency><groupId>org.apache.doris</groupId><artifactId>flink-doris-connector-1.16</artifactId><version>1.6.0</version>
</dependency>
- 请根据不同的Flink版本替换相应的Connector和Flink依赖版本。
- 也可以从这里下载相关版本的jar包。
flink-doris-connector下载地址:
- https://repo.maven.apache.org/maven2/org/apache/doris/
编译
- 编译时直接运行sh build.sh即可。
- 编译成功后,会在dist目录下生成目标jar包,如:flink-doris-connector-1.5.0-SNAPSHOT.jar。将此文件复制到 Flink 的类路径中以使用 Flink-Doris-Connector。例如,Flink 运行在 Local 模式,则将此文件放在 lib/ 文件夹中。 Flink运行在Yarn集群模式下,将此文件放入预部署包中。
三、Flink SQL
read
-- doris source
CREATE TABLE flink_doris_source (name STRING,age INT,price DECIMAL(5,2),sale DOUBLE)WITH ('connector' = 'doris','fenodes' = 'FE_IP:HTTP_PORT','table.identifier' = 'database.table','username' = 'root','password' = 'password'
);
write
--enable checkpoint
SET 'execution.checkpointing.interval' = '10s';-- doris sink
CREATE TABLE flink_doris_sink (name STRING,age INT,price DECIMAL(5,2),sale DOUBLE)WITH ('connector' = 'doris','fenodes' = 'FE_IP:HTTP_PORT','table.identifier' = 'db.table','username' = 'root','password' = 'password','sink.label-prefix' = 'doris_label'
);-- submit insert job
INSERT INTO flink_doris_sink select name,age,price,sale from flink_doris_source
四、DataStream
read
DorisOptions.Builder builder = DorisOptions.builder().setFenodes("FE_IP:HTTP_PORT").setTableIdentifier("db.table").setUsername("root").setPassword("password");DorisSource<List<?>> dorisSource = DorisSource.<List<?>>builder().setDorisOptions(builder.build()).setDorisReadOptions(DorisReadOptions.builder().build()).setDeserializer(new SimpleListDeserializationSchema()).build();env.fromSource(dorisSource, WatermarkStrategy.noWatermarks(), "doris source").print();
write
DorisSink通过StreamLoad向Doris写入数据,DataStream写入时支持不同的序列化方式
字符串数据流(SimpleStringSerializer)
// enable checkpoint
env.enableCheckpointing(10000);
// using batch mode for bounded data
env.setRuntimeMode(RuntimeExecutionMode.BATCH);DorisSink.Builder<String> builder = DorisSink.builder();
DorisOptions.Builder dorisBuilder = DorisOptions.builder();
dorisBuilder.setFenodes("FE_IP:HTTP_PORT").setTableIdentifier("db.table").setUsername("root").setPassword("password");Properties properties = new Properties();
// When the upstream is writing json, the configuration needs to be enabled.
//properties.setProperty("format", "json");
//properties.setProperty("read_json_by_line", "true");
DorisExecutionOptions.Builder executionBuilder = DorisExecutionOptions.builder();
executionBuilder.setLabelPrefix("label-doris") //streamload label prefix.setDeletable(false).setStreamLoadProp(properties); ;builder.setDorisReadOptions(DorisReadOptions.builder().build()).setDorisExecutionOptions(executionBuilder.build()).setSerializer(new SimpleStringSerializer()) //serialize according to string.setDorisOptions(dorisBuilder.build());//mock string source
List<Tuple2<String, Integer>> data = new ArrayList<>();
data.add(new Tuple2<>("doris",1));
DataStreamSource<Tuple2<String, Integer>> source = env. fromCollection(data);source.map((MapFunction<Tuple2<String, Integer>, String>) t -> t.f0 + "\t" + t.f1).sinkTo(builder.build());//mock json string source
//env.fromElements("{\"name\":\"zhangsan\",\"age\":1}").sinkTo(builder.build());
RowData数据流(RowDataSerializer)
// enable checkpoint
env.enableCheckpointing(10000);
// using batch mode for bounded data
env.setRuntimeMode(RuntimeExecutionMode.BATCH);//doris sink option
DorisSink.Builder<RowData> builder = DorisSink.builder();
DorisOptions.Builder dorisBuilder = DorisOptions.builder();
dorisBuilder.setFenodes("FE_IP:HTTP_PORT").setTableIdentifier("db.table").setUsername("root").setPassword("password");// json format to streamload
Properties properties = new Properties();
properties.setProperty("format", "json");
properties.setProperty("read_json_by_line", "true");
DorisExecutionOptions.Builder executionBuilder = DorisExecutionOptions.builder();
executionBuilder.setLabelPrefix("label-doris") //streamload label prefix.setDeletable(false).setStreamLoadProp(properties); //streamload params//flink rowdata's schema
String[] fields = {"city", "longitude", "latitude", "destroy_date"};
DataType[] types = {DataTypes.VARCHAR(256), DataTypes.DOUBLE(), DataTypes.DOUBLE(), DataTypes.DATE()};builder.setDorisReadOptions(DorisReadOptions.builder().build()).setDorisExecutionOptions(executionBuilder.build()).setSerializer(RowDataSerializer.builder() //serialize according to rowdata.setFieldNames(fields).setType("json") //json format.setFieldType(types).build()).setDorisOptions(dorisBuilder.build());//mock rowdata source
DataStream<RowData> source = env. fromElements("").map(new MapFunction<String, RowData>() {@Overridepublic RowData map(String value) throws Exception {GenericRowData genericRowData = new GenericRowData(4);genericRowData.setField(0, StringData.fromString("beijing"));genericRowData.setField(1, 116.405419);genericRowData.setField(2, 39.916927);genericRowData.setField(3, LocalDate.now().toEpochDay());return genericRowData;}});source. sinkTo(builder. build());
SchemaChange数据流(JsonDebeziumSchemaSerializer)
// enable checkpoint
env.enableCheckpointing(10000);Properties props = new Properties();
props. setProperty("format", "json");
props.setProperty("read_json_by_line", "true");
DorisOptions dorisOptions = DorisOptions. builder().setFenodes("127.0.0.1:8030").setTableIdentifier("test.t1").setUsername("root").setPassword("").build();DorisExecutionOptions.Builder executionBuilder = DorisExecutionOptions.builder();
executionBuilder.setLabelPrefix("label-prefix").setStreamLoadProp(props).setDeletable(true);DorisSink.Builder<String> builder = DorisSink.builder();
builder.setDorisReadOptions(DorisReadOptions.builder().build()).setDorisExecutionOptions(executionBuilder.build()).setDorisOptions(dorisOptions).setSerializer(JsonDebeziumSchemaSerializer.builder().setDorisOptions(dorisOptions).build());env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source").sinkTo(builder.build());
五、Lookup Join
CREATE TABLE fact_table (`id` BIGINT,`name` STRING,`city` STRING,`process_time` as proctime()
) WITH ('connector' = 'kafka',...
);create table dim_city(`city` STRING,`level` INT ,`province` STRING,`country` STRING
) WITH ('connector' = 'doris','fenodes' = '127.0.0.1:8030','jdbc-url' = 'jdbc:mysql://127.0.0.1:9030','table.identifier' = 'dim.dim_city','username' = 'root','password' = ''
);SELECT a.id, a.name, a.city, c.province, c.country,c.level
FROM fact_table a
LEFT JOIN dim_city FOR SYSTEM_TIME AS OF a.process_time AS c
ON a.city = c.city
- 这个命令是用于创建两个表和一个查询语句。第一个表是名为"fact_table"的表,它有四个列,分别为"id"、“name”、“city"和"process_time”。其中,"id"列是BIGINT类型,"name"和"city"列是STRING类型,"process_time"列是一个基于当前系统时间的计算列,它使用"proctime()"函数实现。
- 第二个表是名为"dim_city"的表,它有四个列,分别为"city"、“level”、“province"和"country”。其中,“city”、“province"和"country"列是STRING类型,“level"列是INT类型。该表使用"Doris"作为存储引擎,连接器为"connector”,并且需要指定连接器的其他参数,如"fenodes”、“jdbc-url”、“table.identifier”、"username"和"password"等。
- 最后一个命令是一个查询语句,它使用"LEFT JOIN"将"fact_table"和"dim_city"两个表进行连接,并使用"FOR SYSTEM_TIME AS OF"来指定连接时的时间戳,这里使用了"process_time"列的值。查询结果包括"id"、“name”、“city”、“province”、"country"和"level"这些列。
六、配置
通用配置项
fenodes:
- Doris FE http地址,支持多个地址,用逗号分隔
benodes:
- Doris BE http地址,支持多个地址,以逗号分隔。
jdbc-url:
- jdbc连接信息,如:jdbc:mysql://127.0.0.1:9030
table.identifier:
- Doris表名,如:db.tbl
auto-redirect:
- 默认值:true
- 是否重定向 StreamLoad 请求。开启后StreamLoad会通过FE写入,不再显示BE信息。
doris.request.retries:
- 默认值:3
- 向 Doris 发送请求的重试次数
doris.request.connect.timeout.ms:
- 默认值:30000
- 向 Doris 发送请求的连接超时
doris.request.read.timeout.ms:
- 默认值:30000
- 读取向 Doris 发送请求的超时
源配置项
doris.request.query.timeout.s:
- 默认值:3600
- 查询Doris的超时时间,默认1小时,-1表示无超时限制
doris.request.tablet.size:
- 默认值:Integer. MAX_VALUE
- 一个Partition对应的Doris Tablet数量。该值设置得越小,生成的Partition就越多。这提高了 Flink 端的并行度,但同时也给 Doris 带来了更大的压力。
doris.batch.size:
- 默认值:1024
- 一次从BE读取数据的最大行数。增加该值会减少 Flink 和 Doris 之间建立的连接数量。从而减少网络延迟带来的额外时间开销。
doris.exec.mem.limit:
- 默认值:2147483648
- 单个查询的内存限制。默认为2GB,以字节为单位
doris.deserialize.arrow.async:
- 默认值:FALSE
- 是否支持flink-doris-connector迭代所需的Arrow格式异步转换为RowBatch
doris.deserialize.queue.size:
- 默认值:64
- Arrow格式的内部处理队列的异步转换,当doris.deserialize.arrow.async为true时有效
doris.read.field:
- 读取Doris表的列名列表,以逗号分隔
doris.filter.query:
- 过滤读取数据的表达式,这个表达式透明传递给Doris。 Doris使用这个表达式来完成源端的数据过滤。例如年龄=18。
接收器配置项
sink.label-prefix:
- Stream加载导入使用的标签前缀。在2pc场景下,需要全局唯一性来保证Flink的EOS语义。
sink.properties.*:
- 导入流负载参数。
例如: ‘sink.properties.column_separator’ = ', ’ 定义列分隔符, ‘sink.properties.escape_delimiters’ = ‘true’ 特殊字符作为分隔符, ‘\x01’ 将转换为二进制 0x01 - JSON格式导入
‘sink.properties.format’ = ‘json’ ‘sink.properties.按行读取 json’ = ‘true’
详细参数请参考这里。
sink.enable-delete:
- 默认值:TRUE
- 是否启用删除。该选项需要Doris表开启批量删除功能(Doris 0.15+版本默认开启),且仅支持Unique模型。
sink.enable-2pc:
- 默认值:TRUE
- 是否启用两阶段提交(2pc),默认为true,以保证Exactly-Once语义。
sink.buffer-size:
- 默认值:1MB
- 写入数据缓存缓冲区的大小,以字节为单位。不建议修改,默认配置即可
sink.buffer-count:
- 默认值:3
- 写入数据缓冲区的数量。不建议修改,默认配置即可
sink.max-retries:
- 默认值:3
- Commit失败后最大重试次数,默认3
sink.use-cache:
- 默认值:false
- 发生异常时,是否使用内存缓存进行恢复。启用后,Checkpoint 期间的数据将保留在缓存中。
sink.enable.batch-mode:
- 默认值:false
- 是否使用批处理模式写入Doris。使能后,写入时序不依赖于Checkpoint。写入是通过sink.buffer-flush.max-rows/sink.buffer-flush.max-bytes/sink.buffer-flush.interval参数控制的。输入机会。
同时开启后,Exactly-once语义将无法保证。 Uniq模型可以用来实现幂等性。
sink.flush.queue-size:
- 默认值:2
- 在批处理模式下,缓存的列大小。
sink.buffer-flush.max-rows:
- 默认值:50000
- 批处理模式下,单批写入的最大数据行数。
sink.buffer-flush.max-bytes:
- 默认值:10MB
- 在批处理模式下,单批写入的最大字节数。
sink.buffer-flush.interval:
- 默认值:10s
- 批处理模式下,异步刷新缓存的时间间隔
sink.ignore.update-before:
- 默认值:true
- 是否忽略update-before事件,默认忽略。
查找Join配置项
lookup.cache.max-rows
- 默认值:-1
- 查找缓存的最大行数,默认值为-1,不启用缓存
lookup.cache.ttl:
- 默认值:10s
- 查找缓存的最大时间,默认10s
lookup.max-retries:
- 默认值:1
- 查找查询失败后重试的次数
lookup.jdbc.async:
- 默认值:false
- 是否启用异步查找,默认为false
lookup.jdbc.read.batch.size:
- 默认值:128
- 异步查找下,每个查询的最大批量大小
lookup.jdbc.read.batch.queue-size:
- 默认值:256
- 异步查找时中间缓冲队列的大小
lookup.jdbc.read.thread-size:
- 默认值:3
- 每个任务中用于查找的jdbc线程数
七、Doris 和 Flink 列类型映射
Doris类型 | Flink类型 |
---|---|
NULL_TYPE | NULL |
BOOLEAN | BOOLEAN |
TINYINT | TINYINT |
SMALLINT | SMALLINT |
INT | INT |
BIGINT | BIGINT |
FLOAT | FLOAT |
DOUBLE | DOUBLE |
DATE | DATE |
DATETIME | TIMESTAMP |
DECIMAL | DECIMAL |
CHAR | STRING |
LARGEINT | STRING |
VARCHAR | STRING |
STRING | STRING |
DECIMALV2 | DECIMAL |
ARRAY | ARRAY |
MAP | MAP |
JSON | STRING |
VARIANT | STRING |
IPV4 | STRING |
IPV6 | STRING |
从connector-1.6.1版本开始,增加了对Variant、IPV6、IPV4三种数据类型读取的支持。读取 IPV6 和 Variant 需要 Doris 2.1.1 或更高版本。
八、使用Flink CDC访问Doris的示例
SET 'execution.checkpointing.interval' = '10s';
CREATE TABLE cdc_mysql_source (id int,name VARCHAR,PRIMARY KEY (id) NOT ENFORCED
) WITH ('connector' = 'mysql-cdc','hostname' = '127.0.0.1','port' = '3306','username' = 'root','password' = 'password','database-name' = 'database','table-name' = 'table'
);-- Support synchronous insert/update/delete events
CREATE TABLE doris_sink (
id INT,
name STRING
)
WITH ('connector' = 'doris','fenodes' = '127.0.0.1:8030','table.identifier' = 'database.table','username' = 'root','password' = '','sink.properties.format' = 'json','sink.properties.read_json_by_line' = 'true','sink.enable-delete' = 'true', -- Synchronize delete events'sink.label-prefix' = 'doris_label'
);insert into doris_sink select id,name from cdc_mysql_source;
九、使用FlinkSQL通过CDC访问并实现部分列更新的示例
-- enable checkpoint
SET 'execution.checkpointing.interval' = '10s';CREATE TABLE cdc_mysql_source (id int,name STRING,bank STRING,age int,PRIMARY KEY (id) NOT ENFORCED
) WITH ('connector' = 'mysql-cdc','hostname' = '127.0.0.1','port' = '3306','username' = 'root','password' = 'password','database-name' = 'database','table-name' = 'table'
);CREATE TABLE doris_sink (id INT,name STRING,bank STRING,age int
)
WITH ('connector' = 'doris','fenodes' = '127.0.0.1:8030','table.identifier' = 'database.table','username' = 'root','password' = '','sink.properties.format' = 'json','sink.properties.read_json_by_line' = 'true','sink.properties.columns' = 'id,name,bank,age','sink.properties.partial_columns' = 'true' --Enable partial column updates
);insert into doris_sink select id,name,bank,age from cdc_mysql_source;
十、使用FlinkCDC访问多个表或整个数据库(支持MySQL、Oracle、PostgreSQL、SQLServer)
MySQL同步示例
<FLINK_HOME>bin/flink run \-Dexecution.checkpointing.interval=10s\-Dparallelism.default=1\-c org.apache.doris.flink.tools.cdc.CdcTools\lib/flink-doris-connector-1.16-1.5.0-SNAPSHOT.jar \mysql-sync-database\--database test_db \--mysql-conf hostname=127.0.0.1 \--mysql-conf port=3306 \--mysql-conf username=root \--mysql-conf password=123456 \--mysql-conf database-name=mysql_db \--including-tables "tbl1|test.*" \--sink-conf fenodes=127.0.0.1:8030 \--sink-conf username=root \--sink-conf password=123456 \--sink-conf jdbc-url=jdbc:mysql://127.0.0.1:9030 \--sink-conf sink.label-prefix=label \--table-conf replication_num=1
Oracle同步示例
<FLINK_HOME>bin/flink run \-Dexecution.checkpointing.interval=10s \-Dparallelism.default=1 \-c org.apache.doris.flink.tools.cdc.CdcTools \./lib/flink-doris-connector-1.16-1.5.0-SNAPSHOT.jar\oracle-sync-database \--database test_db \--oracle-conf hostname=127.0.0.1 \--oracle-conf port=1521 \--oracle-conf username=admin \--oracle-conf password="password" \--oracle-conf database-name=XE \--oracle-conf schema-name=ADMIN \--including-tables "tbl1|tbl2" \--sink-conf fenodes=127.0.0.1:8030 \--sink-conf username=root \--sink-conf password=\--sink-conf jdbc-url=jdbc:mysql://127.0.0.1:9030 \--sink-conf sink.label-prefix=label \--table-conf replication_num=1
PostgreSQL 同步示例
<FLINK_HOME>/bin/flink run \-Dexecution.checkpointing.interval=10s \-Dparallelism.default=1\-c org.apache.doris.flink.tools.cdc.CdcTools \./lib/flink-doris-connector-1.16-1.5.0-SNAPSHOT.jar \postgres-sync-database \--database db1\--postgres-conf hostname=127.0.0.1 \--postgres-conf port=5432 \--postgres-conf username=postgres \--postgres-conf password="123456" \--postgres-conf database-name=postgres \--postgres-conf schema-name=public \--postgres-conf slot.name=test \--postgres-conf decoding.plugin.name=pgoutput \--including-tables "tbl1|tbl2" \--sink-conf fenodes=127.0.0.1:8030 \--sink-conf username=root \--sink-conf password=\--sink-conf jdbc-url=jdbc:mysql://127.0.0.1:9030 \--sink-conf sink.label-prefix=label \--table-conf replication_num=1
SQLServer同步示例
<FLINK_HOME>/bin/flink run \-Dexecution.checkpointing.interval=10s \-Dparallelism.default=1 \-c org.apache.doris.flink.tools.cdc.CdcTools \./lib/flink-doris-connector-1.16-1.5.0-SNAPSHOT.jar \sqlserver-sync-database \--database db1\--sqlserver-conf hostname=127.0.0.1 \--sqlserver-conf port=1433 \--sqlserver-conf username=sa \--sqlserver-conf password="123456" \--sqlserver-conf database-name=CDC_DB \--sqlserver-conf schema-name=dbo \--including-tables "tbl1|tbl2" \--sink-conf fenodes=127.0.0.1:8030 \--sink-conf username=root \--sink-conf password=\--sink-conf jdbc-url=jdbc:mysql://127.0.0.1:9030 \--sink-conf sink.label-prefix=label \--table-conf replication_num=1
十一、使用FlinkCDC更新Key列
一般来说,在业务数据库中,数字被用作表的主键,例如学生表中使用数字(id)作为主键,但随着业务的发展,与数据对应的数字可能会发生变化。在这种情况下,使用FlinkCDC + Doris Connector进行数据同步可以自动更新Doris主键列中的数据。
原理
Flink CDC的底层采集工具是Debezium。Debezium内部使用op字段来识别相应的操作:op字段的值为c、u、d和r,分别对应创建、更新、删除和读取。对于主键列的更新,FlinkCDC将向下游发送DELETE和INSERT事件,在数据同步到Doris后,会自动更新主键列的数据。
示例
Flink程序可以参考上述CDC同步示例。任务成功提交后,在MySQL端执行更新主键列语句(update student set id = ‘1002’ where id = ‘1001’),以修改Doris中的数据。
十二、使用Flink根据指定的列删除数据
通常,Kafka中的消息使用特定的字段来标记操作类型,例如{“op_type”:“delete”,data:{…}}。对于这种类型的数据,希望删除op_type=delete的数据。
默认情况下,DorisSink将根据RowKind来区分事件类型。通常,在cdc的情况下,可以直接获取事件类型,并将隐藏列__DORIS_DELETE_SIGN__赋值以实现删除的目的,而Kafka需要基于业务逻辑进行判断,显示传递给隐藏列的值。
-- Such as upstream data: {"op_type":"delete",data:{"id":1,"name":"zhangsan"}}
CREATE TABLE KAFKA_SOURCE(data STRING,op_type STRING
) WITH ('connector' = 'kafka',...
);CREATE TABLE DORIS_SINK(id INT,name STRING,__DORIS_DELETE_SIGN__ INT
) WITH ('connector' = 'doris','fenodes' = '127.0.0.1:8030','table.identifier' = 'db.table','username' = 'root','password' = '','sink.enable-delete' = 'false', -- false means not to get the event type from RowKind'sink.properties.columns' = 'id, name, __DORIS_DELETE_SIGN__' -- Display the import column of the specified streamload
);INSERT INTO DORIS_SINK
SELECT json_value(data,'$.id') as id,
json_value(data,'$.name') as name,
if(op_type='delete',1,0) as __DORIS_DELETE_SIGN__
from KAFKA_SOURCE;
- 这段代码是一个示例,演示了如何使用Flink从Kafka源表读取数据,并将其写入Doris目标表。具体来说,如果源表中的数据op_type字段的值为"delete",则希望在Doris目标表中删除相应的数据。
- 首先,在Kafka源表的定义中,我们有一个data字段用于存储源数据的JSON字符串,以及一个op_type字段用于标识操作类型。
- 然后,在Doris目标表的定义中,我们有一个id字段和一个name字段来存储数据的具体内容,还有一个名为__DORIS_DELETE_SIGN__的隐藏列,用于标识是否要进行删除操作。
- 在INSERT INTO语句中,我们将从Kafka源表中选择data字段的id和name,并使用json_value函数提取相应的值。同时,我们使用if函数将op_type字段的值与"delete"进行比较,如果相等则将__DORIS_DELETE_SIGN__赋值为1,否则赋值为0。
- 最后,将处理后的数据插入到Doris目标表中。
- 总之,这段代码的作用是根据源表中的op_type字段值,将对应的数据删除或写入到Doris目标表中。
十三、最佳实践应用场景
- 使用Flink Doris Connector最适合的场景是实时/批量将源数据同步到Doris(Mysql、Oracle、PostgreSQL)中,然后使用Flink对Doris和其他数据源中的数据进行联合分析。您还可以使用Flink Doris Connector。
其他注意事项:
- Flink Doris Connector主要依赖于Checkpoint进行流式写入,因此Checkpoint之间的时间间隔就是数据的可见延迟时间。
- 为了确保Flink的Exactly Once语义,Flink Doris Connector默认启用两阶段提交,Doris在1.1版本之后默认启用两阶段提交。1.0可以通过修改BE参数来启用。
十四、常见问题解答
Doris Source读取数据后,为什么流会结束?
- 目前Doris Source是有界流,不支持CDC读取。
Flink能否读取Doris并执行条件下推?
- 通过配置doris.filter.query参数可以实现。
如何写入位图类型?
CREATE TABLE bitmap_sink (
dt int,
page string,
user_id int
)
WITH ('connector' = 'doris','fenodes' = '127.0.0.1:8030','table.identifier' = 'test.bitmap_test','username' = 'root','password' = '','sink.label-prefix' = 'doris_label','sink.properties.columns' = 'dt,page,user_id,user_id=to_bitmap(user_id)'
)
errCode = 2, detailMessage = Label [label_0_1] has already been used, relate to txn [19650]
- 在Exactly-Once场景中,Flink Job必须从最新的Checkpoint/Savepoint重新启动,否则会报告上述错误。当不需要Exactly-Once时,可以通过关闭2PC提交(sink.enable-2pc=false)或更改不同的sink.label-prefix来解决。
errCode = 2, detailMessage = transaction [19650] not found
- 发生在Commit阶段,Checkpoint中记录的事务ID在FE端已过期,在此时再次提交时会出现上述错误。此时无法从Checkpoint启动,可以通过修改fe.conf中的streaming_label_keep_max_second配置来延长过期时间,默认为12小时。
errCode = 2, detailMessage = current running txns on db 10006 is 100, larger than limit 100
- 这是因为同一库的并发导入超过了100,可以通过调整fe.conf的max_running_txn_num_per_db参数来解决。详细信息,请参考max_running_txn_num_per_db。
- 同时,如果一个任务频繁修改标签并重新启动,也可能导致此错误发生。在2pc场景(重复/聚合模型)中,每个任务的标签需要唯一,在从Checkpoint重新启动时,Flink任务将主动中止之前已经成功预提交但未提交的事务。频繁修改标签并重新启动将导致大量已成功预提交的事务无法中止,占用事务。在Unique模型下,也可以关闭2pc,实现幂等写入。
当Flink向Uniq模型写入一批数据时,如何确保数据的顺序?
- 您可以添加序列列的配置来确保顺序。
Flink任务没有报错,但数据无法同步?
- 在Connector1.1.0之前,数据是批量写入的,并且写入是由数据驱动的。需要确定上游是否有数据写入。在1.1.0之后,它依赖于Checkpoint,并且必须启用Checkpoint才能进行写入。
tablet writer write failed, tablet_id=190958, txn_id=3505530, err=-235
- 通常发生在Connector1.1.0之前,这是因为写入频率过快,导致版本过多。可以通过设置sink.batch.size和sink.batch.interval参数来减少Streamload的频率。
源表和Doris表应如何对应?
- 在使用Flink Connector导入数据时,需要注意两个方面。第一,源表的列和类型应与Flink SQL中的列和类型对应;第二,Flink SQL中的列和类型必须与Doris表的列和类型匹配。
TApplicationException: get_next failed: out of sequence response: expected 4 but got 3
- 这是由于 Thrift 中的并发错误造成的。建议您尽可能使用最新的连接器和兼容的 Flink 版本。
DorisRuntimeException: Fail to abort transaction 26153 with url http://192.168.0.1:8040/api/table_name/_stream_load_2pc
- 您可以在TaskManager中搜索日志中止事务响应,并根据HTTP返回码判断是客户端问题还是服务器问题。
org.apache.flink.table.api.SqlParserException when using doris.filter.query: SQL parsing failed. “xx” encountered at row x, column xx
- 这个问题主要是由于条件中的varchar/string类型,需要进行引号转义。正确的写法是xxx = ‘‘xxx’’。这样,Flink SQL解析器会将连续的两个单引号解释为一个单引号字符,而不是字符串的结束,并将拼接的字符串作为属性的值。例如:t1 >= ‘2024-01-01’ 可以写为 ‘doris.filter.query’ = ‘t1 >=’‘2024-01-01’'。
Failed to connect to backend: http://host:webserver_port, and BE is still alive
- 这个问题可能是由于配置了be的IP地址,而该地址无法被外部的Flink集群访问。这主要是因为在连接fe时,be的地址是通过fe进行解析的。例如,如果将be地址添加为’127.0.0.1’,那么Flink集群通过fe获取到的be地址将是’127.0.0.1:webserver_port’,并且Flink将连接到该地址。当出现这种问题时,可以通过将be的实际对应的外部IP地址添加到"with"属性中来解决:‘benodes’=“be_ip:webserver_port,be_ip:webserver_port…”。对于整个数据库的同步,可以使用以下属性:–sink-conf benodes=be_ip:webserver,be_ip:webserver…。
当使用Flink-connector将MySQL数据同步到Doris时,时间戳之间存在几小时的时间差。
- Flink Connector默认使用UTC+8时区从MySQL同步整个数据库。如果您的数据位于不同的时区,您可以使用以下配置进行调整,例如:–mysql-conf debezium.date.format.timestamp.zone=“UTC+3”。
相关文章:

大数据系列之:Flink Doris Connector,实时同步数据到Doris数据库
大数据系列之:Flink Doris Connector,实时同步数据到Doris数据库 一、版本兼容性二、使用三、Flink SQL四、DataStream五、Lookup Join六、配置通用配置项接收器配置项查找Join配置项 七、Doris 和 Flink 列类型映射八、使用Flink CDC访问Doris的示例九、…...

LabVIEW VI 多语言动态加载与运行的实现
在多语言应用程序开发中,确保用户界面能够根据用户的语言偏好动态切换是一个关键需求。本文通过分析一个LabVIEW程序框图,详细说明了如何使用LabVIEW中的属性节点和调用节点来实现VI(虚拟仪器)界面语言的动态加载与运行。此程序允…...

Unity引擎基础知识
目录 Unity基础知识概要 1. 创建工程 2. 工程目录介绍 3. Unity界面和五大面板 4. 游戏物体创建与操作 5. 场景和层管理 6. 组件系统 7. 脚本语言C# 8. 物理引擎和UI系统 学习资源推荐 Unity引擎中如何优化大型游戏项目的性能? Unity C#脚本语言的高级编…...
练习题- 探索正则表达式对象和对象匹配
正则表达式(Regular Expressions)是一种强大而灵活的文本处理工具,它允许我们通过模式匹配来处理字符串。这在数据清理、文本分析等领域有着广泛的应用。在Python中,正则表达式通过re模块提供支持,学习和掌握正则表达式对于处理复杂的文本数据至关重要。 本文将探索如何在…...

Java集合提升
1. 手写ArrayList 1.1. ArrayList底层原理细节 底层结构是一个长度可以动态增长的数组(顺序表)transient Object[] elementData; 特点:在内存中分配连续的空间,只存储数据,不存储地址信息。位置就隐含着地址。优点 节…...

uniapp 微信小程序生成水印图片
效果 源码 <template><view style"overflow: hidden;"><camera device-position"back" flash"auto" class"camera"><cover-view class"text-white padding water-mark"><cover-view class"…...
ElasticSearch相关知识点
ElasticSearch中的倒排索引是如何工作的? 倒排索引是ElasticSearch中用于全文检索的一种数据结构,与正排索引不同的是,正排索引将文档按照词汇顺序组织。而倒排索引是将词汇映射到包含该词汇的文档中。 在ElasticSearch中,倒排索…...

css 文字图片居中及网格布局
以下内容纯自已个人理解,直接上代码: <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><meta name"viewport" content"widthdevice-width, initial-scale1.0"><…...
解决ImportError: DLL load failed while importing _rust: 找不到指定的程序
解决ImportError: DLL load failed while importing _rust: 找不到指定的程序 python使用库cryptography 当 from cryptography.hazmat.bindings._rust import exceptions as rust_exceptions 时,会报错: ImportError: DLL load failed while importin…...
集合-List去重
1.利用Set去重 @Test public void distinctList() {List<String> oldList = new ArrayList<>();oldList.add("a");oldList.add("a");oldList.add("b");oldList.add("c");oldList.add("d");List<String> …...

ST-LINK USB communication error 非常有效的解决方法
文章目录 一、检查确定是ST-LINK USB communication error的问题二、关闭文件,打开keil软件所在文件夹,找到STLink文件夹,找到该应用程序双击 一、检查确定是ST-LINK USB communication error的问题 二、关闭文件,打开keil软件所在…...
探索CSS的:future-link伪类:选择指向未来文档的链接
CSS(层叠样式表)是Web设计中用于描述网页元素样式的语言。随着CSS4的提案,引入了许多新的选择器,其中之一是:future-link伪类。然而,需要注意的是,:future-link伪类目前还处于提议阶段,并没有在…...

【C++】序列与关联容器(三)map与multimap容器
【C】序列与关联容器(三)map与multimap容器 一、map二、multiset / multimap 一、map 树中的每个结点的类型是一个std::pair //pair的类型是<const key,value> pair是一个包含两个指针的结构体,第一个指针指向该节点的key,…...
ActiveMQ、RabbitMQ、Kafka、RocketMQ在优先级队列、延迟队列、死信队列、重试队列、消费模式、广播模式的区别
ActiveMQ、RabbitMQ、Kafka、RocketMQ这四款消息队列在优先级队列、延迟队列、死信队列、重试队列、消费模式、广播模式等方面各有其特点和差异。以下是对这些方面的详细比较: 1. 优先级队列 ActiveMQ:支持优先级队列,可以在发送消息时指定…...

首款会员制区块链 Geist 介绍
今天,Pixelcraft Studios 很高兴地宣布即将推出 Geist,这是一个由 Base、Arbitrum、Alchemy 以及 Aavegotchi 支持的全新 L3。 Geist 之前的代号为 “Gotchichain”,是首个专为游戏打造的会员专用区块链。 为什么选择 Geist? …...

CANoe软件中Trace窗口的筛选栏标题不显示(空白)的解决方法
文章目录 问题描述原因分析解决方案扩展知识总结问题描述 不知道什么情况,CANoe软件中Trace窗口的筛选栏标题突然不显示了,一片空白。现象如下: 虽然不影响CANoe软件的使用,但是观感上非常难受,对于强迫症患者非常不友好。 原因分析 按照常规思路,尝试了: 1、重启CAN…...

日期类代码实现-C++
一、目标 通过前面对类和对象的介绍我们可以自己通过C代码初步实现一个简单的日期类。 实现的主要操作有: 1.日期类的构造函数 2.日期类的拷贝构造函数(在头文件中实现) 3.日期类的比较运算符重载 4.日期类的计算运算符重载 5.流插入运…...

【问题记录+总结】VS Code Tex Live 2024 Latex Workshop Springer模板----更新ing
目录 Summary 道阻且长 少即是多 兵马未动粮草先行 没有万能 和一劳永逸 具体问题具体分析 心态 Detail 1、关于模板[官网] 2、settings.json 3、虫和杀虫剂 4、擦 换成Tex Studio都好了。。。 Summary 道阻且长 某中意期刊,只有Latex。之前只简单用过…...
Linux运维_Bash脚本_源码安装Go-1.21.11
Linux运维_Bash脚本_源码安装Go-1.21.11 Bash (Bourne Again Shell) 是一个解释器,负责处理 Unix 系统命令行上的命令。它是由 Brian Fox 编写的免费软件,并于 1989 年发布的免费软件,作为 Sh (Bourne Shell) 的替代品。 您可以在 Linux 和…...

ShareSDK Twitter
创建应用 1.登录Twitter控制台并通过认证 2.点击Developer Portal进入Twitter后台 3.点击Sign up for Free Account创建应用 4.配置应用信息 以下为创建过程示例,图中信息仅为示例,创建时请按照真实信息填写,否则无法正常使用。 权限申请…...
Vim 调用外部命令学习笔记
Vim 外部命令集成完全指南 文章目录 Vim 外部命令集成完全指南核心概念理解命令语法解析语法对比 常用外部命令详解文本排序与去重文本筛选与搜索高级 grep 搜索技巧文本替换与编辑字符处理高级文本处理编程语言处理其他实用命令 范围操作示例指定行范围处理复合命令示例 实用技…...
java_网络服务相关_gateway_nacos_feign区别联系
1. spring-cloud-starter-gateway 作用:作为微服务架构的网关,统一入口,处理所有外部请求。 核心能力: 路由转发(基于路径、服务名等)过滤器(鉴权、限流、日志、Header 处理)支持负…...

【力扣数据库知识手册笔记】索引
索引 索引的优缺点 优点1. 通过创建唯一性索引,可以保证数据库表中每一行数据的唯一性。2. 可以加快数据的检索速度(创建索引的主要原因)。3. 可以加速表和表之间的连接,实现数据的参考完整性。4. 可以在查询过程中,…...

MongoDB学习和应用(高效的非关系型数据库)
一丶 MongoDB简介 对于社交类软件的功能,我们需要对它的功能特点进行分析: 数据量会随着用户数增大而增大读多写少价值较低非好友看不到其动态信息地理位置的查询… 针对以上特点进行分析各大存储工具: mysql:关系型数据库&am…...
FFmpeg 低延迟同屏方案
引言 在实时互动需求激增的当下,无论是在线教育中的师生同屏演示、远程办公的屏幕共享协作,还是游戏直播的画面实时传输,低延迟同屏已成为保障用户体验的核心指标。FFmpeg 作为一款功能强大的多媒体框架,凭借其灵活的编解码、数据…...

Docker 运行 Kafka 带 SASL 认证教程
Docker 运行 Kafka 带 SASL 认证教程 Docker 运行 Kafka 带 SASL 认证教程一、说明二、环境准备三、编写 Docker Compose 和 jaas文件docker-compose.yml代码说明:server_jaas.conf 四、启动服务五、验证服务六、连接kafka服务七、总结 Docker 运行 Kafka 带 SASL 认…...

Swift 协议扩展精进之路:解决 CoreData 托管实体子类的类型不匹配问题(下)
概述 在 Swift 开发语言中,各位秃头小码农们可以充分利用语法本身所带来的便利去劈荆斩棘。我们还可以恣意利用泛型、协议关联类型和协议扩展来进一步简化和优化我们复杂的代码需求。 不过,在涉及到多个子类派生于基类进行多态模拟的场景下,…...
深入浅出:JavaScript 中的 `window.crypto.getRandomValues()` 方法
深入浅出:JavaScript 中的 window.crypto.getRandomValues() 方法 在现代 Web 开发中,随机数的生成看似简单,却隐藏着许多玄机。无论是生成密码、加密密钥,还是创建安全令牌,随机数的质量直接关系到系统的安全性。Jav…...

(二)TensorRT-LLM | 模型导出(v0.20.0rc3)
0. 概述 上一节 对安装和使用有个基本介绍。根据这个 issue 的描述,后续 TensorRT-LLM 团队可能更专注于更新和维护 pytorch backend。但 tensorrt backend 作为先前一直开发的工作,其中包含了大量可以学习的地方。本文主要看看它导出模型的部分&#x…...
基础测试工具使用经验
背景 vtune,perf, nsight system等基础测试工具,都是用过的,但是没有记录,都逐渐忘了。所以写这篇博客总结记录一下,只要以后发现新的用法,就记得来编辑补充一下 perf 比较基础的用法: 先改这…...