当前位置: 首页 > news >正文

Flink SQL 基于Update流出现空值无法过滤问题

问题背景

  • 问题描述
基于Flink-CDC ,Flink SQL的实时计算作业在运行一段时间后,突然发现插入数据库的计算结果发生部分主键属性发生失败,导致后续计算结果无法插入,
超过失败次数失败的情况
  • 问题报错
	Caused by: java.sql.BatchUpdateException: Batch entry 0 INSERT INTO dm_hljy.dws_table_name (op_date, school_year, campus_name, school_name, depart_name, total_opfare, ids, update_time) VALUES ('2024-03-11 00:00:00+08', '2023', 'xxxx', 'xxxx学校', 'xxxx小学部', '203333300000', '57', '2024-03-21 09:31:08.47+08') ON DUPLICATE KEY UPDATE school_year=VALUES(school_year), total_opfare=VALUES(total_opfare), ids=VALUES(ids), update_time=VALUES(update_time) was aborted: ERROR: dn_6007_6008: null value in column "depart_name" violates not-null constraint  Call getNextException to see other errors in the batch.at com.huawei.gauss200.jdbc.jdbc.BatchResultHandler.handleCompletion(BatchResultHandler.java:171) ~[huaweicloud-dws-jdbc-8.1.1.1-200.jar:?]at com.huawei.gauss200.jdbc.core.v3.QueryExecutorImpl.executeBatch(QueryExecutorImpl.java:586) ~[huaweicloud-dws-jdbc-8.1.1.1-200.jar:?]at com.huawei.gauss200.jdbc.jdbc.PgStatement.executeBatch(PgStatement.java:883) ~[huaweicloud-dws-jdbc-8.1.1.1-200.jar:?]at com.huawei.gauss200.jdbc.jdbc.PgPreparedStatement.executeBatch(PgPreparedStatement.java:1580) ~[huaweicloud-dws-jdbc-8.1.1.1-200.jar:?]at org.apache.flink.connector.jdbc.statement.FieldNamedPreparedStatementImpl.executeBatch(FieldNamedPreparedStatementImpl.java:65) ~[flink-connector-jdbc-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]at org.apache.flink.connector.jdbc.internal.executor.TableSimpleStatementExecutor.executeBatch(TableSimpleStatementExecutor.java:64) ~[flink-connector-jdbc-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]at org.apache.flink.connector.jdbc.internal.executor.TableBufferReducedStatementExecutor.executeBatch(TableBufferReducedStatementExecutor.java:101) ~[flink-connector-jdbc-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]at org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.attemptFlush(JdbcOutputFormat.java:266) ~[flink-connector-jdbc-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]at org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.flush(JdbcOutputFormat.java:236) ~[flink-connector-jdbc-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]at org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.lambda$open$0(JdbcOutputFormat.java:159) ~[flink-connector-jdbc-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_332]at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) ~[?:1.8.0_332]at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) ~[?:1.8.0_332]at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) ~[?:1.8.0_332]at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_332]at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_332]... 1 moreCaused by: com.huawei.gauss200.jdbc.util.PSQLException: ERROR: dn_6007_6008: null value in column "depart_name" violates not-null constraintat com.huawei.gauss200.jdbc.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2856) ~[huaweicloud-dws-jdbc-8.1.1.1-200.jar:?]at com.huawei.gauss200.jdbc.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2587) ~[huaweicloud-dws-jdbc-8.1.1.1-200.jar:?]at com.huawei.gauss200.jdbc.core.v3.QueryExecutorImpl.executeBatch(QueryExecutorImpl.java:575) ~[huaweicloud-dws-jdbc-8.1.1.1-200.jar:?]at com.huawei.gauss200.jdbc.jdbc.PgStatement.executeBatch(PgStatement.java:883) ~[huaweicloud-dws-jdbc-8.1.1.1-200.jar:?]at com.huawei.gauss200.jdbc.jdbc.PgPreparedStatement.executeBatch(PgPreparedStatement.java:1580) ~[huaweicloud-dws-jdbc-8.1.1.1-200.jar:?]at org.apache.flink.connector.jdbc.statement.FieldNamedPreparedStatementImpl.executeBatch(FieldNamedPreparedStatementImpl.java:65) ~[flink-connector-jdbc-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]at org.apache.flink.connector.jdbc.internal.executor.TableSimpleStatementExecutor.executeBatch(TableSimpleStatementExecutor.java:64) ~[flink-connector-jdbc-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]at org.apache.flink.connector.jdbc.internal.executor.TableBufferReducedStatementExecutor.executeBatch(TableBufferReducedStatementExecutor.java:101) ~[flink-connector-jdbc-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]at org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.attemptFlush(JdbcOutputFormat.java:266) ~[flink-connector-jdbc-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]at org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.flush(JdbcOutputFormat.java:236) ~[flink-connector-jdbc-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]at org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.lambda$open$0(JdbcOutputFormat.java:159) ~[flink-connector-jdbc-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_332]at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) ~[?:1.8.0_332]at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) ~[?:1.8.0_332]at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) ~[?:1.8.0_332]at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_332]at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_332

在这里插入图片描述

定位

定位思路

1.方向一:怀疑数据库插入存在数据处理时,造成数据处理出现空值的情况,即数据本身不为空,但是数据插入却出现了空
2.方向二:Flink-SQL在消费kafka数据时存在了空值,故加工的数据计算结果存在空值

定位过程

  • 因插入数据库定位比较麻烦,且数据库已经设置该字段为主属性,故出现插入时处理为空值的概率较小。故先从较为简单的Flink SQL查询数据
  • 定位方法一,查询该字段为空的记录,待作业执行完成后,未查询到空值对应记录
 select  select * from table_name where depart_name is null or depart_name = '' or char_length(depart_name) = 0;
  • 因考虑到使用Flink-CDC进行变更数据捕获,故对应的update流存在-U,+U,-D,+I记录,因此随着插入记录存在空值被记录进去的情况,故采用view的方式,先将宽表的加工、关联方式创建为view,然后进行空值的过滤。实施如下
create view view_prd as 
select a.* ,b.*  from a join b on a.id = b.idselect * from view_prd where depart_name is null or depart_name = '' or char_length(depart_name) = 0;
  • 通过查询结果,发现存在最后一条记录存在空值的原因,往源头定位,发现该字段之前为空,后面进行更新填充到值出现-U记录,导致数据插入持续失败
    在这里插入图片描述

原因

  • 因为flink-SQL消费的数据时kafka topic,flink以upsert-kafka形式的connector进行写入,故存在changelog 流中数据更新存在-U,+U的记录(按照Key进行区分唯一条记录),value 为空(-U)的记录kafka也,导致出现空值,
    在这里插入图片描述

解决

通过在DWS宽表创建一层View(如上),在写入DWS宽表的kafka topic之前,现将该字段空值过滤,即可排除空值涉及记录被纳入结果指标计算的范围中

相关文章:

Flink SQL 基于Update流出现空值无法过滤问题

问题背景 问题描述 基于Flink-CDC ,Flink SQL的实时计算作业在运行一段时间后,突然发现插入数据库的计算结果发生部分主键属性发生失败,导致后续计算结果无法插入, 超过失败次数失败的情况问题报错 Caused by: java.sql.BatchUp…...

git-怎样把连续的多个commit合并成一个?

Git怎样把连续的多个commit合并成一个? Git怎样把连续的多个commit合并成一个? 参考URL: https://www.jianshu.com/p/5b4054b5b29e 查看git日志 git log --graph比如下图的commit 历史,想要把bai “Second change” 和 “Third change” 这…...

2024年2月游戏手柄线上电商(京东天猫淘宝)综合热销排行榜

鲸参谋监测的线上电商(京东天猫淘宝)游戏手柄品牌销售数据已出炉!2月游戏手柄销售数据呈现出强劲的增长势头。 根据鲸参谋数据显示,今年2月游戏手柄月销售量累计约43万件,同比去年上涨了78%;销售额累计达1…...

Sass5分钟速通基础语法

前言 近来在项目中使用sass,想着学习一下,但官方写的教程太冗杂,所以就有了本文速通Sass的基础语法 Sass 是 CSS 的一种预编译语言。它提供了 变量(variables)、嵌套规则(nested rules)、 混合(mixins) 等…...

百度蜘蛛池平台在线发外链-原理以及搭建教程

蜘蛛池平台是一款非常实用的SEO优化工具,它可以帮助网站管理员提高网站的排名和流量。百度蜘蛛池原理是基于百度搜索引擎的搜索算法,通过对网页的内容、结构、链接等方面进行分析和评估,从而判断网页的质量和重要性,从而对网页进行…...

Android_ android使用原生蓝牙协议_连接设备以后,给设备发送指令触发数据传输---Android原生开发工作笔记167

之前通过蓝牙连接设备的时候,直接就是连接上蓝牙以后,设备会自动发送数据,有数据的时候,会自动发送,但是,有一个设备就不会,奇怪了很久,设备启动了也连接上了,但是就是没有数据过来. 是因为,这个设备有几种模式是握力球,在设备连接到蓝牙以后,需要,给设备通过蓝牙发送一个指令…...

【Java面试题】操作系统

文章目录 1.进程/线程/协程1.1辨别进程和线程的异同1.2优缺点1.2.1进程1.2.2线程 1.3进程/线程之间通信的方法1.3.1进程之间通信的方法1.3.2线程之间通信的方法 1.4什么是线程上下文切换1.5协程1.5.1协程的定义?1.5.2使用协程的原因?1.5.3协程的优缺点&a…...

SQLite数据库成为内存中数据库(三)

返回:SQLite—系列文章目录 上一篇:SQLite使用的临时文件(二) 下一篇:SQLite中的原子提交(四) ​​ SQLite数据库通常存储在单个普通磁盘中文件。但是,在某些情况下,数据库可能…...

多张图片怎么合成一张gif?快来试试这个方法

将多张图片合成一张gif动图是现在常见的图像处理的方式,适合制作一些简单的动态图片。通过使用在线图片合成网站制作的gif动图不仅体积小画面丰富,画质还很清晰。不需要下载任何软件小白也能轻松上手,支持上传jpg、png以及gif格式图片&#x…...

爬取b站音频和视频数据,未合成一个视频

一、首先找到含有音频和视频的url地址 打开一个视频,刷新后,找到这个包,里面有我们所需要的数据 访问这个数据包后,获取字符串数据,用正则提取,再转为json字符串方便提取。 二、获得标题和音频数据后&…...

mysql进阶知识总结

1.存储引擎 1.1MySQL体系结构 1).连接层 最上层是一些客户端和链接服务,包含本地sock通信和大多数基于客户端/服务端工具实现的类似于TCP/IP的通信。主要完成一些类似于连接处理、授权认证、及相关的安全方案。在该层上引入了线程池的概念,为通过认证…...

量化交易入门(二十五)什么是RSI,原理和炒股实操

前面我们了解了KDJ,MACD,MTM三个技术指标,也进行了回测,结果有好有坏,今天我们来学习第四个指标RSI。RSI指标全称是相对强弱指标(Relative Strength Index),是通过比较一段时期内的平均收盘涨数和平均收盘跌数来分析市…...

快速上手Spring Cloud 九:服务间通信与消息队列

快速上手Spring Cloud 一:Spring Cloud 简介 快速上手Spring Cloud 二:核心组件解析 快速上手Spring Cloud 三:API网关深入探索与实战应用 快速上手Spring Cloud 四:微服务治理与安全 快速上手Spring Cloud 五:Spring …...

python——遍历网卡并禁用/启用

一、遍历网卡 注意:只能遍历到启用状态的网卡,如果网卡是禁止状态,则遍历不到!!! import os import time import psutil import loggingdef get_multi_physical_network_card():physical_nic_list []try:…...

初识 51

keil的使用: 具体细节请移步我上一篇博客:创建第一个51文件-CSDN博客 hex -- 汇编语言实现的文件 -- 直接与单片机对接的文件 单片机 -- 一个集成电脑芯片 单片机开发版 -- 基于单片机的集成电路 stc 89 c52RC / RD 系列单片机 命名规则: 89 -- 版本号? C --…...

【回溯与邻里交换】纸牌三角

1.回溯算法 旋转有3种可能&#xff0c;镜像有2种 所以最后次数&#xff1a;counts/3/2 #include<iostream> using namespace std;int num[9]; int counts0; bool bools[9];//默认为false int dfs(int step){if(step9){//索引 if((num[0]num[1]num[2]num[3]num[3]num[4]n…...

微服务(基础篇-004-Feign)

目录 http客户端Feign Feign替代RestTemplate&#xff08;1&#xff09; Feign的介绍&#xff08;1.1&#xff09; 使用Feign的步骤&#xff08;1.2&#xff09; 自定义配置&#xff08;2&#xff09; 配置Feign日志的两种方式&#xff08;2.1&#xff09; Feign使用优化…...

Linux IRC

目录 入侵框架检测 检测流程图 账号安全 查找账号中的危险信息 查看保存的历史命令 检测异常端口 入侵框架检测 1、系统安全检查&#xff08;进程、开放端口、连接、日志&#xff09; 这一块是目前个人该脚本所实现的功能 2、Rootkit 建议使用rootkit专杀工具来检查&#…...

五、Elasticsearch 集成

目录 5.1 Spring Data 框架集成5.1.1 Spring Data 框架介绍5.1.2 Spring Data Elasticsearch 介绍5.1.3 Spring Data Elasticsearch 版本对比5.1.4 集成步骤 5.1 Spring Data 框架集成 5.1.1 Spring Data 框架介绍 Spring Data 是一个用于简化数据库开发的开源框架。其主要目…...

Qt 完成图片的缩放拖动

1. 事件和函数 主要使用事件paintEvent(QPaintEvent *event)和drawTiledPixmap函数实现绘图。 paintEvent事件在改变窗口大小、移动窗口、手动调用update等情形下会被调用。需先了解下绘图该函数的用法。 - QPainter::drawTiledPixmap(int x, int y, int w, int h, const QPi…...

Docker 离线安装指南

参考文章 1、确认操作系统类型及内核版本 Docker依赖于Linux内核的一些特性&#xff0c;不同版本的Docker对内核版本有不同要求。例如&#xff0c;Docker 17.06及之后的版本通常需要Linux内核3.10及以上版本&#xff0c;Docker17.09及更高版本对应Linux内核4.9.x及更高版本。…...

Objective-C常用命名规范总结

【OC】常用命名规范总结 文章目录 【OC】常用命名规范总结1.类名&#xff08;Class Name)2.协议名&#xff08;Protocol Name)3.方法名&#xff08;Method Name)4.属性名&#xff08;Property Name&#xff09;5.局部变量/实例变量&#xff08;Local / Instance Variables&…...

蓝牙 BLE 扫描面试题大全(2):进阶面试题与实战演练

前文覆盖了 BLE 扫描的基础概念与经典问题蓝牙 BLE 扫描面试题大全(1)&#xff1a;从基础到实战的深度解析-CSDN博客&#xff0c;但实际面试中&#xff0c;企业更关注候选人对复杂场景的应对能力&#xff08;如多设备并发扫描、低功耗与高发现率的平衡&#xff09;和前沿技术的…...

【AI学习】三、AI算法中的向量

在人工智能&#xff08;AI&#xff09;算法中&#xff0c;向量&#xff08;Vector&#xff09;是一种将现实世界中的数据&#xff08;如图像、文本、音频等&#xff09;转化为计算机可处理的数值型特征表示的工具。它是连接人类认知&#xff08;如语义、视觉特征&#xff09;与…...

leetcodeSQL解题:3564. 季节性销售分析

leetcodeSQL解题&#xff1a;3564. 季节性销售分析 题目&#xff1a; 表&#xff1a;sales ---------------------- | Column Name | Type | ---------------------- | sale_id | int | | product_id | int | | sale_date | date | | quantity | int | | price | decimal | -…...

【Java_EE】Spring MVC

目录 Spring Web MVC ​编辑注解 RestController RequestMapping RequestParam RequestParam RequestBody PathVariable RequestPart 参数传递 注意事项 ​编辑参数重命名 RequestParam ​编辑​编辑传递集合 RequestParam 传递JSON数据 ​编辑RequestBody ​…...

NFT模式:数字资产确权与链游经济系统构建

NFT模式&#xff1a;数字资产确权与链游经济系统构建 ——从技术架构到可持续生态的范式革命 一、确权技术革新&#xff1a;构建可信数字资产基石 1. 区块链底层架构的进化 跨链互操作协议&#xff1a;基于LayerZero协议实现以太坊、Solana等公链资产互通&#xff0c;通过零知…...

CRMEB 框架中 PHP 上传扩展开发:涵盖本地上传及阿里云 OSS、腾讯云 COS、七牛云

目前已有本地上传、阿里云OSS上传、腾讯云COS上传、七牛云上传扩展 扩展入口文件 文件目录 crmeb\services\upload\Upload.php namespace crmeb\services\upload;use crmeb\basic\BaseManager; use think\facade\Config;/*** Class Upload* package crmeb\services\upload* …...

06 Deep learning神经网络编程基础 激活函数 --吴恩达

深度学习激活函数详解 一、核心作用 引入非线性:使神经网络可学习复杂模式控制输出范围:如Sigmoid将输出限制在(0,1)梯度传递:影响反向传播的稳定性二、常见类型及数学表达 Sigmoid σ ( x ) = 1 1 +...

tree 树组件大数据卡顿问题优化

问题背景 项目中有用到树组件用来做文件目录&#xff0c;但是由于这个树组件的节点越来越多&#xff0c;导致页面在滚动这个树组件的时候浏览器就很容易卡死。这种问题基本上都是因为dom节点太多&#xff0c;导致的浏览器卡顿&#xff0c;这里很明显就需要用到虚拟列表的技术&…...