藏文网站怎么做/网站优化方案设计
大数据系列之:Flink Doris Connector,实时同步数据到Doris数据库
- 一、版本兼容性
- 二、使用
- 三、Flink SQL
- 四、DataStream
- 五、Lookup Join
- 六、配置
- 通用配置项
- 接收器配置项
- 查找Join配置项
- 七、Doris 和 Flink 列类型映射
- 八、使用Flink CDC访问Doris的示例
- 九、使用FlinkSQL通过CDC访问并实现部分列更新的示例
- 十、使用FlinkCDC访问多个表或整个数据库(支持MySQL、Oracle、PostgreSQL、SQLServer)
- 十一、使用FlinkCDC更新Key列
- 十二、使用Flink根据指定的列删除数据
- 十三、最佳实践应用场景
- 十四、常见问题解答
可以通过Flink操作(读取、插入、修改、删除)支持存储在Doris中的数据。本文介绍了如何通过Datastream和Flink操作Doris。
注意:
- 修改和删除仅支持唯一键模型。
- 当前的删除是为了支持Flink CDC访问数据以实现自动删除。如果要删除其他数据访问方法,您需要自行实现。
一、版本兼容性
二、使用
Maven
添加 flink-doris-connector
<!-- flink-doris-connector -->
<dependency><groupId>org.apache.doris</groupId><artifactId>flink-doris-connector-1.16</artifactId><version>1.6.0</version>
</dependency>
- 请根据不同的Flink版本替换相应的Connector和Flink依赖版本。
- 也可以从这里下载相关版本的jar包。
flink-doris-connector下载地址:
- https://repo.maven.apache.org/maven2/org/apache/doris/
编译
- 编译时直接运行sh build.sh即可。
- 编译成功后,会在dist目录下生成目标jar包,如:flink-doris-connector-1.5.0-SNAPSHOT.jar。将此文件复制到 Flink 的类路径中以使用 Flink-Doris-Connector。例如,Flink 运行在 Local 模式,则将此文件放在 lib/ 文件夹中。 Flink运行在Yarn集群模式下,将此文件放入预部署包中。
三、Flink SQL
read
-- doris source
CREATE TABLE flink_doris_source (name STRING,age INT,price DECIMAL(5,2),sale DOUBLE)WITH ('connector' = 'doris','fenodes' = 'FE_IP:HTTP_PORT','table.identifier' = 'database.table','username' = 'root','password' = 'password'
);
write
--enable checkpoint
SET 'execution.checkpointing.interval' = '10s';-- doris sink
CREATE TABLE flink_doris_sink (name STRING,age INT,price DECIMAL(5,2),sale DOUBLE)WITH ('connector' = 'doris','fenodes' = 'FE_IP:HTTP_PORT','table.identifier' = 'db.table','username' = 'root','password' = 'password','sink.label-prefix' = 'doris_label'
);-- submit insert job
INSERT INTO flink_doris_sink select name,age,price,sale from flink_doris_source
四、DataStream
read
DorisOptions.Builder builder = DorisOptions.builder().setFenodes("FE_IP:HTTP_PORT").setTableIdentifier("db.table").setUsername("root").setPassword("password");DorisSource<List<?>> dorisSource = DorisSource.<List<?>>builder().setDorisOptions(builder.build()).setDorisReadOptions(DorisReadOptions.builder().build()).setDeserializer(new SimpleListDeserializationSchema()).build();env.fromSource(dorisSource, WatermarkStrategy.noWatermarks(), "doris source").print();
write
DorisSink通过StreamLoad向Doris写入数据,DataStream写入时支持不同的序列化方式
字符串数据流(SimpleStringSerializer)
// enable checkpoint
env.enableCheckpointing(10000);
// using batch mode for bounded data
env.setRuntimeMode(RuntimeExecutionMode.BATCH);DorisSink.Builder<String> builder = DorisSink.builder();
DorisOptions.Builder dorisBuilder = DorisOptions.builder();
dorisBuilder.setFenodes("FE_IP:HTTP_PORT").setTableIdentifier("db.table").setUsername("root").setPassword("password");Properties properties = new Properties();
// When the upstream is writing json, the configuration needs to be enabled.
//properties.setProperty("format", "json");
//properties.setProperty("read_json_by_line", "true");
DorisExecutionOptions.Builder executionBuilder = DorisExecutionOptions.builder();
executionBuilder.setLabelPrefix("label-doris") //streamload label prefix.setDeletable(false).setStreamLoadProp(properties); ;builder.setDorisReadOptions(DorisReadOptions.builder().build()).setDorisExecutionOptions(executionBuilder.build()).setSerializer(new SimpleStringSerializer()) //serialize according to string.setDorisOptions(dorisBuilder.build());//mock string source
List<Tuple2<String, Integer>> data = new ArrayList<>();
data.add(new Tuple2<>("doris",1));
DataStreamSource<Tuple2<String, Integer>> source = env. fromCollection(data);source.map((MapFunction<Tuple2<String, Integer>, String>) t -> t.f0 + "\t" + t.f1).sinkTo(builder.build());//mock json string source
//env.fromElements("{\"name\":\"zhangsan\",\"age\":1}").sinkTo(builder.build());
RowData数据流(RowDataSerializer)
// enable checkpoint
env.enableCheckpointing(10000);
// using batch mode for bounded data
env.setRuntimeMode(RuntimeExecutionMode.BATCH);//doris sink option
DorisSink.Builder<RowData> builder = DorisSink.builder();
DorisOptions.Builder dorisBuilder = DorisOptions.builder();
dorisBuilder.setFenodes("FE_IP:HTTP_PORT").setTableIdentifier("db.table").setUsername("root").setPassword("password");// json format to streamload
Properties properties = new Properties();
properties.setProperty("format", "json");
properties.setProperty("read_json_by_line", "true");
DorisExecutionOptions.Builder executionBuilder = DorisExecutionOptions.builder();
executionBuilder.setLabelPrefix("label-doris") //streamload label prefix.setDeletable(false).setStreamLoadProp(properties); //streamload params//flink rowdata's schema
String[] fields = {"city", "longitude", "latitude", "destroy_date"};
DataType[] types = {DataTypes.VARCHAR(256), DataTypes.DOUBLE(), DataTypes.DOUBLE(), DataTypes.DATE()};builder.setDorisReadOptions(DorisReadOptions.builder().build()).setDorisExecutionOptions(executionBuilder.build()).setSerializer(RowDataSerializer.builder() //serialize according to rowdata.setFieldNames(fields).setType("json") //json format.setFieldType(types).build()).setDorisOptions(dorisBuilder.build());//mock rowdata source
DataStream<RowData> source = env. fromElements("").map(new MapFunction<String, RowData>() {@Overridepublic RowData map(String value) throws Exception {GenericRowData genericRowData = new GenericRowData(4);genericRowData.setField(0, StringData.fromString("beijing"));genericRowData.setField(1, 116.405419);genericRowData.setField(2, 39.916927);genericRowData.setField(3, LocalDate.now().toEpochDay());return genericRowData;}});source. sinkTo(builder. build());
SchemaChange数据流(JsonDebeziumSchemaSerializer)
// enable checkpoint
env.enableCheckpointing(10000);Properties props = new Properties();
props. setProperty("format", "json");
props.setProperty("read_json_by_line", "true");
DorisOptions dorisOptions = DorisOptions. builder().setFenodes("127.0.0.1:8030").setTableIdentifier("test.t1").setUsername("root").setPassword("").build();DorisExecutionOptions.Builder executionBuilder = DorisExecutionOptions.builder();
executionBuilder.setLabelPrefix("label-prefix").setStreamLoadProp(props).setDeletable(true);DorisSink.Builder<String> builder = DorisSink.builder();
builder.setDorisReadOptions(DorisReadOptions.builder().build()).setDorisExecutionOptions(executionBuilder.build()).setDorisOptions(dorisOptions).setSerializer(JsonDebeziumSchemaSerializer.builder().setDorisOptions(dorisOptions).build());env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source").sinkTo(builder.build());
五、Lookup Join
CREATE TABLE fact_table (`id` BIGINT,`name` STRING,`city` STRING,`process_time` as proctime()
) WITH ('connector' = 'kafka',...
);create table dim_city(`city` STRING,`level` INT ,`province` STRING,`country` STRING
) WITH ('connector' = 'doris','fenodes' = '127.0.0.1:8030','jdbc-url' = 'jdbc:mysql://127.0.0.1:9030','table.identifier' = 'dim.dim_city','username' = 'root','password' = ''
);SELECT a.id, a.name, a.city, c.province, c.country,c.level
FROM fact_table a
LEFT JOIN dim_city FOR SYSTEM_TIME AS OF a.process_time AS c
ON a.city = c.city
- 这个命令是用于创建两个表和一个查询语句。第一个表是名为"fact_table"的表,它有四个列,分别为"id"、“name”、“city"和"process_time”。其中,"id"列是BIGINT类型,"name"和"city"列是STRING类型,"process_time"列是一个基于当前系统时间的计算列,它使用"proctime()"函数实现。
- 第二个表是名为"dim_city"的表,它有四个列,分别为"city"、“level”、“province"和"country”。其中,“city”、“province"和"country"列是STRING类型,“level"列是INT类型。该表使用"Doris"作为存储引擎,连接器为"connector”,并且需要指定连接器的其他参数,如"fenodes”、“jdbc-url”、“table.identifier”、"username"和"password"等。
- 最后一个命令是一个查询语句,它使用"LEFT JOIN"将"fact_table"和"dim_city"两个表进行连接,并使用"FOR SYSTEM_TIME AS OF"来指定连接时的时间戳,这里使用了"process_time"列的值。查询结果包括"id"、“name”、“city”、“province”、"country"和"level"这些列。
六、配置
通用配置项
fenodes:
- Doris FE http地址,支持多个地址,用逗号分隔
benodes:
- Doris BE http地址,支持多个地址,以逗号分隔。
jdbc-url:
- jdbc连接信息,如:jdbc:mysql://127.0.0.1:9030
table.identifier:
- Doris表名,如:db.tbl
auto-redirect:
- 默认值:true
- 是否重定向 StreamLoad 请求。开启后StreamLoad会通过FE写入,不再显示BE信息。
doris.request.retries:
- 默认值:3
- 向 Doris 发送请求的重试次数
doris.request.connect.timeout.ms:
- 默认值:30000
- 向 Doris 发送请求的连接超时
doris.request.read.timeout.ms:
- 默认值:30000
- 读取向 Doris 发送请求的超时
源配置项
doris.request.query.timeout.s:
- 默认值:3600
- 查询Doris的超时时间,默认1小时,-1表示无超时限制
doris.request.tablet.size:
- 默认值:Integer. MAX_VALUE
- 一个Partition对应的Doris Tablet数量。该值设置得越小,生成的Partition就越多。这提高了 Flink 端的并行度,但同时也给 Doris 带来了更大的压力。
doris.batch.size:
- 默认值:1024
- 一次从BE读取数据的最大行数。增加该值会减少 Flink 和 Doris 之间建立的连接数量。从而减少网络延迟带来的额外时间开销。
doris.exec.mem.limit:
- 默认值:2147483648
- 单个查询的内存限制。默认为2GB,以字节为单位
doris.deserialize.arrow.async:
- 默认值:FALSE
- 是否支持flink-doris-connector迭代所需的Arrow格式异步转换为RowBatch
doris.deserialize.queue.size:
- 默认值:64
- Arrow格式的内部处理队列的异步转换,当doris.deserialize.arrow.async为true时有效
doris.read.field:
- 读取Doris表的列名列表,以逗号分隔
doris.filter.query:
- 过滤读取数据的表达式,这个表达式透明传递给Doris。 Doris使用这个表达式来完成源端的数据过滤。例如年龄=18。
接收器配置项
sink.label-prefix:
- Stream加载导入使用的标签前缀。在2pc场景下,需要全局唯一性来保证Flink的EOS语义。
sink.properties.*:
- 导入流负载参数。
例如: ‘sink.properties.column_separator’ = ', ’ 定义列分隔符, ‘sink.properties.escape_delimiters’ = ‘true’ 特殊字符作为分隔符, ‘\x01’ 将转换为二进制 0x01 - JSON格式导入
‘sink.properties.format’ = ‘json’ ‘sink.properties.按行读取 json’ = ‘true’
详细参数请参考这里。
sink.enable-delete:
- 默认值:TRUE
- 是否启用删除。该选项需要Doris表开启批量删除功能(Doris 0.15+版本默认开启),且仅支持Unique模型。
sink.enable-2pc:
- 默认值:TRUE
- 是否启用两阶段提交(2pc),默认为true,以保证Exactly-Once语义。
sink.buffer-size:
- 默认值:1MB
- 写入数据缓存缓冲区的大小,以字节为单位。不建议修改,默认配置即可
sink.buffer-count:
- 默认值:3
- 写入数据缓冲区的数量。不建议修改,默认配置即可
sink.max-retries:
- 默认值:3
- Commit失败后最大重试次数,默认3
sink.use-cache:
- 默认值:false
- 发生异常时,是否使用内存缓存进行恢复。启用后,Checkpoint 期间的数据将保留在缓存中。
sink.enable.batch-mode:
- 默认值:false
- 是否使用批处理模式写入Doris。使能后,写入时序不依赖于Checkpoint。写入是通过sink.buffer-flush.max-rows/sink.buffer-flush.max-bytes/sink.buffer-flush.interval参数控制的。输入机会。
同时开启后,Exactly-once语义将无法保证。 Uniq模型可以用来实现幂等性。
sink.flush.queue-size:
- 默认值:2
- 在批处理模式下,缓存的列大小。
sink.buffer-flush.max-rows:
- 默认值:50000
- 批处理模式下,单批写入的最大数据行数。
sink.buffer-flush.max-bytes:
- 默认值:10MB
- 在批处理模式下,单批写入的最大字节数。
sink.buffer-flush.interval:
- 默认值:10s
- 批处理模式下,异步刷新缓存的时间间隔
sink.ignore.update-before:
- 默认值:true
- 是否忽略update-before事件,默认忽略。
查找Join配置项
lookup.cache.max-rows
- 默认值:-1
- 查找缓存的最大行数,默认值为-1,不启用缓存
lookup.cache.ttl:
- 默认值:10s
- 查找缓存的最大时间,默认10s
lookup.max-retries:
- 默认值:1
- 查找查询失败后重试的次数
lookup.jdbc.async:
- 默认值:false
- 是否启用异步查找,默认为false
lookup.jdbc.read.batch.size:
- 默认值:128
- 异步查找下,每个查询的最大批量大小
lookup.jdbc.read.batch.queue-size:
- 默认值:256
- 异步查找时中间缓冲队列的大小
lookup.jdbc.read.thread-size:
- 默认值:3
- 每个任务中用于查找的jdbc线程数
七、Doris 和 Flink 列类型映射
Doris类型 | Flink类型 |
---|---|
NULL_TYPE | NULL |
BOOLEAN | BOOLEAN |
TINYINT | TINYINT |
SMALLINT | SMALLINT |
INT | INT |
BIGINT | BIGINT |
FLOAT | FLOAT |
DOUBLE | DOUBLE |
DATE | DATE |
DATETIME | TIMESTAMP |
DECIMAL | DECIMAL |
CHAR | STRING |
LARGEINT | STRING |
VARCHAR | STRING |
STRING | STRING |
DECIMALV2 | DECIMAL |
ARRAY | ARRAY |
MAP | MAP |
JSON | STRING |
VARIANT | STRING |
IPV4 | STRING |
IPV6 | STRING |
从connector-1.6.1版本开始,增加了对Variant、IPV6、IPV4三种数据类型读取的支持。读取 IPV6 和 Variant 需要 Doris 2.1.1 或更高版本。
八、使用Flink CDC访问Doris的示例
SET 'execution.checkpointing.interval' = '10s';
CREATE TABLE cdc_mysql_source (id int,name VARCHAR,PRIMARY KEY (id) NOT ENFORCED
) WITH ('connector' = 'mysql-cdc','hostname' = '127.0.0.1','port' = '3306','username' = 'root','password' = 'password','database-name' = 'database','table-name' = 'table'
);-- Support synchronous insert/update/delete events
CREATE TABLE doris_sink (
id INT,
name STRING
)
WITH ('connector' = 'doris','fenodes' = '127.0.0.1:8030','table.identifier' = 'database.table','username' = 'root','password' = '','sink.properties.format' = 'json','sink.properties.read_json_by_line' = 'true','sink.enable-delete' = 'true', -- Synchronize delete events'sink.label-prefix' = 'doris_label'
);insert into doris_sink select id,name from cdc_mysql_source;
九、使用FlinkSQL通过CDC访问并实现部分列更新的示例
-- enable checkpoint
SET 'execution.checkpointing.interval' = '10s';CREATE TABLE cdc_mysql_source (id int,name STRING,bank STRING,age int,PRIMARY KEY (id) NOT ENFORCED
) WITH ('connector' = 'mysql-cdc','hostname' = '127.0.0.1','port' = '3306','username' = 'root','password' = 'password','database-name' = 'database','table-name' = 'table'
);CREATE TABLE doris_sink (id INT,name STRING,bank STRING,age int
)
WITH ('connector' = 'doris','fenodes' = '127.0.0.1:8030','table.identifier' = 'database.table','username' = 'root','password' = '','sink.properties.format' = 'json','sink.properties.read_json_by_line' = 'true','sink.properties.columns' = 'id,name,bank,age','sink.properties.partial_columns' = 'true' --Enable partial column updates
);insert into doris_sink select id,name,bank,age from cdc_mysql_source;
十、使用FlinkCDC访问多个表或整个数据库(支持MySQL、Oracle、PostgreSQL、SQLServer)
MySQL同步示例
<FLINK_HOME>bin/flink run \-Dexecution.checkpointing.interval=10s\-Dparallelism.default=1\-c org.apache.doris.flink.tools.cdc.CdcTools\lib/flink-doris-connector-1.16-1.5.0-SNAPSHOT.jar \mysql-sync-database\--database test_db \--mysql-conf hostname=127.0.0.1 \--mysql-conf port=3306 \--mysql-conf username=root \--mysql-conf password=123456 \--mysql-conf database-name=mysql_db \--including-tables "tbl1|test.*" \--sink-conf fenodes=127.0.0.1:8030 \--sink-conf username=root \--sink-conf password=123456 \--sink-conf jdbc-url=jdbc:mysql://127.0.0.1:9030 \--sink-conf sink.label-prefix=label \--table-conf replication_num=1
Oracle同步示例
<FLINK_HOME>bin/flink run \-Dexecution.checkpointing.interval=10s \-Dparallelism.default=1 \-c org.apache.doris.flink.tools.cdc.CdcTools \./lib/flink-doris-connector-1.16-1.5.0-SNAPSHOT.jar\oracle-sync-database \--database test_db \--oracle-conf hostname=127.0.0.1 \--oracle-conf port=1521 \--oracle-conf username=admin \--oracle-conf password="password" \--oracle-conf database-name=XE \--oracle-conf schema-name=ADMIN \--including-tables "tbl1|tbl2" \--sink-conf fenodes=127.0.0.1:8030 \--sink-conf username=root \--sink-conf password=\--sink-conf jdbc-url=jdbc:mysql://127.0.0.1:9030 \--sink-conf sink.label-prefix=label \--table-conf replication_num=1
PostgreSQL 同步示例
<FLINK_HOME>/bin/flink run \-Dexecution.checkpointing.interval=10s \-Dparallelism.default=1\-c org.apache.doris.flink.tools.cdc.CdcTools \./lib/flink-doris-connector-1.16-1.5.0-SNAPSHOT.jar \postgres-sync-database \--database db1\--postgres-conf hostname=127.0.0.1 \--postgres-conf port=5432 \--postgres-conf username=postgres \--postgres-conf password="123456" \--postgres-conf database-name=postgres \--postgres-conf schema-name=public \--postgres-conf slot.name=test \--postgres-conf decoding.plugin.name=pgoutput \--including-tables "tbl1|tbl2" \--sink-conf fenodes=127.0.0.1:8030 \--sink-conf username=root \--sink-conf password=\--sink-conf jdbc-url=jdbc:mysql://127.0.0.1:9030 \--sink-conf sink.label-prefix=label \--table-conf replication_num=1
SQLServer同步示例
<FLINK_HOME>/bin/flink run \-Dexecution.checkpointing.interval=10s \-Dparallelism.default=1 \-c org.apache.doris.flink.tools.cdc.CdcTools \./lib/flink-doris-connector-1.16-1.5.0-SNAPSHOT.jar \sqlserver-sync-database \--database db1\--sqlserver-conf hostname=127.0.0.1 \--sqlserver-conf port=1433 \--sqlserver-conf username=sa \--sqlserver-conf password="123456" \--sqlserver-conf database-name=CDC_DB \--sqlserver-conf schema-name=dbo \--including-tables "tbl1|tbl2" \--sink-conf fenodes=127.0.0.1:8030 \--sink-conf username=root \--sink-conf password=\--sink-conf jdbc-url=jdbc:mysql://127.0.0.1:9030 \--sink-conf sink.label-prefix=label \--table-conf replication_num=1
十一、使用FlinkCDC更新Key列
一般来说,在业务数据库中,数字被用作表的主键,例如学生表中使用数字(id)作为主键,但随着业务的发展,与数据对应的数字可能会发生变化。在这种情况下,使用FlinkCDC + Doris Connector进行数据同步可以自动更新Doris主键列中的数据。
原理
Flink CDC的底层采集工具是Debezium。Debezium内部使用op字段来识别相应的操作:op字段的值为c、u、d和r,分别对应创建、更新、删除和读取。对于主键列的更新,FlinkCDC将向下游发送DELETE和INSERT事件,在数据同步到Doris后,会自动更新主键列的数据。
示例
Flink程序可以参考上述CDC同步示例。任务成功提交后,在MySQL端执行更新主键列语句(update student set id = ‘1002’ where id = ‘1001’),以修改Doris中的数据。
十二、使用Flink根据指定的列删除数据
通常,Kafka中的消息使用特定的字段来标记操作类型,例如{“op_type”:“delete”,data:{…}}。对于这种类型的数据,希望删除op_type=delete的数据。
默认情况下,DorisSink将根据RowKind来区分事件类型。通常,在cdc的情况下,可以直接获取事件类型,并将隐藏列__DORIS_DELETE_SIGN__赋值以实现删除的目的,而Kafka需要基于业务逻辑进行判断,显示传递给隐藏列的值。
-- Such as upstream data: {"op_type":"delete",data:{"id":1,"name":"zhangsan"}}
CREATE TABLE KAFKA_SOURCE(data STRING,op_type STRING
) WITH ('connector' = 'kafka',...
);CREATE TABLE DORIS_SINK(id INT,name STRING,__DORIS_DELETE_SIGN__ INT
) WITH ('connector' = 'doris','fenodes' = '127.0.0.1:8030','table.identifier' = 'db.table','username' = 'root','password' = '','sink.enable-delete' = 'false', -- false means not to get the event type from RowKind'sink.properties.columns' = 'id, name, __DORIS_DELETE_SIGN__' -- Display the import column of the specified streamload
);INSERT INTO DORIS_SINK
SELECT json_value(data,'$.id') as id,
json_value(data,'$.name') as name,
if(op_type='delete',1,0) as __DORIS_DELETE_SIGN__
from KAFKA_SOURCE;
- 这段代码是一个示例,演示了如何使用Flink从Kafka源表读取数据,并将其写入Doris目标表。具体来说,如果源表中的数据op_type字段的值为"delete",则希望在Doris目标表中删除相应的数据。
- 首先,在Kafka源表的定义中,我们有一个data字段用于存储源数据的JSON字符串,以及一个op_type字段用于标识操作类型。
- 然后,在Doris目标表的定义中,我们有一个id字段和一个name字段来存储数据的具体内容,还有一个名为__DORIS_DELETE_SIGN__的隐藏列,用于标识是否要进行删除操作。
- 在INSERT INTO语句中,我们将从Kafka源表中选择data字段的id和name,并使用json_value函数提取相应的值。同时,我们使用if函数将op_type字段的值与"delete"进行比较,如果相等则将__DORIS_DELETE_SIGN__赋值为1,否则赋值为0。
- 最后,将处理后的数据插入到Doris目标表中。
- 总之,这段代码的作用是根据源表中的op_type字段值,将对应的数据删除或写入到Doris目标表中。
十三、最佳实践应用场景
- 使用Flink Doris Connector最适合的场景是实时/批量将源数据同步到Doris(Mysql、Oracle、PostgreSQL)中,然后使用Flink对Doris和其他数据源中的数据进行联合分析。您还可以使用Flink Doris Connector。
其他注意事项:
- Flink Doris Connector主要依赖于Checkpoint进行流式写入,因此Checkpoint之间的时间间隔就是数据的可见延迟时间。
- 为了确保Flink的Exactly Once语义,Flink Doris Connector默认启用两阶段提交,Doris在1.1版本之后默认启用两阶段提交。1.0可以通过修改BE参数来启用。
十四、常见问题解答
Doris Source读取数据后,为什么流会结束?
- 目前Doris Source是有界流,不支持CDC读取。
Flink能否读取Doris并执行条件下推?
- 通过配置doris.filter.query参数可以实现。
如何写入位图类型?
CREATE TABLE bitmap_sink (
dt int,
page string,
user_id int
)
WITH ('connector' = 'doris','fenodes' = '127.0.0.1:8030','table.identifier' = 'test.bitmap_test','username' = 'root','password' = '','sink.label-prefix' = 'doris_label','sink.properties.columns' = 'dt,page,user_id,user_id=to_bitmap(user_id)'
)
errCode = 2, detailMessage = Label [label_0_1] has already been used, relate to txn [19650]
- 在Exactly-Once场景中,Flink Job必须从最新的Checkpoint/Savepoint重新启动,否则会报告上述错误。当不需要Exactly-Once时,可以通过关闭2PC提交(sink.enable-2pc=false)或更改不同的sink.label-prefix来解决。
errCode = 2, detailMessage = transaction [19650] not found
- 发生在Commit阶段,Checkpoint中记录的事务ID在FE端已过期,在此时再次提交时会出现上述错误。此时无法从Checkpoint启动,可以通过修改fe.conf中的streaming_label_keep_max_second配置来延长过期时间,默认为12小时。
errCode = 2, detailMessage = current running txns on db 10006 is 100, larger than limit 100
- 这是因为同一库的并发导入超过了100,可以通过调整fe.conf的max_running_txn_num_per_db参数来解决。详细信息,请参考max_running_txn_num_per_db。
- 同时,如果一个任务频繁修改标签并重新启动,也可能导致此错误发生。在2pc场景(重复/聚合模型)中,每个任务的标签需要唯一,在从Checkpoint重新启动时,Flink任务将主动中止之前已经成功预提交但未提交的事务。频繁修改标签并重新启动将导致大量已成功预提交的事务无法中止,占用事务。在Unique模型下,也可以关闭2pc,实现幂等写入。
当Flink向Uniq模型写入一批数据时,如何确保数据的顺序?
- 您可以添加序列列的配置来确保顺序。
Flink任务没有报错,但数据无法同步?
- 在Connector1.1.0之前,数据是批量写入的,并且写入是由数据驱动的。需要确定上游是否有数据写入。在1.1.0之后,它依赖于Checkpoint,并且必须启用Checkpoint才能进行写入。
tablet writer write failed, tablet_id=190958, txn_id=3505530, err=-235
- 通常发生在Connector1.1.0之前,这是因为写入频率过快,导致版本过多。可以通过设置sink.batch.size和sink.batch.interval参数来减少Streamload的频率。
源表和Doris表应如何对应?
- 在使用Flink Connector导入数据时,需要注意两个方面。第一,源表的列和类型应与Flink SQL中的列和类型对应;第二,Flink SQL中的列和类型必须与Doris表的列和类型匹配。
TApplicationException: get_next failed: out of sequence response: expected 4 but got 3
- 这是由于 Thrift 中的并发错误造成的。建议您尽可能使用最新的连接器和兼容的 Flink 版本。
DorisRuntimeException: Fail to abort transaction 26153 with url http://192.168.0.1:8040/api/table_name/_stream_load_2pc
- 您可以在TaskManager中搜索日志中止事务响应,并根据HTTP返回码判断是客户端问题还是服务器问题。
org.apache.flink.table.api.SqlParserException when using doris.filter.query: SQL parsing failed. “xx” encountered at row x, column xx
- 这个问题主要是由于条件中的varchar/string类型,需要进行引号转义。正确的写法是xxx = ‘‘xxx’’。这样,Flink SQL解析器会将连续的两个单引号解释为一个单引号字符,而不是字符串的结束,并将拼接的字符串作为属性的值。例如:t1 >= ‘2024-01-01’ 可以写为 ‘doris.filter.query’ = ‘t1 >=’‘2024-01-01’'。
Failed to connect to backend: http://host:webserver_port, and BE is still alive
- 这个问题可能是由于配置了be的IP地址,而该地址无法被外部的Flink集群访问。这主要是因为在连接fe时,be的地址是通过fe进行解析的。例如,如果将be地址添加为’127.0.0.1’,那么Flink集群通过fe获取到的be地址将是’127.0.0.1:webserver_port’,并且Flink将连接到该地址。当出现这种问题时,可以通过将be的实际对应的外部IP地址添加到"with"属性中来解决:‘benodes’=“be_ip:webserver_port,be_ip:webserver_port…”。对于整个数据库的同步,可以使用以下属性:–sink-conf benodes=be_ip:webserver,be_ip:webserver…。
当使用Flink-connector将MySQL数据同步到Doris时,时间戳之间存在几小时的时间差。
- Flink Connector默认使用UTC+8时区从MySQL同步整个数据库。如果您的数据位于不同的时区,您可以使用以下配置进行调整,例如:–mysql-conf debezium.date.format.timestamp.zone=“UTC+3”。
相关文章:

大数据系列之:Flink Doris Connector,实时同步数据到Doris数据库
大数据系列之:Flink Doris Connector,实时同步数据到Doris数据库 一、版本兼容性二、使用三、Flink SQL四、DataStream五、Lookup Join六、配置通用配置项接收器配置项查找Join配置项 七、Doris 和 Flink 列类型映射八、使用Flink CDC访问Doris的示例九、…...

LabVIEW VI 多语言动态加载与运行的实现
在多语言应用程序开发中,确保用户界面能够根据用户的语言偏好动态切换是一个关键需求。本文通过分析一个LabVIEW程序框图,详细说明了如何使用LabVIEW中的属性节点和调用节点来实现VI(虚拟仪器)界面语言的动态加载与运行。此程序允…...

Unity引擎基础知识
目录 Unity基础知识概要 1. 创建工程 2. 工程目录介绍 3. Unity界面和五大面板 4. 游戏物体创建与操作 5. 场景和层管理 6. 组件系统 7. 脚本语言C# 8. 物理引擎和UI系统 学习资源推荐 Unity引擎中如何优化大型游戏项目的性能? Unity C#脚本语言的高级编…...

练习题- 探索正则表达式对象和对象匹配
正则表达式(Regular Expressions)是一种强大而灵活的文本处理工具,它允许我们通过模式匹配来处理字符串。这在数据清理、文本分析等领域有着广泛的应用。在Python中,正则表达式通过re模块提供支持,学习和掌握正则表达式对于处理复杂的文本数据至关重要。 本文将探索如何在…...

Java集合提升
1. 手写ArrayList 1.1. ArrayList底层原理细节 底层结构是一个长度可以动态增长的数组(顺序表)transient Object[] elementData; 特点:在内存中分配连续的空间,只存储数据,不存储地址信息。位置就隐含着地址。优点 节…...

uniapp 微信小程序生成水印图片
效果 源码 <template><view style"overflow: hidden;"><camera device-position"back" flash"auto" class"camera"><cover-view class"text-white padding water-mark"><cover-view class"…...

ElasticSearch相关知识点
ElasticSearch中的倒排索引是如何工作的? 倒排索引是ElasticSearch中用于全文检索的一种数据结构,与正排索引不同的是,正排索引将文档按照词汇顺序组织。而倒排索引是将词汇映射到包含该词汇的文档中。 在ElasticSearch中,倒排索…...

css 文字图片居中及网格布局
以下内容纯自已个人理解,直接上代码: <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><meta name"viewport" content"widthdevice-width, initial-scale1.0"><…...

解决ImportError: DLL load failed while importing _rust: 找不到指定的程序
解决ImportError: DLL load failed while importing _rust: 找不到指定的程序 python使用库cryptography 当 from cryptography.hazmat.bindings._rust import exceptions as rust_exceptions 时,会报错: ImportError: DLL load failed while importin…...

集合-List去重
1.利用Set去重 @Test public void distinctList() {List<String> oldList = new ArrayList<>();oldList.add("a");oldList.add("a");oldList.add("b");oldList.add("c");oldList.add("d");List<String> …...

ST-LINK USB communication error 非常有效的解决方法
文章目录 一、检查确定是ST-LINK USB communication error的问题二、关闭文件,打开keil软件所在文件夹,找到STLink文件夹,找到该应用程序双击 一、检查确定是ST-LINK USB communication error的问题 二、关闭文件,打开keil软件所在…...

探索CSS的:future-link伪类:选择指向未来文档的链接
CSS(层叠样式表)是Web设计中用于描述网页元素样式的语言。随着CSS4的提案,引入了许多新的选择器,其中之一是:future-link伪类。然而,需要注意的是,:future-link伪类目前还处于提议阶段,并没有在…...

【C++】序列与关联容器(三)map与multimap容器
【C】序列与关联容器(三)map与multimap容器 一、map二、multiset / multimap 一、map 树中的每个结点的类型是一个std::pair //pair的类型是<const key,value> pair是一个包含两个指针的结构体,第一个指针指向该节点的key,…...

ActiveMQ、RabbitMQ、Kafka、RocketMQ在优先级队列、延迟队列、死信队列、重试队列、消费模式、广播模式的区别
ActiveMQ、RabbitMQ、Kafka、RocketMQ这四款消息队列在优先级队列、延迟队列、死信队列、重试队列、消费模式、广播模式等方面各有其特点和差异。以下是对这些方面的详细比较: 1. 优先级队列 ActiveMQ:支持优先级队列,可以在发送消息时指定…...

首款会员制区块链 Geist 介绍
今天,Pixelcraft Studios 很高兴地宣布即将推出 Geist,这是一个由 Base、Arbitrum、Alchemy 以及 Aavegotchi 支持的全新 L3。 Geist 之前的代号为 “Gotchichain”,是首个专为游戏打造的会员专用区块链。 为什么选择 Geist? …...

CANoe软件中Trace窗口的筛选栏标题不显示(空白)的解决方法
文章目录 问题描述原因分析解决方案扩展知识总结问题描述 不知道什么情况,CANoe软件中Trace窗口的筛选栏标题突然不显示了,一片空白。现象如下: 虽然不影响CANoe软件的使用,但是观感上非常难受,对于强迫症患者非常不友好。 原因分析 按照常规思路,尝试了: 1、重启CAN…...

日期类代码实现-C++
一、目标 通过前面对类和对象的介绍我们可以自己通过C代码初步实现一个简单的日期类。 实现的主要操作有: 1.日期类的构造函数 2.日期类的拷贝构造函数(在头文件中实现) 3.日期类的比较运算符重载 4.日期类的计算运算符重载 5.流插入运…...

【问题记录+总结】VS Code Tex Live 2024 Latex Workshop Springer模板----更新ing
目录 Summary 道阻且长 少即是多 兵马未动粮草先行 没有万能 和一劳永逸 具体问题具体分析 心态 Detail 1、关于模板[官网] 2、settings.json 3、虫和杀虫剂 4、擦 换成Tex Studio都好了。。。 Summary 道阻且长 某中意期刊,只有Latex。之前只简单用过…...

Linux运维_Bash脚本_源码安装Go-1.21.11
Linux运维_Bash脚本_源码安装Go-1.21.11 Bash (Bourne Again Shell) 是一个解释器,负责处理 Unix 系统命令行上的命令。它是由 Brian Fox 编写的免费软件,并于 1989 年发布的免费软件,作为 Sh (Bourne Shell) 的替代品。 您可以在 Linux 和…...

ShareSDK Twitter
创建应用 1.登录Twitter控制台并通过认证 2.点击Developer Portal进入Twitter后台 3.点击Sign up for Free Account创建应用 4.配置应用信息 以下为创建过程示例,图中信息仅为示例,创建时请按照真实信息填写,否则无法正常使用。 权限申请…...

word2vec 如何用多个词表示一个句子
word2vec 模型通常用于将单词映射为固定大小的向量。为了使用多个词表示一个句子,我们可以采用以下几种方法: 词袋模型 (Bag of Words, BoW): 将句子中所有词的向量加起来,不考虑词的顺序。这种方法简单,但会丢失词序信息。 计算…...

IDEA中查看接口的所有实现类和具体实现类
1.IDEA中接口的所有实现类查看 1.CTRLH(hierarchy 结构) 我们选中要查看的接口 按住快捷键ctrlh 在界面右侧可以看到该接口的所有可能实现类 2.右击diagrams->show diagram 选中要查看的接口 右击选择diagrams->show diagram 即可以以图表的方式查看接口和所有实现类…...

DLL的导出和调用
动态链接库在C中非常重要,写了一个简单的例子用于DLL的导出和调用。 DLL的生成 头文件 #include<iostream> #include<stdexcept> using namespace std;#define TESTAPI __declspec(dllexport)// 函数定义 extern "C" {TESTAPI int add(in…...

vscode中调试cuda kernel
关于vscode中调试cpp可参考之前的博客:ubuntu vscode 基本设置 和 调试设置_ubuntu vscode 调试-CSDN博客 这篇我们来讲如何调试.cu的kernel,主要参考的是:https://www.zhihu.com/question/431782036/answer/2468791220 1、基本准备不多说&am…...

SQL的连接查询与pandas的对应关系
在SQL和Pandas中,连接查询(join)是处理数据集之间关系的重要工具。下面是SQL中的各种连接查询类型及其与Pandas中相应操作的对应关系: 1. INNER JOIN SQL: INNER JOIN 返回两个表中具有匹配值的行。 Pandas: merge() 方法的 how…...

【JS】中断和恢复任务序列
前言 封装processTasks函数,实现以下需求 /*** 依次顺序执行一系列任务* 所有任务全部完成后可以得到每个任务的执行结果* 需要返回两个方法,start用于启动任务,pause用于暂停任务* 每个任务具有原子性,即不可中断,只…...

CentOS系统下安装NVIDIA显卡驱动
一、安装显卡驱动 1.安装依赖项 yum -y install gcc pciutils yum -y install gcc yum -y install gcc-c yum -y install make2.查看内核版本 uname -a3.查看显卡版本 lspci | grep -i nvidia4.屏蔽系统自带的nouveau (1)查看nouveau lsmod | grep nouveau (2)打开blackl…...

Linux 与 Windows 服务器操作系统 | 全面对比
在服务器操作系统的领域,Linux 和 Windows 一直是两个备受关注的选择。 首先来看 Windows 操作系统。它由 Microsoft Corporation 开发,在桌面领域占据显著份额,其中 Windows 10 是使用最广泛的版本,广泛应用于个人计算机和企业桌…...

给既有exe程序添加一机一码验证
原文地址:李浩的博客 lihaohello.top 本科期间开发过一款混凝土基本构件设计程序,该程序是一个独立的exe可执行文件,采用VC静态链接MFC库编制而成。近期,需要为该程序添加用户注册验证的功能,从而避免任何用户获取该程…...

【Datawhale X 魔搭 】AI夏令营第四期大模型方向,Task2:头脑风暴会,巧灵脑筋急转弯(持续更新)
队伍名称:巧灵脑筋急转弯 队伍技术栈:python,LLM,RAG,大模型,nlp,Gradio,Vue,java 队友:知唐(队长),我真的敲不动…...