Implementing dynamic tuning of a Java thread pool

  • Background
  • ThreadPoolExecutor configuration
  • A custom resizable LinkedBlockingQueue
  • Controller endpoints
  • Test results

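Background

Thread pool parameters are hard to get right up front. Whenever they need to change, the configuration has to be edited and the application redeployed, which costs time; and when the workload changes, a fixed configuration may no longer fit the current demand. The parameters therefore need to be adjustable at runtime. While researching this I found many blog posts citing the Meituan article 《Java线程池实现原理及其在美团业务中的实践》 ("Java Thread Pool Implementation Principles and Their Practice in Meituan's Business"), so I implemented the idea myself.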

ThreadPoolExecutor configuration

import io.micrometer.core.instrument.util.NamedThreadFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * desc: Thread pool configuration
 *
 * @author qts
 * @date 2023/11/15 0015
 */
@Configuration
public class ThreadPoolConfig {

    @Bean
    public ThreadPoolExecutor threadPoolExecutor() {
        return new ThreadPoolExecutor(
                2,                                              // corePoolSize
                4,                                              // maximumPoolSize
                60, TimeUnit.SECONDS,                           // keep-alive time for idle non-core threads
                new ResizableCapacityLinkedBlockingQueue<>(10), // work queue whose capacity can be changed
                new NamedThreadFactory("my-executor"),
                new ThreadPoolExecutor.CallerRunsPolicy());     // rejection policy: run the task on the calling thread
    }
}
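ThreadPoolExecutor itself already has setters for the core and maximum pool sizes, so those two values can be changed on a live pool. The queue is the part that needs extra work, because a plain LinkedBlockingQueue offers no way to change its bound. A minimal sketch outside Spring, assuming a pool with the same sizes as the bean above; the class name `PoolResizeDemo` and the method `growAndInspect` are made up for illustration, and the sketch only covers growing the pool:

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class PoolResizeDemo {

    /**
     * Creates a 2/4 pool with a fixed queue of 10, grows it to the given sizes,
     * and returns {corePoolSize, maximumPoolSize, remaining queue capacity}.
     */
    static int[] growAndInspect(int newCore, int newMax) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 4, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(10));
        try {
            // When growing, raise the maximum first so the core size never exceeds it.
            pool.setMaximumPoolSize(newMax);
            pool.setCorePoolSize(newCore);
            // The queue bound is untouched: LinkedBlockingQueue has no capacity setter.
            return new int[] {
                    pool.getCorePoolSize(),
                    pool.getMaximumPoolSize(),
                    pool.getQueue().remainingCapacity()
            };
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        int[] state = growAndInspect(3, 6);
        System.out.println("core=" + state[0] + ", max=" + state[1]
                + ", queue remaining=" + state[2]);
    }
}
```

The pool sizes follow the new values while the queue stays at its original bound, which is why the next section replaces the queue implementation.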

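A custom resizable LinkedBlockingQueue

The capacity field of LinkedBlockingQueue is declared final and cannot be modified, so I copied the LinkedBlockingQueue source, removed final from capacity, and added a getter and a setter. The code is as follows: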

/*
 * ORACLE PROPRIETARY/CONFIDENTIAL. Use is subject to license terms.
 */

/*
 * Written by Doug Lea with assistance from members of JCP JSR-166
 * Expert Group and released to the public domain, as explained at
 * http://creativecommons.org/publicdomain/zero/1.0/
 */

import java.util.*;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;

/**
 * An optionally-bounded {@linkplain BlockingQueue blocking queue} based on
 * linked nodes.
 * This queue orders elements FIFO (first-in-first-out).
 * The <em>head</em> of the queue is that element that has been on the
 * queue the longest time.
 * The <em>tail</em> of the queue is that element that has been on the
 * queue the shortest time. New elements
 * are inserted at the tail of the queue, and the queue retrieval
 * operations obtain elements at the head of the queue.
 * Linked queues typically have higher throughput than array-based queues but
 * less predictable performance in most concurrent applications.
 *
 * <p>The optional capacity bound constructor argument serves as a
 * way to prevent excessive queue expansion. The capacity, if unspecified,
 * is equal to {@link Integer#MAX_VALUE}.  Linked nodes are
 * dynamically created upon each insertion unless this would bring the
 * queue above capacity.
 *
 * <p>This class and its iterator implement all of the
 * <em>optional</em> methods of the {@link Collection} and {@link
 * Iterator} interfaces.
 *
 * <p>This class is a member of the
 * <a href="{@docRoot}/../technotes/guides/collections/index.html">
 * Java Collections Framework</a>.
 *
 * @since 1.5
 * @author Doug Lea
 * @param <E> the type of elements held in this collection
 */
public class ResizableCapacityLinkedBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {
    private static final long serialVersionUID = -6903933977591709194L;

    /*
     * A variant of the "two lock queue" algorithm.  The putLock gates
     * entry to put (and offer), and has an associated condition for
     * waiting puts.  Similarly for the takeLock.  The "count" field
     * that they both rely on is maintained as an atomic to avoid
     * needing to get both locks in most cases. Also, to minimize need
     * for puts to get takeLock and vice-versa, cascading notifies are
     * used. When a put notices that it has enabled at least one take,
     * it signals taker. That taker in turn signals others if more
     * items have been entered since the signal. And symmetrically for
     * takes signalling puts. Operations such as remove(Object) and
     * iterators acquire both locks.
     *
     * Visibility between writers and readers is provided as follows:
     *
     * Whenever an element is enqueued, the putLock is acquired and
     * count updated.  A subsequent reader guarantees visibility to the
     * enqueued Node by either acquiring the putLock (via fullyLock)
     * or by acquiring the takeLock, and then reading n = count.get();
     * this gives visibility to the first n items.
     *
     * To implement weakly consistent iterators, it appears we need to
     * keep all Nodes GC-reachable from a predecessor dequeued Node.
     * That would cause two problems:
     * - allow a rogue Iterator to cause unbounded memory retention
     * - cause cross-generational linking of old Nodes to new Nodes if
     *   a Node was tenured while live, which generational GCs have a
     *   hard time dealing with, causing repeated major collections.
     * However, only non-deleted Nodes need to be reachable from
     * dequeued Nodes, and reachability does not necessarily have to
     * be of the kind understood by the GC.  We use the trick of
     * linking a Node that has just been dequeued to itself.  Such a
     * self-link implicitly means to advance to head.next.
     */

    /**
     * Linked list node class
     */
    static class Node<E> {
        E item;

        /**
         * One of:
         * - the real successor Node
         * - this Node, meaning the successor is head.next
         * - null, meaning there is no successor (this is the last node)
         */
        ResizableCapacityLinkedBlockingQueue.Node<E> next;

        Node(E x) { item = x; }
    }

    /**
     * The capacity bound, or Integer.MAX_VALUE if none.
     * Changed from the JDK original: no longer final, and volatile because
     * it is written by setCapacity and read without a lock elsewhere.
     */
    private volatile int capacity;

    /** Current number of elements */
    private final AtomicInteger count = new AtomicInteger();

    /**
     * Head of linked list.
     * Invariant: head.item == null
     */
    transient ResizableCapacityLinkedBlockingQueue.Node<E> head;

    /**
     * Tail of linked list.
     * Invariant: last.next == null
     */
    private transient ResizableCapacityLinkedBlockingQueue.Node<E> last;

    /** Lock held by take, poll, etc */
    private final ReentrantLock takeLock = new ReentrantLock();

    /** Wait queue for waiting takes */
    private final Condition notEmpty = takeLock.newCondition();

    /** Lock held by put, offer, etc */
    private final ReentrantLock putLock = new ReentrantLock();

    /** Wait queue for waiting puts */
    private final Condition notFull = putLock.newCondition();

    /**
     * Signals a waiting take. Called only from put/offer (which do not
     * otherwise ordinarily lock takeLock.)
     */
    private void signalNotEmpty() {
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
    }

    /**
     * Signals a waiting put. Called from take/poll and from setCapacity.
     */
    private void signalNotFull() {
        final ReentrantLock putLock = this.putLock;
        putLock.lock();
        try {
            notFull.signal();
        } finally {
            putLock.unlock();
        }
    }

    /**
     * Links node at end of queue.
     *
     * @param node the node
     */
    private void enqueue(ResizableCapacityLinkedBlockingQueue.Node<E> node) {
        // assert putLock.isHeldByCurrentThread();
        // assert last.next == null;
        last = last.next = node;
    }

    /**
     * Removes a node from head of queue.
     *
     * @return the node
     */
    private E dequeue() {
        // assert takeLock.isHeldByCurrentThread();
        // assert head.item == null;
        ResizableCapacityLinkedBlockingQueue.Node<E> h = head;
        ResizableCapacityLinkedBlockingQueue.Node<E> first = h.next;
        h.next = h; // help GC
        head = first;
        E x = first.item;
        first.item = null;
        return x;
    }

    /**
     * Locks to prevent both puts and takes.
     */
    void fullyLock() {
        putLock.lock();
        takeLock.lock();
    }

    /**
     * Unlocks to allow both puts and takes.
     */
    void fullyUnlock() {
        takeLock.unlock();
        putLock.unlock();
    }

//     /**
//      * Tells whether both locks are held by current thread.
//      */
//     boolean isFullyLocked() {
//         return (putLock.isHeldByCurrentThread() &&
//                 takeLock.isHeldByCurrentThread());
//     }

    /**
     * Creates a {@code ResizableCapacityLinkedBlockingQueue} with a capacity of
     * {@link Integer#MAX_VALUE}.
     */
    public ResizableCapacityLinkedBlockingQueue() {
        this(Integer.MAX_VALUE);
    }

    /**
     * Creates a {@code ResizableCapacityLinkedBlockingQueue} with the given initial capacity.
     *
     * @param capacity the capacity of this queue
     * @throws IllegalArgumentException if {@code capacity} is not greater
     *         than zero
     */
    public ResizableCapacityLinkedBlockingQueue(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
        last = head = new ResizableCapacityLinkedBlockingQueue.Node<E>(null);
    }

    /**
     * Creates a {@code ResizableCapacityLinkedBlockingQueue} with a capacity of
     * {@link Integer#MAX_VALUE}, initially containing the elements of the
     * given collection,
     * added in traversal order of the collection's iterator.
     *
     * @param c the collection of elements to initially contain
     * @throws NullPointerException if the specified collection or any
     *         of its elements are null
     */
    public ResizableCapacityLinkedBlockingQueue(Collection<? extends E> c) {
        this(Integer.MAX_VALUE);
        final ReentrantLock putLock = this.putLock;
        putLock.lock(); // Never contended, but necessary for visibility
        try {
            int n = 0;
            for (E e : c) {
                if (e == null)
                    throw new NullPointerException();
                if (n == capacity)
                    throw new IllegalStateException("Queue full");
                enqueue(new ResizableCapacityLinkedBlockingQueue.Node<E>(e));
                ++n;
            }
            count.set(n);
        } finally {
            putLock.unlock();
        }
    }

    // this doc comment is overridden to remove the reference to collections
    // greater in size than Integer.MAX_VALUE
    /**
     * Returns the number of elements in this queue.
     *
     * @return the number of elements in this queue
     */
    public int size() {
        return count.get();
    }

    // this doc comment is a modified copy of the inherited doc comment,
    // without the reference to unlimited queues.
    /**
     * Returns the number of additional elements that this queue can ideally
     * (in the absence of memory or resource constraints) accept without
     * blocking. This is equal to the current capacity of this queue
     * less the current {@code size} of this queue.
     *
     * <p>Note that you <em>cannot</em> always tell if an attempt to insert
     * an element will succeed by inspecting {@code remainingCapacity}
     * because it may be the case that another thread is about to
     * insert or remove an element.
     */
    public int remainingCapacity() {
        return capacity - count.get();
    }

    /**
     * Inserts the specified element at the tail of this queue, waiting if
     * necessary for space to become available.
     *
     * @throws InterruptedException {@inheritDoc}
     * @throws NullPointerException {@inheritDoc}
     */
    public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        // Note: convention in all put/take/etc is to preset local var
        // holding count negative to indicate failure unless set.
        int c = -1;
        ResizableCapacityLinkedBlockingQueue.Node<E> node = new ResizableCapacityLinkedBlockingQueue.Node<E>(e);
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();
        try {
            /*
             * Note that count is used in wait guard even though it is
             * not protected by lock. This works because count can
             * only decrease at this point (all other puts are shut
             * out by lock), and we (or some other waiting put) are
             * signalled if it ever changes from capacity. Similarly
             * for all other uses of count in other wait guards.
             */
            // Changed from the JDK original (== became >=): after the capacity
            // is lowered, count may temporarily exceed it.
            while (count.get() >= capacity) {
                notFull.await();
            }
            enqueue(node);
            c = count.getAndIncrement();
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
    }

    /**
     * Inserts the specified element at the tail of this queue, waiting if
     * necessary up to the specified wait time for space to become available.
     *
     * @return {@code true} if successful, or {@code false} if
     *         the specified waiting time elapses before space is available
     * @throws InterruptedException {@inheritDoc}
     * @throws NullPointerException {@inheritDoc}
     */
    public boolean offer(E e, long timeout, TimeUnit unit)
            throws InterruptedException {

        if (e == null) throw new NullPointerException();
        long nanos = unit.toNanos(timeout);
        int c = -1;
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();
        try {
            // >= instead of ==, see put()
            while (count.get() >= capacity) {
                if (nanos <= 0)
                    return false;
                nanos = notFull.awaitNanos(nanos);
            }
            enqueue(new ResizableCapacityLinkedBlockingQueue.Node<E>(e));
            c = count.getAndIncrement();
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
        return true;
    }

    /**
     * Inserts the specified element at the tail of this queue if it is
     * possible to do so immediately without exceeding the queue's capacity,
     * returning {@code true} upon success and {@code false} if this queue
     * is full.
     * When using a capacity-restricted queue, this method is generally
     * preferable to method {@link BlockingQueue#add add}, which can fail to
     * insert an element only by throwing an exception.
     *
     * @throws NullPointerException if the specified element is null
     */
    public boolean offer(E e) {
        if (e == null) throw new NullPointerException();
        final AtomicInteger count = this.count;
        // >= instead of ==, see put()
        if (count.get() >= capacity)
            return false;
        int c = -1;
        ResizableCapacityLinkedBlockingQueue.Node<E> node = new ResizableCapacityLinkedBlockingQueue.Node<E>(e);
        final ReentrantLock putLock = this.putLock;
        putLock.lock();
        try {
            if (count.get() < capacity) {
                enqueue(node);
                c = count.getAndIncrement();
                if (c + 1 < capacity)
                    notFull.signal();
            }
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
        return c >= 0;
    }

    public E take() throws InterruptedException {
        E x;
        int c = -1;
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();
        try {
            while (count.get() == 0) {
                notEmpty.await();
            }
            x = dequeue();
            c = count.getAndDecrement();
            if (c > 1)
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }

    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        E x = null;
        int c = -1;
        long nanos = unit.toNanos(timeout);
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();
        try {
            while (count.get() == 0) {
                if (nanos <= 0)
                    return null;
                nanos = notEmpty.awaitNanos(nanos);
            }
            x = dequeue();
            c = count.getAndDecrement();
            if (c > 1)
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }

    public E poll() {
        final AtomicInteger count = this.count;
        if (count.get() == 0)
            return null;
        E x = null;
        int c = -1;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            if (count.get() > 0) {
                x = dequeue();
                c = count.getAndDecrement();
                if (c > 1)
                    notEmpty.signal();
            }
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }

    public E peek() {
        if (count.get() == 0)
            return null;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            ResizableCapacityLinkedBlockingQueue.Node<E> first = head.next;
            if (first == null)
                return null;
            else
                return first.item;
        } finally {
            takeLock.unlock();
        }
    }

    /**
     * Unlinks interior Node p with predecessor trail.
     */
    void unlink(ResizableCapacityLinkedBlockingQueue.Node<E> p, ResizableCapacityLinkedBlockingQueue.Node<E> trail) {
        // assert isFullyLocked();
        // p.next is not changed, to allow iterators that are
        // traversing p to maintain their weak-consistency guarantee.
        p.item = null;
        trail.next = p.next;
        if (last == p)
            last = trail;
        if (count.getAndDecrement() == capacity)
            notFull.signal();
    }

    /**
     * Removes a single instance of the specified element from this queue,
     * if it is present.  More formally, removes an element {@code e} such
     * that {@code o.equals(e)}, if this queue contains one or more such
     * elements.
     * Returns {@code true} if this queue contained the specified element
     * (or equivalently, if this queue changed as a result of the call).
     *
     * @param o element to be removed from this queue, if present
     * @return {@code true} if this queue changed as a result of the call
     */
    public boolean remove(Object o) {
        if (o == null) return false;
        fullyLock();
        try {
            for (ResizableCapacityLinkedBlockingQueue.Node<E> trail = head, p = trail.next;
                 p != null;
                 trail = p, p = p.next) {
                if (o.equals(p.item)) {
                    unlink(p, trail);
                    return true;
                }
            }
            return false;
        } finally {
            fullyUnlock();
        }
    }

    /**
     * Returns {@code true} if this queue contains the specified element.
     * More formally, returns {@code true} if and only if this queue contains
     * at least one element {@code e} such that {@code o.equals(e)}.
     *
     * @param o object to be checked for containment in this queue
     * @return {@code true} if this queue contains the specified element
     */
    public boolean contains(Object o) {
        if (o == null) return false;
        fullyLock();
        try {
            for (ResizableCapacityLinkedBlockingQueue.Node<E> p = head.next; p != null; p = p.next)
                if (o.equals(p.item))
                    return true;
            return false;
        } finally {
            fullyUnlock();
        }
    }

    /**
     * Returns an array containing all of the elements in this queue, in
     * proper sequence.
     *
     * <p>The returned array will be "safe" in that no references to it are
     * maintained by this queue.  (In other words, this method must allocate
     * a new array).  The caller is thus free to modify the returned array.
     *
     * <p>This method acts as bridge between array-based and collection-based
     * APIs.
     *
     * @return an array containing all of the elements in this queue
     */
    public Object[] toArray() {
        fullyLock();
        try {
            int size = count.get();
            Object[] a = new Object[size];
            int k = 0;
            for (ResizableCapacityLinkedBlockingQueue.Node<E> p = head.next; p != null; p = p.next)
                a[k++] = p.item;
            return a;
        } finally {
            fullyUnlock();
        }
    }

    /**
     * Returns an array containing all of the elements in this queue, in
     * proper sequence; the runtime type of the returned array is that of
     * the specified array.  If the queue fits in the specified array, it
     * is returned therein.  Otherwise, a new array is allocated with the
     * runtime type of the specified array and the size of this queue.
     *
     * <p>If this queue fits in the specified array with room to spare
     * (i.e., the array has more elements than this queue), the element in
     * the array immediately following the end of the queue is set to
     * {@code null}.
     *
     * <p>Like the {@link #toArray()} method, this method acts as bridge between
     * array-based and collection-based APIs.  Further, this method allows
     * precise control over the runtime type of the output array, and may,
     * under certain circumstances, be used to save allocation costs.
     *
     * <p>Suppose {@code x} is a queue known to contain only strings.
     * The following code can be used to dump the queue into a newly
     * allocated array of {@code String}:
     *
     *  <pre> {@code String[] y = x.toArray(new String[0]);}</pre>
     *
     * Note that {@code toArray(new Object[0])} is identical in function to
     * {@code toArray()}.
     *
     * @param a the array into which the elements of the queue are to
     *          be stored, if it is big enough; otherwise, a new array of the
     *          same runtime type is allocated for this purpose
     * @return an array containing all of the elements in this queue
     * @throws ArrayStoreException if the runtime type of the specified array
     *         is not a supertype of the runtime type of every element in
     *         this queue
     * @throws NullPointerException if the specified array is null
     */
    @SuppressWarnings("unchecked")
    public <T> T[] toArray(T[] a) {
        fullyLock();
        try {
            int size = count.get();
            if (a.length < size)
                a = (T[])java.lang.reflect.Array.newInstance
                        (a.getClass().getComponentType(), size);

            int k = 0;
            for (ResizableCapacityLinkedBlockingQueue.Node<E> p = head.next; p != null; p = p.next)
                a[k++] = (T)p.item;
            if (a.length > k)
                a[k] = null;
            return a;
        } finally {
            fullyUnlock();
        }
    }

    public String toString() {
        fullyLock();
        try {
            ResizableCapacityLinkedBlockingQueue.Node<E> p = head.next;
            if (p == null)
                return "[]";

            StringBuilder sb = new StringBuilder();
            sb.append('[');
            for (;;) {
                E e = p.item;
                sb.append(e == this ? "(this Collection)" : e);
                p = p.next;
                if (p == null)
                    return sb.append(']').toString();
                sb.append(',').append(' ');
            }
        } finally {
            fullyUnlock();
        }
    }

    /**
     * Atomically removes all of the elements from this queue.
     * The queue will be empty after this call returns.
     */
    public void clear() {
        fullyLock();
        try {
            for (ResizableCapacityLinkedBlockingQueue.Node<E> p, h = head; (p = h.next) != null; h = p) {
                h.next = h;
                p.item = null;
            }
            head = last;
            // assert head.item == null && head.next == null;
            // >= instead of ==, see put()
            if (count.getAndSet(0) >= capacity)
                notFull.signal();
        } finally {
            fullyUnlock();
        }
    }

    /**
     * @throws UnsupportedOperationException {@inheritDoc}
     * @throws ClassCastException            {@inheritDoc}
     * @throws NullPointerException          {@inheritDoc}
     * @throws IllegalArgumentException      {@inheritDoc}
     */
    public int drainTo(Collection<? super E> c) {
        return drainTo(c, Integer.MAX_VALUE);
    }

    /**
     * @throws UnsupportedOperationException {@inheritDoc}
     * @throws ClassCastException            {@inheritDoc}
     * @throws NullPointerException          {@inheritDoc}
     * @throws IllegalArgumentException      {@inheritDoc}
     */
    public int drainTo(Collection<? super E> c, int maxElements) {
        if (c == null)
            throw new NullPointerException();
        if (c == this)
            throw new IllegalArgumentException();
        if (maxElements <= 0)
            return 0;
        boolean signalNotFull = false;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            int n = Math.min(maxElements, count.get());
            // count.get provides visibility to first n Nodes
            ResizableCapacityLinkedBlockingQueue.Node<E> h = head;
            int i = 0;
            try {
                while (i < n) {
                    ResizableCapacityLinkedBlockingQueue.Node<E> p = h.next;
                    c.add(p.item);
                    p.item = null;
                    h.next = h;
                    h = p;
                    ++i;
                }
                return n;
            } finally {
                // Restore invariants even if c.add() threw
                if (i > 0) {
                    // assert h.item == null;
                    head = h;
                    // >= instead of ==, see put()
                    signalNotFull = (count.getAndAdd(-i) >= capacity);
                }
            }
        } finally {
            takeLock.unlock();
            if (signalNotFull)
                signalNotFull();
        }
    }

    /**
     * Returns an iterator over the elements in this queue in proper sequence.
     * The elements will be returned in order from first (head) to last (tail).
     *
     * <p>The returned iterator is
     * <a href="package-summary.html#Weakly"><i>weakly consistent</i></a>.
     *
     * @return an iterator over the elements in this queue in proper sequence
     */
    public Iterator<E> iterator() {
        return new ResizableCapacityLinkedBlockingQueue.Itr();
    }

    private class Itr implements Iterator<E> {
        /*
         * Basic weakly-consistent iterator.  At all times hold the next
         * item to hand out so that if hasNext() reports true, we will
         * still have it to return even if lost race with a take etc.
         */

        private ResizableCapacityLinkedBlockingQueue.Node<E> current;
        private ResizableCapacityLinkedBlockingQueue.Node<E> lastRet;
        private E currentElement;

        Itr() {
            fullyLock();
            try {
                current = head.next;
                if (current != null)
                    currentElement = current.item;
            } finally {
                fullyUnlock();
            }
        }

        public boolean hasNext() {
            return current != null;
        }

        /**
         * Returns the next live successor of p, or null if no such.
         *
         * Unlike other traversal methods, iterators need to handle both:
         * - dequeued nodes (p.next == p)
         * - (possibly multiple) interior removed nodes (p.item == null)
         */
        private ResizableCapacityLinkedBlockingQueue.Node<E> nextNode(ResizableCapacityLinkedBlockingQueue.Node<E> p) {
            for (;;) {
                ResizableCapacityLinkedBlockingQueue.Node<E> s = p.next;
                if (s == p)
                    return head.next;
                if (s == null || s.item != null)
                    return s;
                p = s;
            }
        }

        public E next() {
            fullyLock();
            try {
                if (current == null)
                    throw new NoSuchElementException();
                E x = currentElement;
                lastRet = current;
                current = nextNode(current);
                currentElement = (current == null) ? null : current.item;
                return x;
            } finally {
                fullyUnlock();
            }
        }

        public void remove() {
            if (lastRet == null)
                throw new IllegalStateException();
            fullyLock();
            try {
                ResizableCapacityLinkedBlockingQueue.Node<E> node = lastRet;
                lastRet = null;
                for (ResizableCapacityLinkedBlockingQueue.Node<E> trail = head, p = trail.next;
                     p != null;
                     trail = p, p = p.next) {
                    if (p == node) {
                        unlink(p, trail);
                        break;
                    }
                }
            } finally {
                fullyUnlock();
            }
        }
    }

    /** A customized variant of Spliterators.IteratorSpliterator */
    static final class LBQSpliterator<E> implements Spliterator<E> {
        static final int MAX_BATCH = 1 << 25;  // max batch array size;
        final ResizableCapacityLinkedBlockingQueue<E> queue;
        ResizableCapacityLinkedBlockingQueue.Node<E> current;    // current node; null until initialized
        int batch;          // batch size for splits
        boolean exhausted;  // true when no more nodes
        long est;           // size estimate

        LBQSpliterator(ResizableCapacityLinkedBlockingQueue<E> queue) {
            this.queue = queue;
            this.est = queue.size();
        }

        public long estimateSize() { return est; }

        public Spliterator<E> trySplit() {
            ResizableCapacityLinkedBlockingQueue.Node<E> h;
            final ResizableCapacityLinkedBlockingQueue<E> q = this.queue;
            int b = batch;
            int n = (b <= 0) ? 1 : (b >= MAX_BATCH) ? MAX_BATCH : b + 1;
            if (!exhausted &&
                ((h = current) != null || (h = q.head.next) != null) &&
                h.next != null) {
                Object[] a = new Object[n];
                int i = 0;
                ResizableCapacityLinkedBlockingQueue.Node<E> p = current;
                q.fullyLock();
                try {
                    if (p != null || (p = q.head.next) != null) {
                        do {
                            if ((a[i] = p.item) != null)
                                ++i;
                        } while ((p = p.next) != null && i < n);
                    }
                } finally {
                    q.fullyUnlock();
                }
                if ((current = p) == null) {
                    est = 0L;
                    exhausted = true;
                }
                else if ((est -= i) < 0L)
                    est = 0L;
                if (i > 0) {
                    batch = i;
                    return Spliterators.spliterator
                            (a, 0, i, Spliterator.ORDERED | Spliterator.NONNULL |
                                    Spliterator.CONCURRENT);
                }
            }
            return null;
        }

        public void forEachRemaining(Consumer<? super E> action) {
            if (action == null) throw new NullPointerException();
            final ResizableCapacityLinkedBlockingQueue<E> q = this.queue;
            if (!exhausted) {
                exhausted = true;
                ResizableCapacityLinkedBlockingQueue.Node<E> p = current;
                do {
                    E e = null;
                    q.fullyLock();
                    try {
                        if (p == null)
                            p = q.head.next;
                        while (p != null) {
                            e = p.item;
                            p = p.next;
                            if (e != null)
                                break;
                        }
                    } finally {
                        q.fullyUnlock();
                    }
                    if (e != null)
                        action.accept(e);
                } while (p != null);
            }
        }

        public boolean tryAdvance(Consumer<? super E> action) {
            if (action == null) throw new NullPointerException();
            final ResizableCapacityLinkedBlockingQueue<E> q = this.queue;
            if (!exhausted) {
                E e = null;
                q.fullyLock();
                try {
                    if (current == null)
                        current = q.head.next;
                    while (current != null) {
                        e = current.item;
                        current = current.next;
                        if (e != null)
                            break;
                    }
                } finally {
                    q.fullyUnlock();
                }
                if (current == null)
                    exhausted = true;
                if (e != null) {
                    action.accept(e);
                    return true;
                }
            }
            return false;
        }

        public int characteristics() {
            return Spliterator.ORDERED | Spliterator.NONNULL |
                    Spliterator.CONCURRENT;
        }
    }

    /**
     * Returns a {@link Spliterator} over the elements in this queue.
     *
     * <p>The returned spliterator is
     * <a href="package-summary.html#Weakly"><i>weakly consistent</i></a>.
     *
     * <p>The {@code Spliterator} reports {@link Spliterator#CONCURRENT},
     * {@link Spliterator#ORDERED}, and {@link Spliterator#NONNULL}.
     *
     * @implNote
     * The {@code Spliterator} implements {@code trySplit} to permit limited
     * parallelism.
     *
     * @return a {@code Spliterator} over the elements in this queue
     * @since 1.8
     */
    public Spliterator<E> spliterator() {
        return new ResizableCapacityLinkedBlockingQueue.LBQSpliterator<E>(this);
    }

    /**
     * Saves this queue to a stream (that is, serializes it).
     *
     * @param s the stream
     * @throws java.io.IOException if an I/O error occurs
     * @serialData The capacity is emitted (int), followed by all of
     * its elements (each an {@code Object}) in the proper order,
     * followed by a null
     */
    private void writeObject(java.io.ObjectOutputStream s)
            throws java.io.IOException {

        fullyLock();
        try {
            // Write out any hidden stuff, plus capacity
            s.defaultWriteObject();

            // Write out all elements in the proper order.
            for (ResizableCapacityLinkedBlockingQueue.Node<E> p = head.next; p != null; p = p.next)
                s.writeObject(p.item);

            // Use trailing null as sentinel
            s.writeObject(null);
        } finally {
            fullyUnlock();
        }
    }

    /**
     * Reconstitutes this queue from a stream (that is, deserializes it).
     * @param s the stream
     * @throws ClassNotFoundException if the class of a serialized object
     *         could not be found
     * @throws java.io.IOException if an I/O error occurs
     */
    private void readObject(java.io.ObjectInputStream s)
            throws java.io.IOException, ClassNotFoundException {
        // Read in capacity, and any hidden stuff
        s.defaultReadObject();

        count.set(0);
        last = head = new ResizableCapacityLinkedBlockingQueue.Node<E>(null);

        // Read in all elements and place in queue
        for (;;) {
            @SuppressWarnings("unchecked")
            E item = (E)s.readObject();
            if (item == null)
                break;
            add(item);
        }
    }

    /** Added: returns the current capacity bound. */
    public int getCapacity() {
        return capacity;
    }

    /**
     * Added: changes the capacity bound at runtime.
     * Elements already queued are kept even if the new bound is smaller;
     * further inserts are refused until the size drops below the bound.
     *
     * @throws IllegalArgumentException if {@code capacity} is not greater than zero
     */
    public void setCapacity(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();
        final int oldCapacity = this.capacity;
        this.capacity = capacity;
        // If the bound grew and there is room now, wake one blocked producer;
        // put/offer cascade the signal to any others.
        if (capacity > oldCapacity && count.get() < capacity)
            signalNotFull();
    }
}
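The core idea, a bound that can be changed after construction, can also be seen in isolation in a much smaller class. The sketch below is not the queue from this article but a deliberately simplified alternative: it wraps an unbounded LinkedBlockingQueue and enforces a volatile bound only in `offer`, the method ThreadPoolExecutor uses to enqueue tasks. The class name `SoftBoundedQueue` is made up, the check is not atomic (concurrent producers can briefly overshoot the bound), and `put` stays unbounded, so treat it as an illustration rather than a replacement:

```java
import java.util.concurrent.LinkedBlockingQueue;

final class SoftBoundedQueue<E> extends LinkedBlockingQueue<E> {
    private static final long serialVersionUID = 1L;

    // Mutable bound; volatile so a change is visible to all producer threads.
    private volatile int capacity;

    SoftBoundedQueue(int capacity) {
        super(); // unbounded underneath; the bound is enforced in offer()
        setCapacity(capacity);
    }

    int getCapacity() {
        return capacity;
    }

    void setCapacity(int capacity) {
        if (capacity <= 0) {
            throw new IllegalArgumentException("capacity must be positive");
        }
        this.capacity = capacity;
    }

    @Override
    public boolean offer(E e) {
        // Check-then-act without a lock: a soft bound, not an exact one.
        return size() < capacity && super.offer(e);
    }

    @Override
    public int remainingCapacity() {
        return Math.max(0, capacity - size());
    }

    public static void main(String[] args) {
        SoftBoundedQueue<String> queue = new SoftBoundedQueue<>(2);
        queue.offer("a");
        queue.offer("b");
        System.out.println("third offer accepted: " + queue.offer("c"));
        queue.setCapacity(3);
        System.out.println("after resize accepted: " + queue.offer("c"));
    }
}
```

The copied-and-modified class above is the stricter option because it keeps the bound exact under the put lock; the short version only shows why a non-final capacity is enough for the thread pool to see a new limit.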

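Controller endpoints

The controller exposes endpoints to query the thread pool state, to change the thread pool parameters, and to exercise the thread pool.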


import com.example.redissiontest.config.ResizableCapacityLinkedBlockingQueue;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;

import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;

/**
 * desc: Test dynamic configuration of the thread pool
 *
 * @author qts
 * @date 2023/11/15 0015
 */
@RestController
@RequestMapping("/threadpool")
@Slf4j
public class ThreadPoolTestController {

    @Autowired
    private ThreadPoolExecutor threadPoolExecutor;

    // Business operation: submits concurrencyCount tasks that each sleep for 2 seconds
    @GetMapping("/biz")
    public String test(@RequestParam Integer concurrencyCount) {
        for (int i = 0; i < concurrencyCount; i++) {
            final int index = i + 1;
            try {
                threadPoolExecutor.execute(() -> {
                    try {
                        Thread.sleep(2000);
                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                    printThreadPoolStatus(index);
                });
            } catch (RejectedExecutionException e) {
                log.error("biz rejected-index={}", index);
            }
        }
        return "success";
    }

    /**
     * Logs the current state of the thread pool.
     */
    public void printThreadPoolStatus(int index) {
        log.info("biz executed-index={};core_size:{};thread_current_size:{};" +
                        "thread_max_size:{};queue_capacity:{};queue_current_size:{}",
                index, threadPoolExecutor.getCorePoolSize(),
                threadPoolExecutor.getActiveCount(), threadPoolExecutor.getMaximumPoolSize(),
                ((ResizableCapacityLinkedBlockingQueue<?>) threadPoolExecutor.getQueue()).getCapacity(),
                threadPoolExecutor.getQueue().size());
    }

    // Query the thread pool parameters
    @GetMapping("/getParams")
    public Map<String, Integer> getParams() {
        Map<String, Integer> params = new LinkedHashMap<>();
        params.put("core_size", threadPoolExecutor.getCorePoolSize());
        params.put("thread_max_size", threadPoolExecutor.getMaximumPoolSize());
        params.put("queue_capacity",
                ((ResizableCapacityLinkedBlockingQueue<?>) threadPoolExecutor.getQueue()).getCapacity());
        params.put("thread_current_size", threadPoolExecutor.getActiveCount());
        params.put("queue_current_size", threadPoolExecutor.getQueue().size());
        return params;
    }

    // Change the thread pool parameters
    @PutMapping("/changeParams")
    public String changeParams(@RequestBody Map<String, Object> params) {
        ResizableCapacityLinkedBlockingQueue<?> queue =
                (ResizableCapacityLinkedBlockingQueue<?>) threadPoolExecutor.getQueue();

        log.info("corePoolSize before => {}", threadPoolExecutor.getCorePoolSize());
        log.info("maxPoolSize before => {}", threadPoolExecutor.getMaximumPoolSize());
        log.info("queueCapacity before => {}", queue.getCapacity());

        threadPoolExecutor.setCorePoolSize((Integer) params.get("corePoolSize"));
        threadPoolExecutor.setMaximumPoolSize((Integer) params.get("maxPoolSize"));
        queue.setCapacity((Integer) params.get("queueCapacity"));

        log.info("corePoolSize after => {}", threadPoolExecutor.getCorePoolSize());
        log.info("maxPoolSize after => {}", threadPoolExecutor.getMaximumPoolSize());
        log.info("queueCapacity after => {}", queue.getCapacity());
        return "success";
    }
}
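One detail to watch when changing both pool sizes: `setMaximumPoolSize` throws IllegalArgumentException when the new maximum is below the current core size, and, as far as I know, newer JDK versions apply the matching check in `setCorePoolSize`. Applying the two setters in a fixed order can therefore fail for some combinations of old and new values. A possible order-aware helper, as a sketch; `PoolSizeUpdater` and `updateSizes` are hypothetical names, not part of the controller above:

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class PoolSizeUpdater {

    /** Applies new core/max sizes in an order that keeps core <= max at every step. */
    static void updateSizes(ThreadPoolExecutor pool, int newCore, int newMax) {
        if (newCore < 0 || newMax <= 0 || newCore > newMax) {
            throw new IllegalArgumentException(
                    "need 0 <= core <= max and max > 0, got core=" + newCore + ", max=" + newMax);
        }
        if (newMax >= pool.getMaximumPoolSize()) {
            // Growing (or same max): widen the maximum first, then set the core size.
            pool.setMaximumPoolSize(newMax);
            pool.setCorePoolSize(newCore);
        } else {
            // Shrinking: lower the core size first, then the maximum.
            pool.setCorePoolSize(newCore);
            pool.setMaximumPoolSize(newMax);
        }
    }

    public static void main(String[] args) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 4, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(10));
        try {
            updateSizes(pool, 5, 8); // new core is above the old maximum
            System.out.println(pool.getCorePoolSize() + "/" + pool.getMaximumPoolSize());
            updateSizes(pool, 1, 2); // new maximum is below the old core
            System.out.println(pool.getCorePoolSize() + "/" + pool.getMaximumPoolSize());
        } finally {
            pool.shutdown();
        }
    }
}
```

For the values used in this article's test (2/4 changed to 3/6) either order works; the helper matters once the new core size exceeds the old maximum or the new maximum drops below the old core size.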

Test results

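  1. Initial parameters: corePoolSize=2, maxPoolSize=4, queueCapacity=10
  2. Call the test endpoint with a concurrency of 30
  3. In the console output of the test endpoint, at most 4 threads are active and at most 10 tasks wait in the queue
  4. Change the parameters to corePoolSize=3, maxPoolSize=6, queueCapacity=20
  5. Query the thread pool state again; the values have changed
  6. Call the test endpoint again with a concurrency of 30 and check whether the number of active threads reaches 6 and the queue holds at most 20 tasks; if so, the dynamic settings took effect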
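The saturation behaviour behind steps 2 and 3 can be reproduced without Spring. The sketch below is an assumption-laden stand-in for the test endpoint: it uses a plain fixed-capacity LinkedBlockingQueue instead of the resizable queue, a shorter sleep, and the made-up names `SaturationDemo` and `run`. It reports the largest number of pool threads ever alive, how many tasks CallerRunsPolicy handed back to the submitting thread, and how many tasks finished:

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class SaturationDemo {

    /** Submits the given number of tasks; returns {largestPoolSize, runByCaller, completed}. */
    static int[] run(int tasks) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 4, 60, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(10),
                new ThreadPoolExecutor.CallerRunsPolicy());
        Thread caller = Thread.currentThread();
        AtomicInteger runByCaller = new AtomicInteger();
        AtomicInteger completed = new AtomicInteger();

        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> {
                if (Thread.currentThread() == caller) {
                    runByCaller.incrementAndGet(); // rejected task, executed by the submitter
                }
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                completed.incrementAndGet();
            });
        }

        pool.shutdown();
        try {
            pool.awaitTermination(30, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new int[] {pool.getLargestPoolSize(), runByCaller.get(), completed.get()};
    }

    public static void main(String[] args) {
        int[] result = run(30);
        System.out.println("largest pool size = " + result[0]
                + ", run by caller = " + result[1]
                + ", completed = " + result[2]);
    }
}
```

The pool never grows past its maximum of 4, and once 4 threads are busy and 10 tasks are queued, further submissions run on the calling thread. That is also why the test endpoint normally does not log a rejection with this policy.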


Basics of Parallel Programming 并行编程的基础 核达到了上限&#xff0c;无法越做越快&#xff0c;只能通过更多的核来解决问题 Process 进程 有独立的存储单元&#xff0c;系统去管理&#xff0c;需要通过特殊机制去交换信息 Thread 线程 在进程之内&#xff0c;共享了内存…...

系列三、GC垃圾回收【总体概览】

一、GC垃圾回收【总体概览】 JVM进行GC时&#xff0c;并非每次都对上面的三个内存区域&#xff08;新生区、养老区、元空间/永久代&#xff09;一起回收&#xff0c;大部分回收的是新生区里边的垃圾&#xff0c;因此GC按照回收的区域又分为了两种类型&#xff0c;一种是发生在新…...

无线WiFi安全渗透与攻防(N.3)WPA破解-创建Hash-table加速并用Cowpatty破解

WPA破解-创建Hash-table加速并用Cowpatty破解 WPA破解-创建Hash-table加速并用Cowpatty破解1.Cowpatty 软件介绍2.渗透流程1.安装CoWPAtty2.抓握手包1.查看网卡2.开启监听模式3.扫描wifi4.抓握手包5.进行冲突模式攻击3.STA重新连接wifi4.渗透WPA wifi5.使用大字典破解3.hash-ta…...

golang 动态库

目录 1. golang 动态库2. golang 语言使用动态库、调用动态链接库2.1. Go 插件系统2.2. 动态加载的优劣2.3. Go 的插件系统&#xff1a;Plugin2.4. 插件开发原则2.4.1. 插件独立2.4.2. 使用接口类型作为边界2.4.3. Unix 模块化原则2.4.4. 版本控制 2.5. 插件开发示例2.5.1. 编写…...

Python的2042小游戏及其详解

源码&#xff1a; import random import os# 游戏界面尺寸 SIZE 4# 游戏结束标志 GAME_OVER False# 初始化游戏界面 board [[0] * SIZE for _ in range(SIZE)]# 随机生成一个初始方块 def add_random_tile():empty_tiles [(i, j) for i in range(SIZE) for j in range(SIZ…...

怎么去掉邮件内容中的回车符

上图是Outlook 截图&#xff0c;可见1指向的总有回车符&#xff1b; 故障原因&#xff1a; 不小心误按了箭头4这个选项&#xff1b; 解决方法&#xff1a; 点击2箭头确保tab展开&#xff1b; 点击3以找到箭头4. 取消勾选或者多次点击&#xff0c;即可解决。...

Git-概念与架构

GIT-概念与架构 一、背景和起源二、版本控制系统1.版本控制分类1.1 集中式版本控制1.2 分布式版本控制 2.Git和SVN对比2.1 SVN2.2 GIT 三、GIT框架1.工作区&#xff08;working directory&#xff09;2.暂存区&#xff08;staging area&#xff09;3.本地仓库&#xff08;local…...

android 数独小游戏 经典数独·休闲益智

一款经典数独训练app 标题资源下载 &#xff08;0积分&#xff09;https://download.csdn.net/download/qq_38355313/88544810 首页页面&#xff1a; 1.包含有简单、普通、困难、大师四种难度的数独挑战供选择&#xff1b; 记录页面&#xff1a; 1.记录用户训练过的数独信息&…...

GAT里面的sofamax函数的实现:

1.sofamx 公式&#xff1a; 2. GAT里的sofamax函数的实现&#xff1a; 1. 因为指数在x轴正轴爆炸式地快速增长&#xff0c;如果zi比较大&#xff0c;exp⁡(zi)也会非常大&#xff0c;得到的数值可能会溢出。溢出又分为下溢出&#xff08;Underflow&#xff09;和上溢出&#x…...

Idea 编译SpringBoot项目Kotlin报错/Idea重新编译

原因应该是一次性修改了大量的文件, SpringBoot项目启动Kotlin报错, Build Project也是同样的结果, 报错如下 Error:Kotlin: Module was compiled with an incompatible version of Kotlin. The binary version of its metadata is 1.9.0, expected version is 1.1.13. Build-&…...

【Qt之QWizard问题】setPixmap()设置logo、background、watermark无效不显示解决方案

问题原因&#xff1a; 使用QWizard或者QWizardPage设置像素图&#xff0c;结果设置完不显示效果。 设置示例&#xff1a; setPixmap(QWizard::WatermarkPixmap, QPixmap("xxx/xxx/xxx.png"));setPixmap(QWizard::BackgroundPixmap, QPixmap("xxx/xxx/xxx.png&…...

mysql 设置远程登录

为了允许远程连接到MySQL服务器&#xff0c;你需要采取以下步骤&#xff1a; 编辑MySQL配置文件&#xff1a; 打开MySQL的配置文件 my.cnf 或 my.ini&#xff0c;这取决于你的操作系统和MySQL版本。该文件通常位于MySQL安装目录下的 etc 或 etc/mysql 目录中。 添加或确保以下行…...