当前位置: 首页 > news >正文

大数据系列之:Flink Doris Connector,实时同步数据到Doris数据库

大数据系列之:Flink Doris Connector,实时同步数据到Doris数据库

  • 一、版本兼容性
  • 二、使用
  • 三、Flink SQL
  • 四、DataStream
  • 五、Lookup Join
  • 六、配置
    • 通用配置项
    • 接收器配置项
    • 查找Join配置项
  • 七、Doris 和 Flink 列类型映射
  • 八、使用Flink CDC访问Doris的示例
  • 九、使用FlinkSQL通过CDC访问并实现部分列更新的示例
  • 十、使用FlinkCDC访问多个表或整个数据库(支持MySQL、Oracle、PostgreSQL、SQLServer)
  • 十一、使用FlinkCDC更新Key列
  • 十二、使用Flink根据指定的列删除数据
  • 十三、最佳实践应用场景
  • 十四、常见问题解答

可以通过Flink操作(读取、插入、修改、删除)支持存储在Doris中的数据。本文介绍了如何通过Datastream和Flink操作Doris。

注意:

  • 修改和删除仅支持唯一键模型。
  • 当前的删除是为了支持Flink CDC访问数据以实现自动删除。如果要删除其他数据访问方法,您需要自行实现。

一、版本兼容性

在这里插入图片描述

二、使用

Maven

添加 flink-doris-connector

<!-- flink-doris-connector -->
<dependency><groupId>org.apache.doris</groupId><artifactId>flink-doris-connector-1.16</artifactId><version>1.6.0</version>
</dependency>
  • 请根据不同的Flink版本替换相应的Connector和Flink依赖版本。
  • 也可以从这里下载相关版本的jar包。

flink-doris-connector下载地址:

  • https://repo.maven.apache.org/maven2/org/apache/doris/

编译

  • 编译时直接运行sh build.sh即可。
  • 编译成功后,会在dist目录下生成目标jar包,如:flink-doris-connector-1.5.0-SNAPSHOT.jar。将此文件复制到 Flink 的类路径中以使用 Flink-Doris-Connector。例如,Flink 运行在 Local 模式,则将此文件放在 lib/ 文件夹中。 Flink运行在Yarn集群模式下,将此文件放入预部署包中。

三、Flink SQL

read

-- doris source
CREATE TABLE flink_doris_source (name STRING,age INT,price DECIMAL(5,2),sale DOUBLE)WITH ('connector' = 'doris','fenodes' = 'FE_IP:HTTP_PORT','table.identifier' = 'database.table','username' = 'root','password' = 'password'
);

write

--enable checkpoint
SET 'execution.checkpointing.interval' = '10s';-- doris sink
CREATE TABLE flink_doris_sink (name STRING,age INT,price DECIMAL(5,2),sale DOUBLE)WITH ('connector' = 'doris','fenodes' = 'FE_IP:HTTP_PORT','table.identifier' = 'db.table','username' = 'root','password' = 'password','sink.label-prefix' = 'doris_label'
);-- submit insert job
INSERT INTO flink_doris_sink select name,age,price,sale from flink_doris_source

四、DataStream

read

DorisOptions.Builder builder = DorisOptions.builder().setFenodes("FE_IP:HTTP_PORT").setTableIdentifier("db.table").setUsername("root").setPassword("password");DorisSource<List<?>> dorisSource = DorisSource.<List<?>>builder().setDorisOptions(builder.build()).setDorisReadOptions(DorisReadOptions.builder().build()).setDeserializer(new SimpleListDeserializationSchema()).build();env.fromSource(dorisSource, WatermarkStrategy.noWatermarks(), "doris source").print();

write

DorisSink通过StreamLoad向Doris写入数据,DataStream写入时支持不同的序列化方式

字符串数据流(SimpleStringSerializer)

// enable checkpoint
env.enableCheckpointing(10000);
// using batch mode for bounded data
env.setRuntimeMode(RuntimeExecutionMode.BATCH);DorisSink.Builder<String> builder = DorisSink.builder();
DorisOptions.Builder dorisBuilder = DorisOptions.builder();
dorisBuilder.setFenodes("FE_IP:HTTP_PORT").setTableIdentifier("db.table").setUsername("root").setPassword("password");Properties properties = new Properties();
// When the upstream is writing json, the configuration needs to be enabled.
//properties.setProperty("format", "json");
//properties.setProperty("read_json_by_line", "true");
DorisExecutionOptions.Builder executionBuilder = DorisExecutionOptions.builder();
executionBuilder.setLabelPrefix("label-doris") //streamload label prefix.setDeletable(false).setStreamLoadProp(properties); ;builder.setDorisReadOptions(DorisReadOptions.builder().build()).setDorisExecutionOptions(executionBuilder.build()).setSerializer(new SimpleStringSerializer()) //serialize according to string.setDorisOptions(dorisBuilder.build());//mock string source
List<Tuple2<String, Integer>> data = new ArrayList<>();
data.add(new Tuple2<>("doris",1));
DataStreamSource<Tuple2<String, Integer>> source = env. fromCollection(data);source.map((MapFunction<Tuple2<String, Integer>, String>) t -> t.f0 + "\t" + t.f1).sinkTo(builder.build());//mock json string source
//env.fromElements("{\"name\":\"zhangsan\",\"age\":1}").sinkTo(builder.build());

RowData数据流(RowDataSerializer)

// enable checkpoint
env.enableCheckpointing(10000);
// using batch mode for bounded data
env.setRuntimeMode(RuntimeExecutionMode.BATCH);//doris sink option
DorisSink.Builder<RowData> builder = DorisSink.builder();
DorisOptions.Builder dorisBuilder = DorisOptions.builder();
dorisBuilder.setFenodes("FE_IP:HTTP_PORT").setTableIdentifier("db.table").setUsername("root").setPassword("password");// json format to streamload
Properties properties = new Properties();
properties.setProperty("format", "json");
properties.setProperty("read_json_by_line", "true");
DorisExecutionOptions.Builder executionBuilder = DorisExecutionOptions.builder();
executionBuilder.setLabelPrefix("label-doris") //streamload label prefix.setDeletable(false).setStreamLoadProp(properties); //streamload params//flink rowdata's schema
String[] fields = {"city", "longitude", "latitude", "destroy_date"};
DataType[] types = {DataTypes.VARCHAR(256), DataTypes.DOUBLE(), DataTypes.DOUBLE(), DataTypes.DATE()};builder.setDorisReadOptions(DorisReadOptions.builder().build()).setDorisExecutionOptions(executionBuilder.build()).setSerializer(RowDataSerializer.builder() //serialize according to rowdata.setFieldNames(fields).setType("json") //json format.setFieldType(types).build()).setDorisOptions(dorisBuilder.build());//mock rowdata source
DataStream<RowData> source = env. fromElements("").map(new MapFunction<String, RowData>() {@Overridepublic RowData map(String value) throws Exception {GenericRowData genericRowData = new GenericRowData(4);genericRowData.setField(0, StringData.fromString("beijing"));genericRowData.setField(1, 116.405419);genericRowData.setField(2, 39.916927);genericRowData.setField(3, LocalDate.now().toEpochDay());return genericRowData;}});source. sinkTo(builder. build());

SchemaChange数据流(JsonDebeziumSchemaSerializer)

// enable checkpoint
env.enableCheckpointing(10000);Properties props = new Properties();
props. setProperty("format", "json");
props.setProperty("read_json_by_line", "true");
DorisOptions dorisOptions = DorisOptions. builder().setFenodes("127.0.0.1:8030").setTableIdentifier("test.t1").setUsername("root").setPassword("").build();DorisExecutionOptions.Builder executionBuilder = DorisExecutionOptions.builder();
executionBuilder.setLabelPrefix("label-prefix").setStreamLoadProp(props).setDeletable(true);DorisSink.Builder<String> builder = DorisSink.builder();
builder.setDorisReadOptions(DorisReadOptions.builder().build()).setDorisExecutionOptions(executionBuilder.build()).setDorisOptions(dorisOptions).setSerializer(JsonDebeziumSchemaSerializer.builder().setDorisOptions(dorisOptions).build());env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source").sinkTo(builder.build());

五、Lookup Join

CREATE TABLE fact_table (`id` BIGINT,`name` STRING,`city` STRING,`process_time` as proctime()
) WITH ('connector' = 'kafka',...
);create table dim_city(`city` STRING,`level` INT ,`province` STRING,`country` STRING
) WITH ('connector' = 'doris','fenodes' = '127.0.0.1:8030','jdbc-url' = 'jdbc:mysql://127.0.0.1:9030','table.identifier' = 'dim.dim_city','username' = 'root','password' = ''
);SELECT a.id, a.name, a.city, c.province, c.country,c.level 
FROM fact_table a
LEFT JOIN dim_city FOR SYSTEM_TIME AS OF a.process_time AS c
ON a.city = c.city
  • 这个命令是用于创建两个表和一个查询语句。第一个表是名为"fact_table"的表,它有四个列,分别为"id"、“name”、“city"和"process_time”。其中,"id"列是BIGINT类型,"name"和"city"列是STRING类型,"process_time"列是一个基于当前系统时间的计算列,它使用"proctime()"函数实现。
  • 第二个表是名为"dim_city"的表,它有四个列,分别为"city"、“level”、“province"和"country”。其中,“city”、“province"和"country"列是STRING类型,“level"列是INT类型。该表使用"Doris"作为存储引擎,连接器为"connector”,并且需要指定连接器的其他参数,如"fenodes”、“jdbc-url”、“table.identifier”、"username"和"password"等。
  • 最后一个命令是一个查询语句,它使用"LEFT JOIN"将"fact_table"和"dim_city"两个表进行连接,并使用"FOR SYSTEM_TIME AS OF"来指定连接时的时间戳,这里使用了"process_time"列的值。查询结果包括"id"、“name”、“city”、“province”、"country"和"level"这些列。

六、配置

通用配置项

fenodes:

  • Doris FE http地址,支持多个地址,用逗号分隔

benodes:

  • Doris BE http地址,支持多个地址,以逗号分隔。

jdbc-url:

  • jdbc连接信息,如:jdbc:mysql://127.0.0.1:9030

table.identifier:

  • Doris表名,如:db.tbl

auto-redirect:

  • 默认值:true
  • 是否重定向 StreamLoad 请求。开启后StreamLoad会通过FE写入,不再显示BE信息。

doris.request.retries:

  • 默认值:3
  • 向 Doris 发送请求的重试次数

doris.request.connect.timeout.ms:

  • 默认值:30000
  • 向 Doris 发送请求的连接超时

doris.request.read.timeout.ms:

  • 默认值:30000
  • 读取向 Doris 发送请求的超时

源配置项

doris.request.query.timeout.s:

  • 默认值:3600
  • 查询Doris的超时时间,默认1小时,-1表示无超时限制

doris.request.tablet.size:

  • 默认值:Integer. MAX_VALUE
  • 一个Partition对应的Doris Tablet数量。该值设置得越小,生成的Partition就越多。这提高了 Flink 端的并行度,但同时也给 Doris 带来了更大的压力。

doris.batch.size:

  • 默认值:1024
  • 一次从BE读取数据的最大行数。增加该值会减少 Flink 和 Doris 之间建立的连接数量。从而减少网络延迟带来的额外时间开销。

doris.exec.mem.limit:

  • 默认值:2147483648
  • 单个查询的内存限制。默认为2GB,以字节为单位

doris.deserialize.arrow.async:

  • 默认值:FALSE
  • 是否支持flink-doris-connector迭代所需的Arrow格式异步转换为RowBatch

doris.deserialize.queue.size:

  • 默认值:64
  • Arrow格式的内部处理队列的异步转换,当doris.deserialize.arrow.async为true时有效

doris.read.field:

  • 读取Doris表的列名列表,以逗号分隔

doris.filter.query:

  • 过滤读取数据的表达式,这个表达式透明传递给Doris。 Doris使用这个表达式来完成源端的数据过滤。例如年龄=18。

接收器配置项

sink.label-prefix:

  • Stream加载导入使用的标签前缀。在2pc场景下,需要全局唯一性来保证Flink的EOS语义。

sink.properties.*:

  • 导入流负载参数。
    例如: ‘sink.properties.column_separator’ = ', ’ 定义列分隔符, ‘sink.properties.escape_delimiters’ = ‘true’ 特殊字符作为分隔符, ‘\x01’ 将转换为二进制 0x01
  • JSON格式导入
    ‘sink.properties.format’ = ‘json’ ‘sink.properties.按行读取 json’ = ‘true’
    详细参数请参考这里。

sink.enable-delete:

  • 默认值:TRUE
  • 是否启用删除。该选项需要Doris表开启批量删除功能(Doris 0.15+版本默认开启),且仅支持Unique模型。

sink.enable-2pc:

  • 默认值:TRUE
  • 是否启用两阶段提交(2pc),默认为true,以保证Exactly-Once语义。

sink.buffer-size:

  • 默认值:1MB
  • 写入数据缓存缓冲区的大小,以字节为单位。不建议修改,默认配置即可

sink.buffer-count:

  • 默认值:3
  • 写入数据缓冲区的数量。不建议修改,默认配置即可

sink.max-retries:

  • 默认值:3
  • Commit失败后最大重试次数,默认3

sink.use-cache:

  • 默认值:false
  • 发生异常时,是否使用内存缓存进行恢复。启用后,Checkpoint 期间的数据将保留在缓存中。

sink.enable.batch-mode:

  • 默认值:false
  • 是否使用批处理模式写入Doris。使能后,写入时序不依赖于Checkpoint。写入是通过sink.buffer-flush.max-rows/sink.buffer-flush.max-bytes/sink.buffer-flush.interval参数控制的。输入机会。
    同时开启后,Exactly-once语义将无法保证。 Uniq模型可以用来实现幂等性。

sink.flush.queue-size:

  • 默认值:2
  • 在批处理模式下,缓存的列大小。

sink.buffer-flush.max-rows:

  • 默认值:50000
  • 批处理模式下,单批写入的最大数据行数。

sink.buffer-flush.max-bytes:

  • 默认值:10MB
  • 在批处理模式下,单批写入的最大字节数。

sink.buffer-flush.interval:

  • 默认值:10s
  • 批处理模式下,异步刷新缓存的时间间隔

sink.ignore.update-before:

  • 默认值:true
  • 是否忽略update-before事件,默认忽略。

查找Join配置项

lookup.cache.max-rows

  • 默认值:-1
  • 查找缓存的最大行数,默认值为-1,不启用缓存

lookup.cache.ttl:

  • 默认值:10s
  • 查找缓存的最大时间,默认10s

lookup.max-retries:

  • 默认值:1
  • 查找查询失败后重试的次数

lookup.jdbc.async:

  • 默认值:false
  • 是否启用异步查找,默认为false

lookup.jdbc.read.batch.size:

  • 默认值:128
  • 异步查找下,每个查询的最大批量大小

lookup.jdbc.read.batch.queue-size:

  • 默认值:256
  • 异步查找时中间缓冲队列的大小

lookup.jdbc.read.thread-size:

  • 默认值:3
  • 每个任务中用于查找的jdbc线程数

七、Doris 和 Flink 列类型映射

Doris类型Flink类型
NULL_TYPENULL
BOOLEANBOOLEAN
TINYINTTINYINT
SMALLINTSMALLINT
INTINT
BIGINTBIGINT
FLOATFLOAT
DOUBLEDOUBLE
DATEDATE
DATETIMETIMESTAMP
DECIMALDECIMAL
CHARSTRING
LARGEINTSTRING
VARCHARSTRING
STRINGSTRING
DECIMALV2DECIMAL
ARRAYARRAY
MAPMAP
JSONSTRING
VARIANTSTRING
IPV4STRING
IPV6STRING

从connector-1.6.1版本开始,增加了对Variant、IPV6、IPV4三种数据类型读取的支持。读取 IPV6 和 Variant 需要 Doris 2.1.1 或更高版本。

八、使用Flink CDC访问Doris的示例

SET 'execution.checkpointing.interval' = '10s';
CREATE TABLE cdc_mysql_source (id int,name VARCHAR,PRIMARY KEY (id) NOT ENFORCED
) WITH ('connector' = 'mysql-cdc','hostname' = '127.0.0.1','port' = '3306','username' = 'root','password' = 'password','database-name' = 'database','table-name' = 'table'
);-- Support synchronous insert/update/delete events
CREATE TABLE doris_sink (
id INT,
name STRING
) 
WITH ('connector' = 'doris','fenodes' = '127.0.0.1:8030','table.identifier' = 'database.table','username' = 'root','password' = '','sink.properties.format' = 'json','sink.properties.read_json_by_line' = 'true','sink.enable-delete' = 'true', -- Synchronize delete events'sink.label-prefix' = 'doris_label'
);insert into doris_sink select id,name from cdc_mysql_source;

九、使用FlinkSQL通过CDC访问并实现部分列更新的示例

-- enable checkpoint
SET 'execution.checkpointing.interval' = '10s';CREATE TABLE cdc_mysql_source (id int,name STRING,bank STRING,age int,PRIMARY KEY (id) NOT ENFORCED
) WITH ('connector' = 'mysql-cdc','hostname' = '127.0.0.1','port' = '3306','username' = 'root','password' = 'password','database-name' = 'database','table-name' = 'table'
);CREATE TABLE doris_sink (id INT,name STRING,bank STRING,age int
) 
WITH ('connector' = 'doris','fenodes' = '127.0.0.1:8030','table.identifier' = 'database.table','username' = 'root','password' = '','sink.properties.format' = 'json','sink.properties.read_json_by_line' = 'true','sink.properties.columns' = 'id,name,bank,age','sink.properties.partial_columns' = 'true' --Enable partial column updates
);insert into doris_sink select id,name,bank,age from cdc_mysql_source;

十、使用FlinkCDC访问多个表或整个数据库(支持MySQL、Oracle、PostgreSQL、SQLServer)

MySQL同步示例

<FLINK_HOME>bin/flink run \-Dexecution.checkpointing.interval=10s\-Dparallelism.default=1\-c org.apache.doris.flink.tools.cdc.CdcTools\lib/flink-doris-connector-1.16-1.5.0-SNAPSHOT.jar \mysql-sync-database\--database test_db \--mysql-conf hostname=127.0.0.1 \--mysql-conf port=3306 \--mysql-conf username=root \--mysql-conf password=123456 \--mysql-conf database-name=mysql_db \--including-tables "tbl1|test.*" \--sink-conf fenodes=127.0.0.1:8030 \--sink-conf username=root \--sink-conf password=123456 \--sink-conf jdbc-url=jdbc:mysql://127.0.0.1:9030 \--sink-conf sink.label-prefix=label \--table-conf replication_num=1

Oracle同步示例

<FLINK_HOME>bin/flink run \-Dexecution.checkpointing.interval=10s \-Dparallelism.default=1 \-c org.apache.doris.flink.tools.cdc.CdcTools \./lib/flink-doris-connector-1.16-1.5.0-SNAPSHOT.jar\oracle-sync-database \--database test_db \--oracle-conf hostname=127.0.0.1 \--oracle-conf port=1521 \--oracle-conf username=admin \--oracle-conf password="password" \--oracle-conf database-name=XE \--oracle-conf schema-name=ADMIN \--including-tables "tbl1|tbl2" \--sink-conf fenodes=127.0.0.1:8030 \--sink-conf username=root \--sink-conf password=\--sink-conf jdbc-url=jdbc:mysql://127.0.0.1:9030 \--sink-conf sink.label-prefix=label \--table-conf replication_num=1

PostgreSQL 同步示例

<FLINK_HOME>/bin/flink run \-Dexecution.checkpointing.interval=10s \-Dparallelism.default=1\-c org.apache.doris.flink.tools.cdc.CdcTools \./lib/flink-doris-connector-1.16-1.5.0-SNAPSHOT.jar \postgres-sync-database \--database db1\--postgres-conf hostname=127.0.0.1 \--postgres-conf port=5432 \--postgres-conf username=postgres \--postgres-conf password="123456" \--postgres-conf database-name=postgres \--postgres-conf schema-name=public \--postgres-conf slot.name=test \--postgres-conf decoding.plugin.name=pgoutput \--including-tables "tbl1|tbl2" \--sink-conf fenodes=127.0.0.1:8030 \--sink-conf username=root \--sink-conf password=\--sink-conf jdbc-url=jdbc:mysql://127.0.0.1:9030 \--sink-conf sink.label-prefix=label \--table-conf replication_num=1

SQLServer同步示例

<FLINK_HOME>/bin/flink run \-Dexecution.checkpointing.interval=10s \-Dparallelism.default=1 \-c org.apache.doris.flink.tools.cdc.CdcTools \./lib/flink-doris-connector-1.16-1.5.0-SNAPSHOT.jar \sqlserver-sync-database \--database db1\--sqlserver-conf hostname=127.0.0.1 \--sqlserver-conf port=1433 \--sqlserver-conf username=sa \--sqlserver-conf password="123456" \--sqlserver-conf database-name=CDC_DB \--sqlserver-conf schema-name=dbo \--including-tables "tbl1|tbl2" \--sink-conf fenodes=127.0.0.1:8030 \--sink-conf username=root \--sink-conf password=\--sink-conf jdbc-url=jdbc:mysql://127.0.0.1:9030 \--sink-conf sink.label-prefix=label \--table-conf replication_num=1

十一、使用FlinkCDC更新Key列

一般来说,在业务数据库中,数字被用作表的主键,例如学生表中使用数字(id)作为主键,但随着业务的发展,与数据对应的数字可能会发生变化。在这种情况下,使用FlinkCDC + Doris Connector进行数据同步可以自动更新Doris主键列中的数据。

原理

Flink CDC的底层采集工具是Debezium。Debezium内部使用op字段来识别相应的操作:op字段的值为c、u、d和r,分别对应创建、更新、删除和读取。对于主键列的更新,FlinkCDC将向下游发送DELETE和INSERT事件,在数据同步到Doris后,会自动更新主键列的数据。

示例

Flink程序可以参考上述CDC同步示例。任务成功提交后,在MySQL端执行更新主键列语句(update student set id = ‘1002’ where id = ‘1001’),以修改Doris中的数据。

十二、使用Flink根据指定的列删除数据

通常,Kafka中的消息使用特定的字段来标记操作类型,例如{“op_type”:“delete”,data:{…}}。对于这种类型的数据,希望删除op_type=delete的数据。

默认情况下,DorisSink将根据RowKind来区分事件类型。通常,在cdc的情况下,可以直接获取事件类型,并将隐藏列__DORIS_DELETE_SIGN__赋值以实现删除的目的,而Kafka需要基于业务逻辑进行判断,显示传递给隐藏列的值。

-- Such as upstream data: {"op_type":"delete",data:{"id":1,"name":"zhangsan"}}
CREATE TABLE KAFKA_SOURCE(data STRING,op_type STRING
) WITH ('connector' = 'kafka',...
);CREATE TABLE DORIS_SINK(id INT,name STRING,__DORIS_DELETE_SIGN__ INT
) WITH ('connector' = 'doris','fenodes' = '127.0.0.1:8030','table.identifier' = 'db.table','username' = 'root','password' = '','sink.enable-delete' = 'false',        -- false means not to get the event type from RowKind'sink.properties.columns' = 'id, name, __DORIS_DELETE_SIGN__'  -- Display the import column of the specified streamload
);INSERT INTO DORIS_SINK
SELECT json_value(data,'$.id') as id,
json_value(data,'$.name') as name, 
if(op_type='delete',1,0) as __DORIS_DELETE_SIGN__ 
from KAFKA_SOURCE;
  • 这段代码是一个示例,演示了如何使用Flink从Kafka源表读取数据,并将其写入Doris目标表。具体来说,如果源表中的数据op_type字段的值为"delete",则希望在Doris目标表中删除相应的数据。
  • 首先,在Kafka源表的定义中,我们有一个data字段用于存储源数据的JSON字符串,以及一个op_type字段用于标识操作类型。
  • 然后,在Doris目标表的定义中,我们有一个id字段和一个name字段来存储数据的具体内容,还有一个名为__DORIS_DELETE_SIGN__的隐藏列,用于标识是否要进行删除操作。
  • 在INSERT INTO语句中,我们将从Kafka源表中选择data字段的id和name,并使用json_value函数提取相应的值。同时,我们使用if函数将op_type字段的值与"delete"进行比较,如果相等则将__DORIS_DELETE_SIGN__赋值为1,否则赋值为0。
  • 最后,将处理后的数据插入到Doris目标表中。
  • 总之,这段代码的作用是根据源表中的op_type字段值,将对应的数据删除或写入到Doris目标表中。

十三、最佳实践应用场景

  • 使用Flink Doris Connector最适合的场景是实时/批量将源数据同步到Doris(Mysql、Oracle、PostgreSQL)中,然后使用Flink对Doris和其他数据源中的数据进行联合分析。您还可以使用Flink Doris Connector。

其他注意事项:

  • Flink Doris Connector主要依赖于Checkpoint进行流式写入,因此Checkpoint之间的时间间隔就是数据的可见延迟时间。
  • 为了确保Flink的Exactly Once语义,Flink Doris Connector默认启用两阶段提交,Doris在1.1版本之后默认启用两阶段提交。1.0可以通过修改BE参数来启用。

十四、常见问题解答

Doris Source读取数据后,为什么流会结束?

  • 目前Doris Source是有界流,不支持CDC读取。

Flink能否读取Doris并执行条件下推?

  • 通过配置doris.filter.query参数可以实现。

如何写入位图类型?

CREATE TABLE bitmap_sink (
dt int,
page string,
user_id int
)
WITH ('connector' = 'doris','fenodes' = '127.0.0.1:8030','table.identifier' = 'test.bitmap_test','username' = 'root','password' = '','sink.label-prefix' = 'doris_label','sink.properties.columns' = 'dt,page,user_id,user_id=to_bitmap(user_id)'
)

errCode = 2, detailMessage = Label [label_0_1] has already been used, relate to txn [19650]

  • 在Exactly-Once场景中,Flink Job必须从最新的Checkpoint/Savepoint重新启动,否则会报告上述错误。当不需要Exactly-Once时,可以通过关闭2PC提交(sink.enable-2pc=false)或更改不同的sink.label-prefix来解决。

errCode = 2, detailMessage = transaction [19650] not found

  • 发生在Commit阶段,Checkpoint中记录的事务ID在FE端已过期,在此时再次提交时会出现上述错误。此时无法从Checkpoint启动,可以通过修改fe.conf中的streaming_label_keep_max_second配置来延长过期时间,默认为12小时。

errCode = 2, detailMessage = current running txns on db 10006 is 100, larger than limit 100

  • 这是因为同一库的并发导入超过了100,可以通过调整fe.conf的max_running_txn_num_per_db参数来解决。详细信息,请参考max_running_txn_num_per_db。
  • 同时,如果一个任务频繁修改标签并重新启动,也可能导致此错误发生。在2pc场景(重复/聚合模型)中,每个任务的标签需要唯一,在从Checkpoint重新启动时,Flink任务将主动中止之前已经成功预提交但未提交的事务。频繁修改标签并重新启动将导致大量已成功预提交的事务无法中止,占用事务。在Unique模型下,也可以关闭2pc,实现幂等写入。

当Flink向Uniq模型写入一批数据时,如何确保数据的顺序?

  • 您可以添加序列列的配置来确保顺序。

Flink任务没有报错,但数据无法同步?

  • 在Connector1.1.0之前,数据是批量写入的,并且写入是由数据驱动的。需要确定上游是否有数据写入。在1.1.0之后,它依赖于Checkpoint,并且必须启用Checkpoint才能进行写入。

tablet writer write failed, tablet_id=190958, txn_id=3505530, err=-235

  • 通常发生在Connector1.1.0之前,这是因为写入频率过快,导致版本过多。可以通过设置sink.batch.size和sink.batch.interval参数来减少Streamload的频率。

源表和Doris表应如何对应?

  • 在使用Flink Connector导入数据时,需要注意两个方面。第一,源表的列和类型应与Flink SQL中的列和类型对应;第二,Flink SQL中的列和类型必须与Doris表的列和类型匹配。

TApplicationException: get_next failed: out of sequence response: expected 4 but got 3

  • 这是由于 Thrift 中的并发错误造成的。建议您尽可能使用最新的连接器和兼容的 Flink 版本。

DorisRuntimeException: Fail to abort transaction 26153 with url http://192.168.0.1:8040/api/table_name/_stream_load_2pc

  • 您可以在TaskManager中搜索日志中止事务响应,并根据HTTP返回码判断是客户端问题还是服务器问题。

org.apache.flink.table.api.SqlParserException when using doris.filter.query: SQL parsing failed. “xx” encountered at row x, column xx

  • 这个问题主要是由于条件中的varchar/string类型,需要进行引号转义。正确的写法是xxx = ‘‘xxx’’。这样,Flink SQL解析器会将连续的两个单引号解释为一个单引号字符,而不是字符串的结束,并将拼接的字符串作为属性的值。例如:t1 ‍>= ‘2024-01-01’ 可以写为 ‘doris.filter.query’ = ‘t1 ‍>=’‘2024-01-01’'。

Failed to connect to backend: http://host:webserver_port, and BE is still alive

  • 这个问题可能是由于配置了be的IP地址,而该地址无法被外部的Flink集群访问。这主要是因为在连接fe时,be的地址是通过fe进行解析的。例如,如果将be地址添加为’127.0.0.1’,那么Flink集群通过fe获取到的be地址将是’127.0.0.1:webserver_port’,并且Flink将连接到该地址。当出现这种问题时,可以通过将be的实际对应的外部IP地址添加到"with"属性中来解决:‘benodes’=“be_ip:webserver_port,be_ip:webserver_port…”。对于整个数据库的同步,可以使用以下属性:–sink-conf benodes=be_ip:webserver,be_ip:webserver…。

当使用Flink-connector将MySQL数据同步到Doris时,时间戳之间存在几小时的时间差。

  • Flink Connector默认使用UTC+8时区从MySQL同步整个数据库。如果您的数据位于不同的时区,您可以使用以下配置进行调整,例如:–mysql-conf debezium.date.format.timestamp.zone=“UTC+3”。

相关文章:

大数据系列之:Flink Doris Connector,实时同步数据到Doris数据库

大数据系列之&#xff1a;Flink Doris Connector&#xff0c;实时同步数据到Doris数据库 一、版本兼容性二、使用三、Flink SQL四、DataStream五、Lookup Join六、配置通用配置项接收器配置项查找Join配置项 七、Doris 和 Flink 列类型映射八、使用Flink CDC访问Doris的示例九、…...

LabVIEW VI 多语言动态加载与运行的实现

在多语言应用程序开发中&#xff0c;确保用户界面能够根据用户的语言偏好动态切换是一个关键需求。本文通过分析一个LabVIEW程序框图&#xff0c;详细说明了如何使用LabVIEW中的属性节点和调用节点来实现VI&#xff08;虚拟仪器&#xff09;界面语言的动态加载与运行。此程序允…...

Unity引擎基础知识

目录 Unity基础知识概要 1. 创建工程 2. 工程目录介绍 3. Unity界面和五大面板 4. 游戏物体创建与操作 5. 场景和层管理 6. 组件系统 7. 脚本语言C# 8. 物理引擎和UI系统 学习资源推荐 Unity引擎中如何优化大型游戏项目的性能&#xff1f; Unity C#脚本语言的高级编…...

练习题- 探索正则表达式对象和对象匹配

正则表达式(Regular Expressions)是一种强大而灵活的文本处理工具,它允许我们通过模式匹配来处理字符串。这在数据清理、文本分析等领域有着广泛的应用。在Python中,正则表达式通过re模块提供支持,学习和掌握正则表达式对于处理复杂的文本数据至关重要。 本文将探索如何在…...

Java集合提升

1. 手写ArrayList 1.1. ArrayList底层原理细节 底层结构是一个长度可以动态增长的数组&#xff08;顺序表&#xff09;transient Object[] elementData; 特点&#xff1a;在内存中分配连续的空间&#xff0c;只存储数据&#xff0c;不存储地址信息。位置就隐含着地址。优点 节…...

uniapp 微信小程序生成水印图片

效果 源码 <template><view style"overflow: hidden;"><camera device-position"back" flash"auto" class"camera"><cover-view class"text-white padding water-mark"><cover-view class"…...

ElasticSearch相关知识点

ElasticSearch中的倒排索引是如何工作的&#xff1f; 倒排索引是ElasticSearch中用于全文检索的一种数据结构&#xff0c;与正排索引不同的是&#xff0c;正排索引将文档按照词汇顺序组织。而倒排索引是将词汇映射到包含该词汇的文档中。 在ElasticSearch中&#xff0c;倒排索…...

css 文字图片居中及网格布局

以下内容纯自已个人理解&#xff0c;直接上代码&#xff1a; <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><meta name"viewport" content"widthdevice-width, initial-scale1.0"><…...

解决ImportError: DLL load failed while importing _rust: 找不到指定的程序

解决ImportError: DLL load failed while importing _rust: 找不到指定的程序 python使用库cryptography 当 from cryptography.hazmat.bindings._rust import exceptions as rust_exceptions 时&#xff0c;会报错&#xff1a; ImportError: DLL load failed while importin…...

集合-List去重

1.利用Set去重 @Test public void distinctList() {List<String> oldList = new ArrayList<>();oldList.add("a");oldList.add("a");oldList.add("b");oldList.add("c");oldList.add("d");List<String> …...

ST-LINK USB communication error 非常有效的解决方法

文章目录 一、检查确定是ST-LINK USB communication error的问题二、关闭文件&#xff0c;打开keil软件所在文件夹&#xff0c;找到STLink文件夹&#xff0c;找到该应用程序双击 一、检查确定是ST-LINK USB communication error的问题 二、关闭文件&#xff0c;打开keil软件所在…...

探索CSS的:future-link伪类:选择指向未来文档的链接

CSS&#xff08;层叠样式表&#xff09;是Web设计中用于描述网页元素样式的语言。随着CSS4的提案&#xff0c;引入了许多新的选择器&#xff0c;其中之一是:future-link伪类。然而&#xff0c;需要注意的是&#xff0c;:future-link伪类目前还处于提议阶段&#xff0c;并没有在…...

【C++】序列与关联容器(三)map与multimap容器

【C】序列与关联容器&#xff08;三&#xff09;map与multimap容器 一、map二、multiset / multimap 一、map 树中的每个结点的类型是一个std::pair //pair的类型是<const key,value> pair是一个包含两个指针的结构体&#xff0c;第一个指针指向该节点的key&#xff0c;…...

ActiveMQ、RabbitMQ、Kafka、RocketMQ在优先级队列、延迟队列、死信队列、重试队列、消费模式、广播模式的区别

ActiveMQ、RabbitMQ、Kafka、RocketMQ这四款消息队列在优先级队列、延迟队列、死信队列、重试队列、消费模式、广播模式等方面各有其特点和差异。以下是对这些方面的详细比较&#xff1a; 1. 优先级队列 ActiveMQ&#xff1a;支持优先级队列&#xff0c;可以在发送消息时指定…...

首款会员制区块链 Geist 介绍

今天&#xff0c;Pixelcraft Studios 很高兴地宣布即将推出 Geist&#xff0c;这是一个由 Base、Arbitrum、Alchemy 以及 Aavegotchi 支持的全新 L3。 Geist 之前的代号为 “Gotchichain”&#xff0c;是首个专为游戏打造的会员专用区块链。 为什么选择 Geist&#xff1f; …...

CANoe软件中Trace窗口的筛选栏标题不显示(空白)的解决方法

文章目录 问题描述原因分析解决方案扩展知识总结问题描述 不知道什么情况,CANoe软件中Trace窗口的筛选栏标题突然不显示了,一片空白。现象如下: 虽然不影响CANoe软件的使用,但是观感上非常难受,对于强迫症患者非常不友好。 原因分析 按照常规思路,尝试了: 1、重启CAN…...

日期类代码实现-C++

一、目标 通过前面对类和对象的介绍我们可以自己通过C代码初步实现一个简单的日期类。 实现的主要操作有&#xff1a; 1.日期类的构造函数 2.日期类的拷贝构造函数&#xff08;在头文件中实现&#xff09; 3.日期类的比较运算符重载 4.日期类的计算运算符重载 5.流插入运…...

【问题记录+总结】VS Code Tex Live 2024 Latex Workshop Springer模板----更新ing

目录 Summary 道阻且长 少即是多 兵马未动粮草先行 没有万能 和一劳永逸 具体问题具体分析 心态 Detail 1、关于模板[官网] 2、settings.json 3、虫和杀虫剂 4、擦 换成Tex Studio都好了。。。 Summary 道阻且长 某中意期刊&#xff0c;只有Latex。之前只简单用过…...

Linux运维_Bash脚本_源码安装Go-1.21.11

Linux运维_Bash脚本_源码安装Go-1.21.11 Bash (Bourne Again Shell) 是一个解释器&#xff0c;负责处理 Unix 系统命令行上的命令。它是由 Brian Fox 编写的免费软件&#xff0c;并于 1989 年发布的免费软件&#xff0c;作为 Sh (Bourne Shell) 的替代品。 您可以在 Linux 和…...

ShareSDK Twitter

创建应用 1.登录Twitter控制台并通过认证 2.点击Developer Portal进入Twitter后台 3.点击Sign up for Free Account创建应用 4.配置应用信息 以下为创建过程示例&#xff0c;图中信息仅为示例&#xff0c;创建时请按照真实信息填写&#xff0c;否则无法正常使用。 权限申请…...

word2vec 如何用多个词表示一个句子

word2vec 模型通常用于将单词映射为固定大小的向量。为了使用多个词表示一个句子&#xff0c;我们可以采用以下几种方法&#xff1a; 词袋模型 (Bag of Words, BoW): 将句子中所有词的向量加起来&#xff0c;不考虑词的顺序。这种方法简单&#xff0c;但会丢失词序信息。 计算…...

IDEA中查看接口的所有实现类和具体实现类

1.IDEA中接口的所有实现类查看 1.CTRLH(hierarchy 结构) 我们选中要查看的接口 按住快捷键ctrlh 在界面右侧可以看到该接口的所有可能实现类 2.右击diagrams->show diagram 选中要查看的接口 右击选择diagrams->show diagram 即可以以图表的方式查看接口和所有实现类…...

DLL的导出和调用

动态链接库在C中非常重要&#xff0c;写了一个简单的例子用于DLL的导出和调用。 DLL的生成 头文件 #include<iostream> #include<stdexcept> using namespace std;#define TESTAPI __declspec(dllexport)// 函数定义 extern "C" {TESTAPI int add(in…...

vscode中调试cuda kernel

关于vscode中调试cpp可参考之前的博客&#xff1a;ubuntu vscode 基本设置 和 调试设置_ubuntu vscode 调试-CSDN博客 这篇我们来讲如何调试.cu的kernel&#xff0c;主要参考的是&#xff1a;https://www.zhihu.com/question/431782036/answer/2468791220 1、基本准备不多说&am…...

SQL的连接查询与pandas的对应关系

在SQL和Pandas中&#xff0c;连接查询&#xff08;join&#xff09;是处理数据集之间关系的重要工具。下面是SQL中的各种连接查询类型及其与Pandas中相应操作的对应关系&#xff1a; 1. INNER JOIN SQL: INNER JOIN 返回两个表中具有匹配值的行。 Pandas: merge() 方法的 how…...

【JS】中断和恢复任务序列

前言 封装processTasks函数&#xff0c;实现以下需求 /*** 依次顺序执行一系列任务* 所有任务全部完成后可以得到每个任务的执行结果* 需要返回两个方法&#xff0c;start用于启动任务&#xff0c;pause用于暂停任务* 每个任务具有原子性&#xff0c;即不可中断&#xff0c;只…...

CentOS系统下安装NVIDIA显卡驱动

一、安装显卡驱动 1.安装依赖项 yum -y install gcc pciutils yum -y install gcc yum -y install gcc-c yum -y install make2.查看内核版本 uname -a3.查看显卡版本 lspci | grep -i nvidia4.屏蔽系统自带的nouveau (1)查看nouveau lsmod | grep nouveau (2)打开blackl…...

Linux 与 Windows 服务器操作系统 | 全面对比

在服务器操作系统的领域&#xff0c;Linux 和 Windows 一直是两个备受关注的选择。 首先来看 Windows 操作系统。它由 Microsoft Corporation 开发&#xff0c;在桌面领域占据显著份额&#xff0c;其中 Windows 10 是使用最广泛的版本&#xff0c;广泛应用于个人计算机和企业桌…...

给既有exe程序添加一机一码验证

原文地址&#xff1a;李浩的博客 lihaohello.top 本科期间开发过一款混凝土基本构件设计程序&#xff0c;该程序是一个独立的exe可执行文件&#xff0c;采用VC静态链接MFC库编制而成。近期&#xff0c;需要为该程序添加用户注册验证的功能&#xff0c;从而避免任何用户获取该程…...

【Datawhale X 魔搭 】AI夏令营第四期大模型方向,Task2:头脑风暴会,巧灵脑筋急转弯(持续更新)

队伍名称&#xff1a;巧灵脑筋急转弯 队伍技术栈&#xff1a;python&#xff0c;LLM&#xff0c;RAG&#xff0c;大模型&#xff0c;nlp&#xff0c;Gradio&#xff0c;Vue&#xff0c;java 队友&#xff1a;知唐&#xff08;队长&#xff09;&#xff0c;我真的敲不动…...

mysql 多个外键

在MySQL中&#xff0c;一个表可以有多个外键约束&#xff0c;它们分别关联到不同的主表。在创建表时&#xff0c;可以在每个外键约束上指定不同的外键名称。以下是一个简单的例子&#xff0c;演示如何在创建表时定义多个外键&#xff1a; CREATE TABLE orders (order_id INT AU…...

解决方案上新了丨趋动科技推出基于银河麒麟操作系统的异构算力池化解决方案

趋动科技携手麒麟软件打造基于银河麒麟操作系统的异构算力池化解决方案&#xff0c;共同探索AI领域新场景。 人工智能技术作为数字经济发展的重要推手&#xff0c;在各行业业务场景中落地需要大量AI算力资源的有效保障。在IT基础设施普遍云化的今天&#xff0c;AI算力一方面需…...

14.创建一个实战maven的springboot项目

项目核心主要部分 pom.xml文件 <?xml version"1.0" encoding"UTF-8"?> <project xmlns"http://maven.apache.org/POM/4.0.0" xmlns:xsi"http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation"http://mave…...

docker部署LNMP

docker部署LNMP nginx 1.22 172.111.0.10 docker-nginx mysql 8.0.30 172.111.0.20 docker-mysql php 8.1.27 172.111.0.30 docker-php docker&#xff1a;单节点部署&#xff0c;只能在一台机器上部署&#xff0c;如果跨机器容器无法操作&#xff0c;无法通信。 做高可用…...

在Spring Boot应用中,如果你希望在访问应用时加上项目的名称或者一个特定的路径前缀

在Spring Boot应用中&#xff0c;如果你希望在访问应用时加上项目的名称或者一个特定的路径前缀 在Spring Boot应用中&#xff0c;如果你希望在访问应用时加上项目的名称或者一个特定的路径前缀&#xff0c;你可以通过配置server.servlet.context-path属性来实现。这通常在app…...

东南大学:Wi-Fi 6搭档全光以太,打造“数智东南”信息高速路

东南大学&#xff1a;Wi-Fi 6搭档全光以太&#xff0c;打造“数智东南”信息高速路 - 华为企业业务 打好ICT底座&#xff0c;平台和应用层面就会非常通畅了。首先&#xff0c;出海企业的需求既有普遍性&#xff0c;也有垂直性行业的特性需求。普遍性需求需要通信、沟通数据和传…...

C++:stack类(vector和list优缺点、deque)

目录 前言 数据结构 deque vector和list的优缺点 push pop top size empty 完整代码 前言 stack类就是数据结构中的栈 C数据结构&#xff1a;栈-CSDN博客 stack类所拥有的函数相比与string、vector和list类都少很多&#xff0c;这是因为栈这个数据结构是后进先出的…...

负载均衡、高可用

负载均衡 负载均衡&#xff08;Load Balance&#xff09;&#xff1a;可以利用多个计算机和组合进行海量请求处理&#xff0c;从而获得很高的处理效率&#xff0c;也可以用多个计算机做备份&#xff08;高可用&#xff09;&#xff0c;使得任何一个机器坏了整个系统还是能正常…...

从Retrofit支持suspend协程请求说开去

在现代Android开发中&#xff0c;异步请求已经成为不可或缺的一部分。传统的异步请求往往涉及大量的回调逻辑&#xff0c;使代码难以维护和调试。随着Kotlin协程的引入&#xff0c;异步编程得到了极大的简化。而作为最流行的网络请求库之一&#xff0c;Retrofit早在Kotlin协程的…...

深入浅出:你需要了解的用户数据报协议(UDP)

文章目录 **UDP概述****1. 无连接性****2. 尽最大努力交付****3. 面向报文****4. 多种交互通信支持****5. 较少的首部开销** **UDP报文的首部格式****详细解释每个字段** **UDP的多路分用模型****多路分用的实际应用** **检验和的计算方法****伪首部的详细内容****检验和计算步…...

C++的Magic Static

什么是“Magic Static”&#xff1f; C 中&#xff0c;函数内部的静态变量只会在第一次执行该函数时被初始化&#xff0c;而且这种初始化在 C11 标准之后是线程安全的。这意味着即使多个线程同时第一次调用该函数&#xff0c;静态变量也只会被初始化一次&#xff0c;并且在初始…...

vscode添加宏定义

1 起因 在用vscode看项目代码时&#xff0c;如果源文件中的代码块被某个宏定义给包裹住了&#xff0c;则在vscode的默认配置下&#xff0c;不会高亮显示这块被包裹住的代码&#xff0c;如下图中229行开始的代码被STM32F40_41xxx所控制&#xff0c;没有高亮显示。 由于STM32F4…...

Postman接口关联

接口关联 接口之间存在依赖关系&#xff0c;接口B要依赖于接口A的返回值。 例如&#xff1a;现在有两个接口&#xff0c;接口1&#xff1a;获取接口统一鉴权码token接口&#xff0c;接口2&#xff1a;创建标签接口。接口2里的请求参数需要依赖接口1返回的值&#xff0c;即需要…...

用Python制作开心消消乐游戏|附源码

制作一个完整的“开心消消乐”风格的游戏在Python中是一个相对复杂的项目&#xff0c;因为它涉及到图形界面、游戏逻辑、动画效果以及用户交互等多个方面。不过&#xff0c;我可以为你提供一个简化的版本和概念框架&#xff0c;帮助你理解如何开始这个项目&#xff0c;并提供一…...

ArcGIS10.8 安装教程

目录 一、环境及安装包准备 二、安装流程 1、解压安装包ArcGIS_108.rar 2、安装 三、汉化 四、激活 五、自定义菜单&#xff08;可选&#xff09; 六、打开软件按查看 七、安装过程中出现的报错 八、其他 一、环境及安装包准备 安装环境&#xff1a;win7 安装包下载…...

2024网络安全学习路线,最全保姆级教程,学完直接拿捏!

关键词&#xff1a; 网络安全入门、渗透测试学习、零基础学安全、网络安全学习路线 首先咱们聊聊&#xff0c;学习网络安全方向通常会有哪些问题 前排提示&#xff1a;文末有CSDN独家网络安全资料包&#xff01; 1、打基础时间太长 学基础花费很长时间&#xff0c;光语言都有…...

Apache Doris 中Compaction问题分析和典型案例

说明 此文档主要说明一些常见compaction问题的排查思路和临时处理手段。这些问题包括 Compaction socre高Compaction失败compaction占用资源多Compaction core 如果问题紧急&#xff0c;可联系社区同学处理 如果阅读中有问题&#xff0c;可以反馈给社区同学。 1 compaction …...

redis面试(十七)MultiLock加锁和释放锁

MultiLock MultiLock&#xff0c;英语直译为多个锁。 redisson分布式锁中的MultiLock这个机制&#xff0c;可以将多个锁合并为一个大锁&#xff0c;对一个大锁进行统一的申请加锁以及释放锁 一次性锁定多个资源&#xff0c;再去处理一些事情&#xff0c;然后事后一次性释放所…...

电脑开机LOGO修改教程_BIOS启动图片替换方法

准备工具&#xff1a;刷BIOS神器和change logo&#xff0c;打包下载地址&#xff1a;https://download.csdn.net/download/baiseled/89374686 一.打开刷BIOS神器&#xff0c;点击备份BIOS&#xff0c;保存到桌面 二.打开change logo&#xff0c;1.点击load image&#xff0c;选…...

微前端架构的持续集成与持续部署实践

在软件开发中&#xff0c;持续集成&#xff08;Continuous Integration, CI&#xff09;和持续部署&#xff08;Continuous Deployment, CD&#xff09;是实现高效、自动化软件交付的关键实践。微前端架构通过将应用拆分为多个自治的子应用&#xff0c;带来了开发和部署上的灵活…...