Kafka
这里写目录标题
- 1.Kafka
- 1.1 Kafka概述
- 1.2 kafka安装和配置
- 1.3 入门案例
- 1.4 kafka生产者详解
- 1.4.1 生产者的参数
1.Kafka
1.1 Kafka概述
Kafka 是一个分布式流媒体平台,类似于消息队列或企业消息传递系统。
- producer:发布消息的对象称之为主题生产者(Kafka topic producer)
- topic:Kafka将消息分门别类,每一类的消息称之为一个主题(Topic)
- consumer:订阅消息并处理发布的消息的对象称之为主题消费者(consumers)
- broker:已发布的消息保存在一组服务器中,称之为Kafka集群。集群中的每一个服务器都是一个代理(Broker)。 消费者可以订阅一个或多个主题(topic),并从Broker拉数据,从而消费这些已发布的消息。
1.2 kafka安装和配置
Kafka对于zookeeper是强依赖,保存kafka相关的节点数据,所以安装Kafka之前必须先安装zookeeper
一、Docker安装zookeeper
下载镜像:
docker pull zookeeper:3.4.14
创建容器
docker run -d --name zookeeper -p 2181:2181 zookeeper:3.4.14
一、Docker安装kafka
下载镜像:
docker pull wurstmeister/kafka:2.12-2.3.1
创建容器, 此处需要改为自己虚拟机的ip地址
docker run -d --name kafka \
--env KAFKA_ADVERTISED_HOST_NAME=192.168.200.130 \
--env KAFKA_ZOOKEEPER_CONNECT=192.168.200.130:2181 \
--env KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.200.130:9092 \
--env KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 \
--env KAFKA_HEAP_OPTS="-Xmx256M -Xms256M" \
--net=host wurstmeister/kafka:2.12-2.3.1
1.3 入门案例
实现一个简单的生产 > 消费过程
一、引入依赖
<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId>
</dependency>
二、生产者发送消息
- 连接kafka
- 创建生产者对象
- 发送信息
- 关闭消息通道(必须关闭, 否则消息发送不成功)
package com.heima.kafka.sample;import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;import java.util.Properties;/*** 生产者*/
public class ProducerQuickStart {public static void main(String[] args) {//1.kafka的配置信息Properties properties = new Properties();//kafka的连接地址properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.200.130:9092");//发送失败,失败的重试次数properties.put(ProducerConfig.RETRIES_CONFIG,5);//消息key的序列化器properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");//消息value的序列化器properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");//2.生产者对象KafkaProducer<String,String> producer = new KafkaProducer<String, String>(properties);//封装发送的消息ProducerRecord<String,String> record = new ProducerRecord<String, String>("itheima-topic","100001","hello kafka");//3.发送消息producer.send(record);//4.关闭消息通道,必须关闭,否则消息发送不成功producer.close();}}
三、消费者接收消息
package com.heima.kafka.sample;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;import java.time.Duration;
import java.util.Collections;
import java.util.Properties;/*** 消费者*/
public class ConsumerQuickStart {public static void main(String[] args) {//1.添加kafka的配置信息Properties properties = new Properties();//kafka的连接地址properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.200.130:9092");//消费者组properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group2");//消息的反序列化器properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");//2.消费者对象KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);//3.订阅主题consumer.subscribe(Collections.singletonList("itheima-topic"));//当前线程一直处于监听状态while (true) {//4.获取消息ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(1000));for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {System.out.println(consumerRecord.key());System.out.println(consumerRecord.value());}}}}
总结
- 生产者发送消息,多个消费者订阅同一个主题,只能有一个消费者收到消息(一对一)
- 生产者发送消息,多个消费者订阅同一个主题,所有消费者都能收到消息(一对多)
1.4 kafka生产者详解
一、同步发送
使用send()方法发送,它会返回一个Future对象,调用get()方法进行等待,就可以知道消息是否发送成功
RecordMetadata recordMetadata = producer.send(kvProducerRecord).get();
System.out.println(recordMetadata.offset());
二、异步发送
调用send()方法,并指定一个回调函数,服务器在返回响应时调用函数
//异步消息发送
producer.send(kvProducerRecord, new Callback() {@Overridepublic void onCompletion(RecordMetadata recordMetadata, Exception e) {if(e != null){System.out.println("记录异常信息到日志表中");}System.out.println(recordMetadata.offset());}
});
1.4.1 生产者的参数
一、ack
代码的配置方式:
//ack配置 消息确认机制
prop.put(ProducerConfig.ACKS_CONFIG,"all");
二、retries
相关文章:
Kafka
这里写目录标题1.Kafka1.1 Kafka概述1.2 kafka安装和配置1.3 入门案例1.4 kafka生产者详解1.4.1 生产者的参数1.Kafka 1.1 Kafka概述 Kafka 是一个分布式流媒体平台,类似于消息队列或企业消息传递系统。 producer:发布消息的对象称之为主题生产者(Ka…...
数据结构——第三章 栈与队列(2)
栈的运用1.括号匹配2.表达式求值2.1.算术表示式的形式2.2.后缀表达式求值2.3.将算术表达式转换为后缀表达式2.4.算术表达式直接求值3.栈与递归3.1.递归算法3.2.栈与函数调用3.3.递归工作与递归函数3.4.递归到非递归的转换1.括号匹配 void matching(char str[]) {//创建空栈Lin…...
【Linux学习】基础IO——理解缓冲区 | 理解文件系统
🐱作者:一只大喵咪1201 🐱专栏:《Linux学习》 🔥格言:你只管努力,剩下的交给时间! 基础IO☕理解缓冲区🧃缓冲区的共识🧃缓冲区的位置🧃缓冲区的刷…...
RHCSA-重置root密码(3.3)
方法1:rd.break (1)首先重启系统,在此页面按e键,在屏幕上显示内核启动参数 (2)知道linux这行,末尾空格后输入rd.break,然后按ctrlx (3)查看&#…...
无公网IP快解析实现U+随时随地访问
现阶段商品从生产到消费者手中要经过多个环节,为实现对每一个环节进行管理,越来越多的企业选择通过信息化手段来实现。供应链管理系统配合供应链中各实体的业务需求,使操作流程和信息系统紧密配合,做到各环节无缝链接,…...
UVa 307 Sticks 木棍拼接 ID 迭代加深搜
题目链接:Sticks 题目描述: 小明一开始有一些长度相等的木棍,小明现在将木棍砍成了一些长度为整数的木棍,他现在忘记了最开始木棍的长度,你需要找到最短的可能木棍长度,例如给定5,2,1,5,2,1,5,2,15,2,1,5,2…...
阿里云(CentOS)中MySQL8忘记密码的解决方法
阿里云(CentOS)中MySQL8忘记密码的解决方法 方法 在 skip-grant-tables 模式下启动 MySQL,该模式下启动 MySQL 时不启动授权表功能,可以直接免密码登录 实现 编辑 /etc/my.cnf 文件 vim /etc/my.cnf在 [mysqld] 区域末尾添加配置,设置免密…...
三、Spring的入门程序
第一个Spring程序 创建新的空工程spring6 设置JDK版本17,编译器版本17 设置IDEA的Maven:关联自己的maven 在空的工程spring6中创建第一个maven模块:spring6-001-first 在pom.xml添加spring context依赖和junit依赖, <?x…...
摘录一下Python列表和元组的学习笔记
1 基础概念 列表一个值,列表值指的是列表本身,而不是列表中的内容 列表用[]表示 列表中的内容称为 表项 len()函数可以显示列表中表项的个数,比如下面这个例子 spam [cat, bat, dog, rat]print(len(spam))列表的范围选取中,比…...
【量化金融】收益率、对数收益率、年华收益、波动率、夏普比率、索提诺比率、阿尔法和贝塔、最大回撤
【量化金融】收益率、对数收益率、年华收益、波动率、夏普比率、索提诺比率、阿尔法和贝塔、最大回撤 1 收益率 在学术界,建模一般不直接使用资产价格,而是使用资产收益率(Returns)。因为收益率比价格具有更好的统计特性,更便于建模。下经典…...
1_机器学习概述—全流程
文章目录1 机器学习定义2 机器学习常见应用框架(重点)3 机器学习分类3.1 监督学习(Supervised learning)3.2 无监督学习(Unsupervised learning)3.3 半监督学习(Semi-Supervised Learning&#…...
VUE中给对象添加新属性时,界面不刷新怎么办
一、直接添加属性的问题 举例: 定义一个p标签,通过v-for指令进行遍历 然后给botton标签绑定点击事件,我们预期点击按钮时,数据新增一个属性,界面也 新增一行。 <p v-for"(value,key) in item" :key&qu…...
视频号频出10w+,近期爆红的账号有哪些?
回顾2月,视频号持续放出大动作,不仅进行了16小时不间断的NBA全明星直播,还邀请国际奥委会入驻,分享奥运的最新资讯。视频号成为越来越多官方机构宣传推广的有效渠道。官方积极入驻,内容创作生态也在同步繁荣发展&#…...
企业寄件现代化管理教程
现代化企业为了跟上时代发展的步伐,在不断完善着管理制度,其中公司寄件管理,也是重要的一个模块。为了提高公司快递的寄件效率,以及节约寄件成本,实现快递寄件的规范化,越来越多的现代化企业,开…...
django 在网页显示后台进度
1、定义函数打开网页 def PeformanceIndex(request): citys{‘wuhu’: ‘芜湖’, ‘xuancheng’: ‘宣城’, ‘tongling’: ‘铜陵’, ‘suzhou’: ‘宿州’, ‘maanshan’: ‘马鞍山’, ‘liuan’: ‘六安’, ‘huainan’: ‘淮南’, ‘huabei’: ‘淮北’, ‘hefei’: ‘合肥…...
机器学习库(Numpy, Scikit-learn)
Numpy 创建数组 import numpy as npa np.array([1,2,3]) b np.array([(1.5,2,3), (4,5,6)], dtype float) c np.array([[(1.5,2,3), (4,5,6)], [(3,2,1), (4,5,6)]],dtype float)创建占位符 z1np.zeros((3,4)) z2np.ones((2,3,4),dtypenp.int16) z3d np.arange(10,25,5)…...
Linux操作系统学习(进程替换)
文章目录进程替换进程替换是什么?替换的方法进程替换简易shell模拟进程替换 进程替换是什么? 如下图所示: 进程替换就是,把进程B的代码和数据,替换正在执行的进程A的代码和数据在内存中的位置(若代码…...
【C++从入门到放弃】类和对象(中)———类的六大默认成员函数
🧑💻作者: 情话0.0 📝专栏:《C从入门到放弃》 👦个人简介:一名双非编程菜鸟,在这里分享自己的编程学习笔记,欢迎大家的指正与点赞,谢谢! 类和对…...
白盒测试重点复习内容
白盒测试白盒测试之逻辑覆盖法逻辑覆盖用例设计方法1.语句覆盖2.判定覆盖(分支覆盖)3.条件覆盖4.判定条件覆盖5.条件组合覆盖6.路径覆盖白盒测试之基本路径测试法基本路径测试方法的步骤1.根据程序流程图画控制流图2.计算圈复杂度3.导出测试用例4.准备测试用例5.例题白盒测试总…...
【13】linux命令每日分享——groupadd建立组
大家好,这里是sdust-vrlab,Linux是一种免费使用和自由传播的类UNIX操作系统,Linux的基本思想有两点:一切都是文件;每个文件都有确定的用途;linux涉及到IT行业的方方面面,在我们日常的学习中&…...
《第一行代码》 第十章:服务
一,在子线程中更新UI 1,新建项目,修改布局代码 <RelativeLayout xmlns:android"http://schemas.android.com/apk/res/android"android:layout_width"match_parent"android:layout_height"match_parent"&g…...
简单介绍编程进制
十进制 十进制的位权为 10,比如十进制的 123,123 1 * 10 ^ 2 2 * 10 ^ 1 3 * 10 ^ 0。 二进制 二进制的位权为 2,比如十进制的 4,二进制为 100,4 1 * 2 ^ 2 0 * 2 ^ 1 0 *2 ^ 0。 Java7 之前,不支…...
windows忘记开机密码怎么办
windows忘记开机密码怎么办 清除windows登录密码 清除windows登录密码简单方法 开机到欢迎界面时,按CtrlAltDelete两次,跳出帐号窗口,输入用户名:administrator,回车, 或者启动时按F8 选“带命令行的安全…...
SpringCloud:Eureka
目录 一、eureka的作用 二、搭建Eureka服务端 三、添加客户端 四、服务发现 提供者与消费者 服务提供者:一次业务中,被其它微服务调用的服务。(提供接口给其它微服务) 服务消费者:一次业务中,调用其它微服务的服…...
如何获取或设置CANoe以太网网卡信息(SET篇)
CAPL提供了一系列函数用来操作CANoe网卡。但是,但是,首先需要明确一点,不管是获取网卡信息,还是设置网卡信息,只能访问CAPL程序所在的节点下的网卡,而不是节点所在的以太网通道下的所有网卡 关于第一张图中,Class节点下,有三个网卡:Ethernet1、VLAN 1.100、VLAN 1.200…...
【软件测试面试题】项目经验?资深测试 (分析+回答) 我不信你还拿不到offer......
目录:导读前言一、Python编程入门到精通二、接口自动化项目实战三、Web自动化项目实战四、App自动化项目实战五、一线大厂简历六、测试开发DevOps体系七、常用自动化测试工具八、JMeter性能测试九、总结(尾部小惊喜)前言 在面试过程中&#…...
tensorflow lite简介-移动设备端机器学习
TensorFlow Lite 是一组工具,可帮助开发者在移动设备、嵌入式设备和 loT 设备上运行模型,以便实现设备端机器学习。 支持多平台 支持多种平台,涵盖 Android 和 iOS 设备、嵌入式 Linux 和微控制器。 原理/流程 工作原理或者使用流程就是上面…...
Node.js常用知识
1、什么是 Node.js 【】Node.js 是一个基于 Chrome V8 引擎的 JavaScript 运行环境。浏览器是 js 的前端运行环境,node.js 是 js 的后端运行环境。他们都有 V8 引擎,有各自的内置 API 2、fs 文件系统模块 【】fs 模块是 Node.js 官方提供的、用来操作文件…...
踩坑:maven打包失败的解决方式总结
Maven打包失败原因总结如下: 失败原因1:无法使用spring-boot-maven-plugin插件 使用spring-boot-maven-plugin插件可以创建一个可执行的JAR应用程序,前提是应用程序的parent为spring-boot-starter-parent。 需要添加parent的包spring-boot…...
【C++】位图
文章目录位图概念位图操作位图代码位图应用位图概念 boss直接登场: 给40亿个不重复的无符号整数,没排过序。给一个无符号整数,如何快速判断一个数是否在这40亿个数中❓ 40亿个整数,大概就是16GB。40亿个字节大概就是4GB。 1Byt…...
seo 网站关键词优化/诊断网站seo现状的方法
无缘参加去年在广州的UCD年会,这个周末还是想方法去杭州参加了UCD2010年会,收益良多,如果真要细细写下来,恐怕足够写成一本厚厚的垃圾书,只好视自己的心情想到什么就写什么吧。 虽然一直对网络营销抱有浓厚的兴趣&a…...
北京展览馆网站建设/免费的行情网站app
你有没有碰到过OpenStack中,VM失去IP地址的问题?如果有的话,你知道那可能是什么问题 ——特别是如果你拥有大量的节点和VM。你的客户会因为没有明显原因却断了与VM的连接而感到 挫败。甚至云的支持团队会为log文件里没有提示却出现问题感到挫…...
速升网站/搜索引擎优化技巧
一、单项选择题(本大题共20小题,每小题2分,共40分)在每小题列出的四个备选项中只有一个是符合题目要求的,请将其代码填写在题后的括号内。错选、多选或未选均无分。1.下面不属于网络操作系统功能的是(B …...
个人空间备案网站名称/曼联vs恩波利比分
概要:why:为什么回收,见whatwhat:垃圾回收哪些内存(不可达对象的确定)when:何时执行GC(安全点、安全区域)how:如何回收(原理——垃圾回收算法、实现——垃圾收集器)1、垃圾回收哪些内存JVM运行时数据区中&a…...
梅花手表网站/百度网盘官网入口
1. 运算符优先级与括号 #define Cube(a) a*a*a 无法解决 Cube(11) ⇒ 11*11*11 ⇒ 4,期待的应当是 8,故将其改造为 #define Cube(a) (a)*(a)*(a) 如此,自身运算的优先级是能解决了,和其他表达式结合时便又存在先运算和后运算的算…...
腾讯邮箱网页版/seo技术服务外包
51%的企业在过去12月内发生过数据泄漏面对日益复杂的安全环境,多样化的攻击手段,传统的防护已经失效,你的安全团队是否做好了准备? 深井式的管理架构,各自封闭的信息系统,无迹可寻的内部泄漏,防…...