Pipeline流水线组件
文章目录
- 1、新建pipeline流水线
- 2、定义处理器
- 3、定义处理器上下文
- 4、pipeline流水线实现
- 5、处理器抽象类实现
- 6、pipeline流水线构建者
- 7、具体处理器实现
- 8、流水线测试
- 9、运行结果
1、新建pipeline流水线
package com.summer.toolkit.model.chain;import java.util.List;
import java.util.concurrent.Executor;public interface Pipeline<T> {/*** 向pipeline中添加一个执行器** @param handler 执行器* @return 返回pipeline对象*/Pipeline<T> addLast(Handler<T> handler);/*** 向pipeline中添加一个执行器** @param name 执行器名称* @param handler 执行器* @return 返回pipeline对象*/Pipeline<T> addLast(String name, Handler<T> handler);/*** pipeline执行** @param list 数据集合* @return 返回值,执行完成返回true*/boolean execute(List<T> list);/*** pipeline并行执行** @param list 数据集合* @param executor 线程池* @return 返回值,执行完成返回true*/boolean parallelExecute(List<T> list, Executor executor);/*** pipeline执行** @param object 单个数据* @return 返回值,执行完成返回true*/boolean execute(T object);}
2、定义处理器
package com.summer.toolkit.model.chain;public interface Handler<T> {/*** 处理器处理方法** @param handlerContext 上下文* @param t 要处理的数据*/void doHandler(HandlerContext<T> handlerContext, T t);}
3、定义处理器上下文
package com.summer.toolkit.model.chain;import lombok.Data;@Data
public class HandlerContext<T> {/*** 执行器名称 */private String name;/*** 执行器 */private Handler<T> handler;/*** 链表的下一个节点,用来保存下一个执行器 */public HandlerContext<T> next;public HandlerContext(Handler<T> handler) {this.name = handler.getClass().getName();this.handler = handler;}public HandlerContext(String name, Handler<T> handler) {this.name = name;this.handler = handler;}/*** 调用该方法即调用上下文中处理器的执行方法** @param t 需要处理的数据*/public void handle(T t) {this.handler.doHandler(this, t);}/*** 执行下一个节点的处理器** @param t 待执行的数据*/public void runNext(T t) {if (this.next != null) {this.next.handle(t);}}
}
4、pipeline流水线实现
package com.summer.toolkit.model.chain;import com.summer.toolkit.util.CollectionUtils;
import com.summer.toolkit.util.StringUtils;
import lombok.extern.slf4j.Slf4j;import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.stream.Collectors;@Slf4j
public class DefaultPipeline<T> implements Pipeline<T> {/*** 默认pipeline中有一个处理器上下文的头结点* 头结点无处理逻辑,直接执行下一个节点的处理器*/HandlerContext<T> head = new HandlerContext<>(HandlerContext::runNext);@Overridepublic Pipeline<T> addLast(Handler<T> handler) {this.addLast(null, handler);return this;}@Overridepublic Pipeline<T> addLast(String name, Handler<T> handler) {if (handler == null) {log.warn("处理器为空,不进行添加!");return this;}if (StringUtils.isEmpty(name)) {name = handler.getClass().getName();}// 将处理器添加到处理器上下文的尾节点HandlerContext<T> context = head;while (context.next != null) {context = context.next;}context.next = new HandlerContext<T>(name, handler);return this;}@Overridepublic boolean execute(List<T> list) {List<Object> result = list.stream().peek(this::execute).collect(Collectors.toList());return true;}@Overridepublic boolean parallelExecute(List<T> list, Executor executor) {Map<String, List<T>> parts = this.split(list);List<CompletableFuture<Boolean>> results = new ArrayList<>();for (Map.Entry<String, List<T>> entry : parts.entrySet()) {CompletableFuture<Boolean> completableFuture = CompletableFuture// 提交任务.supplyAsync(() -> this.execute(entry.getValue()), executor)// 打印异常信息.exceptionally(e -> {log.error("并行处理数据时发生异常!{}", e.getMessage(), e);return Boolean.FALSE;});results.add(completableFuture);}CompletableFuture.allOf(results.toArray(new CompletableFuture[0])).join();return true;}@Overridepublic boolean execute(T t) {this.head.handle(t);return true;}/*** 对集合进行分组拆分** @param list 集合* @return 返回值*/private Map<String, List<T>> split(List<T> list) {Map<String, List<T>> parts = new HashMap<>(8);if (CollectionUtils.isEmpty(list)) {return parts;}// 如果集合数量过少,则不进行分组int limit = 10;if (list.size() < limit) {String key = String.valueOf(0);parts.put(key, list);return parts;}// 固定分五个分组int group = 5;for (int i = 0, length = list.size(); i < length; i++) {int key = i % group;List<T> part = parts.computeIfAbsent(String.valueOf(key), k -> new ArrayList<>());T t = list.get(i);part.add(t);}return parts;}}
5、处理器抽象类实现
package com.summer.toolkit.model.chain;import lombok.extern.slf4j.Slf4j;@Slf4j
public abstract class AbstractHandler<T> implements Handler<T> {/*** 开始处理数据,通用方法** @param handlerContext 上下文* @param t 要处理的数据*/@Overridepublic void doHandler(HandlerContext<T> handlerContext, T t) {long start = System.currentTimeMillis();String threadName = Thread.currentThread().getName();String handlerName = handlerContext.getName();log.info("====={} 开始处理:{}=====", threadName, handlerName);try {// 此处处理异常,如果执行过程失败,则继续执行下一个handlerthis.handle(t);} catch (Throwable throwable) {log.error("====={} 处理异常:{},异常原因:{}=====", threadName, handlerName, throwable.getMessage(), throwable);this.handleException(t, throwable);}long end = System.currentTimeMillis();log.info("====={} 处理完成:{},耗时:{} 毫秒=====", threadName, handlerName, (end - start));// 处理完该上下文中的处理器逻辑后,调用上下文中的下一个执行器的执行方法handlerContext.runNext(t);}/*** 处理数据抽象方法,由子类实现具体细节** @param t 对象*/public abstract void handle(T t);/*** 处理数据抽象方法,由子类实现具体细节** @param t 对象* @param throwable 异常对象*/public void handleException(T t, Throwable throwable) {log.error("=====处理数据发生异常:{}", throwable.getMessage(), throwable);}}
6、pipeline流水线构建者
package com.summer.toolkit.model.chain;public class DefaultPipelineBuilder<T> {private final Pipeline<T> pipeline;public DefaultPipelineBuilder() {this.pipeline = new DefaultPipeline<>();}/*** 向pipeline中添加一个执行器** @param handler 执行器* @return 返回pipeline对象*/public DefaultPipelineBuilder<T> addLast(Handler<T> handler) {pipeline.addLast(handler);return this;}/*** 向pipeline中添加一个执行器** @param name 执行器名称* @return 返回pipeline对象*/public DefaultPipelineBuilder<T> addLast(String name, Handler<T> handler) {pipeline.addLast(name, handler);return this;}/*** 返回pipeline对象** @return 返回值*/public Pipeline<T> build() {return this.pipeline;}}
7、具体处理器实现
package com.summer.toolkit.model.chain;import lombok.extern.slf4j.Slf4j;import java.util.Objects;@Slf4j
public class StringHandler extends AbstractHandler<String> {@Overridepublic void handle(String s) {log.info("入参:{}", s);}@Overridepublic void handleException(String s, Throwable throwable) {if (Objects.nonNull(throwable)) {log.error("异常:{}", throwable.getMessage());}}}
8、流水线测试
package com.summer.toolkit.model;import com.summer.toolkit.model.chain.DefaultPipelineBuilder;
import com.summer.toolkit.model.chain.Pipeline;
import com.summer.toolkit.model.chain.StringHandler;public class Processor {public static void main(String[] args) {DefaultPipelineBuilder<String> builder = new DefaultPipelineBuilder<>();Pipeline<String> pipeline = builder.addLast("字符串信息", new StringHandler()).addLast("寄件人信息", new StringHandler()).addLast("收件人信息", new StringHandler()).build();pipeline.execute("1");}}
9、运行结果
20:03:00.285 [main] INFO com.summer.toolkit.model.chain.AbstractHandler - =====main 开始处理:字符串信息=====
20:03:00.289 [main] INFO com.summer.toolkit.model.chain.StringHandler - 入参:1
20:03:00.289 [main] INFO com.summer.toolkit.model.chain.AbstractHandler - =====main 处理完成:字符串信息,耗时:5 毫秒=====
20:03:00.289 [main] INFO com.summer.toolkit.model.chain.AbstractHandler - =====main 开始处理:寄件人信息=====
20:03:00.289 [main] INFO com.summer.toolkit.model.chain.StringHandler - 入参:1
20:03:00.289 [main] INFO com.summer.toolkit.model.chain.AbstractHandler - =====main 处理完成:寄件人信息,耗时:0 毫秒=====
20:03:00.289 [main] INFO com.summer.toolkit.model.chain.AbstractHandler - =====main 开始处理:收件人信息=====
20:03:00.289 [main] INFO com.summer.toolkit.model.chain.StringHandler - 入参:1
20:03:00.289 [main] INFO com.summer.toolkit.model.chain.AbstractHandler - =====main 处理完成:收件人信息,耗时:0 毫秒=====
相关文章:

Pipeline流水线组件
文章目录 1、新建pipeline流水线2、定义处理器3、定义处理器上下文4、pipeline流水线实现5、处理器抽象类实现6、pipeline流水线构建者7、具体处理器实现8、流水线测试9、运行结果 1、新建pipeline流水线 package com.summer.toolkit.model.chain;import java.util.List; impo…...

闪灵CMS电子商城系统源码v5.0(自带微信小程序)
源码介绍 闪灵CMS电子商城系统源码,双语带手机版,PHPMYSQL进行开发,网站安装简单、快捷。 闪灵CMS系统更新日志 1.修复:修复了开启强制https后,说明文档重定向过多的问题 2.修复:修复了商品名称过长时无…...

基于SSM的旅游民宿预定系统【源码】【运行教程】
基于SSM的旅游民宿预定系统 一、项目介绍1. 游客功能2. 管理员功能3. 高级功能 二、项目技术栈三、项目运行四、项目演示总结 大家好,这里是程序猿代码之路!随着旅游业的快速发展,民宿作为一种独特的住宿方式越来越受到游客的喜爱。为了提升用…...

PgSQL技术内幕 - psql与服务端连接与交互机制
PgSQL技术内幕 - 客户端psql与服务端连接与交互机制 简单来说,PgSQL的psql客户端向服务端发起连接请求,服务端接收到请求后,fork出一个子进程,之后由该子进程和客户端进行交互,处理客户端的SQL等,并将结果返…...

实现开发板三盏灯点亮熄灭
实现开发板三盏灯点亮熄灭 typedef struct {volatile unsigned int MODER; // 0x00volatile unsigned int OTYPER; // 0x04volatile unsigned int OSPEEDR; // 0x08volatile unsigned int PUPDR; // 0x0Cvolatile unsigned int IDR; // 0x10volatile unsigned int OD…...

外汇天眼:盈透证券为客户提供更丰富的欧洲衍生品交易渠道
电子交易巨头盈透证券(纳斯达克代码:IBKR)今日宣布,通过Cboe欧洲期权交易所(CEDX)新增欧洲股票期权和欧洲指数期货及期权。这一新增功能使得盈透证券的客户可以在单一统一平台上,除了股票、期权…...

论文阅读Rolling-Unet,卷积结合MLP的图像分割模型
这篇论文提出了一种新的医学图像分割网络Rolling-Unet,目的是在不用Transformer的前提下,能同时有效提取局部特征和长距离依赖性,从而在性能和计算成本之间找到良好的平衡点。 论文地址:https://ojs.aaai.org/index.php/AAAI/article/view/2…...

Linux Shell命令vim使用
一、引例 以判断引出(学过C其他语言容易接受)。 简单命令说明: -e 测试文件是否存在 -f 测试文件是否为普通文件 -d 测试文件是否为目录 -r 测试当前用户对某文件是否具有“可读”权限 -w 测试当前用户对某文件是否具有“可写”权限…...

如何将 API 管理从 Postman 转移到 Apifox
上一篇推文讲到用 Swagger 管理的 API 怎么迁移到 Apifox,有许多同学反馈说能不能介绍一下 Postman 的迁移以及迁移过程中需要注意的事项。那么今天,它来了! 从 Postman 迁移到 Apifox 的方法有两种: 导出 Postman 集合 &#x…...

用链表实现的C语言队列
一、队列概述 在数据结构中,队列是一种先进先出(FIFO)的线性表。它在许多应用场景中非常有用,例如任务调度、进程管理、资源管理等。队列是一种重要的数据结构,其主要特点是先进先出(FIFO, First In First …...

国产SDI视频均衡驱动器,功能与 LMH0387/LMH0344 一致
视频均衡驱动器,功能与 LMH0387 一致、LMH0344。本期间支持 DVB-ASI,作为驱动器能够选择输出速率,作为均衡接收器能支持100m以上传输距离(线缆类型Belden 1694A)。最大支持3Gbps 速率的信号 2 产品特征 a)…...

如何用Xinstall CPS结算系统打破传统营销桎梏,实现用户增长?
在互联网流量红利逐渐衰退的今天,App推广和运营面临着前所未有的挑战。如何快速搭建起满足用户需求的运营体系,成为了众多企业急待解决的问题。而在这个关键时刻,Xinstall CPS结算系统应运而生,以其独特的优势帮助企业解决了一系列…...

(代数:解一元二次方程)可以使用下面的公式求一元二次方程 ax2+bx+c0 的两个根:
(代数:解一元二次方程)可以使用下面的公式求一元二次方程 ax2bxc0 的两个根: b2-4ac 称作一元二次方程的判别式。如果它是正值,那么一元二次方程就有两个实数根。 如果它为 0,方程式就只有一个根。如果它是负值,方程式无实根。 编写程序,提示…...

如何提高网站收录?
GSI服务就是专门干这个的,这个服务用的是光算科技自己研发的GPC爬虫池系统。这个系统通过建立一个庞大的站群和复杂的链接结构,来吸引谷歌的爬虫。这样一来,你的网站就能更频繁地被谷歌的爬虫访问,从而提高被收录的机会。 说到效…...

Docker 学习总结(83)—— 配置文件daemon.json介绍及优化建议
一、daemon.json 文件概述 daemon.json是Docker守护进程的配置文件,它允许系统管理员自定义Docker守护程序的行为。此文件通常位于/etc/docker/目录下。通过修改daemon.json,可以调整Docker守护进程的多种设置,包括网络配置、日志记录、存储驱动等。 二、daemon.json 文件结…...

Javaweb04-Servlet技术2(HttpServletResponse, HttpServletRequest)
Servlet技术基础 HttpServletResponse对象 HttpServletResponce对象是继承ServletResponse接口,专门用于封装Http请求 HttpServletResponce有关响应行的方法 方法说明功能描述void setStatus(int stauts)用于设置HTTP响应消息的状态码,并生成响应状态…...

chat gpt基本原理解读
chat gpt基本原理解读 ChatGPT是一种基于生成式预训练变换器(Generative Pre-trained Transformer, GPT)的对话模型,主要通过大量的文本数据训练生成自然语言回复。以下是ChatGPT的基本原理解读: 1. 基本架构 ChatGPT 是基于 GPT…...

单目标应用:基于蛇鹫优化算法SBOA的微电网优化(MATLAB代码)
一、微电网模型介绍 微电网多目标优化调度模型简介_vmgpqv-CSDN博客 参考文献: [1]李兴莘,张靖,何宇,等.基于改进粒子群算法的微电网多目标优化调度[J].电力科学与工程, 2021, 37(3):7 二、蛇鹫优化算法求解微电网 2.1算法简介 蛇鹫优化算法(Secre…...

MySQL系列-安装配置使用说明(MAC版本)
1、前言 本文将介绍MySQL的安装配置以及基本语法操作说明 环境:mac 版本:MySQL 8.0.28 之前电脑安装卸载过,后面在装的时候遇到一些问题,用了四五天才解决,主要是参考 https://blog.csdn.net/zz00008888/article/deta…...

vue elementui el-input 正则验证,限制只能输入数字和小数
vue elementui el-input 正则验证 限制只能输入数字和小数,以下两种方法都可以: 1、οninput“value value.replace(/[^0-9.]/g,‘’)” 2、οninput“value value.replace(/[^\d.]/g, ‘’)” 限制只能输入数字: 1、oninput “valuevalu…...

强化学习入门
简介 强化学习(Reinforcement Learning, RL),又称再励学习、评价学习或增强学习,是机器学习的范式和方法论之一,用于描述和解决智能体(agent)在与环境的交互过程中通过学习策略以达成回报最大化…...

简约不简单,建筑装饰演绎现代美学
走在城市的大街小巷,你是否曾被那些独特而精美的建筑装饰所吸引?每一栋建筑都像是艺术家的杰作,通过精美的装饰诉说着它的故事。 我们的建筑装饰,不仅注重外在的美观,更追求内在的品质。从古典的雕花到现代的简约线条&…...

SpringBoot调用WebService的实践
作者所在公司的系统间的信息交互是通过webservice完成。如:MES与SAP的交互,MES与WMS的交换,MES与SRM的交互,MES与IOT的交互等。 MES是用.NET VS2008 C#写的,调用webservice很简单,这里不再赘述。如有想了解…...

源码编译构建LAMP
Apache 起源 源于A Patchy Server,著名的开源Web服务软件1995年时,发布Apache服务程序的1.0版本由Apache软件基金会(ASF)负责维护最新的名称为“Apache HTTP Server”官方站点:http://httpd.apache.org/ 主要特点 开发源代码/…...

搜索是门艺术,大神都是这样找资源
以下所有资源均可在星云导航找到,网站地址:https://www.xygalaxy.com/ 浏览器搜索高级用法 1、排除干扰,指定关键词 1.1、排除指定关键字 格式:关键字1 -关键字2比如搜索:星云导航,不想要CSDN的内容 星…...

【设计模式深度剖析】【5】【行为型】【迭代器模式】
👈️上一篇:策略模式 | 下一篇:中介者模式👉️ 设计模式-专栏👈️ 文章目录 迭代器模式定义英文原话直译如何理解呢? 迭代器模式的角色1. Iterator(迭代器)2. ConcreteIterator(具体迭代器…...

怎么更快捷的修改图片大小?压缩图片jpg、png、gif的快捷方法
jpg作为最常用的一种图片格式,在遇到图片太大问题时,该如何操作能够快速在压缩图片jpg的大小呢?图片太大无法上传时目前常见的一个使用问题,只有将图片处理到合适的大小才可以正常在平台上传使用,一般情况下想要快速解…...

Shell脚本 if语句
条件测试: $? 返回码 判断命令或者脚本是否执行成功(最近的一条) 0 true 为真就是成功 成立 非0 false 失败或者异常 test命令 可以进行条件测试 然后根据的是返回值来判断条件是否成立。 -e 测试目录或者文件是否存在 exist -d 测试…...

集合查询-并(UNION)集运算、交(INTERSECT)集运算、差(EXCEPT)集运算
一、概述 集合查询是对两个SELECT语句的查询结果进行再进行处理的查询 二、条件 1、两个SELECT语句的查询结果必须是属性列数目相同 2、两个SELECT语句的查询结果必须是对应位置上的属性列必须是相同的数据类型 三、并(UNION)运算 1、语法格式: SELECT 语句1…...

常用的bit位操作
//判断某1位是1还是0 #ifndef GET_BIT #define BIT_IS_1(value,bitpos) (((value)&(1<<(bitpos)))>>(bitpos)) #endif //读取指定位置bit位的值 #ifndef GET_BIT #define GET_BIT(value,bitpos) ((value)&(1<<(bitpos))) #endif //取反指定位置bit位…...