当前位置: 首页 > news >正文

pyflink过滤kafka数据

from pyflink.table import (TableEnvironment, EnvironmentSettings)# 输入、输出、过滤条件
columns_in = [
...
]columns_out = [
...
]
filter_condition = "name = '蒋介石' and sex = '男'"# 创建执行环境t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
t_env.get_config().get_configuration().set_string("pipeline.jars", "file:///work/flink-sql-connector-kafka-3.2.0-1.19.jar")source_topic = "foo"
sink_topic = "baa"
kafka_servers = "kafka:9092"
kafka_consumer_group_id = "flink consumer"columnstr = ','.join([f"`{col}` VARCHAR"  for col in columns_in])
source_ddl = f"""
CREATE TABLE kafka_source({columnstr}) WITH ('connector' = 'kafka','topic' = '{source_topic}','properties.bootstrap.servers' = '{kafka_servers}','properties.group.id' = '{kafka_consumer_group_id}','scan.startup.mode' = 'latest-offset','format' = 'json')
"""columnstr2 = ','.join([f"`{col}` VARCHAR"  for col in columns_out])
sink_ddl = f"""
CREATE TABLE kafka_sink ({columnstr2}) with ('connector' = 'kafka','topic' = '{sink_topic}','properties.bootstrap.servers' = '{kafka_servers}','properties.group.id' = '{kafka_consumer_group_id}','scan.startup.mode' = 'latest-offset','format' = 'json')
"""
# 过滤字段
filtersql = f"""
insert into kafka_sink
select {
','.join([f"`{col}`"  for col in columns_out])
}
from kafka_source
where {filter_condition}
"""
t_env.execute_sql(filtersql)
t_env.execute_sql(source_ddl)
t_env.execute_sql(sink_ddl)

相关文章:

pyflink过滤kafka数据

from pyflink.table import (TableEnvironment, EnvironmentSettings)# 输入、输出、过滤条件 columns_in [ ... ]columns_out [ ... ] filter_condition "name 蒋介石 and sex 男"# 创建执行环境t_env TableEnvironment.create(EnvironmentSettings.in_stream…...

Webpack 完整指南

​🌈个人主页:前端青山 🔥系列专栏:Webpack篇 🔖人终将被年少不可得之物困其一生 依旧青山,本期给大家带来webpack篇专栏内容:webpack介绍 目录 介绍 一、webpack 1.1、webpack是什么 1.2 webpack五个核心配置 1.…...

如何在 Ubuntu20.04 安装FTP Server vsftpd

1.安装: sudo apt-get install vsftpd 2.启动 sudo service vsftpd start //启动 sudo service vsftpd stop //停止 sudo service vsftpd restart //重新启动 3.打开配置文件 sudo nano /etc/vsftpd.conf 4.配置:限制在指定目录&…...

基于FPGA的DDS信号发生器(图文并茂+深度原理解析)

篇幅有限,本文详细源文件已打包 至个人主页资源,需要自取...... 前言 DDS(直接数字合成)技术是先进的频率合成手段,在数字信号处理与硬件实现领域作用关键。它因低成本、低功耗、高分辨率以及快速转换时间等优点备受认可。 本文着重探究基于 FPGA 的简易 DDS 信号发生器设…...

QT:绘制事件和定时器

1.绘制时针 xx.h #ifndef WIDGET_H #define WIDGET_H#include <QWidget> #include <QTimer> #include<QPainter> #include <QTime>QT_BEGIN_NAMESPACE namespace Ui { class Widget; } QT_END_NAMESPACEclass Widget : public QWidget {Q_OBJECTpubl…...

【算法——递归回溯】

这个东西还是很重要的&#xff0c;直接决定了你的动态规划章节的学习深度 78. 子集 方法1&#xff1a; vector<vector<int>>V; void dfs(vector<int> v,vector<int> nums,int index) {if(indexnums.size()) V.push_back(v);else{v.push_back(nums[i…...

手机在网状态接口的使用和注意事项

手机在网状态接口是用于查询手机号码在运营商数据库中的实时状态的工具&#xff0c;这种接口在互联网金融、贷款、租赁、保险等相关行业中尤为重要&#xff0c;因为它可以帮助这些行业进行更有效的风控审核。以下是对手机在网状态接口的详细介绍&#xff1a; 一、手机在网状态…...

WebGl 使用uniform变量动态修改点的颜色

在WebGL中&#xff0c;uniform变量用于在顶点着色器和片元着色器之间传递全局状态信息&#xff0c;这些信息在渲染过程中不会随着顶点的变化而变化。uniform变量可以用来设置变换矩阵、光照参数、材料属性等。由于它们在整个渲染过程中共享&#xff0c;因此可以被所有使用该着色…...

Leetcode 划分字母区间

题目要求&#xff1a; 将字符串 s 划分成尽量多的片段&#xff0c;保证每个片段中出现的字母不会出现在其他片段中。 具体解释如下&#xff1a; 尽量多的片段&#xff1a;题目要求的是在划分过程中&#xff0c;我们要尽量让划分的片段数量最大化&#xff0c;而不是最少化。每…...

可编辑div遇到的那些事

在日常开发中有时可能会遇到input 或 textarea 不能满足的开发场景&#xff0c;比如多行输入的情况下&#xff0c;textarea 的右下角icon 无法去除, 所以此时可以使用div 设置可编辑状态&#xff0c;完成功能开发&#xff0c;在开发的过程中仍会遇到一下问题。 1&#xff0c;如…...

什麼是高速HTTP代理?

高速HTTP代理是一種用於加速和優化互聯網連接的技術。它通過在用戶和目標網站之間充當仲介伺服器&#xff0c;幫助用戶快速訪問網路資源。HTTP代理不僅可以提高訪問速度&#xff0c;還能提供一定程度的隱私保護和安全性。 高速HTTP代理的工作原理 HTTP代理伺服器位於用戶設備…...

三子棋(C 语言)

目录 一、游戏设计的整体思路二、各个步骤的代码实现1. 菜单及循环选择的实现2. 棋盘的初始化和显示3. 轮流下棋及结果判断实现4. 结果判断实现 三、所有代码四、总结 一、游戏设计的整体思路 &#xff08;1&#xff09;提供一个菜单让玩家选择人机对战、玩家对战或者退出游戏…...

HWS赛题 入门 MIPS Pwn-Mplogin(MIPS_shellcode)

解题所涉知识点&#xff1a; 泄露或修改内存数据&#xff1a; 堆地址&#xff1a;栈地址&#xff1a;栈上数据的连带输出(Stack Leak) && Stack溢出覆盖内存libc地址&#xff1a;BSS段地址&#xff1a; 劫持程序执行流程&#xff1a;[[MIPS_ROP]] 获得shell或flag&am…...

纯血鸿蒙启动公测,爱加密鸿蒙加固平台发布,助力鸿蒙应用安全运营!

鸿蒙系统打破了移动操作系统两极格局&#xff0c;实现操作系统核心技术的自主可控、安全可靠&#xff0c;在神州大地上掀起一波科技革新的浪潮&#xff0c;HarmonyOS NEXT成为大型企业必须要布局的应用系统之一。 HarmonyOS NEXT于10月8日正式开启公测&#xff0c;距离面向全体…...

MySQL中 truncate、drop和delete的区别

MySQL中 truncate、drop和delete区别 truncate 执行速度快&#xff0c;删除所有数据&#xff0c;但是保留表结构不记录日志事务不安全&#xff0c;不能回滚可重置自增主键计数器 drop 执行速度较快&#xff0c;删除整张表数据和结构不记录日志事务不安全&#xff0c;不能回…...

什么开放式耳机值得买?开放式耳机推荐排行榜!

长时间佩戴传统入耳式耳机有时可能会影响耳道健康&#xff0c;鉴于此&#xff0c;转而选择不入耳设计的开放式耳机就成了不少人的新倾向&#xff0c;它们有助于减少细菌滋生和耳道闷热的烦恼。为了帮助大家找到合适的选项&#xff0c;下面我将列举一些市面上口碑不错的开放式耳…...

Apache Doris的分区与分桶详解

目录 第一章 Doris介绍和分区分桶作用 1.1 Doris背景介绍 1.2 分区与分桶的意义 第二章 原理解析 2.1 分区机制 2.1.1 定义 2.1.2 类型 2.1.3 工作原理 2.2 分桶机制 2.2.1 概念 2.2.2 实现方式 2.2.3 与分区的关系 第三章 手动分区与自动分区对比 3.1 手动分区 …...

docker详解介绍+基础操作 (二)info详解

1 docker相关信息和优化配置 1&#xff09;查看docker版本详解 rootzz:~# docker version Client: Docker Engine - CommunityVersion: 27.3.1API version: 1.47Go version: go1.22.7Git commit: ce12230Built: Fri Sep 20 11:40:…...

C0023.在Clion中创建控件,对控件进行提升为自定义控件的步骤

新建Ui界面文件 修改新生成的ui文件头文件 关闭之前打开的ui文件&#xff0c;如上图Qt Designer中打开的&#xff0c;然后修改新生成的ui文件对应的头文件&#xff0c;改成自己需要的控件类即可。 提升控件为自定义类 将如下头文件中的类名和头文件名输入到提升窗口中&#…...

探索 C# 常用第三方库与框架

在 C# 开发中&#xff0c;第三方库和框架极大地提高了开发效率和代码质量。通过这些库&#xff0c;开发者可以快速处理 JSON 数据、简化对象映射、记录日志、以及高效地与数据库交互。本文将介绍四个常用的 C# 第三方库&#xff1a;Newtonsoft.Json、AutoMapper、NLog/Serilog …...

Linux简单的操作

ls ls 查看当前目录 ll 查看详细内容 ls -a 查看所有的内容 ls --help 查看方法文档 pwd pwd 查看当前路径 cd cd 转路径 cd .. 转上一级路径 cd 名 转换路径 …...

【配置 YOLOX 用于按目录分类的图片数据集】

现在的图标点选越来越多&#xff0c;如何一步解决&#xff0c;采用 YOLOX 目标检测模式则可以轻松解决 要在 YOLOX 中使用按目录分类的图片数据集&#xff08;每个目录代表一个类别&#xff0c;目录下是该类别的所有图片&#xff09;&#xff0c;你需要进行以下配置步骤&#x…...

三体问题详解

从物理学角度&#xff0c;三体问题之所以不稳定&#xff0c;是因为三个天体在万有引力作用下相互作用&#xff0c;形成一个非线性耦合系统。我们可以从牛顿经典力学出发&#xff0c;列出具体的运动方程&#xff0c;并说明为何这个系统本质上是混沌的&#xff0c;无法得到一般解…...

让AI看见世界:MCP协议与服务器的工作原理

让AI看见世界&#xff1a;MCP协议与服务器的工作原理 MCP&#xff08;Model Context Protocol&#xff09;是一种创新的通信协议&#xff0c;旨在让大型语言模型能够安全、高效地与外部资源进行交互。在AI技术快速发展的今天&#xff0c;MCP正成为连接AI与现实世界的重要桥梁。…...

IT供电系统绝缘监测及故障定位解决方案

随着新能源的快速发展&#xff0c;光伏电站、储能系统及充电设备已广泛应用于现代能源网络。在光伏领域&#xff0c;IT供电系统凭借其持续供电性好、安全性高等优势成为光伏首选&#xff0c;但在长期运行中&#xff0c;例如老化、潮湿、隐裂、机械损伤等问题会影响光伏板绝缘层…...

今日学习:Spring线程池|并发修改异常|链路丢失|登录续期|VIP过期策略|数值类缓存

文章目录 优雅版线程池ThreadPoolTaskExecutor和ThreadPoolTaskExecutor的装饰器并发修改异常并发修改异常简介实现机制设计原因及意义 使用线程池造成的链路丢失问题线程池导致的链路丢失问题发生原因 常见解决方法更好的解决方法设计精妙之处 登录续期登录续期常见实现方式特…...

iOS性能调优实战:借助克魔(KeyMob)与常用工具深度洞察App瓶颈

在日常iOS开发过程中&#xff0c;性能问题往往是最令人头疼的一类Bug。尤其是在App上线前的压测阶段或是处理用户反馈的高发期&#xff0c;开发者往往需要面对卡顿、崩溃、能耗异常、日志混乱等一系列问题。这些问题表面上看似偶发&#xff0c;但背后往往隐藏着系统资源调度不当…...

安全突围:重塑内生安全体系:齐向东在2025年BCS大会的演讲

文章目录 前言第一部分&#xff1a;体系力量是突围之钥第一重困境是体系思想落地不畅。第二重困境是大小体系融合瓶颈。第三重困境是“小体系”运营梗阻。 第二部分&#xff1a;体系矛盾是突围之障一是数据孤岛的障碍。二是投入不足的障碍。三是新旧兼容难的障碍。 第三部分&am…...

【笔记】WSL 中 Rust 安装与测试完整记录

#工作记录 WSL 中 Rust 安装与测试完整记录 1. 运行环境 系统&#xff1a;Ubuntu 24.04 LTS (WSL2)架构&#xff1a;x86_64 (GNU/Linux)Rust 版本&#xff1a;rustc 1.87.0 (2025-05-09)Cargo 版本&#xff1a;cargo 1.87.0 (2025-05-06) 2. 安装 Rust 2.1 使用 Rust 官方安…...

【Linux】自动化构建-Make/Makefile

前言 上文我们讲到了Linux中的编译器gcc/g 【Linux】编译器gcc/g及其库的详细介绍-CSDN博客 本来我们将一个对于编译来说很重要的工具&#xff1a;make/makfile 1.背景 在一个工程中源文件不计其数&#xff0c;其按类型、功能、模块分别放在若干个目录中&#xff0c;mak…...