智能BI(后端)-- 系统异步化
文章目录
- 系统问题分析
- 什么是异步化?
- 业务流程分析
- 标准异步化的业务流程
- 系统业务流程
- 线程池
- 为什么需要线程池?
- 线程池两种实现方式
- 线程池的参数
- 线程池的开发
- 项目异步化改造
系统问题分析
问题场景:调用的服务能力有限,或者接口的处理(或返回)时长较长时,就应该考虑异步化了
什么是异步化?
不用等一件事做完,就可以做另外一件事,等第一件事完成时,可以收到一个通知
业务流程分析
标准异步化的业务流程
- 当用户要进行耗时很长的操作时,点击提交后,不需要在界面空等,而是应该把这个任务保存到数据库中记录下来
- 用户要执行新任务时:
a. 任务提交成功:
ⅰ. 若程序存在空闲线程,可以立即执行此任务
ⅱ. 若所有线程均繁忙,任务将入队列等待处理
b. 任务提交失败:比如所有线程都在忙碌且任务队列满了
ⅰ.选择拒绝此任务,不再执行
ⅱ.通过查阅数据库记录,发现提交失败的任务,并在程序空闲时将这些任务取出执行 - 程序(线程)从任务队列中取出任务依次执行,每完成一项任务,就更新任务状态。
- 用户可以查询任务的执行状态,或者在任务执行成功或失败时接收通知(例如:发邮件、系统消息提示或短信),从而优化体验
- 对于复杂且包含多个环节的任务,在每个小任务完成时,要在程序(数据库中))记录任务的执行状态(进度)。
系统业务流程
- 用户点击智能分析页提交按钮时,先把图表立刻保存到数据库中(作为一个任务)
- 用户可以在图表管理查看所有图表(已生成的,生成中的,生成失败的)的信息和状态
- 用户可以修改生成失败的图表信息,点击重新生成,以尝试再次创建图表
问题分析?
- 任务队列的最大容量应该设置为多少
- 程序怎么从任务队列中取出任务去执行?这个任务队列的流程怎么实现?怎么保证程序最多同时执行多少个任务?
线程池实现
线程池
为什么需要线程池?
- 线程的管理比较复杂
- 任务存取比较复杂
- 线程池可以帮你轻松管理线程,协调任务的执行过程
线程池两种实现方式
- Spring中,可以用ThreadPoolTaskExrcutor配合@Async注解来实现(不推荐)
- 在Java中,可以使用JUC并发编程包中的ThreadPoolExecutor来实现非常灵活地自定义线程池
线程池的参数
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler) {
现状:AI生成能力的并发是只允许4个任务同时去执行,AI能力允许20个任务排队
corePoolSize(核心线程数):正常情况下,我们的系统应该能同时工作的线程数
maximumPoolSize(最大线程数):极限情况下,我们的线程池所拥有的线程
keepAliveTime(空闲线程存活时间):非核心线程在没有任务的情况下,过多久要删除,从而释放无用的线程资源
unit(空闲线程存活时间的单位):分钟,秒
workQueue(工作队列):用于存放给线程执行的任务,存在一个队列的长度(一定要设置)
threadFactory(线程工厂):控制每个线程的生成,线程的属性
RejectedExecutionHandler(拒绝策略):任务队列满的时候,我们采取什么措施
线程池的开发
自定义线程池配置
package com.yupi.springbootinit.config;import org.jetbrains.annotations.NotNull;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;@Configuration
public class ThreadPoolExecutorConfig {@Beanpublic ThreadPoolExecutor threadPoolExecutor(){// 创建一个线程工厂ThreadFactory threadFactory = new ThreadFactory(){// 初始化线程数为 1private int count = 1;// 创建一个新的线程@Override// 每当线程池需要创建新线程时,就会调用newThread方法// @NotNull Runnable r 表示方法参数 r 应该永远不为null,public Thread newThread(@NotNull Runnable r) {Thread thread = new Thread(r);thread.setName("线程" + count ++);return thread;}};// 创建一个新的线程池,线程池核心大小为2,最大线程数为4,// 非核心线程空闲时间为100秒,任务队列为阻塞队列,长度为4,使用自定义的线程工厂创建线ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2,4,100, TimeUnit.SECONDS,new ArrayBlockingQueue<>(4),threadFactory);return threadPoolExecutor;}
}
测试controller层(注意线上环境不要暴露出去)
package com.yupi.springbootinit.controller;import cn.hutool.json.JSONUtil;
import io.netty.handler.codec.serialization.ObjectEncoder;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Profile;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import javax.annotation.Resource;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadPoolExecutor;/*** 队列测试controller*/
@RestController
@RequestMapping("/queue")
@Slf4j
@Profile({"dev","local"}) // 只在开发环境和本地环境生效
public class QueueController {@Resourceprivate ThreadPoolExecutor threadPoolExecutor;@GetMapping("/add")// 接收一个参数name,然后将任务添加到线程池中public void add(String name){// 使用CompletableFuture运行一个异步任务CompletableFuture.runAsync(()->{log.info("任务执行中:" + name + "执行人:" + Thread.currentThread().getName());try {// 让线程休眠10分钟,模拟长时间运行的任务Thread.sleep(600000);} catch (InterruptedException e) {throw new RuntimeException(e);} 异步任务在threadPoolExecutor中执行},threadPoolExecutor);}@GetMapping("/get")public String get(){Map<String, Object> map = new HashMap<>();int size = threadPoolExecutor.getQueue().size();map.put("队列长度",size);long taskCount = threadPoolExecutor.getTaskCount();map.put("任务总数",taskCount);long completedTaskCount = threadPoolExecutor.getCompletedTaskCount();map.put("已完成的任务总数",completedTaskCount);int activeCount = threadPoolExecutor.getActiveCount();map.put("正在工作的线程数",activeCount);return JSONUtil.toJsonStr(map);}
}
项目异步化改造
/*** 智能分析(异步)** @param multipartFile* @param genChartByAiRequest* @param request* @return*/
@PostMapping("/gen/async")
public BaseResponse<BiResponse> genChartByAiAsync(@RequestPart("file") MultipartFile multipartFile,GenChartByAiRequest genChartByAiRequest, HttpServletRequest request) {String name = genChartByAiRequest.getName();String goal = genChartByAiRequest.getGoal();String chartType = genChartByAiRequest.getChartType();// 校验ThrowUtils.throwIf(StringUtils.isBlank(goal), ErrorCode.PARAMS_ERROR, "目标为空");ThrowUtils.throwIf(StringUtils.isNotBlank(name) && name.length() > 100, ErrorCode.PARAMS_ERROR, "名称过长");// 校验文件long size = multipartFile.getSize();String originalFilename = multipartFile.getOriginalFilename();// 校验文件大小final long ONE_MB = 1024 * 1024L;ThrowUtils.throwIf(size > ONE_MB, ErrorCode.PARAMS_ERROR, "文件超过 1M");// 校验文件大小缀 aaa.pngString suffix = FileUtil.getSuffix(originalFilename);final List<String> validFileSuffixList = Arrays.asList("xlsx", "xls");ThrowUtils.throwIf(!validFileSuffixList.contains(suffix), ErrorCode.PARAMS_ERROR, "文件后缀非法");User loginUser = userService.getLoginUser(request);// 限流判断,每个用户一个限流器redisLimiterManager.doRateLimit("genChartByAi_" + loginUser.getId());// 指定一个模型id(把id写死,也可以定义成一个常量)long biModelId = 1659171950288818178L;// 分析需求:// 分析网站用户的增长情况// 原始数据:// 日期,用户数// 1号,10// 2号,20// 3号,30// 构造用户输入StringBuilder userInput = new StringBuilder();userInput.append("分析需求:").append("\n");// 拼接分析目标String userGoal = goal;if (StringUtils.isNotBlank(chartType)) {userGoal += ",请使用" + chartType;}userInput.append(userGoal).append("\n");userInput.append("原始数据:").append("\n");// 压缩后的数据String csvData = ExcelUtils.excelToCsv(multipartFile);userInput.append(csvData).append("\n");// 先把图表保存到数据库中Chart chart = new Chart();chart.setName(name);chart.setGoal(goal);chart.setChartData(csvData);chart.setChartType(chartType);// 插入数据库时,还没生成结束,把生成结果都去掉
// chart.setGenChart(genChart);
// chart.setGenResult(genResult);// 设置任务状态为排队中chart.setStatus("wait");chart.setUserId(loginUser.getId());boolean saveResult = chartService.save(chart);ThrowUtils.throwIf(!saveResult, ErrorCode.SYSTEM_ERROR, "图表保存失败");// 在最终的返回结果前提交一个任务// todo 建议处理任务队列满了后,抛异常的情况(因为提交任务报错了,前端会返回异常)CompletableFuture.runAsync(() -> {// 先修改图表任务状态为 “执行中”。等执行成功后,修改为 “已完成”、保存执行结果;执行失败后,状态修改为 “失败”,记录任务失败信息。(为了防止同一个任务被多次执行)Chart updateChart = new Chart();updateChart.setId(chart.getId());// 把任务状态改为执行中updateChart.setStatus("running");boolean b = chartService.updateById(updateChart);// 如果提交失败(一般情况下,更新失败可能意味着你的数据库出问题了)if (!b) {handleChartUpdateError(chart.getId(), "更新图表执行中状态失败");return;}// 调用 AIString result = aiManager.doChat(biModelId, userInput.toString());String[] splits = result.split("【【【【【");if (splits.length < 3) {handleChartUpdateError(chart.getId(), "AI 生成错误");return;}String genChart = splits[1].trim();String genResult = splits[2].trim();// 调用AI得到结果之后,再更新一次Chart updateChartResult = new Chart();updateChartResult.setId(chart.getId());updateChartResult.setGenChart(genChart);updateChartResult.setGenResult(genResult);updateChartResult.setStatus("succeed");boolean updateResult = chartService.updateById(updateChartResult);if (!updateResult) {handleChartUpdateError(chart.getId(), "更新图表成功状态失败");}},threadPoolExecutor);BiResponse biResponse = new BiResponse();
// biResponse.setGenChart(genChart);
// biResponse.setGenResult(genResult);biResponse.setChartId(chart.getId());return ResultUtils.success(biResponse);
}
// 上面的接口很多用到异常,直接定义一个工具类
private void handleChartUpdateError(long chartId, String execMessage) {Chart updateChartResult = new Chart();updateChartResult.setId(chartId);updateChartResult.setStatus("failed");updateChartResult.setExecMessage(execMessage);boolean updateResult = chartService.updateById(updateChartResult);if (!updateResult) {log.error("更新图表失败状态失败" + chartId + "," + execMessage);
}
相关文章:
智能BI(后端)-- 系统异步化
文章目录 系统问题分析什么是异步化?业务流程分析标准异步化的业务流程系统业务流程 线程池为什么需要线程池?线程池两种实现方式线程池的参数线程池的开发 项目异步化改造 系统问题分析 问题场景:调用的服务能力有限,或者接口的…...
AI绘画Stable Diffusion 插件篇:智能标签提示词插件sd-danbooru-tags-upsampler
大家好,我是向阳。 关于智能标签提示词插件,在很早之前就介绍过很多款了,今天再给大家介绍一款智能标签提示词插件sd-danbooru-tags-upsampler。该智能提示词插件是今年2月23号才发布的第一版V0.1.0,算是比较新的智能提示词插件。…...
Android OpenMAX(六)OMXStore
在前面两节的学习中我们知道了OMX Core是用来管理(查询/创建/销毁)Android平台上的硬件编解码组件的。这一节我们再向上一层,Android平台除了提供有硬件编解码组件支持,还内置了一些软件编解码组件,为了统一管理所有(软/硬)编解码组件,Android在OMX Core之上又抽象了一…...
Ubuntu下halcon软件的下载安装
由于工作需求,点云配准需要使用halcon进行实现,并且将该功能放入QT界面中 1.下载halcon 进入halcon官网进行下载 官网链接:https://www.mvtec.com/products/halcon/ 注意:要注册登陆之后才能进行下载 接着点击Downloads->H…...
『ZJUBCA Collaboration』WTF Academy 赞助支持
非常荣幸宣布,浙江大学区块链协会收到WTF Academy的赞助与支持,未来将共同开展更多深度合作。 WTF Academy是开发者的Web3开源大学,旨在通过开源教育让100,000名开发者进入到Web3。截止目前,WTF开源教程在GitHub收获超15,000 ⭐&a…...
Python开源工具库使用之运动姿势追踪库mediapipe
文章目录 前言一、姿势估计1.1 姿态关键点1.2 旧版 solution API1.3 新版 solution API1.4 俯卧撑计数 二、手部追踪2.1 手部姿态2.2 API 使用2.3 识别手势含义 参考 前言 Mediapipe 是谷歌出品的一种开源框架,旨在为开发者提供一种简单而强大的工具,用…...
【Android Studio】开启真机调试
1 打开手机的开发者模式 各种款式的手机进入开发者模式的情况不同,但大致是在 【关于手机】中多次点击系统版本即可进入。这里以小米8为例,记录下流程。 1.1 进入手机开发者模式 【设置】->【我的设备】->【全部参数】->【MIUI版本】连续点击3…...
CMakeLists.txt语法规则:部分常用命令说明四
一. 简介 前面几篇文章学习了CMakeLists.txt语法中前面几篇文章学习了CMakeLists.txt语法中部分常用命令。文章如下: CMakeLists.txt语法规则:部分常用命令说明一-CSDN博客 CMakeLists.txt语法规则:部分常用命令说明二-CSDN博客 CMakeLi…...
学习前端第三十二天(Rest 参数与 Spread 语法,变量作用域,闭包)
一、Rest 参数与 Spread 语法 1.rest参数 ...变量名:收集剩余的参数并存进指定数组中,需要放到最后; 2.arguments变量 // arguments,以参数在参数列表中的索引作为键,存储所有参数,以类数组对象的形式输出所有函数参数 // 箭头…...
mysql从入门到起飞+面试基础题
mysql基础 MySQL基础 企业面试题1 代码 select m.id,m.num from ( select t.id as id,count(1) num from ( select ra.requester_id as id from RequestAccepted raunion all select ra.accepter_id as id from RequestAccepted ra ) t group by t.id ) m group by id ord…...
设计模式:命令模式
文章目录 一、什么是命令模式二、命令模式结构三、命令模式实现步骤四、命令模式应用场景 一、什么是命令模式 它允许将请求封装为对象,一个请求对应于一个命令,将发出命令的责任和执行命令的责任分割开。每一个命令都是一个操作:请求的一方…...
setinterval和settimeout区别在于
setinterval和settimeout区别在于 执行次数和执行频率 setInterval和setTimeout的主要区别在于执行次数和执行频率。以下是详细介绍:12 setTimeout是一次性的定时器,它在设定的延迟时间之后执行一次函数,然后停止。setInterval是重复性的定…...
shell_结束进程脚本
结束进程的shell脚本如下: #!/bin/bash# kill all process ps aux|grep "local" | grep -v grep | awk {print $2} | while read line; do kill -9 $line; done 解析: ps aux 命令常用于查看当前系统中运行的进程,以及它们所占用…...
GDPU unity游戏开发 碰撞器与触发器
砰砰叫,谁动了她的奶酪让你的小鹿乱撞了。基于此,亦即碰撞与触发的过程。 碰撞器与触发器的区别 通俗点讲,碰撞器检测碰撞,触发器检测触发,讲了跟没讲似的。碰撞器是用来检测碰撞事件的,在unity中ÿ…...
IP地址定位技术在网络安全中的作用
在当今数字化时代,网络安全已经成为企业、政府和个人面临的重要挑战之一。随着互联网的普及和网络攻击的增加,保护个人隐私和防止网络犯罪变得尤为重要。在这一背景下,IP地址定位技术作为网络安全的重要组成部分之一,发挥着关键作…...
R语言中,查看经安装的包,查看已经加载的包,查看特定包是否已经安装,安装包,更新包,卸载包
创建于:2024.5.4 R语言中,查看经安装的包,查看已经加载的包,查看特定包是否已经安装,安装包,更新包,卸载包 文章目录 1. 查看经安装的包2. 查看已经加载的包3. 查看特定包是否已经安装4. 安装包…...
spring boot3单模块项目工程搭建-下(个人开发模板)
⛰️个人主页: 蒾酒 🔥系列专栏:《spring boot实战》 目录 写在前面 上文衔接 常用依赖介绍以及整合 web组件 测试组件 样板代码生成 数据库连接器 常用工具包 面向切面编程 ORM框架 数据连接池 接口测试、文档导出 缓存中间件 参数校…...
精准清理 MongoDB 数据:删除集合的正确姿势
在 MongoDB 数据库管理中,数据清理是维护数据库性能和保持数据一致性的关键步骤之一。而删除集合是实现数据清理的重要手段之一。在这个信息爆炸的时代,了解如何正确地执行集合删除操作至关重要。本文将深入探讨 MongoDB 中删除集合的常用方法、最佳实践…...
java 执行修改语句
你可以使用Java中的JDBC(Java Database Connectivity)来执行修改语句。以下是一个示例: import java.sql.Connection; import java.sql.DriverManager; import java.sql.SQLException; import java.sql.Statement;public class Main {public…...
【Linux系统化学习】网络套接字(编写简单的UDP服务端和客户端)
目录 理解源IP地址和目的IP地址 认识端口号 端口号和进程ID的区别 源端口号和目的端口号 认识TCP和UDP协议 TCP协议 UDP协议 网络字节序 socket编程接口 socket常见API sockaddr结构 简单的UDP网络程序 UDP服务端 创建套接字 填充本地网络信息 绑定 收取消息 …...
MFC 列表控件修改实例(源码下载)
1、本程序基于前期我的博客文章《MFC下拉菜单打钩图标存取实例(源码下载)》 2、程序功能选中列表控件某一项,修改这一项的按钮由禁止变为可用,双击这个按钮弹出对话框可对这一项的记录数据进行修改,点击确定保存修改数…...
QT设计模式:模板模式
基本概念 模板模式(Template Pattern)是一种行为型设计模式,它定义了一个操作中的算法的模板,而将一些步骤延迟到子类中。模板方法使得子类可以不改变一个算法的结构即可重新定义该算法的某些特定步骤。 实现的模块有࿱…...
8.k8s中网络资源service
目录 一、service资源概述 二、service资源类型 1.ClusterIP类型 2.service的nodeport类型 3.service的loadbalancer类型(了解即可) 4.service的externalname类型(了解即可) 三、nodeport的端口范围设置和svc的endpoint列表 1.修…...
51单片机keil编程中遇到的问题(持续更新)
字符无法打印报错 查看特殊功能寄存器名字的时候也会报错,因为无法编译通过,导致头文件的定义内容无法查找 keil编译中 error C127: ‘xx’: invalid storage class 这种一般是在编写头文件或源文件时,在声明函数的结尾没有添加分号&…...
C++类和对象详解(一)
目录 面向过程和面向对象初步认识类的引入类的定义类的两种定义方式声明和定义全部放在类体中 声名定义分离 类的作用域成员变量命名规则建议访问限定符 类的封装类的实例化类对象模型类的对象大小的计算扩展 结构体内存对齐规则 感谢各位大佬对我的支持,如果我的文章对你有用,…...
SCI论文检索报告长什么样?怎么出具?一文了解!
1、SCI检索报告是什么 SCI数据库收录最能反映基础学科研究水平和论文质量,该检索系统收录的科技期刊比较全面,可以说它是集中各个学科高质优秀论文的精粹,评职晋升、项目申报、评奖评优等很多关键时期,都需要开具已经在SCI发表的论…...
UE4_Water插件_Buoyancy组件使用
water插件提供了一个浮力Actor蓝图类。 需要注意的几个问题: 1、StaticMesh需要替换根组件。 2、需要模拟物理设置质量。 3、需要添加浮力组件,设置浮力点,应用水中牵引力。...
OceanBase学习1:分布式数据库与集中式数据库的差异
目录 1. 传统集中式数据库 2. 数据库中间件的分库分表 3. 分布式数据库的基本特点及对比分析 4. OceanBase和传统数据库的对比 5. 小结 1. 传统集中式数据库 优点 成熟稳定:经过近40年的发展,应用到各行各业,产品技术非常成熟稳定行业适配性强:适配…...
计算机网络技术主要学什么内容,有哪些课程
计算机网络技术专业是一个涉及理论与实践紧密结合的学科,主要学习内容有计算机网络基础、网络设备技术、网络编程等内容,以下是上大学网(www.sdaxue.com)整理的计算机网络技术主要学什么内容,供大家参考! 基…...
Mac下安装ffmpeg
1、安装gedit brew install gedit2、配置环境变量,打开~/.zshrc,在末尾添加语句 export PATH$PATH:/usr/local/ffmpeg/bin3、执行语句,使环境变量生效 source ~/.zshrc 4、终端输入 ffmpeg ,看环境变量是否配置成功。 至此&a…...
视频网站如何做盗链/威海网站制作
文章目录1 报错2 解决办法1 报错 尝试在linux上通过Pycharm IDE使用matplotlib包。当我运行此代码时: from matplotlib import pyplot结果报错: ImportError: No module named tkinter2 解决办法 ubuntu的系统: sudo apt-get install py…...
用于制作网页的工具软件/专业关键词优化平台
问题简介 我写爬虫,用到了asyncio相关的事件循环,新建了一个线程去run_forever(),在docker中运行。后来程序有异常,主线程挂了,但是竟然不报错。查了很久,才找出来。 如果你新建一个线程去运行一般的死循环…...
网站大图轮播/南宁网站推广排名
题目 本题要求你写个程序把给定的符号打印成沙漏的形状。例如给定17个“*”,要求按下列格式打印 ************ *****所谓“沙漏形状”,是指每行输出奇数个符号;各行符号中心对齐;相邻两行符号数差2;符号数先从大到小…...
洛阳做网站哪家专业/大型门户网站建设
感谢师兄提供的题图!很久之前,在linux下工作,需要多窗口,一般自带的终端能解决这个问题。后来一个Linux很厉害的H师兄,向我推荐了screen,但是没用几次,就不用了。说明,人在接受新事物…...
建设标准 免费下载网站/灰色词优化培训
在 2003 系统 IIS6 下,经常出现 w3wp.exe 的内存占用不能及时释放和 CPU 占用居高不下的问题,从而导致服务器响应速度很慢。怎样才能找出是哪一个站点出问题呢? 首先给每个站点都建一个应用程序池,这样便于找出问题出在哪一个站点…...
小程序做网站/怎样让自己的网站排名靠前
早上客户反应,其网站无法访问,无限转圈上服务器,查看磁盘空间df -h,内存使用率free -m,网络流量iftop均正常然后使用top查看时,发现mysql的cpu使用率上升到200%。解决过程回放进入mysql查看正在执行的sqlmy…...