当前位置: 首页 > news >正文

【Kafka-go】golang的kafka应用

网络上关于go的Kafka还是比较少的今天就先出一篇入门级别的,之后再看看能能出一个公司业务场景中的消息流。

一、下载github.com/segmentio/kafka-go包

go get github.com/segmentio/kafka-go

二、建立kafka连接

正常来说下面的配置host topic partition 应该写在配置文件里
*/
const host = "localhost:9092"    //host 具体看你们自己的配置如果是服务器上的 就是服务器iP:9092 本地就是localhost:9092
const topic = "my"
const partition = 0/*
NewKafKaCon kafka的客户端连接的初始化方法
*/
func NewKafKaConn() (*kafka.Conn, error) {return kafka.DialLeader(context.Background(), "tcp", host, topic, partition)
}

三、kafka之发送消息(生产者)

/*
People消息的格式 标准情况下应该写在model层的结构体
*/
type People struct {Name string `json:"name"`Pwd  string `json:"pwd"`
}// writeByConn 基于Conn发送消息
func writeByConn() {// 连接至Kafka集群的Leader节点conn, err := NewKafKaCon()if err != nil {log.Fatal("failed to dial leader:", err)}// 设置发送消息的超时时间conn.SetWriteDeadline(time.Now().Add(10 * time.Second))people1 := People{"Tmo","124"}people2 := People{"Mac","124"}people3 := People{"Joker","124"}// 发送消息str1, _ := json.Marshal(people1)str2, _ := json.Marshal(people2)str3, _ := json.Marshal(people3)_, err = conn.WriteMessages(kafka.Message{Value: []byte(str1)},kafka.Message{Value: []byte(str2)},kafka.Message{Value: []byte(str3)},)if err != nil {log.Fatal("failed to write messages:", err)}// 关闭连接if err := conn.Close(); err != nil {log.Fatal("failed to close writer:", err)}
}

四、kafka之接收消息(消费者)

// readByConn 连接至kafka后接收消息
func readByConn() {// 指定要连接的topic和partition// 连接至Kafka的leader节点conn, err := NewKafKaCon()if err != nil {log.Fatal("failed to dial leader:", err)}// 设置读取超时时间conn.SetReadDeadline(time.Now().Add(10 * time.Second))// 读取一批消息,得到的batch是一系列消息的迭代器batch := conn.ReadBatch(10e3, 1e6) // fetch 10KB min, 1MB max// 遍历读取消息b := make([]byte, 10e3) // 10KB max per messagefor {p := People{}n, err := batch.Read(b)if err != nil {break}err = json.Unmarshal(b[:n], &p)if err != nil {fmt.Println(string(b))fmt.Println(err, "**************")continue}fmt.Println(p)}// 关闭batchif err := batch.Close(); err != nil {log.Fatal("failed to close batch:", err)}// 关闭连接if err := conn.Close(); err != nil {log.Fatal("failed to close connection:", err)}
}

完整代码

package mainimport ("context""encoding/json""fmt""github.com/segmentio/kafka-go""log""time"
)/*
People消息的格式 标准情况下应该写在model层的结构体
*/
type People struct {Name string `json:"name"`Pwd  string `json:"pwd"`
}/*
正常来说下面的配置host topic partition 应该写在配置文件里
*/
const host = "localhost:9091"
const topic = "my"
const partition = 0/*
NewKafKaCon kafka的客户端连接的初始化方法
*/
func NewKafKaCon() (*kafka.Conn, error) {return kafka.DialLeader(context.Background(), "tcp", host, topic, partition)
}func main() {writeByConn()readByConn()}// writeByConn 基于Conn发送消息
func writeByConn() {// 连接至Kafka集群的Leader节点conn, err := NewKafKaCon()if err != nil {log.Fatal("failed to dial leader:", err)}// 设置发送消息的超时时间conn.SetWriteDeadline(time.Now().Add(10 * time.Second))people1 := People{"Tmo","124"}people2 := People{"Mac","124"}people3 := People{"Joker","124"}// 发送消息str1, _ := json.Marshal(people1)str2, _ := json.Marshal(people2)str3, _ := json.Marshal(people3)_, err = conn.WriteMessages(kafka.Message{Value: []byte(str1)},kafka.Message{Value: []byte(str2)},kafka.Message{Value: []byte(str3)},)if err != nil {log.Fatal("failed to write messages:", err)}// 关闭连接if err := conn.Close(); err != nil {log.Fatal("failed to close writer:", err)}
}// readByConn 连接至kafka后接收消息
func readByConn() {// 指定要连接的topic和partition// 连接至Kafka的leader节点conn, err := NewKafKaCon()if err != nil {log.Fatal("failed to dial leader:", err)}// 设置读取超时时间conn.SetReadDeadline(time.Now().Add(10 * time.Second))// 读取一批消息,得到的batch是一系列消息的迭代器batch := conn.ReadBatch(10e3, 1e6) // fetch 10KB min, 1MB max// 遍历读取消息b := make([]byte, 10e3) // 10KB max per messagefor {p := People{}n, err := batch.Read(b)if err != nil {break}err = json.Unmarshal(b[:n], &p)if err != nil {fmt.Println(string(b))fmt.Println(err, "**************")continue}fmt.Println(p)}// 关闭batchif err := batch.Close(); err != nil {log.Fatal("failed to close batch:", err)}// 关闭连接if err := conn.Close(); err != nil {log.Fatal("failed to close connection:", err)}
}

五、kafka之消费者组实现消息确认(从一次消费消息的末尾开始接收消息)

只需要给读取消息的方法改变一下就可以了


func readByConn() {r := kafka.NewReader(kafka.ReaderConfig{Brokers:  []string{host},GroupID:  "consumer-group-id",Topic:    topic,MaxBytes: 10e6, // 10MB})for {m, err := r.ReadMessage(context.Background())if err != nil {break}fmt.Printf("message at topic/partition/offset %v/%v/%v: %s = %s\n", m.Topic, m.Partition, m.Offset, string(m.Key), string(m.Value))}
}

完整代码

package mainimport ("context""encoding/json""fmt""github.com/segmentio/kafka-go""log""time"
)/*
People消息的格式 标准情况下应该写在model层的结构体
*/
type People struct {Name string `json:"name"`Pwd  string `json:"pwd"`
}/*
正常来说下面的配置host topic partition 应该写在配置文件里
*/
const host = "localhost:9091"
const topic = "my"
const partition = 0/*
NewKafKaCon kafka的客户端连接的初始化方法
*/
func NewKafKaCon() (*kafka.Conn, error) {return kafka.DialLeader(context.Background(), "tcp", host, topic, partition)
}func main() {writeByConn()readByConn()
}// writeByConn 基于Conn发送消息
func writeByConn() {// 连接至Kafka集群的Leader节点conn, err := NewKafKaCon()if err != nil {log.Fatal("failed to dial leader:", err)}// 设置发送消息的超时时间conn.SetWriteDeadline(time.Now().Add(10 * time.Second))people1 := People{"Tmo","124"}people2 := People{"Mac","124"}people3 := People{"Joker","124"}// 发送消息str1, _ := json.Marshal(people1)str2, _ := json.Marshal(people2)str3, _ := json.Marshal(people3)_, err = conn.WriteMessages(kafka.Message{Value: []byte(str1)},kafka.Message{Value: []byte(str2)},kafka.Message{Value: []byte(str3)},)if err != nil {log.Fatal("failed to write messages:", err)}// 关闭连接if err := conn.Close(); err != nil {log.Fatal("failed to close writer:", err)}
}func readByConn() {r := kafka.NewReader(kafka.ReaderConfig{Brokers:  []string{host},GroupID:  "consumer-group-id",Topic:    topic,MaxBytes: 10e6, // 10MB})for {m, err := r.ReadMessage(context.Background())if err != nil {break}fmt.Printf("message at topic/partition/offset %v/%v/%v: %s = %s\n", m.Topic, m.Partition, m.Offset, string(m.Key), string(m.Value))}
}

相关文章:

【Kafka-go】golang的kafka应用

网络上关于go的Kafka还是比较少的今天就先出一篇入门级别的,之后再看看能能出一个公司业务场景中的消息流。 一、下载github.com/segmentio/kafka-go包 go get github.com/segmentio/kafka-go二、建立kafka连接 正常来说下面的配置host topic partition 应该写在…...

redis:set集合命令,内部编码,使用场景

个人主页 : 个人主页 个人专栏 : 《数据结构》 《C语言》《C》《Linux》《网络》 《redis学习笔记》 文章目录 前言命令SADDSMEMBERSSISMEMBERSCARDSPOPSMOVESREM集合间操作SINTERSINTERSTORESUNIONSUNIONSTORESDIFFSDIFFSTORE 内部编码使用场景总结 前言…...

45期代码随想录算法营总结

代码随想录训练营总结与收获 在为期60天的代码随想录训练营结束后,我感慨良多。这段时间不仅让我在编程技能上有了明显的提升,更让我在学习习惯和时间管理上有了深刻的反思和改变。 报名参加这个训练营对我来说是一个重要的监督机制。之前我总是拖延&a…...

深入理解Java中的instanceof关键字及接口新特性:方法实现的可能性

目录 引言 1. 什么是instanceof关键字? 1.1 语法结构 1.2 instanceof的用法示例 1.3 instanceof的应用场景 2. Java中的接口能包含方法实现吗? 2.1 默认方法(Default Method) 2.2 静态方法(Static Method&…...

【python中如果class没有self会怎行】

python中如果class没有self会怎样TOC 在Python中,self是一个约定俗成的名称,用于表示类的实例。如果没有使用self,会导致以下问题: 1、无法访问实例属性: 在类的方法中,如果没有self,方法将无…...

【算法】(Python)动态规划

动态规划: dynamic programming。"programming"指的是一种表格法,而非编写计算机程序。通常解决最优化问题(optimization problem)。将问题拆分成若干个子问题,求解各子问题来得到原问题的解。适用于多阶段…...

EasyExcel 学习之 导出 “提示问题”

EasyExcel 学习之 导出 “提示问题” 现象分析解决(伪代码)前端 POST 实现后端实现 现象 EasyExcel 支持导出 xlsx、xls、csv 三种文件格式。在导出过程中可能发生各种异常,当发生异常时应该提示错误信息而非导出一个错误的文件。 分析 首…...

应用系统开发(3)低功耗四运算放大器LM324N

LM324N 是一种广泛使用的 低功耗四运算放大器,由德州仪器(Texas Instruments)和其他制造商生产。它具有四个独立的运算放大器,能够在单电源或双电源模式下运行,适合多种模拟电路应用。以下是详细信息: 芯片基本信息 型号:LM324N封装类型:常见 DIP(双列直插封装)或 SO…...

基于微信小程序的电商平台+LW示例参考

1.项目介绍 系统角色:管理员、普通用户功能模块:管理员(用户管理、商品分类、商品管理、订单管理、系统管理等),普通用户(个人中心、收藏、我的订单、查看商品等)技术选型:SpringBo…...

[Android] Graphic Buffer 的申请

前言: MediaCodec 支持 texture mode,即MediaCodec解码video完毕后把 yuv 数据填入 GPU 共享出来的 graphic buffer 里面,app 会把 video 的 yuv数据 和 ui 的数据通过通过软件渲染组件(opengl等)发送给GPU 进行一并渲染。这样做的效率较低&…...

【大数据学习 | HBASE高级】storeFile文件的合并

Compaction 操作分成下面两种: Minor Compaction:是选取一些小的、相邻的StoreFile将他们合并成一个更大的StoreFile,对于删除、过期、多余版本的数据不进行清除。 Major Compaction:是指将所有的StoreFile合并成一个StoreFile&am…...

多平台编包动态引入依赖的解决方案

最近开发时遇到了这样的需求,A 平台需要引入一个 video.js,B 平台却是不需要的,那么面向 B 平台打包的时候把依赖装进去自然就不大合适。最好的方法是动态引入依赖,根据平台来判断要不要引入 动态引入依赖 很快啊,动…...

[单例模式]

目录 [设计模式] 单例模式 1. 饿汉模式 2. 懒汉模式 3. 单例模式的线程安全问题 [设计模式] 设计模式是软件工程中的一种常见做法, 它可以理解为"模板", 是针对一些常见的特定场景, 给出的一些比较好的固定的解决方案. 不同语言适用的设计模式是不一样的. 这里…...

速盾:游戏盾的功能和原理详解

速盾有一款专注于网络游戏安全的防护系统,它通过实时监测游戏网络流量和玩家行为,以及使用先进的算法和技术进行分析和识别,检测出各种外挂、作弊行为和恶意攻击,从而保障游戏的公平性和玩家的安全性。 速盾游戏盾的主要功能包括…...

Spleeter:音频分离的革命性工具

目录 什么是Spleeter?Spleeter的工作原理Spleeter的应用场景Spleeter的技术优势Spleeter的挑战与局限性结论 什么是Spleeter? Spleeter 是一个由 Deezer 开发的开源音频源分离工具。它基于深度学习技术,尤其是卷积神经网络(CNN&a…...

【笔记】自动驾驶预测与决策规划_Part6_不确定性感知的决策过程

文章目录 0. 前言1. 部分观测的马尔可夫决策过程1.1 POMDP的思想以及与MDP的联系1.1.1 MDP的过程回顾1.1.2 POMDP定义1.1.3 与MDP的联系及区别POMDP 视角MDP 视角决策次数对最优解的影响 1.2 POMDP的3种常规解法1.2.1 连续状态的“Belief MDP”方法1. 信念状态的定义2. Belief …...

openresty入门教程:access_by_lua_block

在OpenResty中,access_by_lua_block 是一个功能强大的指令,它允许你在Nginx的访问控制阶段执行Lua脚本。这个阶段发生在Nginx处理请求的过程中,紧接在rewrite阶段之后,但在请求被传递到后端服务器(如PHP、Node.js等&am…...

Caused by: org.apache.flink.api.common.io.ParseException: Row too short:

Flink版本 1.17.2 错误描述 Caused by: org.apache.flink.api.common.io.ParseException: Row too short: 通过flink中的flinkSql直接使用对应的connector去获取csv文件内容,报获取的数据太短了 可能原因 1.创建的表字段多于csv文件当中的表头 定位 在获取csv…...

hbase的安装与简单操作

好的,这里是关于 HBase 的安装和基本操作的详细步骤,分成几个更清晰的阶段: 第一部分:安装和配置 HBase 1. 环境准备 HBase 依赖于 Hadoop,因此首先确保 Hadoop 已经正确安装和配置。如果没有安装,请先下…...

PySpark本地开发环境搭建

一.前置事项 请注意,需要先实现Windows的本地JDK和Hadoop的安装。 二.windows安装Anaconda 资源:Miniconda3-py38-4.11.0-Windows-x86-64,在window使用的Anaconda资源-CSDN文库 右键以管理员身份运行,选择你的安装路径&#x…...

【杂谈】-递归进化:人工智能的自我改进与监管挑战

递归进化:人工智能的自我改进与监管挑战 文章目录 递归进化:人工智能的自我改进与监管挑战1、自我改进型人工智能的崛起2、人工智能如何挑战人类监管?3、确保人工智能受控的策略4、人类在人工智能发展中的角色5、平衡自主性与控制力6、总结与…...

在鸿蒙HarmonyOS 5中实现抖音风格的点赞功能

下面我将详细介绍如何使用HarmonyOS SDK在HarmonyOS 5中实现类似抖音的点赞功能,包括动画效果、数据同步和交互优化。 1. 基础点赞功能实现 1.1 创建数据模型 // VideoModel.ets export class VideoModel {id: string "";title: string ""…...

Redis相关知识总结(缓存雪崩,缓存穿透,缓存击穿,Redis实现分布式锁,如何保持数据库和缓存一致)

文章目录 1.什么是Redis?2.为什么要使用redis作为mysql的缓存?3.什么是缓存雪崩、缓存穿透、缓存击穿?3.1缓存雪崩3.1.1 大量缓存同时过期3.1.2 Redis宕机 3.2 缓存击穿3.3 缓存穿透3.4 总结 4. 数据库和缓存如何保持一致性5. Redis实现分布式…...

23-Oracle 23 ai 区块链表(Blockchain Table)

小伙伴有没有在金融强合规的领域中遇见,必须要保持数据不可变,管理员都无法修改和留痕的要求。比如医疗的电子病历中,影像检查检验结果不可篡改行的,药品追溯过程中数据只可插入无法删除的特性需求;登录日志、修改日志…...

Python实现prophet 理论及参数优化

文章目录 Prophet理论及模型参数介绍Python代码完整实现prophet 添加外部数据进行模型优化 之前初步学习prophet的时候,写过一篇简单实现,后期随着对该模型的深入研究,本次记录涉及到prophet 的公式以及参数调优,从公式可以更直观…...

linux 错误码总结

1,错误码的概念与作用 在Linux系统中,错误码是系统调用或库函数在执行失败时返回的特定数值,用于指示具体的错误类型。这些错误码通过全局变量errno来存储和传递,errno由操作系统维护,保存最近一次发生的错误信息。值得注意的是,errno的值在每次系统调用或函数调用失败时…...

多模态大语言模型arxiv论文略读(108)

CROME: Cross-Modal Adapters for Efficient Multimodal LLM ➡️ 论文标题:CROME: Cross-Modal Adapters for Efficient Multimodal LLM ➡️ 论文作者:Sayna Ebrahimi, Sercan O. Arik, Tejas Nama, Tomas Pfister ➡️ 研究机构: Google Cloud AI Re…...

网络编程(UDP编程)

思维导图 UDP基础编程(单播) 1.流程图 服务器:短信的接收方 创建套接字 (socket)-----------------------------------------》有手机指定网络信息-----------------------------------------------》有号码绑定套接字 (bind)--------------…...

Swagger和OpenApi的前世今生

Swagger与OpenAPI的关系演进是API标准化进程中的重要篇章,二者共同塑造了现代RESTful API的开发范式。 本期就扒一扒其技术演进的关键节点与核心逻辑: 🔄 一、起源与初创期:Swagger的诞生(2010-2014) 核心…...

零基础在实践中学习网络安全-皮卡丘靶场(第九期-Unsafe Fileupload模块)(yakit方式)

本期内容并不是很难,相信大家会学的很愉快,当然对于有后端基础的朋友来说,本期内容更加容易了解,当然没有基础的也别担心,本期内容会详细解释有关内容 本期用到的软件:yakit(因为经过之前好多期…...