当前位置: 首页 > news >正文

Python 消费Kafka手动提交 批量存入Elasticsearch

一、第三方包选择

pip install kafka,对比了kafka和pykafka,还是选择kafka,消费速度更快
pip install elasticsearch==7.12.0(ES版本)

二、创建es连接对象

from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulkclass Create_ES(object):_instance = Nonedef __new__(cls, *args, **kwargs):if cls._instance is None:cls._instance = super().__new__(cls)return cls._instancedef __init__(self, hosts):try:self.es = Elasticsearch([{'host':host, 'port':9200}])except Exception as e:print('Connect ES Fail db:{} error:{}'.format(hosts, str(e)))def get_conn(self):return self.esdef set_multi_data(self, datas):'''批量插入数据'''success = bulk(self.es, datas, raise_on_error=True)return success

三、消费kafka数据

from kafka import KafkaConsumer, TopicPartition, OffsetAndMetadata
from . import Create_ESclass AppKfkConsumer(object):def __init__(self):self.server = 'localhost:9092'self.topic = KAFKA_TOPICself.consumer = Noneself.tp = Noneself.consumer_timeout_ms = 5000  # 设置消费超时时间,self.type = 'members'self.group_id = 'test1'  # 设置消费group_id,避免重复消费self.es_index = 'index'  # es的indexdef get_connect(self):self.consumer = KafkaConsumer(group_id=self.group_id,auto_offset_reset='earliest',  # 从最早的数据开始消费bootstrap_servers=self.server,enable_auto_commit=False,  # 关闭自动提交consumer_timeout_ms=self.consumer_timeout_ms)self.tp = TopicPartition(topic=self.topic, partition=0)  # 设置我们要消费的分区self.consumer.assign([self.tp])  # 由consumer对象分配分区def beginConsumer(self):now_offset = 0  # 当前偏移量es_conn = Create_ES()Actions = []while True:for message in self.consumer:now_offset = message.offset  # 获取当前偏移量data = eval(message.value.decode())  # 解析数据action = {"_index": self.es_index,"_type": self.type,"_source": data}Actions.append(action)if len(Actions) >= 50000:result = es_conn.set_multi_data(Actions)  # 批量插入数据Actions = []# 提交偏移量,now_offset+1的原因是因为我发现如果不加1,下次消费会从上次消费最后一条数据开始,重复消费self.consumer.commit(offsets={tp:(OffsetAndMetadata(now_offset+1, None))})if len(Actions) > 0:result = es_conn.set_multi_data(Actions)Actions = []self.consumer.commit(offsets={tp:(OffsetAndMetadata(now_offset+1, None))})def delconnect(self):self.consumer.close()# 执行任务
ks = AppKfkConsumer()
ks.get_connect()
ks.beginConsumer()

相关文章:

Python 消费Kafka手动提交 批量存入Elasticsearch

一、第三方包选择 pip install kafka,对比了kafka和pykafka,还是选择kafka,消费速度更快pip install elasticsearch7.12.0(ES版本) 二、创建es连接对象 from elasticsearch import Elasticsearch from elasticsearch.helpers import bulkc…...

oracle 基础知识表的主键

一、表的约束条件 •约束条件是施加在表的字段上的一组限制条件,它使得只有符合限制条件要求的数据才能输入表。 •保证了表中的数据的正确性 i.约束条件包括了:非空和唯一和核对,即not null 和unique 和check null的含义:不确定 3个人去捡苹…...

opencascade AIS_MouseGesture AIS_MultipleConnectedInteractive源码学习

AIS_MouseGesture //! 鼠标手势 - 同一时刻只能激活一个。 enum AIS_MouseGesture { AIS_MouseGesture_NONE, //!< 无激活手势 // AIS_MouseGesture_SelectRectangle, //!< 矩形选择&#xff1b; //! 按下按钮开始&#xff0c;移动鼠标定义矩形&…...

Unity Apple Vision Pro 开发:如何把 PolySpatial 和 Play To Device 的版本从 1.2.3 升级为 1.3.1

XR 开发社区&#xff1a; SpatialXR社区&#xff1a;完整课程、项目下载、项目孵化宣发、答疑、投融资、专属圈子 &#x1f4d5;教程说明 本教程将介绍如何把 Unity 的 PolySpatial 和 Play To Device 版本从 1.2.3 升级为 1.3.1。 &#x1f4d5;Play To Device 软件升级 ht…...

大数据时代,区块链是如何助力数据开放共享的?

在大数据时代&#xff0c;区块链技术以其独特的优势&#xff0c;为数据开放共享提供了强有力的支持。以下是区块链助力数据开放共享的几个主要方面&#xff1a; 1. 增强数据安全性与隐私保护 加密安全&#xff1a;区块链技术采用先进的加密算法&#xff0c;如国密非对称加密技…...

睿抗2024省赛----RC-u4 章鱼图的判断

题目 对于无向图 G(V,E)&#xff0c;我们将有且只有一个环的、大于 2 个顶点的无向连通图称之为章鱼图&#xff0c;因为其形状像是一个环&#xff08;身体&#xff09;带着若干个树&#xff08;触手&#xff09;&#xff0c;故得名。 给定一个无向图&#xff0c;请你判断是不…...

py2exe,一个神奇的 Python 库

在众多Python打包工具中&#xff0c;py2exe无疑是一款出色的选择。它能够将Python脚本转换成可在Windows平台上独立运行的可执行文件&#xff0c;极大地方便了程序的分发与部署。本文将深入探讨py2exe的特性和使用方法&#xff0c;让你在创建桌面应用程序时更加游刃有余。 安装…...

博途PLC网络连接不上

博途PLC网络连接不上其中的一个原因就是网线接触不好&#xff0c;各种原因都试了&#xff0c;任然连接不上&#xff0c;大家可以把网线拔下&#xff0c;重新插拔或者直接更换一根网线。 1、无线网络网段和PLC连接网段冲突 。。。。...

哪个邮箱最安全最好用啊

企业邮箱安全至关重要&#xff0c;需保护隐私、防财务损失、维护通信安全、避免纠纷&#xff0c;并维持业务连续性。哪个企业邮箱最安全好用呢&#xff1f;Zoho企业邮箱&#xff0c;采用加密技术、反垃圾邮件和病毒保护&#xff0c;支持多因素认证&#xff0c;确保数据安全合规…...

企业微信开发智能升级:AIGC技术赋能,打造高效沟通平台

文章目录 一、AIGC在企业微信开发中的核心价值1. 智能化客服体验2. 自动化工作流程3. 个性化内容推荐4. 深度数据分析与洞察 二、使用AIGC进行企业微信开发的实践路径1. 需求分析与场景定义2. 技术选型与平台搭建3. 模型训练与调优4. 接口对接与功能集成5. 测试与优化 《企业微…...

Apache Doris + Paimon 快速搭建指南|Lakehouse 使用手册(二)

湖仓一体&#xff08;Data Lakehouse&#xff09;融合了数据仓库的高性能、实时性以及数据湖的低成本、灵活性等优势&#xff0c;帮助用户更加便捷地满足各种数据处理分析的需求。在过去多个版本中&#xff0c;Apache Doris 持续加深与数据湖的融合&#xff0c;已演进出一套成熟…...

Inno setup pascal编码下如何美化安装界面支持带边框,圆角,透明阴影窗口

inno setup自带的安装界面太老套了&#xff0c;如何实现类似网易&#xff0c;微信那种带界面的安装&#xff1f;一般有两种思路&#xff1a;提供一个单独的下载器&#xff0c;然后通过下载器将你用innosetup 打包后的软件下载下来&#xff0c;然后&#xff0c;静默安装这个包&a…...

SQL语句(以MySQL为例)——单表、多表查询

笛卡尔积&#xff08;或交叉连接&#xff09;: 笛卡尔乘积是一个数学运算。假设我有两个集合 X 和 Y&#xff0c;那么 X 和 Y 的笛卡尔积就是 X 和 Y 的所有可能组合&#xff0c;也就是第一个对象来自于 X&#xff0c;第二个对象来自于 Y 的所有可能。组合的个数即为两个集合中…...

C++第二十八弹---进一步理解模板:特化和分离编译

✨个人主页&#xff1a; 熬夜学编程的小林 &#x1f497;系列专栏&#xff1a; 【C语言详解】 【数据结构详解】【C详解】 目录 1. 非类型模板参数 2. 模板的特化 2.1 概念 2.2 函数模板特化 2.3 类模板特化 2.3.1 全特化 2.3.2 偏特化 2.3.3 类模板特化应用示例 3. …...

正则表达式的独占模式,懒惰模式等有那些区别

正则表达式的独占模式、懒惰模式&#xff08;也称为非贪婪模式&#xff09;和贪婪模式&#xff08;默认模式&#xff09;在匹配行为上存在显著的区别。以下是这三种模式的详细解释和区别&#xff1a; 1、贪婪模式&#xff08;Greedy&#xff09;&#xff1a; 默认情况下&…...

【INTEL(ALTERA)】Quartus® Prime Pro Edition 软件 v24.2 中,哪些 Agilex™ 5 IP 功能的硬件验证有限?

目录 说明 解决方法 说明 如下表所示&#xff0c;Quartus Prime 专业版软件 24.2 版为 Agilex™ 5 IP 或功能提供有限的硬件支持。此外&#xff0c;设备的设备型号、比特流和固件尚未最终确定。 影响 Agilex™ 5 特定功能的已知问题可参阅 Agilex 5 知识库文章搜索。 解决…...

Lua编程

文章目录 概述lua数据类型元表注意 闭包表现 实现 lua/c 接口编程skynet中调用层次虚拟栈C闭包注册表userdatalightuserdata 小结 概述 这次是skynet&#xff0c;需要一些lua/c相关的。写一篇博客&#xff0c;记录下。希望有所收获。 lua数据类型 boolean , number , string…...

2019数字经济公测大赛-VMware逃逸

文章目录 环境搭建漏洞点exp 环境搭建 ubuntu :18.04.01vmware: VMware-Workstation-Full-15.5.0-14665864.x86_64.bundle 这里环境搭不成功。。patch过后就报错&#xff0c;不知道咋搞 发现可能是IDA加载后的patch似乎不行对原来的patch可能有影响&#xff0c;重新下了patch&…...

如何改桥接模式

桥接模式主要是解决 路由功能的 因为NAT多层 主要是网络连接数太多时 然后路由器要好 不然光猫 比差路由要强的 光猫 请注意&#xff0c;对光猫的任何设置进行修改前&#xff0c;请一定要备份光猫的配置文件&#xff0c;并在每次修改前都截图保存原始设置信息&#xff01;不要…...

江科大/江协科技 STM32学习笔记P13

文章目录 TIM定时中断1、TIM简介计数器PSC预分频器ARR自动重装寄存器 2、定时器类型基本定时器主模式触发DAC 通用定时器高级定时器 3、定时器原理定时中断基本结构预分频器时序计数器时序RCC时钟树 TIM定时中断 1、TIM简介 定时器的基准时钟一般都是主频72MHz&#xff0c;如果…...

loadrunner录制解决提示安全问题

点击页面任意位置&#xff0c;输入&#xff1a; thisisunsafe...

为什么要读写分离?如何实现业务系统读写分离?

信息化水平提升&#xff0c;很多企业已经接受并高频使用多样的业务系统进行日常作业&#xff0c;而在不断的使用过程中&#xff0c;部分行业和业务&#xff0c;如&#xff1a;直播电商、基础制造、公关传媒等&#xff0c;由于自身特点的原因&#xff0c;常常积累了海量的数据。…...

C#基础——类、构造函数和静态成员

类 类是一个数据类型的蓝图。构成类的方法和变量称为类的成员&#xff0c;对象是类的实例。类的定义规定了类的对象由什么组成及在这个对象上可执行什么操作。 class 类名 { (访问属性) 成员变量; (访问属性) 成员函数; } 访问属性&#xff1a;public&#xff08;公有的&…...

hadoop学习(二)

一.MapReduce 1.1定义&#xff1a;是一个分布式运算程序的编程框架 1.2核心功能&#xff1a;将用户编写的业务逻辑代码和自带默认组件整合成一个完整的分布式运算程序&#xff0c;并发运行在一个Hadoop集群上。 1.3优点 1&#xff09;易于编程 它简单的实现一些接口&#…...

WXZ196微机消谐装置的运行方式了解一下

WXZ196微机消谐装置是一种用于抑制铁磁谐振的设备&#xff0c;可以在电力系统中快速消除各种频率的铁磁谐振&#xff0c;同时可以区分过电压、铁磁谐振以及单相接地&#xff0c;并给出相应的报警信号。该装置采用高速增强型单片机作为核心元件&#xff0c;对PT开口三角电压进行…...

单链表的建立

一.前言 单链表的建立一共有两种方法&#xff0c;一种是头插法&#xff0c;将元素插入在链表的头部&#xff0c;也叫前插法。另外一种则就是尾插法&#xff0c;将元素插入在链表尾部&#xff0c;也叫后插法。 二. 头插法 首先从一个空表开始&#xff0c;重复读入数据&#xff1…...

Shell脚本编程学习

IPv4和IPv6有什么区别&#xff1f; - 知乎 (zhihu.com) Shell 是一个命令解释权&#xff0c;它为用户提供了一个向 Linux 内核发送请求以便运行程序界面系统级程序&#xff0c;用户可以用 Shell 来启动、挂起、停止甚至是编写一些程序。 可以查看当前系统的进程 ps -ef...

从宏基因组量化细菌生长动态

Introduciton 了解细菌在各种环境中的生长动态对于人类健康和环境监测等广泛领域至关重要。传统研究细菌生长的方法往往依赖于培养技术&#xff0c;这不仅耗时&#xff0c;而且对易培养的物种有偏向。然而&#xff0c;随着宏基因组测序技术的兴起&#xff0c;我们现在可以直接…...

Linux---git工具

目录 初步了解 基本原理 基本用法 安装git 拉取远端仓库 提交三板斧 1、添加到缓存区 2、提交到本地仓库 3、提交到远端 其他指令补充 多人协作管理 windows用户提交文件 Linux用户提交文件 初步了解 在Linux中&#xff0c;git是一个指令&#xff0c;可以帮助我们做…...

【JavaScript】函数的动态传参

Javacript&#xff08;简称“JS”&#xff09;是一种具有函数优先的轻量级&#xff0c;解释型或即时编译型的编程语言。虽然它是作为开发Web页面的脚本语言而出名&#xff0c;但是它也被用到了很多非浏览器环境中&#xff0c;JavaScript基于原型编程、多范式的动态脚本语言&…...

从0到1,AI我来了- (4)AI图片识别的理论知识-II

上篇文章&#xff0c;我们理解了我们程序的神经网络设计&#xff0c;这篇我们继续&#xff0c;把训练迭代过程分析一下&#xff0c;完成这两篇文章&#xff0c;下面问题&#xff0c;应该能回答了。 一张图片&#xff0c;如何被计算机读懂&#xff1f;pytorch 封装的网络&#…...

2024 Java 高分面试宝典 一站式搞定技术面

前言 每年9月和10月&#xff0c;被业界称为“金九银十”&#xff0c;这是人才市场一年中最活跃的时期。此时&#xff0c;企业为了来年的业务扩展&#xff0c;纷纷加大招聘力度&#xff0c;空缺岗位众多&#xff0c;招聘需求集中。同时&#xff0c;初秋的招聘活动也避开酷暑&am…...

MongoDB - 聚合操作符 $eq、$gte、$in、$sum、$avg

文章目录 1. $eq2. $gte3. $in4. $sum5. $avg 1. $eq $eq比较两个值并返回&#xff1a;true &#xff08;当值相等时&#xff09;|false&#xff08;当值不相等时&#xff09; { $eq: [ <expression1>, <expression2> ] }构造测试数据&#xff1a; db.inventory…...

C语言 | Leetcode C语言题解之第279题完全平方数

题目&#xff1a; 题解&#xff1a; // 判断是否为完全平方数 bool isPerfectSquare(int x) {int y sqrt(x);return y * y x; }// 判断是否能表示为 4^k*(8m7) bool checkAnswer4(int x) {while (x % 4 0) {x / 4;}return x % 8 7; }int numSquares(int n) {if (isPerfect…...

在appium中,如何通过匹配图片来进行断言?

在Appium中进行图片匹配断言&#xff0c;可以使用OpenCV来实现。以下是使用Appium和OpenCV进行图片匹配断言的示例代码。 首先&#xff0c;需要确保安装了必要的库&#xff1a; pip install opencv-python-headless pip install opencv-python pip install numpy然后&#xf…...

昇思25天学习打卡营第21天|CV-Shufflenet图像分类

打卡 目录 打卡 ShuffleNet 网络介绍 ShuffleNet 模型架构 Pointwise Group Convolution Channel Shuffle ShuffleNet模块 ShuffleNet 模块代码 构建ShuffleNet网络 模块代码 模型训练和评估 模型训练 模型评估 模型预测 ShuffleNet 网络介绍 ShuffleNetV1是旷视科…...

python 图片转文字、语音转文字、文字转语音保存音频并朗读

一、python图片转文字 1、引言 pytesseract是基于Python的OCR工具&#xff0c; 底层使用的是Google的Tesseract-OCR 引擎&#xff0c;支持识别图片中的文字&#xff0c;支持jpeg, png, gif, bmp, tiff等图片格式 2、环境配置 python3.6PIL库安装Google Tesseract OCR 3、安…...

SSRF (服务端请求伪造)

&#x1f3bc;个人主页&#xff1a;金灰 &#x1f60e;作者简介:一名简单的大一学生;易编橙终身成长社群的嘉宾.✨ 专注网络空间安全服务,期待与您的交流分享~ 感谢您的点赞、关注、评论、收藏、是对我最大的认可和支持&#xff01;❤️ &#x1f34a;易编橙终身成长社群&#…...

SQL中的LEFT JOIN、RIGHT JOIN和INNER JOIN

在SQL中&#xff0c;JOIN操作是连接两个或多个数据库表&#xff0c;并根据两个表之间的共同列&#xff08;通常是主键和外键&#xff09;返回数据的重要方法。其中&#xff0c;LEFT JOIN&#xff08;左连接&#xff09;、RIGHT JOIN&#xff08;右连接&#xff09;和INNER JOIN…...

[网鼎杯 2020 朱雀组]Nmap(详细解读版)

这道题考察nmap的一些用法,以及escapeshellarg和escapeshellcmd两个函数的绕过&#xff0c;可以看这里PHP escapeshellarg()escapeshellcmd() 之殇 (seebug.org) 两种解题方法&#xff1a; 第一种通过nmap的-iL参数读取扫描一个文件到指定文件中第二种是利用nmap的参数写入we…...

【React】详解“最新”和“最热”切换与排序

文章目录 一、基本概念和初始化二、切换与排序功能的实现1. 函数定义和参数2. 设置活动 Tab3. 定义新列表变量4. 根据排序类型处理列表4.1 按时间降序排序4.2 按点赞数降序排序 5. 更新评论列表 三、渲染导航 Tab 和评论列表1. map 方法2. key 属性3. className 动态赋值4. onC…...

BUUCTF [MRCTF2020]Ezpop

这道题对于刚接触到pop链的我直接把我整懵了&#xff0c;一边看着魔术方法一边分析 魔术方法可以看这里PHP 魔术方法 - 简介 - PHP 魔术方法 - 简单教程&#xff0c;简单编程 (twle.cn) 代码解析 经过以上的分析我们可以理一下解题思路&#xff1a;接收参数反序列化之前先触发…...

RV1126 Linux 系统,接外设,时好时坏(一)应该从哪些方面排查问题

在 Linux 系统中接外设时,遇到“时好时坏”的问题,可能是由多种因素引起的。以下是一些排查问题的建议。 1. 硬件方面的排查 1.1 连接检查 物理连接: 确保外设与主板之间的连接良好,检查插头、插座及线缆是否牢固。引脚配置: 确认设备树中引脚的配置是否正确,尤其是引脚…...

Vue实现简单小案例

一、创建文件夹 二、引用vue.js <script src"../js/vue.js"></script> 三、准备一个容器 <div id"app"><h1>Hello,{{name}}</h1> </div> 四、创建实例 <script>new Vue({el:"#app", //el用于指…...

【MATLAB APP】建立独立桌面APP

背景&#xff1a;已有MATLAB APP的.mlapp文件&#xff0c;但客户提出需要可以直接使用的exe文件。 要求&#xff1a;点开即用&#xff0c;无需下载MATLAB。使用者无法修改APP的代码。 一、环境配置 APP创建者&#xff1a;安装MATLAB R2023a&#xff0c;配置Application Compile…...

Spring的优缺点?

Spring的优缺点 直接回答相关的Spring的特点&#xff1a; IOC AOP 事务 简化开发&#xff1a; 容易集成JDBCTemplateRestTemplate&#xff08;接口远程调用&#xff09;邮件发送相关异步消息请求支持 更加深入就讲源码了 优点&#xff1a; 方便解耦&#xff0c;简化开发…...

第一百八十三节 Java IO教程 - Java目录事件、Java异步I/O

Java IO教程 - Java目录事件 当文件系统中的对象被修改时&#xff0c;我们可以监听watch服务以获取警报。 java.nio.file包中的以下类和接口提供watch服务。 Watchable接口WatchService接口WatchKey接口WatchEvent接口WatchEvent.Kind接口StandardWatchEventKinds类 可监视对…...

【设计模式】(万字总结)深入理解Java中的创建型设计模式

1. 前言 在软件开发的世界里&#xff0c;设计模式是一种被广泛接受并应用的解决方案。它们不仅仅是代码的设计&#xff0c;更是对问题的思考和解决的方法论。在Java开发中&#xff0c;特别是在面向对象的编程中&#xff0c;设计模式尤为重要。创建型设计模式&#xff0c;作为设…...

【全面讲解下Docker in Docker的原理与实践】

🌈个人主页: 程序员不想敲代码啊 🏆CSDN优质创作者,CSDN实力新星,CSDN博客专家 👍点赞⭐评论⭐收藏 🤝希望本文对您有所裨益,如有不足之处,欢迎在评论区提出指正,让我们共同学习、交流进步! 👉目录 👉前言👉原理👉实践👉安全和最佳实践👉前言 🦛…...

Android Settings增加多击事件,增加开发者模式打开难度

软件平台&#xff1a;Android11 硬件平台&#xff1a;QCS6125 需求来源&#xff1a;用户通过系统异常崩溃&#xff0c;进入原生Settings页面&#xff0c;通过默认的多击版本号方式打开开发者模式&#xff0c;继而打开adb调试开关&#xff0c;安装三方apk。 对付这种需求本来有…...