GO学习之 消息队列(Kafka)
GO系列
1、GO学习之Hello World
2、GO学习之入门语法
3、GO学习之切片操作
4、GO学习之 Map 操作
5、GO学习之 结构体 操作
6、GO学习之 通道(Channel)
7、GO学习之 多线程(goroutine)
8、GO学习之 函数(Function)
9、GO学习之 接口(Interface)
10、GO学习之 网络通信(Net/Http)
11、GO学习之 微框架(Gin)
12、GO学习之 数据库(mysql)
13、GO学习之 数据库(Redis)
14、GO学习之 搜索引擎(ElasticSearch)
15、GO学习之 消息队列(Kafka)
文章目录
- GO系列
- 前言
- 一、Kafka 简介
- 二、基本操作(Go)
- 2.1 下载Kafka包
- 2.2 生产者
- 2.3 消费者
- 四、总结
前言
按照公司目前的任务,go 学习是必经之路了,虽然行业卷,不过技多不压身,依旧努力!!!
在现在的互联网应用中,或者是平台,都面对者大流量、百万并发的压力。在微服务项目中,也有为了保证业务逻辑顺序而发愁,在大量消息突然降临的同时,许多应用都是无法支撑大量访问的,导致系统崩溃,此时此刻,只要能让消息排成队,不要拥挤,多分配几个服务处理依然变得很顺畅了,那 Kafka
就是一个相对比较完美的解决方案。
一、Kafka 简介
Kafka 是由 Apache 软件基金会开发和维护的开源的、分布式的消息队列系统,用于高吞吐量、持久性的消息传递。主要用于实时数据处理,可以处理海量的数据流,并将数据流可靠地传递给多个消费者应用程序。
Kafka 的核心概念:
- 主题(Topic): 主题是消息的逻辑通道,消息发布者将消息发布到一个或多个主题。
- 分区(Partition): 主题可以分为多个分区,每个分区可以视为独立的消息队列,方便水平扩展,
- 生产者(Producer): 生成者负责将消息发送到 Topic。
- 消费者(Consumer): 消费者负责从 Topic 中获取消息并且处理。
- 代理节点(Broker): Kafka 集群由多个代理节点组成,每个节点都是一个独立的 Kafka 服务器,负责存储和传递消息。
Kafka 的优点:
- 高吞吐量: Kafka能够处理大规模数据流,支持每秒百万消息的处理。
- 持久性: Kafka 可以可靠地保存消息,即便消费者宕机或断开连接,消息也不会丢失。
- 水平扩展: Kafka 可以通过添加代理节点来水平扩展,以满足高负载需求。
- 采样化的使用场景: Kafka 适用于多种用途,包括日志收集、实时分析、事件驱动架构等。
- 社区支持: 开源项目,Kafka 拥有庞大的社区支持和生态系统。
Kafka 的缺点:
- 复杂性: 配置和管理 Kafka 集群可能相对负责。
- 存储成本: 由于持久性需求,Kafka 需要大量的存储空间来报错消息。
二、基本操作(Go)
2.1 下载Kafka包
通过 go get 拉取:
go get github.com/Shopify/sarama
注意
有可能出错,也是解决了好长时间,我遇到的错误如下:
PS D:\workspaceGo> go get github.com/Shopify/sarama
go: github.com/Shopify/sarama@v1.41.1: parsing go.mod:module declares its path as: github.com/IBM/saramabut was required as: github.com/Shopify/sarama
经过一番 baidu,google 和询问 ChatGPT 之后解释是这样的:
这个错误是因为在你的 Go 项目中的 go.mod 文件声明了一个错误的模块路径。错误信息中显示,你的项目试图使用 github.com/IBM/sarama 作为模块路径,但实际上你需要使用 github.com/Shopify/sarama。
那是如何解决呢?
- 首先检查 go.mod 配置文件中,有没有错误的 sarama,发现没有。
- 使用命令 go clean -modcache 来清除缓存,重新 go get github.com/Shopify/sarama,依旧报错。
- 网上说用 replace 来替换错误的包路径,在 go.mod 配置文件中最下面添加:replace github.com/Shopify/sarama => github.com/Shopify/sarama v1.26.1,然后再一次执行 go clean -modcache,go mod vendor 命令,发现可以了。
- 我再次执行 go get github.com/Shopify/sarama 拉取包,发现包自动升级了,返回信息如下:
go: upgraded github.com/Shopify/sarama v1.26.1 => v1.41.1- 包已拉取成功,可以编码了。
此错误应该是包路径冲突导致,目前已解决,如有更好的解决方案,请评论区教一下我,谢谢!!!
2.2 生产者
下面示例中,首先用
sarama.NewConfig()
来创建一个 config 配置实体,然后通过 sarama.NewSyncProducer() 来创建一个生产者,用 &sarama.ProducerMessage{} 生成一个消息,通过producer.SendMessage(message)
发送到制定 Topic 中。
package mainimport ("log""time""github.com/Shopify/sarama"
)func main() {// 配置 kafka 生产者config := sarama.NewConfig()config.Producer.RequiredAcks = sarama.WaitForAllconfig.Producer.Retry.Max = 5config.Producer.Return.Successes = true// 创建 Kafka 生产者producer, err := sarama.NewSyncProducer([]string{"192.168.1.20:9092"}, config)if err != nil {log.Fatalf("Creating producer: %v", err)}// 延迟关闭生产者链接defer producer.Close()// 定义消息 Topic是 go-test, 值为 Hello Kafkamessage := &sarama.ProducerMessage{Topic: "go-test",Value: sarama.StringEncoder("Hello Kafka!"),}// 发送消息for i := 0; i < 10; i++ {partition, offset, err := producer.SendMessage(message)if err != nil {log.Fatalf("Sending message: %v", err)}log.Printf("Message sent to partition %d at offset %d", partition, offset)time.Sleep(time.Second)}
}
2.3 消费者
消费者代码和生产者思路一致,首先创建一个 配置对象,通过
sarama.NewConsumer()
来创建消费者,然后通过 consumer.ConsumePartition() 监听到一个分区,进行消息消费。
通过 select 来区分是否成功获取到消息,还是获取到错误。
package mainimport ("log""github.com/Shopify/sarama"
)func main() {// 配置 Kafka 消费者config := sarama.NewConfig()config.Consumer.Return.Errors = true// 创建 Kafka 消费者consumer, err := sarama.NewConsumer([]string{"192.168.1.20:9092"}, config)if err != nil {log.Fatal(err)}// 延迟关闭消费者链接defer consumer.Close()//订阅主题,获取分区 partitionpartitionConsumer, err := consumer.ConsumePartition("go-test", 0, sarama.OffsetOldest)if err != nil {log.Fatalf("Consuming partition: %v", err)}// 延迟关闭分区链接defer partitionConsumer.Close()// 消费消息for {select {// 从 分区 通道中获取信息case msg := <-partitionConsumer.Messages():log.Printf("Received message: %s", string(msg.Value))// 如果从通道中获取消息失败case err := <-partitionConsumer.Errors():log.Fatalf("Received error: %v", err)}}
}
运行结果如下:
- 首先执行消费者:
PS D:\workspaceGo\src\kafka> go run .\consumer.go
2023/09/03 09:33:10 Received message: Hello Kafka!
2023/09/03 09:33:46 Received message: Hello Kafka!
2023/09/03 09:34:32 Received message: Hello Kafka!
2023/09/03 09:34:32 Received message: Hello Kafka!
2023/09/03 09:34:32 Received message: Hello Kafka!
2023/09/03 09:34:32 Received message: Hello Kafka!
2023/09/03 09:34:32 Received message: Hello Kafka!
2023/09/03 09:34:32 Received message: Hello Kafka!
2023/09/03 09:34:32 Received message: Hello Kafka!
2023/09/03 09:34:32 Received message: Hello Kafka!
- 再执行生产者:
PS D:\workspaceGo\src\kafka> go run .\producer.go
2023/09/03 09:34:32 Message sent to partition 0 at offset 2
2023/09/03 09:34:32 Message sent to partition 0 at offset 3
2023/09/03 09:34:32 Message sent to partition 0 at offset 4
2023/09/03 09:34:32 Message sent to partition 0 at offset 5
2023/09/03 09:34:32 Message sent to partition 0 at offset 6
2023/09/03 09:34:32 Message sent to partition 0 at offset 7
2023/09/03 09:34:32 Message sent to partition 0 at offset 8
2023/09/03 09:34:32 Message sent to partition 0 at offset 9
2023/09/03 09:34:32 Message sent to partition 0 at offset 10
2023/09/03 09:34:32 Message sent to partition 0 at offset 11
四、总结
此篇中,首先对 Kafka 有了一个初步的介绍,相对于有开发经验的大佬来说,Kafka 再熟悉不过了,不过时间长不用的话难免有点生疏了。
至于如何搭建 Kafka 则是网上资料无数。
接下来就是 Go 操作 Kafka,主要是消息生产者 和 消息消费者 两个示例程序,比较简单,适合我这种菜鸟级别的 Gopher。
那 Go 操作 Kafka 有优点和需要注意的点呢?
优点:
- 高性能: Go 是一门编译型语言,性能高且延迟低,适合处理大量的消息。
- 并发支持: Go 天生支持并发,对于多个 Topic 和 Partition 并行处理非常有帮助。
- 丰富的库支持: Go 社区有丰富的 Kafka 客户端,比如:Sarama。
- 轻量级: Go 语言本身非常轻量级,构建的二进制文件小巧,适合于微服务和容器化构建。
- 跨平台支持: Go 支持多个平台,在不同的操作系统上运行。
需要注意的点:
- 版本兼容: 确保使用的 Kafka 客户端版本兼容,不同的版本可能有不同的特性和 API。
- 异常处理: 需要尽可能的处理错误和异常,以确保消息可靠的 生产 和 消费。
- 序列化和反序列化: 在消息 生产 和 消费的过程中,正确的进行消息的序列化和反序列化,确保消息正确传递。
- 监控和调试: 适当的监控和日志记录,以便调试和故障排除。
相关文章:
GO学习之 消息队列(Kafka)
GO系列 1、GO学习之Hello World 2、GO学习之入门语法 3、GO学习之切片操作 4、GO学习之 Map 操作 5、GO学习之 结构体 操作 6、GO学习之 通道(Channel) 7、GO学习之 多线程(goroutine) 8、GO学习之 函数(Function) 9、GO学习之 接口(Interface) 10、GO学习之 网络通信(Net/Htt…...
搭建自己的OCR服务,第三步:PPOCRLabel标注工具安装
一、安装说明 安装好了PaddleOCR后,还需要安装PPOCRLabel这个标注工具,想要自己训练模型的话,有个标注工具会起很大作用。 尤其是PPOCRLabel就是跟PaddleOCR配套的标注工具,同样是开源的。 在下载 PaddleOCR 整个源码中&#x…...
Java学习笔记37——网络编程01
网络编程入门 网络编程入门网络编程概述网路编程的三要素ip地址InetAddress类的使用端口 网络编程入门 网络编程概述 计算机网络 是指将地理位置不同的具有独立功能的多台计算机及其外部设备,通过通信线路连接起来,在网络操作系统,网络管理…...
powershell 搜索文本并返回行号
目录 powershell 搜索文本并返回行号 python调用powershell搜索文本并返回行号; powershell 搜索文本并返回行号 $keyword PS dir "d:\" -Filter "*.txt" -Recurse | foreach {$line 0 $fileName $_.FullNameGet-Content $fileName | f…...
网络原理
网络原理 传输层 UDP 特点 特点:无连接,不可靠,面向数据报,全双工 格式 怎么进行校验呢? 把UDP数据报中的源端口,目的端口,UDP报文长度的每个字节,都依次进行累加 把累加结果&a…...
力扣(LeetCode)算法_C++——同构字符串
给定两个字符串 s 和 t ,判断它们是否是同构的。 如果 s 中的字符可以按某种映射关系替换得到 t ,那么这两个字符串是同构的。 每个出现的字符都应当映射到另一个字符,同时不改变字符的顺序。不同字符不能映射到同一个字符上,相…...
网管实战⑼:配置华为S5720交换机
配置好汇聚交换机后,需要根据单位情况配置具体的接入交换机。 自从2019年12月底配置好交换机后,基本上都没有怎么操作交换机了。那时候使用的是H3C交换机,主要是H3C S7706、H3C S5120、H3C S5130、H3C S5500、H3C S3600等型号的交换机&#x…...
文件上传漏洞第十六关十七关
第十六关 第十七关 第十六关 直接上传php文件判断限制方式: 同第十五关白名单限制 第十六关源码: 代码逻辑判断了后缀名、content-type,以及利用imagecreatefromgif判断是否为gif图片,最后再做了一次二次渲染 二次渲染图片马&…...
Try llama2 in NUC (by quqi99)
作者:张华 发表于:2023-09-06 版权声明:可以任意转载,转载时请务必以超链接形式标明文章原始出处和作者信息及本版权声明 ( http://blog.csdn.net/quqi99 ) 据说现在在PC机上可以运行llama2大模型了, Way 1 于是照…...
强大易用的开源 建站工具Halo
特点 可插拔架构 Halo 采用可插拔架构,功能模块之间耦合度低、灵活性提高。支持用户按需安装、卸载插件,操作便捷。同时提供插件开发接口以确保较高扩展性和可维护性。 ☑ 支持在运行时安装和卸载插件 ☑ 更加方便地集成三方平台 ☑ 统一的可配置设置表…...
如何使用vuex
1.安装vuex 2.在store文件夹内写index.js 此处tab是自定义的文件 import Vue from "vue" import Vuex from "vuex" import tab from "./tab"Vue.use(Vuex)export default new Vuex.Store({modules:{tab} }) 3.在store文件夹内写tab.js(自定义…...
动手深度学习——Windows下的环境安装流程(一步一步安装,图文并配)
目录 环境安装官网步骤图文版安装Miniconda下载包含本书全部代码的压缩包使用conda创建虚拟(运行)环境使用conda创建虚拟环境并安装本书需要的软件激活之前创建的环境打开Jupyter记事本 环境安装 文章参考来源:http://t.csdn.cn/tu8V8 官网…...
个人博客系统-测试用例+自动化测试
一、个人博客系统测试用例 二、自动化测试 使用selenium4 Junit5单元测试框架,来进行简单的自动化测试。 1. 准备工作 (1)引入依赖,此时的pom.xml文件: <?xml version"1.0" encoding"UTF-8&quo…...
C语言文件读写常用函数
文章目录 1. fopen函数2. fclose函数3. fgetc函数4. fgets函数5. fputc函数6. fputs函数7. fprintf函数8. fscanf函数9. fseek函数10. ftell函数 1. fopen函数 返回值:文件指针(FILE*)参数:文件名(包括文件路径&#…...
【C++基础】实现日期类
👻内容专栏: C/C编程 🐨本文概括: C实现日期类。 🐼本文作者: 阿四啊 🐸发布时间:2023.9.7 对于类的成员函数的声明和定义,我们在类和对象上讲到过,需要进行…...
C语言程序设计—通讯录实现
本篇文章主要是实现一个简易的通讯录: 功能如下: 添加用户修改用户删除用户查找用户(可重名)按名字或年龄排序显示用户保存通讯录日志追加 有如下知识点: 动态数组结构体枚举自定义标识符和宏文件打开与存储函数指针…...
实战:大数据Flink CDC同步Mysql数据到ElasticSearch
文章目录 前言知识积累CDC简介CDC的种类常见的CDC方案比较 Springboot接入Flink CDC环境准备项目搭建 本地运行集群运行将项目打包将包传入集群启动远程将包部署到flink集群 写在最后 前言 前面的博文我们分享了大数据分布式流处理计算框架Flink和其基础环境的搭建,…...
B-Tree 索引和 Hash 索引的对比
分析&回答 B-Tree 索引的特点 B-tree 索引可以用于使用 , >, >, <, < 或者 BETWEEN 运算符的列比较。如果 LIKE 的参数是一个没有以通配符起始的常量字符串的话也可以使用这种索引。 有时,即使有索引可以使用,MySQL 也不使用任何索引。…...
入门Python编程:了解计算机语言、Python介绍和开发环境搭建
文章目录 Python入门什么是计算机语言1. 机器语言2. 符号语言(汇编)3. 高级语言 编译型语言和解释型语言1. 编译型语言2. 解释型语言 Python的介绍Python开发环境搭建Python的交互界面 python学习专栏python基础知识(0基础入门)py…...
深度解析Redisson框架的分布式锁运行原理与高级知识点
推荐阅读 项目实战:AI文本 OCR识别最佳实践 AI Gamma一键生成PPT工具直达链接 玩转cloud Studio 在线编码神器 玩转 GPU AI绘画、AI讲话、翻译,GPU点亮AI想象空间 资源分享 史上最全文档AI绘画stablediffusion资料分享 AI绘画关于SD,MJ,GPT,SDXL百科全书 AI绘画 stable…...
C#扩展方法
参数列表中this的这种用法是在.NET 3.0之后新增的一种特性---扩展方法。通过这个属性可以让程序员在现有的类型上添加扩展方法(无需创建新的派生类型、重新编译或者以其他方式修改原始类型)。 扩展方法是一种特殊的静态方法,虽然是静态方法&a…...
uniapp 高度铺满全屏
问题:在有uni-tabbar的情况下,页面铺满剩下的部分 <template><view :style"{height:screenHeightpx}" class"page"></view> </template> <script>export default {data() {return {screenHeight: &q…...
UG\NX二次开发 判断向量在指定的公差内是否为零,判断是否是零向量 UF_VEC3_is_zero
文章作者:里海 来源网站:王牌飞行员_里海_里海NX二次开发3000例,里海BlockUI专栏,C\C++-CSDN博客 简介: UG\NX二次开发 判断向量在指定的公差内是否为零,判断是否是零向量 UF_VEC3_is_zero 效果: 代码: #include "me.hpp"void ufusr(char* param, int* retco…...
2023年MySQL实战核心技术第一篇
目录 四 . 基础架构:一条SQl查询语句是如何执行的? 4.1 MySQL逻辑架构图: 4.2 MySQL的Server层和存储引擎层 4.2.1 连接器 4.2.1.1 解释 4.2.1.2 MySQL 异常重启 解决方案: 4.2.1.2.1. 定期断开长连接: 4.2.1.2.2. 初始…...
hivesql执行过程
语法解析 SemanticAnalyzer SemanticAnalyzer是Hive中的语义分析器,负责检查Hive SQL程序的语义是否正确。SemanticAnalyzer会对Hive SQL程序进行以下检查: 检查过程 语法检查 SemanticAnalyzer会检查Hive SQL程序的语法是否正确,包括关…...
C语言学习:8、深入数据类型
数据超过类型规定的大小怎么办 C语言中,如果需要用的整数大于int类型的最大值了怎么办? 我们知道int能表示的最大数是2147483647,最小的数是-2147483648,为什么? 因为字32位系统中,寄存器是32位的&#…...
生成树协议 STP(spanning-tree protocol)
一、STP作用 1、消除环路:通过阻断冗余链路来消除网络中可能存在的环路。 2、链路备份:当活动路径发生故障时,激活备份链路,及时恢复网络连通性。 二、STP选举机制 1、目的:找到阻塞的端口 2、STP交换机的角色&am…...
【LeetCode】312.戳气球
题目 有 n 个气球,编号为0 到 n - 1,每个气球上都标有一个数字,这些数字存在数组 nums 中。 现在要求你戳破所有的气球。戳破第 i 个气球,你可以获得 nums[i - 1] * nums[i] * nums[i 1] 枚硬币。 这里的 i - 1 和 i 1 代表和…...
商业数据分析概论
🐳 我正在和鲸社区参加“商业数据分析训练营活动” https://www.heywhale.com/home/competition/6487de6649463ee38dbaf58b ,以下是我的学习笔记: 学习主题:波士顿房价数据快速查看 日期:2023.9.4 关键概念/知识点&…...
Golang GUI框架
Golang GUI框架fyne fyne简介第一个fyne应用fyne应用程序和运行循环fyne更新GUI内容fyne窗口处理fyne解决中文乱码问题fyne应用打包fyne画布和画布对象fyne容器和布局fyne绘制和动画fyne盒子布局fyne网格grid布局fyne网格包裹布局fyne边框布局fyne表单布局fyne中心布局fyne ma…...
舞钢做网站/百度推广怎么优化
网站系统架构层次:前端架构、应用层架构、服务层架构、存储层架构、后台架构、数据采集与监控、安全架构、数据中心机房架构。1.前端架构(浏览器优化技术、CDN、动静分离,静态资源独立部署、图片服务、反向代理、DNS)前端指用户请…...
娄底网站建设/推广普通话的意义50字
使用TeamViewer实现远程桌面连接 背景: 有些朋友反映,借助Ubuntu自带的桌面共享工具desktop sharing会有不再同一网端下出现连接不稳定或者掉线的问题,那么现在我们就可以借助第三方辅助工具TeamViewer啦!那么先来看看这个神器是什…...
做电商要关注哪些网站/策划营销
author:咔咔 wechat:fangkangfk 我这里复制一份最初始的nginx.conf配置文件 user 设置nginx服务的系统使用用户 (一般情况下是处于注释状态) worker_processes 工作进程数(一般跟cpu核数相同即可)…...
wordpress 导购站模板/seo网址
实现员工信息表文件存储格式如下:id,name,age,phone,job1,Alex,22,13651054608,IT2,Egon,23,13304320533,Tearcher3,nezha,25,1333235322,IT 现在需要对这个员工信息文件进行增删改查。 不允许一次性将文件中的行都读入…...
南京百度网站制作/哈尔滨网站优化流程
C 语言提供了三种插入器,其差别在于插入元素的位置不同。 (1)back_inserter,创建一个使用push_back 实现插入的迭代器。 (2)front_inserter,创建一个使用push_front实现的插入迭代器。 (3)inser…...
网站备案是自己可以做吗/广东省广州市佛山市
各位召唤师大家好,我是你们的老朋友克烈队长,今天又来跟大家分享关于英雄联盟的趣闻攻略了。英雄联盟里有各种各样的英雄,有能一瞬间秒人的英雄,也有出肉装当坦克的英雄,还有持续性输出的英雄。今天克烈队长就跟大家一…...