go并发设计模式runner模式
go并发设计模式runner模式

真正运行的程序不可能是单线程运行的,go语言中最值得骄傲的就是CSP模型了,可以说go语言是CSP模型的实现。
假设现在有一个程序需要实现,这个程序有以下要求:

- 程序可以在分配的时间内完成工作,正常终止;
- 程序没有及时完成工作,“自杀”;
- 接收到操作系统发送的中断事件,程序立刻试图清理状态并停止工作
数据类型设计
程序需要在规定时间内完成工作的最简单方法就是使用goroutine和channel,我们需要一个chan用来接收操作完成的信号,完成任务的函数可能有错误信息返回,因此我们这里定义一个错误类型的通道,用来通知什么时候完成任务以及完成任务的错误信息。
complete chan error
任务执行超时的最简单方法就是使用time包提供的After函数,当指定的时间内没有完成任务那么就出发一下超时通道,因为只需要接收超时的信号,因此只需要定义一个单向接收通道即可
timeout <-chan time.Time
当发生系统中断事件时,程序能立刻清理状态然后清理资源并停止工作,因此我们需要一个信号通道来接收操作系统发送的中断信号,这里我们使用signal包提供的Notify函数来注册信号,当操作系统发送信号时,会通过信号通道发送信号
interrupt chan os.Signal
程序最重要的是能够处理任务,用户需要处理多少任务提前是不能确定的,我们需要一个任务列表,这里我们使用一个切片来保存这些任务。
tasks []func(int)
经过上述设计,我们定义一个Runner结构体,用来保存这些通道和任务列表。
// 并且在操作系统发送中断信号时结束这些任务
type Runner struct {// interrupt channel 用来接收操作系统发送的信号interrupt chan os.Signal// complete channel 用来通知任务已经完成complete chan error// timeout channel 用来通知任务已经超时的接收通道timeout <-chan time.Time// tasks 用来保存任务列表tasks []func(int)
}
错误系统设计
错误系统设计,我们希望在任务执行完成或者超时或者操作系统发送的中断信号时返回错误,因此我们定义两个个错误变量,分别用来保存超时错误,中断错误和正常完成错误。
// ErrTimeout 定义一个超时错误, 会在人物执行超时时被返回
var ErrTimeout = errors.New("received timeout")// ErrInterrupt 定义一个中断错误, 会在收到操作系统事件的时候返回
var ErrInterrupt = errors.New("received interrupt")
数据类型说明
Signal
os.Signal 是一个接口类型,是对不同操作系统上捕获的信号的一个抽象接口,用来从操作系统接收中断事件。
// A Signal represents an operating system signal.
// The usual underlying implementation is operating system-dependent:
// on Unix it is syscall.Signal.
type Signal interface {String() stringSignal() // to distinguish from other Stringers
}
Error
error 是一个接口类型,用来表示错误,所有错误类型都实现了error接口,因此我们可以通过error接口来判断错误类型。
Time
time.Time 是一个结构体类型,用来表示一个时间,包含年月日时分秒纳秒等信息。
type Time struct {// wall and ext encode the wall time seconds, wall time nanoseconds,// and optional monotonic clock reading in nanoseconds.//// From high to low bit position, wall encodes a 1-bit flag (hasMonotonic),// a 33-bit seconds field, and a 30-bit wall time nanoseconds field.// The nanoseconds field is in the range [0, 999999999].// If the hasMonotonic bit is 0, then the 33-bit field must be zero// and the full signed 64-bit wall seconds since Jan 1 year 1 is stored in ext.// If the hasMonotonic bit is 1, then the 33-bit field holds a 33-bit// unsigned wall seconds since Jan 1 year 1885, and ext holds a// signed 64-bit monotonic clock reading, nanoseconds since process start.wall uint64ext int64// loc specifies the Location that should be used to// determine the minute, hour, month, day, and year// that correspond to this Time.// The nil location means UTC.// All UTC times are represented with loc==nil, never loc==&utcLoc.loc *Location
}
方法设计
在go中方法需要示例进行调用,因此我们最后定义一个用来创建Runner实例的New方法,避免用户自行创建实例,导致示例的创建不统一。
名为 New 的工厂函数。这个函数接收一个 time.Duration 类型的值,并返回 Runner 类型的指针。这个函数会创建一个 Runner 类型的值,并初始化每个通道字段。因为 task 字段的零值是 nil,已经满足初始化的要求,所以没有被明确初始化。每个通道字段都有独立的初始化过程
通道 interrupt 被初始化为缓冲区容量为 1 的通道。这可以保证通道至少能接收一个来自语言运行时的 os.Signal 值,确保语言运行时发送这个事件的时候不会被阻塞。如果 goroutine没有准备好接收这个值,这个值就会被丢弃。例如,如果用户反复敲 Ctrl+C 组合键,程序只会在这个通道的缓冲区可用的时候接收事件,其余的所有事件都会被丢弃。
通道 complete 被初始化为无缓冲的通道。当执行任务的 goroutine 完成时,会向这个通道发送一个 error 类型的值或者 nil 值。之后就会等待 main 函数接收这个值。一旦 main 接收了这个 error 值, goroutine 就可以安全地终止了。
最后一个通道 timeout 是用 time 包的 After 函数初始化的。 After 函数返回一个time.Time 类型的通道。语言运行时会在指定的 duration 时间到期之后,向这个通道发送一个 time.Time 的值。
// New 返回一个Runner实例
func New(d time.Duration) *Runner {return &Runner{// 1个缓冲的信号通道interrupt: make(chan os.Signal, 1),// 没有缓冲的信号通道,如果没有接受者那么会阻塞complete: make(chan error),timeout: time.After(d),}
}
Add 方法用来添加任务,因为需要执行的任务前期并不确定有多少,因此Add接收一个名为tasks的可变参数,可变参数可以接受任意数量的值作为传入参数。这个例子里,这些传入的值必须是一个接收一个整数且什么都
不返回的函数。函数执行时的参数 tasks 是一个存储所有这些传入函数值的切片。
// Add 方法用来添加任务,这个任务是一个接收int类型的ID作为参数的函数
func (r *Runner) Add(tasks ...func(int)) {r.tasks = append(r.tasks, tasks...)
}
run 方法会迭代 tasks 切片,并按顺序执行每个函数
func (r *Runner) run() error {for id, task := range r.tasks {if r.gotInterrupt() {return ErrInterrupt}// 执行注册的任务task(id)}return nil
}
gotInterrupt 展示了带 default 分支的 select 语句的经典用法。代码试图从 interrupt 通道去接收信号。一般来说, select 语句在没有任何要接收的数据时会阻塞,不过有了 default 分支就不会阻塞了。 default 分支会将接收 interrupt 通道的阻塞调用转变为非阻塞的。如果 interrupt 通道有中断信号需要接收,就会接收并处理这个中断。如果没有需要接收的信号,就会执行 default 分支。当收到中断信号后,代码会通过调用 Stop 方法来停止接收之后的所有事件。之后函数返回 true。如果没有收到中断信号,在第 99 行该方法会返回 false。本质上,gotInterrupt 方法会让 goroutine 检查中断信号,如果没有发出中断信号,就继续处理工作。
// gotInterrupt 检查是否接收到中断信号
func (r *Runner) gotInterrupt() bool {select {// 如果有中断信号那么返回truecase <-r.interrupt:// 接收到中断信号,停止后续再接收到中断信号signal.Stop(r.interrupt)return true// 没有终端信号返回false,继续执行default:return false}
}
一切步骤都执行完了,现在开始执行任务
// Start 方法用来开始执行任务,并监视通道事件
func (r *Runner) Start() error {// 我们希望接收所有中断信号signal.Notify(r.interrupt, os.Interrupt)// 异步执行任务go func() {r.complete <- r.run()}()select {// 当任务处理完成时该通道会返回case err := <-r.complete:return err// 当任务处理程序运行超时时发出信号case <-r.timeout:return ErrTimeout}
}
将以上代码全部都整合到runner.go文件中
// Package runner 处理任务的运行和声明周期管理
package runnerimport ("errors""os""os/signal""time"
)// Runner 在给定的超时时间内执行一组任务
// 并且在操作系统发送中断信号时结束这些任务
type Runner struct {// interrupt channel 用来接收操作系统发送的信号interrupt chan os.Signal// complete channel 用来通知任务已经完成complete chan error// timeout channel 用来通知任务已经超时timeout <-chan time.Time// tasks 用来保存任务列表tasks []func(int)
}// ErrTimeout 定义一个超时错误, 会在人物执行超时时被返回
var ErrTimeout = errors.New("received timeout")// ErrInterrupt 定义一个中断错误, 会在收到操作系统事件的时候返回
var ErrInterrupt = errors.New("received interrupt")// New 返回一个Runner实例
func New(d time.Duration) *Runner {return &Runner{// 1个缓冲的信号通道interrupt: make(chan os.Signal, 1),// 没有缓冲的信号通道,如果没有接受者那么会阻塞complete: make(chan error),timeout: time.After(d),}
}// Add 方法用来添加任务,这个任务是一个接收int类型的ID作为参数的函数
func (r *Runner) Add(tasks ...func(int)) {r.tasks = append(r.tasks, tasks...)
}// Start 方法用来开始执行任务,并监视通道事件
func (r *Runner) Start() error {// 我们希望接收所有中断信号signal.Notify(r.interrupt, os.Interrupt)// 异步执行任务go func() {r.complete <- r.run()}()select {// 当任务处理完成时该通道会返回case err := <-r.complete:return err// 当任务处理程序运行超时时发出信号case <-r.timeout:return ErrTimeout}}func (r *Runner) run() error {for id, task := range r.tasks {if r.gotInterrupt() {return ErrInterrupt}// 执行注册的任务task(id)}return nil
}// gotInterrupt 检查是否接收到中断信号
func (r *Runner) gotInterrupt() bool {select {// 如果有中断信号那么返回truecase <-r.interrupt:// 接收到中断信号,停止后续再接收到中断信号signal.Stop(r.interrupt)return true// 没有终端信号返回false,继续执行default:return false}
}
在main.go中进行调用
package mainimport ("log""os""time""code/runner"
)// timeout 定义程序执行超时时间,如果超过这个时间还没执行完成会失败退出.
const timeout = 3 * time.Second// 主函数入口
func main() {log.Println("Starting work.")// 调用New创建 runner对象.r := runner.New(timeout)// 向任务队列中添加需要顺序执行的任务r.Add(createTask(), createTask(), createTask())// Run 执行人物,并按照返回错误处理if err := r.Start(); err != nil {switch err {case runner.ErrTimeout:log.Println("Terminating due to timeout.")os.Exit(1)case runner.ErrInterrupt:log.Println("Terminating due to interrupt.")os.Exit(2)}}// 记录执行结果log.Println("Process ended.")
}// createTask 返回一个入参为int的函数
func createTask() func(int) {return func(id int) {log.Printf("Processor - Task #%d.", id)time.Sleep(time.Duration(id) * time.Second)}
}
源码已经放到gitee需要的自行下载:
https://gitee.com/andrewgithub/note_lab/blob/main/example/go/concurrent_mode/runner/runner.go
相关文章:
go并发设计模式runner模式
go并发设计模式runner模式 真正运行的程序不可能是单线程运行的,go语言中最值得骄傲的就是CSP模型了,可以说go语言是CSP模型的实现。 假设现在有一个程序需要实现,这个程序有以下要求: 程序可以在分配的时间内完成工作࿰…...
nn.RNN解析
以下是RNN的计算公式,t时刻的隐藏状态H(t)等于前一时刻隐藏状态H(t-1)乘以参数矩阵,再加t时刻的输入x(t)乘以参数矩阵,最后再通过激活函数,等到t时刻隐藏状态。 下图是输出input和初始化的隐藏状态,当参数batch_first True时候&…...
How to monitor Spring Boot apps with the AppDynamics Java Agent
本文介绍如何使用 AppDynamics Java 代理监视 Azure Spring Apps 中的 Spring Boot 应用程序。 使用 AppDynamics Java 代理可以: 监视应用程序使用环境变量配置 AppDynamics Java 代理 在 AppDynamics 仪表板中检查所有监视数据 How to monitor Spring Boot app…...
Linux学习笔记12 systemd的其他命令
前文已经介绍了systemd在系统初始化中起到的作用和服务的管理和配置。这里补充一下systemd的其他工具和系统进程的管理 前文 Linux学习笔记10 系统启动初始化,服务和进程管理(上)-CSDN博客 Linux学习笔记11 系统启动初始化,服务…...
NGO-CNN-BiGRU-Attention北方苍鹰算法优化卷积双向门控循环单元时间序列预测,含优化前后对比
NGO-CNN-BiGRU-Attention北方苍鹰算法优化卷积双向门控循环单元时间序列预测,含优化前后对比 目录 NGO-CNN-BiGRU-Attention北方苍鹰算法优化卷积双向门控循环单元时间序列预测,含优化前后对比预测效果基本介绍模型描述程序设计参考资料 预测效果 基本介…...
【分布式】分布式缓存
一、什么是分布式缓存 分布式缓存是一种将缓存数据存储在多个节点上的缓存方案。它通过将数据分散存储在多个节点的内存中,以提高系统的读取性能、降低数据库压力和提高系统可扩展性。 二、分布式缓存的优点 优点明细提高性能:分布式缓存可以将数据缓…...
深度学习中的迁移学习:应用与实践
引言 在深度学习领域,迁移学习(Transfer Learning)是一个非常强大且日益流行的概念,它通过将从一个任务中学到的知识应用于另一个任务,能够显著加快模型训练速度并提高其泛化能力。迁移学习在许多实际应用中都得到了广…...
28.UE5实现对话系统
目录 1.对话结构的设计(重点) 2.NPC对话接口的实现 2.1创建类型为pawn的蓝图 2.2创建对话接口 3.对话组件的创建 4.对话的UI设计 4.1UI_对话内容 4.2UI_对话选项 4.3UI_对话选项框 5.对话组件的逻辑实现 通过组件蓝图,也就是下图中的…...
Redis中的分布式锁(步步为营)
分布式锁 概述 分布式锁指的是,所有服务中的所有线程都去获取同一把锁,但只有一个线程可以成功的获得锁,其他没有获得锁的线程必须全部等待,直到持有锁的线程释放锁。 分布式锁是可以跨越多个实例,多个进程的锁 分布…...
CentOS 7安装mysql+JDK+Tomcat完成流程
一.安装mysql 即使是新的linux服务器,也要先验证是否有mysql已经安装,如果有进行卸载原版本,一定要确认是否mysql已不再使用 原安装情况(直接执行命令即可) whereis mysql rpm -qa | grep -i mysql rpm -e perl-DBD-M…...
C++笔记之不同框架中事件循环的核心函数:io_run()、ros_spin()、app_exec()
C笔记之不同框架中事件循环的核心函数:io_run()、ros_spin()、app_exec() code review! 参考笔记 1.qt-C笔记之使用QtConcurrent异步地执行槽函数中的内容,使其不阻塞主界面 2.qt-C笔记之QThread使用 3.qt-C笔记之多线程架构模式:事件信号监…...
C++异常处理
目录 一、异常的概念 二、异常的使用 (1)异常的抛出和捕获 (2)异常的重新抛出 (3)异常安全 (4)异常规范 三、自定义异常体系 四、c标注异常体系 五、异常的优缺点 在之前我们…...
【数据结构】哈希 ---万字详解
unordered系列关联式容器 在C98中,STL提供了底层为红黑树结构的一系列关联式容器,在查询时效率可达到log_2 N,即最差情况下需要比较红黑树的高度次,当树中的节点非常多时,查询效率也不理想。最好 的查询是,…...
4399大数据面试题及参考答案(数据分析和数据开发)
对数据分析的理解 数据分析是一个从数据中提取有价值信息以支持决策的过程。它涵盖了数据收集、清洗、转换、建模和可视化等多个环节。 首先,数据收集是基础。这包括从各种数据源获取数据,例如数据库、文件系统、网络接口等。这些数据源可以是结构化的数据,如关系型数据库中…...
快速理解倒排索引在ElasticSearch中的作用
一.基础概念 定义: 倒排索引是一种数据结构,用来加速文本数据的搜索和检索,和传统的索引方式不同,倒排索引会被每个词汇项与包含该词汇项的文档关联起来,从而去实现快速的全文检索。 举例: 在传统的全文…...
C++趣味编程玩转物联网:基于树莓派Pico控制无源蜂鸣器-实现音符与旋律的结合
无源蜂鸣器是一种多功能的声音输出设备,与有源蜂鸣器相比,它能够通过不同频率的方波生成丰富多样的音调。本项目使用树莓派Pico开发板,通过编程控制无源蜂鸣器播放经典旋律《归来有风》。本文将详细介绍项目实现中的硬件连接、C++代码解析,以及无源蜂鸣器的工作原理。 一、…...
《RuoYi基于SpringBoot+Vue前后端分离的Java快速开发框架学习》系列博客_Part4_三模态融合
系列博客目录 文章目录 系列博客目录目标Step1:之前工作形成子组件Step2:弥补缺失的文本子组件,同时举例如何子组件向父组件传数据Step3:后端代码需要根据上传的文件传给python服务器Step4:python服务器进行分析 目标 实现三模态融合,将文本、图片、音频…...
springboot365高校疫情防控web系统(论文+源码)_kaic
毕 业 设 计(论 文) 题目:高校疫情防控的设计与实现 摘 要 互联网发展至今,无论是其理论还是技术都已经成熟,而且它广泛参与在社会中的方方面面。它让信息都可以通过网络传播,搭配信息管理工具可以很好地为…...
STM32 USART串口数据包
单片机学习! 目录 前言 一、数据包 二、HEX数据包 三、文本数据包 四、HEX数据包和文本数据包优缺点 4.1 HEX数据包 4.2 文本数据包 五、HEX数据包接收 六、文本数据包接收 总结 前言 本文介绍了串口数据包收发的思路和流程。 一、数据包 数据包的作用是把一个个单独…...
【LC】3232. 判断是否可以赢得数字游戏
题目描述: 给你一个 正整数 数组 nums。 Alice 和 Bob 正在玩游戏。在游戏中,Alice 可以从 nums 中选择所有个位数 或 所有两位数,剩余的数字归 Bob 所有。如果 Alice 所选数字之和 严格大于 Bob 的数字之和,则 Alice 获胜。如果…...
CVPR 2025 MIMO: 支持视觉指代和像素grounding 的医学视觉语言模型
CVPR 2025 | MIMO:支持视觉指代和像素对齐的医学视觉语言模型 论文信息 标题:MIMO: A medical vision language model with visual referring multimodal input and pixel grounding multimodal output作者:Yanyuan Chen, Dexuan Xu, Yu Hu…...
阿里云ACP云计算备考笔记 (5)——弹性伸缩
目录 第一章 概述 第二章 弹性伸缩简介 1、弹性伸缩 2、垂直伸缩 3、优势 4、应用场景 ① 无规律的业务量波动 ② 有规律的业务量波动 ③ 无明显业务量波动 ④ 混合型业务 ⑤ 消息通知 ⑥ 生命周期挂钩 ⑦ 自定义方式 ⑧ 滚的升级 5、使用限制 第三章 主要定义 …...
CentOS下的分布式内存计算Spark环境部署
一、Spark 核心架构与应用场景 1.1 分布式计算引擎的核心优势 Spark 是基于内存的分布式计算框架,相比 MapReduce 具有以下核心优势: 内存计算:数据可常驻内存,迭代计算性能提升 10-100 倍(文档段落:3-79…...
【android bluetooth 框架分析 04】【bt-framework 层详解 1】【BluetoothProperties介绍】
1. BluetoothProperties介绍 libsysprop/srcs/android/sysprop/BluetoothProperties.sysprop BluetoothProperties.sysprop 是 Android AOSP 中的一种 系统属性定义文件(System Property Definition File),用于声明和管理 Bluetooth 模块相…...
【单片机期末】单片机系统设计
主要内容:系统状态机,系统时基,系统需求分析,系统构建,系统状态流图 一、题目要求 二、绘制系统状态流图 题目:根据上述描述绘制系统状态流图,注明状态转移条件及方向。 三、利用定时器产生时…...
Springcloud:Eureka 高可用集群搭建实战(服务注册与发现的底层原理与避坑指南)
引言:为什么 Eureka 依然是存量系统的核心? 尽管 Nacos 等新注册中心崛起,但金融、电力等保守行业仍有大量系统运行在 Eureka 上。理解其高可用设计与自我保护机制,是保障分布式系统稳定的必修课。本文将手把手带你搭建生产级 Eur…...
Spring Boot面试题精选汇总
🤟致敬读者 🟩感谢阅读🟦笑口常开🟪生日快乐⬛早点睡觉 📘博主相关 🟧博主信息🟨博客首页🟫专栏推荐🟥活动信息 文章目录 Spring Boot面试题精选汇总⚙️ **一、核心概…...
04-初识css
一、css样式引入 1.1.内部样式 <div style"width: 100px;"></div>1.2.外部样式 1.2.1.外部样式1 <style>.aa {width: 100px;} </style> <div class"aa"></div>1.2.2.外部样式2 <!-- rel内表面引入的是style样…...
css3笔记 (1) 自用
outline: none 用于移除元素获得焦点时默认的轮廓线 broder:0 用于移除边框 font-size:0 用于设置字体不显示 list-style: none 消除<li> 标签默认样式 margin: xx auto 版心居中 width:100% 通栏 vertical-align 作用于行内元素 / 表格单元格ÿ…...
A2A JS SDK 完整教程:快速入门指南
目录 什么是 A2A JS SDK?A2A JS 安装与设置A2A JS 核心概念创建你的第一个 A2A JS 代理A2A JS 服务端开发A2A JS 客户端使用A2A JS 高级特性A2A JS 最佳实践A2A JS 故障排除 什么是 A2A JS SDK? A2A JS SDK 是一个专为 JavaScript/TypeScript 开发者设计的强大库ÿ…...
