当前位置: 首页 > news >正文

pyflink过滤kafka数据

from pyflink.table import (TableEnvironment, EnvironmentSettings)# 输入、输出、过滤条件
columns_in = [
...
]columns_out = [
...
]
filter_condition = "name = '蒋介石' and sex = '男'"# 创建执行环境t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
t_env.get_config().get_configuration().set_string("pipeline.jars", "file:///work/flink-sql-connector-kafka-3.2.0-1.19.jar")source_topic = "foo"
sink_topic = "baa"
kafka_servers = "kafka:9092"
kafka_consumer_group_id = "flink consumer"columnstr = ','.join([f"`{col}` VARCHAR"  for col in columns_in])
source_ddl = f"""
CREATE TABLE kafka_source({columnstr}) WITH ('connector' = 'kafka','topic' = '{source_topic}','properties.bootstrap.servers' = '{kafka_servers}','properties.group.id' = '{kafka_consumer_group_id}','scan.startup.mode' = 'latest-offset','format' = 'json')
"""columnstr2 = ','.join([f"`{col}` VARCHAR"  for col in columns_out])
sink_ddl = f"""
CREATE TABLE kafka_sink ({columnstr2}) with ('connector' = 'kafka','topic' = '{sink_topic}','properties.bootstrap.servers' = '{kafka_servers}','properties.group.id' = '{kafka_consumer_group_id}','scan.startup.mode' = 'latest-offset','format' = 'json')
"""
# 过滤字段
filtersql = f"""
insert into kafka_sink
select {
','.join([f"`{col}`"  for col in columns_out])
}
from kafka_source
where {filter_condition}
"""
t_env.execute_sql(filtersql)
t_env.execute_sql(source_ddl)
t_env.execute_sql(sink_ddl)

相关文章:

pyflink过滤kafka数据

from pyflink.table import (TableEnvironment, EnvironmentSettings)# 输入、输出、过滤条件 columns_in [ ... ]columns_out [ ... ] filter_condition "name 蒋介石 and sex 男"# 创建执行环境t_env TableEnvironment.create(EnvironmentSettings.in_stream…...

Webpack 完整指南

​🌈个人主页:前端青山 🔥系列专栏:Webpack篇 🔖人终将被年少不可得之物困其一生 依旧青山,本期给大家带来webpack篇专栏内容:webpack介绍 目录 介绍 一、webpack 1.1、webpack是什么 1.2 webpack五个核心配置 1.…...

如何在 Ubuntu20.04 安装FTP Server vsftpd

1.安装: sudo apt-get install vsftpd 2.启动 sudo service vsftpd start //启动 sudo service vsftpd stop //停止 sudo service vsftpd restart //重新启动 3.打开配置文件 sudo nano /etc/vsftpd.conf 4.配置:限制在指定目录&…...

基于FPGA的DDS信号发生器(图文并茂+深度原理解析)

篇幅有限,本文详细源文件已打包 至个人主页资源,需要自取...... 前言 DDS(直接数字合成)技术是先进的频率合成手段,在数字信号处理与硬件实现领域作用关键。它因低成本、低功耗、高分辨率以及快速转换时间等优点备受认可。 本文着重探究基于 FPGA 的简易 DDS 信号发生器设…...

QT:绘制事件和定时器

1.绘制时针 xx.h #ifndef WIDGET_H #define WIDGET_H#include <QWidget> #include <QTimer> #include<QPainter> #include <QTime>QT_BEGIN_NAMESPACE namespace Ui { class Widget; } QT_END_NAMESPACEclass Widget : public QWidget {Q_OBJECTpubl…...

【算法——递归回溯】

这个东西还是很重要的&#xff0c;直接决定了你的动态规划章节的学习深度 78. 子集 方法1&#xff1a; vector<vector<int>>V; void dfs(vector<int> v,vector<int> nums,int index) {if(indexnums.size()) V.push_back(v);else{v.push_back(nums[i…...

手机在网状态接口的使用和注意事项

手机在网状态接口是用于查询手机号码在运营商数据库中的实时状态的工具&#xff0c;这种接口在互联网金融、贷款、租赁、保险等相关行业中尤为重要&#xff0c;因为它可以帮助这些行业进行更有效的风控审核。以下是对手机在网状态接口的详细介绍&#xff1a; 一、手机在网状态…...

WebGl 使用uniform变量动态修改点的颜色

在WebGL中&#xff0c;uniform变量用于在顶点着色器和片元着色器之间传递全局状态信息&#xff0c;这些信息在渲染过程中不会随着顶点的变化而变化。uniform变量可以用来设置变换矩阵、光照参数、材料属性等。由于它们在整个渲染过程中共享&#xff0c;因此可以被所有使用该着色…...

Leetcode 划分字母区间

题目要求&#xff1a; 将字符串 s 划分成尽量多的片段&#xff0c;保证每个片段中出现的字母不会出现在其他片段中。 具体解释如下&#xff1a; 尽量多的片段&#xff1a;题目要求的是在划分过程中&#xff0c;我们要尽量让划分的片段数量最大化&#xff0c;而不是最少化。每…...

可编辑div遇到的那些事

在日常开发中有时可能会遇到input 或 textarea 不能满足的开发场景&#xff0c;比如多行输入的情况下&#xff0c;textarea 的右下角icon 无法去除, 所以此时可以使用div 设置可编辑状态&#xff0c;完成功能开发&#xff0c;在开发的过程中仍会遇到一下问题。 1&#xff0c;如…...

什麼是高速HTTP代理?

高速HTTP代理是一種用於加速和優化互聯網連接的技術。它通過在用戶和目標網站之間充當仲介伺服器&#xff0c;幫助用戶快速訪問網路資源。HTTP代理不僅可以提高訪問速度&#xff0c;還能提供一定程度的隱私保護和安全性。 高速HTTP代理的工作原理 HTTP代理伺服器位於用戶設備…...

三子棋(C 语言)

目录 一、游戏设计的整体思路二、各个步骤的代码实现1. 菜单及循环选择的实现2. 棋盘的初始化和显示3. 轮流下棋及结果判断实现4. 结果判断实现 三、所有代码四、总结 一、游戏设计的整体思路 &#xff08;1&#xff09;提供一个菜单让玩家选择人机对战、玩家对战或者退出游戏…...

HWS赛题 入门 MIPS Pwn-Mplogin(MIPS_shellcode)

解题所涉知识点&#xff1a; 泄露或修改内存数据&#xff1a; 堆地址&#xff1a;栈地址&#xff1a;栈上数据的连带输出(Stack Leak) && Stack溢出覆盖内存libc地址&#xff1a;BSS段地址&#xff1a; 劫持程序执行流程&#xff1a;[[MIPS_ROP]] 获得shell或flag&am…...

纯血鸿蒙启动公测,爱加密鸿蒙加固平台发布,助力鸿蒙应用安全运营!

鸿蒙系统打破了移动操作系统两极格局&#xff0c;实现操作系统核心技术的自主可控、安全可靠&#xff0c;在神州大地上掀起一波科技革新的浪潮&#xff0c;HarmonyOS NEXT成为大型企业必须要布局的应用系统之一。 HarmonyOS NEXT于10月8日正式开启公测&#xff0c;距离面向全体…...

MySQL中 truncate、drop和delete的区别

MySQL中 truncate、drop和delete区别 truncate 执行速度快&#xff0c;删除所有数据&#xff0c;但是保留表结构不记录日志事务不安全&#xff0c;不能回滚可重置自增主键计数器 drop 执行速度较快&#xff0c;删除整张表数据和结构不记录日志事务不安全&#xff0c;不能回…...

什么开放式耳机值得买?开放式耳机推荐排行榜!

长时间佩戴传统入耳式耳机有时可能会影响耳道健康&#xff0c;鉴于此&#xff0c;转而选择不入耳设计的开放式耳机就成了不少人的新倾向&#xff0c;它们有助于减少细菌滋生和耳道闷热的烦恼。为了帮助大家找到合适的选项&#xff0c;下面我将列举一些市面上口碑不错的开放式耳…...

Apache Doris的分区与分桶详解

目录 第一章 Doris介绍和分区分桶作用 1.1 Doris背景介绍 1.2 分区与分桶的意义 第二章 原理解析 2.1 分区机制 2.1.1 定义 2.1.2 类型 2.1.3 工作原理 2.2 分桶机制 2.2.1 概念 2.2.2 实现方式 2.2.3 与分区的关系 第三章 手动分区与自动分区对比 3.1 手动分区 …...

docker详解介绍+基础操作 (二)info详解

1 docker相关信息和优化配置 1&#xff09;查看docker版本详解 rootzz:~# docker version Client: Docker Engine - CommunityVersion: 27.3.1API version: 1.47Go version: go1.22.7Git commit: ce12230Built: Fri Sep 20 11:40:…...

C0023.在Clion中创建控件,对控件进行提升为自定义控件的步骤

新建Ui界面文件 修改新生成的ui文件头文件 关闭之前打开的ui文件&#xff0c;如上图Qt Designer中打开的&#xff0c;然后修改新生成的ui文件对应的头文件&#xff0c;改成自己需要的控件类即可。 提升控件为自定义类 将如下头文件中的类名和头文件名输入到提升窗口中&#…...

探索 C# 常用第三方库与框架

在 C# 开发中&#xff0c;第三方库和框架极大地提高了开发效率和代码质量。通过这些库&#xff0c;开发者可以快速处理 JSON 数据、简化对象映射、记录日志、以及高效地与数据库交互。本文将介绍四个常用的 C# 第三方库&#xff1a;Newtonsoft.Json、AutoMapper、NLog/Serilog …...

【Axure高保真原型】引导弹窗

今天和大家中分享引导弹窗的原型模板&#xff0c;载入页面后&#xff0c;会显示引导弹窗&#xff0c;适用于引导用户使用页面&#xff0c;点击完成后&#xff0c;会显示下一个引导弹窗&#xff0c;直至最后一个引导弹窗完成后进入首页。具体效果可以点击下方视频观看或打开下方…...

oracle与MySQL数据库之间数据同步的技术要点

Oracle与MySQL数据库之间的数据同步是一个涉及多个技术要点的复杂任务。由于Oracle和MySQL的架构差异&#xff0c;它们的数据同步要求既要保持数据的准确性和一致性&#xff0c;又要处理好性能问题。以下是一些主要的技术要点&#xff1a; 数据结构差异 数据类型差异&#xff…...

页面渲染流程与性能优化

页面渲染流程与性能优化详解&#xff08;完整版&#xff09; 一、现代浏览器渲染流程&#xff08;详细说明&#xff09; 1. 构建DOM树 浏览器接收到HTML文档后&#xff0c;会逐步解析并构建DOM&#xff08;Document Object Model&#xff09;树。具体过程如下&#xff1a; (…...

【Go】3、Go语言进阶与依赖管理

前言 本系列文章参考自稀土掘金上的 【字节内部课】公开课&#xff0c;做自我学习总结整理。 Go语言并发编程 Go语言原生支持并发编程&#xff0c;它的核心机制是 Goroutine 协程、Channel 通道&#xff0c;并基于CSP&#xff08;Communicating Sequential Processes&#xff0…...

第一篇:Agent2Agent (A2A) 协议——协作式人工智能的黎明

AI 领域的快速发展正在催生一个新时代&#xff0c;智能代理&#xff08;agents&#xff09;不再是孤立的个体&#xff0c;而是能够像一个数字团队一样协作。然而&#xff0c;当前 AI 生态系统的碎片化阻碍了这一愿景的实现&#xff0c;导致了“AI 巴别塔问题”——不同代理之间…...

Springcloud:Eureka 高可用集群搭建实战(服务注册与发现的底层原理与避坑指南)

引言&#xff1a;为什么 Eureka 依然是存量系统的核心&#xff1f; 尽管 Nacos 等新注册中心崛起&#xff0c;但金融、电力等保守行业仍有大量系统运行在 Eureka 上。理解其高可用设计与自我保护机制&#xff0c;是保障分布式系统稳定的必修课。本文将手把手带你搭建生产级 Eur…...

【Java_EE】Spring MVC

目录 Spring Web MVC ​编辑注解 RestController RequestMapping RequestParam RequestParam RequestBody PathVariable RequestPart 参数传递 注意事项 ​编辑参数重命名 RequestParam ​编辑​编辑传递集合 RequestParam 传递JSON数据 ​编辑RequestBody ​…...

成都鼎讯硬核科技!雷达目标与干扰模拟器,以卓越性能制胜电磁频谱战

在现代战争中&#xff0c;电磁频谱已成为继陆、海、空、天之后的 “第五维战场”&#xff0c;雷达作为电磁频谱领域的关键装备&#xff0c;其干扰与抗干扰能力的较量&#xff0c;直接影响着战争的胜负走向。由成都鼎讯科技匠心打造的雷达目标与干扰模拟器&#xff0c;凭借数字射…...

算法笔记2

1.字符串拼接最好用StringBuilder&#xff0c;不用String 2.创建List<>类型的数组并创建内存 List arr[] new ArrayList[26]; Arrays.setAll(arr, i -> new ArrayList<>()); 3.去掉首尾空格...

vulnyx Blogger writeup

信息收集 arp-scan nmap 获取userFlag 上web看看 一个默认的页面&#xff0c;gobuster扫一下目录 可以看到扫出的目录中得到了一个有价值的目录/wordpress&#xff0c;说明目标所使用的cms是wordpress&#xff0c;访问http://192.168.43.213/wordpress/然后查看源码能看到 这…...