使用golang实现日志收集系统的logagent
整体架构
参考 七米老师的日志收集项目

主要用go实现logagent的部分,logagent的作用主要是实时监控日志追加的变化,并将变化发送到kafka中。
之前我们已经实现了 用go连接kafka并向其中发送数据,也实现了使用tail库监控日志追加操作。
我们把这两部分结合起来实现监控日志追加并发送到kafka。
使用github.com/go-ini/ini配置参数
// 读取配置参数cfg, err:=ini.Load("config/config.ini")if err!=nil {logrus.Error((" load config error"))return}
[kafka]
address = 127.0.0.1:9092
chan_size = 1000[collect]
logfile_path= D:/learn/go/log-collector-lmh/log_agent/config_version/log_file/xx.log
配置参数主要包括,kafka的启动端口,存储的数据大小限制,日志文件的路径。
初始化kafka
kafka.go
package kafkaimport ("github.com/Shopify/sarama""github.com/sirupsen/logrus"
)var (Client sarama.SyncProducerMsgChan chan *sarama.ProducerMessage //占用的字节数少,传递的指针
)func InitKafka(kafkaAddr string, chanSize int64) (err error){config:=sarama.NewConfig()// 生产者配置config.Producer.RequiredAcks=sarama.WaitForAllconfig.Producer.Partitioner=sarama.NewRandomPartitionerconfig.Producer.Return.Successes=true// 连接kafkaClient,err=sarama.NewSyncProducer([]string{kafkaAddr}, config)if err!=nil {logrus.Error("producer closed", err)return}// 从管道中读取日志并发送到kafkaMsgChan = make(chan *sarama.ProducerMessage, chanSize)go sendMsg()return
}func sendMsg(){for {select {case msg := <- MsgChan:pid, offset, err := Client.SendMessage(msg)if err != nil {logrus.Warning("send msg failed, err:", err)return}logrus.Infof("send msg to kafka success. pid:%v offset:%v", pid, offset)}}
}
这里实现了连接kafka,并使用协程不断地读取MsgChan,读取到数据后向kafka发送,这里MsgChan通道的数据由tail监控到的日志变化写入。
main.go中调用
// 初始化kafkakafkaAddr:=cfg.Section("kafka").Key("address").String()chanSize:=cfg.Section("kafka").Key("chan_size").MustInt64(0)err=kafka.InitKafka(kafkaAddr, chanSize)if err!=nil {logrus.Error("kafka init failed")}logrus.Info("Kafka init success")
初始化tailf,并将日志数据写入ChanMsg
tailF.go
package tailF
import ("github.com/hpcloud/tail""fmt"
)
var (TailObj *tail.Tail
)
func InitTail(filename string) (err error) {config := tail.Config{ReOpen: true,Follow: true,Location: &tail.SeekInfo{Offset: 0, Whence: 2},MustExist: false,Poll: true,}// 打开文件开始读取数据TailObj, err = tail.TailFile(filename, config)if err != nil {fmt.Printf("create tail %s failed, err:%v\n", filename, err)return}return
}
main.go中对应
// 初始化tailffileName:=cfg.Section("collect").Key("logfile_path").String()err=tailF.InitTail(fileName)if err!=nil {logrus.Error(" tailf init failed")}logrus.Info("Init tail success")// 把读取的日志发往kafkaerr=run()if err!=nil {logrus.Error(" run error%s", err)return}logrus.Info("run success")
main.go中实现的run函数,读取tailF的数据,并写入ChanMsg
func run () (err error){for {line,ok:=<-tailF.TailObj.Linesif !ok {logrus.Warn("tail file %s close reopen\n", tailF.TailObj.Filename)// 读取出错等一秒time.Sleep(time.Second)continue}// 使用通道将传输日志改为异步// 读取的日志封装为ProducerMessagemsg:=&sarama.ProducerMessage{}msg.Topic="web_log"msg.Value=sarama.StringEncoder(line.Text)// 放到channel中kafka.MsgChan<-msg}
}
完整main.go
package mainimport ("config_version/kafka""config_version/tailF""time""github.com/Shopify/sarama""github.com/go-ini/ini""github.com/sirupsen/logrus"
)func main() {// 读取配置参数cfg, err:=ini.Load("config/config.ini")if err!=nil {logrus.Error((" load config error"))return}// 初始化kafkakafkaAddr:=cfg.Section("kafka").Key("address").String()chanSize:=cfg.Section("kafka").Key("chan_size").MustInt64(0)err=kafka.InitKafka(kafkaAddr, chanSize)if err!=nil {logrus.Error("kafka init failed")}logrus.Info("Kafka init success")// 初始化tailffileName:=cfg.Section("collect").Key("logfile_path").String()err=tailF.InitTail(fileName)if err!=nil {logrus.Error(" tailf init failed")}logrus.Info("Init tail success")// 把读取的日志发往kafkaerr=run()if err!=nil {logrus.Error(" run error%s", err)return}logrus.Info("run success")}func run () (err error){for {line,ok:=<-tailF.TailObj.Linesif !ok {logrus.Warn("tail file %s close reopen\n", tailF.TailObj.Filename)// 读取出错等一秒time.Sleep(time.Second)continue}// 使用通道将传输日志改为异步// 读取的日志封装为ProducerMessagemsg:=&sarama.ProducerMessage{}msg.Topic="web_log"msg.Value=sarama.StringEncoder(line.Text)// 放到channel中kafka.MsgChan<-msg}
}
至此, 我们实现了简化版的日志收集系统的logagent功能,目前日志的路径还需要手动写入配置文件中,修改的话还需重启项目,之后可以使用ETCD实现日志路径的自动配置。
相关文章:
使用golang实现日志收集系统的logagent
整体架构 参考 七米老师的日志收集项目 主要用go实现logagent的部分,logagent的作用主要是实时监控日志追加的变化,并将变化发送到kafka中。 之前我们已经实现了 用go连接kafka并向其中发送数据,也实现了使用tail库监控日志追加操作。 我们…...
小红书点赞不显示怎么回事?小红书笔记评论被吞怎么办
小红书作为一个互联网产品,是一个软件。既然是软件就会有一定的程序漏洞,这是无法避免的。但是很多时候其实并不一定是漏洞的问题。今天就来和大家谈谈小红书点赞不显示怎么回事,小红书评论被吞又是怎么一回事,这些难道都是程序性…...
地址变换和缺页置换习题
1.设某进程页面的访问序列为4,3,2,1,4,3,5,4,3,2,1,5,当分配给该进程的内存页框数分别为3和4时,对于先进先出,最近最少使用,最佳页面置换算法,分别发生多少次缺页中断? 答: 分配的…...
PAT 乙级 1010 一元多项式求导(解题思路+AC代码)
题目: 设计函数求一元多项式的导数。(注:xn(n为整数)的一阶导数为nxn−1。) 输入格式: 以指数递降方式输入多项式非零项系数和指数(绝对值均为不超过 1000 的整数)。数字间以空格分…...
一维河流污染持续排放模拟(水污染扩散)
一、处理河道转换为geojson数据 以淮河为例处理示例数据: {"type": "FeatureCollection","features": [{"geometry": {"coordinates": [[[115.5803,34.4982],[115.5922,34.498],[115.6061,34.4994],[115.6203,…...
数据优化 | CnOpenDataA股上市公司招聘数据
就业是经济的“晴雨表”,更是社会的“稳定器”。稳定和扩大就业一直是国家宏观调控的重要目标,2021年中央经济工作会议八次提到“就业”这一关键词。在新冠肺炎疫情蔓延、世界经济下行及人口老龄化加快等多重因素的叠加之下,稳就业保民生成为…...
nacos和eureka的区别
nacos和eureka的区别 Eureka是什么 Eureka详解Nacos是什么 Nacos详解Nacos和Eureka的区别 CAP理论连接方式服务异常剔除操作实例方式自我保护机制 Eureka是什么 Eureka 是Spring Cloud 微服务框架默认的也是推荐的服务注册中心,由Netflix公司与2012将其开源出来,Eureka基于RE…...
canvas.toDataURL生成图片报错的解决方案
问题原因: toDataURL方法存在跨域限制,如果执行时dom内含有跨域的图片则浏览器执行时会报错。 这个根据不同的系统有不同的表现,例如:生成完毕但控制台有warning类型的警告,或者直接异常报error。 解决思路ÿ…...
电容笔和Apple pencil的区别是什么?好用电容笔推荐
Apple Pencil与目前市场上常见的电容笔最大的不同之处在于,普通电容笔并不具备苹果Pencil特有的重力压感,而仅仅是一种倾斜的压感。不过,其在其它方面的表现也很出色,与Apple Pencil相似,而且价格仅为200元。现在&…...
关于onnx 转ncnn 的问题
文章目录修改模型Detect层设计转换后处理优质文章由于有些操作是没法支持的 如5维的操作: Unsupported slice axes ! Unsupported slice axes ! Unsupported slice axes ! Unsupported slice axes ! Unsupported slice axes ! Unsupported slice axes !参考&#…...
设计模式之《责任链模式》
------《责任链模式》责任链模式的概念为什么用责任链模式工作中用在哪里设计思路代码实现总结责任链模式的概念 责任链模式是一种行为型设计模式,它允许你将请求沿着处理链传递,直到有一个处理者能够处理该请求为止。 在责任链模式中,每个…...
Android Studio实现多功能日记本
项目目录一、项目概述二、系统特点三、开发环境四、详细设计1、E-R图2、数据库3、系统设置五、运行演示一、项目概述 本次实现了功能实用且齐全的日记本,界面友好,使用便捷,采用MVC架构设计。使用SQLite数据库存储数据,数据表有主…...
只依赖Tensorrt和opencv的yolov5源代码
simple_yolo.hpp #ifndef SIMPLE_YOLO_HPP #define SIMPLE_YOLO_HPP/*简单的yolo接口,容易集成但是高性能 */#include <vector> #include <memory> #include <string> #include <future> #include <opencv2/opencv.hpp>namespace Si…...
多路I/O转接 poll(了解)
poll() 的机制与 select() 类似,与 select() 在本质上没有多大差别,管理多个描述符也是进行轮询,根据描述符的状态进行处理,但是 poll() 没有最大文件描述符数量的限制(但是数量过大后性能也是会下降)。 p…...
听说你也在为配置tomcat server而烦恼,看我这一篇,让你醍醐灌顶!
一.通过maven创建项目 二.下载tomcat服务器 我们一般在tomcat官网中进行tomcat的下载 Apache Tomcat - Welcome! 三.添加配置:我们点击下图中的文件配置 四.测试配置的tomcat 我们在文件的body中输入 测试内容: 在控制台中显式tomcat运行的信息&#…...
【从零开始学Skynet】工具篇(二):虚拟机文件的复制粘贴
大家在Linux系统下开发的时候肯定会遇到虚拟机与主机间无法复制粘贴的问题,现在我们就来解决这样的问题,方便我们的开发。 1、打开设置 我们可以系统界面的菜单栏点击“控制”,然后打开“设置”; 也可以在VirtualBox界面打开“设…...
全球自动驾驶竞争力最新排行榜,4家中国企业上榜
发展至今,自动驾驶技术不仅是汽车行业的一个主战场,更是全球科技领域中备受关注和充满竞争的一个重要领域。近年来,各大汽车制造商和科技公司都在投入大量财力物力人力进行自动驾驶技术的研发,并进一步争夺市场份额。 当然&#…...
APP启动流程分析
1、要分析的问题 1、与正常trace比对,确认过耗时在哪个步骤(am create/pause/stop/start/doframe)? 2、与正常trace比对,确认过耗时在哪个cpu state(Running/Runnable/Sleep/Uninterruptible Sleep)? 2、启动分析 …...
IIR数字滤波器简介与实现
一、简介: IIR是一种数字滤波器,其输出是输入信号和过去输出的某些加权和。IIR滤波器由反馈和前馈组成,可以用于滤除或增强信号的特定频率成分。 IIR滤波器的输出表示为: y[n] b0 * x[n] b1 * x[n-1] b2 * x[n-2] … - a1 * …...
3.5 函数的极值与最大值和最小值
学习目标: 我要学习函数的极值、最大值和最小值,我会采取以下几个步骤: 理解基本概念:首先,我会理解函数的极值、最大值和最小值的概念。例如,我会学习函数在特定区间内的最高点和最低点,并且理…...
【人工智能】神经网络的优化器optimizer(二):Adagrad自适应学习率优化器
一.自适应梯度算法Adagrad概述 Adagrad(Adaptive Gradient Algorithm)是一种自适应学习率的优化算法,由Duchi等人在2011年提出。其核心思想是针对不同参数自动调整学习率,适合处理稀疏数据和不同参数梯度差异较大的场景。Adagrad通…...
中南大学无人机智能体的全面评估!BEDI:用于评估无人机上具身智能体的综合性基准测试
作者:Mingning Guo, Mengwei Wu, Jiarun He, Shaoxian Li, Haifeng Li, Chao Tao单位:中南大学地球科学与信息物理学院论文标题:BEDI: A Comprehensive Benchmark for Evaluating Embodied Agents on UAVs论文链接:https://arxiv.…...
工程地质软件市场:发展现状、趋势与策略建议
一、引言 在工程建设领域,准确把握地质条件是确保项目顺利推进和安全运营的关键。工程地质软件作为处理、分析、模拟和展示工程地质数据的重要工具,正发挥着日益重要的作用。它凭借强大的数据处理能力、三维建模功能、空间分析工具和可视化展示手段&…...
Spring AI与Spring Modulith核心技术解析
Spring AI核心架构解析 Spring AI(https://spring.io/projects/spring-ai)作为Spring生态中的AI集成框架,其核心设计理念是通过模块化架构降低AI应用的开发复杂度。与Python生态中的LangChain/LlamaIndex等工具类似,但特别为多语…...
听写流程自动化实践,轻量级教育辅助
随着智能教育工具的发展,越来越多的传统学习方式正在被数字化、自动化所优化。听写作为语文、英语等学科中重要的基础训练形式,也迎来了更高效的解决方案。 这是一款轻量但功能强大的听写辅助工具。它是基于本地词库与可选在线语音引擎构建,…...
保姆级教程:在无网络无显卡的Windows电脑的vscode本地部署deepseek
文章目录 1 前言2 部署流程2.1 准备工作2.2 Ollama2.2.1 使用有网络的电脑下载Ollama2.2.2 安装Ollama(有网络的电脑)2.2.3 安装Ollama(无网络的电脑)2.2.4 安装验证2.2.5 修改大模型安装位置2.2.6 下载Deepseek模型 2.3 将deepse…...
零知开源——STM32F103RBT6驱动 ICM20948 九轴传感器及 vofa + 上位机可视化教程
STM32F1 本教程使用零知标准板(STM32F103RBT6)通过I2C驱动ICM20948九轴传感器,实现姿态解算,并通过串口将数据实时发送至VOFA上位机进行3D可视化。代码基于开源库修改优化,适合嵌入式及物联网开发者。在基础驱动上新增…...
鸿蒙(HarmonyOS5)实现跳一跳小游戏
下面我将介绍如何使用鸿蒙的ArkUI框架,实现一个简单的跳一跳小游戏。 1. 项目结构 src/main/ets/ ├── MainAbility │ ├── pages │ │ ├── Index.ets // 主页面 │ │ └── GamePage.ets // 游戏页面 │ └── model │ …...
实战设计模式之模板方法模式
概述 模板方法模式定义了一个操作中的算法骨架,并将某些步骤延迟到子类中实现。模板方法使得子类可以在不改变算法结构的前提下,重新定义算法中的某些步骤。简单来说,就是在一个方法中定义了要执行的步骤顺序或算法框架,但允许子类…...
Java并发编程实战 Day 11:并发设计模式
【Java并发编程实战 Day 11】并发设计模式 开篇 这是"Java并发编程实战"系列的第11天,今天我们聚焦于并发设计模式。并发设计模式是解决多线程环境下常见问题的经典解决方案,它们不仅提供了优雅的设计思路,还能显著提升系统的性能…...
