当前位置: 首页 > news >正文

一次业务的批量数据任务的处理优化

文章目录

  • 一次业务的批量数据任务的处理优化
    • 业务背景
    • 1.0版本 分批处理模式
    • 2.0版本 平衡任务队列模式
    • 3.0版本 优化调度平衡任务队列模式
    • 总结

一次业务的批量数据任务的处理优化

业务背景

一个重新生成所有客户的财务业务指标数据的批量数据处理任务。

1.0版本 分批处理模式

根据要处理的客户数量,按照最大线程数切分成多个段,尽量保证每个线程处理相同的客户数量。

    private void updateForRegenerateByCustomer(List<Integer> customerIdList,SystemUserCommonDTO user, LocalDateTime now) {List<CustomerBaseInfo> baseInfoList = CollectionUtils.isEmpty(customerIdList)?customerInfoService.listAll():customerInfoService.listByIdList(customerIdList);//先清理客户的数据updateForCleanByCustomerIdList(baseInfoList,user,now);int maxSize = baseInfoList.size();//计算当前任务数量int currentMaxPoolSize = maxPoolSize<maxSize?maxPoolSize:maxSize;CompletableFuture[] tasks = new CompletableFuture[currentMaxPoolSize];//计算每个任务分段的数量int size = maxSize / currentMaxPoolSize;for(int i=0;i<currentMaxPoolSize;i++){final int begin = i * size;final int end = i==currentMaxPoolSize-1?maxSize:(i+1)*size;//创建异步处理的分段任务tasks[i] = CompletableFuture.runAsync(()->updateForGenerateByCustomerIdList(baseInfoList,begin,end,user,now),executorService).whenCompleteAsync((k,v)-> log.info("重新生成财务业务指标客户的所有数据-线程【{}】完成",Thread.currentThread().getName()));}// 向线程池提交任务CompletableFuture.allOf(tasks).whenComplete((v, th) -> log.info("重新生成财务业务指标客户的所有数据-【{}】个子线程处理完成",tasks.length)).join();}/*** 生成指定客户列表的所有数据**/private void updateForGenerateByCustomerIdList(List<CustomerBaseInfo> baseInfoList,int begin,int end,SystemUserCommonDTO user, LocalDateTime now){//每个线程只处理自己的分段的数据for(int i=begin;i<end;i++){CustomerBaseInfo baseInfo = baseInfoList.get(i);//每个客户独立事务TransactionalUtils.runWithNewTransactional(()->updateForGenerateByCustomerId(baseInfo.getId(),user,now));}}/*** 生成指定客户的所有数据**/private void updateForGenerateByCustomerId(Integer customerId,SystemUserCommonDTO user, LocalDateTime now){//1、重新生成客户的所有业务类型的数据List<FinanceBiMaintainDto> maintainDtoList =financeBiBusinessTypeSupport.getMaintainListByCustomerId(customerId);if(CollectionUtils.isEmpty(maintainDtoList)){return ;}//生成每个指标的数据Map<BusinessIndicatorEnum,List<FinanceBiMaintainDto>> indicatorMaintainDtoMap = maintainDtoList.stream().collect(Collectors.groupingBy(FinanceBiMaintainDto::getIndicator));indicatorMaintainDtoMap.forEach((k,v)->{log.info("重新生成财务业务指标指定客户【{}】的【{}】支持处理开始",customerId,k);financeBiManager.updateForBiMaintain(k, v,user,now);});}

运行耗时:1420.145秒

2.0版本 平衡任务队列模式

1.0 版本 由于不同客户的数据量不同,导致生成数据的耗时不同,因此按照客户数量均分任务的的方式对于每个线程来说,任务量是不一样的,因此可能会导致部分线程太忙,部分线程太空的情况。因此调整为使用队列方式来解决任务分配的问题,每个线程自己取队列中取要处理的客户,直到所有队列中的客户都被处理完,所有的线程结束。这样就避免的线程任务量不平衡问题。

updateForGenerateByCustomerId 方法不需要改造,只需要调整任务分配的相关方法就可以。

private void updateForRegenerateByCustomer(List<Integer> customerIdList, SystemUserCommonDTO user,LocalDateTime now) {List<CustomerBaseInfo> baseInfoList = CollectionUtils.isEmpty(customerIdList) ? customerInfoService.listAll() :customerInfoService.listByIdList(customerIdList);//先清理客户的数据updateForCleanByCustomerIdList(baseInfoList, user, now);int maxSize = baseInfoList.size();int currentMaxPoolSize = Math.min(maxPoolSize, maxSize);//根据线程数,构建固定的任务数量CompletableFuture<?>[] tasks = new CompletableFuture<?>[currentMaxPoolSize];//构建待处理的客户队列,由于这里没有并发读写的情况,因此用ConcurrentLinkedQueue效率会更高一点。ConcurrentLinkedQueue<Integer> queue = new ConcurrentLinkedQueue<>(baseInfoList.stream().map(CustomerBaseInfo::getId).collect(Collectors.toList()));//创建多个线程去消耗客户队列for (int i = 0; i < currentMaxPoolSize; i++) {tasks[i] =CompletableFuture.runAsync(() -> updateForGenerateByCustomerIdList(queue, user, now), executorService).whenCompleteAsync((k, v) -> {if (v != null) {log.error(String.format("重新生成财务业务指标客户的所有数据-线程【%s】发生异常",Thread.currentThread().getName()), v);} else {log.info("重新生成财务业务指标客户的所有数据-线程【{}】完成",Thread.currentThread().getName());}});}// 向线程池提交任务CompletableFuture.allOf(tasks).whenComplete((v, th) -> log.info("重新生成财务业务指标客户的所有数据-【{}】个子线程处理完成", tasks.length)).join();}/*** 生成指定客户列表的所有数据**/private void updateForGenerateByCustomerIdList(ConcurrentLinkedQueue<Integer> queue, SystemUserCommonDTO user,LocalDateTime now) {Integer customerId = queue.poll();//循环从客户队列中取出待处理的客户,直到所有客户都处理完毕。while (customerId != null) {final Integer currentCustomerId = customerId;TransactionalUtils.runWithNewTransactional(() -> updateForGenerateByCustomerId(currentCustomerId, user, now));customerId = queue.poll();}}

优化后的耗时:1037.059秒

3.0版本 优化调度平衡任务队列模式

2.0版本虽然解决的了每个线程任务量不平衡的问题,但可能出现某个数据量很大的客户在队列的尾部,导致当其他线程都处理完所有的客户时,取到最大数据量的客户的线程仍在运行,任务整体的耗时被增加。因此需要优化调度,将耗时高的客户调度到队列头部,保证耗时最长的客户的优先处理,从而避免最后等待耗时长的线程。

updateForGenerateByCustomerIdList 方法不需要改造,只需要队列构造处理就可以。


private void updateForRegenerateByCustomer(List<Integer> customerIdList, SystemUserCommonDTO user,LocalDateTime now) {List<CustomerBaseInfo> baseInfoList = CollectionUtils.isEmpty(customerIdList) ? customerInfoService.listAll() :customerInfoService.listByIdList(customerIdList);//先清理客户的数据updateForCleanByCustomerIdList(baseInfoList, user, now);//获取客户的统计数据Map<Integer, CustomerStatisticsInfo> customerStatisticsInfoMap =customerStatisticsInfoService.listAll().stream().collect(Collectors.toMap(CustomerStatisticsInfo::getCustomerId, Function.identity()));int maxSize = baseInfoList.size();int currentMaxPoolSize = Math.min(maxPoolSize, maxSize);CompletableFuture<String>[] tasks = new CompletableFuture[currentMaxPoolSize];//根据客户的统计数据,构建待处理的客户队列ConcurrentLinkedQueue<Integer> queue =baseInfoList.stream().map(item -> customerStatisticsInfoMap.get(item.getId())).filter(Objects::nonNull)
//队列按照客户数据量倒序排列               .sorted(Comparator.comparing(CustomerStatisticsInfo::getNumberOfCheckedSatisfactoryActivitys,Comparator.reverseOrder())).map(CustomerStatisticsInfo::getCustomerId).collect(Collectors.toCollection(ConcurrentLinkedQueue::new));for (int i = 0; i < currentMaxPoolSize; i++) {tasks[i] = CompletableFuture.supplyAsync(() -> {updateForGenerateByCustomerIdList(queue, user, now);return Thread.currentThread().getName();}, executorService).whenCompleteAsync((k, ex) -> {if (ex != null) {log.error(String.format("重新生成财务业务指标客户的所有数据-线程【%s】发生异常", k), ex);} else {log.info("重新生成财务业务指标客户的所有数据-线程【{}】完成", k);}});}// 向线程池提交任务CompletableFuture.allOf(tasks).whenComplete((v, th) -> log.info("重新生成财务业务指标客户的所有数据-【{}】个子线程处理完成", tasks.length)).join();}

耗时:726.725秒

总结

最终的耗时从1400多秒 降低到700多秒。降低了一半左右。

相关文章:

一次业务的批量数据任务的处理优化

文章目录 一次业务的批量数据任务的处理优化业务背景1.0版本 分批处理模式2.0版本 平衡任务队列模式3.0版本 优化调度平衡任务队列模式总结 一次业务的批量数据任务的处理优化 业务背景 一个重新生成所有客户的财务业务指标数据的批量数据处理任务。 1.0版本 分批处理模式 …...

新能源汽车充电站远程监控系统S275钡铼技术无线RTU

新能源汽车充电站的远程监控系统在现代城市基础设施中扮演着至关重要的角色&#xff0c;而钡铼技术的S275无线RTU作为一款先进的物联网数据监测采集控制短信报警终端&#xff0c;为充电站的安全运行和高效管理提供了强大的技术支持。 技术特点和功能 钡铼S275采用了基于UCOSI…...

海外视频媒体发布/发稿:如何在国外媒体以视频的形式宣发

1. 背景介绍 在如今数字化时代&#xff0c;每个国家都拥有着各自的视频媒体平台&#xff0c;而主流媒体也都纷纷加入了视频发布的行列。视频媒体的宣发形式主要包括油管Youtube等视频分享平台&#xff0c;以及图文配合的发布方式。通过在视频中夹带链接&#xff0c;媒体可以以…...

HTML 【实用教程】(2024最新版)

核心思想 —— 语义化 【面试题】如何理解 HTML 语义化 ?仅通过标签便能判断内容的类型&#xff0c;特别是区分标题、段落、图片和表格 增加代码可读性&#xff0c;让人更容易读懂对SEO更加友好&#xff0c;让搜索引擎更容易读懂 html 文件的基本结构 html 文件的文件后缀为 …...

How to Describe Figures in a Research Article

How to Describe Figures in a Research Article DateAuthorVersionNote2024.07.10Dog TaoV1.0Finish the document. 文章目录 How to Describe Figures in a Research ArticleGeneral GuidelinesDetailed DescriptionsCommon Describing Phrases Effective communication of …...

昇思MindSpore学习入门-CELL与参数一

Cell作为神经网络构造的基础单元&#xff0c;与神经网络层(Layer)的概念相对应&#xff0c;对Tensor计算操作的抽象封装&#xff0c;能够更准确清晰地对神经网络结构进行表示。除了基础的Tensor计算流程定义外&#xff0c;神经网络层还包含了参数管理、状态管理等功能。而参数(…...

【k8s中安装rabbitmq】k8s中安装rabbitmq并搭建镜像集群-hostpath版

文章目录 简介一.条件及环境说明二.需求说明三.实现原理及说明四.详细步骤4.1.规划节点标签4.2.创建configmap配置4.3.创建三个statefulset和service headless配置4.4.创建service配置 五.安装完后的配置六.安装说明 简介 k8s集群中搭建rabbitmq集群服务一般都会用到pvc&#x…...

(5) 深入探索Python-Pandas库的核心数据结构:Series详解

目录 前言1. Series 简介2. Series的特点3. Series的创建3.1 使用列表创建Series3.2 使用字典创建Series3.3 使用列表和自定义索引创建Series3.4 指定数据类型和名称 4. Series的索引/切片4.1 下标索引&#xff1a;基于整数位置的索引4.2 基于标签的索引4.3 切片4.4 使用.loc[]…...

JAVA之开发神器——IntelliJ IDEA的下载与安装

一、IDEA是什么&#xff1f; IEAD是JetBrains公司开发的专用于java开发的一款集成开发环境。由于其功能强大且符合人体工程学&#xff08;就是更懂你&#xff09;的优点&#xff0c;深受java开发人员的喜爱。目前在java开发工具中占比3/4。如果你要走java开发方向&#xff0c;那…...

通过Umijs从0到1搭建一个React项目

有一阵时间没写react了&#xff0c;今天通过umi搭建一个demo项目复习一下react&#xff1b;umi是一个可扩展的企业级前端应用框架&#xff0c;在react市场中还是比较火的一个框架。 Umi官方文档&#xff1a;Umi 介绍 (umijs.org) 一、构建项目。 1、安装包管理工具。 官方推…...

Redis 数据过期及淘汰策略

Redis 数据过期及淘汰策略 过期策略 定时过期 在设置key​的过期时间的同时&#xff0c;为该key​创建一个定时器&#xff0c;让定时器在key​的过期时间来临时&#xff0c;对key进行删除。到过期时间就会立即清除。该策略可以立即清除过期的数据&#xff0c;对内存很友好&a…...

vue vite+three在线编辑模型导入导出

文章目录 序一、1.0.0版本1.新增2.编辑3.导出4.导入 总结 序 要实现一个类似于数字孪生的场景 可以在线、新增、删除模型 、以及编辑模型的颜色、长宽高 然后还要实现 编辑完后 保存为json数据 记录模型数据 既可以导入也可以导出 一、1.0.0版本 1.新增 先拿建议的立方体来…...

去水印小程序源码修复版-前端后端内置接口+第三方接口

去水印小程序源码&#xff0c;前端后端&#xff0c;内置接口第三方接口&#xff0c; 修复数据库账号密码错误问题&#xff0c;内置接口支持替换第三方接口&#xff0c; 文件挺全的&#xff0c;可以添加流量主代码&#xff0c;搭建需要准备一台服务器&#xff0c;备案域名和http…...

机器学习:预测评估8类指标

机器学习&#xff1a;8类预测评估指标 R方值、平均值绝对误差值MAE、均方误差MSE、均方误差根EMSE、中位数绝对误差MAD、平均绝对百分误差MAPE、可解释方差分EVS、均方根对数误差MLSE。 一、R方值 1、说明&#xff1a; R方值&#xff0c;也称为确定系数或拟合优度&#xff…...

【深度学习基础】MAC pycharm 专业版安装与激活

文章目录 一、pycharm专业版安装二、激活 一、pycharm专业版安装 PyCharm是一款专为Python开发者设计的集成开发环境&#xff08;IDE&#xff09;&#xff0c;旨在帮助用户在使用Python语言开发时提高效率。以下是对PyCharm软件的详细介绍&#xff0c;包括其作用和主要功能&…...

排序相关算法--1.插入排序+冒泡排序回顾

1.基本分类 2.插入排序 特点&#xff1a;有实践意义&#xff08;例如后期快排的优化&#xff09;&#xff0c;适应性强&#xff0c;一般不会到时间复杂度最坏的情况。 将第一个元素视为已经排好序的序列。取出下一个元素&#xff0c;在已经排好序的序列中从后往前比较&#xf…...

变阻器的故障排除方法有哪些?

变阻器&#xff0c;特别是滑动变阻器&#xff0c;作为电子电路中的常见元件&#xff0c;其故障排除方法主要依据具体的故障现象来确定。以下是一些常见的故障现象及其排除方法&#xff1a; 一、接触不良 现象&#xff1a;电阻器不起作用或电压不稳定。 排除方法&#xff1a; …...

软考《信息系统运行管理员》-3.1信息系统设施运维的管理体系

3.1信息系统设施运维的管理体系 1 信息系统设施运维的对象 基础环境 主要包括信息系统运行环境(机房、设备间、配线室、基站、云计算中心 等)中的空调系统、供配电系统、通信应急设备系统、防护设备系统(如消防系统、安全系统) 等&#xff0c;能维持系统安全正常运转&#xf…...

Nginx重定向

Nginx重定向 location 匹配 location匹配的就是后面的URL /WordPress 192.168.118.10/wordpress location匹配的分类和优先级 1.精确匹配 location/对字符串进行完全匹配,必须完全符合2.正则匹配 ^~ 前缀匹配,以什么为开头~ 区分大小写的匹配~* 不区分大小写!~: 区分大小…...

私有化地图离线部署方案之高程检索服务

私有化地图离线部署整体解决方案&#xff0c;除硬件之外&#xff0c;一般主要由基础地图服务、查询定位服务、路径规划服务和高程检索服务构成。 我们已经分享过基础地图服务、查询定位服务和路径规划服务&#xff0c;现在再为你分享高程检索服务的方法。 私有化高程检索服务…...

微软PowerBI考试 PL300-选择 Power BI 模型框架【附练习数据】

微软PowerBI考试 PL300-选择 Power BI 模型框架 20 多年来&#xff0c;Microsoft 持续对企业商业智能 (BI) 进行大量投资。 Azure Analysis Services (AAS) 和 SQL Server Analysis Services (SSAS) 基于无数企业使用的成熟的 BI 数据建模技术。 同样的技术也是 Power BI 数据…...

云启出海,智联未来|阿里云网络「企业出海」系列客户沙龙上海站圆满落地

借阿里云中企出海大会的东风&#xff0c;以**「云启出海&#xff0c;智联未来&#xff5c;打造安全可靠的出海云网络引擎」为主题的阿里云企业出海客户沙龙云网络&安全专场于5.28日下午在上海顺利举办&#xff0c;现场吸引了来自携程、小红书、米哈游、哔哩哔哩、波克城市、…...

Leetcode 3577. Count the Number of Computer Unlocking Permutations

Leetcode 3577. Count the Number of Computer Unlocking Permutations 1. 解题思路2. 代码实现 题目链接&#xff1a;3577. Count the Number of Computer Unlocking Permutations 1. 解题思路 这一题其实就是一个脑筋急转弯&#xff0c;要想要能够将所有的电脑解锁&#x…...

工程地质软件市场:发展现状、趋势与策略建议

一、引言 在工程建设领域&#xff0c;准确把握地质条件是确保项目顺利推进和安全运营的关键。工程地质软件作为处理、分析、模拟和展示工程地质数据的重要工具&#xff0c;正发挥着日益重要的作用。它凭借强大的数据处理能力、三维建模功能、空间分析工具和可视化展示手段&…...

JUC笔记(上)-复习 涉及死锁 volatile synchronized CAS 原子操作

一、上下文切换 即使单核CPU也可以进行多线程执行代码&#xff0c;CPU会给每个线程分配CPU时间片来实现这个机制。时间片非常短&#xff0c;所以CPU会不断地切换线程执行&#xff0c;从而让我们感觉多个线程是同时执行的。时间片一般是十几毫秒(ms)。通过时间片分配算法执行。…...

基于IDIG-GAN的小样本电机轴承故障诊断

目录 🔍 核心问题 一、IDIG-GAN模型原理 1. 整体架构 2. 核心创新点 (1) ​梯度归一化(Gradient Normalization)​​ (2) ​判别器梯度间隙正则化(Discriminator Gradient Gap Regularization)​​ (3) ​自注意力机制(Self-Attention)​​ 3. 完整损失函数 二…...

MySQL JOIN 表过多的优化思路

当 MySQL 查询涉及大量表 JOIN 时&#xff0c;性能会显著下降。以下是优化思路和简易实现方法&#xff1a; 一、核心优化思路 减少 JOIN 数量 数据冗余&#xff1a;添加必要的冗余字段&#xff08;如订单表直接存储用户名&#xff09;合并表&#xff1a;将频繁关联的小表合并成…...

Redis:现代应用开发的高效内存数据存储利器

一、Redis的起源与发展 Redis最初由意大利程序员Salvatore Sanfilippo在2009年开发&#xff0c;其初衷是为了满足他自己的一个项目需求&#xff0c;即需要一个高性能的键值存储系统来解决传统数据库在高并发场景下的性能瓶颈。随着项目的开源&#xff0c;Redis凭借其简单易用、…...

android13 app的触摸问题定位分析流程

一、知识点 一般来说,触摸问题都是app层面出问题,我们可以在ViewRootImpl.java添加log的方式定位;如果是touchableRegion的计算问题,就会相对比较麻烦了,需要通过adb shell dumpsys input > input.log指令,且通过打印堆栈的方式,逐步定位问题,并找到修改方案。 问题…...

Cilium动手实验室: 精通之旅---13.Cilium LoadBalancer IPAM and L2 Service Announcement

Cilium动手实验室: 精通之旅---13.Cilium LoadBalancer IPAM and L2 Service Announcement 1. LAB环境2. L2公告策略2.1 部署Death Star2.2 访问服务2.3 部署L2公告策略2.4 服务宣告 3. 可视化 ARP 流量3.1 部署新服务3.2 准备可视化3.3 再次请求 4. 自动IPAM4.1 IPAM Pool4.2 …...