当前位置: 首页 > news >正文

Flink 输出至 Elasticsearch

【1】引入pom.xml依赖

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-elasticsearch6_2.12</artifactId><version>1.10.0</version>
</dependency>

【2】ES6 Scala代码,自动导入的scala包需要修改为scala._ 否则会出现错误。

package com.zzx.flinkimport java.utilimport org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.elasticsearch.{ElasticsearchSinkFunction, RequestIndexer}
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink
import org.apache.http.HttpHost
import org.elasticsearch.client.Requestsobject EsSinkTest {def main(args: Array[String]): Unit = {// 创建一个流处理执行环境val env = StreamExecutionEnvironment.getExecutionEnvironment//从文件中读取数据并转换为 类val inputStreamFromFile: DataStream[String] = env.readTextFile("E:\\Project\\flink\\src\\main\\resources\\wordcount.txt")//转换val dataStream: DataStream[SensorReading] = inputStreamFromFile.map( data => {var dataArray = data.split(",")SensorReading(dataArray(0),dataArray(1).toLong,dataArray(2).toDouble)})//定义一个 HttpHostsval httpHost = new util.ArrayList[HttpHost]()//默认 9200 我的修改为了 9201httpHost.add(new HttpHost("192.168.1.12",9200,"http"))httpHost.add(new HttpHost("127.0.0.1",9200,"http"))//定义一个 ElasticSearchFuntion 操作 es的functionval esSinkFunc = new ElasticsearchSinkFunction[SensorReading] {//element 每一条数据 通过 index 发送override def process(element: SensorReading, runtimeContext: RuntimeContext, index: RequestIndexer): Unit = {//包装写入 es 的数据val dataSource = new util.HashMap[String,String]()dataSource.put("sensor_id",element.id)dataSource.put("temp",element.temperature.toString)dataSource.put("ts",element.timestamp.toString)//indexval indexRequest = Requests.indexRequest().index("sensor_temp").`type`("readingdata").source(dataSource)index.add(indexRequest)println("saved successfully " + element.toString)}}//输出值 esdataStream.addSink(new ElasticsearchSink.Builder[SensorReading](httpHost,esSinkFunc).build())env.execute("es")}
}

【3】ES6输出展示

​ [点击并拖拽以移动] ​​

相关文章:

Flink 输出至 Elasticsearch

【1】引入pom.xml依赖 <dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-elasticsearch6_2.12</artifactId><version>1.10.0</version> </dependency>【2】ES6 Scala代码&#xff0c;自动导入的…...

web三层架构

目录 1.什么是三层架构 2.运用三层架构的目的 2.1规范代码 2.2解耦 2.3代码的复用和劳动成本的减少 3.各个层次的任务 3.1web层&#xff08;表现层) 3.2service 层(业务逻辑层) 3.3dao 持久层(数据访问层) 4.结合mybatis简单实例演示 1.什么是三层架构 三层架构就是把…...

智能优化算法应用:基于厨师算法3D无线传感器网络(WSN)覆盖优化 - 附代码

智能优化算法应用&#xff1a;基于厨师算法3D无线传感器网络(WSN)覆盖优化 - 附代码 文章目录 智能优化算法应用&#xff1a;基于厨师算法3D无线传感器网络(WSN)覆盖优化 - 附代码1.无线传感网络节点模型2.覆盖数学模型及分析3.厨师算法4.实验参数设定5.算法结果6.参考文献7.MA…...

写在2023年末,软件测试面试题总结

大家好&#xff0c;最近有不少小伙伴在后台留言&#xff0c;得准备年后面试了&#xff0c;又不知道从何下手&#xff01;为了帮大家节约时间&#xff0c;特意准备了一份面试相关的资料&#xff0c;内容非常的全面&#xff0c;真的可以好好补一补&#xff0c;希望大家在都能拿到…...

51系列--数码管显示的4X4矩阵键盘设计

本文介绍基于51单片机的4X4矩阵键盘数码管显示设计&#xff08;完整Proteus仿真源文件及C代码见文末链接&#xff09; 一、系统及功能介绍 本设计主控芯片选用51单片机&#xff0c;主要实现矩阵键盘对应按键键值在数码管上显示出来&#xff0c;矩阵键盘是4X4共计16位按键&…...

医院绩效考核系统源码,java源码,商业级医院绩效核算系统源码

医院绩效定义&#xff1a; “医院工作量绩效方案”是一套以工作量&#xff08;RBRVS&#xff0c;相对价值比率&#xff09;为核算基础&#xff0c;以工作岗位、技术含量、风险程度、服务数量等业绩为主要依据&#xff0c;以工作效率和效益、工作质量、患者满意度等指标为综合考…...

JavaScript基础练习题(五)

生成一个范围内的随机整数&#xff1a;编写一个函数&#xff0c;接收两个参数&#xff0c;表示范围的最小值和最大值&#xff0c;然后生成一个在这个范围内的随机整数。 生成指定长度的随机字符串&#xff1a;编写一个函数&#xff0c;接收一个参数表示字符串的长度&#xff0…...

flutter项目从创建到运行,以及一些常用的命令

# 创建项目 命令行 flutter create flutter_app &#xff08;这种vsCode软件可用&#xff09; 按下ctrlshiftp&#xff0c; 输入 Flutter: New Project 选择 Application 选择项目存放位置 输入项目名字 点击 enter 完成创建 # 运行项目 1、命令行中运行&#xff1a; cd flutte…...

【Amazon 实验②】Amazon WAF功能增强之使用Cloudfront、Lambda@Edge阻挡攻击

文章目录 一、方案介绍二、架构图三、部署方案1. 进入Cloud9 编辑器&#xff0c;新打开一个teminal2. 克隆代码3. 解绑上一个实验中Cloudfront 分配绑定的防火墙4. 使用CDK部署方案5. CDK部署完成6. 关联LambdaEdge函数 四、方案效果 一、方案介绍 采用 LambdaEdge DynamoDB 架…...

There are 4 missing blocks. The following files may be corrupted

There are 4 missing blocks. The following files may be corrupted Please check the logs or run fsck in order to identify the missing blocks. See the Hadoop FAQ for common causes and potential solutions. 步骤1&#xff0c;检查文件缺失情况 hadoop fsck /tmp/l…...

一起玩儿物联网人工智能小车(ESP32)——13. 用ESP32的GPIO控制智能小车运动起来(一)

摘要&#xff1a;本文更深入的讲述了GPIO的相关知识&#xff0c;并完成了导线连接工作&#xff0c;为下一步的软件开发做好了准备。 通用输入输出端口&#xff08;GPIO&#xff1a;General Purpose Input/Output Port&#xff09;&#xff0c;在前面已经有了初步的介绍&#xf…...

D9741 PWM控制器电路,定时闩锁、短路保护电路,输出基准电压(2.5V) 采用SOP16封装

D9741是一块脉宽调制方三用于也收路像机和笔记本电的等设备上的直流转换器。在便携式的仪器设备上。 主要特点&#xff1a;● 高精度基准电路 ● 定时闩锁、短路保护电路 ● 低电压输入时误操作保护电路 ● 输出基准电…...

【UE5.1】程序化生成Nanite植被

目录 效果 步骤 一、下载Gaea软件和树林资产 二、使用Gaea生成贴图 三、 生成地形 四、生成草地 五、生成树林 六、生成湖泊 七、其它功能介绍 7.1 调整树林生成的面积 7.2 让植物随风飘动 7.3 玩家和植物互动 7.4 雪中树林 7.5 环境音效 效果 步骤 一、下载Ga…...

【软件工程】漫谈增量过程模型:软件开发的逐步之道

&#x1f34e;个人博客&#xff1a;个人主页 &#x1f3c6;个人专栏&#xff1a; 软件工程 ⛳️ 功不唐捐&#xff0c;玉汝于成 目录 前言&#xff1a; 正文 增量过程模型&#xff08;Incremental Process Model&#xff09; 主要特点和阶段&#xff1a; 优点&#xff1…...

Android Camera

1. 相关的API Android有三套关于摄像头的API(库)&#xff0c;分别是Camera、Camera2和CameraX&#xff0c;其中Camera已废弃&#xff0c;在Android5.0以后推荐使用Camera2和CameraX&#xff0c;Camera2推出是用来替换Camera的&#xff0c;它拥有丰富的API可以为复杂的用例提供…...

Python开发雷点总结

数值运算&#xff08;加减乘除&#xff09; 1. invalid value赋值 当变量本身具有数值属性&#xff08;后续会参加数值运算&#xff09;&#xff0c;对invalid value设置应该为np.nan&#xff0c; 而非None&#xff1b;反之&#xff0c;容易抛出以下错误&#xff1a; TypeEr…...

Linux中磁盘管理与文件系统

目录 一.磁盘基础&#xff1a; 1.磁盘的结构&#xff1a; 2.硬盘的数据结构&#xff1a; 3.硬盘存储容量 &#xff1a; 4.硬盘接口类型&#xff1a; 二.MBR与磁盘分区&#xff1a; 1.MBR的概念&#xff1a; 2.硬盘的分区&#xff1a; 为什么分区&#xff1a; 2.表示&am…...

Vue2+element-ui 实现select选择器结合Tree树形控件实现下拉树效果

效果&#xff1a; DOM部分 &#xff1a; // 设置el-option隐藏的下拉选项&#xff0c;选项显示的是汉字label&#xff0c;值是value // 如果不设置一个下拉选项&#xff0c;下面的树形组件将无法正常使用 <el-form-item label"报警区域" prop"monitorId"…...

LINUX 解决系统卡死:扩大内存交换分区

最近电脑总是卡住&#xff0c;让我很是苦恼。运行程序时发现可能是内存占满之后导致界面卡住。下面是在我16G内存的电脑上折腾的过程与结果&#xff1a; 查看当前的交换内存大小free -m&#xff08;单位&#xff1a;-m选项表示以兆字节&#xff08;MB&#xff09;为单位显示内…...

Vue项目Nginx代理F5刷新出现404问题解决

一.背景 项目用户反馈&#xff0c;F5刷新后&#xff0c;浏览器出现404。最近公司加强网络管理&#xff0c;我记得之前可以刷新&#xff0c;有点怀疑是跟加强网络管理有关。具体原因没有时间去深度跟踪&#xff0c;先百度找到了解决方法&#xff0c;记录一下。 二.解决办法 主…...

关于MybatisPlus自动转化驼峰命名规则配置mapUnderscoreToCamelCase的个人测试和总结

关于MybatisPlus自动转化驼峰命名规则配置mapUnderscoreToCamelCase的个人测试和总结 测试一&#xff1a;没有添加 自动转化的配置&#xff0c;且domain中的属性名称和数据库的字段名称一致测试二&#xff1a;没有添加自动转化配置i&#xff0c;domain属性名userPassword和数据…...

css中的BFC

定义 BFC(Block formatting context)直译为"块级格式化上下文"。它是一个独立的渲染区域&#xff0c;只有Block-level box参与&#xff0c; 它规定了内部的Block-level Box如何布局&#xff0c;并且与这个区域外部毫不相干。 涉及概念 box Box 是 CSS 布局的对象…...

音视频类App广告变现如何破局,最大化广告变现收益,让应用增收?

音视频App已然成为了我们日常获取、发布和交换信息的重要方式&#xff0c;在音视频行业不断的拓展中&#xff0c;用户的渗透率提升。 据数据显示&#xff0c;我国网络视听用户的规模已达9亿人次&#xff0c;网民使用率也突破了90%。庞大的市场规模和用户需求吸引了大批开发者和…...

基于llama-index对embedding模型进行微调

QA对话目前是大语言模型的一大应用场景&#xff0c;在QA对话中&#xff0c;由于大语言模型信息的滞后性以及不包含业务知识的特点&#xff0c;我们经常需要外挂知识库来协助大模型解决一些问题。在外挂知识库的过程中&#xff0c;embedding模型的召回效果直接影响到大模型的回答…...

如何本地搭建FastDFS文件服务器并实现远程访问【内网穿透】

文章目录 前言1. 本地搭建FastDFS文件系统1.1 环境安装1.2 安装libfastcommon1.3 安装FastDFS1.4 配置Tracker1.5 配置Storage1.6 测试上传下载1.7 与Nginx整合1.8 安装Nginx1.9 配置Nginx 2. 局域网测试访问FastDFS3. 安装cpolar内网穿透4. 配置公网访问地址5. 固定公网地址5.…...

spring基于Xml管理bean---Ioc依赖注入:对象类型属性赋值(2)----内部bean的引入(bean和bean之间的引入)、(3)级联方式注入

bean创建对象类型赋值方式 第一&#xff1a;外部bean的引入 第二&#xff1a;内部bean的引入 第三&#xff1a;级联属性赋值 文章目录 bean创建对象类型赋值方式对象类型内部bean赋值代码分析总结 对象类型属性级联方式的赋值扩展知识 对象类型内部bean赋值 代码分析 <b…...

Python电能质量扰动信号分类(二)基于CNN模型的一维信号分类

目录 前言 1 电能质量数据集制作与加载 1.1 导入数据 1.2 制作数据集 2 CNN-2D分类模型和训练、评估 2.1 定义CNN-2d分类模型 2.2 定义模型参数 2.3 模型结构 2.4 模型训练 2.5 模型评估 3 CNN-1D分类模型和训练、评估 3.1 定义CNN-1d分类模型 3.2 定义模型参数 …...

如何解决报错:Another app is currently holding yum lock?

在运行yum 相关命令的时候&#xff0c;不知道怎么回事无法进行下载安装&#xff0c;报出 Another app is currently holding the yum lock; waiting for it to exit... 的错误提示。 Another app is currently holding the yum lock. 意思是另外一个应用正在锁住进程锁。 …...

electron使用electron-builder进行MacOS的 打包、签名、公证、上架、自动更新

一、前言 由于electron在macOS下的坑太多&#xff0c;本文不可能把所有的问题都列出来&#xff0c;也不可能把所有的解决方案贴出来&#xff1b;本文也不太会讲解每一个配置点为什么要这么设置的原因&#xff0c;因为有些点我也说不清&#xff0c;我尽可能会说明的。所以&…...

RAD Studio 12 安装激活说明及常见问题

目录 RAD Studio 安装说明 RAD Studio 最新的修补程序更新 RAD Studio 产品相关信息 Embarcadero 产品在线注册步骤 单机版授权产品注册注意事项 Embarcadero 产品离线注册步骤 Embarcadero 产品安装次数查询 Embarcadero 序号注册次数限制 EDN账号 - 查询授权序号、下…...

虚拟交易网站开发/长沙网站seo公司

vi里怎样跳转到某一指定行 输入 &#xff1a;行号 :$跳到最后一行 gg跳到第一行。转载于:https://www.cnblogs.com/liuweilinlin/p/3173132.html...

廊坊网站建设公司哪个好/阿里指数查询

原题目 题目一&#xff1a;pow&#xff08;x&#xff0c;n&#xff09;&#xff1a; 实现 pow(x, n) &#xff0c;即计算 x 的 n 次幂函数。 示例 1: 输入: 2.00000, 10 输出: 1024.00000 示例 2: 输入: 2.10000, 3 输出: 9.26100 示例 3: 输入: 2.00000, -2 输出: 0.25000 解…...

网站全景看图怎么做/做网站优化的公司

201806/refactor.pngYugong 是一个成熟工具&#xff0c; 在阿里巴巴去 IOE 行动中起了重要作用&#xff0c; 它与 Otter / Canal 都是阿里中间件团队出品。 它们三者各有分工&#xff1a; Yugong 设计目标是异构数据库迁移&#xff1b; Canal 设计用来解决 MySQL binlog 订阅和…...

广州白云做网站的公司/销售管理怎么带团队

目录1. 画布(canvas)1.1 设置画布大小2. 画笔2.1 画笔的状态2.2 画笔的属性2.3 绘图命令3. 命令详解4. 绘图举例4.1 太阳花4.2 绘制小蟒蛇4.3 绘制五角星python2.6版本中后引入的一个简单的绘图工具&#xff0c;叫做海龟绘图(Turtle Graphics),turtle库是python的内部库,使用导…...

网站的建设流程图/最新新闻热点事件2023

一.什么是Prometheus&#xff1f; 一个开源的 Docker 容器监控工具&#xff0c;基于时间序列。 Prometheus 起初是 SoundCloud 创建的一个开源系统监控报警工具。自其 2012 年开创以来&#xff0c;众多公司、组织都采用了 Prometheus&#xff0c;该项目也有一个非常活跃的开发…...

网站建设客户说没用/广告网

1.“我可以向你问路吗?” “到那里?” “到你心里。” 2.“我可以向你借一块钱吗?” “为什么?” “我想打电话告诉我妈&#xff0c;我刚遇到我的梦中情人。”或“我要打电话给你妈妈谢谢她。” 3.“你爸爸是小偷吗?” “不是。” “那他怎么能把灿烂的星星偷来放在你双眸…...