使用Java代码操作Kafka(五):Kafka消费 offset API,包含指定 Offset 消费以及指定时间消费
文章目录
- 1、指定 Offset 消费
- 2、指定时间消费
1、指定 Offset 消费
auto.offset.reset = earliest | latest | none 默认是 latest
(1)earliest:自动将偏移量重置为最早的偏移量,–from-beginning
(2)latest(默认值):自动将偏移量重置为最新偏移量。
(3)none:如果未找到消费者组的先前偏移量,则向消费者抛出异常
这个参数的力度太大了,不是从头,就是从尾
kafka提供了seek方法
,可以让我们从分区的固定位置开始消费
seek(TopicPartition topicPartition,offset offset)
示例代码:
package com.bigdata.kafka.consumer;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;
import java.util.Set;public class CustomConsumerSeek {public static void main(String[] args) {Properties properties = new Properties();// 连接kafkaproperties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"node01:9092");// 字段反序列化 key 和 valueproperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());// 关闭自动提交offsetproperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());// 配置消费者组(组名任意起名) 必须properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties);// 2 订阅一个主题ArrayList<String> topics = new ArrayList<>();topics.add("first");kafkaConsumer.subscribe(topics);// 执行计划// 此时的消费计划是空的,因为没有时间生成Set<TopicPartition> assignment = kafkaConsumer.assignment();while(assignment.size() == 0){// 这个本身是拉取数据的代码,此处可以帮助快速构建分区方案出来kafkaConsumer.poll(Duration.ofSeconds(1));// 一直获取它的分区方案,什么时候有了,就什么时候跳出这个循环assignment = kafkaConsumer.assignment();}// 获取所有分区的offset =5 以后的数据/*for (TopicPartition tp:assignment) {kafkaConsumer.seek(tp,5);}*/// 获取分区0的offset =5 以后的数据//kafkaConsumer.seek(new TopicPartition("bigdata",0),5);for (TopicPartition tp:assignment) {if(tp.partition() == 0){kafkaConsumer.seek(tp,5);}}while(true){//1 秒中向kafka拉取一批数据ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofSeconds(1));for (ConsumerRecord<String,String> record :records) {// 打印一条数据System.out.println(record);// 可以打印记录中的很多内容,比如 key value offset topic 等信息System.out.println(record.value());}}}
}
2、指定时间消费
示例代码:
package com.bigdata.consumer;import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
import java.util.*;/*** 从某个特定的时间开始进行消费*/
public class Customer05 {public static void main(String[] args) {// 其实就是mapProperties properties = new Properties();// 连接kafkaproperties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"node01:9092");// 字段反序列化 key 和 valueproperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());// 配置消费者组(组名任意起名) 必须properties.put(ConsumerConfig.GROUP_ID_CONFIG, "testf");// 指定分区的分配方案 为轮询策略//properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "org.apache.kafka.clients.consumer.RoundRobinAssignor");// 指定分区的分配策略为:Sticky(粘性)ArrayList<String> startegys = new ArrayList<>();startegys.add("org.apache.kafka.clients.consumer.StickyAssignor");properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, startegys);// 创建一个kafka消费者的对象KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties);// 消费者消费的是kafka集群的数据,消费哪个主题的数据呢?List<String> topics = new ArrayList<>();topics.add("five");// list总可以设置多个主题的名称kafkaConsumer.subscribe(topics);// 因为消费者是不停的消费,所以是while true// 指定了获取分区数据的起始位置。// 这样写会报错的,因为前期消费需要指定计划,指定计划需要时间// 此时的消费计划是空的,因为没有时间生成Set<TopicPartition> assignment = kafkaConsumer.assignment();while(assignment.size() == 0){// 这个本身是拉取数据的代码,此处可以帮助快速构建分区方案出来kafkaConsumer.poll(Duration.ofSeconds(1));// 一直获取它的分区方案,什么时候有了,就什么时候跳出这个循环assignment = kafkaConsumer.assignment();}Map<TopicPartition, Long> hashMap = new HashMap<>();for (TopicPartition partition:assignment) {hashMap.put(partition,System.currentTimeMillis()- 60*60*1000);}Map<TopicPartition, OffsetAndTimestamp> map = kafkaConsumer.offsetsForTimes(hashMap);for (TopicPartition partition:assignment) {OffsetAndTimestamp offsetAndTimestamp = map.get(partition);kafkaConsumer.seek(partition,offsetAndTimestamp.offset());}while(true){// 每隔一秒钟,从kafka 集群中拉取一次数据,有可能拉取多条数据ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofSeconds(1));// 循环打印每一条数据for (ConsumerRecord record:records) {// 打印数据中的值System.out.println(record.value());System.out.println(record.offset());// 打印一条数据System.out.println(record);}}}
}
相关文章:
使用Java代码操作Kafka(五):Kafka消费 offset API,包含指定 Offset 消费以及指定时间消费
文章目录 1、指定 Offset 消费2、指定时间消费 1、指定 Offset 消费 auto.offset.reset earliest | latest | none 默认是 latest (1)earliest:自动将偏移量重置为最早的偏移量,–from-beginning (2)lates…...
Ubuntu安装不同版本的opencv,并任意切换使用
参考: opencv笔记:ubuntu安装opencv以及多版本共存 | 高深远的博客 https://zhuanlan.zhihu.com/p/604658181 安装不同版本opencv及共存、切换并验证。_pkg-config opencv --modversion-CSDN博客 Ubuntu下多版本OpenCV共存和切换_ubuntu20如同时安装o…...
突破内存限制:Mac Mini M2 服务器化实践指南
本篇文章,我们聊聊如何使用 Mac Mini M2 来实现比上篇文章性价比更高的内存服务器使用,分享背后的一些小的思考。 希望对有类似需求的你有帮助。 写在前面 在上文《ThinkPad Redis:构建亿级数据毫秒级查询的平民方案》中,我们…...
【排版教程】Word、WPS 分节符(奇数页等) 自动变成 分节符(下一页) 解决办法
毕业设计排版时,一般要求每章节的起始页为奇数页,空白页不显示页眉和页脚。具体做法如下: 1 Word 在一个章节的内容完成后,在【布局】中,点击【分隔符】,然后选择【奇数页】 这样在下一章节开始的时&…...
【在Linux世界中追寻伟大的One Piece】多线程(二)
目录 1 -> 分离线程 2 -> Linux线程互斥 2.1 -> 进程线程间的互斥相关背景概念 2.2 -> 互斥量mutex 2.3 -> 互斥量的接口 2.4 -> 互斥量实现原理探究 3 -> 可重入VS线程安全 3.1 -> 概念 3.2 -> 常见的线程不安全的情况 3.3 -> 常见的…...
flink学习(8)——窗口函数
增量聚合函数 ——指窗口每进入一条数据就计算一次 例如:要计算数字之和,进去一个12 计算结果为20, 再进入一个7 ——结果为27 reduce aggregate(aggregateFunction) package com.bigdata.day04;public class _04_agg函数 {public static …...
「实战应用」如何用图表控件LightningChart .NET实现散点图?(一)
LightningChart .NET完全由GPU加速,并且性能经过优化,可用于实时显示海量数据-超过10亿个数据点。 LightningChart包括广泛的2D,高级3D,Polar,Smith,3D饼/甜甜圈,地理地图和GIS图表以及适用于科…...
鸿蒙Native使用Demo
DevecoStudio使用Native 今天,给大家带来的是关于DevecoStudio中使用Native进行开发 个人拙见:为什么要使用Native?无论是JS还是TS在复杂的情况下运行速度,肯定不如直接操作内存的C/C的运行速度快,所以,会选择使用Native;这里面的过程是什么?通过映射转化,使用napi提供的接口…...
29.UE5蓝图的网络通讯,多人自定义事件,变量同步
3-9 蓝图的网络通讯、多人自定义事件、变量同步_哔哩哔哩_bilibili 目录 1.网络通讯 1.1玩家Pawn之间的同步 1.2事件同步 1.3UI同步 1.4组播 1.5变量同步 1.网络通讯 1.1玩家Pawn之间的同步 创建一个第三人称项目 将网络模式更改为监听服务器,即将房主作为…...
Scala—列表(可变ListBuffer、不可变List)用法详解
Scala集合概述-链接 大家可以点击上方链接,先对Scala的集合有一个整体的概念🤣🤣🤣 在 Scala 中,列表(List)分为不可变列表(List)和可变列表(ListBuffer&…...
【论文复现】偏标记学习+图像分类
📝个人主页🌹:Eternity._ 🌹🌹期待您的关注 🌹🌹 ❀ 偏标记学习图像分类 概述算法原理核心逻辑效果演示使用方式参考文献 概述 本文复现论文 Progressive Identification of True Labels for Pa…...
C嘎嘎探索篇:栈与队列的交响:C++中的结构艺术
C嘎嘎探索篇:栈与队列的交响:C中的结构艺术 前言: 小编在之前刚完成了C中栈和队列(stack和queue)的讲解,忘记的小伙伴可以去我上一篇文章看一眼的,今天小编将会带领大家吹奏栈和队列的交响&am…...
AIGC-----AIGC在虚拟现实中的应用前景
AIGC在虚拟现实中的应用前景 引言 随着人工智能生成内容(AIGC)的快速发展,虚拟现实(VR)技术的应用也迎来了新的契机。AIGC与VR的结合为创造沉浸式体验带来了全新的可能性,这种组合不仅极大地降低了VR内容的…...
Django 路由层
1. 路由基础概念 URLconf (URL 配置):Django 的路由系统是基于 urls.py 文件定义的。路径匹配:通过模式匹配 URL,并将请求传递给对应的视图处理函数。命名路由:每个路由可以定义一个名称,用于反向解析。 2. 基本路由配…...
《硬件架构的艺术》笔记(八):消抖技术
简介 在电子设备中两个金属触点随着触点的断开闭合便产生了多个信号,这就是抖动。 消抖是用来确保每一次断开或闭合触点时只有一个信号起作用的硬件设备或软件。(就是每次断开闭合只对应一个操作)。 抖动在某些模拟和逻辑电路中可能产生问…...
Spring 与 Spring MVC 与 Spring Boot三者之间的区别与联系
一.什么是Spring?它解决了什么问题? 1.1什么是Spring? Spring,一般指代的是Spring Framework 它是一个开源的应用程序框架,提供了一个简易的开发方式,通过这种开发方式,将避免那些可能致使代码…...
【算法】连通块问题(C/C++)
目录 连通块问题 解决思路 步骤: 初始化: DFS函数: 复杂度分析 代码实现(C) 题目链接:2060. 奶牛选美 - AcWing题库 解题思路: AC代码: 题目链接:687. 扫雷 -…...
如何选择黑白相机和彩色相机
我们在选择成像解决方案时黑白相机很容易被忽略,因为许多新相机提供鲜艳的颜色,鲜明的对比度和改进的弱光性能。然而,有许多应用,选择黑白相机将是更好的选择,因为他们产生更清晰的图像,更好的分辨率&#…...
Rust 力扣 - 740. 删除并获得点数
文章目录 题目描述题解思路题解代码题目链接 题目描述 题解思路 首先对于这题我们如果将所有点数装入一个切片f中,该切片f中的i号下标表示所有点数为i的点数之和 那么这题就转换成了打家劫舍这道题,也就是求选择了切片中某个下标的元素后,该…...
OpenCV从入门到精通实战(七)——探索图像处理:自定义滤波与OpenCV卷积核
本文主要介绍如何使用Python和OpenCV库通过卷积操作来应用不同的图像滤波效果。主要分为几个步骤:图像的读取与处理、自定义卷积函数的实现、不同卷积核的应用,以及结果的展示。 卷积 在图像处理中,卷积是一种重要的操作,它通过…...
Docker核心概念总结
本文只是对 Docker 的概念做了较为详细的介绍,并不涉及一些像 Docker 环境的安装以及 Docker 的一些常见操作和命令。 容器介绍 Docker 是世界领先的软件容器平台,所以想要搞懂 Docker 的概念我们必须先从容器开始说起。 什么是容器? 先来看看容器较为…...
环形缓冲区
什么是环形缓冲区 环形缓冲区,也称为循环缓冲区或环形队列,是一种特殊的FIFO(先进先出)数据结构。它使用一块固定大小的内存空间来缓存数据,并通过两个指针(读指针和写指针)来管理数据的读写。当任意一个指针到达缓冲区末尾时,会自动回绕到缓冲区开头,形成一个"环"。…...
jQuery-Word-Export 使用记录及完整修正文件下载 jquery.wordexport.js
参考资料: jQuery-Word-Export导出word_jquery.wordexport.js下载-CSDN博客 近期又需要自己做个 Html2Doc 的解决方案,因为客户又不想要 Html2pdf 的下载了,当初还给我费尽心思解决Html转pdf时中文输出的问题(html转pdf文件下载之…...
云服务器部署WebSocket项目
WebSocket是一种在单个TCP连接上进行全双工通信的协议,其设计的目的是在Web浏览器和Web服务器之间进行实时通信(实时Web) WebSocket协议的优点包括: 1. 更高效的网络利用率:与HTTP相比,WebSocket的握手只…...
C#+数据库 实现动态权限设置
将权限信息存储在数据库中,支持动态调整。根据用户所属的角色、特定的功能模块,动态加载权限” 1. 数据库设计 根据这种需求,可以通过以下表设计: 用户表 (Users):存储用户信息。角色表 (Roles):存储角色…...
(原创)Android Studio新老界面UI切换及老版本下载地址
前言 这两天下载了一个新版的Android Studio,发现整个界面都发生了很大改动: 新的界面的一些设置可参考一些博客: Android Studio新版UI常用设置 但是对于一些急着开发的小伙伴来说,没有时间去适应,那么怎么办呢&am…...
Ubuntu24虚拟机-gnome-boxes
推荐使用gnome-boxes, virtualbox构建失败,multipass需要开启防火墙 sudo apt install gnome-boxes创建完毕~...
k8s rainbond centos7/win10 -20241124
参考 https://www.rainbond.com/ 国内一站式云原生平台 对centos7环境支持不太行 [lighthouseVM-16-5-centos ~]$ curl -o install.sh https://get.rainbond.com && bash ./install.sh 2024-11-24 09:56:57 ERROR: Ops! Docker daemon is not running. Start docke…...
SpringBoot+Vue滑雪社区网站设计与实现
【1】系统介绍 研究背景 随着互联网技术的快速发展和冰雪运动的普及,滑雪作为一种受欢迎的冬季运动项目,吸引了越来越多的爱好者。与此同时,社交媒体和在线社区平台的兴起为滑雪爱好者提供了一个交流经验、分享心得、获取信息的重要渠道。滑…...
MySql.2
sql查询语句执行过程 SQL 查询语句的执行过程是一个复杂的过程,涉及多个步骤。以下是典型的关系数据库管理系统 (RDBMS) 中 SQL 查询语句的执行过程概述: 1. 客户端发送查询 用户通过 SQL 客户端或应用程序发送 SQL 查询语句给数据库服务器。 2. …...
126企业邮箱注册申请/seo优化一般优化哪些方面
霍夫圆变换的基本原理和霍夫线变换类似,只是点对应的二维极径极角空间被三维的圆心点x, y还有半径r空间取代。 对直线来说,一条直线能由参数极径极角(r, θ)表示。而对圆来说,我们需要三个参数来表示一个圆,…...
制作网站开发/电脑培训班零基础网课
知名企业家、同时也是 NBA 小牛队的老板马克库班(Mark Cuban)曾说过一句话:人工智能,深度学习和机器学习,不论你现在是否能够理解这些概念,你都应该学习。否则三年内,你就会像灭绝的恐龙一样被社…...
pycharm网站开发实例/百度扫一扫识别图片
学习 Python 之 Pygame 开发魂斗罗(一)Pygame回忆Pygame1. 使用pygame创建窗口2. 设置窗口背景颜色3. 获取窗口中的事件4. 在窗口中展示图片(1). pygame中的直角坐标系(2). 展示图片(3). 给部分区域设置颜色5. 在窗口中显示文字6. 播放音乐7. 图片翻转与…...
写作网站不屏蔽/宁波seo快速优化平台
1. 新建 axiosTool.js 文件,设置请求拦截和处理的逻辑 import Vue from vue import axios from axios //取消请求 let CancelToken axios.CancelToken //设置默认请求头,如果不需要可以取消这一步 axios.defaults.headers {X-Requested-With: XMLHttpR…...
西安政府做网站/seo官网
福利来袭>>>团圆佳节,你送祝福我送福利!9月13日晚上十点之前在下方评论区留言,说出你的中秋祝福or小长假安排,就有机会获得爱奇艺VIP卡!!赶快行动吧!end往期精选1.爱奇艺ZoomAI技术 助…...
上海网站制作/全网霸屏推广系统
Combustion 4介绍Combustion是Autodesk公司出品的软件,为视觉特效创建而设计的一整套尖端工具,适用于PC或苹果平台。软件包含矢量绘画、粒子、视频效果处理、轨迹动画以及3D效果合成等五大工具模块。combustion4是discreet公司多次获奖的动态矢量绘图&am…...