当前位置: 首页 > news >正文

flink table view datastream互转

case class outer(f1:String,f2:Inner)
case class outerV1(f1:String,f2:Inner,f3:Int)
case class Inner(f3:String,f4:Int)

测试代码

package com.yy.table.convertimport org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.table.types.DataTypeobject streamPOJO2table {case class outer(f1:String,f2:Inner)case class outerV1(f1:String,f2:Inner,f3:Int)case class Inner(f3:String,f4:Int)def main(args: Array[String]): Unit = {// flink1.13 流处理环境初始化val env = StreamExecutionEnvironment.getExecutionEnvironmentval tEnv = StreamTableEnvironment.create(env)import org.apache.flink.streaming.api.scala._val ds1: DataStream[outer] = env.fromElements(outer("a",Inner("b",2)),outer("d",Inner("e",4)))val table1: Table = tEnv.fromDataStream(ds1)
//    table1
//      .execute()
//      .print()/*+----+--------------------------------+--------------------------------+
| op |                             f1 |                             f2 |
+----+--------------------------------+--------------------------------+
| +I |                              a |                   (f3=b, f4=2) |
| +I |                              d |                   (f3=e, f4=4) |
+----+--------------------------------+--------------------------------+*///    table1
//      .print()/*5> +I[d, Inner(e,4)]
4> +I[a, Inner(b,2)]*/tEnv.createTemporaryView("view1", ds1)val tableResult1: TableResult = tEnv.executeSql("select f1,f2,(f2.f4 + 100) as f3 from view1")tableResult1.print()/*+----+--------------------------------+--------------------------------+-------------+
| op |                             f1 |                             f2 |          f3 |
+----+--------------------------------+--------------------------------+-------------+
| +I |                              a |                   (f3=b, f4=2) |         102 |
| +I |                              d |                   (f3=e, f4=4) |         104 |
+----+--------------------------------+--------------------------------+-------------+*///val t1: Table = tEnv.sqlQuery("select f1,f2,(f2.f4 + 100) as f3 from view1")
//    t1.print()//    println(t1.getResolvedSchema)/*
+----+--------------------------------+--------------------------------+-------------+
| op |                             f1 |                             f2 |          f3 |
+----+--------------------------------+--------------------------------+-------------+
| +I |                              a |                   (f3=b, f4=2) |         102 |
| +I |                              d |                   (f3=e, f4=4) |         104 |
+----+--------------------------------+--------------------------------+-------------+
2 rows in set
(`f1` STRING,`f2` *com.yy.table.convert.streamPOJO2table$Inner<`f3` STRING, `f4` INT NOT NULL>* NOT NULL,`f3` INT NOT NULL
)*/println("---- 1 -------")// tableResult转datastreamval o1: DataStream[outerV1] = tEnv.toDataStream[outerV1](t1,classOf[outerV1])
//    o1.print()println("---- 2 -------")tEnv.executeSql("""|select|f1|,f2.f3|,f2.f4|from view1|""".stripMargin)
//      .print()/*+----+--------------------------------+--------------------------------+--------------------------------+
| op |                             f1 |                             f3 |                             f4 |
+----+--------------------------------+--------------------------------+--------------------------------+
| +I |                              a |                              b |                              c |
| +I |                              d |                              e |                              f |
+----+--------------------------------+--------------------------------+--------------------------------+*/tEnv.executeSql("""|select|f1|,(f2.f3,f2.f4)|from view1|""".stripMargin)
//      .print()env.execute("jobName1")}}

相关文章:

flink table view datastream互转

case class outer(f1:String,f2:Inner) case class outerV1(f1:String,f2:Inner,f3:Int) case class Inner(f3:String,f4:Int) 测试代码 package com.yy.table.convertimport org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.tabl…...

redis重启后数据丢失问题解决(亲测好用)

redis修改密码重启后发现redis中的数据丢失了 解决办法&#xff1a; 首先在redis的安装目录下查找重启之前的dump.rdb文件&#xff0c;发现只有当天的一个dump.rdb文件&#xff0c;确认不是重启备份的文件 然后我就全盘找一下dump.rdb的备份文件&#xff0c;找到前一天的备份…...

敬请期待……

敬请期待…… 《Python百宝箱》 序号文章目录直达链接表白系列1无法拒绝的表白界面https://want595.blog.csdn.net/article/details/1347448942满屏飘字表白代码https://want595.blog.csdn.net/article/details/1350373883无限弹窗表白代码...

3.10 Android eBPF HelloWorld调试(四)

一,读取eBPF map的android应用程序示例 1.1 C++源码及源码解读 /system/memory/bpfmapparsed/hello_world_map_parser.cpp //基于aosp android12#define LOG_TAG "BPF_MAP_PARSER"#include <log/log.h> #include <stdlib.h> #include <unistd.h&g…...

PyTorch常用工具(1)数据处理

文章目录 前言1 数据处理1.1 Dataset1.2 DataLoader 前言 在训练神经网络的过程中需要用到很多的工具&#xff0c;最重要的是数据处理、可视化和GPU加速。本章主要介绍PyTorch在这些方面常用的工具模块&#xff0c;合理使用这些工具可以极大地提高编程效率。 由于内容较多&am…...

docker-简单说说cgroup

前面我们简单说了下namespace&#xff0c; 现在我们来接着简单说说cgroup。通过docker-简单说说namespace文章我们知道&#xff1a; namespace 是为了隔离进程组之间的资源&#xff0c;那cgroup就是为了对进程组的监控和限制资源。Cgroup 可以限制进程组使用的资源数量和分配&a…...

印象笔记04: 如何将印象笔记超级会员价值最大化利用?

印象笔记04&#xff1a; 如何将印象笔记超级会员价值最大化利用&#xff1f; 为什么有这个问题 我不知道有没有人一开始接触印象笔记觉得非常好。奈何只能两个设备同步&#xff0c;局限太多。而会员活动比较优惠——就开了会员。而且我开了十年……。只能开发一下看看怎么最大…...

我的JDK动态代理流程

我的JDK动态代理流程 我梳理的动态代理流程大约是&#xff1a; 如果每一个框架都有自己的BPP&#xff0c;且自己的BPP中都有自己的wrapIfNecessory&#xff0c;那样可能就是一个BPP一个代理类。但通常应该都是各自的框架以提供 Advisior&#xff08;切面&#xff09;的方式&am…...

uniapp Vue3 面包屑导航 带动态样式

上干货 <template><view class"bei"><view class"container"><view class"indicator"></view><!-- 遍历路由列表 --><view v-for"(item, index) in routes" :key"index" :class&quo…...

openGauss学习笔记-174 openGauss 数据库运维-备份与恢复-导入数据-管理并发写入操作

文章目录 openGauss学习笔记-174 openGauss 数据库运维-备份与恢复-导入数据-管理并发写入操作174.1 事务隔离说明174.2 写入和读写操作174.3 并发写入事务的潜在死锁情况 openGauss学习笔记-174 openGauss 数据库运维-备份与恢复-导入数据-管理并发写入操作 174.1 事务隔离说…...

数据分析可被划分为4个重要的类别

1、描述型&#xff1a;发生了什么&#xff1f; 全面、准确、实时的数据有效的可视化 2、诊断型&#xff1a;为什么会发生&#xff1f; 能够深入了解问题的根本原因隔离所有混淆信息的能力 3、预测型&#xff1a;可能发生什么&#xff1f; 通过历史数据来预测特定的结果通过…...

爆火小游戏敲木鱼流量主小程序源码系统+完整的代码包以及安装搭建教程

随着移动互联网的快速发展&#xff0c;小程序已成为一种新的应用形态&#xff0c;深入到人们生活的方方面面。其中&#xff0c;小游戏由于其简单、有趣的特点&#xff0c;吸引了大量用户&#xff0c;也成为了许多开发者的首选。敲木鱼小游戏&#xff0c;以其独特的玩法和轻松的…...

Invoke和BeginInvoke的区别

Invoke和BeginInvoke的区别 本文导读&#xff1a;BeginInvoke() 调用时&#xff0c;当前线程会启用线程池中的某个线程来执行此方法&#xff0c;当前线程不被阻塞&#xff0c;继续运行后面的代码&#xff0c; Invoke() 调用时&#xff0c;会阻塞当前线程&#xff0c;等到 Invo…...

3 分钟为英语学习神器 Anki 部署一个专属同步服务器

Anki 介绍 Anki 是一款基于间隔重复&#xff08;Spaced Repetition&#xff09;原理的学习软件&#xff0c;想象一下&#xff0c;你的大脑就像是一个需要定期维护的精密仪器。间隔重复就好比是一种精准的维护计划&#xff0c;它通过在最佳时刻复习信息&#xff0c;来确保知识在…...

<HarmonyOS第一课>应用程序框架

【习题】应用程序框架 目录 判断题 单选题 多选题 判断题 1. 一个应用只能有一个UIAbility。错误 正确(True)错误(False) 2. 创建的Empty Ability模板工程&#xff0c;初始会生成一个UIAbility文件。正确 正确(True)错误(False) 3. 每调用一次router.pushUrl()方法&…...

SQL 解析 — 如何轻松实现新增语句

KaiwuDB 支持多种不同类型的 SQL 语句&#xff0c;例如 create、insert 等。本文将介绍在 KaiwuDB SQL Parser&#xff08;下文统称解析器&#xff09;中添加新语句的过程及其实现。我们将了解如何使用 goyacc 工具更新解析器&#xff0c;以及执行器和查询计划器&#xff08;pl…...

Android集成OpenSSL实现加解密-集成

导入so 将编译生成的 OpenSSL 动态库文件&#xff08;.so 文件&#xff09;复制到你的 Android 项目的 libs 目录中 导入头文件 将编译生成的include文件夹导入到项目中 build.gradle添加配置 defaultConfig {……testInstrumentationRunner "androidx.test.runner…...

代码随想录算法训练营Day18|513.找树左下角的值、112. 路径总和、113. 路径总和ii、106.从中序与后序遍历序列构造二叉树

目录 513.找树左下角的值 前言 层序遍历 递归法 112. 路径总和 前言 递归法 113. 路径总和ii 前言 递归法 106.从中序与后序遍历序列构造二叉树 前言 思路 递归法 总结 513.找树左下角的值 题目链接 文章链接 前言 本题要求得到二叉树最后一行最左边的值&#xf…...

【蓝桥备赛】技能升级——二分查找

题目链接 技能升级 个人思路 需要给n个技能添加技能点&#xff0c;无论技能点加成如何衰减&#xff0c;每次始终都是选择当前技能加点加成最高的那一项技能&#xff0c;所以最后一次的加点一定也是加在当时技能攻击加成最高的那个。此时&#xff0c;我们去寻找最后一次的加点…...

zyqn-arm软中断设置

所有SGI都是边缘触发的&#xff0c;sgi的灵敏度类型是固定的&#xff0c;不能改变。 软中断初始化流程 1、初始化异常处理 2、初始化中断控制器 3、注册异常处理回调函数到CPU 4、连接软中断信号与注册软中断回调函数 5、使能中断控制器中的软中断中断 6、使能异常处理 …...

IDEA运行Tomcat出现乱码问题解决汇总

最近正值期末周&#xff0c;有很多同学在写期末Java web作业时&#xff0c;运行tomcat出现乱码问题&#xff0c;经过多次解决与研究&#xff0c;我做了如下整理&#xff1a; 原因&#xff1a; IDEA本身编码与tomcat的编码与Windows编码不同导致&#xff0c;Windows 系统控制台…...

多模态2025:技术路线“神仙打架”,视频生成冲上云霄

文&#xff5c;魏琳华 编&#xff5c;王一粟 一场大会&#xff0c;聚集了中国多模态大模型的“半壁江山”。 智源大会2025为期两天的论坛中&#xff0c;汇集了学界、创业公司和大厂等三方的热门选手&#xff0c;关于多模态的集中讨论达到了前所未有的热度。其中&#xff0c;…...

ubuntu搭建nfs服务centos挂载访问

在Ubuntu上设置NFS服务器 在Ubuntu上&#xff0c;你可以使用apt包管理器来安装NFS服务器。打开终端并运行&#xff1a; sudo apt update sudo apt install nfs-kernel-server创建共享目录 创建一个目录用于共享&#xff0c;例如/shared&#xff1a; sudo mkdir /shared sud…...

树莓派超全系列教程文档--(62)使用rpicam-app通过网络流式传输视频

使用rpicam-app通过网络流式传输视频 使用 rpicam-app 通过网络流式传输视频UDPTCPRTSPlibavGStreamerRTPlibcamerasrc GStreamer 元素 文章来源&#xff1a; http://raspberry.dns8844.cn/documentation 原文网址 使用 rpicam-app 通过网络流式传输视频 本节介绍来自 rpica…...

Xshell远程连接Kali(默认 | 私钥)Note版

前言:xshell远程连接&#xff0c;私钥连接和常规默认连接 任务一 开启ssh服务 service ssh status //查看ssh服务状态 service ssh start //开启ssh服务 update-rc.d ssh enable //开启自启动ssh服务 任务二 修改配置文件 vi /etc/ssh/ssh_config //第一…...

线程与协程

1. 线程与协程 1.1. “函数调用级别”的切换、上下文切换 1. 函数调用级别的切换 “函数调用级别的切换”是指&#xff1a;像函数调用/返回一样轻量地完成任务切换。 举例说明&#xff1a; 当你在程序中写一个函数调用&#xff1a; funcA() 然后 funcA 执行完后返回&…...

CMake基础:构建流程详解

目录 1.CMake构建过程的基本流程 2.CMake构建的具体步骤 2.1.创建构建目录 2.2.使用 CMake 生成构建文件 2.3.编译和构建 2.4.清理构建文件 2.5.重新配置和构建 3.跨平台构建示例 4.工具链与交叉编译 5.CMake构建后的项目结构解析 5.1.CMake构建后的目录结构 5.2.构…...

基于 TAPD 进行项目管理

起因 自己写了个小工具&#xff0c;仓库用的Github。之前在用markdown进行需求管理&#xff0c;现在随着功能的增加&#xff0c;感觉有点难以管理了&#xff0c;所以用TAPD这个工具进行需求、Bug管理。 操作流程 注册 TAPD&#xff0c;需要提供一个企业名新建一个项目&#…...

SQL慢可能是触发了ring buffer

简介 最近在进行 postgresql 性能排查的时候,发现 PG 在某一个时间并行执行的 SQL 变得特别慢。最后通过监控监观察到并行发起得时间 buffers_alloc 就急速上升,且低水位伴随在整个慢 SQL,一直是 buferIO 的等待事件,此时也没有其他会话的争抢。SQL 虽然不是高效 SQL ,但…...

pikachu靶场通关笔记19 SQL注入02-字符型注入(GET)

目录 一、SQL注入 二、字符型SQL注入 三、字符型注入与数字型注入 四、源码分析 五、渗透实战 1、渗透准备 2、SQL注入探测 &#xff08;1&#xff09;输入单引号 &#xff08;2&#xff09;万能注入语句 3、获取回显列orderby 4、获取数据库名database 5、获取表名…...