聊聊httpclient的CPool
序
本文主要研究一下httpclient的CPool
ConnPool
org/apache/http/pool/ConnPool.java
public interface ConnPool<T, E> {/*** Attempts to lease a connection for the given route and with the given* state from the pool.** @param route route of the connection.* @param state arbitrary object that represents a particular state* (usually a security principal or a unique token identifying* the user whose credentials have been used while establishing the connection).* May be {@code null}.* @param callback operation completion callback.** @return future for a leased pool entry.*/Future<E> lease(final T route, final Object state, final FutureCallback<E> callback);/*** Releases the pool entry back to the pool.** @param entry pool entry leased from the pool* @param reusable flag indicating whether or not the released connection* is in a consistent state and is safe for further use.*/void release(E entry, boolean reusable);}
ConnPool定义了lease及release方法,其中定义了两个泛型,T表示route,E表示poolEntry
ConnPoolControl
public interface ConnPoolControl<T> {void setMaxTotal(int max);int getMaxTotal();void setDefaultMaxPerRoute(int max);int getDefaultMaxPerRoute();void setMaxPerRoute(final T route, int max);int getMaxPerRoute(final T route);PoolStats getTotalStats();PoolStats getStats(final T route);}
ConnPoolControl接口定义了设置和访问maxTotal、defaultMaxPerRoute及PoolStats的方法
AbstractConnPool
org/apache/http/pool/AbstractConnPool.java
@Contract(threading = ThreadingBehavior.SAFE_CONDITIONAL)
public abstract class AbstractConnPool<T, C, E extends PoolEntry<T, C>>implements ConnPool<T, E>, ConnPoolControl<T> {private final Lock lock;private final Condition condition;private final ConnFactory<T, C> connFactory;private final Map<T, RouteSpecificPool<T, C, E>> routeToPool;private final Set<E> leased;private final LinkedList<E> available;private final LinkedList<Future<E>> pending;private final Map<T, Integer> maxPerRoute;private volatile boolean isShutDown;private volatile int defaultMaxPerRoute;private volatile int maxTotal;private volatile int validateAfterInactivity;public AbstractConnPool(final ConnFactory<T, C> connFactory,final int defaultMaxPerRoute,final int maxTotal) {super();this.connFactory = Args.notNull(connFactory, "Connection factory");this.defaultMaxPerRoute = Args.positive(defaultMaxPerRoute, "Max per route value");this.maxTotal = Args.positive(maxTotal, "Max total value");this.lock = new ReentrantLock();this.condition = this.lock.newCondition();this.routeToPool = new HashMap<T, RouteSpecificPool<T, C, E>>();this.leased = new HashSet<E>();this.available = new LinkedList<E>();this.pending = new LinkedList<Future<E>>();this.maxPerRoute = new HashMap<T, Integer>();}/*** Creates a new entry for the given connection with the given route.*/protected abstract E createEntry(T route, C conn);//......}
AbstractConnPool声明实现ConnPool、ConnPoolControl接口,它定义E必须继承PoolEntry,同时定义了泛型C,表示connectionType
shutdown
public void shutdown() throws IOException {if (this.isShutDown) {return ;}this.isShutDown = true;this.lock.lock();try {for (final E entry: this.available) {entry.close();}for (final E entry: this.leased) {entry.close();}for (final RouteSpecificPool<T, C, E> pool: this.routeToPool.values()) {pool.shutdown();}this.routeToPool.clear();this.leased.clear();this.available.clear();} finally {this.lock.unlock();}}
shutdown方法会遍历available、leased挨个执行close,然后遍历routeToPool挨个执行shutdown
lease方法
public Future<E> lease(final T route, final Object state, final FutureCallback<E> callback) {Args.notNull(route, "Route");Asserts.check(!this.isShutDown, "Connection pool shut down");return new Future<E>() {private final AtomicBoolean cancelled = new AtomicBoolean(false);private final AtomicBoolean done = new AtomicBoolean(false);private final AtomicReference<E> entryRef = new AtomicReference<E>(null);@Overridepublic boolean cancel(final boolean mayInterruptIfRunning) {if (done.compareAndSet(false, true)) {cancelled.set(true);lock.lock();try {condition.signalAll();} finally {lock.unlock();}if (callback != null) {callback.cancelled();}return true;}return false;}@Overridepublic boolean isCancelled() {return cancelled.get();}@Overridepublic boolean isDone() {return done.get();}@Overridepublic E get() throws InterruptedException, ExecutionException {try {return get(0L, TimeUnit.MILLISECONDS);} catch (final TimeoutException ex) {throw new ExecutionException(ex);}}@Overridepublic E get(final long timeout, final TimeUnit timeUnit) throws InterruptedException, ExecutionException, TimeoutException {for (;;) {synchronized (this) {try {final E entry = entryRef.get();if (entry != null) {return entry;}if (done.get()) {throw new ExecutionException(operationAborted());}final E leasedEntry = getPoolEntryBlocking(route, state, timeout, timeUnit, this);if (validateAfterInactivity > 0) {if (leasedEntry.getUpdated() + validateAfterInactivity <= System.currentTimeMillis()) {if (!validate(leasedEntry)) {leasedEntry.close();release(leasedEntry, false);continue;}}}if (done.compareAndSet(false, true)) {entryRef.set(leasedEntry);done.set(true);onLease(leasedEntry);if (callback != null) {callback.completed(leasedEntry);}return leasedEntry;} else {release(leasedEntry, true);throw new ExecutionException(operationAborted());}} catch (final IOException ex) {if (done.compareAndSet(false, true)) {if (callback != null) {callback.failed(ex);}}throw new ExecutionException(ex);}}}}};}
lease方法主要是get及cancel,其中get方法主要是执行getPoolEntryBlocking,对于validateAfterInactivity大于0的则根据判断是否需要validate,若需要且validate失败则执行leasedEntry.close()及release方法
getPoolEntryBlocking
org/apache/http/pool/AbstractConnPool.java
private E getPoolEntryBlocking(final T route, final Object state,final long timeout, final TimeUnit timeUnit,final Future<E> future) throws IOException, InterruptedException, ExecutionException, TimeoutException {Date deadline = null;if (timeout > 0) {deadline = new Date (System.currentTimeMillis() + timeUnit.toMillis(timeout));}this.lock.lock();try {final RouteSpecificPool<T, C, E> pool = getPool(route);E entry;for (;;) {Asserts.check(!this.isShutDown, "Connection pool shut down");if (future.isCancelled()) {throw new ExecutionException(operationAborted());}for (;;) {entry = pool.getFree(state);if (entry == null) {break;}if (entry.isExpired(System.currentTimeMillis())) {entry.close();}if (entry.isClosed()) {this.available.remove(entry);pool.free(entry, false);} else {break;}}if (entry != null) {this.available.remove(entry);this.leased.add(entry);onReuse(entry);return entry;}// New connection is neededfinal int maxPerRoute = getMax(route);// Shrink the pool prior to allocating a new connectionfinal int excess = Math.max(0, pool.getAllocatedCount() + 1 - maxPerRoute);if (excess > 0) {for (int i = 0; i < excess; i++) {final E lastUsed = pool.getLastUsed();if (lastUsed == null) {break;}lastUsed.close();this.available.remove(lastUsed);pool.remove(lastUsed);}}if (pool.getAllocatedCount() < maxPerRoute) {final int totalUsed = this.leased.size();final int freeCapacity = Math.max(this.maxTotal - totalUsed, 0);if (freeCapacity > 0) {final int totalAvailable = this.available.size();if (totalAvailable > freeCapacity - 1) {if (!this.available.isEmpty()) {final E lastUsed = this.available.removeLast();lastUsed.close();final RouteSpecificPool<T, C, E> otherpool = getPool(lastUsed.getRoute());otherpool.remove(lastUsed);}}final C conn = this.connFactory.create(route);entry = pool.add(conn);this.leased.add(entry);return entry;}}boolean success = false;try {pool.queue(future);this.pending.add(future);if (deadline != null) {success = this.condition.awaitUntil(deadline);} else {this.condition.await();success = true;}if (future.isCancelled()) {throw new ExecutionException(operationAborted());}} finally {// In case of 'success', we were woken up by the// connection pool and should now have a connection// waiting for us, or else we're shutting down.// Just continue in the loop, both cases are checked.pool.unqueue(future);this.pending.remove(future);}// check for spurious wakeup vs. timeoutif (!success && (deadline != null && deadline.getTime() <= System.currentTimeMillis())) {break;}}throw new TimeoutException("Timeout waiting for connection");} finally {this.lock.unlock();}}
getPoolEntryBlocking先根据route从routeToPool取出对应的RouteSpecificPool,然后pool.getFree(state),之后判断是否过期,是否关闭,没问题则从available移除,添加到leased中,然后执行onReuse回调,如果entry为null则通过connFactory.create(route)来创建
release
@Overridepublic void release(final E entry, final boolean reusable) {this.lock.lock();try {if (this.leased.remove(entry)) {final RouteSpecificPool<T, C, E> pool = getPool(entry.getRoute());pool.free(entry, reusable);if (reusable && !this.isShutDown) {this.available.addFirst(entry);} else {entry.close();}onRelease(entry);Future<E> future = pool.nextPending();if (future != null) {this.pending.remove(future);} else {future = this.pending.poll();}if (future != null) {this.condition.signalAll();}}} finally {this.lock.unlock();}}
release方法先获取RouteSpecificPool,然后执行pool.free(entry, reusable)
CPool
org/apache/http/impl/conn/CPool.java
@Contract(threading = ThreadingBehavior.SAFE)
class CPool extends AbstractConnPool<HttpRoute, ManagedHttpClientConnection, CPoolEntry> {private static final AtomicLong COUNTER = new AtomicLong();private final Log log = LogFactory.getLog(CPool.class);private final long timeToLive;private final TimeUnit timeUnit;public CPool(final ConnFactory<HttpRoute, ManagedHttpClientConnection> connFactory,final int defaultMaxPerRoute, final int maxTotal,final long timeToLive, final TimeUnit timeUnit) {super(connFactory, defaultMaxPerRoute, maxTotal);this.timeToLive = timeToLive;this.timeUnit = timeUnit;}@Overrideprotected CPoolEntry createEntry(final HttpRoute route, final ManagedHttpClientConnection conn) {final String id = Long.toString(COUNTER.getAndIncrement());return new CPoolEntry(this.log, id, route, conn, this.timeToLive, this.timeUnit);}@Overrideprotected boolean validate(final CPoolEntry entry) {return !entry.getConnection().isStale();}@Overrideprotected void enumAvailable(final PoolEntryCallback<HttpRoute, ManagedHttpClientConnection> callback) {super.enumAvailable(callback);}@Overrideprotected void enumLeased(final PoolEntryCallback<HttpRoute, ManagedHttpClientConnection> callback) {super.enumLeased(callback);}}
CPool继承了AbstractConnPool,其T为HttpRoute,C为ManagedHttpClientConnection,E为CPoolEntry;其createEntry方法创建CPoolEntry,validate则判断connect是不是stale
小结
ConnPool定义了lease及release方法,其中定义了两个泛型,T表示route,E表示poolEntry;AbstractConnPool声明实现ConnPool、ConnPoolControl接口,它定义E必须继承PoolEntry,同时定义了泛型C,表示connectionType;CPool继承了AbstractConnPool,其T为HttpRoute,C为ManagedHttpClientConnection,E为CPoolEntry。
AbstractConnPool的lease方法主要是get及cancel,其中get方法主要是执行getPoolEntryBlocking,对于validateAfterInactivity大于0的则根据判断是否需要validate,若需要且validate失败则执行leasedEntry.close()及release方法;release方法先获取RouteSpecificPool,然后执行pool.free(entry, reusable)
相关文章:
聊聊httpclient的CPool
序 本文主要研究一下httpclient的CPool ConnPool org/apache/http/pool/ConnPool.java public interface ConnPool<T, E> {/*** Attempts to lease a connection for the given route and with the given* state from the pool.** param route route of the connecti…...
B2主题优化:WordPress文章每次访问随机增加访问量
老站长都知道,一个新站刚开始创建,内容也不多的时候,用户进来看到文章浏览量要么是0,要么是 个位数,非常影响体验,就会有一种“这个网站没人气,看来不行”的感觉。 即使你的内容做的很好&#x…...
大模型部署手记(1)ChatGLM2+Windows GPU
1.简介: 组织机构:智谱/清华 代码仓:https://github.com/THUDM/ChatGLM2-6B 模型:THUDM/chatglm2-6b 下载:https://huggingface.co/THUDM/chatglm2-6b 镜像下载:https://aliendao.cn/models/THUDM/chat…...
Rust Rocket: 构建Restful服务项目实战
前言 这几天我的笔记系统开发工作进入了搬砖期,前端基于Yew,后端基于Rocket。关于Rocket搭建Restful服务,官方也有介绍,感觉很多细节不到位。因此我打算花2到3天的时间来整理一下,也算是对自己的一个交代。 对于有一…...
苹果签名有多少种类之TF签名(TestFlight签名)是什么?优势是什么?什么场合需要应用到?
(一)TestFlight 能够让您:邀请内部和外部的测试人员为应用程序提供反馈。 跟踪应用程序在测试过程中发现的 bug 和用户体验问题。 收集 Crash 报告,了解应用程序在真实设备上的运行状况。 要使用 TestFlight,您可以按照…...
如何将图片存到数据库(以mysql为例), 使用ORM Bee更加简单
如何将图片存到数据库 1. 创建数据库: 2. 生成Javabean public class ImageExam implements Serializable {private static final long serialVersionUID 1596686274309L;private Integer id;private String name; // private Blob image;private InputStream image; //将In…...
【“栈、队列”的应用】408数据结构代码
王道数据结构强化课——【“栈、队列”的应用】代码,持续更新 链式存储栈(单链表实现),并基于上述定义,栈顶在链头,实现“出栈、入栈、判空、判满”四个基本操作 #include <stdio.h> #include <…...
es的nested查询
一、一层嵌套 mapping: PUT /nested_example {"mappings": {"properties": {"name": {"type": "text"},"books": {"type": "nested","properties": {"title": {"t…...
<一>Qt斗地主游戏开发:开发环境搭建--VS2019+Qt5.15.2
1. 开发环境概述 对于Qt的开发环境来说,主流编码IDE界面一般有两种:Qt Creator或VSQt。为了简单起见,这里的操作系统限定为windows,编译器也通用VS了。Qt版本的话自己选择就可以了,当然VS的版本也是依据Qt版本来选定的…...
python:进度条的使用(tqdm)
摘要:为python程序进度条,可以知道程序运行进度。 python中,常用的进度条模块是tqdm,将介绍tqdm的安装和使用 1、安装tqdm: pip install tqdm2、tqdm的使用: (1)在for循环中的使用࿱…...
Java类型转换和类型提升
目录 一、类型转换 1.1 自动类型转换(隐式) 1.1.1 int 与 long 之间 1.1.2 float 与 double 之间 1.1.3 int 与 byte 之间 1.2 强制类型转换(显示) 1.2.1 int 与 long 之间 1.2.2 float 与 double 之间 1.2.3 int 与 d…...
C# 读取 Excel xlsx 文件,显示在 DataGridView 中
编写 read_excel.cs 如下 using System; using System.Collections.Generic; using System.ComponentModel; using System.IO; using System.Data; using System.Linq; using System.Text; using System.Data.OleDb;namespace ReadExcel {public partial class Program{static…...
Docker02基本管理
目录 1、Docker 网络 1.1 Docker 网络实现原理 1.2 Docker 的网络模式 1.3 网络模式详解 1.4 资源控制 1.5 进行CPU压力测试 1.6 清理docker占用的磁盘空间 1.7 生产扩展 1、Docker 网络 1.1 Docker 网络实现原理 Docker使用Linux桥接,在宿主机虚拟一个Docke…...
Scala第十章
Scala第十章 章节目标 1.数组 2.元组 3.列表 4.集 5.映射 6.迭代器 7.函数式编程 8.案例:学生成绩单 scala总目录 文档资料下载...
10.4 校招 实习 内推 面经
绿泡*泡: neituijunsir 交流裙 ,内推/实习/校招汇总表格 1、校招 | 集度2024届秋招正式启动(内推) 校招 | 集度2024届秋招正式启动(内推) 2、校招 | 道通科技2024秋季校园招聘正式启动啦! …...
从0开始深入理解并发、线程与等待通知机制(中)
一,深入学习 Java 的线程 线程的状态/生命周期 Java 中线程的状态分为 6 种: 1. 初始(NEW):新创建了一个线程对象,但还没有调用 start()方法。 2. 运行(RUNNABLE):Java 线程中将就绪(ready)和…...
UE5报错及解决办法
1、编译报错,内容如下: Unable to build while Live Coding is active. Exit the editor and game, or press CtrlAltF11 if iterating on code in the editor or game 解决办法 取消Enable Live Coding勾选...
怎么通过docker/portainer部署vue项目
这篇文章分享一下如何通过docker将vue项目打包成镜像文件,并使用打包的镜像在docker/portainer上部署运行,写这篇文章参考了vue-cli和docker的官方文档。 首先,阅读vue-cli关于docker部署的说明,上面提供了关键的几个步骤。 从上面…...
【面试经典150 | 矩阵】旋转图像
文章目录 写在前面Tag题目来源题目解读解题思路方法一:原地旋转方法二:翻转代替旋转 写在最后 写在前面 本专栏专注于分析与讲解【面试经典150】算法,两到三天更新一篇文章,欢迎催更…… 专栏内容以分析题目为主,并附带…...
机器人制作开源方案 | 家庭清扫拾物机器人
作者:罗诚、李旭洋、胡旭、符粒楷 单位:南昌交通学院 人工智能学院 指导老师:揭吁菡 在家庭中我们有时无法到一些低矮阴暗的地方进行探索,比如茶几下或者床底下,特别是在部分家庭中,如果没有及时对这些阴…...
ubuntu搭建nfs服务centos挂载访问
在Ubuntu上设置NFS服务器 在Ubuntu上,你可以使用apt包管理器来安装NFS服务器。打开终端并运行: sudo apt update sudo apt install nfs-kernel-server创建共享目录 创建一个目录用于共享,例如/shared: sudo mkdir /shared sud…...
(十)学生端搭建
本次旨在将之前的已完成的部分功能进行拼装到学生端,同时完善学生端的构建。本次工作主要包括: 1.学生端整体界面布局 2.模拟考场与部分个人画像流程的串联 3.整体学生端逻辑 一、学生端 在主界面可以选择自己的用户角色 选择学生则进入学生登录界面…...
Qt/C++开发监控GB28181系统/取流协议/同时支持udp/tcp被动/tcp主动
一、前言说明 在2011版本的gb28181协议中,拉取视频流只要求udp方式,从2016开始要求新增支持tcp被动和tcp主动两种方式,udp理论上会丢包的,所以实际使用过程可能会出现画面花屏的情况,而tcp肯定不丢包,起码…...
盘古信息PCB行业解决方案:以全域场景重构,激活智造新未来
一、破局:PCB行业的时代之问 在数字经济蓬勃发展的浪潮中,PCB(印制电路板)作为 “电子产品之母”,其重要性愈发凸显。随着 5G、人工智能等新兴技术的加速渗透,PCB行业面临着前所未有的挑战与机遇。产品迭代…...
Spring Boot 实现流式响应(兼容 2.7.x)
在实际开发中,我们可能会遇到一些流式数据处理的场景,比如接收来自上游接口的 Server-Sent Events(SSE) 或 流式 JSON 内容,并将其原样中转给前端页面或客户端。这种情况下,传统的 RestTemplate 缓存机制会…...
聊聊 Pulsar:Producer 源码解析
一、前言 Apache Pulsar 是一个企业级的开源分布式消息传递平台,以其高性能、可扩展性和存储计算分离架构在消息队列和流处理领域独树一帜。在 Pulsar 的核心架构中,Producer(生产者) 是连接客户端应用与消息队列的第一步。生产者…...
汽车生产虚拟实训中的技能提升与生产优化
在制造业蓬勃发展的大背景下,虚拟教学实训宛如一颗璀璨的新星,正发挥着不可或缺且日益凸显的关键作用,源源不断地为企业的稳健前行与创新发展注入磅礴强大的动力。就以汽车制造企业这一极具代表性的行业主体为例,汽车生产线上各类…...
条件运算符
C中的三目运算符(也称条件运算符,英文:ternary operator)是一种简洁的条件选择语句,语法如下: 条件表达式 ? 表达式1 : 表达式2• 如果“条件表达式”为true,则整个表达式的结果为“表达式1”…...
论文解读:交大港大上海AI Lab开源论文 | 宇树机器人多姿态起立控制强化学习框架(一)
宇树机器人多姿态起立控制强化学习框架论文解析 论文解读:交大&港大&上海AI Lab开源论文 | 宇树机器人多姿态起立控制强化学习框架(一) 论文解读:交大&港大&上海AI Lab开源论文 | 宇树机器人多姿态起立控制强化…...
【Zephyr 系列 10】实战项目:打造一个蓝牙传感器终端 + 网关系统(完整架构与全栈实现)
🧠关键词:Zephyr、BLE、终端、网关、广播、连接、传感器、数据采集、低功耗、系统集成 📌目标读者:希望基于 Zephyr 构建 BLE 系统架构、实现终端与网关协作、具备产品交付能力的开发者 📊篇幅字数:约 5200 字 ✨ 项目总览 在物联网实际项目中,**“终端 + 网关”**是…...
