当前位置: 首页 > news >正文

go的singleFlight学习

Package singleflight provides a duplicate function call suppression mechanism
“golang.org/x/sync/singleflight”

原来底层是 waitGroup,我还以为等待的协程主动让出 cpu 了,没想到 waitGroup.Wait() 阻塞了
doCall 不但返回值是 func 的 val 和 error,而且 doCall 内部也给 chan 写入了一遍
这样外部的同步 Do 和异步 DoChan 都能复用了

当 Do->doCall 执行 fn 发生 panic 时:
对首发请求,直接在 defer 中把 fn 中捕获的 panic 进行回放
对非首发请求,c.wg.Wait() 结束之后,对 c.err 进行断言,判断是否是一个 panic 错误,如是则回放
这样就保证了首发请求和非首发请求都发生了 panic

一个协程对waitGroup进行Add(1)操作后,多个协程都可以监听它的读

package singleflightimport ("bytes""errors""fmt""runtime""runtime/debug""sync"
)// errGoexit indicates the runtime.Goexit was called in
// the user given function.
// 用户给定的函数中,调用了 runtime.Goexit
var errGoexit = errors.New("runtime.Goexit was called")// A panicError is an arbitrary value recovered from a panic
// with the stack trace during the execution of given function.
// 执行给定函数期间,panicError 是一个从 panic 中收到的任意值
// 带有栈追踪
type panicError struct {// value 中存储 errorvalue interface{}stack []byte
}// Error implements error interface.
func (p *panicError) Error() string {return fmt.Sprintf("%v\n\n%s", p.value, p.stack)
}func (p *panicError) Unwrap() error {err, ok := p.value.(error)if !ok {return nil}return err
}func newPanicError(v interface{}) error {stack := debug.Stack()// The first line of the stack trace is of the form "goroutine N [status]:"// but by the time the panic reaches Do the goroutine may no longer exist// and its status will have changed. Trim out the misleading line.// 去掉误导性的信息// 栈帧第一行,是"goroutine N [status]:"的信息if line := bytes.IndexByte(stack[:], '\n'); line >= 0 {stack = stack[line+1:]}return &panicError{value: v, stack: stack}
}// call is an in-flight or completed singleflight.Do call
type call struct {wg sync.WaitGroup// These fields are written once before the WaitGroup is done// and are only read after the WaitGroup is done.// WaitGroup Done 之前,这两个字段只会被写入一次// WaitGroup Done 之后,才能读取val interface{}err error// These fields are read and written with the singleflight// mutex held before the WaitGroup is done, and are read but// not written after the WaitGroup is done.// WaitGroup Done 之前,singleflight mutex 持有它的期间,这些字段被读取和写入// WaitGroup Done 之后,仅用于读取,不再被写入dups  intchans []chan<- Result
}// Group represents a class of work and forms a namespace in
// which units of work can be executed with duplicate suppression.
// Group 代表一个 work 类,形成一个 namespace
// 在该命名空间中(in which),可以通过重复抑制(duplicate suppression)来执行工作单元
type Group struct {mu sync.Mutex       // protects mm  map[string]*call // lazily initialized
}// Result holds the results of Do, so they can be passed
// on a channel.
type Result struct {Val    interface{}Err    errorShared bool
}// Do executes and returns the results of the given function, making
// sure that only one execution is in-flight for a given key at a
// time. If a duplicate comes in, the duplicate caller waits for the
// original to complete and receives the same results.
// The return value shared indicates whether v was given to multiple callers.
// 
// duplicate caller 会等待在 singleFlight 上,等待最开始的任务执行结束
// 返回的值 shared ,指示是否要将结果共享给其他 caller
func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) {g.mu.Lock()if g.m == nil {g.m = make(map[string]*call)}if c, ok := g.m[key]; ok {c.dups++g.mu.Unlock()// 等待 waitGroup Donec.wg.Wait()// 此时一定是 waitGroup Done 了// 发生了 panic ,不能吞掉panic错误// 发生了 error if e, ok := c.err.(*panicError); ok {panic(e)} else if c.err == errGoexit {// 优雅地退出 goroutine,防止对上游协程产生干扰runtime.Goexit()}// 返回最终结果return c.val, c.err, true}// 第一次进来的时候,执行这里c := new(call)// waitGroup 计数从 0 -> 1c.wg.Add(1)g.m[key] = cg.mu.Unlock()g.doCall(c, key, fn)return c.val, c.err, c.dups > 0
}// doCall handles the single call for a key.
func (g *Group) doCall(c *call, key string, fn func() (interface{}, error)) {normalReturn := falserecovered := false// use double-defer to distinguish panic from runtime.Goexit,// more details see https://golang.org/cl/134395// double-defer 以区分panic 和runtime.Goexitdefer func() {// the given function invoked runtime.Goexit// 把当前的堆栈给记录下来// normalReturn=true,正常结束// normalReturn=false && recovered == true,panic,需要外部还原panic的堆栈// normalReturn=false && recovered == false,go协程主动退出,需要制造一个errif !normalReturn && !recovered {c.err = errGoexit}g.mu.Lock()defer g.mu.Unlock()c.wg.Done()if g.m[key] == c {delete(g.m, key)}if e, ok := c.err.(*panicError); ok {// In order to prevent the waiting channels from being blocked forever,// needs to ensure that this panic cannot be recovered.if len(c.chans) > 0 {// goroutine的崩溃不会影响主goroutine或其他goroutine。go panic(e)// 能让panic爆出来select {} // Keep this goroutine around so that it will appear in the crash dump.} else {panic(e)}} else if c.err == errGoexit {// Already in the process of goexit, no need to call again} else {// Normal returnfor _, ch := range c.chans {ch <- Result{c.val, c.err, c.dups > 0}}}}()func() {defer func() {if !normalReturn {// Ideally, we would wait to take a stack trace until we've determined// whether this is a panic or a runtime.Goexit.// 理想情况下,会等到确定是否是 panic/runtime.Goexit 后才进行堆栈跟踪//// Unfortunately, the only way we can distinguish the two is to see// whether the recover stopped the goroutine from terminating, and by// the time we know that, the part of the stack trace relevant to the// panic has been discarded.// 不幸的是,我们区分两者的唯一方法是查看 recover 是否阻止了 goroutine 终止// 而当我们知道这一点时,与 panic 相关的堆栈跟踪部分已被丢弃。// 把 recover 拦住之后,返回一个 error ,然后在外部再进行放大,杀人于无形,让外部不知道singleFlightif r := recover(); r != nil {c.err = newPanicError(r)}}}()c.val, c.err = fn()normalReturn = true}()if !normalReturn {recovered = true}
}// Forget tells the singleflight to forget about a key.  Future calls
// to Do for this key will call the function rather than waiting for
// an earlier call to complete.
// 如果不想等之前的 singleflight 返回,则在 map[string]*call 中删除之前的 key 
func (g *Group) Forget(key string) {g.mu.Lock()delete(g.m, key)g.mu.Unlock()
}

时序分析
首发请求,先在Do中制造call,然后 c.wg.Add(1),然后将其放到map中

c.wg.Add(1)
g.m[key] = c

首发结束时,在doCall的defer中,先 c.wg.Done(),然后将任务从map中移除:delete(g.m, key)

c.wg.Done()
if g.m[key] == c {delete(g.m, key)
}

首发请求和首发结束都在锁的操作下执行。
所以抢到锁的时候,要么是首发请求执行请求的开始,要么是首发请求执行请求的结束

附录

一石激起千层浪

sync.WaitGroup 反向运用

func TestDemo333(t *testing.T) {var wg sync.WaitGroupwg.Add(1)for i := 1; i <= 3; i++ {go func(taskID int) {fmt.Println(i, "before wait")wg.Wait()fmt.Println(i, "wait finish")}(i)}time.Sleep(4 * time.Second)fmt.Println("main before send done")wg.Done() // 在协程结束时,调用Done方法fmt.Println("main after send done")select {}
}

debug.Stack()

属于 runtime/debug 包
用于获取当前程序的堆栈跟踪(stack trace),通常用于调试和错误处理

当调用 stack := debug.Stack() 时,实际上是在获取当前程序的堆栈信息,并将其存储在一个字符串类型的变量 stack 中。
这个字符串包含了程序在调用 debug.Stack() 时的调用栈信息,包括函数名、文件名和行号等。

package mainimport ("fmt""runtime/debug"
)func main() {stack := debug.Stack()fmt.Println(string(stack))
}func functionA() {functionB()
}func functionB() {stack := debug.Stack()fmt.Println(string(stack))
}

示例中定义两个函数 functionA 和 functionB。
在 functionB 中,调用了 debug.Stack() 并打印了堆栈信息。
当运行这个程序时,你会看到类似以下的输出:

goroutine 1 [running]:
main.functionB()/path/to/your/project/main.go:14 +0x8e
main.functionA()/path/to/your/project/main.go:7 +0x56
main.main()/path/to/your/project/main.go:22 +0x4a
runtime.main()/usr/local/go/src/runtime/proc.go:225 +0x235
runtime.goexit()/usr/local/go/src/runtime/asm_amd64.s:1571 +0x1

这个输出显示了程序在调用 debug.Stack() 时的堆栈跟踪信息。
这对于调试程序和查找错误非常有用。

runtime.Goexit()

属于 runtime 包,用于退出当前的 goroutine。
当调用 runtime.Goexit() 时,当前正在执行的 goroutine 会立即终止,但不会对其他 goroutine 产生影响。

runtime.Goexit() 的使用场景通常包括:
(1) 优雅地退出 goroutine:
在某些情况下,可能需要在满足特定条件时退出 goroutine,而不是等待它自然完成。
使用 runtime.Goexit() 可以实现这一点。
(2) 避免 panic 引起的异常退出:
如果 goroutine 中发生了 panic,它会向上传播并可能影响到其他 goroutine。
在这种情况下,使用 runtime.Goexit() 可以优雅地退出当前 goroutine
避免 panic 对其他 goroutine 的影响
(3) 控制 goroutine 的生命周期:
在某些复杂的并发场景中,可能需要手动控制 goroutine 的生命周期。
通过在适当的时候调用 runtime.Goexit(),可以实现这一点。

package mainimport ("fmt""runtime""time"
)var someCondition = truefunc main() {go func() {for {fmt.Println("Running...")time.Sleep(1 * time.Second)if someCondition {fmt.Println("Exiting...")runtime.Goexit()}}}()time.Sleep(5 * time.Second)fmt.Println("Main function finished.")
}

在这个示例中,启动了一个 goroutine,它会每隔一秒钟打印 “Running…”。
当 someCondition 为 true 时,goroutine 会打印 “Exiting…” 并调用 runtime.Goexit() 退出。
主函数在等待 5 秒钟后结束。

过度使用 runtime.Goexit() 可能会导致代码难以理解和维护。
在大多数情况下,使用 channel 和其他同步机制来控制 goroutine 的生命周期是更好的选择。

DoChan的学习

// DoChan is like Do but returns a channel that will receive the
// results when they are ready.
//
// The returned channel will not be closed.
func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result {ch := make(chan Result, 1)g.mu.Lock()if g.m == nil {g.m = make(map[string]*call)}if c, ok := g.m[key]; ok {c.dups++c.chans = append(c.chans, ch)g.mu.Unlock()return ch}c := &call{chans: []chan<- Result{ch}}c.wg.Add(1)g.m[key] = cg.mu.Unlock()go g.doCall(c, key, fn)return ch
}

相关文章:

go的singleFlight学习

Package singleflight provides a duplicate function call suppression mechanism “golang.org/x/sync/singleflight” 原来底层是 waitGroup&#xff0c;我还以为等待的协程主动让出 cpu 了&#xff0c;没想到 waitGroup.Wait() 阻塞了 doCall 不但返回值是 func 的 val 和…...

高电压技术-冲击高压发生器MATLAB仿真

微❤关注“电气仔推送”获得资料&#xff08;专享优惠&#xff09; 冲击电压发生器是产生冲击电压波的装置&#xff0c;用于检验电力设备耐受大气过电压和操作过电压的绝缘性能&#xff0c;冲击电压发生器能产生标准雷电冲击电压波形&#xff0c;雷电冲击电压截波,标准操作冲击…...

【STM32】SysTick系统滴答定时器

1.SysTick简介 CM4内核的处理和CM3一样&#xff0c;内部都包含了一个SysTick定时器&#xff0c;SysTick 是一个24 位的倒计数定时器&#xff0c;当计到0 时 &#xff0c;将 从RELOAD 寄存器中自动重装载定时初值。只要不把它在SysTick 控制及状态寄存器中的使能位清除&#xf…...

编码遵循五大设计原则创建出更加健壮、可维护和可扩展的软件系统

一、单一职责原则&#xff08;SRP&#xff09; * 定义&#xff1a;一个类应该只有一个引起它变化的原因。 * 解释&#xff1a;意味着一个类应该专注于做一件事情&#xff0c;当需求发生变化时&#xff0c;只影响到一个类。这有助于降低类间的耦合&#xff0c;使得代码更易于理…...

记录一个问题

问题描述 如果一个物料既在A总成零件号下计算为托盘库&#xff0c;在B总成零件号下计算为箱库&#xff0c;则放于箱库。 A中选择排名第21的递补进托盘库。&#xff08;也需要判断递补的是否在其他总成零件中为箱库&#xff0c;是的话继续递补判断&#xff09; 解决思路 为了…...

ONLYOFFICE 8.1版本桌面编辑器测评:重塑办公效率的巅峰之作

在数字化办公日益普及的今天&#xff0c;一款高效、便捷且功能强大的桌面编辑器成为了职场人士不可或缺的工具。ONLYOFFICE 8.1版本桌面编辑器凭借其卓越的性能和丰富的功能&#xff0c;成功吸引了众多用户的目光。今天&#xff0c;我们将对ONLYOFFICE 8.1版本桌面编辑器进行全…...

【shell脚本速成】python安装脚本

文章目录 案例需求应用场景解决问题脚本思路案例代码 &#x1f308;你好呀&#xff01;我是 山顶风景独好 &#x1f388;欢迎踏入我的博客世界&#xff0c;能与您在此邂逅&#xff0c;真是缘分使然&#xff01;&#x1f60a; &#x1f338;愿您在此停留的每一刻&#xff0c;都沐…...

Redis报错:MISCONF Redis is configured to save RDB snapshots

错误提示内容&#xff1a; 2024-06-25 16:30:49 : Connection: Redis_Server > [runCommand] PING 2024-06-25 16:30:49 : Connection: Redis_Server > Response received : -MISCONF Redis is configured to save RDB snapshots, but it is currently not able to pers…...

关于使用绿联 USB-A转RJ45 2.5G网卡提速的解决问题

问题 网络下载速率低 网线是七类网线&#xff0c;外接的USB网卡驱动 我的自带网卡是 I219v 在嵌入了2.5G网络后一直无法到达1.5G以上。 平均测速300~500M 解决方案 更新了USB的网卡驱动 禁用了 I219-V的驱动。测速即可 USB驱动下载地址 https://download.csdn.net/downlo…...

Qt: QPushButton 按钮实现 上图标下文字

效果如下&#xff1a; 实现有如下几种方式&#xff1a; 1. 使用 QPushButton 设置 setStyleSheet 例&#xff1a; ui->recorder->setStyleSheet("QPushButton{"\"border: 1px solid #00d2ff; "\"min-height: 60px; "\"col…...

使用阿里云效API操作流水线

使用阿里云效&#xff08;Alibaba Cloud DevOps&#xff09;API操作流水线时&#xff0c;需要注意以下几个方面&#xff1a; 认证与授权 确保你已经获取了正确的访问凭证&#xff08;AccessKey ID 和 AccessKey Secret&#xff09;&#xff0c;并且这些凭证具有足够的权限来执行…...

使用命令行创建uniapp+TS项目,使用vscode编辑器

一:如果没有pnpm,先安装pnpm 二:使用npx工具和degit工具从 GitHub 上的 dcloudio/uni-preset-vue 仓库克隆一个名为 vite-ts 的分支,到项目中. 执行完上面命令后,去manifest.json添加appid(自己微信小程序的Id),也可不执行直接下一步,执行pnpm install ,再执行pnpm:dev:mp-weix…...

ABC355 Bingo2

分析&#xff1a; 找出其中一行或列或任意对角线被全部标记&#xff0c;即可输出回合数&#xff0c;否则输出-1 如果x%n0&#xff0c;行是x/n&#xff0c;列是n 如果x%n&#xff01;0&#xff0c;行是x/n1&#xff0c;列是x%n 如果行列或行列n1即为对角线。 标记行列对角线…...

Spring+Vue项目部署

目录 一、需要的资源 二、步骤 1.首先要拥有一个服务器 2.项目准备 vue&#xff1a; 打包: 3.服务器装环境 文件上传 设置application.yml覆盖 添加启动和停止脚本 ​编辑 安装jdk1.8 安装nginx 安装mysql 报错&#xff1a;「ERR」1273-Unknown collation: utf8m…...

【uml期末复习】统一建模语言大纲

前言&#xff1a; 关于uml的期末复习的常考知识点&#xff0c;可能对你们有帮助&#x1f609; 目录 第一部分 概念与基础 第一章 面向对象技术 第二章 统一软件过程 第三章 UML概述 第四章 用例图 第五章 类图 第六章 对象图 第七章 顺序图 第八章 协作图 第九章 状态…...

Linux高级IO

高级IO 1.五种IO模型1.1 阻塞IO1.2 非阻塞IO1.3 信号驱动IO1.4 多路复用/多路转接IO1.5 异步IO1.6 小结 2.高级IO重要概念3.非阻塞IO3.1 实现函数NoBlock3.2 轮询方式读取标准输入 4.I/O多路转接之select4.1 理解select执行过程4.2 select的特点4.3 select缺点4.4 实现 5.I/O多…...

go-admin-ui开源后台管理系统华为云部署

1.华为云开通8000与9527端口 2.编译 编译成功 3.发布到远程服务器 4.登陆华为云终端 5.安装Nginx 6.查看服务启动状态 7.添加网站 添加与修改配置www-data 改为 www 自定义日志输出格式 添加网站配置文件go_admin_ui.conf 添加如下内容: location 下的root指向网站文件夹 修…...

点云入门知识

点云的处理任务 场景语义分割 物体的三维表达方法&#xff08;3D representations&#xff09;&#xff1a; 点云&#xff1a;是由物体表面上许多点数据来表征这个物体。最接近原始传感器数据&#xff0c;且具有丰富的几何信息。 Mesh&#xff1a;用三角形面片和正方形面片拼…...

HTML静态网页成品作业(HTML+CSS+JS)——家乡莆田介绍网页(5个页面)

&#x1f389;不定期分享源码&#xff0c;关注不丢失哦 文章目录 一、作品介绍二、作品演示三、代码目录四、网站代码HTML部分代码 五、源码获取 一、作品介绍 &#x1f3f7;️本套采用HTMLCSS&#xff0c;使用Javacsript代码实现图片轮播&#xff0c;共有5个页面。 二、作品…...

#### grpc比http性能高的原因 ####

grpc比http性能高的原因 二进制消息格式&#xff1a;gRPC使用Protobuf&#xff08;一种有效的二进制消息格式&#xff09;进行序列化&#xff0c;这种格式在服务器和客户端上的序列化速度非常快&#xff0c;且序列化后的消息体积小&#xff0c;适合带宽有限的场景。 HTTP/2协…...

微软Edge浏览器搜索引擎切换全攻略

微软Edge浏览器作为Windows 10的默认浏览器&#xff0c;提供了丰富的功能和良好的用户体验。其中&#xff0c;搜索引擎的切换功能允许用户根据个人喜好和需求&#xff0c;快速更换搜索引擎&#xff0c;从而获得更加个性化的搜索服务。本文将详细介绍如何在Edge浏览器中进行搜索…...

<Linux> 实现命名管道多进程任务派发

实现命名管道多进程任务派发 common文件 #ifndef _COMMON_H_ #define _COMMON_H_#pragma once #include <iostream> #include <unistd.h> #include <string> #include <sys/types.h> #include <sys/stat.h> #include <wait.h> #include &…...

BigInteger 和 BigDecimal(java)

文章目录 BigInteger(大整数&#xff09;常用构造方法常用方法 BigDecimal(大浮点数&#xff09;常用构造方法常用方法 DecimalFormat(数字格式化) BigInteger(大整数&#xff09; java.math.BigInteger。 父类&#xff1a;Number 常用构造方法 构造方法&#xff1a;BigIntege…...

Linux 进程间通讯

Linux IPC 方式 在Linux系统中&#xff0c;进程间通信&#xff08;IPC&#xff09;是多个运行中的程序或进程之间交换数据和信息的关键机制。Linux提供了多种IPC机制&#xff0c;每种机制都有其特定的用途和优势。以下是Linux上主要的IPC通信方式&#xff1a; 管道&#xff08…...

数据分析三剑客-Matplotlib

数据分析三剑客 数据分析三剑客通常指的是在Python数据分析领域中&#xff0c;三个非常重要的工具和库&#xff1a;Pandas、NumPy和Matplotlib。Pandas主要负责数据处理和分析&#xff0c;NumPy专注于数值计算和数学运算&#xff0c;而Matplotlib则负责数据可视化。这三个库相…...

FastAPI-Body、Field

参考&#xff1a;模式的额外信息 - 例子 - FastAPI 在FastAPI中&#xff0c;Body和Field是两个常用的注解&#xff0c;它们用于定义请求体中的数据或路径参数、查询参数等的处理方式。这两个注解都来自于Pydantic库&#xff0c;用于数据验证和解析&#xff0c;但它们的应用场景…...

软件设计师笔记-操作系统知识(二)

线程 以下是关于线程的一些关键点&#xff1a; 线程是进程中的一个实体&#xff1a;进程是操作系统分配资源&#xff08;如内存空间、文件句柄等&#xff09;的基本单位&#xff0c;而线程是进程中的一个执行单元。多个线程可以共享同一个进程的地址空间和其他资源。线程是CP…...

鸿蒙UI开发快速入门 —— part12: 渲染控制

如果你对鸿蒙开发感兴趣&#xff0c;加入Harmony自习室吧~&#x1f447;&#x1f3fb;&#x1f447;&#x1f3fb;&#x1f447;&#x1f3fb;&#x1f447;&#x1f3fb; 扫描下面的二维码关注公众号。 1、前言 在声明式描述语句中开发者除了使用系统组件外&#xff0c;还可…...

添加用户页面(Flask+前端+MySQL整合)

首先导入Flask库和pymysql库。Flask用于创建Web应用程序&#xff0c;pymysql用于连接和操作MySQL数据库。 from flask import Flask, render_template, request import pymysql创建一个Flask应用实例。__name__参数告诉Flask使用当前模块作为应用的名称。 app Flask(__name_…...

素数筛(算法篇)

算法之素数筛 素数筛 引言&#xff1a; 素数(质数)&#xff1a;除了1和自己本身之外&#xff0c;没有任何因子的数叫做素数(质数) 朴素筛法(优化版) 概念&#xff1a; 朴素筛法&#xff1a;是直接暴力枚举2到当前判断的数x(不包括)&#xff0c;然后看在这范围内是否存在因…...