flink如何写入es
提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档
文章目录
- 前言
- 一、写入到Elasticsearch5
- 二、写入到Elasticsearch7
- 总结
前言
Flink sink 流数据写入到es5和es7的简单示例。
一、写入到Elasticsearch5
- pom maven依赖
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-elasticsearch5_2.11</artifactId><version>${flink.version}</version></dependency>
- 代码如下(示例):
public class Es5SinkDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();Row row=Row.of("张三","001",getTimestamp("2016-10-24 21:59:06"));Row row2=Row.of("张三","002",getTimestamp("2016-10-24 21:50:06"));Row row3=Row.of("张三","002",getTimestamp("2016-10-24 21:51:06"));Row row4=Row.of("李四","003",getTimestamp("2016-10-24 21:50:56"));Row row5=Row.of("李四","004",getTimestamp("2016-10-24 00:48:36"));Row row6=Row.of("王五","005",getTimestamp("2016-10-24 00:48:36"));DataStreamSource<Row> source =env.fromElements(row,row2,row3,row4,row5,row6);Map<String, String> config = new HashMap<>();
// config.put("cluster.name", "my-cluster-name");
// config.put("bulk.flush.max.actions", "1");List<InetSocketAddress> transportAddresses = new ArrayList<>();transportAddresses.add(new InetSocketAddress(InetAddress.getByName("10.68.8.60"), 9300));//Sink操作DataStreamSink<Row> rowDataStreamSink = source.addSink(new ElasticsearchSink<>(config, transportAddresses, new ElasticsearchSinkFunction<Row>() {public IndexRequest createIndexRequest(Row element) {Map<String, Object> json = new HashMap<>();json.put("name22", element.getField(0).toString());json.put("no22", element.getField(1));json.put("age", 34);json.put("create_time", element.getField(2));return Requests.indexRequest().index("cc").type("mtype").id(element.getField(1).toString()).source(json);}@Overridepublic void process(Row element, RuntimeContext ctx, RequestIndexer indexer) {//利用requestIndexer进行发送请求,写入数据indexer.add(createIndexRequest(element));}}));env.execute("es demo");}private static java.sql.Timestamp getTimestamp(String str) throws Exception {
// String string = "2016-10-24 21:59:06";SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");java.util.Date date=sdf.parse(str);java.sql.Timestamp s = new java.sql.Timestamp(date.getTime());return s;}
二、写入到Elasticsearch7
- pom maven依赖
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-elasticsearch7_2.11</artifactId><version>${flink.version}</version><scope>provided</scope></dependency>
- 代码如下(示例):
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink;
import org.apache.flink.types.Row;
import org.apache.http.HttpHost;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class EsSinkDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();Row row=Row.of("张三","001",getTimestamp("2016-10-24 21:59:06"));Row row2=Row.of("张三","002",getTimestamp("2016-10-24 21:50:06"));Row row3=Row.of("张三","002",getTimestamp("2016-10-24 21:51:06"));Row row4=Row.of("李四","003",getTimestamp("2016-10-24 21:50:56"));Row row5=Row.of("李四","004",getTimestamp("2016-10-24 00:48:36"));Row row6=Row.of("王五","005",getTimestamp("2016-10-24 00:48:36"));DataStreamSource<Row> source =env.fromElements(row,row2,row3,row4,row5,row6);Map<String, String> config = new HashMap<>();
// config.put("cluster.name", "my-cluster-name");
// This instructs the sink to emit after every element, otherwise they would be buffered
// config.put("bulk.flush.max.actions", "1");List<HttpHost> hosts = new ArrayList<>();hosts.add(new HttpHost("10.68.8.69",9200,"http"));ElasticsearchSink.Builder<Row> esSinkBuilder = new ElasticsearchSink.Builder<Row>(hosts,new ElasticsearchSinkFunction<Row>() {public IndexRequest createIndexRequest(Row element) {Map<String, Object> json = new HashMap<>();json.put("name22", element.getField(0).toString());json.put("no22", element.getField(1));json.put("age", 34);
// json.put("create_time", element.getField(2));return Requests.indexRequest().index("cc").id(element.getField(1).toString()).source(json);}@Overridepublic void process(Row element, RuntimeContext ctx, RequestIndexer indexer) {//利用requestIndexer进行发送请求,写入数据indexer.add(createIndexRequest(element));}});esSinkBuilder.setBulkFlushMaxActions(100);//Sink操作source.addSink(esSinkBuilder.build());env.execute("es demo");}private static java.sql.Timestamp getTimestamp(String str) throws Exception {
// String string = "2016-10-24 21:59:06";SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");java.util.Date date=sdf.parse(str);java.sql.Timestamp s = new java.sql.Timestamp(date.getTime());return s;}
}
总结
flink写入es5和es7 的区别是引入不同的flink-connector-elasticsearch,es7已没有type的概念故无需再设置type。
相关文章:
flink如何写入es
提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档 文章目录 前言一、写入到Elasticsearch5二、写入到Elasticsearch7总结 前言 Flink sink 流数据写入到es5和es7的简单示例。 一、写入到Elasticsearch5 pom maven依赖 <d…...
Java、Python、C++和C#的界面开发框架和工具的重新介绍
好的,以下是Java、Python、C和C#的界面开发框架和工具的重新介绍: Java界面开发: Swing: 是Java提供的一个基于组件的GUI工具包,可以创建跨平台的图形用户界面。它提供了丰富的组件和布局管理器,使得界面开发相对简单。…...
Java二叉树的遍历以及最大深度问题
Java学习面试指南:https://javaxiaobear.cn 1、树的相关概念 1、树的基本定义 树是我们计算机中非常重要的一种数据结构,同时使用树这种数据结构,可以描述现实生活中的很多事物,例如家谱、单位的组织架构、等等。 树是由n&#…...
Apollo 9.0搭建问题记录
虚拟机安装 可以看这个:https://blog.csdn.net/qq_45138078/article/details/129815408 写的很详细 内存 为了学习 Apollo ,所以只是使用了虚拟机,内存得大一点(128G),第一次,就是因为分配内…...
【心得】PHP文件包含高级利用攻击面个人笔记
目录 一、nginx日志文件包含 二、临时文件包含 三、php的session文件包含 四、pear文件包含 五 、远程文件包含 文件包含 include "/var/www/html/flag.php"; 一 文件名可控 $file$_GET[file]; include $file.".php"; //用php伪协议 ࿰…...
[scala] 列表常见用法
文章目录 不可变列表 List可变列表 ListBuffer 不可变列表 List 在 Scala 中,列表是一种不可变的数据结构,用于存储一系列元素。列表使用 List 类来表示,它提供了许多方法来操作和处理列表。 下面是一些常见的使用列表的示例: 创…...
python 使用urllib3发起post请求,携带json参数
当通过python脚本,发起http post请求,网络上大多是通过fields传递数据,然而这样,服务器收到的请求,但无法解析json数据。类似这些链接: Python urllib3库使用指南 软件测试|Python urllib3库使用指南 p…...
深入理解堆(Heap):一个强大的数据结构
. 个人主页:晓风飞 专栏:数据结构|Linux|C语言 路漫漫其修远兮,吾将上下而求索 文章目录 前言堆的实现基本操作结构体定义初始化堆(HeapInit)销毁堆(HeapDestroy) 重要函数交换函数(…...
抖音在线查权重系统源码,附带查询接口
抖音权重在线查询只需输入抖音主页链接,即可查询作品情况。 搭建教程 上传源码并解压 修改数据库“bygoukai.sql” 修改“config.php” 如需修改水印请修改第40行 如需修改限制次数,请修改第156行 访问域名user.php即可查看访问用户,停…...
Spring Framework和SpringBoot的区别
目录 一、前言 二、什么是Spring 三、什么是Spring Framework 四、什么是SpringBoot 五、使用Spring Framework构建工程 六、使用SpringBoot构建工程 七、总结 一、前言 作为Java程序员,我们都听说过Spring,也都使用过Spring的相关产品࿰…...
2024--Django平台开发-Django知识点(三)
day03 django知识点 项目相关路由相关 urls.py视图相关 views.py模版相关 templates资源相关 static/media 1.项目相关 新项目 开发时,可能遇到使用其他的版本。虚拟环境 老项目 打开项目虚拟环境 1.1 关于新项目 1.系统解释器命令行【学习】 C:/python38- p…...
Github 2024-01-08开源项目周报 Top14
根据Github Trendings的统计,本周(2024-01-08统计)共有14个项目上榜。根据开发语言中项目的数量,汇总情况如下: 开发语言项目数量Python项目5TypeScript项目3C项目2Dart项目1QML项目1Go项目1Shell项目1Rust项目1JavaScript项目1C#项目1 免费…...
vue3 的内置组件汇总
官方给出的说明: Fragment: Vue 3 组件不再要求有一个唯一的根节点,清除了很多无用的占位 div。Teleport: 允许组件渲染在别的元素内,主要开发弹窗组件的时候特别有用。Suspense: 异步组件,更方便开发有异步请求的组件。 一、fr…...
ARM工控机Node-red使用教程
嵌入式ARM工控机Node-red安装教程 从前车马很慢书信很远,而现在人们不停探索“科技改变生活”。 智能终端的出现改变了我们的生活方式,钡铼技术嵌入式工控机协助您灵活布建能源管理、大楼自动化、工业自动化、电动车充电站等各种多元性IoT应用ÿ…...
Visual Studio 发布程序自动更新 ClickOnce和AutoUpdater测试
文章目录 前言运行环境ClickOnce(Visual Studio 程序发布)IIS新建文件夹C# 控制台测试安装测试更新测试卸载 AutoUpdaterDotNET实现原理简单使用新建一个WPF项目 代码封装自动更新代码封装简单使用 总结 前言 虽然写的大部分都是不联网项目,…...
Codeforces Round 761 (Div. 2) E. Christmas Chocolates(思维题 树的直径 二进制性质 lca)
题目 n(n<2e5)个值,第i个值ai(0<ai<1e9),所有ai两两不同 初始时,选择两个位置x,y(x≠y),代表需要对这两个位置进行操作,要把其中一个值变成另一个 你可以执行若干次操作,每一次,你可…...
知识图谱之汽车实战案例综述与前瞻分析
知识图谱的前置介绍 什么是知识图谱 知识图谱本质(Knowledge Graph)上是一种叫做语义网络(semantic network ) 的知识库,即具有有向图结构的一个知识库;图的结点代表实体(entity)或者概念(con…...
网关Gateway
什么是网关? 网关实质上是一个网络通向其他网络的 IP 地址,是当前微服务项目的"统一入口"。 网关能做什么? 反向代理 、鉴权、 流量控制、 熔断、 日志监控等 图片原文:http://t.csdnimg.cn/SvUJh 核心概念 Router(…...
java 生成一个当前时间的时间搓
开发过程中 用时间搓数值格式存储 会更加精准 那么 我们在一些日常增删查改中就可以用时间搓来记录操作时间 就一行代码 long timestamp System.currentTimeMillis();他就能生成当前时间的时间搓 运行结果如下 然后 我们可以在 http://shijianchuo.wiicha.com/ 上进行转换查…...
金融中IC和IR的定义
当谈到金融领域时,IC(Information Coefficient)和IR(Information Ratio)通常是用来评估投资组合管理绩效的指标。它们都涉及到投资者对信息的利用和管理的效果。 信息系数(IC - Information Coefficient&a…...
Vue记事本应用实现教程
文章目录 1. 项目介绍2. 开发环境准备3. 设计应用界面4. 创建Vue实例和数据模型5. 实现记事本功能5.1 添加新记事项5.2 删除记事项5.3 清空所有记事 6. 添加样式7. 功能扩展:显示创建时间8. 功能扩展:记事项搜索9. 完整代码10. Vue知识点解析10.1 数据绑…...
synchronized 学习
学习源: https://www.bilibili.com/video/BV1aJ411V763?spm_id_from333.788.videopod.episodes&vd_source32e1c41a9370911ab06d12fbc36c4ebc 1.应用场景 不超卖,也要考虑性能问题(场景) 2.常见面试问题: sync出…...
Flask RESTful 示例
目录 1. 环境准备2. 安装依赖3. 修改main.py4. 运行应用5. API使用示例获取所有任务获取单个任务创建新任务更新任务删除任务 中文乱码问题: 下面创建一个简单的Flask RESTful API示例。首先,我们需要创建环境,安装必要的依赖,然后…...
AI Agent与Agentic AI:原理、应用、挑战与未来展望
文章目录 一、引言二、AI Agent与Agentic AI的兴起2.1 技术契机与生态成熟2.2 Agent的定义与特征2.3 Agent的发展历程 三、AI Agent的核心技术栈解密3.1 感知模块代码示例:使用Python和OpenCV进行图像识别 3.2 认知与决策模块代码示例:使用OpenAI GPT-3进…...
vue3 字体颜色设置的多种方式
在Vue 3中设置字体颜色可以通过多种方式实现,这取决于你是想在组件内部直接设置,还是在CSS/SCSS/LESS等样式文件中定义。以下是几种常见的方法: 1. 内联样式 你可以直接在模板中使用style绑定来设置字体颜色。 <template><div :s…...
MODBUS TCP转CANopen 技术赋能高效协同作业
在现代工业自动化领域,MODBUS TCP和CANopen两种通讯协议因其稳定性和高效性被广泛应用于各种设备和系统中。而随着科技的不断进步,这两种通讯协议也正在被逐步融合,形成了一种新型的通讯方式——开疆智能MODBUS TCP转CANopen网关KJ-TCPC-CANP…...
三体问题详解
从物理学角度,三体问题之所以不稳定,是因为三个天体在万有引力作用下相互作用,形成一个非线性耦合系统。我们可以从牛顿经典力学出发,列出具体的运动方程,并说明为何这个系统本质上是混沌的,无法得到一般解…...
爬虫基础学习day2
# 爬虫设计领域 工商:企查查、天眼查短视频:抖音、快手、西瓜 ---> 飞瓜电商:京东、淘宝、聚美优品、亚马逊 ---> 分析店铺经营决策标题、排名航空:抓取所有航空公司价格 ---> 去哪儿自媒体:采集自媒体数据进…...
根据万维钢·精英日课6的内容,使用AI(2025)可以参考以下方法:
根据万维钢精英日课6的内容,使用AI(2025)可以参考以下方法: 四个洞见 模型已经比人聪明:以ChatGPT o3为代表的AI非常强大,能运用高级理论解释道理、引用最新学术论文,生成对顶尖科学家都有用的…...
Linux --进程控制
本文从以下五个方面来初步认识进程控制: 目录 进程创建 进程终止 进程等待 进程替换 模拟实现一个微型shell 进程创建 在Linux系统中我们可以在一个进程使用系统调用fork()来创建子进程,创建出来的进程就是子进程,原来的进程为父进程。…...
