Flink Connector 开发
Flink Streaming Connector
Flink
是新一代流批统一的计算引擎,它需要从不同的第三方存储引擎中把数据读过来,进行处理,然后再写出到另外的存储引擎中。Connector
的作用就相当于一个连接器,连接Flink
计算引擎跟外界存储系统。Flink
里有以下几种方式,当然也不限于这几种方式可以跟外界进行数据交换:
【1】Flink
里面预定义了一些source
和sink
;
【2】Flink
内部也提供了一些Boundled connectors
;
【3】可以使用第三方Apache Bahir
项目中提供的连接器;
【4】是通过异步IO
方式;
预定义的 source 和 sink
Flink
里预定义了一部分source
和sink
。在这里分了几类。
基于文件的 source 和 sink
如果要从文本文件中读取数据,可以直接使用:
env.readTextFile(path)
就可以以文本的形式读取该文件中的内容。当然也可以使用:根据指定的fileInputFormat
格式读取文件中的内容。
env.readFile(fileInputFormat, path)
如果数据在Flink
内进行了一系列的计算,想把结果写出到文件里,也可以直接使用内部预定义的一些sink
,比如将结果已文本或csv
格式写出到文件中,可以使用DataStream
的writeAsText(path)
和DataSet
的writeAsCsv(path)
。
基于 Socket 的 Source 和 Sink
提供 Socket
的host name
及port
,可以直接用StreamExecutionEnvironment
预定的接口socketTextStream
创建基于Socket
的source
,从该 socket
中以文本的形式读取数据。当然如果想把结果写出到另外一个Socket
,也可以直接调用DataStream writeToSocket
。
//从 socket 中读取数据流
env.socketTextStream("localhost",777);
//输出至 socket
resultDataStream.writeToSocket("hadoop1",6666,new SimpleStringSchema())
基于内存 Collections、Iterators 的 Source
可以直接基于内存中的集合或者迭代器,调用StreamExecutionEnvironment fromCollection
、fromElements
构建相应的source
。结果数据也可以直接print
、printToError
的方式写出到标准输出或标准错误。详细也可以参考Flink
源码中提供的一些相对应的Examples
来查看异常预定义 source
和sink
的使用方法,例如WordCount
、SocketWindowWordCount
。
//从Java.util.Collection集合中读取数据作为数据源
ArrayList<String> list = new ArrayList<>(5);
list.add("flink");
list.add("scala");
list.add("spark");
list.add("hadoop");
list.add("hive");
env.fromCollection(list).print();//从Java.util.Collection集合中读取数据作为数据源env.fromElements("flink", "scala", "spark", "hadoop", "hive").print();
Bundled Connectors
Flink
里已经提供了一些绑定的Connector
,例如kafka source
和sink
,Es sink
等。读写kafka
、es
、rabbitMQ
时可以直接使用相应 connector
的api
即可。
虽然该部分是Flink
项目源代码里的一部分,但是真正意义上不算作Flink
引擎相关逻辑,并且该部分没有打包在二进制的发布包里面。所以在提交Job
时候需要注意,job
代码jar
包中一定要将相应的connetor
相关类打包进去,否则在提交作业时就会失败,提示找不到相应的类,或初始化某些类异常。
Apache Bahir 中的连接器
Apache Bahir
最初是从Apache Spark
中独立出来项目提供,以提供不限于Spark
相关的扩展 / 插件、连接器和其他可插入组件的实现。通过提供多样化的流连接器streaming connectors
和SQL
数据源扩展分析平台的覆盖面。如有需要写到flume
、redis
的需求的话,可以使用该项目提供的connector
。
Async I/O
流计算中经常需要与外部存储系统交互,比如需要关联MySQL
中的某个表。一般来说,如果用同步I/O
的方式,会造成系统中出现大的等待时间,影响吞吐和延迟。为了解决这个问题,异步I/O
可以并发处理多个请求,提高吞吐,减少延迟。Async
的原理可参考官方文档
相关文章:
Flink Connector 开发
Flink Streaming Connector Flink是新一代流批统一的计算引擎,它需要从不同的第三方存储引擎中把数据读过来,进行处理,然后再写出到另外的存储引擎中。Connector的作用就相当于一个连接器,连接Flink计算引擎跟外界存储系统。Flin…...
Golang leetcode707 设计链表 (链表大成)
文章目录 设计链表 Leetcode707不使用头节点使用头节点 推荐** 设计链表 Leetcode707 题目要求我们通过实现几个方法来完成对链表的各个操作 由于在go语言中都为值传递,(注意这里与值类型、引用类型的而区别),所以即使我们直接在…...
Django和Vue项目运行过程中遇到的问题及解决办法
这是我从CSDN上边买来的一个系统的资源,准备在此基础上改成自己的系统,但是在运行项目这一步上都把自己难为了好几天,经过不断的摸索,终于完成了第一步!!! 如果大家也遇到同样的问题࿰…...
Single-Image Crowd Counting via Multi-Column Convolutional Neural Network
Single-Image Crowd Counting via Multi-Column Convolutional Neural Network 论文背景人群密度方法过去的发展历史早期方法基于轨迹聚类的方法基于特征回归的方法基于图像的方法 Multi-column CNN用于人群计数基于密度图的人群计数通过几何自适应核生成密度图密度图估计的多列…...
el-cascader隐藏某一级的勾选框及vue报错Error in callback for watcher “options“的解决办法
今天用到饿了么的级联选择器时出现了这个报错Error in callback for watcher “options“: “TypeError: Cannot read propertie ‘level‘ of null,因为需求是在不同类型 el-cascader多选的时候默认是可以勾选所有级的选项的,如下图: 包含级联cascader的options、select的…...
2024美赛数学建模思路A题B题C题D题E题F题思路汇总 选题分析
文章目录 1 赛题思路2 美赛比赛日期和时间3 赛题类型4 美赛常见数模问题5 建模资料 1 赛题思路 (赛题出来以后第一时间在CSDN分享) https://blog.csdn.net/dc_sinor?typeblog 2 美赛比赛日期和时间 比赛开始时间:北京时间2024年2月2日(周五ÿ…...
C++ 常用设计模式
一、工厂模式 from:C开发常用的设计模式及其实现详解 - 知乎 摘抄: 简单工厂、工厂、抽象工厂: 简单工厂需要工厂内部判断,而工厂模式不需要修改工厂类: 抽象工厂: 接上图: 未完待续.........
高性价比的高速吹风机/高速风筒解决方案,基于普冉单片机开发
高速吹风机是近些年非常火的一款产品,快速崛起并颠覆了传统吹风机,高速吹风机也成为了传统吹风机替代的一个大趋势。高速吹风机是利用高转速产生的大风量来快速吹干头发,由于其精巧的外观设计、超低的噪声、出色的干发效果,高速吹…...
toRefs的用法
文章目录 toRefs是什么toRefs的作用以及为什么要用它? toRefs是什么 toRefs 是 Vue 3 Composition API 中的一个函数,它用于将响应式对象转换为普通对象,其中对象的每个属性都是 ref 对象。这是因为在 Vue 3 中,reactive 创建的对…...
MySQL基础篇(三)约束
一、概述 概念:约束是作用于表中字段上的规则,用于限制存储在表中的数据。 目的:保证数据库中数据的正确、有效性和完整性。 分类: 注意:约束是作用于表中字段上的,可以在创建表/修改表的时候添加约束。 二…...
Java进阶 1-2 枚举
目录 常量特定方法 职责链模式的枚举实现 状态机模式的枚举实现 多路分发 1、使用枚举类型实现分发 2、使用常量特定方法实现分发 3、使用EnumMap实现分发 4、使用二维数组实现分发 本笔记参考自: 《On Java 中文版》 常量特定方法 在Java中,我们…...
一个人最大的内驱力是什么?
1、不因为孤独或外界压力而降低「生活标准“」的能力。 ”因为寂寞去约炮“、“因为家里催婚匆忙结婚“、”因为没谈过恋爱随便找个人交往。 “你的每一次选择都是在为自己想要的世界而投的票,往后余生是幸福还是悲剧,就是在这一次次 的将就与坚持死磕中…...
解决方法:公众号的API上传素材报错40005
公众号的API上传素材报错40005 Error uploading file : {"errcode":40005,"errmsg":"invalid file type hint: [YOkxGA0122w487] rid: 223442-323247e7bd5-5d75322d88"}上传错误原因分析: 之前成功的示例,文件名为"…...
音量控制软件sound control mac功能亮点
sound control mac可以帮助用户控制某个独立应用程序的音量,通过每应用音量,均衡器,平衡和音频路由独立控制每个应用的音频,还有整个系统的音量。 sound control mac功能亮点 每个应用程序的音量控制 独立控制应用的数量。 键盘音…...
Spring Boot 生产就绪中文文档-下
本文为官方文档直译版本。原文链接 由于篇幅较长,遂分两篇。上半部分中文文档 Spring Boot 生产就绪中文文档-下 度量标准入门受支持的监控系统AppOpticsAtlasDatadogDynatracev2 API自动配置手动配置 v1 API (旧版)与版本无关的设置 ElasticGangliaGraphiteHumioIn…...
DS|树结构及应用
题目一:DS树 -- 树的先根遍历(双亲转先序) 题目描述: 给出一棵树的双亲表示法结果,用一个二维数组表示,位置下标从0开始,如果双亲位置为-1则表示该结点为根结点 编写程序,输出该树…...
Java 读取超大excel文件
注意:此参考解决方案只是针对xlsx格式的excel文件! Maven <dependency><groupId>com.monitorjbl</groupId><artifactId>xlsx-streamer</artifactId><version>2.2.0</version> </dependency>读取方式1…...
K8S中的job和CronJob
Job 介绍 Kubernetes jobs主要是针对短时和批量的工作负载。它是为了结束而运行的,而不是像deployment、replicasets、replication controllers和DaemonSets等其他对象那样持续运行。 示例 apiVersion: batch/v1 kind: Job metadata:name: pispec:template:spec:r…...
中国文化文物和旅游统计年鉴,数据含pdf、excel等格式,文本形式呈现,可预览数据
基本信息. 数据名称: 中国旅游统计年鉴 数据格式: pdf、xls不定 数据时间: 2012-2020年 数据几何类型: 文本 数据坐标系: —— 数据来源:文化和旅游部、网络公开数据 原名为《中国旅游统计年鉴》2020年后更名为《中国文化文物和旅游统计年鉴》ÿ…...
Java版企业电子招标采购系统源码——鸿鹄电子招投标系统的技术特点
在数字化时代,采购管理也正经历着前所未有的变革。全过程数字化采购管理成为了企业追求高效、透明和规范的关键。该系统通过Spring Cloud、Spring Boot2、Mybatis等先进技术,打造了从供应商管理到采购招投标、采购合同、采购执行的全过程数字化管理。通过…...
go语言语法基础
文章目录 前言一、输入和输出常用的字符串格式化符号 二、注释三、Go常用基本语言数据类型数字类型布尔类型字符类型变量与常量数组和切片数组切片 map类型创建map增删改查特别提醒 指针 四、运算符五、条件判断语句if系列switch六、循环语句for循环标准写法死循环while循环do …...
eclipse 和java环境的安装教程
安装 Eclipse 和配置 Java 环境是一个多步骤的过程,涉及到安装 Java Development Kit (JDK) 和 Eclipse IDE。以下是基本步骤: 安装 Java Development Kit (JDK) 下载 JDK: 访问 Oracle 官方网站(Oracle JDK)或者选择…...
Win11系统的优化方法参考文档(彻底优化策略)
目录 一、个性化-应用-关闭防火墙等的设置 二、任务栏优化设置 三、Win11开始菜单更改为Win10经典菜单 四、将Micresoft Store 从固定任务栏取消 五、电源性能优化 六、解决卡顿 七、卸载系统自带软件 八、任务管理器开机启动项的禁用 九、调整为最佳性能 十…...
Leetcode13-解密消息(2325)
1、题目 给你字符串 key 和 message ,分别表示一个加密密钥和一段加密消息。解密 message 的步骤如下: 使用 key 中 26 个英文小写字母第一次出现的顺序作为替换表中的字母 顺序 。 将替换表与普通英文字母表对齐,形成对照表。 按照对照表 …...
二进制安装包安装Prometheus插件安装(mysql_exporter)
简介 mysql_exporter是用来收集MysQL或者Mariadb数据库相关指标的,mysql_exporter需要连接到数据库并有相关权限。既可以用二进制安装部署,也可以通过容器形式部署,但为了数据收集的准确性,推荐二进制安装。 一,下载安…...
原生微信小程序如何动态修改svg图片颜色及尺寸、宽高(封装svgIcon组件)
最终效果 前言 动态设置Svg图片颜色就是修改Svg源码的path中的fill属性, 通过wx.getFileSystemManager().readFile读取.xlsx文件 把文件转成base64 封装svg-icon组件 1、在项目的components下新建svg-icon文件夹,新增base64.js文件 class Base64 {cons…...
Python从入门到网络爬虫(面向对象详解)
前言 Python从设计之初就已经是一门面向对象的语言,正因为如此,在Python中创建一个类和对象是很容易的。本章节我们将详细介绍Python的面向对象编程。如果你以前没有接触过面向对象的编程语言,那你可能需要先了解一些面向对象语言的一些基本…...
NPDP产品经理含金量高吗?难考吗?
NPDP的中文翻译为产品经理国际资格认证。NPDP考试起源于美国,由美国产品开发与管理协会(PDMA)发起。NPDP认证是集理论、方法与实践为一体的全方位知识体系,为公司组织层级进行规划、决策、执行提供良好的方法体系支撑。࿰…...
目标检测 YOLOv5 - 推理时的数据增强
目标检测 YOLOv5 - 推理时的数据增强 flyfish 版本 YOLOv5 6.2 参考地址 https://github.com/ultralytics/yolov5/issues/303在训练时可以使用数据增强,在推理阶段也可以使用数据增强 在测试使用数据增强有个名字叫做Test-Time Augmentation (TTA) 实际使用中使…...
篇二:springboot2.7 OAuth2 server使用jdbc存储RegisteredClient
上一篇 <<springboot 2.7 oauth server配置源码走读一>>中简单描述了oauth2 server的配置,其中使用了内存保存 RegisteredClient,本篇改用mysql存储。 db存储需要创建表,表结构应该是什么样的呢,从spring给我们封装好…...
做网站工资多少/seo人员的职责
$.ajax({ //以下属性值均为可选 type: "POST", //默认值: "GET")。请求方式 ("POST" 或 "GET"), 默认为 "GET"。注意:其它 HTTP 请求方法,如 PUT 和 DELETE 也可以使用,但仅部…...
网站建设的软件/seo博客模板
来看看strdup在Glibc 2.20(标准C库)中的实现: 默认参数s不为空指针,这个在我们的数据结构库中是有问题的 改进: 当前版本g编译器不允许析构函数抛异常这么做 打印出来结果:1 3 然后程序崩溃 我们都删除了…...
做图文链接网站/一个新手如何推销产品
Windows 10系统是微软独立发布的最后一个Windows版本,下一代Windows都将作为更新形式出现,因此,WIN10都会开启自动更新,当我们使用电脑WIN10的时候,老是提示我们需要更新系统,很烦人,那么怎么关…...
迪虎科技网站建设/怎么做好seo推广
这里需要注意include的两种不同写法,#include<***.h> 和 #include"***.h" 采用"< >"方式进行包含的头bai文件表示让编译器在编译器的预设标准路径下去搜索相应的头文件,如果找不到则报错。 例如:VS2008的安…...
做五金出口在哪个网站好点/十大推广app平台
在 CSS 中,类选择器以一个点号显示:.center {text-align: center}在上面的例子中,所有拥有 center 类的 HTML 元素均为居中。在下面的 HTML 代码中,h1 和 p 元素都有 center 类。这意味着两者都将遵守 ".center" 选择器…...
泰安诚信的企业建站公司/百度提交入口网址是指在哪里
栈的压入、弹出序列 题目描述思路实现题目描述 输入两个整数序列,第一个序列表示栈的压入顺序,请判断第二个序列是否可能为该栈的弹出顺序。假设压入栈的所有数字均不相等。例如序列1,2,3,4,5是某栈的压入顺序,序列4,5,3,2,1是该压栈序列对应…...