当前位置: 首页 > news >正文

【Kafka-go】golang的kafka应用

网络上关于go的Kafka还是比较少的今天就先出一篇入门级别的,之后再看看能能出一个公司业务场景中的消息流。

一、下载github.com/segmentio/kafka-go包

go get github.com/segmentio/kafka-go

二、建立kafka连接

正常来说下面的配置host topic partition 应该写在配置文件里
*/
const host = "localhost:9092"    //host 具体看你们自己的配置如果是服务器上的 就是服务器iP:9092 本地就是localhost:9092
const topic = "my"
const partition = 0/*
NewKafKaCon kafka的客户端连接的初始化方法
*/
func NewKafKaConn() (*kafka.Conn, error) {return kafka.DialLeader(context.Background(), "tcp", host, topic, partition)
}

三、kafka之发送消息(生产者)

/*
People消息的格式 标准情况下应该写在model层的结构体
*/
type People struct {Name string `json:"name"`Pwd  string `json:"pwd"`
}// writeByConn 基于Conn发送消息
func writeByConn() {// 连接至Kafka集群的Leader节点conn, err := NewKafKaCon()if err != nil {log.Fatal("failed to dial leader:", err)}// 设置发送消息的超时时间conn.SetWriteDeadline(time.Now().Add(10 * time.Second))people1 := People{"Tmo","124"}people2 := People{"Mac","124"}people3 := People{"Joker","124"}// 发送消息str1, _ := json.Marshal(people1)str2, _ := json.Marshal(people2)str3, _ := json.Marshal(people3)_, err = conn.WriteMessages(kafka.Message{Value: []byte(str1)},kafka.Message{Value: []byte(str2)},kafka.Message{Value: []byte(str3)},)if err != nil {log.Fatal("failed to write messages:", err)}// 关闭连接if err := conn.Close(); err != nil {log.Fatal("failed to close writer:", err)}
}

四、kafka之接收消息(消费者)

// readByConn 连接至kafka后接收消息
func readByConn() {// 指定要连接的topic和partition// 连接至Kafka的leader节点conn, err := NewKafKaCon()if err != nil {log.Fatal("failed to dial leader:", err)}// 设置读取超时时间conn.SetReadDeadline(time.Now().Add(10 * time.Second))// 读取一批消息,得到的batch是一系列消息的迭代器batch := conn.ReadBatch(10e3, 1e6) // fetch 10KB min, 1MB max// 遍历读取消息b := make([]byte, 10e3) // 10KB max per messagefor {p := People{}n, err := batch.Read(b)if err != nil {break}err = json.Unmarshal(b[:n], &p)if err != nil {fmt.Println(string(b))fmt.Println(err, "**************")continue}fmt.Println(p)}// 关闭batchif err := batch.Close(); err != nil {log.Fatal("failed to close batch:", err)}// 关闭连接if err := conn.Close(); err != nil {log.Fatal("failed to close connection:", err)}
}

完整代码

package mainimport ("context""encoding/json""fmt""github.com/segmentio/kafka-go""log""time"
)/*
People消息的格式 标准情况下应该写在model层的结构体
*/
type People struct {Name string `json:"name"`Pwd  string `json:"pwd"`
}/*
正常来说下面的配置host topic partition 应该写在配置文件里
*/
const host = "localhost:9091"
const topic = "my"
const partition = 0/*
NewKafKaCon kafka的客户端连接的初始化方法
*/
func NewKafKaCon() (*kafka.Conn, error) {return kafka.DialLeader(context.Background(), "tcp", host, topic, partition)
}func main() {writeByConn()readByConn()}// writeByConn 基于Conn发送消息
func writeByConn() {// 连接至Kafka集群的Leader节点conn, err := NewKafKaCon()if err != nil {log.Fatal("failed to dial leader:", err)}// 设置发送消息的超时时间conn.SetWriteDeadline(time.Now().Add(10 * time.Second))people1 := People{"Tmo","124"}people2 := People{"Mac","124"}people3 := People{"Joker","124"}// 发送消息str1, _ := json.Marshal(people1)str2, _ := json.Marshal(people2)str3, _ := json.Marshal(people3)_, err = conn.WriteMessages(kafka.Message{Value: []byte(str1)},kafka.Message{Value: []byte(str2)},kafka.Message{Value: []byte(str3)},)if err != nil {log.Fatal("failed to write messages:", err)}// 关闭连接if err := conn.Close(); err != nil {log.Fatal("failed to close writer:", err)}
}// readByConn 连接至kafka后接收消息
func readByConn() {// 指定要连接的topic和partition// 连接至Kafka的leader节点conn, err := NewKafKaCon()if err != nil {log.Fatal("failed to dial leader:", err)}// 设置读取超时时间conn.SetReadDeadline(time.Now().Add(10 * time.Second))// 读取一批消息,得到的batch是一系列消息的迭代器batch := conn.ReadBatch(10e3, 1e6) // fetch 10KB min, 1MB max// 遍历读取消息b := make([]byte, 10e3) // 10KB max per messagefor {p := People{}n, err := batch.Read(b)if err != nil {break}err = json.Unmarshal(b[:n], &p)if err != nil {fmt.Println(string(b))fmt.Println(err, "**************")continue}fmt.Println(p)}// 关闭batchif err := batch.Close(); err != nil {log.Fatal("failed to close batch:", err)}// 关闭连接if err := conn.Close(); err != nil {log.Fatal("failed to close connection:", err)}
}

五、kafka之消费者组实现消息确认(从一次消费消息的末尾开始接收消息)

只需要给读取消息的方法改变一下就可以了


func readByConn() {r := kafka.NewReader(kafka.ReaderConfig{Brokers:  []string{host},GroupID:  "consumer-group-id",Topic:    topic,MaxBytes: 10e6, // 10MB})for {m, err := r.ReadMessage(context.Background())if err != nil {break}fmt.Printf("message at topic/partition/offset %v/%v/%v: %s = %s\n", m.Topic, m.Partition, m.Offset, string(m.Key), string(m.Value))}
}

完整代码

package mainimport ("context""encoding/json""fmt""github.com/segmentio/kafka-go""log""time"
)/*
People消息的格式 标准情况下应该写在model层的结构体
*/
type People struct {Name string `json:"name"`Pwd  string `json:"pwd"`
}/*
正常来说下面的配置host topic partition 应该写在配置文件里
*/
const host = "localhost:9091"
const topic = "my"
const partition = 0/*
NewKafKaCon kafka的客户端连接的初始化方法
*/
func NewKafKaCon() (*kafka.Conn, error) {return kafka.DialLeader(context.Background(), "tcp", host, topic, partition)
}func main() {writeByConn()readByConn()
}// writeByConn 基于Conn发送消息
func writeByConn() {// 连接至Kafka集群的Leader节点conn, err := NewKafKaCon()if err != nil {log.Fatal("failed to dial leader:", err)}// 设置发送消息的超时时间conn.SetWriteDeadline(time.Now().Add(10 * time.Second))people1 := People{"Tmo","124"}people2 := People{"Mac","124"}people3 := People{"Joker","124"}// 发送消息str1, _ := json.Marshal(people1)str2, _ := json.Marshal(people2)str3, _ := json.Marshal(people3)_, err = conn.WriteMessages(kafka.Message{Value: []byte(str1)},kafka.Message{Value: []byte(str2)},kafka.Message{Value: []byte(str3)},)if err != nil {log.Fatal("failed to write messages:", err)}// 关闭连接if err := conn.Close(); err != nil {log.Fatal("failed to close writer:", err)}
}func readByConn() {r := kafka.NewReader(kafka.ReaderConfig{Brokers:  []string{host},GroupID:  "consumer-group-id",Topic:    topic,MaxBytes: 10e6, // 10MB})for {m, err := r.ReadMessage(context.Background())if err != nil {break}fmt.Printf("message at topic/partition/offset %v/%v/%v: %s = %s\n", m.Topic, m.Partition, m.Offset, string(m.Key), string(m.Value))}
}

相关文章:

【Kafka-go】golang的kafka应用

网络上关于go的Kafka还是比较少的今天就先出一篇入门级别的,之后再看看能能出一个公司业务场景中的消息流。 一、下载github.com/segmentio/kafka-go包 go get github.com/segmentio/kafka-go二、建立kafka连接 正常来说下面的配置host topic partition 应该写在…...

redis:set集合命令,内部编码,使用场景

个人主页 : 个人主页 个人专栏 : 《数据结构》 《C语言》《C》《Linux》《网络》 《redis学习笔记》 文章目录 前言命令SADDSMEMBERSSISMEMBERSCARDSPOPSMOVESREM集合间操作SINTERSINTERSTORESUNIONSUNIONSTORESDIFFSDIFFSTORE 内部编码使用场景总结 前言…...

45期代码随想录算法营总结

代码随想录训练营总结与收获 在为期60天的代码随想录训练营结束后,我感慨良多。这段时间不仅让我在编程技能上有了明显的提升,更让我在学习习惯和时间管理上有了深刻的反思和改变。 报名参加这个训练营对我来说是一个重要的监督机制。之前我总是拖延&a…...

深入理解Java中的instanceof关键字及接口新特性:方法实现的可能性

目录 引言 1. 什么是instanceof关键字? 1.1 语法结构 1.2 instanceof的用法示例 1.3 instanceof的应用场景 2. Java中的接口能包含方法实现吗? 2.1 默认方法(Default Method) 2.2 静态方法(Static Method&…...

【python中如果class没有self会怎行】

python中如果class没有self会怎样TOC 在Python中,self是一个约定俗成的名称,用于表示类的实例。如果没有使用self,会导致以下问题: 1、无法访问实例属性: 在类的方法中,如果没有self,方法将无…...

【算法】(Python)动态规划

动态规划: dynamic programming。"programming"指的是一种表格法,而非编写计算机程序。通常解决最优化问题(optimization problem)。将问题拆分成若干个子问题,求解各子问题来得到原问题的解。适用于多阶段…...

EasyExcel 学习之 导出 “提示问题”

EasyExcel 学习之 导出 “提示问题” 现象分析解决(伪代码)前端 POST 实现后端实现 现象 EasyExcel 支持导出 xlsx、xls、csv 三种文件格式。在导出过程中可能发生各种异常,当发生异常时应该提示错误信息而非导出一个错误的文件。 分析 首…...

应用系统开发(3)低功耗四运算放大器LM324N

LM324N 是一种广泛使用的 低功耗四运算放大器,由德州仪器(Texas Instruments)和其他制造商生产。它具有四个独立的运算放大器,能够在单电源或双电源模式下运行,适合多种模拟电路应用。以下是详细信息: 芯片基本信息 型号:LM324N封装类型:常见 DIP(双列直插封装)或 SO…...

基于微信小程序的电商平台+LW示例参考

1.项目介绍 系统角色:管理员、普通用户功能模块:管理员(用户管理、商品分类、商品管理、订单管理、系统管理等),普通用户(个人中心、收藏、我的订单、查看商品等)技术选型:SpringBo…...

[Android] Graphic Buffer 的申请

前言: MediaCodec 支持 texture mode,即MediaCodec解码video完毕后把 yuv 数据填入 GPU 共享出来的 graphic buffer 里面,app 会把 video 的 yuv数据 和 ui 的数据通过通过软件渲染组件(opengl等)发送给GPU 进行一并渲染。这样做的效率较低&…...

【大数据学习 | HBASE高级】storeFile文件的合并

Compaction 操作分成下面两种: Minor Compaction:是选取一些小的、相邻的StoreFile将他们合并成一个更大的StoreFile,对于删除、过期、多余版本的数据不进行清除。 Major Compaction:是指将所有的StoreFile合并成一个StoreFile&am…...

多平台编包动态引入依赖的解决方案

最近开发时遇到了这样的需求,A 平台需要引入一个 video.js,B 平台却是不需要的,那么面向 B 平台打包的时候把依赖装进去自然就不大合适。最好的方法是动态引入依赖,根据平台来判断要不要引入 动态引入依赖 很快啊,动…...

[单例模式]

目录 [设计模式] 单例模式 1. 饿汉模式 2. 懒汉模式 3. 单例模式的线程安全问题 [设计模式] 设计模式是软件工程中的一种常见做法, 它可以理解为"模板", 是针对一些常见的特定场景, 给出的一些比较好的固定的解决方案. 不同语言适用的设计模式是不一样的. 这里…...

速盾:游戏盾的功能和原理详解

速盾有一款专注于网络游戏安全的防护系统,它通过实时监测游戏网络流量和玩家行为,以及使用先进的算法和技术进行分析和识别,检测出各种外挂、作弊行为和恶意攻击,从而保障游戏的公平性和玩家的安全性。 速盾游戏盾的主要功能包括…...

Spleeter:音频分离的革命性工具

目录 什么是Spleeter?Spleeter的工作原理Spleeter的应用场景Spleeter的技术优势Spleeter的挑战与局限性结论 什么是Spleeter? Spleeter 是一个由 Deezer 开发的开源音频源分离工具。它基于深度学习技术,尤其是卷积神经网络(CNN&a…...

【笔记】自动驾驶预测与决策规划_Part6_不确定性感知的决策过程

文章目录 0. 前言1. 部分观测的马尔可夫决策过程1.1 POMDP的思想以及与MDP的联系1.1.1 MDP的过程回顾1.1.2 POMDP定义1.1.3 与MDP的联系及区别POMDP 视角MDP 视角决策次数对最优解的影响 1.2 POMDP的3种常规解法1.2.1 连续状态的“Belief MDP”方法1. 信念状态的定义2. Belief …...

openresty入门教程:access_by_lua_block

在OpenResty中,access_by_lua_block 是一个功能强大的指令,它允许你在Nginx的访问控制阶段执行Lua脚本。这个阶段发生在Nginx处理请求的过程中,紧接在rewrite阶段之后,但在请求被传递到后端服务器(如PHP、Node.js等&am…...

Caused by: org.apache.flink.api.common.io.ParseException: Row too short:

Flink版本 1.17.2 错误描述 Caused by: org.apache.flink.api.common.io.ParseException: Row too short: 通过flink中的flinkSql直接使用对应的connector去获取csv文件内容,报获取的数据太短了 可能原因 1.创建的表字段多于csv文件当中的表头 定位 在获取csv…...

hbase的安装与简单操作

好的,这里是关于 HBase 的安装和基本操作的详细步骤,分成几个更清晰的阶段: 第一部分:安装和配置 HBase 1. 环境准备 HBase 依赖于 Hadoop,因此首先确保 Hadoop 已经正确安装和配置。如果没有安装,请先下…...

PySpark本地开发环境搭建

一.前置事项 请注意,需要先实现Windows的本地JDK和Hadoop的安装。 二.windows安装Anaconda 资源:Miniconda3-py38-4.11.0-Windows-x86-64,在window使用的Anaconda资源-CSDN文库 右键以管理员身份运行,选择你的安装路径&#x…...

MySQL 隔离级别:脏读、幻读及不可重复读的原理与示例

一、MySQL 隔离级别 MySQL 提供了四种隔离级别,用于控制事务之间的并发访问以及数据的可见性,不同隔离级别对脏读、幻读、不可重复读这几种并发数据问题有着不同的处理方式,具体如下: 隔离级别脏读不可重复读幻读性能特点及锁机制读未提交(READ UNCOMMITTED)允许出现允许…...

Redis相关知识总结(缓存雪崩,缓存穿透,缓存击穿,Redis实现分布式锁,如何保持数据库和缓存一致)

文章目录 1.什么是Redis?2.为什么要使用redis作为mysql的缓存?3.什么是缓存雪崩、缓存穿透、缓存击穿?3.1缓存雪崩3.1.1 大量缓存同时过期3.1.2 Redis宕机 3.2 缓存击穿3.3 缓存穿透3.4 总结 4. 数据库和缓存如何保持一致性5. Redis实现分布式…...

QMC5883L的驱动

简介 本篇文章的代码已经上传到了github上面,开源代码 作为一个电子罗盘模块,我们可以通过I2C从中获取偏航角yaw,相对于六轴陀螺仪的yaw,qmc5883l几乎不会零飘并且成本较低。 参考资料 QMC5883L磁场传感器驱动 QMC5883L磁力计…...

SCAU期末笔记 - 数据分析与数据挖掘题库解析

这门怎么题库答案不全啊日 来简单学一下子来 一、选择题(可多选) 将原始数据进行集成、变换、维度规约、数值规约是在以下哪个步骤的任务?(C) A. 频繁模式挖掘 B.分类和预测 C.数据预处理 D.数据流挖掘 A. 频繁模式挖掘:专注于发现数据中…...

Go 语言接口详解

Go 语言接口详解 核心概念 接口定义 在 Go 语言中,接口是一种抽象类型,它定义了一组方法的集合: // 定义接口 type Shape interface {Area() float64Perimeter() float64 } 接口实现 Go 接口的实现是隐式的: // 矩形结构体…...

Java-41 深入浅出 Spring - 声明式事务的支持 事务配置 XML模式 XML+注解模式

点一下关注吧!!!非常感谢!!持续更新!!! 🚀 AI篇持续更新中!(长期更新) 目前2025年06月05日更新到: AI炼丹日志-28 - Aud…...

微信小程序云开发平台MySQL的连接方式

注:微信小程序云开发平台指的是腾讯云开发 先给结论:微信小程序云开发平台的MySQL,无法通过获取数据库连接信息的方式进行连接,连接只能通过云开发的SDK连接,具体要参考官方文档: 为什么? 因为…...

并发编程 - go版

1.并发编程基础概念 进程和线程 A. 进程是程序在操作系统中的一次执行过程,系统进行资源分配和调度的一个独立单位。B. 线程是进程的一个执行实体,是CPU调度和分派的基本单位,它是比进程更小的能独立运行的基本单位。C.一个进程可以创建和撤销多个线程;同一个进程中…...

Web后端基础(基础知识)

BS架构:Browser/Server,浏览器/服务器架构模式。客户端只需要浏览器,应用程序的逻辑和数据都存储在服务端。 优点:维护方便缺点:体验一般 CS架构:Client/Server,客户端/服务器架构模式。需要单独…...

抽象类和接口(全)

一、抽象类 1.概念:如果⼀个类中没有包含⾜够的信息来描绘⼀个具体的对象,这样的类就是抽象类。 像是没有实际⼯作的⽅法,我们可以把它设计成⼀个抽象⽅法,包含抽象⽅法的类我们称为抽象类。 2.语法 在Java中,⼀个类如果被 abs…...