kafka 快速上手
下载 Apache Kafka
演示window 安装
编写启动脚本,脚本的路径根据自己实际的来

启动说明
先启动zookeeper后启动kafka,关闭是先关kafka,然后关闭zookeeper
巧记: 铲屎官(zookeeper)总是第一个到,最后一个走
启动zookeeper
call bin/windows/zookeeper-server-start.bat config/zookeeper.properties
启动kafka
call bin/windows/kafka-server-start.bat config/server.properties
测试脚本,主要用于创建主题 ‘test-topic’
# 创建主题(窗口1)
bin/window> kafka-topics.bat --bootstrap-server localhost:9092 --topic test-topic --create# 查看主题
bin/window> kafka-topics.bat --bootstrap-server localhost:9092 --list
bin/window> kafka-topics.bat --bootstrap-server localhost:9092 --topic test-topic --describe# 修改某主题的分区
bin/window> kafka-topics.bat --bootstrap-server localhost:9092 --topic test-topic --alter --partitions 2# 生产消息(窗口2)向test-topic主题发送消息
bin/window> kafka-console-producer.bat --bootstrap-server localhost:9092 --topic test-topic
>hello kafka# 消费消息(窗口3)消费test-topic主题的消息
bin/window> kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test-topic
package com.ldj.kafka.admin;import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;import java.util.*;/*** User: ldj* Date: 2024/6/13* Time: 0:00* Description: 创建主题*/
public class AdminTopic {public static void main(String[] args) {Map<String, Object> adminConfigMap = new HashMap<>();adminConfigMap.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");AdminClient adminClient = AdminClient.create(adminConfigMap);/*** 使用kafka默认的分区算法创建分区*/NewTopic topic1 = new NewTopic("topic-01", 1, (short) 1);NewTopic topic2 = new NewTopic("topic-02", 2, (short) 2);CreateTopicsResult addResult1 = adminClient.createTopics(Arrays.asList(topic1, topic2));/*** 手动为主题(topic-03)分配分区* topic-03主题下的0号分区有2个副本,它们中的一个在节点id=1中,一个在节点id=2中;* list里第一个副本就是leader(主写),后面都是follower(主备份)* 例如:0分区,nodeId=1的节点里的副本是主写、2分区,nodeId=3的节点里的副本是主写*/Map<Integer, List<Integer>> partition = new HashMap<>();partition.put(0, Arrays.asList(1, 2));partition.put(1, Arrays.asList(2, 3));partition.put(2, Arrays.asList(3, 1));NewTopic topic3 = new NewTopic("topic-03", partition);CreateTopicsResult addResult2 = adminClient.createTopics(Collections.singletonList(topic3));//DeleteTopicsResult delResult = adminClient.deleteTopics(Arrays.asList("topic-02"));adminClient.close();}}
package com.ldj.kafka.producer;import com.alibaba.fastjson.JSON;
import com.ldj.kafka.model.UserEntity;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.Future;/*** User: ldj* Date: 2024/6/12* Time: 21:08* Description: 生产者*/
public class KfkProducer {public static void main(String[] args) throws Exception {//生产者配置Map<String, Object> producerConfigMap = new HashMap<>();producerConfigMap.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");producerConfigMap.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);producerConfigMap.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);//批量发送producerConfigMap.put(ProducerConfig.BATCH_SIZE_CONFIG, 2);//消息传输应答安全级别 0-消息到达broker(效率高,但不安全) 1-消息在leader副本持久化(折中方案) -1/all -消息在leader和flower副本都持久化(安全,但效率低)producerConfigMap.put(ProducerConfig.ACKS_CONFIG, "all");//ProducerState 缓存5条数据,重试数据会与5条数据做比较,结论只能保证一个分区的数据幂等性,跨会话幂等性需要通过事务操作解决(重启后全局消息id的随机id会发生改变)//消息发送失败重试次数,重试会导致消息重复!!(考虑幂等性),消息乱序(判断偏移量是否连续,错乱消息回到在缓冲区重新排序)!!producerConfigMap.put(ProducerConfig.RETRIES_CONFIG, 3);//kafka有消息幂等性处理(全局唯一消息id/随机id-分区-偏移量),默认false-不开启producerConfigMap.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);//解决跨会话幂等性,还需结合事务操作,忽略//producerConfigMap.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "tx_id");//创建生产者KafkaProducer<String, String> producer = new KafkaProducer<>(producerConfigMap);//TODO 事务初始化方法//producer.initTransactions();//构建消息 ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value, Iterable<Header> headers)try {//TODO 开启事务//producer.beginTransaction();for (int i = 0; i < 10; i++) {UserEntity userEntity = new UserEntity().setUserId(2436687942335620L + i).setUsername("lisi").setGender(1).setAge(18);ProducerRecord<String, String> record = new ProducerRecord<>("test-topic",userEntity.getUserId().toString(),JSON.toJSONString(userEntity));//发送数据到BrokerFuture<RecordMetadata> future = producer.send(record, (RecordMetadata var1, Exception var2) -> {if (Objects.isNull(var2)) {System.out.printf("[%s]消息发送成功!", userEntity.getUserId());} else {System.out.printf("[%s]消息发送失败!err:%s", userEntity.getUserId(), var2.getCause());}});//TODO 提交事务//producer.commitTransaction();//注意没有下面这行代码,是异步线程从缓冲区读取数据异步发送消息,反之是同步发送,必须等待回调消息返回才会往下执行System.out.printf("发送消息[%s]----", userEntity.getUserId());RecordMetadata recordMetadata = future.get();System.out.println(recordMetadata.offset());}} finally {//TODO 终止事务//producer.abortTransaction();//关闭通道producer.close();}}}
package com.ldj.kafka.consumer;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;/*** User: ldj* Date: 2024/6/12* Time: 21:10* Description: 消费者*/
public class KfkConsumer {public static void main(String[] args) {//消费者配置Map<String, Object> consumerConfigMap = new HashMap<>();consumerConfigMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");consumerConfigMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);consumerConfigMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);//所属消费组consumerConfigMap.put(ConsumerConfig.GROUP_ID_CONFIG, "test123456");//创建消费者KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerConfigMap);//消费主题的消息 ConsumerRebalanceListenerconsumer.subscribe(Collections.singletonList("test-topic"));try {while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));//数据存储结构:Map<TopicPartition, List<ConsumerRecord<K, V>>> records;for (ConsumerRecord<String, String> record : records) {System.out.println(record.value());}}} finally {//关闭消费者consumer.close();}}}
相关文章:
kafka 快速上手
下载 Apache Kafka 演示window 安装 编写启动脚本,脚本的路径根据自己实际的来 启动说明 先启动zookeeper后启动kafka,关闭是先关kafka,然后关闭zookeeper 巧记: 铲屎官(zookeeper)总是第一个到,最后一个走 启动zookeeper call bi…...
Python记忆组合透明度语言模型
🎯要点 🎯浏览器语言推理识别神经网络 | 🎯不同语言秽语训练识别数据集 | 🎯交互式语言处理解释 Transformer 语言模型 | 🎯可视化Transformer 语言模型 | 🎯语言模型生成优质歌词 | 🎯模型不确…...
如何保证数据库和缓存的一致性
背景:为了提高查询效率,一般会用redis作为缓存。客户端查询数据时,如果能直接命中缓存,就不用再去查数据库,从而减轻数据库的压力,而且redis是基于内存的数据库,读取速度比数据库要快很多。 更新…...
Java基础 - 多线程
多线程 创建新线程 实例化一个Thread实例,然后调用它的start()方法 Thread t new Thread(); t.start(); // 启动新线程从Thread派生一个自定义类,然后覆写run()方法: public class Main {public static void main(String[] args) {Threa…...
云顶之弈-测试报告
一. 项目背景 个人博客系统采用前后端分离的方法来实现,同时使用了数据库来存储相关的数据,同时将其部署到云服务器上。前端主要有四个页面构成:登录页、列表页、详情页以及编辑页,以上模拟实现了最简单的个人博客系统。其结合后…...
TCP/IP协议分析实验:通过一次下载任务抓包分析
TCP/IP协议分析 一、实验简介 本实验主要讲解TCP/IP协议的应用,通过一次下载任务,抓取TCP/IP数据报文,对TCP连接和断开的过程进行分析,查看TCP“三次握手”和“四次挥手”的数据报文,并对其进行简单的分析。 二、实…...
Python项目开发实战:企业QQ小程序(案例教程)
一、引言 在当今数字化快速发展的时代,企业对于线上服务的需求日益增长。企业QQ小程序作为一种轻量级的应用形态,因其无需下载安装、即开即用、占用内存少等优势,受到了越来越多企业的青睐。本文将以Python语言为基础,探讨如何开发一款企业QQ小程序,以满足企业的实际需求。…...
list模拟与实现(附源码)
文章目录 声明list的简单介绍list的简单使用list中sort效率测试list的简单模拟封装迭代器insert模拟erase模拟头插、尾插、头删、尾删模拟自定义类型迭代器遍历const迭代器clear和析构函数拷贝构造(传统写法)拷贝构造(现代写法) 源…...
Java应用中文件上传安全性分析与安全实践
✨✨谢谢大家捧场,祝屏幕前的小伙伴们每天都有好运相伴左右,一定要天天开心哦!✨✨ 🎈🎈作者主页: 喔的嘛呀🎈🎈 目录 引言 一. 文件上传的风险 二. 使用合适的框架和库 1. Spr…...
noVNC 小记
1. 怎么查看Ubuntu版本...
设置systemctl start kibana启动kibana
1、编辑kibana.service vi /etc/systemd/system/kibana.service [Unit] DescriptionKibana Server Manager [Service] Typesimple Useres ExecStart/home/es/kibana-7.10.2-linux-x86_64/bin/kibana PrivateTmptrue [Install] WantedBymulti-user.target 2、启动kibana # 刷…...
PostgreSQL:在CASE WHEN语句中使用SELECT语句
CASE WHEN语句是一种条件语句,用于多条件查询,相当于java的if/else。它允许我们根据不同的条件执行不同的操作。你甚至能在条件里面写子查询。而在一些情况下,我们可能需要在CASE WHEN语句中使用SELECT语句来检索数据或计算结果。下面是一些示…...
游戏心理学Day13
游戏成瘾 成瘾的概念来自于药物依赖,表现为为了感受药物带来的精神效应,或是为了避免由于断药所引起的不适和强迫性,连续定期使用该药的 行为现在成瘾除了药物成瘾外,还包括行为成瘾。成瘾的核心特征是不知道成瘾的概念来自于药…...
GitLab中用户权限
0 Preface/Foreword 1 权限介绍 包含5种权限: Guest(访客):可以创建issue、发表comment,不能读写版本库Reporter(报告者):可以克隆代码,不能提交。适合QA/PMDeveloper&…...
RunMe_About PreparationForDellBiosWUTTest
:: ***************************************************************************************************************************************************************** :: 20240613 :: 该脚本可以用作BIOS WU测试前的准备工作,包括:自动检测"C:\DellB…...
C++中变量的使用细节和命名方案
C中变量的使用细节和命名方案 C提倡使用有一定含义的变量名。如果变量表示差旅费,应将其命名为cost_of_trip或 costOfTrip,而不要将其命名为x或cot。必须遵循几种简单的 C命名规则。 在名称中只能使用字母字符、数字和下划线()。 名称的第一个字符不能是数字。 区分…...
[ACTF新生赛2020]SoulLike
两个文件 ubuntu运行 IDA打开 清晰的逻辑 很明显,我们要sub83a 返回ture 这里第一个知识点来了 你点开汇编会发现 这里一堆xor巨多 然后IDA初始化设置的函数,根本不能分析这么多 我们要去改IDA的设置 cfg 里面的 hexrays文件 在max_funsize这 修改为1024,默认是64 等待一…...
C#——析构函数详情
析构函数 C# 中的析构函数(也被称作“终结器”)同样是类中的一个特殊成员函数,主要用于在垃圾回收器回收类实例时执行一些必要的清理操作。 析构函数: 当一个对象被释放的时候执行 C# 中的析构函数具有以下特点: * 析构函数只…...
探索重要的无监督学习方法:K-means 聚类模型
在数据科学和机器学习领域,聚类分析是一种重要的无监督学习方法,用于将数据集中的对象分成多个组(簇),使得同一簇中的对象相似度较高,而不同簇中的对象相似度较低。K-means 聚类是最广泛使用的聚类算法之一,它以其简单、快速和易于理解的特点受到了广泛关注。本文将深入…...
将web项目打包成electron桌面端教程(二)vue3+vite+ts
说明:我用的demo项目是vue3vitets,如果是vue2/cli就不用往下看啦,建议找找其他教程哦~下依赖npm下载不下来的,基本换成cnpm/pnpm/yarn就可以了 一、项目准备 1、自己新创建一个,这里就不过多赘述了 2、将需要打包成…...
RestClient
什么是RestClient RestClient 是 Elasticsearch 官方提供的 Java 低级 REST 客户端,它允许HTTP与Elasticsearch 集群通信,而无需处理 JSON 序列化/反序列化等底层细节。它是 Elasticsearch Java API 客户端的基础。 RestClient 主要特点 轻量级ÿ…...
【Oracle APEX开发小技巧12】
有如下需求: 有一个问题反馈页面,要实现在apex页面展示能直观看到反馈时间超过7天未处理的数据,方便管理员及时处理反馈。 我的方法:直接将逻辑写在SQL中,这样可以直接在页面展示 完整代码: SELECTSF.FE…...
工业安全零事故的智能守护者:一体化AI智能安防平台
前言: 通过AI视觉技术,为船厂提供全面的安全监控解决方案,涵盖交通违规检测、起重机轨道安全、非法入侵检测、盗窃防范、安全规范执行监控等多个方面,能够实现对应负责人反馈机制,并最终实现数据的统计报表。提升船厂…...
Swift 协议扩展精进之路:解决 CoreData 托管实体子类的类型不匹配问题(下)
概述 在 Swift 开发语言中,各位秃头小码农们可以充分利用语法本身所带来的便利去劈荆斩棘。我们还可以恣意利用泛型、协议关联类型和协议扩展来进一步简化和优化我们复杂的代码需求。 不过,在涉及到多个子类派生于基类进行多态模拟的场景下,…...
python如何将word的doc另存为docx
将 DOCX 文件另存为 DOCX 格式(Python 实现) 在 Python 中,你可以使用 python-docx 库来操作 Word 文档。不过需要注意的是,.doc 是旧的 Word 格式,而 .docx 是新的基于 XML 的格式。python-docx 只能处理 .docx 格式…...
【开发技术】.Net使用FFmpeg视频特定帧上绘制内容
目录 一、目的 二、解决方案 2.1 什么是FFmpeg 2.2 FFmpeg主要功能 2.3 使用Xabe.FFmpeg调用FFmpeg功能 2.4 使用 FFmpeg 的 drawbox 滤镜来绘制 ROI 三、总结 一、目的 当前市场上有很多目标检测智能识别的相关算法,当前调用一个医疗行业的AI识别算法后返回…...
Java编程之桥接模式
定义 桥接模式(Bridge Pattern)属于结构型设计模式,它的核心意图是将抽象部分与实现部分分离,使它们可以独立地变化。这种模式通过组合关系来替代继承关系,从而降低了抽象和实现这两个可变维度之间的耦合度。 用例子…...
Ubuntu系统多网卡多相机IP设置方法
目录 1、硬件情况 2、如何设置网卡和相机IP 2.1 万兆网卡连接交换机,交换机再连相机 2.1.1 网卡设置 2.1.2 相机设置 2.3 万兆网卡直连相机 1、硬件情况 2个网卡n个相机 电脑系统信息,系统版本:Ubuntu22.04.5 LTS;内核版本…...
【Kafka】Kafka从入门到实战:构建高吞吐量分布式消息系统
Kafka从入门到实战:构建高吞吐量分布式消息系统 一、Kafka概述 Apache Kafka是一个分布式流处理平台,最初由LinkedIn开发,后成为Apache顶级项目。它被设计用于高吞吐量、低延迟的消息处理,能够处理来自多个生产者的海量数据,并将这些数据实时传递给消费者。 Kafka核心特…...
uni-app学习笔记三十五--扩展组件的安装和使用
由于内置组件不能满足日常开发需要,uniapp官方也提供了众多的扩展组件供我们使用。由于不是内置组件,需要安装才能使用。 一、安装扩展插件 安装方法: 1.访问uniapp官方文档组件部分:组件使用的入门教程 | uni-app官网 点击左侧…...
