您好,登录后才能下订单哦!
在多线程编程中,队列是一种常用的数据结构,用于在生产者和消费者之间传递数据。Java并发包(JUC)提供了多种线程安全的队列实现,其中LinkedBlockingQueue
是一个基于链表实现的有界或无界阻塞队列。本文将深入分析LinkedBlockingQueue
的源码,探讨其内部实现机制、线程安全性以及性能特点。
LinkedBlockingQueue
是一个基于链表的阻塞队列,它可以在队列满时阻塞生产者线程,在队列空时阻塞消费者线程。LinkedBlockingQueue
既可以是有界的,也可以是无界的。如果构造时指定了容量,则队列是有界的;否则,队列的容量为Integer.MAX_VALUE
,即无界队列。
LinkedBlockingQueue
内部使用链表来存储元素,因此它具有良好的扩展性。LinkedBlockingQueue
通过锁机制保证线程安全。LinkedBlockingQueue
适用于以下场景:
LinkedBlockingQueue
的类结构如下:
public class LinkedBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
// 内部节点类
static class Node<E> {
E item;
Node<E> next;
Node(E x) { item = x; }
}
// 队列容量
private final int capacity;
// 当前队列中的元素数量
private final AtomicInteger count = new AtomicInteger();
// 队列头节点
transient Node<E> head;
// 队列尾节点
private transient Node<E> last;
// 消费者锁
private final ReentrantLock takeLock = new ReentrantLock();
// 消费者条件队列
private final Condition notEmpty = takeLock.newCondition();
// 生产者锁
private final ReentrantLock putLock = new ReentrantLock();
// 生产者条件队列
private final Condition notFull = putLock.newCondition();
// 构造方法
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
last = head = new Node<E>(null);
}
// 其他方法...
}
LinkedBlockingQueue
内部使用Node
类来表示链表中的节点。每个节点包含一个元素item
和一个指向下一个节点的引用next
。
static class Node<E> {
E item;
Node<E> next;
Node(E x) { item = x; }
}
LinkedBlockingQueue
通过capacity
字段表示队列的容量,通过count
字段表示当前队列中的元素数量。count
是一个AtomicInteger
类型的变量,用于保证线程安全。
private final int capacity;
private final AtomicInteger count = new AtomicInteger();
LinkedBlockingQueue
使用head
和last
两个字段分别表示队列的头节点和尾节点。头节点是一个哨兵节点,不存储实际元素。
transient Node<E> head;
private transient Node<E> last;
LinkedBlockingQueue
使用两个锁来保证线程安全:takeLock
用于消费者线程,putLock
用于生产者线程。每个锁都有一个对应的条件队列:notEmpty
用于消费者线程等待队列非空,notFull
用于生产者线程等待队列非满。
private final ReentrantLock takeLock = new ReentrantLock();
private final Condition notEmpty = takeLock.newCondition();
private final ReentrantLock putLock = new ReentrantLock();
private final Condition notFull = putLock.newCondition();
LinkedBlockingQueue
提供了两个构造方法:一个无参构造方法,创建一个无界队列;另一个构造方法接受一个容量参数,创建一个有界队列。
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
last = head = new Node<E>(null);
}
LinkedBlockingQueue
提供了多种入队操作,包括put
、offer
和add
。其中,put
方法在队列满时会阻塞,直到队列有空闲空间;offer
方法在队列满时会立即返回false
;add
方法在队列满时会抛出IllegalStateException
。
put
方法用于将元素插入队列尾部。如果队列已满,则当前线程会被阻塞,直到队列有空闲空间。
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
while (count.get() == capacity) {
notFull.await();
}
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
}
offer
方法用于将元素插入队列尾部。如果队列已满,则立即返回false
。
public boolean offer(E e) {
if (e == null) throw new NullPointerException();
final AtomicInteger count = this.count;
if (count.get() == capacity)
return false;
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
if (count.get() < capacity) {
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
}
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
return c >= 0;
}
add
方法用于将元素插入队列尾部。如果队列已满,则抛出IllegalStateException
。
public boolean add(E e) {
if (offer(e))
return true;
else
throw new IllegalStateException("Queue full");
}
LinkedBlockingQueue
提供了多种出队操作,包括take
、poll
和remove
。其中,take
方法在队列空时会阻塞,直到队列有元素;poll
方法在队列空时会立即返回null
;remove
方法在队列空时会抛出NoSuchElementException
。
take
方法用于从队列头部取出元素。如果队列为空,则当前线程会被阻塞,直到队列有元素。
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
while (count.get() == 0) {
notEmpty.await();
}
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
poll
方法用于从队列头部取出元素。如果队列为空,则立即返回null
。
public E poll() {
final AtomicInteger count = this.count;
if (count.get() == 0)
return null;
E x = null;
int c = -1;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
if (count.get() > 0) {
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
}
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
remove
方法用于从队列头部取出元素。如果队列为空,则抛出NoSuchElementException
。
public E remove() {
E x = poll();
if (x != null)
return x;
else
throw new NoSuchElementException();
}
LinkedBlockingQueue
还提供了其他一些方法,如size
、remainingCapacity
、contains
、clear
等。这些方法的实现相对简单,这里不再赘述。
LinkedBlockingQueue
通过分离锁机制(takeLock
和putLock
)来保证线程安全。生产者线程和消费者线程可以并发操作队列,而不会相互阻塞。只有在队列满或空时,才会阻塞相应的线程。
生产者线程通过putLock
来保证线程安全。当队列满时,生产者线程会被阻塞,直到队列有空闲空间。
消费者线程通过takeLock
来保证线程安全。当队列空时,消费者线程会被阻塞,直到队列有元素。
LinkedBlockingQueue
使用条件队列notFull
和notEmpty
来管理阻塞的线程。当队列满时,生产者线程会被放入notFull
条件队列中;当队列空时,消费者线程会被放入notEmpty
条件队列中。
LinkedBlockingQueue
的性能主要取决于以下几个方面:
LinkedBlockingQueue
使用了分离锁机制,生产者线程和消费者线程之间的锁竞争较少,因此在高并发场景下性能较好。LinkedBlockingQueue
基于链表实现,插入和删除操作的时间复杂度为O(1)。LinkedBlockingQueue
是一个高效、线程安全的阻塞队列实现,适用于生产者-消费者模型、任务调度和消息传递等场景。通过分离锁机制和条件队列,LinkedBlockingQueue
能够有效地减少锁竞争,提高并发性能。在实际应用中,开发者可以根据具体需求选择合适的队列容量,以平衡内存使用和性能。
通过对LinkedBlockingQueue
源码的深入分析,我们可以更好地理解其内部实现机制,从而在实际开发中更加灵活地使用这一强大的并发工具。
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。