当前位置: 首页 > news >正文

Flink Table API/SQL 多分支sink

背景

在某个场景中,需要从Kafka中获取数据,经过转换处理后,需要同时sink到多个输出源中(kafka、mysql、hologres)等。两次调用execute, 阿里云Flink vvr引擎报错:

public static void main(String[] args) {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);StreamStatementSet streamStatementSet = tEnv.createStatementSet();String s = LocalDateTimeUtils.getDateTime(System.currentTimeMillis());DataStream<String> dataStream = env.fromElements(s, LocalDateTimeUtils.getDateTime(System.currentTimeMillis()));tEnv.executeSql(KAFKA_TABLE_SQL);tEnv.executeSql(KAFKA_TABLE_SQL_1);Table table = tEnv.fromDataStream(dataStream);table.insertInto("kafka_sink").execute();table.insertInto("kafka_sink_1").execute();streamStatementSet.execute();}
Caused by: org.apache.flink.util.FlinkRuntimeException: Cannot have more than one execute() or executeAsync() call in a single environment.at org.apache.flink.client.program.StreamContextEnvironment.validateAllowedExecution(StreamContextEnvironment.java:199) ~[flink-dist-1.15-vvr-6.0.7-1-SNAPSHOT.jar:1.15-vvr-6.0.7-1-SNAPSHOT]at org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:187) ~[flink-dist-1.15-vvr-6.0.7-1-SNAPSHOT.jar:1.15-vvr-6.0.7-1-SNAPSHOT]at org.apache.flink.table.planner.delegation.DefaultExecutor.executeAsync(DefaultExecutor.java:110) ~[?:?]at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:877) ~[flink-table-api-java-uber-1.15-vvr-6.0.7-1-SNAPSHOT.jar:1.15-vvr-6.0.7-1-SNAPSHOT]at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:756) ~[flink-table-api-java-uber-1.15-vvr-6.0.7-1-SNAPSHOT.jar:1.15-vvr-6.0.7-1-SNAPSHOT]at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:955) ~[flink-table-api-java-uber-1.15-vvr-6.0.7-1-SNAPSHOT.jar:1.15-vvr-6.0.7-1-SNAPSHOT]at org.apache.flink.table.api.internal.TablePipelineImpl.execute(TablePipelineImpl.java:57) ~[flink-table-api-java-uber-1.15-vvr-6.0.7-1-SNAPSHOT.jar:1.15-vvr-6.0.7-1-SNAPSHOT]

解决

使用 StreamStatementSet. 具体参考官网:
https://nightlies.apache.org/flink/flink-docs-release-1.15/zh/docs/dev/table/data_stream_api/#converting-between-datastream-and-table

改良后的代码:

public static void main(String[] args) {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);StreamStatementSet streamStatementSet = tEnv.createStatementSet();String s = LocalDateTimeUtils.getDateTime(System.currentTimeMillis());DataStream<String> dataStream = env.fromElements(s, LocalDateTimeUtils.getDateTime(System.currentTimeMillis()));tEnv.executeSql(KAFKA_TABLE_SQL);tEnv.executeSql(KAFKA_TABLE_SQL_1);Table table = tEnv.fromDataStream(dataStream);streamStatementSet.addInsert("kafka_sink", table);streamStatementSet.addInsert("kafka_sink_1", table);streamStatementSet.execute();}

相关文章:

Flink Table API/SQL 多分支sink

背景 在某个场景中&#xff0c;需要从Kafka中获取数据&#xff0c;经过转换处理后&#xff0c;需要同时sink到多个输出源中(kafka、mysql、hologres)等。两次调用execute, 阿里云Flink vvr引擎报错&#xff1a; public static void main(String[] args) {final StreamExecuti…...

Vue3 中 导航守卫 的使用

在Vue 3中&#xff0c;导航守卫&#xff08;Navigation Guards&#xff09;用于在路由切换前后执行一些操作&#xff0c;例如验证用户权限、取消路由导航等。Vue 3中的导航守卫与Vue 2中的导航守卫略有不同。下面是Vue 3中导航守卫的使用方式&#xff1a; 全局前置守卫&#xf…...

云原生概论

云原生是一种新兴的技术趋势&#xff0c;它旨在将应用程序设计和部署方式从传统的基础设施转向云端。云原生应用程序是一种针对云环境进行优化的应用程序&#xff0c;能够充分利用云端提供的弹性和可扩展性。本文将探讨云原生的概念、优势、应用场景以及未来发展方向。 一、云…...

hive-sql

hive-常用SQL汇总 查看数据库 -- 查看所有的数据库 show databases; 使用默认的库 -- 下面的语句可以查看默认的库 use default ;查看某个库下的表 -- 查看所有的表 show tables ; -- 查看包含 stu的表 &#xff0c;这种是通配的方法来查看 show tables like *stu*; 查…...

Rspack 创建 vue2/3 项目接入 antdv(rspack.config.js 配置 less 主题)

一、简介 Rspack CLI 官方文档。 rspack.config.js 官方文档。 二、创建 vue 项目 创建项目&#xff08;文档中还提供了 Rspack 内置 monorepo 框架 Nx 的创建方式&#xff0c;根据需求进行选择&#xff09; # npm 方式 $ npm create rspacklatest# yarn 方式 $ yarn create…...

基于centos7完成docker服务的一些基础操作

目录 要求完成 具体操作 1.安装docker服务&#xff0c;配置镜像加速器 2.下载系统镜像&#xff08;Ubuntu、 centos&#xff09; 3.基于下载的镜像创建两个容器 &#xff08;容器名一个为自己名字全拼&#xff0c;一个为首名字字母&#xff09; 4.容器的启动、 停止及重启…...

Microsoft Visual Studio + Qt插件编程出现错误error MSB4184问题

文章目录 报错解决 报错 C:\Users\Administrator\AppData\Local\QtMsBuild\qt_globals.targets(786,7): error MSB4184: 无法计算表达式“[System.IO.File]::ReadAllText(C:\Users\Administrator\AppData\Local\QtMsBuild\qt.natvis.xml)”。 未能找到文件“C:\Users\Administ…...

QT Quick之quick与C++混合编程

Qt quick能够生成非常绚丽界面&#xff0c;但有其局限性的&#xff0c;对于一些业务逻辑和复杂算法&#xff0c;比如低阶的网络编程如 QTcpSocket &#xff0c;多线程&#xff0c;又如 XML 文档处理类库 QXmlStreamReader / QXmlStreamWriter 等等&#xff0c;在 QML 中要么不可…...

Ros noetic Move_base 相关状态位置的获取 实战使用教程

前言: 有一段时间没有更新,这篇文章是为了后续MPC路径跟踪算法开设的帖子用于更新我自己的思路,由于MPC算法,要镶嵌到整个导航任务中去,就绕不开这个move_base包中相关的参数设置和其中相关状态位置的获取和解读等等。 因为最近遇到小车在其他的环境中有些时候,不需要自己…...

【SpringBoot】SpringBoot项目与Vue对接接口的步骤

下面是SpringBoot项目与Vue对接接口的步骤&#xff1a; 创建SpringBoot项目&#xff0c;在项目中添加依赖&#xff0c;如Spring MVC、MyBatis等框架。 在SpringBoot项目中编写接口方法&#xff0c;使用注解标识请求方式&#xff0c;如GetMapping、PostMapping等&#xff0c;并…...

Glog安装与使用

安装 脚本 #!/bin/bash git clone https://github.com/google/glog.git cd glog git checkout v0.4.0 mkdir build && cd build cmake .. make -j4 echo "your password" | sudo -S make install使用 main.cc #include <glog/logging.h>int main(i…...

windows开发环境搭建

下载msys2&#xff0c;官网下载即可&#xff1a; MSYS2 安装其他的编译工具&#xff08;貌似不需要把中间的命令全部执行&#xff09;&#xff1a; MSYS2使用教程——win10系统64位安装msys2最新版&#xff08;msys2-x86_xxxx.exe&#xff09;_msys64_Dreamhai的博客-CSDN博…...

8月17日上课内容 第三章 LVS+Keepalived群集

本章结构 Keepalived概述 keepalived 概述 1.服务功能 故障自动切换 健康检查 节点服务器高可用 HA keepalived工作原理 Keepalived 是一个基于VRRP协议来实现的LVS服务高可用方案&#xff0c;可以解决静态路由出现的单点故障问题 在一个LVS服务集群中通常有主服务器 (MAST…...

Threejs学习05——球缓冲几何体背景贴图和环境贴图

实现随机多个三角形随机位置随机颜色展示效果 这是一个非常简单基础的threejs的学习应用&#xff01;本节主要学习的是球面缓冲几何体的贴图部分&#xff0c;这里有环境贴图以及背景贴图&#xff0c;这样可以有一种身临其境的效果&#xff01;这里环境贴图用的是一个.hdr的文件…...

LVS+Keepalived群集实验

目录 Keepalived 是什么 Keepalived 功能 Keepalived 模块 工作原理 脑裂现象及解决方案 脑裂 形成脑裂的原因 解决脑裂的几种方法&#xff1a; 为了减少或避免HA集群中出现脑裂现象&#xff0c;我们可以采取以下措施&#xff1a; Keepalived服务主要功能&#xff0…...

软考高级之系统架构师之系统开发基础

架构 场景 场景&#xff08;scenarios&#xff09;在进行体系结构评估时&#xff0c;一般首先要精确地得出具体的质量目标&#xff0c;并以之作为判定该体系结构优劣的标准。为得出这些目标而采用的机制做场景。场景是从风险承担者的角度对与系统的交互的简短描述。在体系结构…...

Web 3.0 安全风险,您需要了解这些内容

随着技术的不断发展&#xff0c;Web 3.0 正在逐渐成为现实&#xff0c;为我们带来了许多新的机遇和挑战。然而&#xff0c;与任何新技术一样&#xff0c;Web 3.0 也伴随着一系列安全风险&#xff0c;这些风险需要被认真对待。在这篇文章中&#xff0c;我们将探讨一些与Web 3.0 …...

万宾科技22款产品入选《城市生命线安全工程监测技术产品名录》

2023年8月17日-18日&#xff0c;由北京市地下管线协会主办的2023首届城市生命线安全与发展大会在北京召开&#xff0c;本次大会汇聚中央及地方政府主管领导、院士专家、行业领袖、龙头代表、产业精英等。 大会聚焦安全监管智慧平台和燃气爆炸、城市内涝、地下管线交互风险、第三…...

MFC 隐藏窗口

亲测能用 改变主窗体的创建方式 将 C***App::InitInstance() 函数中的代码 CMFCApplication1Dlg dlg; m_pMainWnd &dlg; INT_PTR nResponse dlg.DoModal(); 替换为 CMFCApplication1Dlg dlg; m_pMainWnd &dlg; //INT_PTR nResponse dlg.DoModal(); INT_PTR nRe…...

Java数据库连接池原理及spring boot使用数据库连接池(HikariCP、Druid)

和线程池类似&#xff0c;数据库连接池的作用是建立一些和数据库的连接供需要连接数据库的业务使用&#xff0c;避免了每次和数据库建立、销毁连接的性能消耗&#xff0c;通过设置连接池参数可以防止建立连接过多导致服务宕机等&#xff0c;以下介绍Java中主要使用的几种数据库…...

React第五十七节 Router中RouterProvider使用详解及注意事项

前言 在 React Router v6.4 中&#xff0c;RouterProvider 是一个核心组件&#xff0c;用于提供基于数据路由&#xff08;data routers&#xff09;的新型路由方案。 它替代了传统的 <BrowserRouter>&#xff0c;支持更强大的数据加载和操作功能&#xff08;如 loader 和…...

阿里云ACP云计算备考笔记 (5)——弹性伸缩

目录 第一章 概述 第二章 弹性伸缩简介 1、弹性伸缩 2、垂直伸缩 3、优势 4、应用场景 ① 无规律的业务量波动 ② 有规律的业务量波动 ③ 无明显业务量波动 ④ 混合型业务 ⑤ 消息通知 ⑥ 生命周期挂钩 ⑦ 自定义方式 ⑧ 滚的升级 5、使用限制 第三章 主要定义 …...

边缘计算医疗风险自查APP开发方案

核心目标:在便携设备(智能手表/家用检测仪)部署轻量化疾病预测模型,实现低延迟、隐私安全的实时健康风险评估。 一、技术架构设计 #mermaid-svg-iuNaeeLK2YoFKfao {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg…...

FFmpeg 低延迟同屏方案

引言 在实时互动需求激增的当下&#xff0c;无论是在线教育中的师生同屏演示、远程办公的屏幕共享协作&#xff0c;还是游戏直播的画面实时传输&#xff0c;低延迟同屏已成为保障用户体验的核心指标。FFmpeg 作为一款功能强大的多媒体框架&#xff0c;凭借其灵活的编解码、数据…...

【JVM】- 内存结构

引言 JVM&#xff1a;Java Virtual Machine 定义&#xff1a;Java虚拟机&#xff0c;Java二进制字节码的运行环境好处&#xff1a; 一次编写&#xff0c;到处运行自动内存管理&#xff0c;垃圾回收的功能数组下标越界检查&#xff08;会抛异常&#xff0c;不会覆盖到其他代码…...

微信小程序 - 手机震动

一、界面 <button type"primary" bindtap"shortVibrate">短震动</button> <button type"primary" bindtap"longVibrate">长震动</button> 二、js逻辑代码 注&#xff1a;文档 https://developers.weixin.qq…...

sqlserver 根据指定字符 解析拼接字符串

DECLARE LotNo NVARCHAR(50)A,B,C DECLARE xml XML ( SELECT <x> REPLACE(LotNo, ,, </x><x>) </x> ) DECLARE ErrorCode NVARCHAR(50) -- 提取 XML 中的值 SELECT value x.value(., VARCHAR(MAX))…...

什么是EULA和DPA

文章目录 EULA&#xff08;End User License Agreement&#xff09;DPA&#xff08;Data Protection Agreement&#xff09;一、定义与背景二、核心内容三、法律效力与责任四、实际应用与意义 EULA&#xff08;End User License Agreement&#xff09; 定义&#xff1a; EULA即…...

Golang——9、反射和文件操作

反射和文件操作 1、反射1.1、reflect.TypeOf()获取任意值的类型对象1.2、reflect.ValueOf()1.3、结构体反射 2、文件操作2.1、os.Open()打开文件2.2、方式一&#xff1a;使用Read()读取文件2.3、方式二&#xff1a;bufio读取文件2.4、方式三&#xff1a;os.ReadFile读取2.5、写…...

探索Selenium:自动化测试的神奇钥匙

目录 一、Selenium 是什么1.1 定义与概念1.2 发展历程1.3 功能概述 二、Selenium 工作原理剖析2.1 架构组成2.2 工作流程2.3 通信机制 三、Selenium 的优势3.1 跨浏览器与平台支持3.2 丰富的语言支持3.3 强大的社区支持 四、Selenium 的应用场景4.1 Web 应用自动化测试4.2 数据…...