当前位置: 首页 > news >正文

Elasticsearch 集成---Spark Streaming 框架集成

一.Spark Streaming 框架介绍

Spark Streaming Spark core API 的扩展,支持实时数据流的处理,并且具有可扩展,
高吞吐量,容错的特点。
数据可以从许多来源获取,如 Kafka Flume Kinesis TCP sockets
并且可以使用复杂的算法进行处理,这些算法使用诸如 map reduce join window 等高
级函数表示。 最后,处理后的数据可以推送到文件系统,数据库等。 实际上,您可以将
Spark 的机器学习和图形处理算法应用于数据流。

二.框架集成

1. 创建 Maven 项目

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.atguigu.es</groupId><artifactId>es-sparkstreaming</artifactId><version>1.0</version><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target></properties><dependencies><dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_2.12</artifactId><version>3.0.0</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming_2.12</artifactId><version>3.0.0</version></dependency><dependency><groupId>org.elasticsearch</groupId><artifactId>elasticsearch</artifactId><version>7.8.0</version></dependency><!-- elasticsearch的客户端 --><dependency><groupId>org.elasticsearch.client</groupId><artifactId>elasticsearch-rest-high-level-client</artifactId><version>7.8.0</version></dependency><!-- elasticsearch依赖2.x的log4j --><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-api</artifactId><version>2.8.2</version></dependency><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-core</artifactId><version>2.8.2</version></dependency><!--        <dependency>--><!--            <groupId>com.fasterxml.jackson.core</groupId>--><!--            <artifactId>jackson-databind</artifactId>--><!--            <version>2.11.1</version>--><!--        </dependency>--><!--        &lt;!&ndash; junit单元测试 &ndash;&gt;--><!--        <dependency>--><!--            <groupId>junit</groupId>--><!--            <artifactId>junit</artifactId>--><!--            <version>4.12</version>--><!--        </dependency>--></dependencies>
</project>

2.功能实现

package com.atguigu.esimport org.apache.http.HttpHost
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.elasticsearch.action.index.{IndexRequest, IndexResponse}
import org.elasticsearch.client.{RequestOptions, RestClient, RestHighLevelClient}
import org.elasticsearch.common.xcontent.XContentTypeobject SparkStreamingESTest {def main(args: Array[String]): Unit = {val sparkConf = new SparkConf().setMaster("local[*]").setAppName("ESTest")val ssc = new StreamingContext(sparkConf, Seconds(3))val ds: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 9999)ds.foreachRDD(rdd => {rdd.foreach(data => {val client = new RestHighLevelClient(RestClient.builder(new HttpHost("localhost",9200, "http")))val ss = data.split(" ")val request = new IndexRequest()request.index("product").id(ss(0))val json =s"""| {  "data" : "${ss(1)}" }|""".stripMarginrequest.source(json, XContentType.JSON)val response: IndexResponse = client.index(request, RequestOptions.DEFAULT)println(response.getResult)client.close()})})ssc.start()ssc.awaitTermination()}
}

3.界面截图

三.安装NetCat

1.下载网址:netcat 1.11 for Win32/Win64

2.解压压缩包

右键zip文件-->解压到当前文件夹

3.配置环境变量

右键此电脑-->属性-->高级系统设置-->环境变量

四.测试

Window + R  重新启动cmd命令窗口

4.1测试:输入 nc -l -p 9999

4.2 启动测试

4.3 cmd输入 1001 jianzi

 4.4 postman 查看

get    http://127.0.0.1:9200/product/_doc/1001

相关文章:

Elasticsearch 集成---Spark Streaming 框架集成

一.Spark Streaming 框架介绍 Spark Streaming 是 Spark core API 的扩展&#xff0c;支持实时数据流的处理&#xff0c;并且具有可扩展&#xff0c; 高吞吐量&#xff0c;容错的特点。 数据可以从许多来源获取&#xff0c;如 Kafka &#xff0c; Flume &#xff0c; Kin…...

Kotlin 中的 协程 基础篇

一、什么叫协程 协程可以称为轻量级线程&#xff0c;线程代码块&#xff1b; 二、GlobalScope 协程 CoroutineScope (协程作用域) 的上下文中通过 launch、async 等构造器来启动。GlobalScope ,即全局作用域内启动了一个新的协程&#xff0c;这意味这该协程的生命周期只受整…...

SQL事务

事务的概念&#xff1a; 事务是在数据库上按照一定的逻辑顺序执行的任务序列&#xff0c;既可以由用户手动执行&#xff0c;也可以由某种数据库程序自动执行。事务就是一些SQL语句组&#xff08;每条单独的SQL语句也算一个事务&#xff09;&#xff0c;其中事务中的SQL…...

关于flutter中 initState() 与 setState() 用法

initState()函数是在组件渲染之前执行的。在Flutter中&#xff0c;initState()是StatefulWidget的生命周期方法之一&#xff0c;在调用build()方法之前被调用。当创建一个StatefulWidget并将其添加到组件树中时&#xff0c;Flutter会实例化该组件的状态对象&#xff0c;并在调用…...

智能电话机器人是如何自主学习的

电话机器人主要通过语音识别和针对语意的理解识别客户所说的内容&#xff0c;针对性的回答问题&#xff0c;为企业高效筛选意向客户。除了电话机器人语音识别之外&#xff0c;电话机器人能够自主学习&#xff0c;不断完善产品知识及话术等&#xff0c;是它智能的另一种体现。那…...

【Rust】Rust学习 第十八章模式用来匹配值的结构

模式是 Rust 中特殊的语法&#xff0c;它用来匹配类型中的结构&#xff0c;无论类型是简单还是复杂。结合使用模式和 match 表达式以及其他结构可以提供更多对程序控制流的支配权。模式由如下一些内容组合而成&#xff1a; 字面值解构的数组、枚举、结构体或者元组变量通配符占…...

我的学习笔记:数据处理

数据清洗 对数据进行处理和加工&#xff0c;以使其适合分析和建模。数据清洗包括去除重复数据、填补缺失值、处理异常值和转换数据格式等操作&#xff0c;以提高数据的可靠性和准确性&#xff0c;避免数据分析时出现偏差&#xff0c;提高决策的准确性。 数据去重&#xff1a;通…...

GB28181国标平台测试软件NTV-GBC(包含服务器和模拟客户端)

GB28181国标平台测试软件NTV-GBC用于对GB28181国标平台进行测试(测试用例需要服务器软件&#xff0c;服务器软件可以是任何标准的国标平台&#xff0c;我们测试使用的是NTV-GBS&#xff09;&#xff0c;软件实现了设备注册、注销、目录查询&#xff0c;消息订阅、INVITE&#x…...

云原生:重塑企业的技术疆界

云原生技术正在重新塑造我们对软件开发、部署和运维的理解。这些技术带来了灵活性、可扩展性以及在复杂环境中保证稳定性的可能性&#xff0c;这些都是企业在云原生场景中比较关注的问题。本文将主要聚焦于云原生场景&#xff0c;探讨其影响和作用。 云原生的定义 云原生计算基…...

华为星闪,一项将 “ 更稳 WiFi ” 和 “ 更好蓝牙 ” 融合起来的通信标准

兼顾多用途和专业化的 AI 大模型、移除安卓代码的 HarmonyOS NEXT 、给折叠屏应用提供适配方向的《 折叠屏/平板应用体验评估标准 》。。。 不过除了这些比较贴近我们普通用户&#xff0c;容易讲清楚的东西&#xff0c;华为还官宣了一个大家可能没注意的黑科技&#xff1a; 星…...

IDEA创建Mybatis格式XML文件

设置位置&#xff1a;File | Settings | Editor | File and Code Templates 选择Files&#xff0c;点击号 Name中输入xml模板名&#xff08;名称自行决定&#xff09;&#xff0c;后缀名extension输入xml&#xff08;固定&#xff09; 内容处输入Mybatis的xml文件模板内容&…...

二叉树中的最大路径和-递归

路径 被定义为一条从树中任意节点出发&#xff0c;沿父节点-子节点连接&#xff0c;达到任意节点的序列。同一个节点在一条路径序列中 至多出现一次 。该路径 至少包含一个 节点&#xff0c;且不一定经过根节点。 路径和 是路径中各节点值的总和。 给你一个二叉树的根节点 root…...

Python if-else 速记

文章目录 在 Python 中使用三元运算符作为 if-else 速记总结 编程中经常使用速记符号来简化我们的工作。 速记符号是一种可以更简洁、更省时省力地完成工作的方法。 本文将讨论 Python 中使用的速记符号作为 if-else 语句的快捷方式。 在 Python 中使用三元运算符作为 if-else…...

Python使用内置的json模块来处理JSON数据

目录 1、解释说明&#xff1a; 2、使用示例&#xff1a; 3、注意事项&#xff1a; 1、解释说明&#xff1a; 在Python中&#xff0c;我们可以使用内置的json模块来处理JSON数据。这个模块提供了四个主要的函数&#xff1a;dumps、loads、dump、load。 - dumps&#xff1a;将…...

亿赛通电子文档安全管理系统 RCE漏洞

亿赛通电子文档安全管理系统 RCE漏洞 一、 产品简介二、 漏洞概述三、 复现环境四、 漏洞复现小龙POC检测: 五、 修复建议 免责声明&#xff1a;请勿利用文章内的相关技术从事非法测试&#xff0c;由于传播、利用此文所提供的信息或者工具而造成的任何直接或者间接的后果及损失…...

信息安全面试题合集

0x00 前言 本篇会记录一些可能会遇到的面试题&#xff0c;持续更新 0x01 Web SQL注入 sql注入常见的闭合方式有哪些&#xff1f;Mysql5.0上下sql注入有什么区别&#xff1f;SQL注入空格被过滤&#xff0c;有什么绕过方式&#xff1f;过滤了逗号&#xff0c;有什么绕过方式&…...

vue 简单实验 自定义组件 传参数 props

1.代码 <script src"https://unpkg.com/vuenext" rel"external nofollow" ></script> <div id"todo-list-app"><todo-item v-bind:todo"todo1"></todo-item> </div> <script> const ListR…...

目标检测笔记(十一):如何结合特定区域进行目标检测(基于OpenCV的人脸检测实例)

文章目录 背景代码结果 背景 由于我们在做项目的时候可能会涉及到某个指定区域进行目标检测或者人脸识别等任务&#xff0c;所以这篇博客是为了探究如何在传统目标检测的基础上来结合特定区域进行检测&#xff0c;以OpenCV自带的包为例。 一般来说有两种方式实现区域指定&…...

PID直观感受简述

0、仿真控制框图 1、增加p的作用&#xff08;增加响应&#xff09;P 2、增加I的作用&#xff08;消除稳差&#xff09;PI 3、增加D的作用&#xff08;抑制波动&#xff09;PID 加入对噪声很敏 4、综合比对...

Tomcat运行后localhost:8080访问自己编写的网页

主要是注意项目结构&#xff0c;home.html放在src/resources/templates下的home.html下&#xff0c;application.properties可以不做任何配置。还有就是关于web包的位置&#xff0c;作者一开始将web包与tabtab包平行&#xff0c;访问8080出现了此类报错&#xff1a; Whitelabel…...

传感网应用开发1+X实训室建方案

一、概述 1.1建设背景 从院校实际教学情况与人才培养计划为出发点&#xff0c;贯彻传感网应用开发1X实训室职业技能等级标准&#xff0c;充分考虑传感网应用开发1X实训室从业人员的职业发展路径与成长路径&#xff0c;以职业素养、职业技能、知识水平为主要框架结构&#xff…...

PDF校对:让您的文件无瑕疵

无论您是企业家、学生、教育者还是作家&#xff0c;我们都知道&#xff0c;提交或发布一个充满错误的PDF文件可能会给您的声誉或品牌带来严重损害。这就是为什么PDF校对如此关键的原因。现在&#xff0c;让我们深入了解PDF校对的重要性&#xff0c;以及如何确保您的文件尽可能完…...

SpringBoot--解决空字符串转枚举异常

原文网址&#xff1a;SpringBoot--解决空字符串转枚举异常_IT利刃出鞘的博客-CSDN博客 简介 本文介绍如何解决Java的SpringBoot中空字符串转枚举时报错的问题。 问题复现 org.springframework.http.converter.HttpMessageNotReadableException: JSON parse error: Cannot d…...

Redis的常用数据类型详解

Redis是一个开源的、基于内存的数据结构存储系统&#xff0c;它可以用作数据库、缓存和消息代理。Redis支持多种数据类型&#xff0c;包括字符串、列表、集合、有序集合、散列等。理解这些数据类型的特性和使用方式&#xff0c;对于充分利用Redis的能力至关重要。以下是对Redis…...

jpa里IdentityGenerator和IncrementGenerator的区别

IdentityGenerator 和 IncrementGenerator 的区别 IdentityGenerator 和 IncrementGenerator 都是 JPA 中可用的主键生成策略&#xff08;GenerationType&#xff09;之一。它们的区别如下&#xff1a; IdentityGenerator: IDENTITY 主键生成策略利用数据库自动生成的主键。在…...

基于element UI 实现 table 列 拖拽

问题描述 在开发中遇到一个需求&#xff0c;即实现table列的拖拽&#xff0c;但是调研发现&#xff0c;大部分是基于sorttable.js这个包实现的&#xff0c;但是通过实际应用&#xff0c;发现sorttable.js用在操作element table 组件中并不是很舒服&#xff0c;总会莫名其妙的冒…...

(GPT、GEE)遥感云大数据、洪涝灾害监测、红树林遥感制图、河道轮廓监测、洪涝灾害监测、GRACE重力卫星、源遥感影像

近年来遥感技术得到了突飞猛进的发展&#xff0c;航天、航空、临近空间等多遥感平台不断增加&#xff0c;数据的空间、时间、光谱分辨率不断提高&#xff0c;数据量猛增&#xff0c;遥感数据已经越来越具有大数据特征。遥感大数据的出现为相关研究提供了前所未有的机遇&#xf…...

vue中实现将页面或者div内容导出为pdf格式

将Vue单页面转成pdf并下载 步骤1&#xff1a;下载对应的库 npm install html2canvas;npm install jspdf --save 步骤2&#xff1a;创建一个htmlToPdf.js的js文件, 然后在main.js中全局引用一下&#xff0c;编写如下代码&#xff1a; // htmlToPdf.js // 导出页面为PDF格式 …...

Ubuntu 配置国内源

配置国内源 因为众所周知的原因&#xff0c;国外的很多网站在国内是访问不了或者访问极慢的&#xff0c;这其中就包括了Ubuntu的官方源。 所以&#xff0c;想要流畅的使用apt安装应用&#xff0c;就需要配置国内源的镜像。 市面上Ubuntu的国内镜像源非常多&#xff0c;比较有…...

分布式核心知识

文章目录 前言一、分布式中的远程调用1.1RESTful接口1.2RPC协议1.3区别与联系 二、分布式中的CAP原理 前言 关于分布式核心知识详解 一、分布式中的远程调用 在微服务架构中&#xff0c;通常存在多个服务之间的远程调用的需求。远程调用通常包含两个部分&#xff1a;序列化和通…...

【JMeter】常用线程组设置策略

目录 一、前言 二、单场景基准测试 1.介绍 2.线程组设计 3.测试结果 三、单场景并发测试 1.介绍 2.线程组设计 3.测试结果 四、单场景容量/爬坡测试 1.介绍 2.线程组设计 3.测试结果 五、混合场景容量/并发测试 1.介绍 六、稳定性测试 1.介绍 2.线程组设计 …...

【数据结构】回溯算法公式化解题 leetcode经典题目带刷:全排列、组合、子集

目录 回溯算法一、什么是回溯算法1、基本思想&#xff1a;2、一般步骤&#xff1a; 二、题目带练1、全排列2、组合3、子集 三、公式总结 回溯算法 一、什么是回溯算法 回溯算法&#xff08;Backtracking Algorithm&#xff09;是一种解决组合问题、排列问题、选择问题等一类问…...

WPF基础入门-Class3-WPF数据模板

WPF基础入门 Class3&#xff1a;WPF数据模板 1、先在cs文件中定义一些数据 public partial class Class_4 : Window{public Class_4(){InitializeComponent();List<Color> test new List<Color>();test.Add(new Color() { Code "Yellow", Name &qu…...

js将搜索的关键字加颜色

js将搜索的关键字加颜色 使用正则匹配关键字并加入span标签&#xff0c;页面渲染时使用v-html渲染即可 // 文本框内容 let searchCont 测试;const reg new RegExp((${searchCont.value}), g); let data 图片保存测试A; data data.replace(reg, <span style"color:…...

Docker安装Oracle数据库打开、链接速度很慢

问题&#xff1a; 使用Docker安装Oracle数据库打开、链接速度很慢&#xff0c;明显的在在转圈严重影响效率。 解决&#xff1a; 排查到DNS时&#xff0c;发现宿主机DNS配置清空后&#xff0c;通过JDBC连接目标Oracle数据库速度很快 进入容器中进行测试&#xff0c;发现清空DNS…...

学生分班查询系统的创建与使用指南

开学季&#xff0c;负责分班工作的老师们又面临一个难题&#xff1a;如何公布分班结果&#xff1f;将结果放在学校官网上可能会让很多无关人员看到&#xff0c;而不放则会导致家长们纷纷打电话来询问。那么&#xff0c;有没有一种方法可以让家长们自行查看分班结果呢&#xff1…...

全套解决方案:基于pytorch、transformers的中文NLP训练框架,支持大模型训练和文本生成,快速上手,海量训练数据!

全套解决方案&#xff1a;基于pytorch、transformers的中文NLP训练框架&#xff0c;支持大模型训练和文本生成&#xff0c;快速上手&#xff0c;海量训练数据&#xff01; 1.简介 目标&#xff1a;基于pytorch、transformers做中文领域的nlp开箱即用的训练框架&#xff0c;提…...

ffmpeg

文章目录 libavcodec实现 libavformat实现libavfilter实现 libswscale实现对比libavfilter图像处理libswscale vs libyuvlibavutil 命令行工具ffmpeg例子 ffprobe例子 FFmpeg 是一个由 C 语言编写的开源跨平台音视频处理工具集&#xff0c;它具有模块化的架构。下面是 FFmpeg 的…...

CH03_代码的坏味道(下)

循环语句&#xff08;Loops&#xff09; 从最早的编程语言开始&#xff0c;循环就一直是程序设计的核心要素。如今&#xff0c;函数作为一等公民已经得到了广泛的支持&#xff0c;因此我们可以使用以管道取代循环&#xff08;231&#xff09;管道操作&#xff08;如filter和ma…...

journal日志导致服务器磁盘满

背景 ubuntu 18.04服务器磁盘突然100% 一查/var/log/journal目录占了14G 清理 要清理 journal 日志&#xff0c;可以使用以下步骤&#xff1a; 运行以下命令来查看 journal 日志的使用情况&#xff1a; journalctl --disk-usage这将显示 journal 日志的当前使用情况&#x…...

“Go程序员面试笔试宝典”复习便签

一.逃逸分析 1.1逃逸分析是什么&#xff1f; 逃逸分析&#xff0c;主要是Go编译器用来决定变量分配在堆或者栈的手段。 区分于C/C手动管理内存分配&#xff0c;Go将这些工作交给了编译器。 1.2逃逸分析有什么作用 解放程序员。程序员不需要手动指定指针分配内存。 灵活的…...

数组的度(指数组里任一元素出现频数的最大值)

题目&#xff1a; 给定一个非空且只包含非负数的整数数组 nums&#xff0c;数组的 度 的定义是指数组里任一元素出现频数的最大值。 你的任务是在 nums 中找到与 nums 拥有相同大小的度的最短连续子数组&#xff0c;返回其长度。 示例 1&#xff1a; 输入&#xff1a;nums …...

scala array类型参数

在Scala中&#xff0c;数组&#xff08;Array&#xff09;是一种用于存储相同类型元素的数据结构。数组可以用于保存基本数据类型和自定义数据类型的元素。当定义数组类型参数时&#xff0c;您通常是在函数、类或方法签名中使用它们。以下是一些有关Scala数组类型参数的示例&am…...

构建 NodeJS 影院预订微服务并使用 docker 部署(03/4)

一、说明 构建一个微服务的电影网站&#xff0c;需要Docker、NodeJS、MongoDB&#xff0c;这样的案例您见过吗&#xff1f;如果对此有兴趣&#xff0c;您就继续往下看吧。 你好社区&#xff0c;这是&#x1f3f0;“构建 NodeJS 影院微服务”系列的第三篇文章。本系列文章演示了…...

html写一个向flask_socketio发送消息和接收消息并显示在页面上

以下是一个简单的HTML页面&#xff0c;它包含一个输入框、一个发送按钮和一个显示区域。用户可以在输入框中输入消息&#xff0c;点击发送按钮&#xff0c;然后这个消息会被发送到 Flask-SocketIO 服务器。当服务器回应消息时&#xff0c;它会在页面的显示区域显示出来。 <…...

C#使用.Net Core进行跨平台开发

使用 .NET Core 进行跨平台开发是一种灵活的方法&#xff0c;可以在多个操作系统上运行 C# 应用程序。以下是在 C# 中使用 .NET Core 进行跨平台开发的一般步骤&#xff1a; 安装 .NET Core SDK&#xff1a; 在开始之前&#xff0c;需要安装适用于操作系统的 .NET Core SDK。可…...

Java“牵手”天猫店铺所有商品API接口数据,通过店铺ID获取整店商品详情数据,天猫API申请指南

天猫商城是一个网上购物平台&#xff0c;售卖各类商品&#xff0c;包括服装、鞋类、家居用品、美妆产品、电子产品等。天猫商品详情可以帮助消费者更好的了解宝贝信息&#xff0c;从而做出购买决策。同时&#xff0c;消费者也可以通过商品详情了解其他买家对宝贝的评价&#xf…...

php输入post过滤函数,入库出库,显示

第一部分 php输入post过滤函数 function GLOBAL_POST($str) {$str_origin$str; if (empty($str)) return false;$str str_replace( /, "", $str);//替换关键词 $str str_replace("\\", "", $str); $str str_replace("&gt", &…...

matlab-对数据集加噪声并实现tsne可视化

matlab-对数据集加噪声并实现tsne可视化 最近才知道&#xff0c;原来可以不用模型&#xff0c;也能实现对数据集数据的可视化。 **一、**以COIL-100数据集为例子。 问题&#xff1a; 前提&#xff1a;首先对COIL-100数据集根据角度0-175和180-255&#xff0c;分别划分成C1,C…...

【BASH】回顾与知识点梳理(三十八)

【BASH】回顾与知识点梳理 三十八 三十八. 源码概念及简单编译38.1 开放源码的软件安装与升级简介什么是开放源码、编译程序与可执行文件什么是函式库什么是 make 与 configure什么是 Tarball 的软件如何安装与升级软件 38.2 使用传统程序语言进行编译的简单范例单一程序&#…...