当前位置: 首页 > news >正文

【Kafka-go】golang的kafka应用

网络上关于go的Kafka还是比较少的今天就先出一篇入门级别的,之后再看看能能出一个公司业务场景中的消息流。

一、下载github.com/segmentio/kafka-go包

go get github.com/segmentio/kafka-go

二、建立kafka连接

正常来说下面的配置host topic partition 应该写在配置文件里
*/
const host = "localhost:9092"    //host 具体看你们自己的配置如果是服务器上的 就是服务器iP:9092 本地就是localhost:9092
const topic = "my"
const partition = 0/*
NewKafKaCon kafka的客户端连接的初始化方法
*/
func NewKafKaConn() (*kafka.Conn, error) {return kafka.DialLeader(context.Background(), "tcp", host, topic, partition)
}

三、kafka之发送消息(生产者)

/*
People消息的格式 标准情况下应该写在model层的结构体
*/
type People struct {Name string `json:"name"`Pwd  string `json:"pwd"`
}// writeByConn 基于Conn发送消息
func writeByConn() {// 连接至Kafka集群的Leader节点conn, err := NewKafKaCon()if err != nil {log.Fatal("failed to dial leader:", err)}// 设置发送消息的超时时间conn.SetWriteDeadline(time.Now().Add(10 * time.Second))people1 := People{"Tmo","124"}people2 := People{"Mac","124"}people3 := People{"Joker","124"}// 发送消息str1, _ := json.Marshal(people1)str2, _ := json.Marshal(people2)str3, _ := json.Marshal(people3)_, err = conn.WriteMessages(kafka.Message{Value: []byte(str1)},kafka.Message{Value: []byte(str2)},kafka.Message{Value: []byte(str3)},)if err != nil {log.Fatal("failed to write messages:", err)}// 关闭连接if err := conn.Close(); err != nil {log.Fatal("failed to close writer:", err)}
}

四、kafka之接收消息(消费者)

// readByConn 连接至kafka后接收消息
func readByConn() {// 指定要连接的topic和partition// 连接至Kafka的leader节点conn, err := NewKafKaCon()if err != nil {log.Fatal("failed to dial leader:", err)}// 设置读取超时时间conn.SetReadDeadline(time.Now().Add(10 * time.Second))// 读取一批消息,得到的batch是一系列消息的迭代器batch := conn.ReadBatch(10e3, 1e6) // fetch 10KB min, 1MB max// 遍历读取消息b := make([]byte, 10e3) // 10KB max per messagefor {p := People{}n, err := batch.Read(b)if err != nil {break}err = json.Unmarshal(b[:n], &p)if err != nil {fmt.Println(string(b))fmt.Println(err, "**************")continue}fmt.Println(p)}// 关闭batchif err := batch.Close(); err != nil {log.Fatal("failed to close batch:", err)}// 关闭连接if err := conn.Close(); err != nil {log.Fatal("failed to close connection:", err)}
}

完整代码

package mainimport ("context""encoding/json""fmt""github.com/segmentio/kafka-go""log""time"
)/*
People消息的格式 标准情况下应该写在model层的结构体
*/
type People struct {Name string `json:"name"`Pwd  string `json:"pwd"`
}/*
正常来说下面的配置host topic partition 应该写在配置文件里
*/
const host = "localhost:9091"
const topic = "my"
const partition = 0/*
NewKafKaCon kafka的客户端连接的初始化方法
*/
func NewKafKaCon() (*kafka.Conn, error) {return kafka.DialLeader(context.Background(), "tcp", host, topic, partition)
}func main() {writeByConn()readByConn()}// writeByConn 基于Conn发送消息
func writeByConn() {// 连接至Kafka集群的Leader节点conn, err := NewKafKaCon()if err != nil {log.Fatal("failed to dial leader:", err)}// 设置发送消息的超时时间conn.SetWriteDeadline(time.Now().Add(10 * time.Second))people1 := People{"Tmo","124"}people2 := People{"Mac","124"}people3 := People{"Joker","124"}// 发送消息str1, _ := json.Marshal(people1)str2, _ := json.Marshal(people2)str3, _ := json.Marshal(people3)_, err = conn.WriteMessages(kafka.Message{Value: []byte(str1)},kafka.Message{Value: []byte(str2)},kafka.Message{Value: []byte(str3)},)if err != nil {log.Fatal("failed to write messages:", err)}// 关闭连接if err := conn.Close(); err != nil {log.Fatal("failed to close writer:", err)}
}// readByConn 连接至kafka后接收消息
func readByConn() {// 指定要连接的topic和partition// 连接至Kafka的leader节点conn, err := NewKafKaCon()if err != nil {log.Fatal("failed to dial leader:", err)}// 设置读取超时时间conn.SetReadDeadline(time.Now().Add(10 * time.Second))// 读取一批消息,得到的batch是一系列消息的迭代器batch := conn.ReadBatch(10e3, 1e6) // fetch 10KB min, 1MB max// 遍历读取消息b := make([]byte, 10e3) // 10KB max per messagefor {p := People{}n, err := batch.Read(b)if err != nil {break}err = json.Unmarshal(b[:n], &p)if err != nil {fmt.Println(string(b))fmt.Println(err, "**************")continue}fmt.Println(p)}// 关闭batchif err := batch.Close(); err != nil {log.Fatal("failed to close batch:", err)}// 关闭连接if err := conn.Close(); err != nil {log.Fatal("failed to close connection:", err)}
}

五、kafka之消费者组实现消息确认(从一次消费消息的末尾开始接收消息)

只需要给读取消息的方法改变一下就可以了


func readByConn() {r := kafka.NewReader(kafka.ReaderConfig{Brokers:  []string{host},GroupID:  "consumer-group-id",Topic:    topic,MaxBytes: 10e6, // 10MB})for {m, err := r.ReadMessage(context.Background())if err != nil {break}fmt.Printf("message at topic/partition/offset %v/%v/%v: %s = %s\n", m.Topic, m.Partition, m.Offset, string(m.Key), string(m.Value))}
}

完整代码

package mainimport ("context""encoding/json""fmt""github.com/segmentio/kafka-go""log""time"
)/*
People消息的格式 标准情况下应该写在model层的结构体
*/
type People struct {Name string `json:"name"`Pwd  string `json:"pwd"`
}/*
正常来说下面的配置host topic partition 应该写在配置文件里
*/
const host = "localhost:9091"
const topic = "my"
const partition = 0/*
NewKafKaCon kafka的客户端连接的初始化方法
*/
func NewKafKaCon() (*kafka.Conn, error) {return kafka.DialLeader(context.Background(), "tcp", host, topic, partition)
}func main() {writeByConn()readByConn()
}// writeByConn 基于Conn发送消息
func writeByConn() {// 连接至Kafka集群的Leader节点conn, err := NewKafKaCon()if err != nil {log.Fatal("failed to dial leader:", err)}// 设置发送消息的超时时间conn.SetWriteDeadline(time.Now().Add(10 * time.Second))people1 := People{"Tmo","124"}people2 := People{"Mac","124"}people3 := People{"Joker","124"}// 发送消息str1, _ := json.Marshal(people1)str2, _ := json.Marshal(people2)str3, _ := json.Marshal(people3)_, err = conn.WriteMessages(kafka.Message{Value: []byte(str1)},kafka.Message{Value: []byte(str2)},kafka.Message{Value: []byte(str3)},)if err != nil {log.Fatal("failed to write messages:", err)}// 关闭连接if err := conn.Close(); err != nil {log.Fatal("failed to close writer:", err)}
}func readByConn() {r := kafka.NewReader(kafka.ReaderConfig{Brokers:  []string{host},GroupID:  "consumer-group-id",Topic:    topic,MaxBytes: 10e6, // 10MB})for {m, err := r.ReadMessage(context.Background())if err != nil {break}fmt.Printf("message at topic/partition/offset %v/%v/%v: %s = %s\n", m.Topic, m.Partition, m.Offset, string(m.Key), string(m.Value))}
}

相关文章:

【Kafka-go】golang的kafka应用

网络上关于go的Kafka还是比较少的今天就先出一篇入门级别的,之后再看看能能出一个公司业务场景中的消息流。 一、下载github.com/segmentio/kafka-go包 go get github.com/segmentio/kafka-go二、建立kafka连接 正常来说下面的配置host topic partition 应该写在…...

redis:set集合命令,内部编码,使用场景

个人主页 : 个人主页 个人专栏 : 《数据结构》 《C语言》《C》《Linux》《网络》 《redis学习笔记》 文章目录 前言命令SADDSMEMBERSSISMEMBERSCARDSPOPSMOVESREM集合间操作SINTERSINTERSTORESUNIONSUNIONSTORESDIFFSDIFFSTORE 内部编码使用场景总结 前言…...

45期代码随想录算法营总结

代码随想录训练营总结与收获 在为期60天的代码随想录训练营结束后,我感慨良多。这段时间不仅让我在编程技能上有了明显的提升,更让我在学习习惯和时间管理上有了深刻的反思和改变。 报名参加这个训练营对我来说是一个重要的监督机制。之前我总是拖延&a…...

深入理解Java中的instanceof关键字及接口新特性:方法实现的可能性

目录 引言 1. 什么是instanceof关键字? 1.1 语法结构 1.2 instanceof的用法示例 1.3 instanceof的应用场景 2. Java中的接口能包含方法实现吗? 2.1 默认方法(Default Method) 2.2 静态方法(Static Method&…...

【python中如果class没有self会怎行】

python中如果class没有self会怎样TOC 在Python中,self是一个约定俗成的名称,用于表示类的实例。如果没有使用self,会导致以下问题: 1、无法访问实例属性: 在类的方法中,如果没有self,方法将无…...

【算法】(Python)动态规划

动态规划: dynamic programming。"programming"指的是一种表格法,而非编写计算机程序。通常解决最优化问题(optimization problem)。将问题拆分成若干个子问题,求解各子问题来得到原问题的解。适用于多阶段…...

EasyExcel 学习之 导出 “提示问题”

EasyExcel 学习之 导出 “提示问题” 现象分析解决(伪代码)前端 POST 实现后端实现 现象 EasyExcel 支持导出 xlsx、xls、csv 三种文件格式。在导出过程中可能发生各种异常,当发生异常时应该提示错误信息而非导出一个错误的文件。 分析 首…...

应用系统开发(3)低功耗四运算放大器LM324N

LM324N 是一种广泛使用的 低功耗四运算放大器,由德州仪器(Texas Instruments)和其他制造商生产。它具有四个独立的运算放大器,能够在单电源或双电源模式下运行,适合多种模拟电路应用。以下是详细信息: 芯片基本信息 型号:LM324N封装类型:常见 DIP(双列直插封装)或 SO…...

基于微信小程序的电商平台+LW示例参考

1.项目介绍 系统角色:管理员、普通用户功能模块:管理员(用户管理、商品分类、商品管理、订单管理、系统管理等),普通用户(个人中心、收藏、我的订单、查看商品等)技术选型:SpringBo…...

[Android] Graphic Buffer 的申请

前言: MediaCodec 支持 texture mode,即MediaCodec解码video完毕后把 yuv 数据填入 GPU 共享出来的 graphic buffer 里面,app 会把 video 的 yuv数据 和 ui 的数据通过通过软件渲染组件(opengl等)发送给GPU 进行一并渲染。这样做的效率较低&…...

【大数据学习 | HBASE高级】storeFile文件的合并

Compaction 操作分成下面两种: Minor Compaction:是选取一些小的、相邻的StoreFile将他们合并成一个更大的StoreFile,对于删除、过期、多余版本的数据不进行清除。 Major Compaction:是指将所有的StoreFile合并成一个StoreFile&am…...

多平台编包动态引入依赖的解决方案

最近开发时遇到了这样的需求,A 平台需要引入一个 video.js,B 平台却是不需要的,那么面向 B 平台打包的时候把依赖装进去自然就不大合适。最好的方法是动态引入依赖,根据平台来判断要不要引入 动态引入依赖 很快啊,动…...

[单例模式]

目录 [设计模式] 单例模式 1. 饿汉模式 2. 懒汉模式 3. 单例模式的线程安全问题 [设计模式] 设计模式是软件工程中的一种常见做法, 它可以理解为"模板", 是针对一些常见的特定场景, 给出的一些比较好的固定的解决方案. 不同语言适用的设计模式是不一样的. 这里…...

速盾:游戏盾的功能和原理详解

速盾有一款专注于网络游戏安全的防护系统,它通过实时监测游戏网络流量和玩家行为,以及使用先进的算法和技术进行分析和识别,检测出各种外挂、作弊行为和恶意攻击,从而保障游戏的公平性和玩家的安全性。 速盾游戏盾的主要功能包括…...

Spleeter:音频分离的革命性工具

目录 什么是Spleeter?Spleeter的工作原理Spleeter的应用场景Spleeter的技术优势Spleeter的挑战与局限性结论 什么是Spleeter? Spleeter 是一个由 Deezer 开发的开源音频源分离工具。它基于深度学习技术,尤其是卷积神经网络(CNN&a…...

【笔记】自动驾驶预测与决策规划_Part6_不确定性感知的决策过程

文章目录 0. 前言1. 部分观测的马尔可夫决策过程1.1 POMDP的思想以及与MDP的联系1.1.1 MDP的过程回顾1.1.2 POMDP定义1.1.3 与MDP的联系及区别POMDP 视角MDP 视角决策次数对最优解的影响 1.2 POMDP的3种常规解法1.2.1 连续状态的“Belief MDP”方法1. 信念状态的定义2. Belief …...

openresty入门教程:access_by_lua_block

在OpenResty中,access_by_lua_block 是一个功能强大的指令,它允许你在Nginx的访问控制阶段执行Lua脚本。这个阶段发生在Nginx处理请求的过程中,紧接在rewrite阶段之后,但在请求被传递到后端服务器(如PHP、Node.js等&am…...

Caused by: org.apache.flink.api.common.io.ParseException: Row too short:

Flink版本 1.17.2 错误描述 Caused by: org.apache.flink.api.common.io.ParseException: Row too short: 通过flink中的flinkSql直接使用对应的connector去获取csv文件内容,报获取的数据太短了 可能原因 1.创建的表字段多于csv文件当中的表头 定位 在获取csv…...

hbase的安装与简单操作

好的,这里是关于 HBase 的安装和基本操作的详细步骤,分成几个更清晰的阶段: 第一部分:安装和配置 HBase 1. 环境准备 HBase 依赖于 Hadoop,因此首先确保 Hadoop 已经正确安装和配置。如果没有安装,请先下…...

PySpark本地开发环境搭建

一.前置事项 请注意,需要先实现Windows的本地JDK和Hadoop的安装。 二.windows安装Anaconda 资源:Miniconda3-py38-4.11.0-Windows-x86-64,在window使用的Anaconda资源-CSDN文库 右键以管理员身份运行,选择你的安装路径&#x…...

手游刚开服就被攻击怎么办?如何防御DDoS?

开服初期是手游最脆弱的阶段,极易成为DDoS攻击的目标。一旦遭遇攻击,可能导致服务器瘫痪、玩家流失,甚至造成巨大经济损失。本文为开发者提供一套简洁有效的应急与防御方案,帮助快速应对并构建长期防护体系。 一、遭遇攻击的紧急应…...

反向工程与模型迁移:打造未来商品详情API的可持续创新体系

在电商行业蓬勃发展的当下,商品详情API作为连接电商平台与开发者、商家及用户的关键纽带,其重要性日益凸显。传统商品详情API主要聚焦于商品基本信息(如名称、价格、库存等)的获取与展示,已难以满足市场对个性化、智能…...

【Redis技术进阶之路】「原理分析系列开篇」分析客户端和服务端网络诵信交互实现(服务端执行命令请求的过程 - 初始化服务器)

服务端执行命令请求的过程 【专栏简介】【技术大纲】【专栏目标】【目标人群】1. Redis爱好者与社区成员2. 后端开发和系统架构师3. 计算机专业的本科生及研究生 初始化服务器1. 初始化服务器状态结构初始化RedisServer变量 2. 加载相关系统配置和用户配置参数定制化配置参数案…...

C++ 基础特性深度解析

目录 引言 一、命名空间(namespace) C 中的命名空间​ 与 C 语言的对比​ 二、缺省参数​ C 中的缺省参数​ 与 C 语言的对比​ 三、引用(reference)​ C 中的引用​ 与 C 语言的对比​ 四、inline(内联函数…...

解决本地部署 SmolVLM2 大语言模型运行 flash-attn 报错

出现的问题 安装 flash-attn 会一直卡在 build 那一步或者运行报错 解决办法 是因为你安装的 flash-attn 版本没有对应上,所以报错,到 https://github.com/Dao-AILab/flash-attention/releases 下载对应版本,cu、torch、cp 的版本一定要对…...

今日科技热点速览

🔥 今日科技热点速览 🎮 任天堂Switch 2 正式发售 任天堂新一代游戏主机 Switch 2 今日正式上线发售,主打更强图形性能与沉浸式体验,支持多模态交互,受到全球玩家热捧 。 🤖 人工智能持续突破 DeepSeek-R1&…...

蓝桥杯 冶炼金属

原题目链接 🔧 冶炼金属转换率推测题解 📜 原题描述 小蓝有一个神奇的炉子用于将普通金属 O O O 冶炼成为一种特殊金属 X X X。这个炉子有一个属性叫转换率 V V V,是一个正整数,表示每 V V V 个普通金属 O O O 可以冶炼出 …...

用机器学习破解新能源领域的“弃风”难题

音乐发烧友深有体会,玩音乐的本质就是玩电网。火电声音偏暖,水电偏冷,风电偏空旷。至于太阳能发的电,则略显朦胧和单薄。 不知你是否有感觉,近两年家里的音响声音越来越冷,听起来越来越单薄? —…...

嵌入式常见 CPU 架构

架构类型架构厂商芯片厂商典型芯片特点与应用场景PICRISC (8/16 位)MicrochipMicrochipPIC16F877A、PIC18F4550简化指令集,单周期执行;低功耗、CIP 独立外设;用于家电、小电机控制、安防面板等嵌入式场景8051CISC (8 位)Intel(原始…...

Windows电脑能装鸿蒙吗_Windows电脑体验鸿蒙电脑操作系统教程

鸿蒙电脑版操作系统来了,很多小伙伴想体验鸿蒙电脑版操作系统,可惜,鸿蒙系统并不支持你正在使用的传统的电脑来安装。不过可以通过可以使用华为官方提供的虚拟机,来体验大家心心念念的鸿蒙系统啦!注意:虚拟…...