当前位置: 首页 > news >正文

Flink Table API/SQL 多分支sink

背景

在某个场景中,需要从Kafka中获取数据,经过转换处理后,需要同时sink到多个输出源中(kafka、mysql、hologres)等。两次调用execute, 阿里云Flink vvr引擎报错:

public static void main(String[] args) {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);StreamStatementSet streamStatementSet = tEnv.createStatementSet();String s = LocalDateTimeUtils.getDateTime(System.currentTimeMillis());DataStream<String> dataStream = env.fromElements(s, LocalDateTimeUtils.getDateTime(System.currentTimeMillis()));tEnv.executeSql(KAFKA_TABLE_SQL);tEnv.executeSql(KAFKA_TABLE_SQL_1);Table table = tEnv.fromDataStream(dataStream);table.insertInto("kafka_sink").execute();table.insertInto("kafka_sink_1").execute();streamStatementSet.execute();}
Caused by: org.apache.flink.util.FlinkRuntimeException: Cannot have more than one execute() or executeAsync() call in a single environment.at org.apache.flink.client.program.StreamContextEnvironment.validateAllowedExecution(StreamContextEnvironment.java:199) ~[flink-dist-1.15-vvr-6.0.7-1-SNAPSHOT.jar:1.15-vvr-6.0.7-1-SNAPSHOT]at org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:187) ~[flink-dist-1.15-vvr-6.0.7-1-SNAPSHOT.jar:1.15-vvr-6.0.7-1-SNAPSHOT]at org.apache.flink.table.planner.delegation.DefaultExecutor.executeAsync(DefaultExecutor.java:110) ~[?:?]at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:877) ~[flink-table-api-java-uber-1.15-vvr-6.0.7-1-SNAPSHOT.jar:1.15-vvr-6.0.7-1-SNAPSHOT]at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:756) ~[flink-table-api-java-uber-1.15-vvr-6.0.7-1-SNAPSHOT.jar:1.15-vvr-6.0.7-1-SNAPSHOT]at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:955) ~[flink-table-api-java-uber-1.15-vvr-6.0.7-1-SNAPSHOT.jar:1.15-vvr-6.0.7-1-SNAPSHOT]at org.apache.flink.table.api.internal.TablePipelineImpl.execute(TablePipelineImpl.java:57) ~[flink-table-api-java-uber-1.15-vvr-6.0.7-1-SNAPSHOT.jar:1.15-vvr-6.0.7-1-SNAPSHOT]

解决

使用 StreamStatementSet. 具体参考官网:
https://nightlies.apache.org/flink/flink-docs-release-1.15/zh/docs/dev/table/data_stream_api/#converting-between-datastream-and-table

改良后的代码:

public static void main(String[] args) {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);StreamStatementSet streamStatementSet = tEnv.createStatementSet();String s = LocalDateTimeUtils.getDateTime(System.currentTimeMillis());DataStream<String> dataStream = env.fromElements(s, LocalDateTimeUtils.getDateTime(System.currentTimeMillis()));tEnv.executeSql(KAFKA_TABLE_SQL);tEnv.executeSql(KAFKA_TABLE_SQL_1);Table table = tEnv.fromDataStream(dataStream);streamStatementSet.addInsert("kafka_sink", table);streamStatementSet.addInsert("kafka_sink_1", table);streamStatementSet.execute();}

相关文章:

Flink Table API/SQL 多分支sink

背景 在某个场景中&#xff0c;需要从Kafka中获取数据&#xff0c;经过转换处理后&#xff0c;需要同时sink到多个输出源中(kafka、mysql、hologres)等。两次调用execute, 阿里云Flink vvr引擎报错&#xff1a; public static void main(String[] args) {final StreamExecuti…...

Vue3 中 导航守卫 的使用

在Vue 3中&#xff0c;导航守卫&#xff08;Navigation Guards&#xff09;用于在路由切换前后执行一些操作&#xff0c;例如验证用户权限、取消路由导航等。Vue 3中的导航守卫与Vue 2中的导航守卫略有不同。下面是Vue 3中导航守卫的使用方式&#xff1a; 全局前置守卫&#xf…...

云原生概论

云原生是一种新兴的技术趋势&#xff0c;它旨在将应用程序设计和部署方式从传统的基础设施转向云端。云原生应用程序是一种针对云环境进行优化的应用程序&#xff0c;能够充分利用云端提供的弹性和可扩展性。本文将探讨云原生的概念、优势、应用场景以及未来发展方向。 一、云…...

hive-sql

hive-常用SQL汇总 查看数据库 -- 查看所有的数据库 show databases; 使用默认的库 -- 下面的语句可以查看默认的库 use default ;查看某个库下的表 -- 查看所有的表 show tables ; -- 查看包含 stu的表 &#xff0c;这种是通配的方法来查看 show tables like *stu*; 查…...

Rspack 创建 vue2/3 项目接入 antdv(rspack.config.js 配置 less 主题)

一、简介 Rspack CLI 官方文档。 rspack.config.js 官方文档。 二、创建 vue 项目 创建项目&#xff08;文档中还提供了 Rspack 内置 monorepo 框架 Nx 的创建方式&#xff0c;根据需求进行选择&#xff09; # npm 方式 $ npm create rspacklatest# yarn 方式 $ yarn create…...

基于centos7完成docker服务的一些基础操作

目录 要求完成 具体操作 1.安装docker服务&#xff0c;配置镜像加速器 2.下载系统镜像&#xff08;Ubuntu、 centos&#xff09; 3.基于下载的镜像创建两个容器 &#xff08;容器名一个为自己名字全拼&#xff0c;一个为首名字字母&#xff09; 4.容器的启动、 停止及重启…...

Microsoft Visual Studio + Qt插件编程出现错误error MSB4184问题

文章目录 报错解决 报错 C:\Users\Administrator\AppData\Local\QtMsBuild\qt_globals.targets(786,7): error MSB4184: 无法计算表达式“[System.IO.File]::ReadAllText(C:\Users\Administrator\AppData\Local\QtMsBuild\qt.natvis.xml)”。 未能找到文件“C:\Users\Administ…...

QT Quick之quick与C++混合编程

Qt quick能够生成非常绚丽界面&#xff0c;但有其局限性的&#xff0c;对于一些业务逻辑和复杂算法&#xff0c;比如低阶的网络编程如 QTcpSocket &#xff0c;多线程&#xff0c;又如 XML 文档处理类库 QXmlStreamReader / QXmlStreamWriter 等等&#xff0c;在 QML 中要么不可…...

Ros noetic Move_base 相关状态位置的获取 实战使用教程

前言: 有一段时间没有更新,这篇文章是为了后续MPC路径跟踪算法开设的帖子用于更新我自己的思路,由于MPC算法,要镶嵌到整个导航任务中去,就绕不开这个move_base包中相关的参数设置和其中相关状态位置的获取和解读等等。 因为最近遇到小车在其他的环境中有些时候,不需要自己…...

【SpringBoot】SpringBoot项目与Vue对接接口的步骤

下面是SpringBoot项目与Vue对接接口的步骤&#xff1a; 创建SpringBoot项目&#xff0c;在项目中添加依赖&#xff0c;如Spring MVC、MyBatis等框架。 在SpringBoot项目中编写接口方法&#xff0c;使用注解标识请求方式&#xff0c;如GetMapping、PostMapping等&#xff0c;并…...

Glog安装与使用

安装 脚本 #!/bin/bash git clone https://github.com/google/glog.git cd glog git checkout v0.4.0 mkdir build && cd build cmake .. make -j4 echo "your password" | sudo -S make install使用 main.cc #include <glog/logging.h>int main(i…...

windows开发环境搭建

下载msys2&#xff0c;官网下载即可&#xff1a; MSYS2 安装其他的编译工具&#xff08;貌似不需要把中间的命令全部执行&#xff09;&#xff1a; MSYS2使用教程——win10系统64位安装msys2最新版&#xff08;msys2-x86_xxxx.exe&#xff09;_msys64_Dreamhai的博客-CSDN博…...

8月17日上课内容 第三章 LVS+Keepalived群集

本章结构 Keepalived概述 keepalived 概述 1.服务功能 故障自动切换 健康检查 节点服务器高可用 HA keepalived工作原理 Keepalived 是一个基于VRRP协议来实现的LVS服务高可用方案&#xff0c;可以解决静态路由出现的单点故障问题 在一个LVS服务集群中通常有主服务器 (MAST…...

Threejs学习05——球缓冲几何体背景贴图和环境贴图

实现随机多个三角形随机位置随机颜色展示效果 这是一个非常简单基础的threejs的学习应用&#xff01;本节主要学习的是球面缓冲几何体的贴图部分&#xff0c;这里有环境贴图以及背景贴图&#xff0c;这样可以有一种身临其境的效果&#xff01;这里环境贴图用的是一个.hdr的文件…...

LVS+Keepalived群集实验

目录 Keepalived 是什么 Keepalived 功能 Keepalived 模块 工作原理 脑裂现象及解决方案 脑裂 形成脑裂的原因 解决脑裂的几种方法&#xff1a; 为了减少或避免HA集群中出现脑裂现象&#xff0c;我们可以采取以下措施&#xff1a; Keepalived服务主要功能&#xff0…...

软考高级之系统架构师之系统开发基础

架构 场景 场景&#xff08;scenarios&#xff09;在进行体系结构评估时&#xff0c;一般首先要精确地得出具体的质量目标&#xff0c;并以之作为判定该体系结构优劣的标准。为得出这些目标而采用的机制做场景。场景是从风险承担者的角度对与系统的交互的简短描述。在体系结构…...

Web 3.0 安全风险,您需要了解这些内容

随着技术的不断发展&#xff0c;Web 3.0 正在逐渐成为现实&#xff0c;为我们带来了许多新的机遇和挑战。然而&#xff0c;与任何新技术一样&#xff0c;Web 3.0 也伴随着一系列安全风险&#xff0c;这些风险需要被认真对待。在这篇文章中&#xff0c;我们将探讨一些与Web 3.0 …...

万宾科技22款产品入选《城市生命线安全工程监测技术产品名录》

2023年8月17日-18日&#xff0c;由北京市地下管线协会主办的2023首届城市生命线安全与发展大会在北京召开&#xff0c;本次大会汇聚中央及地方政府主管领导、院士专家、行业领袖、龙头代表、产业精英等。 大会聚焦安全监管智慧平台和燃气爆炸、城市内涝、地下管线交互风险、第三…...

MFC 隐藏窗口

亲测能用 改变主窗体的创建方式 将 C***App::InitInstance() 函数中的代码 CMFCApplication1Dlg dlg; m_pMainWnd &dlg; INT_PTR nResponse dlg.DoModal(); 替换为 CMFCApplication1Dlg dlg; m_pMainWnd &dlg; //INT_PTR nResponse dlg.DoModal(); INT_PTR nRe…...

Java数据库连接池原理及spring boot使用数据库连接池(HikariCP、Druid)

和线程池类似&#xff0c;数据库连接池的作用是建立一些和数据库的连接供需要连接数据库的业务使用&#xff0c;避免了每次和数据库建立、销毁连接的性能消耗&#xff0c;通过设置连接池参数可以防止建立连接过多导致服务宕机等&#xff0c;以下介绍Java中主要使用的几种数据库…...

百度商业AI 技术创新大赛赛道二:AIGC推理性能优化TOP10之经验分享

朋友们&#xff0c;AIGC性能优化大赛已经结束了&#xff0c;看新闻很多队员已经完成了答辩和领奖环节&#xff0c;我根据内幕人了解到&#xff0c;比赛的最终代码及结果是不会分享出来的&#xff0c;因为办比赛的目的就是吸引最优秀的代码然后给公司节省自己开发的成本&#xf…...

微服务时代java异常捕捉

一、尽量不要使用e.printStackTrace(),而是使用log打印。 ​反例:​ try{ // do what you want }catch(Exception e){ e.printStackTrace(); } ​ 正例&#xff1a;​ try{ // do what you want }catch(Exception e){ log.info("你的程序有异常啦,{}",e)…...

Hadoop支持LZO压缩

LZO(Lempel-Ziv-Oberhumer)是一种快速压缩算法,特别适用于大数据处理。在Hadoop生态系统中,LZO压缩通常用于Hadoop MapReduce作业的输入和输出数据,以减少存储空间和数据传输的开销。 以下是在Hadoop中使用LZO压缩的一般步骤: 安装LZO库和工具: 首先,需要在Hadoop集群…...

vue3 01-setup函数

1.setup函数的作用: 1.是组合式api的入口2.比beforeCreate 执行更早3.没有this组件实例一开始创建vue3页面的时候是这样的 <template></template> <script> export default{setup(){return{ }} } </script>给容器传参在页面中显示 数据给模板使用,以…...

iOS swift 类似AirDrop的近场数据传输 MultipeerConnectivity 框架

文章目录 1.github上的demo 1.github上的demo insidegui/MultipeerDemo – github insidegui/MultipeerKit – github...

Lnton羚通云算力平台OpenCV-PythonCanny边缘检测教程

Canny 边缘检测是一种经典的边缘检测算法&#xff0c;由 John F. Canny 在 1986 年提出。它被广泛应用于计算机视觉和图像处理领域&#xff0c;用于检测图像中的边缘。 ​【原理】 1. 去噪 由于边缘检测非常容易收到图像的噪声影响&#xff0c;第一步使用 5x5 高斯滤波去除图…...

2023-8-23 滑动窗口

题目链接&#xff1a;滑动窗口 #include <iostream>using namespace std;const int N 1000010;int n, k; int a[N], q[N];int main() {scanf("%d%d", &n, &k);for(int i 0; i < n; i) scanf("%d", &a[i]);int hh 0, tt -1;for(…...

SOA通信中间件常用的通信协议

摘要&#xff1a; SOA&#xff08;面向服务的架构&#xff09;的软件设计原则之一是模块化。 前言 SOA&#xff08;面向服务的架构&#xff09;的软件设计原则之一是模块化。模块化可以提高软件系统的可维护性和代码重用性&#xff0c;并且能够隔离故障。举例来说&#xff0c;…...

解决npm安装依赖失败,node和node-sass版本不匹配的问题

npm安装依赖报错&#xff1a; npm ERR! cb() never called! npm ERR! This is an error with npm itself. 一. 问题描述 用npm安装依赖报错&#xff1a; npm ERR! cb() never called! npm ERR! This is an error with npm itself. Please report this error at: npm ERR! …...

2023 网络建设与运维 X86架构计算机操作系统安装与管理题解

任务描述: 随着信息技术的快速发展,集团计划2023年把部分业务由原有的X86架构服务器上迁移到ARM架构服务器上,同时根据目前的部分业务需求进行了部分调整和优化。 一、X86架构计算机操作系统安装与管理 1.PC1系统为ubuntu-desktop-amd64系统(已安装,语言为英文),登录用户…...

黄岩城乡住房和建设局网站/网络推广产品要给多少钱

1、字符串判断 str1 str2     当两个串有相同内容、长度时为真 str1 ! str2    当串str1和str2不等时为真 -n str1       当串的长度大于0时为真(串非空&#xff0c;变量) -z str1       当串的长度为0时为真(空串) str1        当串str1为非空时…...

又拍云 wordpress使用/如何自己做网页

1.spark&#xff1a;用来实现快速而通用的集群计算平台。也可以说是一个集群计算的通用框架。 2.优点&#xff1a;能够在内存中进行计算&#xff0c;内存式存储和容错机制。 3.组件&#xff1a; 1.spark core&#xff1a;RDD&#xff0c;弹性分布式数据集&#xff1b; spark …...

做企业网站什么软件好/网络服务提供者不履行法律行政法规规定

问题描述 给你一个按非递减顺序 排序的整数数组 nums&#xff0c;返回每个数字的平方 组成的新数组&#xff0c;要求也按非递减顺序排序.解决思路 ① 先修改数组中元素的值, 将其改成该数的平方. ②再对数组进行排序.源代码 class Solution {public int[] sortedSquares(int[]…...

wordpress 旅游模板/网站建设的意义和目的

随着汽车黑科技越来越多&#xff0c;关于汽车上的隐藏功能很多车主都没有太多认识&#xff0c;甚至有时候要看了汽车说明书才能了解汽车被动的安全功能。今天名悦集团为您介绍那些汽车上常见的安全装置&#xff0c;被忽略的“保命”装置哦。 副驾驶安全气囊开关 当副驾驶放儿童…...

来个网站急急急2021年/镇江seo

悄悄打入公司内部UED的一个Python爱好者小众群&#xff0c;前两天一位牛人发了条消息&#xff1a; 小的测试题&#xff1a;re.split((\W),test,test,test.)返回什么结果一开始看&#xff0c;我倒没注意W是大写的&#xff0c;以为是小写的w代表单词字符&#xff08;含下划线&…...

贵州最好的网站建设推广公司/东莞疫情最新消息今天又封了

一、去掉图片间隙&#xff1a;hack1、img{ display&#xff1a;block&#xff1b; }hack2、将<div></div>与<img>写在同一行 二、ie6双倍浮向(双倍边距)出现情况&#xff1a;在浮动和外边距同事出现时会出现双向浮动。hack&#xff1a;给浮动元素添加&#x…...