网站建设哪家公司好网站建设 公司/长沙企业网站建设报价
目录
- BlockingQueue详解
- 0、BlockingQueue简介
- BlockingQueue接口中方法注释
- BlockingQueue的实现,总结计划
- 1、ArrayBlockingQueue简介
- 2、ArrayBlockingQueue的继承体系
- 3、ArrayBlockingQueue的构造方法
- ①、 `ArrayBlockingQueue(int capacity)`
- ②、`ArrayBlockingQueue(int capacity, boolean fair)`
- ③、`ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c)`
- 4、ArrayBlockingQueue的适用场景
- ①、资源池管理
- ②、多线程任务调度
- ③、实现生产者-消费者模式
- ④、使用有界队列
- ArrayBlockingQueue 的简单使用代码示例:
- 5、ArrayBlockingQueue的数据结构
- 看下ArrayBlockingQueue类的部分属性
- 6、ArrayBlockingQueue的`put`方法
- 7、ArrayBlockingQueue的`take`方法
- 8、ArrayBlockingQueue,take和put方法动画演示
- 9、ArrayBlockingQueue的其他方法
- `add(E e)`方法
- `offer(E e)` 方法
- `offer(E e, long timeout, TimeUnit unit) ` 方法
- `poll()`方法
- `poll(long timeout, TimeUnit unit)` 方法
- `peek()`方法
- `remove()`方法
- `contains(Object o)`方法
- `drainTo(Collection<? super E> c)`方法
- 10、LinkedBlockingQueue简介和数据结构
- LinkedBlockingQueue属性和构造函数
- LinkedBlockingQueue的`take`和`put`方法
- ArrayBlockingQueue和LinkedBlockingQueue的一些区别?
- 为什么ArrayBlockingQueue不设计成读写锁分离的模式?
- 11、其他的BlockingQueue实现
- PriorityBlockingQueue 简单介绍
- SynchronousQueue简单介绍
- LinkedTransferQueue简单介绍
- LinkedBlockingDeque简单介绍
BlockingQueue详解
总结到这儿,总感觉这集合的多线程味越来越重了~
为什么这样呢,因为 BlockingQueue和前面说过的CopyOnWriteArrayList、
ConcurrentHashMap都是 java.util.concurrent
包下的。
这个包就是为了解决多线程的各种问题而设计的,所以java.util.concurrent
包下的集合大都和多线程有关系。
0、BlockingQueue简介
阻塞队列的基本概念:
阻塞队列(Blocking Queue)是一种特殊的队列,支持两个特殊的操作:
- ①、在队列为空时,获取元素的操作将会被阻塞,直到队列变为非空。
- ②、在队列已满时,插入元素的操作将会被阻塞,直到队列不再是满的。
阻塞队列是线程安全的,且通常用于生产者-消费者模型中。
BlockingQueue 提供了四种不同类型的操作方式:抛出异常、特殊值、阻塞和超时。
BlockingQueue接口中方法注释
public interface BlockingQueue<E> extends Queue<E> {// 尝试添加元素到队列中,如果队列已满,则抛出 IllegalStateExceptionboolean add(E e);// 尝试添加元素到队列中,如果队列已满,则返回 falseboolean offer(E e);// 将元素添加到队列中,如果队列已满,则等待空间可用void put(E e) throws InterruptedException;// 尝试将元素添加到队列中,在指定的等待时间内,如果队列已满,则等待空间可用boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException;// 获取并移除队列头部的元素,如果队列为空,则等待直到有元素可用E take() throws InterruptedException;// 尝试获取并移除队列头部的元素,在指定的等待时间内,如果队列为空,则返回 nullE poll(long timeout, TimeUnit unit) throws InterruptedException;// 返回队列剩余的容量int remainingCapacity();// 从队列中移除指定的元素boolean remove(Object o);// 检查队列中是否包含指定的元素boolean contains(Object o);// 从队列中移除所有可用的元素,并将它们添加到指定的集合中int drainTo(Collection<? super E> c);// 从队列中移除最多 maxElements 个可用的元素,并将它们添加到指定的集合中int drainTo(Collection<? super E> c, int maxElements);
}
知道了抽象层接口的规范后。再去看实现就有整体的把握了。
BlockingQueue的实现,总结计划
ArrayBlockingQueue 下面会详细讲解。
LinkedBlockingQueue 讲解一下底层数据结构和 take、put方法。
PriorityBlockingQueue 简单介绍。
SynchronousQueue简单介绍。
LinkedTransferQueue简单介绍。
LinkedBlockingDeque简单介绍。
1、ArrayBlockingQueue简介
ArrayBlockingQueue是JDK的JUC包下对阻塞队列的一种实现。
ArrayBlockingQueue的主要特性如下:
- ①、有界:队列有固定的容量,容量在创建时指定,且不能改变。如果队列已满,插入操作将被阻塞,直到有空间可用。
- ②、阻塞:队列支持阻塞的插入和移除操作。这意味着当队列满时,插入操作会等待,直到有空间可用;当队列空时,移除操作会等待,直到有元素可用。
- ③、线程安全:内部使用可重入锁(ReentrantLock)和两个条件变量(notEmpty和notFull)来管理并发访问。
2、ArrayBlockingQueue的继承体系
public class ArrayBlockingQueue<E> extends AbstractQueue<E>implements BlockingQueue<E>, java.io.Serializable
可以看到ArrayBlockingQueue实现了Collection和Queue接口,是单值集合,具有队列的特性。
又实现了BlockingQueue接口,具有阻塞队列的特性。
3、ArrayBlockingQueue的构造方法
①、 ArrayBlockingQueue(int capacity)
这个构造方法创建一个具有指定容量的 ArrayBlockingQueue,其内部锁的公平性设置为 false。
/*** 创建一个具有指定容量的ArrayBlockingQueue,锁的公平性设置为false。** @param capacity 队列的容量* @throws IllegalArgumentException 如果容量小于等于0*/
public ArrayBlockingQueue(int capacity) {this(capacity, false);
}
②、ArrayBlockingQueue(int capacity, boolean fair)
这个构造方法创建一个具有指定容量的 ArrayBlockingQueue,并允许指定锁的公平性。如果 fair 为 true,锁将采用公平策略,即按顺序分配锁;如果 false,锁将采用非公平策略。
/*** 创建一个具有指定容量的ArrayBlockingQueue,并指定锁的公平性。** @param capacity 队列的容量* @param fair 指定锁的公平性,如果为true则锁是公平的* @throws IllegalArgumentException 如果容量小于等于0*/
public ArrayBlockingQueue(int capacity, boolean fair) {if (capacity <= 0)throw new IllegalArgumentException(); // 如果容量小于等于0,抛出异常this.items = new Object[capacity]; // 初始化存储元素的数组lock = new ReentrantLock(fair); // 创建一个可重入锁,并指定其公平性notEmpty = lock.newCondition(); // 创建一个条件变量,用于在队列不为空时通知notFull = lock.newCondition(); // 创建一个条件变量,用于在队列未满时通知
}
③、ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c)
这个构造方法创建一个具有指定容量和锁公平性的 ArrayBlockingQueue,并将指定集合中的元素添加到队列中。
/*** 创建一个具有指定容量和锁公平性的ArrayBlockingQueue,并将集合中的元素添加到队列中。** @param capacity 队列的容量* @param fair 指定锁的公平性,如果为true则锁是公平的* @param c 要添加到队列中的集合* @throws IllegalArgumentException 如果容量小于等于0或者集合中的元素数量超过队列容量* @throws NullPointerException 如果集合或其中任何一个元素为null*/
public ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c) {this(capacity, fair); // 调用前一个构造方法,初始化容量和锁的公平性final ReentrantLock lock = this.lock;lock.lock(); // 加锁以确保可见性,而不是互斥try {int i = 0;try {for (E e : c) {checkNotNull(e); // 检查元素是否为null,如果是则抛出NullPointerExceptionitems[i++] = e; // 将元素添加到队列中}} catch (ArrayIndexOutOfBoundsException ex) {throw new IllegalArgumentException(); // 如果集合元素超过容量,抛出异常}count = i; // 设置队列中的元素数量putIndex = (i == capacity) ? 0 : i; // 设置放置下一个元素的索引} finally {lock.unlock(); // 释放锁}
}
4、ArrayBlockingQueue的适用场景
我们先看下这个集合的用途,然后简单的使用体验一下,之后再去看数据结构和一些方法的实现。
①、资源池管理
适用于实现资源池管理,例如数据库连接池。资源池中的资源可以放入 ArrayBlockingQueue,线程可以安全地获取和释放资源。
②、多线程任务调度
在多线程任务调度中,可以使用 ArrayBlockingQueue 来存放任务。工作线程从队列中取任务并执行,新的任务可以被安全地添加到队列中。
③、实现生产者-消费者模式
这个本质上也是多线程任务调度。
④、使用有界队列
当需要限制队列的最大容量时,ArrayBlockingQueue 是一个很好的选择。它在队列满时阻塞生产者,在队列空时阻塞消费者,从而有效地控制系统资源的使用。
还有哪些应用呢? 随便找个Spring项目,然后 Ctrl+鼠标左键点击JDK源码中ArrayBlockingQueue 的类名
就能看到了,可以挑几个感兴趣的研究研究,比如说 兔子消息队列的模板方法类 RabbitTemplate里面有用到ArrayBlockingQueue来实现同步的请求-回复机制。
ArrayBlockingQueue 的简单使用代码示例:
import java.util.concurrent.ArrayBlockingQueue;public class TestA {private static final ArrayBlockingQueue<Integer> blockingQueue = new ArrayBlockingQueue<>(5);public static void main(String[] args) {Thread producer = new Thread(() -> {try {for (int i = 1; i <= 5; i++) {blockingQueue.put(i);System.out.println("生产者生产了:" + i);}} catch (InterruptedException e) {e.printStackTrace();}}, "生产者");Thread consumer = new Thread(() -> {try {for (int i = 1; i <= 5; i++) {blockingQueue.take();System.out.println("消费者消费了:" + i);}} catch (InterruptedException e) {e.printStackTrace();}}, "消费者");producer.start();consumer.start();}}
执行结果:
生产者生产了:1
消费者消费了:1
生产者生产了:2
消费者消费了:2
生产者生产了:3
消费者消费了:3
生产者生产了:4
消费者消费了:4
生产者生产了:5
消费者消费了:5
是不是感觉变味了,明明在讲集合,咋又扯上多线程了。
因为JUC包里的集合基本上都是为了解决多线程问题而设计的,没办法,不扯多线程不行呀。
不过使用ArrayBlockingQueue 来实现生产者和消费者模型是真的简单呀,只需要利用put和take方法即可,什么线程安全问题,线程通信问题都被ArrayBlockingQueue 给解决了,这些内部的线程安全实现对使用者来说算是透明的,后面会分析这些方法是如何实现的。
5、ArrayBlockingQueue的数据结构
毫无疑问 又是Object[]
数组 。
看下ArrayBlockingQueue类的部分属性
public class ArrayBlockingQueue<E> extends AbstractQueue<E>implements BlockingQueue<E>, java.io.Serializable {// 存储队列元素的数组final Object[] items;// 下一个要取出、轮询、查看或移除的元素的索引int takeIndex;// 下一个要放置、提供或添加的元素的索引int putIndex;// 队列中的元素数量int count;// 主锁,用于保护所有访问final ReentrantLock lock;// 条件变量,用于等待取出操作private final Condition notEmpty;// 条件变量,用于等待放置操作private final Condition notFull;}
6、ArrayBlockingQueue的put
方法
public void put(E e) throws InterruptedException {// 检查要添加的元素是否为空checkNotNull(e);// 获取队列的锁final ReentrantLock lock = this.lock;// 以可中断的方式获取锁// lock.lockInterruptibly();try {// 如果队列已满,则等待while (count == items.length)notFull.await();// 将元素加入队列enqueue(e);} finally {// 释放锁lock.unlock();}
}
注意: 当线程尝试通过lockInterruptibly()方法获取锁时,如果在此期间线程被其他线程中断(通过Thread.interrupt()方法),那么这个方法会立即抛出InterruptedException,从而允许线程优雅地处理中断,比如停止执行某些操作或清理资源。
enqueue
方法
private void enqueue(E x) { // 获取队列数组final Object[] items = this.items;// 将元素放入当前插入位置items[putIndex] = x;// 更新插入位置索引,如果到达数组末尾则回绕if (++putIndex == items.length)putIndex = 0;// 增加元素计数count++;// 唤醒等待“非空”条件的线程notEmpty.signal();
}
7、ArrayBlockingQueue的take
方法
public E take() throws InterruptedException {// 获取ReentrantLock实例,lock是ArrayBlockingQueue类中的一个成员变量final ReentrantLock lock = this.lock;// 以中断方式获取锁,如果当前线程被中断则抛出InterruptedExceptionlock.lockInterruptibly();try {// 如果队列为空(count == 0),则等待notEmpty条件while (count == 0)notEmpty.await();// 从队列中移除并返回队头元素return dequeue();} finally {// 确保在任何情况下都释放锁,以避免死锁lock.unlock();}
}
private E dequeue() {// 获取存储队列元素的数组final Object[] items = this.items;// 获取takeIndex位置的元素,并将其强制转换为E类型@SuppressWarnings("unchecked")E x = (E) items[takeIndex];// 将取出位置的元素设为null,以便GC回收items[takeIndex] = null;// 增加takeIndex,如果到达数组末尾则回绕到开头if (++takeIndex == items.length)takeIndex = 0;// 递减count,表示队列中的元素数减少count--;// 如果有迭代器在迭代队列,则通知它们元素已被移除if (itrs != null)itrs.elementDequeued();// 唤醒等待队列非满的线程notFull.signal();// 返回取出的元素return x;
}
8、ArrayBlockingQueue,take和put方法动画演示
先看下面的代码示例:
import java.util.concurrent.ArrayBlockingQueue;public class TestA {public static void main(String[] args) {try {ArrayBlockingQueue<Integer> blockingQueue = new ArrayBlockingQueue<>(3);blockingQueue.put(1);blockingQueue.put(2);blockingQueue.put(3);System.out.println("主线程添加元素:1 2 3");// 使用新线程 t1 再添加 一个元素 由于阻塞队列满了 所以t1线程阻塞Thread t1 = new Thread(() -> {try {blockingQueue.put(4);System.out.println("t1线程添加元素:4");} catch (InterruptedException e) {e.printStackTrace();}});t1.start();Integer take1 = blockingQueue.take();// 主线程拿走一个元素System.out.println("主线程拿走元素:" + take1);t1.join(); // 等待t1 添加完成System.out.println("当前队列中元素:" + blockingQueue);blockingQueue.clear(); // 清空 阻塞队列System.out.println("==========队列已清空===========");// 使用新线程 t2 获取元素 由于阻塞队列为空 所以t2线程阻塞Thread t2 = new Thread(() -> {try {Integer take = blockingQueue.take();System.out.println("t2线程获取元素: " + take + "成功");} catch (InterruptedException e) {e.printStackTrace();}});t2.start();blockingQueue.put(5);System.out.println("主线程添加元素:5");t2.join();} catch (InterruptedException e) {e.printStackTrace();}}}
执行结果:
一定要结合代码的执行顺序看结果
主线程添加元素:1 2 3
主线程拿走元素:1
t1线程添加元素:4
当前队列中元素:[2, 3, 4]
==========队列已清空===========
主线程添加元素:5
t2线程获取元素: 5成功
下面用动画演示下
ArrayBlockingQueue<Integer> blockingQueue = new ArrayBlockingQueue<>(3);
blockingQueue.put(1);
blockingQueue.put(2);
blockingQueue.put(3);
添加元素,把队列添加满的情况
下面再来看当队列满了之后再去添加元素的情况:
// 使用新线程 t1 再添加 一个元素 由于阻塞队列满了 所以t1线程阻塞
Thread t1 = new Thread(() -> {try {blockingQueue.put(4);System.out.println("t1线程添加元素:4");} catch (InterruptedException e) {e.printStackTrace();}
});
t1.start();
Integer take1 = blockingQueue.take();// 主线程拿走一个元素
最后再看下队列清空后 执行take的情况:
blockingQueue.clear(); // 清空 阻塞队列System.out.println("==========队列已清空===========");// 使用新线程 t2 获取元素 由于阻塞队列为空 所以t2线程阻塞Thread t2 = new Thread(() -> {try {Integer take = blockingQueue.take();System.out.println("t2线程获取元素: " + take + "成功");} catch (InterruptedException e) {e.printStackTrace();}});t2.start();blockingQueue.put(5);
9、ArrayBlockingQueue的其他方法
add(E e)
方法
将元素添加到队列中,当队列已满时,抛出 IllegalStateException。
public boolean add(E e) {// 调用父类的add方法(实际就是调用下面重写的add方法)return super.add(e);
}public boolean add(E e) {// 调用offer方法 尝试将元素插入队列 if (offer(e))return true;else// 如果插入失败(队列已满),抛出IllegalStateExceptionthrow new IllegalStateException("Queue full");
}
offer(E e)
方法
将元素添加到队列中,如果队列已满,立即返回 false。
public boolean offer(E e) {// 检查传入的元素是否为null,如果为null,抛出NullPointerExceptioncheckNotNull(e);// 获取队列的重入锁final ReentrantLock lock = this.lock;// 获取锁lock.lock();try {// 如果队列已满,返回falseif (count == items.length)return false;else {// 将元素添加到队列enqueue(e);return true;}} finally {// 释放锁lock.unlock();}
}
offer(E e, long timeout, TimeUnit unit)
方法
offer方法的重载,带超时时间。
尝试将元素添加到队列中,如果队列已满,则等待指定的时间。
如果在等待时间内队列有空闲空间,则将元素添加到队列中并返回 true;如果等待超时,则返回 false。
public boolean offer(E e, long timeout, TimeUnit unit)throws InterruptedException {// 检查传入的元素是否为null,如果为null,抛出NullPointerExceptioncheckNotNull(e);// 将等待时间转换为纳秒long nanos = unit.toNanos(timeout);// 获取队列的重入锁final ReentrantLock lock = this.lock;// 以可中断的方式获取锁,如果当前线程在等待获取锁时被中断,则抛出InterruptedExceptionlock.lockInterruptibly();try {// 如果队列已满,进入等待状态,直到队列有空闲空间或等待超时while (count == items.length) {// 如果等待时间已到,返回falseif (nanos <= 0)return false;// 等待notFull条件变量,返回剩余的等待时间 = 传入的nanos - notFull条件实际等待的时间nanos = notFull.awaitNanos(nanos);}// 将元素添加到队列enqueue(e);return true;} finally {// 释放锁lock.unlock();}
}
poll()
方法
从队列中移除并返回队头元素,如果队列为空,返回 null。
public E poll() {// 获取队列的重入锁final ReentrantLock lock = this.lock;// 获取锁lock.lock();try {// 如果队列为空,返回null,否则移除并返回队头元素return (count == 0) ? null : dequeue();} finally {// 释放锁lock.unlock();}
}
poll(long timeout, TimeUnit unit)
方法
从队列中移除并返回队头元素。如果队列为空,则等待指定的时间。
如果在等待时间内队列有元素入队,则返回队头元素;如果等待超时,则返回 null。
public E poll(long timeout, TimeUnit unit) throws InterruptedException {// 将等待时间转换为纳秒long nanos = unit.toNanos(timeout);// 获取队列的重入锁final ReentrantLock lock = this.lock;// 以可中断的方式获取锁,如果当前线程在等待获取锁时被中断,则抛出InterruptedExceptionlock.lockInterruptibly();try {// 如果队列为空,进入等待状态,直到队列有元素或等待超时while (count == 0) {// 如果等待时间已到,返回nullif (nanos <= 0)return null;// 等待notEmpty条件变量,返回剩余的等待时间 = 传入的nanos - notFull条件实际等待的时间nanos = notEmpty.awaitNanos(nanos);}// 从队列中移除并返回队头元素return dequeue();} finally {// 释放锁lock.unlock();}
}
peek()
方法
返回队头元素但不移除它,如果队列为空,返回 null。
public E peek() {// 获取队列的重入锁final ReentrantLock lock = this.lock;// 获取锁lock.lock();try {// 返回队头元素但不移除它,如果队列为空,返回nullreturn itemAt(takeIndex);} finally {// 释放锁lock.unlock();}
}final E itemAt(int i) {// 获取底层数组中相应索引位置的元素return (E) items[i];}
remove()
方法
从队列中移除并返回队头元素,如果队列为空,抛出 NoSuchElementException。
public E remove() {// 尝试移除并返回队头元素E x = poll();// 如果队头元素不为空,返回该元素if (x != null)return x;else// 如果队列为空,抛出NoSuchElementExceptionthrow new NoSuchElementException();
}
contains(Object o)
方法
检查队列中是否包含指定元素。
public boolean contains(Object o) {// 如果传入的对象为null,返回falseif (o == null) return false;// 获取队列的数组final Object[] items = this.items;// 获取队列的重入锁final ReentrantLock lock = this.lock;// 获取锁lock.lock();try {// 如果队列中有元素,检查每个元素是否与传入的对象相等if (count > 0) {// 获取当前插入位置索引final int putIndex = this.putIndex;// 从队列的读取位置开始检查int i = takeIndex;do {// 如果找到相等的元素,返回trueif (o.equals(items[i]))return true;// 更新检查位置索引if (++i == items.length)i = 0;} while (i != putIndex);}// 如果未找到相等的元素,返回falsereturn false;} finally {// 释放锁lock.unlock();}
}
drainTo(Collection<? super E> c)
方法
将队列中的所有元素转移到指定的集合中。
此操作是一个批量操作,试图一次性地清空队列并把队列中所有元素转移到指定的集合中。
public int drainTo(Collection<? super E> c) {// 调用重载方法return drainTo(c, Integer.MAX_VALUE);}public int drainTo(Collection<? super E> c, int maxElements) {// 检查目标集合是否为null,如果为null,抛出NullPointerExceptioncheckNotNull(c);// 检查目标集合是否为队列本身,如果是,抛出IllegalArgumentExceptionif (c == this)throw new IllegalArgumentException();// 如果maxElements小于等于0,直接返回0if (maxElements <= 0)return 0;// 获取队列的重入锁final ReentrantLock lock = this.lock;// 获取锁lock.lock();try {// 计算实际要转移的元素数量int n = Math.min(maxElements, count);// 将元素从队列中转移到目标集合for (int i = 0; i < n; i++) {c.add(this.dequeue());}// 返回实际转移的元素数量return n;} finally {// 释放锁lock.unlock();}
}
10、LinkedBlockingQueue简介和数据结构
LinkedBlockingQueue属性和构造函数
看下面两个类属性就知道了
transient Node<E> head;private transient Node<E> last;
LinkedBlockingQueue 是一个基于链表实现的阻塞队列。
再看下构造函数:
// 创建一个容量为 Integer.MAX_VALUE 的 LinkedBlockingQueue 实例。
public LinkedBlockingQueue() {// 调用带有容量参数的构造函数,默认容量为Integer.MAX_VALUEthis(Integer.MAX_VALUE);
}// 创建一个指定容量的 LinkedBlockingQueue 实例。
public LinkedBlockingQueue(int capacity) {// 检查容量是否大于0,否则抛出IllegalArgumentExceptionif (capacity <= 0) throw new IllegalArgumentException();// 初始化队列容量this.capacity = capacity;// 初始化头节点和尾节点为哨兵节点,不存储实际数据last = head = new Node<E>(null);
}// 使用给定的集合创建一个 LinkedBlockingQueue 实例,并将集合中的元素添加到队列中。
public LinkedBlockingQueue(Collection<? extends E> c) {// 调用带有容量参数的构造函数,默认容量为Integer.MAX_VALUEthis(Integer.MAX_VALUE);// 获取插入锁final ReentrantLock putLock = this.putLock;// 获取锁以确保可见性putLock.lock();try {int n = 0;// 遍历集合中的每个元素for (E e : c) {// 如果元素为null,抛出NullPointerExceptionif (e == null)throw new NullPointerException();// 如果元素数量达到容量,抛出IllegalStateExceptionif (n == capacity)throw new IllegalStateException("Queue full");// 将元素包装成节点并添加到队列中enqueue(new Node<E>(e));++n;}// 更新队列中元素的数量count.set(n);} finally {// 释放锁putLock.unlock();}
}
LinkedBlockingQueue的take
和put
方法
首先先明确一点LinkedBlockingQueue的读锁和写锁是分开的,这点和ArrayBlockingQueue不同。ArrayBlockingQueue读写用的都是同一个锁。
LinkedBlockingQueue 使用了读写分离锁,即 takeLock 和 putLock,来分别控制读取和插入操作。这种设计可以减少锁竞争,提高并发性能。
private final ReentrantLock takeLock = new ReentrantLock();
private final ReentrantLock putLock = new ReentrantLock();
take
方法
public E take() throws InterruptedException {E x; // 存储从队列中取出的元素int c = -1; // 用于记录当前队列元素数量final AtomicInteger count = this.count; // 队列中元素的数量 缓存成员变量到局部变量final ReentrantLock takeLock = this.takeLock; // 读取操作的锁takeLock.lockInterruptibly(); // 获取读取锁,可被中断try {// 当队列为空时,等待notEmpty条件while (count.get() == 0) {notEmpty.await(); // 等待队列非空}x = dequeue(); // 从队列中取出一个元素c = count.getAndDecrement(); // 获取并递减当前队列中的元素数量// 如果队列中还有剩余元素,唤醒其他等待线程if (c > 1)notEmpty.signal(); // 唤醒其他等待的读取操作} finally {takeLock.unlock(); // 释放读取锁}// 如果取出元素后,队列变得不满,唤醒等待的插入操作if (c == capacity)signalNotFull(); // 通知等待的插入操作return x; // 返回取出的元素
}
take方法步骤说明:
- 获取 takeLock 锁,确保只有一个线程可以执行读取操作。
- 如果队列为空,等待 notEmpty 条件。
- 从队列中取出一个元素,并递减元素数量。
- 如果队列中仍有元素,唤醒其他等待的读取操作。
- 释放 takeLock 锁。
- 如果队列之前已满,现在有空闲,唤醒等待的插入操作。
- 返回取出的元素。
put
方法
public void put(E e) throws InterruptedException {if (e == null) throw new NullPointerException(); // 检查插入元素是否为nullint c = -1; // 用于记录当前队列元素数量Node<E> node = new Node<E>(e); // 创建一个新的节点包装插入的元素final ReentrantLock putLock = this.putLock; // 插入操作的锁final AtomicInteger count = this.count; // 队列中元素的数量 缓存成员变量到局部变量putLock.lockInterruptibly(); // 获取插入锁,可被中断try {// 当队列已满时,等待notFull条件while (count.get() == capacity) {notFull.await(); // 等待队列非满}enqueue(node); // 将新节点插入到队列中c = count.getAndIncrement(); // 获取并递增当前队列中的元素数量// 如果插入后队列仍未满,唤醒其他等待线程if (c + 1 < capacity)notFull.signal(); // 唤醒其他等待的插入操作} finally {putLock.unlock(); // 释放插入锁}// 如果插入元素前队列为空,唤醒等待的读取操作if (c == 0)signalNotEmpty(); // 通知等待的读取操作
}
put方法步骤说明:
- 检查插入的元素是否为 null,如果是,则抛出 NullPointerException。
- 创建一个新节点包装插入的元素。
- 获取 putLock 锁,确保只有一个线程可以执行插入操作。
- 如果队列已满,等待 notFull 条件。
- 将新节点插入到队列中,并递增元素数量。
- 如果插入后队列仍未满,唤醒其他等待的插入操作。
- 释放 putLock 锁。
- 如果插入元素前队列为空,现在有元素,唤醒等待的读取操作。
注意: 队列中元素数量使用AtomicInteger保证原子性。同时缓存成员变量到局部变量有一些好处。
例如可以提高代码的可读性和可维护性,同时也有助于性能优化和确保线程安全的引用。
final AtomicInteger count = this.count; // 队列中元素的数量
并且源码中对于上面这段代码不在lock锁范围内使用有一段注释:
关于count的非保护使用: 注释指出,尽管count变量没有直接由putLock保护(即在读取count时没有加锁),这种做法在这里是安全的。原因是当线程持有putLock时,其他试图插入或者移除元素的线程都被阻塞了,因此count的值只可能因为当前线程或其他线程从等待中被唤醒并完成了插入或移除操作而改变。这意味着,即使在等待期间检查count的值,也不会因为并发修改而导致不一致。
ArrayBlockingQueue和LinkedBlockingQueue的一些区别?
特性 | ArrayBlockingQueue | LinkedBlockingQueue |
---|---|---|
底层数据结构 | 数组 | 链表 |
有界/无界 | 有界(在构造时指定固定容量) | 可以有界(指定容量)也可以无界(默认) |
锁机制 | 单一锁(用于 put 和 take 操作) | 读写锁分离(分别用于 put 和 take 操作) |
性能 | 在单线程或低并发场景中性能较好,因为锁开销较少 | 在高并发场景中性能较好,因为读写操作分离减少了锁竞争 |
内存开销 | 固定内存开销(数组大小) | 潜在的更高内存开销(链表节点) |
容量扩展 | 不支持动态扩展,容量固定 | 支持动态扩展(如果无界) |
适用场景 | 适用于固定大小的队列和低并发场景 | 适用于需要动态扩展的队列和高并发场景 |
为什么ArrayBlockingQueue不设计成读写锁分离的模式?
我觉得有如下原因:
- 数据结构导致的设计差异
ArrayBlockingQueue:
使用数组作为底层数据结构。
需要在固定大小的数组中进行元素的插入和删除操作,这些操作涉及到对数组索引的维护和管理。
队列满或空时,需要阻塞插入或删除操作,涉及到队列头尾指针的调整。
如果使用读写锁分离,需要考虑并处理队列头尾指针调整的线程安全问题,设计起来比较复杂。
LinkedBlockingQueue:
使用链表作为底层数据结构。
插入和删除操作只需调整链表的头尾指针,不涉及到数组索引的管理。
链表的结构使得读写操作更容易分离,适合使用不同的锁进行管理。
- 性能和设计权衡
ArrayBlockingQueue是针对固定大小的队列,设计目标就是简单、高效。
ArrayBlockingQueue也并非不能设计成读写锁模式,只是投入和回报比不成正比。读写锁的引入会增加复杂度,性能提升可能有限,不一定值得。
11、其他的BlockingQueue实现
PriorityBlockingQueue 简单介绍
PriorityBlockingQueue 是一个基于优先级的无界阻塞队列。
继承体系
public class PriorityBlockingQueue<E> extends AbstractQueue<E>implements BlockingQueue<E>, java.io.Serializable
数据结构:
底层数据结构:PriorityBlockingQueue 基于一个可调整大小的数组实现的二叉小顶堆(最小堆),它使用数组来表示二叉堆。后面会再写一篇PriorityQueue详解的文章详细介绍其内部数据结构。
优先级排序:
队列中的元素按照自然顺序(通过实现 Comparable 接口)或通过提供的比较器(Comparator)进行排序。元素的优先级决定了它们在队列中的顺序。
适用场景:
**任务调度:**适用于需要按照优先级处理任务的场景。
路径查找: 在图算法中,优先队列通常用于实现最短路径查找算法,如 Dijkstra 算法。
**多线程环境:**需要在多线程环境中进行优先级调度的场景。
SynchronousQueue简单介绍
SynchronousQueue 是 Java 并发包中的一种特殊类型的队列。
继承体系
public class SynchronousQueue<E> extends AbstractQueue<E>implements BlockingQueue<E>, java.io.Serializable
特点:
无存储元素: SynchronousQueue 内部并不存储任何元素,即使是空间大小为 0。
直接传输: 它主要用于线程之间直接传输元素,生产者线程将元素直接交给消费者线程,而不会将元素存储在队列中。
阻塞操作: SynchronousQueue 的插入和移除操作是阻塞的。
公平性: 它是一个公平的队列,采用公平的顺序来处理元素的访问。
容量: 容量为 0,即只能容纳一个正在进行交换的元素。
适用场景
直接传输: 适用于需要在生产者和消费者之间进行直接传输的场景,例如线程池任务调度等。
流水线处理: 适用于将生产者生成的数据直接传输给消费者处理的情况,避免数据存储和额外的线程开销。
同步控制: 在并发编程中,用于线程同步和控制流量的传输。
LinkedTransferQueue简单介绍
LinkedTransferQueue 是 Java 并发包中的一个特殊类型的队列,它结合了队列(Queue)和传输队列(TransferQueue)的特性。
继承体系
public class LinkedTransferQueue<E> extends AbstractQueue<E>implements TransferQueue<E>, java.io.Serializable
数据结构:
链表
特点
无界队列: LinkedTransferQueue 是一个无界队列,可以动态增长以容纳更多的元素。
插入和移除操作: 支持常规的插入(offer、add)和移除(poll、take)操作。
传输操作: 支持直接的元素传输操作,即使队列为空也可以进行元素传输。
阻塞和非阻塞操作: 提供了阻塞和非阻塞的插入和移除方法,以及等待传输的操作。
公平性: 在处理元素时采用公平的顺序,即按照等待时间长短处理。
适用场景
生产者-消费者模式: 适用于多线程环境下的生产者和消费者模式,支持高并发的元素传输和处理。
任务调度: 在任务调度器中,可以使用 LinkedTransferQueue 来管理和调度任务,实现任务之间的依赖和传递。
优先级处理: 可以基于元素的属性或优先级进行传输和处理,支持优先级队列的特性。
LinkedBlockingDeque简单介绍
LinkedBlockingDeque 是 Java 并发包中的双向阻塞队列,结合了双向队列(Deque)和阻塞队列(BlockingDeque)的特性。
继承体系
public class LinkedBlockingDeque<E>extends AbstractQueue<E>implements BlockingDeque<E>, java.io.Serializable
数据结构:
双向链表
特点
双向队列: LinkedBlockingDeque 是一个双向队列,支持在队列两端(头部和尾部)进行插入和移除操作。
阻塞操作: 队列的插入和移除操作是阻塞的,即当队列为空或已满时,插入和移除操作会阻塞调用线程,直到有可用的空间或元素。
无界队列: 与 LinkedBlockingQueue 类似,LinkedBlockingDeque 也是一个无界队列,可以动态增长以容纳更多的元素。
线程安全: LinkedBlockingDeque 是线程安全的,支持多个线程同时进行插入、移除和访问操作。
公平性: 使用公平的顺序来处理等待队列的线程。
相关文章:

BlockingQueue详解(含动画演示)
目录 BlockingQueue详解0、BlockingQueue简介BlockingQueue接口中方法注释BlockingQueue的实现,总结计划 1、ArrayBlockingQueue简介2、ArrayBlockingQueue的继承体系3、ArrayBlockingQueue的构造方法①、 ArrayBlockingQueue(int capacity)②、ArrayBlockingQueue(…...

wordpress商用付费主题与免费主题的区别
WordPress免费主题与WordPress付费主题,都可以用,但存在非常大的差别。从直观的感受,简单地说就是,WordPress免费主题能用,WordPress付费主题好用。如果涉及到其它的方面,WordPress商用付费主题与免费主题之…...

【ARM Trace32(劳特巴赫) 使用介绍 2.7 -- bat 脚本传参数给 trace32 cmm 脚本】
请阅读【Trace32 ARM 专栏导读】 文章目录 bat 脚本传参数给 trace32脚本可变参数传入CMM 脚本接收参数运行BAT脚本bat 脚本传参数给 trace32脚本 在使用 Trace32 的过程中,如果每次都是通过GUI 界面来操作,是习惯使用命令行工作的人所不能忍受的!!!,那么能不同通过脚本…...

NavicatforMySQL11.0软件下载-NavicatMySQL11最新版下载附件详细安装步骤
我们必须承认Navicat for MySQL 支援 Unicode,以及本地或远程 MySQL 服务器多连线,使用者可浏览数据库、建立和删除数据库、编辑数据、建立或执行 SQL queries、管理使用者权限(安全设定)、将数据库备份/复原、汇入/汇出数据&…...

弱监督学习
弱监督学习(Weak Supervision)是一种利用不完全、不精确或噪声数据进行模型训练的方法。以下是一些常用的弱监督方法及其原理: 1. 数据增强(Data Augmentation) 原理: 数据增强是一种通过增加训练数据的多…...

代码随想录算法训练营第五十天|LeetCode1143 最长公共子序列、LeetCode1035 不相交的线、LeetCode53 最大子数组和
题1: 指路:1143. 最长公共子序列 - 力扣(LeetCode) 思路与代码: 类似于最长重复子数组,我们依旧定义一个二维数组dp[i][j],其含义为从0到以i-1结尾的nums1数组和从0到j-1结尾的nums2数组的最…...

百日筑基第三天-SOA初步了解
百日筑基第三天-SOA初步了解 SOA(Service-Oriented Architecture,面向服务的架构)是一种软件设计原则,它倡导将应用程序分解为独立的服务单元,这些服务通过定义良好的接口相互通信,以实现业务功能。而RPC&…...

「2024中国数据要素产业图谱1.0版」重磅发布,景联文科技凭借高质量数据采集服务入选!
近日,景联文科技入选数据猿和上海大数据联盟发布的《2024中国数据要素产业图谱1.0版》数据采集服务板块。 景联文科技是专业数据服务公司,提供从数据采集、清洗、标注的全流程数据解决方案,协助人工智能企业解决整个AI链条中数据采集和数据标…...

条码二维码读取设备在医疗设备自助服务的重要性
医疗数字信息化建设的深入推进,医疗设备自助服务系统已成为医疗服务领域的一大趋势,条码二维码读取设备作为自助设备的重要组成部分,通过快速、准确地读取条形码二维码信息,不公提升了医疗服务效率,还为患者提供了更加…...

centos 7.8 安装sql server 2019
1.系统环境 centos 7.8 2.数据库安装文件准备 下载 SQL Server 2019 (15.x) Red Hat 存储库配置文件 sudo curl -o /etc/yum.repos.d/mssql-server.repo https://packages.microsoft.com/config/rhel/7/mssql-server-2019.repo 采用yum源进行不安装下载,这时yum 会自动检测…...

Android焦点机制结合WMS
文章前提: 了解WMS基本作用了解window的概念,phoneWindow,rootViewImpl了解view的事件分发 开始: 讲三件事情: window的创建,更新焦点的更新事件的分发 Window的创建,更新: wi…...

Hive分区和分桶
分区: 根据某一列进行进行划分存储,常用的有时间分区; 查询数据时只需要扫描特定的分区数据,不需要全盘扫描,节省时间, 方便数据归档和清理 创建分区表 create table table_name( col1 int, col2 string ) partition …...

GPT-5的到来~
IT之家6月22日消息,在美国达特茅斯工程学院周四公布的采访中,OpenAI首席技术官米拉穆拉蒂被问及GPT-5是否会在明年发布,给出了肯定答案并表示将在一年半后发布。此外,穆拉蒂在采访中还把GPT-4到GPT-5的飞跃描述为高中生到博士生的成长。“像 GPT-4 这样的系统则更像是聪明的…...

责任链模式(设计模式)
责任链模式(Chain of Responsibility Pattern)是一种行为设计模式,它允许多个对象有机会处理请求,从而避免请求的发送者和接收者之间的耦合。将这些对象连成一条链,并沿着这条链传递请求,直到有一个对象处理…...

计算机图形学入门20:加速光线追踪
1.前言 前文说了Whitted-style光线追踪技术的原理以及光线与平面的交点计算方式,对于现在应用最广的Polygon Mesh显式曲面来说,一个复杂场景中的多边形面总数可能达到千万甚至亿万以上,如果每个像素发射光线都和场景中每个平面进行求交点计算…...

sys.stdin对象——实现标准输入
自学python如何成为大佬(目录):https://blog.csdn.net/weixin_67859959/article/details/139049996?spm1001.2014.3001.5501 语法参考 sys.stdin是一个标准化输入对象,可以连续输入或读入文件所有内容,不结束,不能直接使用。输入完成后&am…...

嵌入式项目分享| 终极智能手表,全过程+全开源分享
这是一个非常完整的智能手表开源项目,功能齐全,且资料开源,如果你是:自己平时喜欢diy的工程师,想要提升开发技能的学生,马上要做毕设的大四学生,这个手表很值得一做,别错过了~~ 所有开源的资料以及原文链接见文末。 先来看下这个手表的功能: 首先,是一个可以佩戴的手…...

【Linux详解】进程的状态 | 运行 阻塞 挂起 | 僵尸和孤儿状态
目录 操作系统中 运行状态 阻塞状态 进程状态转换 Linux系统中 查看进程状态 深度睡眠状态 T 暂停状态 Z 僵尸状态 孤儿状态 文章手稿 xmind: 引言 介绍系统中的进程状态及其管理方式。将通过结合操作系统原理和实际代码示例,详细说明进程的各种状态、转换…...

MySQL添加外键约束经典案例
1DDL建表语句 需要一个emp员工表和一个dept部门表 CREATE TABLE emp (id int NOT NULL AUTO_INCREMENT,name varchar(50) COLLATE utf8mb4_0900_as_ci NOT NULL COMMENT 姓名,age int DEFAULT NULL COMMENT 年龄,job varchar(20) COLLATE utf8mb4_0900_as_ci DEFAULT NULL CO…...

vue3监听器watch以及watchEffect的使用
一,watch()简介: 侦听一个或多个响应式数据源,并在数据源变化时调用所给的回调函数 watch()默认是懒侦听的,即仅在侦听源发生变化时才执行回调函数。 watch()一共有三个参数 第一个参数:侦听器的源,可以为以…...

modelsim做后仿真的一点思路
这是以TD_5.6.3_Release_88061生成的网表文件(其他工具生成的网表文件类似),与modelsim联合进行门级仿真的样例,时序仿真与门级仿真的方法类似,只是增加了标准延时文件。 1、建立门级仿真工程 将门级网表和testbench添…...

如何获取特定 HIVE 库的元数据信息如其所有分区表和所有分区
如何获取特定 HIVE 库的元数据信息如其所有分区表和所有分区 1. 问题背景 有时我们需要获取特定 HIVE 库下所有分区表,或者所有分区表的所有分区,以便执行进一步的操作,比如通过 使用 HIVE 命令 MSCK REPAIR TABLE table_name sync partiti…...

如何在 qmake(QtCreator)中指定 Mac 平台
在 Qt 项目文件(.pro 文件)中设置针对 Mac OS 的配置项。通常情况下,我们可以使用如下方式为 Windows 和 Unix 系统分别添加源文件: win32 {SOURCES += hellowin.cpp } unix {SOURCES += hellounix.cpp }虽然 Mac OS 是类 Unix 系统,但有时我们仍然需要区分它和 Linux 系…...

day39动态规划part02| 62.不同路径 63. 不同路径 II 343. 整数拆分 (可跳过)96..不同的二叉搜索树 (可跳过)
**62.不同路径 ** 本题大家掌握动态规划的方法就可以。 数论方法 有点非主流,很难想到。 题目讲解 | 视频讲解 class Solution { public:int uniquePaths(int m, int n) {// 确定数组及其下标的含义int dp[101][101] {0}; //到达i,j的点有多少条路径// 确定递推…...

声场合成新方法:基于声波传播的框架
声场合成是指在房间内的麦克风阵列上,根据来自房间内其他位置的声源信号,合成每个麦克风的音频信号。它是评估语音/音频通信设备性能指标的关键任务,因为它是一种成本效益高的方法,用于数据生成以替代真实的数据收集,后…...

鸿蒙文件操作事前准备
13900001,沙箱13900002 首选授权 module授权配置 "requestPermissions": [{ "name": "ohos.permission.CAMERA",}, { "name": "ohos.permission.READ_MEDIA",}, { "name": "ohos.permission.WR…...

AI智能时代:ChatGPT如何在金融市场发挥策略分析与预测能力?
文章目录 一、ChatGPT在金融策略制定中的深度应用客户需求分析与定制化策略市场动态跟踪与策略调整策略分析与优化 二、ChatGPT在算法交易中的深度应用自动交易策略制定交易执行与监控风险管理 三、未来展望《智能量化:ChatGPT在金融策略与算法交易中的实践》亮点内…...

C#面:C#属性能在接口中声明吗?
在C#中,接口是一种定义了一组方法、属性和事件的类型。在接口中,只能声明方法、属性和事件的签名,而不能包含字段、构造函数或实现代码。因此,C#属性不能直接在接口中声明。 然而,你可以在接口中定义属性的签名&#…...

区块链的历史和发展:从比特币到以太坊
想象一下,你住在一个小镇上,每个人都有一个大账本,记录着所有的交易。这个账本很神奇,每当有人买卖东西,大家都会在自己的账本上记一笔,确保每个人的账本都是一致的。这就是区块链的基本思想。而区块链的故…...

input()函数——输入
自学python如何成为大佬(目录):https://blog.csdn.net/weixin_67859959/article/details/139049996?spm1001.2014.3001.5501 语法参考 input()函数可以提示并接收用户的输入,将所有的输入按照字符串进行处理,并返回一个字符串,input()函数的…...