如何理解ArrayBlockingQueue的线程安全

发布时间:2021-10-09 14:15:41 作者:iii
来源:亿速云 阅读:235

本篇内容介绍了“如何理解ArrayBlockingQueue的线程安全”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!

ArrayBlockingQueue的线程安全是通过底层的ReentrantLock保证的,因此在元素出入队列操作时,无需额外加锁。写一段简单的代码举个例子,从具体的使用来说明它的线程安全吧

ArrayBlockingQueue<integer> queue=new ArrayBlockingQueue(7,
        true, new ArrayList&lt;&gt;(Arrays.asList(new Integer[]{1,2,3,4,5,6,7})));

@AllArgsConstructor
class Task implements Runnable{
    String threadName;
    @Override
    public void run() {
        while(true) {
            try {
                System.out.println(threadName+" take: "+queue.take());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

private void queueTest(){
    new Thread(new Task("Thread 1")).start();
    new Thread(new Task("Thread 2")).start();
}

在代码中创建队列时就往里放入了7个元素,然后创建两个线程各自从队列中取出元素。对队列的操作也非常简单,只用到了操作队列中出队方法take,运行结果如下:

Thread 1 take: 1
Thread 2 take: 2
Thread 1 take: 3
Thread 2 take: 4
Thread 1 take: 5
Thread 2 take: 6
Thread 1 take: 7

可以看到在公平模式下,两个线程交替对队列中的元素执行出队操作,并没有出现重复取出的情况,即保证了多个线程对资源竞争的互斥访问。它的过程如下:

如何理解ArrayBlockingQueue的线程安全

面试官:那它的阻塞性呢?

Hydra:好的,还是写段代码通过例子来说明

private static void queueTest() throws InterruptedException {
    ArrayBlockingQueue<integer> queue=new ArrayBlockingQueue&lt;&gt;(3);
    int size=7;
    Thread putThread=new Thread(()-&gt;{
        for (int i = 0; i <size ; i++) { try queue.put(i); system.out.println("putthread put: "+i+" - size:"+queue.size()); thread.sleep(1000); } catch (interruptedexception e) e.printstacktrace(); }); thread takethread="new" thread(()> {
        for (int i = 0; i &lt; size+1 ; i++) {
            try {
                Thread.sleep(3000);
                System.out.println("TakeThread take: "+queue.take());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    });

    putThread.start();
    Thread.sleep(1000);
    takeThread.start();
}

和第一个例子中的代码不同,这次我们创建队列时只指定长度,并不在初始化时就往队列中放入元素。接下来创建两个线程,一个线程充当生产者,生产产品放入到队列中,另一个线程充当消费者,消费队列中的产品。需要注意生产和消费的速度是不同的,生产者每一秒生产一个,而消费者每三秒才消费一个。执行上面的代码,运行结果如下:

PutThread put: 0 - Size:1
PutThread put: 1 - Size:2
PutThread put: 2 - Size:3
TakeThread take: 0
PutThread put: 3 - Size:3
TakeThread take: 1
PutThread put: 4 - Size:3
TakeThread take: 2
PutThread put: 5 - Size:3
TakeThread take: 3
PutThread put: 6 - Size:3
TakeThread take: 4
TakeThread take: 5
TakeThread take: 6

来给你画个比较直观的图吧:

如何理解ArrayBlockingQueue的线程安全

分析运行结果,能够在两个方面体现出队列的阻塞性:

面试官:你只会用puttake方法吗,能不能讲讲其他的方法?

Hydra:方法太多了,简单概括一下插入和移除相关的操作吧

如何理解ArrayBlockingQueue的线程安全

面试官:方法记得还挺清楚,看样子是个合格的 API caller。下面说说原理吧,先讲一下ArrayBlockingQueue 的结构

Hydra:在ArrayBlockingQueue 中有下面四个比较重要的属性

final Object[] items;
final ReentrantLock lock;
private final Condition notEmpty;
private final Condition notFull;

public ArrayBlockingQueue(int capacity, boolean fair) {
    if (capacity &lt;= 0) throw new IllegalArgumentException();
    this.items = new Object[capacity];
    lock = new ReentrantLock(fair);
    notEmpty = lock.newCondition();
    notFull =  lock.newCondition();
}

在构造函数中对它们进行了初始化:

Condition是一个接口,代码中的notFullnotEmpty实例化的是AQS的内部类ConditionObject,它的内部是由AQS中的Node组成的等待链,ConditionObject中有一个头节点firstWaiter和尾节点lastWaiter,并且每一个Node都有指向相邻节点的指针。简单的来说,它的结构是下面这样的:

如何理解ArrayBlockingQueue的线程安全

至于它的作用先卖个关子,放在后面讲。除此之外,还有两个int类型的属性takeIndexputIndex,表示获取元素的索引位置和插入元素的索引位置。假设一个长度为5的队列中已经有了3个元素,那么它的结构是这样的:

如何理解ArrayBlockingQueue的线程安全

面试官:说一下队列的插入操作吧

Hydra:好的,那我们先说addoffer方法,在执行add方法时,调用了其父类AbstractQueue中的add方法。add方法则调用了offer方法,如果添加成功返回true,添加失败时抛出异常,看一下源码:

public boolean add(E e) {
    if (offer(e))
        return true;
    else
        throw new IllegalStateException("Queue full");
}

public boolean offer(E e) {
    checkNotNull(e);//检查元素非空
    final ReentrantLock lock = this.lock; //获取锁并加锁
    lock.lock();
    try {
        if (count == items.length)//队列已满
            return false;
        else {
            enqueue(e);//入队
            return true;
        }
    } finally {
        lock.unlock();
    }
}

实际将元素加入队列的核心方法enqueue

private void enqueue(E x) {
    final Object[] items = this.items;
    items[putIndex] = x; 
    if (++putIndex == items.length)
        putIndex = 0;
    count++;
    notEmpty.signal();
}

enqueue中,首先将元素放入数组中下标为putIndex的位置,然后对putIndex自增,并判断是否已处于队列中最后一个位置,如果putIndex索引位置等于数组的长度时,那么将putIndex置为0,即下一次在元素入队时,从队列头开始放置。

举个例子,假设有一个长度为5的队列,现在已经有4个元素,我们进行下面一系列的操作,来看一下索引下标的变化:

如何理解ArrayBlockingQueue的线程安全

上面这个例子提前用到了队列中元素被移除时takeIndex会自增的知识点,通过这个例子中索引的变化,可以看出ArrayBlockingQueue就是一个循环队列,takeIndex就相当于队列的头指针,而putIndex相当于队列的尾指针的下一个位置索引。并且这里不需要担心在队列已满时还会继续向队列中添加元素,因为在offer方法中会首先判断队列是否已满,只有在队列不满时才会执行enqueue方法。

面试官:这个过程我明白了,那enqueue方法里最后的notEmpty.signal()是什么意思?

Hydra:这是一个唤醒操作,等后面讲完它的挂起后再说。我还是先把插入操作中的put方讲完吧,看一下它的源码:

public void put(E e) throws InterruptedException {
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == items.length)
            notFull.await();
        enqueue(e);
    } finally {
        lock.unlock();
    }
}

put方法是一个阻塞方法,当队列中元素未满时,会直接调用enqueue方法将元素加入队列中。如果队列已满,就会调用notFull.await()方法将挂起当前线程,直到队列不满时才会被唤醒,继续执行插入操作。

当队列已满,再执行put操作时,就会执行下面的流程:

如何理解ArrayBlockingQueue的线程安全

这里提前剧透一下,当队列中有元素被移除,在调用dequeue方法中的notFull.signal()时,会唤醒等待队列中的线程,并把对应的元素添加到队列中,流程如下:

如何理解ArrayBlockingQueue的线程安全

做一个总结,在插入元素的几个方法中,addoffer以及带有超时的offer方法都是非阻塞的,会立即返回或超时后立即返回,而put方法是阻塞的,只有当队列不满添加成功后才会被返回。

面试官:讲的不错,讲完插入操作了再讲讲移除操作吧

Hydra:还是老规矩,先说非阻塞的方法removepoll,父类的remove方法还是会调用子类的poll方法,不同的是remove方法在队列为空时抛出异常,而poll会直接返回null。这两个方法的核心还是调用的dequeue方法,它的源码如下:

private E dequeue() {
    final Object[] items = this.items;
    E x = (E) items[takeIndex];
    items[takeIndex] = null;
    if (++takeIndex == items.length)
        takeIndex = 0;
    count--;
    if (itrs != null)
        //更新迭代器中的元素
        itrs.elementDequeued();
    notFull.signal();
    return x;
}

dequeue中,在获取到数组下标为takeIndex的元素,并将该位置置为null。将takeIndex自增后判断是否与数组长度相等,如果相等还是按之前循环队列的理论,将它的索引置为0,并将队列的中的计数减1。

有一个队列初始化时有5个元素,我们对齐分别进行5次的出队操作,查看索引下标的变化情况:

如何理解ArrayBlockingQueue的线程安全

然后我们还是结合take方法来说明线程的挂起和唤醒的操作,与put方法相对,take用于阻塞获取元素,来看一下它的源码:

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == 0)
            notEmpty.await();
        return dequeue();
    } finally {
        lock.unlock();
    }
}

take是一个可以被中断的阻塞获取元素的方法,首先判断队列是否为空,如果队列不为空那么就调用dequeue方法移除元素,如果队列为空时就调用notEmpty.await()就将当前线程挂起,直到有其他的线程调用了enqueue方法,才会唤醒等待队列中被挂起的线程。可以参考下面的图来理解:

如何理解ArrayBlockingQueue的线程安全

当有其他线程向队列中插入元素后:

如何理解ArrayBlockingQueue的线程安全

入队的enqueue方法会调用notEmpty.signal(),唤醒等待队列中firstWaiter指向的节中的线程,并且该线程会调用dequeue完成元素的出队操作。到这移除的操作就也分析完了,至于开头为什么说ArrayBlockingQueue是线程安全的,看到每个方法前都通过全局单例的lock加锁,相信你也应该明白了

“如何理解ArrayBlockingQueue的线程安全”的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注亿速云网站,小编将为大家输出更多高质量的实用文章!

推荐阅读:
  1. 怎么理解C#中Queue的线程安全问题
  2. ArrayBlockingQueue 1.8 源码浅析

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

arrayblockingqueue

上一篇:mysql优化通常使用的方法有哪些

下一篇:数据库调优中如何解决like ’%str’ 时索引不被使用

相关阅读

您好,登录后才能下订单哦!

密码登录
登录注册
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》