JavaWeb_LeadNews_Day6-Kafka
JavaWeb_LeadNews_Day6-Kafka
- Kafka
- 概述
- 安装配置
- kafka入门
- kafka高可用方案
- kafka详解
- 生产者同步异步发送消息
- 生产者参数配置
- 消费者同步异步提交偏移量
- SpringBoot集成kafka
- 自媒体文章上下架
- 实现思路
- 具体实现
- 来源
- Gitee
Kafka
概述
- 对比

- 选择

- 介绍
- producer: 发布消息的对象称之为主题生产者 (Kafka topic producer)
- topic: Kafka将消息分门别类,每一类的消息称之为一个主题 (Topic)
- consumer:订阅消息并处理发布的消息的对象称之为主题消费者 (consumers)
- broker:已发布的消息保存在一组服务器中,称之为Kafka集群,集群中的每一个服务器都是一个代理(Broker)。消费者可以订阅个或多个主题 (topic),并从Broker拉数据,从而消费这些已发布的消息
安装配置
- 安装zookeeper
// 下载zookeeper镜像 docker pull zookeeper:3.4.14 // 创建容器 docker run -d --name zookeeper -p 2181:2181 zookeeper:3.4.14 - 安装kafka
// 下载kafka镜像 docker pull wurstmeister/kafka:2.12-2.3.1 // 创建容器 docker run -d --name kafka \ --env KAFKA_ADVERTISED_HOST_NAME=192.168.174.133 \ --env KAFKA_ZOOKEEPER_CONNECT=192.168.174.133:2181 \ --env KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.174.133:9092 \ --env KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 \ --env KAFKA_HEAP_OPTS="-Xmx256M -Xms256M" \ --net=host wurstmeister/kafka:2.12-2.3.1// 解释 --net=host,直接使用容器宿主机的网络命名空间,即没有独立的网络环境。它使用宿主机的ip和端口(云主机会不好使)
kafka入门
- 依赖
<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId> </dependency> - Producer
public class ProducerQuickStart {public static void main(String[] args) {// 1. kafka链接配置信息Properties prop = new Properties();// 1.1 kafka链接地址prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.174.133:9092");// 1.2 key和value的序列化prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");// 2. 创建kafka生产者对象KafkaProducer<String, String> producer = new KafkaProducer<>(prop);// 3. 发送信息// 参数列表: topic, key, valueProducerRecord<String, String> record = new ProducerRecord<>("topic-first", "key1", "Hello Kafka!");producer.send(record);// 4. 关闭消息通道// 必须关闭, 否则消息发送bucgproducer.close();} } - Consumer
public class ConsumerQuickStart {public static void main(String[] args) {// 1. kafka的配置信息Properties prop = new Properties();// 1.1 链接地址prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.174.133:9092");// 1.2 key和value的反序列化器prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");// 1.3 设置消费者组prop.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");// 2. 创建消费者对象KafkaConsumer<String, String> consumer = new KafkaConsumer<>(prop);// 3. 订阅主题consumer.subscribe(Collections.singleton("topic-first"));// 4. 拉取信息while(true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));for (ConsumerRecord<String, String> record : records) {System.out.println(record.key());System.out.println(record.value());}}} } - 总结
- 同一组只有一个消费者能够接收到消息, 如果需要所有消费者都能接收到消息, 需要消费者在不同的组
kafka高可用方案
-
集群

-
备份

kafka定义了两类副本:- 领导者副本
- 追随者副本
数据在领导者副本存储后, 会同步到追随者副本

同步方式
leader失效后, 选择leader的原则- 优先从ISR中选取, 因为ISR的数据和leader是同步的.
- ISR中的follower都不行了, 就从其他的follower中选取.
- 当所有的follower都失效了, 第一种是等待ISR中的follower活过来, 数据可靠, 但等待时间不确定, 第二种是等待任意follower活过来, 最快速度恢复可用性, 但数据不一定完整.
kafka详解
生产者同步异步发送消息
// 同步发送
RecordMetadata metadata = producer.send(record).get();
System.out.println(metadata.offset());// 异步发送
producer.send(record, new Callback(){@Overridepublic void onCompletion(RecordMetadata recordMetadata, Exception e) {if(e != null) {System.out.println("记录异常信息到日志表中");}System.out.println(recordMetadata.offset());}
});
生产者参数配置
- 消息确认
确认机制 说明 acks=0 生产者在成功写入消息之前不会等待任何来自服务器的响应,消息有丢失的风险,但是速度最快 acks=1(默认值) 只要集群首领节点收到消息,生产者就会收到一个来自服务器的成功响应 acks=all 只有当所有参与赋值的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应 prop.put(ProducerConfig.ACKS_CONFIG, "all"); - 消息重传
设置消息重传次数, 默认每次重试之间等待100msprop.put(ProducerConfig.RETRIES_CONFIG, 10); - 消息压缩
默认情况, 消息发送不会压缩
使用压缩可以降低网络传输开销和存储开销, 而这往往是向kafka发送消息的瓶颈所在压缩算法 说明 snappy 占用较少的 CPU,却能提供较好的性能和相当可观的压缩比,如果看重性能和网络带宽,建议采用 lz4 占用较少的 CPU,压缩和解压缩速度较快,压缩比也很客观 gzip 占用较多的CPU,但会提供更高的压缩比,网络带宽有限,可以使用这种算法 prop.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
消费者同步异步提交偏移量
// 同步提交偏移量
consumer.commitSync();// 异步提交偏移量
consumer.commitAsync(new OffsetCommitCallback(){@Overridepublic void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception e) {if(e!=null){System.out.println("记录错误的提交偏移量"+map+", 异常信息为"+e);}}
});// 同步异步提交
try {while(true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));for (ConsumerRecord<String, String> record : records) {System.out.println(record.key());System.out.println(record.value());System.out.println(record.partition());System.out.println(record.offset());}// 异步提交偏移量consumer.commitAsync();}
} catch (Exception e) {e.printStackTrace();System.out.println("记录错误的信息:"+e);
}finally {// 同步consumer.commitSync();
}
SpringBoot集成kafka
- 依赖
<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId> </dependency> <dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId> </dependency> - 配置
server:port: 9991 spring:application:name: kafka-demokafka:bootstrap-servers: 192.168.174.133:9092producer:retries: 10key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerconsumer:group-id: ${spring.application.name}-testkey-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializer - Producer
@RestController public class HelloController {@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;@GetMapping("/hello")public String hello(){kafkaTemplate.send("itcast-topic", "黑马程序员");return "ok";} } - Consumer
@Component public class HelloListener {@KafkaListener(topics = "itcast-topic")public void onMessage(String message){if(!StringUtils.isEmpty(message)){System.out.println(message);}} } - 传递对象
// Producer User user = new User(); user.setName("tom"); user.setAge(18); kafkaTemplate.send("itcast-topic", JSON.toJSONString(user));// Consumer System.out.println(JSON.parseObject(message, User.class));
自媒体文章上下架
实现思路

具体实现
- Producer
public ResponseResult downOrUp(WmNewsDto dto) {// 1. 检验参数// 1.0 检查文章dto是否为空if(dto == null){return ResponseResult.errorRe(AppHttpCodeEnum.PARAM_INVALID, "不可缺少");}// 1.1 检查文章上架参数是否合法if(dto.getEnable() != 0 && dto.getEnabl!= 1){// 默认上架dto.setEnable((short) 1);}// 2. 查询文章WmNews news = getById(dto.getId());if(news == null){return ResponseResult.errorRe(AppHttpCodeEnum.DATA_NOT_EXIST, 存在");}// 3. 查询文章状态if(news.getStatus() != WmNews.StaPUBLISHED.getCode()){return ResponseResult.errorRe(AppHttpCodeEnum.PARAM_INVALID, 章不是发布状态, 不能上下架");}// 4. 上下架news.setEnable(dto.getEnable());updateById(new// 5. 发送消息, 通知article修改文章的配置if(news.getArticleId() != null){HashMap<String, Object> map = HashMap<>();map.put("articleId", news.getArtic());map.put("enable", news.getEnable());kafkaTemplate.(WmNewsMessageConstaWM_NEWS_UP_OR_DOWN_TOPIC, JtoJSONString(map));return ResponseResult.okRe(AppHttpCodeEnum.SUCCESS); } - Consumer
// Listener
@KafkaListener(topics = WmNewsMessageConstants.WM_NEWS_UP_OR_DOWN_TOPIC)
public void onMessage(String message)
{if(StringUtils.isNotBlank(message)){Map map = JSON.parseObject(message, Map.class);apArticleConfigService.updateByMap(map);}
}// Service
public void updateByMap(Map map) {// 0 下架, 1 上架Object enable = map.get("enable");boolean isDown = true;if(enable.equals(1)){isDown = false;}// 修改文章update(Wrappers.<ApArticleConfig>lambdaUpdate().eq(ApArticleConfig::getArticleId, map.get("articleId")).set(ApArticleConfig::getIsDown, isDown));
}
来源
黑马程序员. 黑马头条
Gitee
https://gitee.com/yu-ba-ba-ba/leadnews
相关文章:
JavaWeb_LeadNews_Day6-Kafka
JavaWeb_LeadNews_Day6-Kafka Kafka概述安装配置kafka入门kafka高可用方案kafka详解生产者同步异步发送消息生产者参数配置消费者同步异步提交偏移量 SpringBoot集成kafka 自媒体文章上下架实现思路具体实现 来源Gitee Kafka 概述 对比 选择 介绍 producer: 发布消息的对象称…...
ATTCK覆盖度97.1%!360终端安全管理系统获赛可达认证
近日,国际知名第三方网络安全检测服务机构——赛可达实验室(SKD Labs)发布最新测试报告,360终端安全管理系统以ATT&CK V12框架攻击技术覆盖面377个、覆盖度97.1%,勒索病毒、挖矿病毒检出率100%,误报率0…...
透视俄乌网络战之一:数据擦除软件
数据擦除破坏 1. WhisperGate2. HermeticWiper3. IsaacWiper4. WhisperKill5. CaddyWiper6. DoubleZero7. AcidRain8. RURansom 数据是政府、社会和企业组织运行的关键要素。数据擦除软件可以在不留任何痕迹的情况下擦除数据并阻止操作系统恢复摧,达到摧毁或目标系统…...
微服务中间件--Nacos
Nacos 1. Nacos入门a.服务注册到Nacosb.Nacos服务分级存储模型c.NacosRule负载均衡d.服务实例的权重设置e.环境隔离 - namespacef.Nacos和Eureka的对比 2. Nacos配置管理a.统一配置管理b.配置热更新c.多环境配置共享 1. Nacos入门 Nacos是阿里巴巴的产品,现在是Spr…...
驱动开发点亮led灯
头文件 #ifndef __HEAD_H__ #define __HEAD_H__#define PHY_LED_MODER 0X50006000 #define PHY_LED_ODR 0X50006014 #define PHY_LED_RCC 0X50000A28 #define PHY_LED_FMODER 0X50007000 #define PHY_LED_FODR 0X50007014#endif驱动代码 #include <linux/init.h> #incl…...
回归预测 | MATLAB实现IPSO-SVM改进粒子群优化算法优化支持向量机多输入单输出回归预测(多指标,多图)
回归预测 | MATLAB实现IPSO-SVM改进粒子群优化算法优化支持向量机多输入单输出回归预测(多指标,多图) 目录 回归预测 | MATLAB实现IPSO-SVM改进粒子群优化算法优化支持向量机多输入单输出回归预测(多指标,多图…...
数学建模之“TOPSIS数学模型”原理和代码详解
一、简介 TOPSIS(Technique for Order Preference by Similarity to Ideal Solution)是一种多准则决策分析方法,用于解决多个候选方案之间的排序和选择问题。它基于一种数学模型,通过比较每个候选方案与理想解和负理想解之间的相…...
threejs使用gui改变相机的参数
调节相机远近角度 定义相机的配置: const cameraConfg reactive({ fov: 45 }) gui中加入调节fov的方法 const gui new dat.GUI();const cameraFolder gui.addFolder("相机属性设置");cameraFolder.add(cameraConfg, "fov", 0, 100).name(…...
计算机竞赛 图像识别-人脸识别与疲劳检测 - python opencv
文章目录 0 前言1 课题背景2 Dlib人脸识别2.1 简介2.2 Dlib优点2.3 相关代码2.4 人脸数据库2.5 人脸录入加识别效果 3 疲劳检测算法3.1 眼睛检测算法3.3 点头检测算法 4 PyQt54.1 简介4.2相关界面代码 5 最后 0 前言 🔥 优质竞赛项目系列,今天要分享的是…...
PHP8的字符串操作3-PHP8知识详解
今天继续分享字符串的操作,前面说到了字符串的去除空格和特殊字符,获取字符串的长度,截取字符串、检索字符串。 今天继续分享字符串的其他操作。如:替换字符串、分割和合成字符串。 5、替换字符串 替换字符串就是对指定字符串中…...
Unity VR:XR Interaction Toolkit 输入系统(Input System):获取手柄的输入
文章目录 📕教程说明📕Input System 和 XR Input Subsystem(推荐 Input System)📕Input Action Asset⭐Actions Maps⭐Actions⭐Action Properties🔍Action Type (Value, Button, Pass through) ⭐Binding …...
智慧工地一体化云平台源码:监管端、工地端、危大工程、智慧大屏、物联网、塔机、吊钩、升降机
智慧工地解决方案依托计算机技术、物联网、云计算、大数据、人工智能、VR&AR等技术相结合,为工程项目管理提供先进技术手段,构建工地现场智能监控和控制体系,弥补传统方法在监管中的缺陷,最终实现项目对人、机、料、法、环的全…...
C# 表达式体方法 C#算阶乘
//表达式体方法private int Add(int a, int b) > a b;[Fact]public void Test(){var result1 Factorial(1);//1var result2 Factorial(2);//2var result3 Factorial(3);//6var result4 Factorial(4);//24var result5 Factorial(5);//120var result6 Add(100, 200);//…...
互联网发展历程:保护与隔离,防火墙的安全壁垒
互联网的快速发展,不仅带来了便利和连接,也引发了越来越多的安全威胁。在数字时代,保护数据和网络安全变得尤为重要。然而,在早期的网络中,安全问题常常让人担忧。 安全问题的困扰:网络威胁日益增加 随着互…...
基于IMX6ULLmini的linux裸机开发系列七:中断处理流程
中断上下文 cpu通过内核寄存器来运行指令并进行数据的读写处理的,它在进入中断前一个时刻的具体值,称为中断上下文 中断上下文是指CPU在进入中断之前保存的寄存器状态和其他相关信息。当CPU接收到中断请求时,它会保存当前正在执行的指令的状…...
Postman软件基本用法:浏览器复制请求信息并导入到软件从而测试、发送请求
本文介绍在浏览器中,获取网页中的某一个请求信息,并将其导入到Postman软件,并进行API请求测试的方法。 Postman是一款流行的API开发和测试工具,它提供了一个用户友好的界面,用于创建、测试、调试和文档化API。本文就介…...
react go实现用户历史登录列表页面
refer: http://ip-api.com/ 1.首先需要创建一个保存用户历史的登录的表,然后连接go 2.在用户登录的时候,获取用户的IP IP位置,在后端直接处理数据即可(不需要在前端传递数据) (1)增加路由&am…...
如何做好服务性能测试
一、什么是性能测试 新功能上线或切换底层数据库或扩容调优,根据实际业务场景的需要,做必要的性能压测,收集性能数据,作为上线的基准报告。 性能测试一般分一下几个阶段: 1. 性能测试 并发量小(jmeter 并…...
速通蓝桥杯嵌入式省一教程:(五)用按键和屏幕实现嵌入式交互系统
一个完整的嵌入式系统,包括任务执行部分和人机交互部分。在前四节中,我们已经讲解了LED、LCD和按键,用这三者就能够实现一个人机交互系统,也即搭建整个嵌入式系统的框架。在后续,只要将各个功能加入到这个交互系统中&a…...
虚拟拍摄,如何用stable diffusion制作自己的形象照?
最近收到了某活动的嘉宾邀请,我将分享: 主题:生成式人工智能的创新实践 简要描述:从品牌营销、智能体、数字内容创作、下一代社区范式等方面,分享LLM与图像等生成式模型的落地应用与实践经验。 领域/研究方向ÿ…...
视频字幕质量评估的大规模细粒度基准
大家读完觉得有帮助记得关注和点赞!!! 摘要 视频字幕在文本到视频生成任务中起着至关重要的作用,因为它们的质量直接影响所生成视频的语义连贯性和视觉保真度。尽管大型视觉-语言模型(VLMs)在字幕生成方面…...
UR 协作机器人「三剑客」:精密轻量担当(UR7e)、全能协作主力(UR12e)、重型任务专家(UR15)
UR协作机器人正以其卓越性能在现代制造业自动化中扮演重要角色。UR7e、UR12e和UR15通过创新技术和精准设计满足了不同行业的多样化需求。其中,UR15以其速度、精度及人工智能准备能力成为自动化领域的重要突破。UR7e和UR12e则在负载规格和市场定位上不断优化…...
关键领域软件测试的突围之路:如何破解安全与效率的平衡难题
在数字化浪潮席卷全球的今天,软件系统已成为国家关键领域的核心战斗力。不同于普通商业软件,这些承载着国家安全使命的软件系统面临着前所未有的质量挑战——如何在确保绝对安全的前提下,实现高效测试与快速迭代?这一命题正考验着…...
算法:模拟
1.替换所有的问号 1576. 替换所有的问号 - 力扣(LeetCode) 遍历字符串:通过外层循环逐一检查每个字符。遇到 ? 时处理: 内层循环遍历小写字母(a 到 z)。对每个字母检查是否满足: 与…...
A2A JS SDK 完整教程:快速入门指南
目录 什么是 A2A JS SDK?A2A JS 安装与设置A2A JS 核心概念创建你的第一个 A2A JS 代理A2A JS 服务端开发A2A JS 客户端使用A2A JS 高级特性A2A JS 最佳实践A2A JS 故障排除 什么是 A2A JS SDK? A2A JS SDK 是一个专为 JavaScript/TypeScript 开发者设计的强大库ÿ…...
基于SpringBoot在线拍卖系统的设计和实现
摘 要 随着社会的发展,社会的各行各业都在利用信息化时代的优势。计算机的优势和普及使得各种信息系统的开发成为必需。 在线拍卖系统,主要的模块包括管理员;首页、个人中心、用户管理、商品类型管理、拍卖商品管理、历史竞拍管理、竞拍订单…...
C# 表达式和运算符(求值顺序)
求值顺序 表达式可以由许多嵌套的子表达式构成。子表达式的求值顺序可以使表达式的最终值发生 变化。 例如,已知表达式3*52,依照子表达式的求值顺序,有两种可能的结果,如图9-3所示。 如果乘法先执行,结果是17。如果5…...
省略号和可变参数模板
本文主要介绍如何展开可变参数的参数包 1.C语言的va_list展开可变参数 #include <iostream> #include <cstdarg>void printNumbers(int count, ...) {// 声明va_list类型的变量va_list args;// 使用va_start将可变参数写入变量argsva_start(args, count);for (in…...
mac:大模型系列测试
0 MAC 前几天经过学生优惠以及国补17K入手了mac studio,然后这两天亲自测试其模型行运用能力如何,是否支持微调、推理速度等能力。下面进入正文。 1 mac 与 unsloth 按照下面的进行安装以及测试,是可以跑通文章里面的代码。训练速度也是很快的。 注意…...
FFmpeg avformat_open_input函数分析
函数内部的总体流程如下: avformat_open_input 精简后的代码如下: int avformat_open_input(AVFormatContext **ps, const char *filename,ff_const59 AVInputFormat *fmt, AVDictionary **options) {AVFormatContext *s *ps;int i, ret 0;AVDictio…...
