腾讯mini项目-【指标监控服务重构】2023-08-16
今日已办
v1
验证 StageHandler 在处理消息时是否为单例,【错误尝试】
type StageHandler struct {
}func (s StageHandler) Middleware1(h message.HandlerFunc) message.HandlerFunc {return func(msg *message.Message) ([]*message.Message, error) {log.Logger.Info("StageHandler Middleware 1")fmt.Printf("%p\n", &s)return h(msg)}
}func (s StageHandler) Middleware2(h message.HandlerFunc) message.HandlerFunc {return func(msg *message.Message) ([]*message.Message, error) {log.Logger.Info("StageHandler Middleware 2")fmt.Printf("%p\n", &s)return h(msg)}
}func (s StageHandler) Middleware3(h message.HandlerFunc) message.HandlerFunc {return func(msg *message.Message) ([]*message.Message, error) {log.Logger.Info("StageHandler Middleware 3")fmt.Printf("%p\n", &s)return h(msg)}
}func (s StageHandler) Handler1(msg *message.Message) error {log.Logger.Info("StageHandler Handler 1")fmt.Printf("%p\n", &s)return nil
}
v2
- 定义不同 Handler
type CrashHandler struct {Topic string
}func (s CrashHandler) Handler1(msg *message.Message) error {log.Logger.Info(s.Topic + ": CrashHandler Handler 1 start")fmt.Printf("%p\n", &s)time.Sleep(1 * time.Second)log.Logger.Info(s.Topic + ": CrashHandler Handler 1 end")return nil
}type LagHandler struct {Topic string
}func (s LagHandler) Handler1(msg *message.Message) error {log.Logger.Info(s.Topic + ": LagHandler Handler 1 start")fmt.Printf("%p\n", &s)time.Sleep(1 * time.Second)log.Logger.Info(s.Topic + ": LagHandler Handler 1 end")return nil
}
- 添加到router中
for _, topic := range topics {var category stringvar handlerFunc message.NoPublishHandlerFuncif strings.Contains(topic, performance.CategoryCrash) {category = performance.CategoryCrashhandlerFunc = CrashHandler{Topic: category}.Handler1} else if strings.Contains(topic, performance.CategoryLag) {category = performance.CategoryLaghandlerFunc = LagHandler{Topic: category}.Handler1} else {continue}handler := router.AddNoPublisherHandler(topic+"test-handler", topic, subscriber, handlerFunc)}
- 结论
- handler 实例会不断创建
- 不同的 handler 可以并行处理不同主题的消息
- 相同的 handler 在处理该主题的消息时是顺序的
官方文档: Message Router (watermill.io)
订阅者可以一次消费一条消息,也可以并行消费多条消息
- Single stream of messages 是最简单的方法,这意味着在调用
msg.Ack()
之前,订阅者将不会收到任何新消息 - Multiple message streams 仅部分订阅者支持。通过一次订阅多个主题分区,可以并行消费多条消息,甚至是之前未确认的消息(例如,Kafka 订阅者就是这样工作的) Router 通过运行并发 HandlerFuncs(每个分区一个)来处理此模型
v3
存在并发安全问题
- 公用一个上下文
- 频繁的修改上下文中的字段值
- 不同Handler和MiddleWare存在并发
解决思路
- 将一次消息处理会使用到的数据集合定义为一个结构体
type ContextData struct {Status intEvent schema.EventAppID string // API 上报FetchScenario string // API 上报
}
- 使用message的Context来传递这个数据
- 移除掉 ProfileCtx 的相关设计
- 使用
watermillzap.Logger
来替换本身的LoggerAdapter
,更加直观且与原项目适配
完整代码
profile/internal/watermill/consumer/consumer_context.go
// Package consumer
// @Author xzx 2023/8/11 18:53:00
package consumerimport ("context"kc "github.com/Kevinello/kafka-client""github.com/ThreeDotsLabs/watermill""github.com/ThreeDotsLabs/watermill/message""github.com/ThreeDotsLabs/watermill/message/router/middleware""github.com/ThreeDotsLabs/watermill/message/router/plugin""github.com/garsue/watermillzap""github.com/qiulin/watermill-kafkago/pkg/kafkago""go.uber.org/zap""profile/internal/config""profile/internal/connector""profile/internal/log""profile/internal/schema/performance""strings""time"
)// Consume
// @Description
// @Author xzx 2023-08-16 22:52:52
func Consume() {logger := watermillzap.NewLogger(log.Logger)publisher, subscriber := newPubSub(logger)router, err := message.NewRouter(message.RouterConfig{}, logger)if err != nil {log.Logger.Fatal("creates a new Router with given configuration error", zap.Error(err))}router.AddPlugin(plugin.SignalsHandler)router.AddMiddleware(middleware.Retry{MaxRetries: 3,InitialInterval: time.Millisecond * 100,Logger: logger,}.Middleware,middleware.Recoverer,)getTopics := kc.GetTopicReMatch(strings.Split(config.Profile.GetString("kafka.topicRE"), ","))topics, err := getTopics(config.Profile.GetString("kafka.bootstrap"))if err != nil {log.Logger.Fatal("get topics failed", zap.Error(err))return}for _, topic := range topics {var category stringvar handlerFunc message.HandlerFuncif strings.Contains(topic, performance.CategoryCrash) {category = performance.CategoryCrashhandlerFunc = CrashWriteKafka} else if strings.Contains(topic, performance.CategoryLag) {category = performance.CategoryLaghandlerFunc = LagWriteKafka} else {continue}router.AddHandler(category, topic, subscriber, connector.GetTopic(category), publisher, handlerFunc).AddMiddleware(UnpackKafkaMessage,InitPerformanceEvent,AnalyzeEvent)}if err = router.Run(context.Background()); err != nil {log.Logger.Error("runs all plugins and handlers and starts subscribing to provided topics error", zap.Error(err))}
}// newPubSub
// @Description
// @Author xzx 2023-08-16 22:52:45
// @Param logger
// @Return message.Publisher
// @Return message.Subscriber
func newPubSub(logger watermill.LoggerAdapter) (message.Publisher, message.Subscriber) {marshaler := kafkago.DefaultMarshaler{}publisher := kafkago.NewPublisher(kafkago.PublisherConfig{Brokers: []string{config.Profile.GetString("kafka.bootstrap")},Async: false,Marshaler: marshaler,OTELEnabled: false,Ipv4Only: true,Timeout: 100 * time.Second,}, logger)subscriber, err := kafkago.NewSubscriber(kafkago.SubscriberConfig{Brokers: []string{config.Profile.GetString("kafka.bootstrap")},Unmarshaler: marshaler,ConsumerGroup: config.Profile.GetString("kafka.group"),OTELEnabled: false,}, logger)if err != nil {log.Logger.Fatal("Unable to create subscriber", zap.Error(err))}return publisher, subscriber
}
profile/internal/watermill/consumer/consumer_stage.go
// Package consumer
// @Author xzx 2023/8/12 10:01:00
package consumerimport ("context""encoding/json""github.com/ThreeDotsLabs/watermill/message""go.uber.org/zap""profile/internal/connector""profile/internal/log""profile/internal/schema""profile/internal/schema/performance""profile/internal/state"
)type ContextData struct {Status intEvent schema.EventAppID string // API 上报FetchScenario string // API 上报
}// UnpackKafkaMessage
// @Description
// @Author xzx 2023-08-12 12:27:30
// @Param h
// @Return message.HandlerFunc
func UnpackKafkaMessage(h message.HandlerFunc) message.HandlerFunc {return func(msg *message.Message) ([]*message.Message, error) {var data ContextData// 反序列化,存入通用结构体if contextErr := json.Unmarshal(msg.Payload, &data.Event); contextErr != nil {data.Status = state.StatusUnmarshalErrorreturn nil, contextErr}log.Logger.Info("[1-UnpackKafkaItem] unpack kafka item success", zap.Any("event", data.Event))msg.SetContext(context.WithValue(msg.Context(), "data", data))return h(msg)}
}// InitPerformanceEvent
// @Description
// @Author xzx 2023-08-12 12:27:35
// @Param h
// @Return message.HandlerFunc
func InitPerformanceEvent(h message.HandlerFunc) message.HandlerFunc {return func(msg *message.Message) ([]*message.Message, error) {data := msg.Context().Value("data").(ContextData)event, contextErr := performance.EventFactory(data.Event.Category, data.Event.Dimensions, data.Event.Values)if contextErr != nil {data.Status = state.StatusEventFactoryErrorreturn nil, contextErr}log.Logger.Info("[2-InitPerformanceEvent] Consume performance event success", zap.Any("event", data.Event))data.Event.ProfileData = eventmsg.SetContext(context.WithValue(msg.Context(), "data", data))return h(msg)}
}// AnalyzeEvent
// @Description
// @Author xzx 2023-08-12 12:27:38
// @Param h
// @Return message.HandlerFunc
func AnalyzeEvent(h message.HandlerFunc) message.HandlerFunc {return func(msg *message.Message) ([]*message.Message, error) {data := msg.Context().Value("data").(ContextData)contextErr := data.Event.ProfileData.Analyze()if contextErr != nil {data.Status = state.StatusAnalyzeErrorreturn nil, contextErr}log.Logger.Info("[3-AnalyzeEvent] analyze event success", zap.Any("event", data.Event))// clear dimensions and valuesdata.Event.Dimensions = nildata.Event.Values = nilmsg.SetContext(context.WithValue(msg.Context(), "data", data))return h(msg)}
}// CrashWriteKafka
// @Description
// @Author xzx 2023-08-12 15:09:15
// @Param msg
// @Return []*message.Message
// @Return error
func CrashWriteKafka(msg *message.Message) ([]*message.Message, error) {data := msg.Context().Value("data").(ContextData)toWriteBytes, contextErr := json.Marshal(data.Event)if contextErr != nil {data.Status = state.StatusUnmarshalErrorreturn nil, contextErr}msg = message.NewMessage(data.Event.ID, toWriteBytes)log.Logger.Info("[4-CrashWriteKafka] write kafka success", zap.String("topic", connector.GetTopic(data.Event.Category)), zap.String("id", data.Event.ID), zap.String("msg", string(toWriteBytes)))return message.Messages{msg}, nil
}func LagWriteKafka(msg *message.Message) ([]*message.Message, error) {data := msg.Context().Value("data").(ContextData)toWriteBytes, contextErr := json.Marshal(data.Event)if contextErr != nil {data.Status = state.StatusUnmarshalErrorreturn nil, contextErr}msg = message.NewMessage(data.Event.ID, toWriteBytes)log.Logger.Info("[4-LagWriteKafka] write kafka success", zap.String("topic", connector.GetTopic(data.Event.Category)), zap.String("id", data.Event.ID), zap.String("msg", string(toWriteBytes)))return message.Messages{msg}, nil
}
测试
上报PERF_LAG Event
可以并发处理 2 条消息,不必等待上一条消息处理完
多次测试发现是由于两条消息走了不同的 Handler
暂未修复,明明是同一主题的两条消息却都走了两条不同的链路,而且 publisher 最后写回的主题也是写到了不同的主题上,并且上报另一个类型的事件,即另一个主题的消息却无法触发消费者消费!
暂定先写死两个主题名称测试是否正常
明日待办
- 开会讨论项目规划和任务分工
- 继续完成需求
相关文章:
腾讯mini项目-【指标监控服务重构】2023-08-16
今日已办 v1 验证 StageHandler 在处理消息时是否为单例,【错误尝试】 type StageHandler struct { }func (s StageHandler) Middleware1(h message.HandlerFunc) message.HandlerFunc {return func(msg *message.Message) ([]*message.Message, error) {log.Log…...
PTA:7-3 两个递增链表的差集
^两个递增链表的差集 题目输入样例输出样例 代码 题目 输入样例 5 1 3 5 7 9 3 2 3 5输出样例 3 1 7 9代码 #include <iostream> #include <list> #include <unordered_set> using namespace std; int main() {int n1, n2;cin >> n1;list<int&g…...
智能合约漏洞案例,DEI 漏洞复现
智能合约漏洞案例,DEI 漏洞复现 1. 漏洞简介 https://twitter.com/eugenioclrc/status/1654576296507088906 2. 相关地址或交易 https://explorer.phalcon.xyz/tx/arbitrum/0xb1141785b7b94eb37c39c37f0272744c6e79ca1517529fec3f4af59d4c3c37ef 攻击交易 3. …...
Attention is all you need 论文笔记
该论文引入Transformer,主要核心是自注意力机制,自注意力(Self-Attention)机制是一种可以考虑输入序列中所有位置信息的机制。 RNN介绍 引入RNN为了更好的处理序列信息,比如我 吃 苹果,前后的输入之间是有…...
Hdoop伪分布式集群搭建
文章目录 Hadoop安装部署前言1.环境2.步骤3.效果图 具体步骤(一)前期准备(1)ping外网(2)配置主机名(3)配置时钟同步(4)关闭防火墙 (二)…...
java临时文件
临时文件 有时候,我们程序运行时需要产生中间文件,但是这些文件只是临时用途,并不做长久保存。 我们可以使用临时文件,不需要长久保存。 public static File createTempFile(String prefix, String suffix)prefix 前缀 suffix …...
C++中的<string>头文件 和 <cstring>头文件简介
C中的<string>头文件 和 <cstring>头文件简介 在C中<string> 和 <cstring> 是两个不同的头文件。 <string> 是C标准库中的头文件,定义了一个名为std::string的类,提供了对字符串的操作如size()、length()、empty() 及字…...
安装MySQL
Centos7下安装MySQL详细步骤_centos7安装mysql教程_欢欢李的博客-CSDN博客...
输入学生成绩,函数返回最大元素的数组下标,求最高分学生成绩(输入负数表示输入结束)
scanfscore()函数用于输入学生的成绩 int scanfscore(int score[N])//输入学生的成绩 {int i -1;do {i;printf("输入学生成绩:");scanf("%d", &score[i]);} while (score[i] > 0);return i; } findmax()用于寻找最大值 int findmax(int score[N…...
常用音频接口:TDM,PDM,I2S,PCM
常用音频接口:TDM,PDM,I2S,PCM_tdm音频_沙漠的甲壳虫的博客-CSDN博客 I2S/PCM接口及音频codec_音频pcm接口模块设计-CSDN博客 2个TDM8功放调试ing_周龙(AI湖湘学派)的博客-CSDN博客 数字音频接口时序----IIS、TDM、PCM、PDM_td…...
git clone报错Failed to connect to github.com port 443 after 21055 ms:
git 设置代理端口号 git config --global http.proxy http://127.0.0.1:10085 和 git config --global https.proxy http://127.0.0.1:10085 然后就可以成功git clone hugging face的数据集了 如果是https://huggingface.co/datasets/shibing624/medical/tree/main 那么…...
【操作系统】深入浅出死锁问题
死锁的概念 在多线程编程中,我们为了防止多线程竞争共享资源而导致数据错乱,都会在操作共享资源而导致数据错乱,都会在操作共享资源之前加上互斥锁,只有成功获得到锁的线程,才能操作共享资源,获取不到锁的…...
springboot实现webSocket服务端和客户端demo
1:pom导入依赖 <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-websocket</artifactId><version>2.2.7.RELEASE</version></dependency>2:myWebSocketClien…...
代码走读: FFMPEG-ffplayer02
AVFrame int attribute_align_arg avcodec_receive_frame(AVCodecContext *avctx, AVFrame *frame) 选取一个音频解码器 和 一个视频解码器分别介绍该解码器功能 音频G722 g722dec.c -> g722_decode_frame 通过 ff_get_buffer 给 传入的 frame 指针分配内存 g722_decode_…...
【数据结构】——排序算法的相关习题
目录 一、选择题题型一 (插入排序)1、直接插入排序2、折半插入排序3、希尔排序 题型二(交换排序)1、冒泡排序2、快速排序 题型三(选择排序)1、简单选择排序~2、堆排序 ~题型四(归并排序…...
C高级day5(Makefile)
一、Xmind整理: 二、上课笔记整理: 1.#----->把带参宏的参数替换成字符串 #include <stdio.h> #include <stdlib.h> #include <string.h> #define MAX(a,b) a>b?a:b #define STR(n) #n int main(int argc, const char *argv…...
Android 系统中适配OAID获取
一、OAID概念 OAID(Open Anonymous Identification)是一种匿名身份识别标识符, 用于在移动设备上进行广告追踪和个性化广告投放。它是由中国移动通信集 团、中国电信集团和中国联通集团共同推出的一项行业标准 OAID值为一个64位的数字 二、…...
差分数组leetcode 2770 数组的最大美丽值
什么是差分数组 差分数组是一种数据结构,它存储的是一个数组每个相邻元素的差值。换句话说,给定一个数组arr[],其对应的差分数组diff[]将满足: diff[i] arr[i1] - arr[i] 对于所有 0 < i < n-1 差分数组的作用 用于高效…...
请求响应状态码
请求与响应&状态码 Requests部分 请求行、消息报头、请求正文。 Header解释示例Accept指定客户端能够接收的内容类型Accept: text/plain, text/htmlAccept-Chars et浏览器可以接受的字符编码集。Accept-Charset: iso-8859-5Accept-Encodi ng指定浏览器可以支持的web服务…...
安卓机型系统美化 Color.xml文件必备常识 自定义颜色资源
color.xml文件是Android工程中用来进行颜色资源管理的文件.可以在color.xml文件中通过<color>标签来定义颜色资源.我们在布局文件中、代码中、style定义中或者其他资源文件中,都可以引用之前在color.xml文件中定义的颜色资源。 将color.xml文件拷到res/value…...
YOLO物体检测-系列教程1:YOLOV1整体解读(预选框/置信度/分类任/回归任务/损失函数/公式解析/置信度/非极大值抑制)
🎈🎈🎈YOLO 系列教程 总目录 YOLOV1整体解读 YOLOV2整体解读 YOLOV1提出论文:You Only Look Once: Unified, Real-Time Object Detection 1、物体检测经典方法 two-stage(两阶段):Faster-rc…...
2023/9/12 -- C++/QT
作业 实现一个图形类(Shape),包含受保护成员属性:周长、面积, 公共成员函数:特殊成员函数书写 定义一个圆形类(Circle),继承自图形类,包含私有属性…...
【Purple Pi OH RK3566鸿蒙开发板】OpenHarmony音频播放应用,真实体验感爆棚!
本文转载于Purple Pi OH开发爱好者,作者ITMING 。 原文链接:https://bbs.elecfans.com/jishu_2376383_1_1.html 01注意事项 DevEco Studio 4.0 Beta2(Build Version: 4.0.0.400) OpenHarmony SDK API 9 创建工程类型选择Appli…...
Android rom开发:9.0系统上实现4G wifi 以太网共存
framework层修改网络优先级,4G > wifi > eth 修改patch如下: diff --git a/frameworks/base/services/core/java/com/android/server/connectivity/NetworkAgentInfo.java b/frameworks/base/services/core/java/com/android/server/connectivit…...
高速自动驾驶HMI人机交互
概述 目的 本文档的目的是描述高速自动驾驶功能涉及的HMI显示需求技术规范和设计说明。 范围 术语及缩写 设计与实验标准 设计标准 设计标准-非法规类设计标准-法规类 HMI交互需求 CL4功能界面 HMI显示器[伊1] 中应包含CL4功能设置界面,提供给用户进行设置操作或显…...
【自然语言处理】关系抽取 —— SOLS 讲解
SOLS 论文信息 标题:Speaker-Oriented Latent Structures for Dialogue-Based Relation Extraction 作者:Guoshun Nan, Guoqing Luo, Sicong Leng, Yao Xiao, Wei Lu 发布时间与更新时间:2021.09.11 主题:自然语言处理、关系抽取、对话场景、跨语句、DialogRE、GCN arXiv:…...
周易算卦流程c++实现
代码 #include<iostream> using namespace std; #include<vector> #include<cstdlib> #include<ctime> #include<Windows.h>int huaYiXiangLiang(int all, int& left) {Sleep(3000);srand(time(0));left rand() % all 1;while (true) {if…...
软件架构设计(十三) 构件与中间件技术
中间件的定义 其实中间件是属于构件的一种。是一种独立的系统软件或服务程序,可以帮助分布式应用软件在不同技术之间共享资源。 我们把它定性为一类系统软件,比如我们常说的消息中间件,数据库中间件等等都是中间件的一种体现。一般情况都是给应用系统提供服务,而不是直接…...
PyTorch深度学习实战——基于ResNet模型实现猫狗分类
PyTorch深度学习实战——基于ResNet模型实现猫狗分类 0. 前言1. ResNet 架构2. 基于预训练 ResNet 模型实现猫狗分类相关链接 0. 前言 从 VGG11 到 VGG19,不同之处仅在于网络层数,一般来说,神经网络越深,它的准确率就越高。但并非…...
机器学习第六课--朴素贝叶斯
朴素贝叶斯广泛地应用在文本分类任务中,其中最为经典的场景为垃圾文本分类(如垃圾邮件分类:给定一个邮件,把它自动分类为垃圾或者正常邮件)。这个任务本身是属于文本分析任务,因为对应的数据均为文本类型,所以对于此类任务我们首先…...
简约网站模板/app怎么开发出来的
“便笺”是Win10里内置的一项小功能,不过很多人对它并不了解。其实Win10的“便笺”(Sticky Notes),无论在颜值还是功能性方面,都可以堪称同类软件中的佼佼者。尤其是1809版之后的便笺3.0,更是融入了很多新功能,妥妥的记…...
做外国订单有什么网站/百度识图网页版在线
集线器-------集线器也叫Hub,工作在物理层(最底层),没有相匹配的软件系统,是纯硬件设备。集线器主要用来连接计算机等网络终端。集线器为共享式带宽,连接在集线器上的任何一个设备发送数据时,其…...
drupal 网站实例/上海百度竞价托管
卡布列克数(Kaprekar number)是具有以下性质的数:对于某个正整数X {\displaystyle X}在n进位下存在正整数 A, B 及 m,且0 < B < b n {\displaystyle 0X 2 A n m B {\displaystyle X^{2}An^{m}B}X A B {\displaystyle XAB}简单的说,…...
做网站的主要作用/网站建设的基本流程
皮尔逊Pearson 相关系数:使用前提:大小一致、连续、服从正态分布的数据集;斯皮尔曼spearman等级相关系数:皮尔逊Pearson 相关系数使用前提任何一个条件不满足时可以考虑使用该系数;肯德尔等级kendallta相关系数&#x…...
400元做网站送网推/怎么做推广赚钱
http://httpsegmenter.googlecode.com/svn/...
wordpress添加用户/宣传推广
题意大概是这样,给你一个字符串,你可以进行的操作是这样的, 每次拿走这个串的第一个字母,或者最后一个字母,然后放到 一个新串的末尾(当然啦,新串一开始是为空的),当把旧…...