当前位置: 首页 > news >正文

Pipeline流水线组件

文章目录

    • 1、新建pipeline流水线
    • 2、定义处理器
    • 3、定义处理器上下文
    • 4、pipeline流水线实现
    • 5、处理器抽象类实现
    • 6、pipeline流水线构建者
    • 7、具体处理器实现
    • 8、流水线测试
    • 9、运行结果

1、新建pipeline流水线

package com.summer.toolkit.model.chain;import java.util.List;
import java.util.concurrent.Executor;public interface Pipeline<T> {/*** 向pipeline中添加一个执行器** @param handler 执行器* @return 返回pipeline对象*/Pipeline<T> addLast(Handler<T> handler);/*** 向pipeline中添加一个执行器** @param name    执行器名称* @param handler 执行器* @return 返回pipeline对象*/Pipeline<T> addLast(String name, Handler<T> handler);/*** pipeline执行** @param list 数据集合* @return 返回值,执行完成返回true*/boolean execute(List<T> list);/*** pipeline并行执行** @param list     数据集合* @param executor 线程池* @return 返回值,执行完成返回true*/boolean parallelExecute(List<T> list, Executor executor);/*** pipeline执行** @param object 单个数据* @return 返回值,执行完成返回true*/boolean execute(T object);}

2、定义处理器

package com.summer.toolkit.model.chain;public interface Handler<T> {/*** 处理器处理方法** @param handlerContext 上下文* @param t              要处理的数据*/void doHandler(HandlerContext<T> handlerContext, T t);}

3、定义处理器上下文

package com.summer.toolkit.model.chain;import lombok.Data;@Data
public class HandlerContext<T> {/*** 执行器名称 */private String name;/*** 执行器 */private Handler<T> handler;/*** 链表的下一个节点,用来保存下一个执行器 */public HandlerContext<T> next;public HandlerContext(Handler<T> handler) {this.name = handler.getClass().getName();this.handler = handler;}public HandlerContext(String name, Handler<T> handler) {this.name = name;this.handler = handler;}/*** 调用该方法即调用上下文中处理器的执行方法** @param t 需要处理的数据*/public void handle(T t) {this.handler.doHandler(this, t);}/*** 执行下一个节点的处理器** @param t 待执行的数据*/public void runNext(T t) {if (this.next != null) {this.next.handle(t);}}
}

4、pipeline流水线实现

package com.summer.toolkit.model.chain;import com.summer.toolkit.util.CollectionUtils;
import com.summer.toolkit.util.StringUtils;
import lombok.extern.slf4j.Slf4j;import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.stream.Collectors;@Slf4j
public class DefaultPipeline<T> implements Pipeline<T> {/*** 默认pipeline中有一个处理器上下文的头结点* 头结点无处理逻辑,直接执行下一个节点的处理器*/HandlerContext<T> head = new HandlerContext<>(HandlerContext::runNext);@Overridepublic Pipeline<T> addLast(Handler<T> handler) {this.addLast(null, handler);return this;}@Overridepublic Pipeline<T> addLast(String name, Handler<T> handler) {if (handler == null) {log.warn("处理器为空,不进行添加!");return this;}if (StringUtils.isEmpty(name)) {name = handler.getClass().getName();}// 将处理器添加到处理器上下文的尾节点HandlerContext<T> context = head;while (context.next != null) {context = context.next;}context.next = new HandlerContext<T>(name, handler);return this;}@Overridepublic boolean execute(List<T> list) {List<Object> result = list.stream().peek(this::execute).collect(Collectors.toList());return true;}@Overridepublic boolean parallelExecute(List<T> list, Executor executor) {Map<String, List<T>> parts = this.split(list);List<CompletableFuture<Boolean>> results = new ArrayList<>();for (Map.Entry<String, List<T>> entry : parts.entrySet()) {CompletableFuture<Boolean> completableFuture = CompletableFuture// 提交任务.supplyAsync(() -> this.execute(entry.getValue()), executor)// 打印异常信息.exceptionally(e -> {log.error("并行处理数据时发生异常!{}", e.getMessage(), e);return Boolean.FALSE;});results.add(completableFuture);}CompletableFuture.allOf(results.toArray(new CompletableFuture[0])).join();return true;}@Overridepublic boolean execute(T t) {this.head.handle(t);return true;}/*** 对集合进行分组拆分** @param list 集合* @return 返回值*/private Map<String, List<T>> split(List<T> list) {Map<String, List<T>> parts = new HashMap<>(8);if (CollectionUtils.isEmpty(list)) {return parts;}// 如果集合数量过少,则不进行分组int limit = 10;if (list.size() < limit) {String key = String.valueOf(0);parts.put(key, list);return parts;}// 固定分五个分组int group = 5;for (int i = 0, length = list.size(); i < length; i++) {int key = i % group;List<T> part = parts.computeIfAbsent(String.valueOf(key), k -> new ArrayList<>());T t = list.get(i);part.add(t);}return parts;}}

5、处理器抽象类实现

package com.summer.toolkit.model.chain;import lombok.extern.slf4j.Slf4j;@Slf4j
public abstract class AbstractHandler<T> implements Handler<T> {/*** 开始处理数据,通用方法** @param handlerContext 上下文* @param t              要处理的数据*/@Overridepublic void doHandler(HandlerContext<T> handlerContext, T t) {long start = System.currentTimeMillis();String threadName = Thread.currentThread().getName();String handlerName = handlerContext.getName();log.info("====={} 开始处理:{}=====", threadName, handlerName);try {// 此处处理异常,如果执行过程失败,则继续执行下一个handlerthis.handle(t);} catch (Throwable throwable) {log.error("====={} 处理异常:{},异常原因:{}=====", threadName, handlerName, throwable.getMessage(), throwable);this.handleException(t, throwable);}long end = System.currentTimeMillis();log.info("====={} 处理完成:{},耗时:{} 毫秒=====", threadName, handlerName, (end - start));// 处理完该上下文中的处理器逻辑后,调用上下文中的下一个执行器的执行方法handlerContext.runNext(t);}/*** 处理数据抽象方法,由子类实现具体细节** @param t 对象*/public abstract void handle(T t);/*** 处理数据抽象方法,由子类实现具体细节** @param t         对象* @param throwable 异常对象*/public void handleException(T t, Throwable throwable) {log.error("=====处理数据发生异常:{}", throwable.getMessage(), throwable);}}

6、pipeline流水线构建者

package com.summer.toolkit.model.chain;public class DefaultPipelineBuilder<T> {private final Pipeline<T> pipeline;public DefaultPipelineBuilder() {this.pipeline = new DefaultPipeline<>();}/*** 向pipeline中添加一个执行器** @param handler 执行器* @return 返回pipeline对象*/public DefaultPipelineBuilder<T> addLast(Handler<T> handler) {pipeline.addLast(handler);return this;}/*** 向pipeline中添加一个执行器** @param name 执行器名称* @return 返回pipeline对象*/public DefaultPipelineBuilder<T> addLast(String name, Handler<T> handler) {pipeline.addLast(name, handler);return this;}/*** 返回pipeline对象** @return 返回值*/public Pipeline<T> build() {return this.pipeline;}}

7、具体处理器实现

package com.summer.toolkit.model.chain;import lombok.extern.slf4j.Slf4j;import java.util.Objects;@Slf4j
public class StringHandler extends AbstractHandler<String> {@Overridepublic void handle(String s) {log.info("入参:{}", s);}@Overridepublic void handleException(String s, Throwable throwable) {if (Objects.nonNull(throwable)) {log.error("异常:{}", throwable.getMessage());}}}

8、流水线测试

package com.summer.toolkit.model;import com.summer.toolkit.model.chain.DefaultPipelineBuilder;
import com.summer.toolkit.model.chain.Pipeline;
import com.summer.toolkit.model.chain.StringHandler;public class Processor {public static void main(String[] args) {DefaultPipelineBuilder<String> builder = new DefaultPipelineBuilder<>();Pipeline<String> pipeline = builder.addLast("字符串信息", new StringHandler()).addLast("寄件人信息", new StringHandler()).addLast("收件人信息", new StringHandler()).build();pipeline.execute("1");}}

9、运行结果

20:03:00.285 [main] INFO com.summer.toolkit.model.chain.AbstractHandler - =====main 开始处理:字符串信息=====
20:03:00.289 [main] INFO com.summer.toolkit.model.chain.StringHandler - 入参:1
20:03:00.289 [main] INFO com.summer.toolkit.model.chain.AbstractHandler - =====main 处理完成:字符串信息,耗时:5 毫秒=====
20:03:00.289 [main] INFO com.summer.toolkit.model.chain.AbstractHandler - =====main 开始处理:寄件人信息=====
20:03:00.289 [main] INFO com.summer.toolkit.model.chain.StringHandler - 入参:1
20:03:00.289 [main] INFO com.summer.toolkit.model.chain.AbstractHandler - =====main 处理完成:寄件人信息,耗时:0 毫秒=====
20:03:00.289 [main] INFO com.summer.toolkit.model.chain.AbstractHandler - =====main 开始处理:收件人信息=====
20:03:00.289 [main] INFO com.summer.toolkit.model.chain.StringHandler - 入参:1
20:03:00.289 [main] INFO com.summer.toolkit.model.chain.AbstractHandler - =====main 处理完成:收件人信息,耗时:0 毫秒=====

相关文章:

Pipeline流水线组件

文章目录 1、新建pipeline流水线2、定义处理器3、定义处理器上下文4、pipeline流水线实现5、处理器抽象类实现6、pipeline流水线构建者7、具体处理器实现8、流水线测试9、运行结果 1、新建pipeline流水线 package com.summer.toolkit.model.chain;import java.util.List; impo…...

闪灵CMS电子商城系统源码v5.0(自带微信小程序)

源码介绍 闪灵CMS电子商城系统源码&#xff0c;双语带手机版&#xff0c;PHPMYSQL进行开发&#xff0c;网站安装简单、快捷。 闪灵CMS系统更新日志 1.修复&#xff1a;修复了开启强制https后&#xff0c;说明文档重定向过多的问题 2.修复&#xff1a;修复了商品名称过长时无…...

基于SSM的旅游民宿预定系统【源码】【运行教程】

基于SSM的旅游民宿预定系统 一、项目介绍1. 游客功能2. 管理员功能3. 高级功能 二、项目技术栈三、项目运行四、项目演示总结 大家好&#xff0c;这里是程序猿代码之路&#xff01;随着旅游业的快速发展&#xff0c;民宿作为一种独特的住宿方式越来越受到游客的喜爱。为了提升用…...

PgSQL技术内幕 - psql与服务端连接与交互机制

PgSQL技术内幕 - 客户端psql与服务端连接与交互机制 简单来说&#xff0c;PgSQL的psql客户端向服务端发起连接请求&#xff0c;服务端接收到请求后&#xff0c;fork出一个子进程&#xff0c;之后由该子进程和客户端进行交互&#xff0c;处理客户端的SQL等&#xff0c;并将结果返…...

实现开发板三盏灯点亮熄灭

实现开发板三盏灯点亮熄灭 typedef struct {volatile unsigned int MODER; // 0x00volatile unsigned int OTYPER; // 0x04volatile unsigned int OSPEEDR; // 0x08volatile unsigned int PUPDR; // 0x0Cvolatile unsigned int IDR; // 0x10volatile unsigned int OD…...

外汇天眼:盈透证券为客户提供更丰富的欧洲衍生品交易渠道

电子交易巨头盈透证券&#xff08;纳斯达克代码&#xff1a;IBKR&#xff09;今日宣布&#xff0c;通过Cboe欧洲期权交易所&#xff08;CEDX&#xff09;新增欧洲股票期权和欧洲指数期货及期权。这一新增功能使得盈透证券的客户可以在单一统一平台上&#xff0c;除了股票、期权…...

论文阅读Rolling-Unet,卷积结合MLP的图像分割模型

这篇论文提出了一种新的医学图像分割网络Rolling-Unet&#xff0c;目的是在不用Transformer的前提下&#xff0c;能同时有效提取局部特征和长距离依赖性,从而在性能和计算成本之间找到良好的平衡点。 论文地址&#xff1a;https://ojs.aaai.org/index.php/AAAI/article/view/2…...

Linux Shell命令vim使用

一、引例 以判断引出&#xff08;学过C其他语言容易接受&#xff09;。 简单命令说明&#xff1a; -e 测试文件是否存在 -f 测试文件是否为普通文件 -d 测试文件是否为目录 -r 测试当前用户对某文件是否具有“可读”权限 -w 测试当前用户对某文件是否具有“可写”权限…...

如何将 API 管理从 Postman 转移到 Apifox

上一篇推文讲到用 Swagger 管理的 API 怎么迁移到 Apifox&#xff0c;有许多同学反馈说能不能介绍一下 Postman 的迁移以及迁移过程中需要注意的事项。那么今天&#xff0c;它来了&#xff01; 从 Postman 迁移到 Apifox 的方法有两种&#xff1a; 导出 Postman 集合 &#x…...

用链表实现的C语言队列

一、队列概述 在数据结构中&#xff0c;队列是一种先进先出&#xff08;FIFO&#xff09;的线性表。它在许多应用场景中非常有用&#xff0c;例如任务调度、进程管理、资源管理等。队列是一种重要的数据结构&#xff0c;其主要特点是先进先出&#xff08;FIFO, First In First …...

国产SDI视频均衡驱动器,功能与 LMH0387/LMH0344 一致

视频均衡驱动器&#xff0c;功能与 LMH0387 一致、LMH0344。本期间支持 DVB-ASI&#xff0c;作为驱动器能够选择输出速率&#xff0c;作为均衡接收器能支持100m以上传输距离&#xff08;线缆类型Belden 1694A&#xff09;。最大支持3Gbps 速率的信号 2 产品特征 a&#xff09…...

如何用Xinstall CPS结算系统打破传统营销桎梏,实现用户增长?

在互联网流量红利逐渐衰退的今天&#xff0c;App推广和运营面临着前所未有的挑战。如何快速搭建起满足用户需求的运营体系&#xff0c;成为了众多企业急待解决的问题。而在这个关键时刻&#xff0c;Xinstall CPS结算系统应运而生&#xff0c;以其独特的优势帮助企业解决了一系列…...

(代数:解一元二次方程)可以使用下面的公式求一元二次方程 ax2+bx+c0 的两个根:

(代数:解一元二次方程)可以使用下面的公式求一元二次方程 ax2bxc0 的两个根: b2-4ac 称作一元二次方程的判别式。如果它是正值,那么一元二次方程就有两个实数根。 如果它为 0&#xff0c;方程式就只有一个根。如果它是负值&#xff0c;方程式无实根。 编写程序&#xff0c;提示…...

如何提高网站收录?

GSI服务就是专门干这个的&#xff0c;这个服务用的是光算科技自己研发的GPC爬虫池系统。这个系统通过建立一个庞大的站群和复杂的链接结构&#xff0c;来吸引谷歌的爬虫。这样一来&#xff0c;你的网站就能更频繁地被谷歌的爬虫访问&#xff0c;从而提高被收录的机会。 说到效…...

Docker 学习总结(83)—— 配置文件daemon.json介绍及优化建议

一、daemon.json 文件概述 daemon.json是Docker守护进程的配置文件,它允许系统管理员自定义Docker守护程序的行为。此文件通常位于/etc/docker/目录下。通过修改daemon.json,可以调整Docker守护进程的多种设置,包括网络配置、日志记录、存储驱动等。 二、daemon.json 文件结…...

Javaweb04-Servlet技术2(HttpServletResponse, HttpServletRequest)

Servlet技术基础 HttpServletResponse对象 HttpServletResponce对象是继承ServletResponse接口&#xff0c;专门用于封装Http请求 HttpServletResponce有关响应行的方法 方法说明功能描述void setStatus(int stauts)用于设置HTTP响应消息的状态码&#xff0c;并生成响应状态…...

chat gpt基本原理解读

chat gpt基本原理解读 ChatGPT是一种基于生成式预训练变换器&#xff08;Generative Pre-trained Transformer, GPT&#xff09;的对话模型&#xff0c;主要通过大量的文本数据训练生成自然语言回复。以下是ChatGPT的基本原理解读&#xff1a; 1. 基本架构 ChatGPT 是基于 GPT…...

单目标应用:基于蛇鹫优化算法SBOA的微电网优化(MATLAB代码)

一、微电网模型介绍 微电网多目标优化调度模型简介_vmgpqv-CSDN博客 参考文献&#xff1a; [1]李兴莘,张靖,何宇,等.基于改进粒子群算法的微电网多目标优化调度[J].电力科学与工程, 2021, 37(3):7 二、蛇鹫优化算法求解微电网 2.1算法简介 蛇鹫优化算法&#xff08;Secre…...

MySQL系列-安装配置使用说明(MAC版本)

1、前言 本文将介绍MySQL的安装配置以及基本语法操作说明 环境&#xff1a;mac 版本&#xff1a;MySQL 8.0.28 之前电脑安装卸载过&#xff0c;后面在装的时候遇到一些问题&#xff0c;用了四五天才解决&#xff0c;主要是参考 https://blog.csdn.net/zz00008888/article/deta…...

vue elementui el-input 正则验证,限制只能输入数字和小数

vue elementui el-input 正则验证 限制只能输入数字和小数&#xff0c;以下两种方法都可以&#xff1a; 1、οninput“value value.replace(/[^0-9.]/g,‘’)” 2、οninput“value value.replace(/[^\d.]/g, ‘’)” 限制只能输入数字&#xff1a; 1、oninput “valuevalu…...

强化学习入门

简介 强化学习&#xff08;Reinforcement Learning, RL&#xff09;&#xff0c;又称再励学习、评价学习或增强学习&#xff0c;是机器学习的范式和方法论之一&#xff0c;用于描述和解决智能体&#xff08;agent&#xff09;在与环境的交互过程中通过学习策略以达成回报最大化…...

简约不简单,建筑装饰演绎现代美学

走在城市的大街小巷&#xff0c;你是否曾被那些独特而精美的建筑装饰所吸引&#xff1f;每一栋建筑都像是艺术家的杰作&#xff0c;通过精美的装饰诉说着它的故事。 我们的建筑装饰&#xff0c;不仅注重外在的美观&#xff0c;更追求内在的品质。从古典的雕花到现代的简约线条&…...

SpringBoot调用WebService的实践

作者所在公司的系统间的信息交互是通过webservice完成。如&#xff1a;MES与SAP的交互&#xff0c;MES与WMS的交换&#xff0c;MES与SRM的交互&#xff0c;MES与IOT的交互等。 MES是用.NET VS2008 C#写的&#xff0c;调用webservice很简单&#xff0c;这里不再赘述。如有想了解…...

源码编译构建LAMP

Apache 起源 源于A Patchy Server&#xff0c;著名的开源Web服务软件1995年时&#xff0c;发布Apache服务程序的1.0版本由Apache软件基金会&#xff08;ASF)负责维护最新的名称为“Apache HTTP Server”官方站点&#xff1a;http://httpd.apache.org/ 主要特点 开发源代码/…...

搜索是门艺术,大神都是这样找资源

以下所有资源均可在星云导航找到&#xff0c;网站地址&#xff1a;https://www.xygalaxy.com/ 浏览器搜索高级用法 1、排除干扰&#xff0c;指定关键词 1.1、排除指定关键字 格式&#xff1a;关键字1 -关键字2比如搜索&#xff1a;星云导航&#xff0c;不想要CSDN的内容 星…...

【设计模式深度剖析】【5】【行为型】【迭代器模式】

&#x1f448;️上一篇:策略模式 | 下一篇:中介者模式&#x1f449;️ 设计模式-专栏&#x1f448;️ 文章目录 迭代器模式定义英文原话直译如何理解呢&#xff1f; 迭代器模式的角色1. Iterator&#xff08;迭代器&#xff09;2. ConcreteIterator&#xff08;具体迭代器…...

怎么更快捷的修改图片大小?压缩图片jpg、png、gif的快捷方法

jpg作为最常用的一种图片格式&#xff0c;在遇到图片太大问题时&#xff0c;该如何操作能够快速在压缩图片jpg的大小呢&#xff1f;图片太大无法上传时目前常见的一个使用问题&#xff0c;只有将图片处理到合适的大小才可以正常在平台上传使用&#xff0c;一般情况下想要快速解…...

Shell脚本 if语句

条件测试&#xff1a; $? 返回码 判断命令或者脚本是否执行成功&#xff08;最近的一条&#xff09; 0 true 为真就是成功 成立 非0 false 失败或者异常 test命令 可以进行条件测试 然后根据的是返回值来判断条件是否成立。 -e 测试目录或者文件是否存在 exist -d 测试…...

集合查询-并(UNION)集运算、交(INTERSECT)集运算、差(EXCEPT)集运算

一、概述 集合查询是对两个SELECT语句的查询结果进行再进行处理的查询 二、条件 1、两个SELECT语句的查询结果必须是属性列数目相同 2、两个SELECT语句的查询结果必须是对应位置上的属性列必须是相同的数据类型 三、并(UNION)运算 1、语法格式&#xff1a; SELECT 语句1…...

常用的bit位操作

//判断某1位是1还是0 #ifndef GET_BIT #define BIT_IS_1(value,bitpos) (((value)&(1<<(bitpos)))>>(bitpos)) #endif //读取指定位置bit位的值 #ifndef GET_BIT #define GET_BIT(value,bitpos) ((value)&(1<<(bitpos))) #endif //取反指定位置bit位…...