当前位置: 首页 > news >正文

聊聊httpclient的CPool

本文主要研究一下httpclient的CPool

ConnPool

org/apache/http/pool/ConnPool.java

public interface ConnPool<T, E> {/*** Attempts to lease a connection for the given route and with the given* state from the pool.** @param route route of the connection.* @param state arbitrary object that represents a particular state*  (usually a security principal or a unique token identifying*  the user whose credentials have been used while establishing the connection).*  May be {@code null}.* @param callback operation completion callback.** @return future for a leased pool entry.*/Future<E> lease(final T route, final Object state, final FutureCallback<E> callback);/*** Releases the pool entry back to the pool.** @param entry pool entry leased from the pool* @param reusable flag indicating whether or not the released connection*   is in a consistent state and is safe for further use.*/void release(E entry, boolean reusable);}

ConnPool定义了lease及release方法,其中定义了两个泛型,T表示route,E表示poolEntry

ConnPoolControl

public interface ConnPoolControl<T> {void setMaxTotal(int max);int getMaxTotal();void setDefaultMaxPerRoute(int max);int getDefaultMaxPerRoute();void setMaxPerRoute(final T route, int max);int getMaxPerRoute(final T route);PoolStats getTotalStats();PoolStats getStats(final T route);}

ConnPoolControl接口定义了设置和访问maxTotal、defaultMaxPerRoute及PoolStats的方法

AbstractConnPool

org/apache/http/pool/AbstractConnPool.java

@Contract(threading = ThreadingBehavior.SAFE_CONDITIONAL)
public abstract class AbstractConnPool<T, C, E extends PoolEntry<T, C>>implements ConnPool<T, E>, ConnPoolControl<T> {private final Lock lock;private final Condition condition;private final ConnFactory<T, C> connFactory;private final Map<T, RouteSpecificPool<T, C, E>> routeToPool;private final Set<E> leased;private final LinkedList<E> available;private final LinkedList<Future<E>> pending;private final Map<T, Integer> maxPerRoute;private volatile boolean isShutDown;private volatile int defaultMaxPerRoute;private volatile int maxTotal;private volatile int validateAfterInactivity;public AbstractConnPool(final ConnFactory<T, C> connFactory,final int defaultMaxPerRoute,final int maxTotal) {super();this.connFactory = Args.notNull(connFactory, "Connection factory");this.defaultMaxPerRoute = Args.positive(defaultMaxPerRoute, "Max per route value");this.maxTotal = Args.positive(maxTotal, "Max total value");this.lock = new ReentrantLock();this.condition = this.lock.newCondition();this.routeToPool = new HashMap<T, RouteSpecificPool<T, C, E>>();this.leased = new HashSet<E>();this.available = new LinkedList<E>();this.pending = new LinkedList<Future<E>>();this.maxPerRoute = new HashMap<T, Integer>();}/*** Creates a new entry for the given connection with the given route.*/protected abstract E createEntry(T route, C conn);//......}   

AbstractConnPool声明实现ConnPool、ConnPoolControl接口,它定义E必须继承PoolEntry,同时定义了泛型C,表示connectionType

shutdown

    public void shutdown() throws IOException {if (this.isShutDown) {return ;}this.isShutDown = true;this.lock.lock();try {for (final E entry: this.available) {entry.close();}for (final E entry: this.leased) {entry.close();}for (final RouteSpecificPool<T, C, E> pool: this.routeToPool.values()) {pool.shutdown();}this.routeToPool.clear();this.leased.clear();this.available.clear();} finally {this.lock.unlock();}}

shutdown方法会遍历available、leased挨个执行close,然后遍历routeToPool挨个执行shutdown

lease方法

    public Future<E> lease(final T route, final Object state, final FutureCallback<E> callback) {Args.notNull(route, "Route");Asserts.check(!this.isShutDown, "Connection pool shut down");return new Future<E>() {private final AtomicBoolean cancelled = new AtomicBoolean(false);private final AtomicBoolean done = new AtomicBoolean(false);private final AtomicReference<E> entryRef = new AtomicReference<E>(null);@Overridepublic boolean cancel(final boolean mayInterruptIfRunning) {if (done.compareAndSet(false, true)) {cancelled.set(true);lock.lock();try {condition.signalAll();} finally {lock.unlock();}if (callback != null) {callback.cancelled();}return true;}return false;}@Overridepublic boolean isCancelled() {return cancelled.get();}@Overridepublic boolean isDone() {return done.get();}@Overridepublic E get() throws InterruptedException, ExecutionException {try {return get(0L, TimeUnit.MILLISECONDS);} catch (final TimeoutException ex) {throw new ExecutionException(ex);}}@Overridepublic E get(final long timeout, final TimeUnit timeUnit) throws InterruptedException, ExecutionException, TimeoutException {for (;;) {synchronized (this) {try {final E entry = entryRef.get();if (entry != null) {return entry;}if (done.get()) {throw new ExecutionException(operationAborted());}final E leasedEntry = getPoolEntryBlocking(route, state, timeout, timeUnit, this);if (validateAfterInactivity > 0)  {if (leasedEntry.getUpdated() + validateAfterInactivity <= System.currentTimeMillis()) {if (!validate(leasedEntry)) {leasedEntry.close();release(leasedEntry, false);continue;}}}if (done.compareAndSet(false, true)) {entryRef.set(leasedEntry);done.set(true);onLease(leasedEntry);if (callback != null) {callback.completed(leasedEntry);}return leasedEntry;} else {release(leasedEntry, true);throw new ExecutionException(operationAborted());}} catch (final IOException ex) {if (done.compareAndSet(false, true)) {if (callback != null) {callback.failed(ex);}}throw new ExecutionException(ex);}}}}};}

lease方法主要是get及cancel,其中get方法主要是执行getPoolEntryBlocking,对于validateAfterInactivity大于0的则根据判断是否需要validate,若需要且validate失败则执行leasedEntry.close()及release方法

getPoolEntryBlocking

org/apache/http/pool/AbstractConnPool.java

    private E getPoolEntryBlocking(final T route, final Object state,final long timeout, final TimeUnit timeUnit,final Future<E> future) throws IOException, InterruptedException, ExecutionException, TimeoutException {Date deadline = null;if (timeout > 0) {deadline = new Date (System.currentTimeMillis() + timeUnit.toMillis(timeout));}this.lock.lock();try {final RouteSpecificPool<T, C, E> pool = getPool(route);E entry;for (;;) {Asserts.check(!this.isShutDown, "Connection pool shut down");if (future.isCancelled()) {throw new ExecutionException(operationAborted());}for (;;) {entry = pool.getFree(state);if (entry == null) {break;}if (entry.isExpired(System.currentTimeMillis())) {entry.close();}if (entry.isClosed()) {this.available.remove(entry);pool.free(entry, false);} else {break;}}if (entry != null) {this.available.remove(entry);this.leased.add(entry);onReuse(entry);return entry;}// New connection is neededfinal int maxPerRoute = getMax(route);// Shrink the pool prior to allocating a new connectionfinal int excess = Math.max(0, pool.getAllocatedCount() + 1 - maxPerRoute);if (excess > 0) {for (int i = 0; i < excess; i++) {final E lastUsed = pool.getLastUsed();if (lastUsed == null) {break;}lastUsed.close();this.available.remove(lastUsed);pool.remove(lastUsed);}}if (pool.getAllocatedCount() < maxPerRoute) {final int totalUsed = this.leased.size();final int freeCapacity = Math.max(this.maxTotal - totalUsed, 0);if (freeCapacity > 0) {final int totalAvailable = this.available.size();if (totalAvailable > freeCapacity - 1) {if (!this.available.isEmpty()) {final E lastUsed = this.available.removeLast();lastUsed.close();final RouteSpecificPool<T, C, E> otherpool = getPool(lastUsed.getRoute());otherpool.remove(lastUsed);}}final C conn = this.connFactory.create(route);entry = pool.add(conn);this.leased.add(entry);return entry;}}boolean success = false;try {pool.queue(future);this.pending.add(future);if (deadline != null) {success = this.condition.awaitUntil(deadline);} else {this.condition.await();success = true;}if (future.isCancelled()) {throw new ExecutionException(operationAborted());}} finally {// In case of 'success', we were woken up by the// connection pool and should now have a connection// waiting for us, or else we're shutting down.// Just continue in the loop, both cases are checked.pool.unqueue(future);this.pending.remove(future);}// check for spurious wakeup vs. timeoutif (!success && (deadline != null && deadline.getTime() <= System.currentTimeMillis())) {break;}}throw new TimeoutException("Timeout waiting for connection");} finally {this.lock.unlock();}}

getPoolEntryBlocking先根据route从routeToPool取出对应的RouteSpecificPool,然后pool.getFree(state),之后判断是否过期,是否关闭,没问题则从available移除,添加到leased中,然后执行onReuse回调,如果entry为null则通过connFactory.create(route)来创建

release

    @Overridepublic void release(final E entry, final boolean reusable) {this.lock.lock();try {if (this.leased.remove(entry)) {final RouteSpecificPool<T, C, E> pool = getPool(entry.getRoute());pool.free(entry, reusable);if (reusable && !this.isShutDown) {this.available.addFirst(entry);} else {entry.close();}onRelease(entry);Future<E> future = pool.nextPending();if (future != null) {this.pending.remove(future);} else {future = this.pending.poll();}if (future != null) {this.condition.signalAll();}}} finally {this.lock.unlock();}}

release方法先获取RouteSpecificPool,然后执行pool.free(entry, reusable)

CPool

org/apache/http/impl/conn/CPool.java

@Contract(threading = ThreadingBehavior.SAFE)
class CPool extends AbstractConnPool<HttpRoute, ManagedHttpClientConnection, CPoolEntry> {private static final AtomicLong COUNTER = new AtomicLong();private final Log log = LogFactory.getLog(CPool.class);private final long timeToLive;private final TimeUnit timeUnit;public CPool(final ConnFactory<HttpRoute, ManagedHttpClientConnection> connFactory,final int defaultMaxPerRoute, final int maxTotal,final long timeToLive, final TimeUnit timeUnit) {super(connFactory, defaultMaxPerRoute, maxTotal);this.timeToLive = timeToLive;this.timeUnit = timeUnit;}@Overrideprotected CPoolEntry createEntry(final HttpRoute route, final ManagedHttpClientConnection conn) {final String id = Long.toString(COUNTER.getAndIncrement());return new CPoolEntry(this.log, id, route, conn, this.timeToLive, this.timeUnit);}@Overrideprotected boolean validate(final CPoolEntry entry) {return !entry.getConnection().isStale();}@Overrideprotected void enumAvailable(final PoolEntryCallback<HttpRoute, ManagedHttpClientConnection> callback) {super.enumAvailable(callback);}@Overrideprotected void enumLeased(final PoolEntryCallback<HttpRoute, ManagedHttpClientConnection> callback) {super.enumLeased(callback);}}

CPool继承了AbstractConnPool,其T为HttpRoute,C为ManagedHttpClientConnection,E为CPoolEntry;其createEntry方法创建CPoolEntry,validate则判断connect是不是stale

小结

ConnPool定义了lease及release方法,其中定义了两个泛型,T表示route,E表示poolEntry;AbstractConnPool声明实现ConnPool、ConnPoolControl接口,它定义E必须继承PoolEntry,同时定义了泛型C,表示connectionType;CPool继承了AbstractConnPool,其T为HttpRoute,C为ManagedHttpClientConnection,E为CPoolEntry。

AbstractConnPool的lease方法主要是get及cancel,其中get方法主要是执行getPoolEntryBlocking,对于validateAfterInactivity大于0的则根据判断是否需要validate,若需要且validate失败则执行leasedEntry.close()及release方法;release方法先获取RouteSpecificPool,然后执行pool.free(entry, reusable)

相关文章:

聊聊httpclient的CPool

序 本文主要研究一下httpclient的CPool ConnPool org/apache/http/pool/ConnPool.java public interface ConnPool<T, E> {/*** Attempts to lease a connection for the given route and with the given* state from the pool.** param route route of the connecti…...

B2主题优化:WordPress文章每次访问随机增加访问量

老站长都知道&#xff0c;一个新站刚开始创建&#xff0c;内容也不多的时候&#xff0c;用户进来看到文章浏览量要么是0&#xff0c;要么是 个位数&#xff0c;非常影响体验&#xff0c;就会有一种“这个网站没人气&#xff0c;看来不行”的感觉。 即使你的内容做的很好&#x…...

大模型部署手记(1)ChatGLM2+Windows GPU

1.简介&#xff1a; 组织机构&#xff1a;智谱/清华 代码仓&#xff1a;https://github.com/THUDM/ChatGLM2-6B 模型&#xff1a;THUDM/chatglm2-6b 下载&#xff1a;https://huggingface.co/THUDM/chatglm2-6b 镜像下载&#xff1a;https://aliendao.cn/models/THUDM/chat…...

Rust Rocket: 构建Restful服务项目实战

前言 这几天我的笔记系统开发工作进入了搬砖期&#xff0c;前端基于Yew&#xff0c;后端基于Rocket。关于Rocket搭建Restful服务&#xff0c;官方也有介绍&#xff0c;感觉很多细节不到位。因此我打算花2到3天的时间来整理一下&#xff0c;也算是对自己的一个交代。 对于有一…...

苹果签名有多少种类之TF签名(TestFlight签名)是什么?优势是什么?什么场合需要应用到?

&#xff08;一&#xff09;TestFlight 能够让您&#xff1a;邀请内部和外部的测试人员为应用程序提供反馈。 跟踪应用程序在测试过程中发现的 bug 和用户体验问题。 收集 Crash 报告&#xff0c;了解应用程序在真实设备上的运行状况。 要使用 TestFlight&#xff0c;您可以按照…...

如何将图片存到数据库(以mysql为例), 使用ORM Bee更加简单

如何将图片存到数据库 1. 创建数据库: 2. 生成Javabean public class ImageExam implements Serializable {private static final long serialVersionUID 1596686274309L;private Integer id;private String name; // private Blob image;private InputStream image; //将In…...

【“栈、队列”的应用】408数据结构代码

王道数据结构强化课——【“栈、队列”的应用】代码&#xff0c;持续更新 链式存储栈&#xff08;单链表实现&#xff09;&#xff0c;并基于上述定义&#xff0c;栈顶在链头&#xff0c;实现“出栈、入栈、判空、判满”四个基本操作 #include <stdio.h> #include <…...

es的nested查询

一、一层嵌套 mapping: PUT /nested_example {"mappings": {"properties": {"name": {"type": "text"},"books": {"type": "nested","properties": {"title": {"t…...

<一>Qt斗地主游戏开发:开发环境搭建--VS2019+Qt5.15.2

1. 开发环境概述 对于Qt的开发环境来说&#xff0c;主流编码IDE界面一般有两种&#xff1a;Qt Creator或VSQt。为了简单起见&#xff0c;这里的操作系统限定为windows&#xff0c;编译器也通用VS了。Qt版本的话自己选择就可以了&#xff0c;当然VS的版本也是依据Qt版本来选定的…...

python:进度条的使用(tqdm)

摘要&#xff1a;为python程序进度条&#xff0c;可以知道程序运行进度。 python中&#xff0c;常用的进度条模块是tqdm&#xff0c;将介绍tqdm的安装和使用 1、安装tqdm: pip install tqdm2、tqdm的使用&#xff1a; &#xff08;1&#xff09;在for循环中的使用&#xff1…...

Java类型转换和类型提升

目录 一、类型转换 1.1 自动类型转换&#xff08;隐式&#xff09; 1.1.1 int 与 long 之间 1.1.2 float 与 double 之间 1.1.3 int 与 byte 之间 1.2 强制类型转换&#xff08;显示&#xff09; 1.2.1 int 与 long 之间 1.2.2 float 与 double 之间 1.2.3 int 与 d…...

C# 读取 Excel xlsx 文件,显示在 DataGridView 中

编写 read_excel.cs 如下 using System; using System.Collections.Generic; using System.ComponentModel; using System.IO; using System.Data; using System.Linq; using System.Text; using System.Data.OleDb;namespace ReadExcel {public partial class Program{static…...

Docker02基本管理

目录 1、Docker 网络 1.1 Docker 网络实现原理 1.2 Docker 的网络模式 1.3 网络模式详解 1.4 资源控制 1.5 进行CPU压力测试 1.6 清理docker占用的磁盘空间 1.7 生产扩展 1、Docker 网络 1.1 Docker 网络实现原理 Docker使用Linux桥接&#xff0c;在宿主机虚拟一个Docke…...

Scala第十章

Scala第十章 章节目标 1.数组 2.元组 3.列表 4.集 5.映射 6.迭代器 7.函数式编程 8.案例&#xff1a;学生成绩单 scala总目录 文档资料下载...

10.4 校招 实习 内推 面经

绿泡*泡&#xff1a; neituijunsir 交流裙 &#xff0c;内推/实习/校招汇总表格 1、校招 | 集度2024届秋招正式启动&#xff08;内推&#xff09; 校招 | 集度2024届秋招正式启动&#xff08;内推&#xff09; 2、校招 | 道通科技2024秋季校园招聘正式启动啦&#xff01; …...

从0开始深入理解并发、线程与等待通知机制(中)

一&#xff0c;深入学习 Java 的线程 线程的状态/生命周期 Java 中线程的状态分为 6 种&#xff1a; 1. 初始(NEW)&#xff1a;新创建了一个线程对象&#xff0c;但还没有调用 start()方法。 2. 运行(RUNNABLE)&#xff1a;Java 线程中将就绪&#xff08;ready&#xff09;和…...

UE5报错及解决办法

1、编译报错&#xff0c;内容如下&#xff1a; Unable to build while Live Coding is active. Exit the editor and game, or press CtrlAltF11 if iterating on code in the editor or game 解决办法 取消Enable Live Coding勾选...

怎么通过docker/portainer部署vue项目

这篇文章分享一下如何通过docker将vue项目打包成镜像文件&#xff0c;并使用打包的镜像在docker/portainer上部署运行&#xff0c;写这篇文章参考了vue-cli和docker的官方文档。 首先&#xff0c;阅读vue-cli关于docker部署的说明&#xff0c;上面提供了关键的几个步骤。 从上面…...

【面试经典150 | 矩阵】旋转图像

文章目录 写在前面Tag题目来源题目解读解题思路方法一&#xff1a;原地旋转方法二&#xff1a;翻转代替旋转 写在最后 写在前面 本专栏专注于分析与讲解【面试经典150】算法&#xff0c;两到三天更新一篇文章&#xff0c;欢迎催更…… 专栏内容以分析题目为主&#xff0c;并附带…...

机器人制作开源方案 | 家庭清扫拾物机器人

作者&#xff1a;罗诚、李旭洋、胡旭、符粒楷 单位&#xff1a;南昌交通学院 人工智能学院 指导老师&#xff1a;揭吁菡 在家庭中我们有时无法到一些低矮阴暗的地方进行探索&#xff0c;比如茶几下或者床底下&#xff0c;特别是在部分家庭中&#xff0c;如果没有及时对这些阴…...

【网络】每天掌握一个Linux命令 - iftop

在Linux系统中&#xff0c;iftop是网络管理的得力助手&#xff0c;能实时监控网络流量、连接情况等&#xff0c;帮助排查网络异常。接下来从多方面详细介绍它。 目录 【网络】每天掌握一个Linux命令 - iftop工具概述安装方式核心功能基础用法进阶操作实战案例面试题场景生产场景…...

Vue记事本应用实现教程

文章目录 1. 项目介绍2. 开发环境准备3. 设计应用界面4. 创建Vue实例和数据模型5. 实现记事本功能5.1 添加新记事项5.2 删除记事项5.3 清空所有记事 6. 添加样式7. 功能扩展&#xff1a;显示创建时间8. 功能扩展&#xff1a;记事项搜索9. 完整代码10. Vue知识点解析10.1 数据绑…...

RocketMQ延迟消息机制

两种延迟消息 RocketMQ中提供了两种延迟消息机制 指定固定的延迟级别 通过在Message中设定一个MessageDelayLevel参数&#xff0c;对应18个预设的延迟级别指定时间点的延迟级别 通过在Message中设定一个DeliverTimeMS指定一个Long类型表示的具体时间点。到了时间点后&#xf…...

大型活动交通拥堵治理的视觉算法应用

大型活动下智慧交通的视觉分析应用 一、背景与挑战 大型活动&#xff08;如演唱会、马拉松赛事、高考中考等&#xff09;期间&#xff0c;城市交通面临瞬时人流车流激增、传统摄像头模糊、交通拥堵识别滞后等问题。以演唱会为例&#xff0c;暖城商圈曾因观众集中离场导致周边…...

基于当前项目通过npm包形式暴露公共组件

1.package.sjon文件配置 其中xh-flowable就是暴露出去的npm包名 2.创建tpyes文件夹&#xff0c;并新增内容 3.创建package文件夹...

(二)原型模式

原型的功能是将一个已经存在的对象作为源目标,其余对象都是通过这个源目标创建。发挥复制的作用就是原型模式的核心思想。 一、源型模式的定义 原型模式是指第二次创建对象可以通过复制已经存在的原型对象来实现,忽略对象创建过程中的其它细节。 📌 核心特点: 避免重复初…...

Python爬虫(一):爬虫伪装

一、网站防爬机制概述 在当今互联网环境中&#xff0c;具有一定规模或盈利性质的网站几乎都实施了各种防爬措施。这些措施主要分为两大类&#xff1a; 身份验证机制&#xff1a;直接将未经授权的爬虫阻挡在外反爬技术体系&#xff1a;通过各种技术手段增加爬虫获取数据的难度…...

是否存在路径(FIFOBB算法)

题目描述 一个具有 n 个顶点e条边的无向图&#xff0c;该图顶点的编号依次为0到n-1且不存在顶点与自身相连的边。请使用FIFOBB算法编写程序&#xff0c;确定是否存在从顶点 source到顶点 destination的路径。 输入 第一行两个整数&#xff0c;分别表示n 和 e 的值&#xff08;1…...

C#中的CLR属性、依赖属性与附加属性

CLR属性的主要特征 封装性&#xff1a; 隐藏字段的实现细节 提供对字段的受控访问 访问控制&#xff1a; 可单独设置get/set访问器的可见性 可创建只读或只写属性 计算属性&#xff1a; 可以在getter中执行计算逻辑 不需要直接对应一个字段 验证逻辑&#xff1a; 可以…...

【Redis】笔记|第8节|大厂高并发缓存架构实战与优化

缓存架构 代码结构 代码详情 功能点&#xff1a; 多级缓存&#xff0c;先查本地缓存&#xff0c;再查Redis&#xff0c;最后才查数据库热点数据重建逻辑使用分布式锁&#xff0c;二次查询更新缓存采用读写锁提升性能采用Redis的发布订阅机制通知所有实例更新本地缓存适用读多…...