Flink SQL 基于Update流出现空值无法过滤问题
问题背景
- 问题描述
基于Flink-CDC ,Flink SQL的实时计算作业在运行一段时间后,突然发现插入数据库的计算结果发生部分主键属性发生失败,导致后续计算结果无法插入,
超过失败次数失败的情况
- 问题报错
Caused by: java.sql.BatchUpdateException: Batch entry 0 INSERT INTO dm_hljy.dws_table_name (op_date, school_year, campus_name, school_name, depart_name, total_opfare, ids, update_time) VALUES ('2024-03-11 00:00:00+08', '2023', 'xxxx', 'xxxx学校', 'xxxx小学部', '203333300000', '57', '2024-03-21 09:31:08.47+08') ON DUPLICATE KEY UPDATE school_year=VALUES(school_year), total_opfare=VALUES(total_opfare), ids=VALUES(ids), update_time=VALUES(update_time) was aborted: ERROR: dn_6007_6008: null value in column "depart_name" violates not-null constraint Call getNextException to see other errors in the batch.at com.huawei.gauss200.jdbc.jdbc.BatchResultHandler.handleCompletion(BatchResultHandler.java:171) ~[huaweicloud-dws-jdbc-8.1.1.1-200.jar:?]at com.huawei.gauss200.jdbc.core.v3.QueryExecutorImpl.executeBatch(QueryExecutorImpl.java:586) ~[huaweicloud-dws-jdbc-8.1.1.1-200.jar:?]at com.huawei.gauss200.jdbc.jdbc.PgStatement.executeBatch(PgStatement.java:883) ~[huaweicloud-dws-jdbc-8.1.1.1-200.jar:?]at com.huawei.gauss200.jdbc.jdbc.PgPreparedStatement.executeBatch(PgPreparedStatement.java:1580) ~[huaweicloud-dws-jdbc-8.1.1.1-200.jar:?]at org.apache.flink.connector.jdbc.statement.FieldNamedPreparedStatementImpl.executeBatch(FieldNamedPreparedStatementImpl.java:65) ~[flink-connector-jdbc-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]at org.apache.flink.connector.jdbc.internal.executor.TableSimpleStatementExecutor.executeBatch(TableSimpleStatementExecutor.java:64) ~[flink-connector-jdbc-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]at org.apache.flink.connector.jdbc.internal.executor.TableBufferReducedStatementExecutor.executeBatch(TableBufferReducedStatementExecutor.java:101) ~[flink-connector-jdbc-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]at org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.attemptFlush(JdbcOutputFormat.java:266) ~[flink-connector-jdbc-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]at org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.flush(JdbcOutputFormat.java:236) ~[flink-connector-jdbc-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]at org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.lambda$open$0(JdbcOutputFormat.java:159) ~[flink-connector-jdbc-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_332]at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) ~[?:1.8.0_332]at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) ~[?:1.8.0_332]at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) ~[?:1.8.0_332]at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_332]at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_332]... 1 moreCaused by: com.huawei.gauss200.jdbc.util.PSQLException: ERROR: dn_6007_6008: null value in column "depart_name" violates not-null constraintat com.huawei.gauss200.jdbc.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2856) ~[huaweicloud-dws-jdbc-8.1.1.1-200.jar:?]at com.huawei.gauss200.jdbc.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2587) ~[huaweicloud-dws-jdbc-8.1.1.1-200.jar:?]at com.huawei.gauss200.jdbc.core.v3.QueryExecutorImpl.executeBatch(QueryExecutorImpl.java:575) ~[huaweicloud-dws-jdbc-8.1.1.1-200.jar:?]at com.huawei.gauss200.jdbc.jdbc.PgStatement.executeBatch(PgStatement.java:883) ~[huaweicloud-dws-jdbc-8.1.1.1-200.jar:?]at com.huawei.gauss200.jdbc.jdbc.PgPreparedStatement.executeBatch(PgPreparedStatement.java:1580) ~[huaweicloud-dws-jdbc-8.1.1.1-200.jar:?]at org.apache.flink.connector.jdbc.statement.FieldNamedPreparedStatementImpl.executeBatch(FieldNamedPreparedStatementImpl.java:65) ~[flink-connector-jdbc-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]at org.apache.flink.connector.jdbc.internal.executor.TableSimpleStatementExecutor.executeBatch(TableSimpleStatementExecutor.java:64) ~[flink-connector-jdbc-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]at org.apache.flink.connector.jdbc.internal.executor.TableBufferReducedStatementExecutor.executeBatch(TableBufferReducedStatementExecutor.java:101) ~[flink-connector-jdbc-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]at org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.attemptFlush(JdbcOutputFormat.java:266) ~[flink-connector-jdbc-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]at org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.flush(JdbcOutputFormat.java:236) ~[flink-connector-jdbc-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]at org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.lambda$open$0(JdbcOutputFormat.java:159) ~[flink-connector-jdbc-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_332]at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) ~[?:1.8.0_332]at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) ~[?:1.8.0_332]at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) ~[?:1.8.0_332]at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_332]at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_332

定位
定位思路
1.方向一:怀疑数据库插入存在数据处理时,造成数据处理出现空值的情况,即数据本身不为空,但是数据插入却出现了空
2.方向二:Flink-SQL在消费kafka数据时存在了空值,故加工的数据计算结果存在空值
定位过程
- 因插入数据库定位比较麻烦,且数据库已经设置该字段为主属性,故出现插入时处理为空值的概率较小。故先从较为简单的Flink SQL查询数据
- 定位方法一,查询该字段为空的记录,待作业执行完成后,未查询到空值对应记录
select select * from table_name where depart_name is null or depart_name = '' or char_length(depart_name) = 0;
- 因考虑到使用Flink-CDC进行变更数据捕获,故对应的update流存在-U,+U,-D,+I记录,因此随着插入记录存在空值被记录进去的情况,故采用view的方式,先将宽表的加工、关联方式创建为view,然后进行空值的过滤。实施如下
create view view_prd as
select a.* ,b.* from a join b on a.id = b.idselect * from view_prd where depart_name is null or depart_name = '' or char_length(depart_name) = 0;
- 通过查询结果,发现存在最后一条记录存在空值的原因,往源头定位,发现该字段之前为空,后面进行更新填充到值出现-U记录,导致数据插入持续失败

原因
- 因为flink-SQL消费的数据时kafka topic,flink以upsert-kafka形式的connector进行写入,故存在changelog 流中数据更新存在-U,+U的记录(按照Key进行区分唯一条记录),value 为空(-U)的记录kafka也,导致出现空值,

解决
通过在DWS宽表创建一层View(如上),在写入DWS宽表的kafka topic之前,现将该字段空值过滤,即可排除空值涉及记录被纳入结果指标计算的范围中
相关文章:
Flink SQL 基于Update流出现空值无法过滤问题
问题背景 问题描述 基于Flink-CDC ,Flink SQL的实时计算作业在运行一段时间后,突然发现插入数据库的计算结果发生部分主键属性发生失败,导致后续计算结果无法插入, 超过失败次数失败的情况问题报错 Caused by: java.sql.BatchUp…...
git-怎样把连续的多个commit合并成一个?
Git怎样把连续的多个commit合并成一个? Git怎样把连续的多个commit合并成一个? 参考URL: https://www.jianshu.com/p/5b4054b5b29e 查看git日志 git log --graph比如下图的commit 历史,想要把bai “Second change” 和 “Third change” 这…...
2024年2月游戏手柄线上电商(京东天猫淘宝)综合热销排行榜
鲸参谋监测的线上电商(京东天猫淘宝)游戏手柄品牌销售数据已出炉!2月游戏手柄销售数据呈现出强劲的增长势头。 根据鲸参谋数据显示,今年2月游戏手柄月销售量累计约43万件,同比去年上涨了78%;销售额累计达1…...
Sass5分钟速通基础语法
前言 近来在项目中使用sass,想着学习一下,但官方写的教程太冗杂,所以就有了本文速通Sass的基础语法 Sass 是 CSS 的一种预编译语言。它提供了 变量(variables)、嵌套规则(nested rules)、 混合(mixins) 等…...
百度蜘蛛池平台在线发外链-原理以及搭建教程
蜘蛛池平台是一款非常实用的SEO优化工具,它可以帮助网站管理员提高网站的排名和流量。百度蜘蛛池原理是基于百度搜索引擎的搜索算法,通过对网页的内容、结构、链接等方面进行分析和评估,从而判断网页的质量和重要性,从而对网页进行…...
Android_ android使用原生蓝牙协议_连接设备以后,给设备发送指令触发数据传输---Android原生开发工作笔记167
之前通过蓝牙连接设备的时候,直接就是连接上蓝牙以后,设备会自动发送数据,有数据的时候,会自动发送,但是,有一个设备就不会,奇怪了很久,设备启动了也连接上了,但是就是没有数据过来. 是因为,这个设备有几种模式是握力球,在设备连接到蓝牙以后,需要,给设备通过蓝牙发送一个指令…...
【Java面试题】操作系统
文章目录 1.进程/线程/协程1.1辨别进程和线程的异同1.2优缺点1.2.1进程1.2.2线程 1.3进程/线程之间通信的方法1.3.1进程之间通信的方法1.3.2线程之间通信的方法 1.4什么是线程上下文切换1.5协程1.5.1协程的定义?1.5.2使用协程的原因?1.5.3协程的优缺点&a…...
SQLite数据库成为内存中数据库(三)
返回:SQLite—系列文章目录 上一篇:SQLite使用的临时文件(二) 下一篇:SQLite中的原子提交(四) SQLite数据库通常存储在单个普通磁盘中文件。但是,在某些情况下,数据库可能…...
多张图片怎么合成一张gif?快来试试这个方法
将多张图片合成一张gif动图是现在常见的图像处理的方式,适合制作一些简单的动态图片。通过使用在线图片合成网站制作的gif动图不仅体积小画面丰富,画质还很清晰。不需要下载任何软件小白也能轻松上手,支持上传jpg、png以及gif格式图片&#x…...
爬取b站音频和视频数据,未合成一个视频
一、首先找到含有音频和视频的url地址 打开一个视频,刷新后,找到这个包,里面有我们所需要的数据 访问这个数据包后,获取字符串数据,用正则提取,再转为json字符串方便提取。 二、获得标题和音频数据后&…...
mysql进阶知识总结
1.存储引擎 1.1MySQL体系结构 1).连接层 最上层是一些客户端和链接服务,包含本地sock通信和大多数基于客户端/服务端工具实现的类似于TCP/IP的通信。主要完成一些类似于连接处理、授权认证、及相关的安全方案。在该层上引入了线程池的概念,为通过认证…...
量化交易入门(二十五)什么是RSI,原理和炒股实操
前面我们了解了KDJ,MACD,MTM三个技术指标,也进行了回测,结果有好有坏,今天我们来学习第四个指标RSI。RSI指标全称是相对强弱指标(Relative Strength Index),是通过比较一段时期内的平均收盘涨数和平均收盘跌数来分析市…...
快速上手Spring Cloud 九:服务间通信与消息队列
快速上手Spring Cloud 一:Spring Cloud 简介 快速上手Spring Cloud 二:核心组件解析 快速上手Spring Cloud 三:API网关深入探索与实战应用 快速上手Spring Cloud 四:微服务治理与安全 快速上手Spring Cloud 五:Spring …...
python——遍历网卡并禁用/启用
一、遍历网卡 注意:只能遍历到启用状态的网卡,如果网卡是禁止状态,则遍历不到!!! import os import time import psutil import loggingdef get_multi_physical_network_card():physical_nic_list []try:…...
初识 51
keil的使用: 具体细节请移步我上一篇博客:创建第一个51文件-CSDN博客 hex -- 汇编语言实现的文件 -- 直接与单片机对接的文件 单片机 -- 一个集成电脑芯片 单片机开发版 -- 基于单片机的集成电路 stc 89 c52RC / RD 系列单片机 命名规则: 89 -- 版本号? C --…...
【回溯与邻里交换】纸牌三角
1.回溯算法 旋转有3种可能,镜像有2种 所以最后次数:counts/3/2 #include<iostream> using namespace std;int num[9]; int counts0; bool bools[9];//默认为false int dfs(int step){if(step9){//索引 if((num[0]num[1]num[2]num[3]num[3]num[4]n…...
微服务(基础篇-004-Feign)
目录 http客户端Feign Feign替代RestTemplate(1) Feign的介绍(1.1) 使用Feign的步骤(1.2) 自定义配置(2) 配置Feign日志的两种方式(2.1) Feign使用优化…...
Linux IRC
目录 入侵框架检测 检测流程图 账号安全 查找账号中的危险信息 查看保存的历史命令 检测异常端口 入侵框架检测 1、系统安全检查(进程、开放端口、连接、日志) 这一块是目前个人该脚本所实现的功能 2、Rootkit 建议使用rootkit专杀工具来检查&#…...
五、Elasticsearch 集成
目录 5.1 Spring Data 框架集成5.1.1 Spring Data 框架介绍5.1.2 Spring Data Elasticsearch 介绍5.1.3 Spring Data Elasticsearch 版本对比5.1.4 集成步骤 5.1 Spring Data 框架集成 5.1.1 Spring Data 框架介绍 Spring Data 是一个用于简化数据库开发的开源框架。其主要目…...
Qt 完成图片的缩放拖动
1. 事件和函数 主要使用事件paintEvent(QPaintEvent *event)和drawTiledPixmap函数实现绘图。 paintEvent事件在改变窗口大小、移动窗口、手动调用update等情形下会被调用。需先了解下绘图该函数的用法。 - QPainter::drawTiledPixmap(int x, int y, int w, int h, const QPi…...
观成科技:隐蔽隧道工具Ligolo-ng加密流量分析
1.工具介绍 Ligolo-ng是一款由go编写的高效隧道工具,该工具基于TUN接口实现其功能,利用反向TCP/TLS连接建立一条隐蔽的通信信道,支持使用Let’s Encrypt自动生成证书。Ligolo-ng的通信隐蔽性体现在其支持多种连接方式,适应复杂网…...
内存分配函数malloc kmalloc vmalloc
内存分配函数malloc kmalloc vmalloc malloc实现步骤: 1)请求大小调整:首先,malloc 需要调整用户请求的大小,以适应内部数据结构(例如,可能需要存储额外的元数据)。通常,这包括对齐调整,确保分配的内存地址满足特定硬件要求(如对齐到8字节或16字节边界)。 2)空闲…...
Spark 之 入门讲解详细版(1)
1、简介 1.1 Spark简介 Spark是加州大学伯克利分校AMP实验室(Algorithms, Machines, and People Lab)开发通用内存并行计算框架。Spark在2013年6月进入Apache成为孵化项目,8个月后成为Apache顶级项目,速度之快足见过人之处&…...
DockerHub与私有镜像仓库在容器化中的应用与管理
哈喽,大家好,我是左手python! Docker Hub的应用与管理 Docker Hub的基本概念与使用方法 Docker Hub是Docker官方提供的一个公共镜像仓库,用户可以在其中找到各种操作系统、软件和应用的镜像。开发者可以通过Docker Hub轻松获取所…...
8k长序列建模,蛋白质语言模型Prot42仅利用目标蛋白序列即可生成高亲和力结合剂
蛋白质结合剂(如抗体、抑制肽)在疾病诊断、成像分析及靶向药物递送等关键场景中发挥着不可替代的作用。传统上,高特异性蛋白质结合剂的开发高度依赖噬菌体展示、定向进化等实验技术,但这类方法普遍面临资源消耗巨大、研发周期冗长…...
STM32+rt-thread判断是否联网
一、根据NETDEV_FLAG_INTERNET_UP位判断 static bool is_conncected(void) {struct netdev *dev RT_NULL;dev netdev_get_first_by_flags(NETDEV_FLAG_INTERNET_UP);if (dev RT_NULL){printf("wait netdev internet up...");return false;}else{printf("loc…...
电脑插入多块移动硬盘后经常出现卡顿和蓝屏
当电脑在插入多块移动硬盘后频繁出现卡顿和蓝屏问题时,可能涉及硬件资源冲突、驱动兼容性、供电不足或系统设置等多方面原因。以下是逐步排查和解决方案: 1. 检查电源供电问题 问题原因:多块移动硬盘同时运行可能导致USB接口供电不足&#x…...
剑指offer20_链表中环的入口节点
链表中环的入口节点 给定一个链表,若其中包含环,则输出环的入口节点。 若其中不包含环,则输出null。 数据范围 节点 val 值取值范围 [ 1 , 1000 ] [1,1000] [1,1000]。 节点 val 值各不相同。 链表长度 [ 0 , 500 ] [0,500] [0,500]。 …...
今日科技热点速览
🔥 今日科技热点速览 🎮 任天堂Switch 2 正式发售 任天堂新一代游戏主机 Switch 2 今日正式上线发售,主打更强图形性能与沉浸式体验,支持多模态交互,受到全球玩家热捧 。 🤖 人工智能持续突破 DeepSeek-R1&…...
pikachu靶场通关笔记22-1 SQL注入05-1-insert注入(报错法)
目录 一、SQL注入 二、insert注入 三、报错型注入 四、updatexml函数 五、源码审计 六、insert渗透实战 1、渗透准备 2、获取数据库名database 3、获取表名table 4、获取列名column 5、获取字段 本系列为通过《pikachu靶场通关笔记》的SQL注入关卡(共10关࿰…...
