当前位置: 首页 > news >正文

org.apache.flink.table.api.TableException: Sink does not exists

FlinkSQL_1.12_用DDL实现Kafka到MySQL的数据传输_实现按照条件进行过滤写入MySQL_flink从kafka拉取数据并过滤数据写入mysql_旧城里的阳光的博客-CSDN博客

参考这篇文章,写了kafka到mysql的代码例子,因为自己改了表结构,运行下面代码:

package org.test.flink;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;//TODO 用DDL实现Kafka到MySQL的数据传输
public class FlinkSQL15_SQL_DDL_Kafka_MySQL {public static void main(String[] args) throws Exception {//1.获取执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);//2.使用DDL的方式加载数据--注册SourceTabletableEnv.executeSql("create table source_sensor(account_id  BIGINT)" +"with (" +"'connector.type' = 'kafka'," +"'connector.version' = 'universal'," +"'connector.topic' = 'testtopic'," +"'connector.properties.bootstrap.servers' = '11.0.24.216:9092'," +"'connector.properties.group.id' = 'bigdata1109'," +"'format.type' = 'json'"+ ")");Table table = tableEnv.sqlQuery("select * from source_sensor");//3.注册SinkTable:MysqltableEnv.executeSql("CREATE TABLE spend_report (\n" +"    account_id BIGINT,\n" +"    PRIMARY KEY (account_id) NOT ENFORCED)" +"with (" +"'connector' = 'jdbc'," +"'url' = 'jdbc:mysql://11.0.24.216:4306/test?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=utf8&useSSL=false',"+"'table-name' = 'spend_report',"+"'username' = 'root',"+"'password' = '123456'"+ ")");//4.执行查询kafka数据
//        Table source_sensor = tableEnv.from("source_sensor");
//        //5.将数据写入Mysql
//        source_sensor.executeInsert("sink_sensor");
//table.executeInsert("sink_sensor");//6.执行任务env.execute();}
}

发现报错如下:

Exception in thread "main" org.apache.flink.table.api.TableException: Sink `default_catalog`.`default_database`.`sink_sensor` does not existsat org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:247)at org.apache.flink.table.planner.delegation.PlannerBase.$anonfun$translate$1(PlannerBase.scala:159)at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)at scala.collection.Iterator.foreach(Iterator.scala:943)at scala.collection.Iterator.foreach$(Iterator.scala:943)at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)at scala.collection.IterableLike.foreach(IterableLike.scala:74)at scala.collection.IterableLike.foreach$(IterableLike.scala:73)at scala.collection.AbstractIterable.foreach(Iterable.scala:56)at scala.collection.TraversableLike.map(TraversableLike.scala:286)at scala.collection.TraversableLike.map$(TraversableLike.scala:279)at scala.collection.AbstractTraversable.map(Traversable.scala:108)at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:159)at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1329)at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:676)at org.apache.flink.table.api.internal.TableImpl.executeInsert(TableImpl.java:572)at org.apache.flink.table.api.internal.TableImpl.executeInsert(TableImpl.java:554)at org.test.flink.FlinkSQL15_SQL_DDL_Kafka_MySQL.main(FlinkSQL15_SQL_DDL_Kafka_MySQL.java:50)

点击table.executeInsert看了下源码:

    /*** Writes the {@link Table} to a {@link TableSink} that was registered under the specified path,* and then execute the insert operation.** <p>See the documentation of {@link TableEnvironment#useDatabase(String)} or {@link* TableEnvironment#useCatalog(String)} for the rules on the path resolution.** <p>A batch {@link Table} can only be written to a {@code* org.apache.flink.table.sinks.BatchTableSink}, a streaming {@link Table} requires a {@code* org.apache.flink.table.sinks.AppendStreamTableSink}, a {@code* org.apache.flink.table.sinks.RetractStreamTableSink}, or an {@code* org.apache.flink.table.sinks.UpsertStreamTableSink}.** <p>Example:** <pre>{@code* Table table = tableEnv.fromQuery("select * from MyTable");* TableResult tableResult = table.executeInsert("MySink");* tableResult...* }</pre>** @param tablePath The path of the registered TableSink to which the Table is written.* @return The insert operation execution result.*/TableResult executeInsert(String tablePath);

 发现executeInsert方法的参数tablePath需要传入表名,这里的表名应该和

tableEnv.executeSql("create table source_sensor(account_id  BIGINT)"

的表名source_sensor一致。

将:

table.executeInsert("sink_sensor");

改成:

table.executeInsert("source_sensor");

后执行成功。

flink1.2的demo完整代码:flink-java-1.12.7: flink1.12.7的java demo,包括flink wordcount示例,如何连接kafka

 

相关文章:

org.apache.flink.table.api.TableException: Sink does not exists

FlinkSQL_1.12_用DDL实现Kafka到MySQL的数据传输_实现按照条件进行过滤写入MySQL_flink从kafka拉取数据并过滤数据写入mysql_旧城里的阳光的博客-CSDN博客 参考这篇文章&#xff0c;写了kafka到mysql的代码例子&#xff0c;因为自己改了表结构&#xff0c;运行下面代码&#x…...

【多线程】CAS 详解

CAS 详解 一. 什么是 CAS二. CAS 的应用1. 实现原子类2. 实现自旋锁 三. CAS 的 ABA 问题四. 相关面试题 一. 什么是 CAS CAS: 全称Compare and swap&#xff0c;字面意思:”比较并交换“一个 CAS 涉及到以下操作&#xff1a; 我们假设内存中的原数据 V&#xff0c;旧的预期值…...

卷积神经网络实现咖啡豆分类 - P7

&#x1f368; 本文为&#x1f517;365天深度学习训练营 中的学习记录博客&#x1f356; 原作者&#xff1a;K同学啊 | 接辅导、项目定制&#x1f680; 文章来源&#xff1a;K同学的学习圈子 目录 环境步骤环境设置包引用全局设备对象 数据准备查看图像的信息制作数据集 模型设…...

C++之默认与自定义构造函数问题(二百一十七)

简介&#xff1a; CSDN博客专家&#xff0c;专注Android/Linux系统&#xff0c;分享多mic语音方案、音视频、编解码等技术&#xff0c;与大家一起成长&#xff01; 优质专栏&#xff1a;Audio工程师进阶系列【原创干货持续更新中……】&#x1f680; 人生格言&#xff1a; 人生…...

Docker从认识到实践再到底层原理(五)|Docker镜像

前言 那么这里博主先安利一些干货满满的专栏了&#xff01; 首先是博主的高质量博客的汇总&#xff0c;这个专栏里面的博客&#xff0c;都是博主最最用心写的一部分&#xff0c;干货满满&#xff0c;希望对大家有帮助。 高质量博客汇总 然后就是博主最近最花时间的一个专栏…...

【Flowable】任务监听器(五)

前言 之前有需要使用到Flowable&#xff0c;鉴于网上的资料不是很多也不是很全也是捣鼓了半天&#xff0c;因此争取能在这里简单分享一下经验&#xff0c;帮助有需要的朋友&#xff0c;也非常欢迎大家指出不足的地方。 一、监听器 在Flowable中&#xff0c;我们可以使用监听…...

spring-kafka中ContainerProperties.AckMode详解

近期&#xff0c;我们线上遇到了一个性能问题&#xff0c;几乎快引起线上故障&#xff0c;后来仅仅是修改了一行代码&#xff0c;性能就提升了几十倍。一行代码几十倍&#xff0c;数据听起来很夸张&#xff0c;不过这是真实的数据&#xff0c;线上错误的配置的确有可能导致性能…...

【rpc】Dubbo和Zookeeper结合使用,它们的作用与联系(通俗易懂,一文理解)

目录 Dubbo是什么&#xff1f; 把系统模块变成分布式&#xff0c;有哪些好处&#xff0c;本来能在一台机子上运行&#xff0c;为什么还要远程调用 Zookeeper是什么&#xff1f; 它们进行配合使用时&#xff0c;之间的关系 服务注册 服务发现 动态地址管理 Dubbo是…...

ChatGPT的未来

随着人工智能的快速发展&#xff0c;ChatGPT作为一种自然语言生成模型&#xff0c;在各个领域都展现出了巨大的潜力。它不仅可以用于日常对话、创意助手和知识查询&#xff0c;还可以应用于教育、医疗、商业等各个领域&#xff0c;为人们带来更多便利和创新。 在教育领域&#…...

Pytorch模型转ONNX部署

开始以为会很困难&#xff0c;但是其实非常方便&#xff0c;下边分两步走&#xff1a;1. pytorch模型转onnx&#xff1b;2. 使用onnx进行inference 0. 准备工作 0.1 安装onnx 安装onnx和onnxruntime&#xff0c;onnx貌似是个环境。。倒是没有直接使用&#xff0c;onnxruntim…...

k8s优雅停服

在应用程序的整个生命周期中&#xff0c;正在运行的 pod 会由于多种原因而终止。在某些情况下&#xff0c;Kubernetes 会因用户输入&#xff08;例如更新或删除 Deployment 时&#xff09;而终止 pod。在其他情况下&#xff0c;Kubernetes 需要释放给定节点上的资源时会终止 po…...

面试题五:computed的使用

题记 大部分的工作中使用computed的频次很低的&#xff0c;所以今天拿出来一文对于computed进行详细的介绍&#xff0c;因为Vue的灵魂之一就是computed。 模板内的表达式非常便利&#xff0c;但是设计它们的初衷是用于简单运算的。在模板中放入太多的逻辑会让模板过重且难以维护…...

完美的分布式监控系统 Prometheus与优雅的开源可视化平台 Grafana

1、之间的关系 prometheus与grafana之间是相辅相成的关系。简而言之Grafana作为可视化的平台&#xff0c;平台的数据从Prometheus中取到来进行仪表盘的展示。而Prometheus这源源不断的给Grafana提供数据的支持。 Prometheus是一个开源的系统监控和报警系统&#xff0c;能够监…...

黑马JVM总结(九)

&#xff08;1&#xff09;StringTable_调优1 我们知道StringTable底层是一个哈希表&#xff0c;哈希表的性能是跟它的大小相关的&#xff0c;如果哈希表这个桶的个数比较多&#xff0c;元素相对分散&#xff0c;哈希碰撞的几率就会减少&#xff0c;查找的速度较快&#xff0c…...

如何使用 RunwayML 进行创意 AI 创作

标题&#xff1a;如何使用 RunwayML 进行创意 AI 创作 介绍 RunwayML 是一个基于浏览器的人工智能创作工具&#xff0c;可让用户使用各种 AI 功能来生成图像、视频、音乐、文字和其他创意内容。RunwayML 的功能包括&#xff1a; * 图像生成&#xff1a;使用生成式对抗网络 (…...

【css】能被4整除 css :class,判断一个数能否被另外一个数整除,余数

判断一个数能否被另外一个数整除 一个数能被4整除的表达式可以表示为&#xff1a;num%40&#xff0c;其中&#xff0c;num为待判断的数&#xff0c;% 为取模运算符&#xff0c;为等于运算符。这个表达式的意思是&#xff0c;如果num除以4的余数为0&#xff0c;则返回true&…...

ChatGPT与日本首相交流核废水事件-精准Prompt...

了解更多请点击&#xff1a;ChatGPT与日本首相交流核废水事件-精准Prompt...https://mp.weixin.qq.com/s?__bizMzg2NDY3NjY5NA&mid2247490070&idx1&snebdc608acd419bb3e71ca46acee04890&chksmce64e42ff9136d39743d16059e2c9509cc799a7b15e8f4d4f71caa25968554…...

关于 firefox 不能访问 http 的解决

情景&#xff1a; 我在虚拟机 192.168.x.111 上配置了 DNS 服务器&#xff0c;在 kali 上设置 192.168.x.111 为 DNS 服务器后&#xff0c;使用 firefox 地址栏搜索域名 www.xxx.com &#xff0c;访问在 192.168.x.111 搭建的网站&#xff0c;本来经 192.168.x.111 DNS 服务器解…...

68、Spring Data JPA 的 方法名关键字查询

★ 方法名关键字查询&#xff08;全自动&#xff09; &#xff08;1&#xff09;继承 CrudRepository 接口 的 DAO 组件可按特定规则来定义查询方法&#xff0c;只要这些查询方法的 方法名 遵守特定的规则&#xff0c;Spring Data 将会自动为这些方法生成 查询语句、提供 方法…...

Brother CNC联网数采集和远程控制

兄弟CNC IP地址设定参考&#xff1a;https://www.sohu.com/a/544461221_121353733没有能力写代码的兄弟可以提前下载好网络调试助手NetAssist&#xff0c;这样就不用写代码来测试连接CNC了。 以上是网络调试助手抓取CNC的产出命令&#xff0c;结果有多个行string需要自行解析&…...

剑指offer20_链表中环的入口节点

链表中环的入口节点 给定一个链表&#xff0c;若其中包含环&#xff0c;则输出环的入口节点。 若其中不包含环&#xff0c;则输出null。 数据范围 节点 val 值取值范围 [ 1 , 1000 ] [1,1000] [1,1000]。 节点 val 值各不相同。 链表长度 [ 0 , 500 ] [0,500] [0,500]。 …...

SpringTask-03.入门案例

一.入门案例 启动类&#xff1a; package com.sky;import lombok.extern.slf4j.Slf4j; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.cache.annotation.EnableCach…...

​​企业大模型服务合规指南:深度解析备案与登记制度​​

伴随AI技术的爆炸式发展&#xff0c;尤其是大模型&#xff08;LLM&#xff09;在各行各业的深度应用和整合&#xff0c;企业利用AI技术提升效率、创新服务的步伐不断加快。无论是像DeepSeek这样的前沿技术提供者&#xff0c;还是积极拥抱AI转型的传统企业&#xff0c;在面向公众…...

算术操作符与类型转换:从基础到精通

目录 前言&#xff1a;从基础到实践——探索运算符与类型转换的奥秘 算术操作符超级详解 算术操作符&#xff1a;、-、*、/、% 赋值操作符&#xff1a;和复合赋值 单⽬操作符&#xff1a;、--、、- 前言&#xff1a;从基础到实践——探索运算符与类型转换的奥秘 在先前的文…...

快速排序算法改进:随机快排-荷兰国旗划分详解

随机快速排序-荷兰国旗划分算法详解 一、基础知识回顾1.1 快速排序简介1.2 荷兰国旗问题 二、随机快排 - 荷兰国旗划分原理2.1 随机化枢轴选择2.2 荷兰国旗划分过程2.3 结合随机快排与荷兰国旗划分 三、代码实现3.1 Python实现3.2 Java实现3.3 C实现 四、性能分析4.1 时间复杂度…...

写一个shell脚本,把局域网内,把能ping通的IP和不能ping通的IP分类,并保存到两个文本文件里

写一个shell脚本&#xff0c;把局域网内&#xff0c;把能ping通的IP和不能ping通的IP分类&#xff0c;并保存到两个文本文件里 脚本1 #!/bin/bash #定义变量 ip10.1.1 #循环去ping主机的IP for ((i1;i<10;i)) doping -c1 $ip.$i &>/dev/null[ $? -eq 0 ] &&am…...

基于 HTTP 的单向流式通信协议SSE详解

SSE&#xff08;Server-Sent Events&#xff09;详解 &#x1f9e0; 什么是 SSE&#xff1f; SSE&#xff08;Server-Sent Events&#xff09; 是 HTML5 标准中定义的一种通信机制&#xff0c;它允许服务器主动将事件推送给客户端&#xff08;浏览器&#xff09;。与传统的 H…...

C# WPF 左右布局实现学习笔记(1)

开发流程视频&#xff1a; https://www.youtube.com/watch?vCkHyDYeImjY&ab_channelC%23DesignPro Git源码&#xff1a; GitHub - CSharpDesignPro/Page-Navigation-using-MVVM: WPF - Page Navigation using MVVM 1. 新建工程 新建WPF应用&#xff08;.NET Framework) 2.…...

【Linux】使用1Panel 面板让服务器定时自动执行任务

服务器就是一台24小时开机的主机&#xff0c;相比自己家中不定时开关机的主机更适合完成定时任务&#xff0c;例如下载资源、备份上传&#xff0c;或者登录某个网站执行一些操作&#xff0c;只需要编写 脚本&#xff0c;然后让服务器定时来执行这个脚本就可以。 有很多方法实现…...

设计模式域——软件设计模式全集

摘要 软件设计模式是软件工程领域中经过验证的、可复用的解决方案&#xff0c;旨在解决常见的软件设计问题。它们是软件开发经验的总结&#xff0c;能够帮助开发人员在设计阶段快速找到合适的解决方案&#xff0c;提高代码的可维护性、可扩展性和可复用性。设计模式主要分为三…...