您好,登录后才能下订单哦!
[TOC]
LinkedBlockingQueue 是一个用链表实现的有界阻塞队列;此队列的默认和最大长度为Integer.MAX_VALUE;此队列按照先进先出的原则对元素就行排序;队列有两个锁,生成和消费各一把锁,都是默认的非公平锁。
static class Node<E> {
// 我们插入的值
E item;
/**
* One of:
* - the real successor Node
* - this Node, meaning the successor is head.next
* - null, meaning there is no successor (this is the last node)
*/
// 下一个node
Node<E> next;
Node(E x) { item = x; }
}
/** 队列容量 */
private final int capacity;
/** 两个锁,需要使用AtomicInteger保证原子性 */
private final AtomicInteger count = new AtomicInteger();
/**
* Head of linked list.
* Invariant: head.item == null
*/
// 头结点
transient Node<E> head;
/**
* Tail of linked list.
* Invariant: last.next == null
*/
// 尾节点
private transient Node<E> last;
/** Lock held by take, poll, etc */
/** take, poll, etc 的锁 */
private final ReentrantLock takeLock = new ReentrantLock();
/** Wait queue for waiting takes */
/** 等待在队列空 */
private final Condition notEmpty = takeLock.newCondition();
/** Lock held by put, offer, etc */
/** put, offer, etc的锁 */
private final ReentrantLock putLock = new ReentrantLock();
/** Wait queue for waiting puts */
/** 等待在队列满 */
private final Condition notFull = putLock.newCondition();
// 无参构造
public LinkedBlockingQueue() {
// 默认Integer.MAX_VALUE
this(Integer.MAX_VALUE);
}
// 有参构造
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
// 创建一个item为null的节点
last = head = new Node<E>(null);
}
public boolean offer(E e) {
// e不能为null
if (e == null) throw new NullPointerException();
// 总数
final AtomicInteger count = this.count;
// 总数等于了容量 返回false
if (count.get() == capacity)
return false;
int c = -1;
// 创建一个node
Node<E> node = new Node<E>(e);
// 获取锁
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
if (count.get() < capacity) {
// 插入链表
enqueue(node);
// 加1返回旧值
c = count.getAndIncrement();
// c是增加之前的值,然后加1,再判断有没有可以存储的容量
if (c + 1 < capacity)
// 有唤醒下一个线程
notFull.signal();
}
} finally {
putLock.unlock();
}
// 队列有一个元素了,证明之前队列为空,可能已经有元素来消费了,所以就需要唤醒一个等待消费的线程
if (c == 0)
signalNotEmpty();
return c >= 0;
}
private void enqueue(Node<E> node) {
// assert putLock.isHeldByCurrentThread();
// assert last.next == null;
last = last.next = node;
}
注意:offer 还有一个重载方法,支持中断,带有超时时间的限制offer(E e, long timeout, TimeUnit unit)。
public void put(E e) throws InterruptedException {
// 不可以为null
if (e == null) throw new NullPointerException();
// Note: convention in all put/take/etc is to preset local var
// holding count negative to indicate failure unless set.
int c = -1;
// 构建一个节点
Node<E> node = new Node<E>(e);
// 获取put锁
final ReentrantLock putLock = this.putLock;
// 获取count
final AtomicInteger count = this.count;
// 调用获取锁的方法,支持中断
putLock.lockInterruptibly();
try {
/*
* Note that count is used in wait guard even though it is
* not protected by lock. This works because count can
* only decrease at this point (all other puts are shut
* out by lock), and we (or some other waiting put) are
* signalled if it ever changes from capacity. Similarly
* for all other uses of count in other wait guards.
*/
// 等于了队列的容量
while (count.get() == capacity) {
// 进入阻塞队列
notFull.await();
}
// 入队
enqueue(node);
// 返回的是自增前的值
c = count.getAndIncrement();
// 如果这个元素入队以后,还有多于的空间,唤醒等待队列的线程
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
// c==0,证明之前队列是空的,唤醒一个获取线程
if (c == 0)
signalNotEmpty();
}
这次我们看个带超时时间的poll方法。
// 带超时时间的消费一个元素
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
E x = null;
int c = -1;
long nanos = unit.toNanos(timeout);
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
// 支持中断的获取锁
takeLock.lockInterruptibly();
try {
while (count.get() == 0) {
if (nanos <= 0)
return null;
nanos = notEmpty.awaitNanos(nanos);
}
x = dequeue();
// count-- 返回旧值
c = count.getAndDecrement();
// 还有元素,唤醒一个等待获取的线程
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
// 队列还有一个位置,唤醒一个入队线程
if (c == capacity)
signalNotFull();
return x;
}
private E dequeue() {
// assert takeLock.isHeldByCurrentThread();
// assert head.item == null;
Node<E> h = head;
Node<E> first = h.next;
h.next = h; // help GC // 自引用
head = first;
E x = first.item;
first.item = null;
return x;
}
// 获取元素
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
// 队列为null 就阻塞
while (count.get() == 0) {
notEmpty.await();
}
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
// 队列消费一个元素,可以唤醒一个生产线程了
if (c == capacity)
signalNotFull();
return x;
}
// 获取第一个元素
public E peek() {
if (count.get() == 0)
return null;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
Node<E> first = head.next;
if (first == null)
return null;
else
return first.item;
} finally {
takeLock.unlock();
}
}
public int size() {
return count.get();
}
LinkedBlockingQueue 可以看做是一个×××队列,因为最大容量是Integer.MAX_VALUE,这已经很大了,所以使用时一定注意容量问题,避免内存溢出,但是好处就是可以不用我们去初始容量;队列在入队和出队使用了两把锁,提高了并发性,相对于一把锁来说;我们可以发现队列的底层数据结构采用的是链表,对比ArrayBlockingQueue的数组数据结构,在处理数据的同时,节点本身也需要处理垃圾回收,所以相对于数组来的数据来说增加了垃圾回收,可能影响性能;LinkedBlockingQueue 和ArrayBlockingQueue 两个可以对比学习,追求系统稳定性,性能就使用ArrayBlockingQueue ,追求并发性,可能发生大量请求时(系统不是很稳定)要注意内存溢出就使用LinkedBlockingQueue ,使用场景属于个人理解,欢迎指正。
《参考 Java 并发编程的艺术》
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。