BlockingQueue和ArrayBlockingQueue

内容纲要

BlockingQueue

BlockingQueue是一个支持在检索元素时等待队列变得非空,并在存储元素时等待队列中有空间可用的队列。

BlockingQueue 方法有四种形式,处理无法立即满足但可能在将来某个时刻可以满足的操作的方式各不相同:
1、抛出异常
2、返回特殊值(根据操作的不同,可能是 null 或 false)
3、无限期地阻塞当前线程,直到操作可以成功
4、在放弃之前仅阻塞给定的最长时间限制。

BlockingQueue 不接受空元素。使用 null 尝试添加、放置或提供空元素时,实现会抛出NullPointerException。null 用作哨兵值,指示 poll 操作失败。

BlockingQueue 可能具有容量限制。在任何给定时间,它可能有一个 remainingCapacity,超出该容量则无法再添加额外元素而不发生阻塞。没有任何内在容量限制的 BlockingQueue 总是报告 Integer.MAX_VALUE 的剩余容量。

BlockingQueue 的实现主要用于生产者-消费者队列,但还支持 Collection 接口。因此,例如,可以使用 remove(x) 从队列中删除任意元素。然而,此类操作通常效率不高,仅用于偶尔使用,例如在取消排队消息时。 BlockingQueue 的实现是线程安全的。所有排队方法都使用内部锁或其他形式的并发控制以原子方式实现其效果。然而,批量 Collection 操作 addAll、containsAll、retainAll 和 removeAll 如果没有在某个实现中另行指定,不一定是原子操作。因此,例如,在仅添加了部分元素的情况下,addAll(c) 可能会失败(抛出异常)。

BlockingQueue 并不固有地支持任何形式的“close”或“shutdown”操作,以指示不会再添加更多项。此类功能的需求和使用往往取决于具体实现。例如,常见做法是由生产者插入特殊的end-of-stream或poison对象,消费者在取出时予以相应解释。

内存一致性影响:与其他并发集合一样,将对象放入 BlockingQueue 之前线程中的操作 happens-before 在另一个线程中访问或移除该元素的操作之后的某些操作。

package java.util.concurrent;

public interface BlockingQueue<E> extends Queue<E> {
  /**
  * 如果可以立即在不违反容量限制的情况下将指定的元素插入此队列,则返回 true 表示成功,并在当前没有可用空间时抛出 IllegalStateException。在使用容量受限队列时,通常更倾向于使用 offer。
  * params: e,插入的元素
  * returns: true
  **/
  boolean add(E e);

  /**
  * 如果可以立即在不违反容量限制的情况下将指定的元素插入此队列,则返回 true 表示成功;如果当前没有可用空间,则返回 false。在使用容量受限队列时,通常更倾向于使用此方法,而不是 add 方法,因为 add 方法只能通过抛出异常来表示无法插入元素。
  * params: 添加的元素
  * returns: 如果添加成功返回true,否则false
  **/
  boolean offer(E e);

  /**
  * 将指定的元素插入到此队列中,必要时等待空间变得可用。
  * params: 添加的元素
  **/
  void put(E e) throws InterruptedException;

  /**
  * 若有必要,将指定的元素插入到此队列中,并等待最多指定的时间,直至空间变得可用。
  * params:
  * e - 添加的元素
  * timeout - 在放弃之前要等待多长时间,使用
  * unit - timeout参数指定的时间单位
  * returns: 如果成功,则返回 true;如果在指定的等待时间内未能获得空间,则返回 false
  **/
  boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException;

  /**
  * 检索并移除此队列的头部元素,必要时等待直到有元素变得可用。
  * returns: 此队列的头部元素
  **/
  E take() throws InterruptedException;

  /**
  * 如果需要,检索并移除此队列的头部元素,并等待最多指定的等待时间直到有元素变得可用。
  * params:
  * timeout - 在放弃之前要等待多长时间
  * unit - timeout参数指定的时间单位
  * returns: 如果在指定的等待时间内未能获得元素,则返回此队列的头部元素;如果指定的等待时间已过且仍未获得元素,则返回 null。
  **/
  E poll(long timeout, TimeUnit unit) throws InterruptedException;

  /**
  * 返回此队列在没有内存或资源限制的情况下理想情况下可以接受的额外元素数量,如果没有固有限制,则返回 Integer.MAX_VALUE。 请注意,不能仅通过检查 remainingCapacity 来确定插入元素的尝试是否会成功,因为可能有另一个线程正要插入或移除元素。
  * returns:剩余容量
  **/
  int remainingCapacity();

  /**
  * 如果存在的话,从队列中移除指定元素的一个实例。更正式地说,移除满足 o.equals(e) 的一个元素 e(如果此队列包含一个或多个这样的元素)。如果此队列包含指定的元素(或等效地,如果调用导致此队列发生更改),则返回 true。
  * parmas:o - 如果存在的话,要从此队列中移除的元素
  * returns: 如果调用导致此队列发生更改,则返回 true
  **/
  boolean remove(Object o);

  /**
  * 当且仅当此队列包含至少一个满足 o.equals(e) 的元素 e 时,返回 true。
  * params: o - 要在此队列中检查包含情况的对象
  * returns: 如果此队列包含指定的元素,则返回 true
  **/
  public boolean contains(Object o);

  /**
  * 将此队列中所有可用元素移除并添加到给定的集合中。与重复轮询此队列相比,此操作可能更有效率。在尝试向集合 c 添加元素时遇到失败可能导致相关异常被抛出时,元素可能在两个集合中都不在,或者只在其中一个集合中。尝试将队列耗尽到自身会导致 IllegalArgumentException。此外,如果在操作进行中修改了指定的集合,则此操作的行为是未定义的。
  * params: c - 要传输元素的集合
  * returns: 传输的元素数量
  **/
  int drainTo(Collection<? super E> c);

  /**
  *将队列中最多给定数量的可用元素移除,并将它们添加到给定的集合中。在尝试将元素添加到集合 c 时遇到失败可能会导致相关异常抛出时元素既不在集合中,也可能在集合中,甚至两者都有。尝试将队列排空到自身将导致 IllegalArgumentException。此外,在操作进行时修改指定的集合将导致此操作的行为不确定。
  * params: c - 要转移元素的集合 maxElements - 要转移的最大元素数量
  * returns: 转移的元素数量
  **/
  int drainTo(Collection<? super E> c, int maxElements);
}

BlockingQueue方法总结

ArrayBlockingQueue

put

// 在队列的尾部插入指定的元素,如果队列已满,则等待空间变为可用。
public void put(E e) throws InterruptedException {
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == items.length)
            notFull.await();
        enqueue(e);
    } finally {
        lock.unlock();
    }
}

private static void checkNotNull(Object v) {
    if (v == null)
        throw new NullPointerException();
}

这段代码实现了向队列尾部插入指定元素的操作。首先会检查元素是否为null,然后使用ReentrantLock获取锁。在获取锁之后,通过while循环判断队列是否已满,如果队列已满,则调用notFull.await()方法进行等待,直到队列有空闲空间。最后使用enqueue方法将元素插入队列,最终释放锁。

需要注意的是,该方法在获取锁的过程中可以响应中断,而在遇到中断时会释放锁并抛出InterruptedException异常。

take

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == 0)
            notEmpty.await();
        return dequeue();
    } finally {
        lock.unlock();
    }
}

/**
* 提取当前取出位置的元素,使位置前进,并发出信号。仅在持有锁时调用。
*/
private E dequeue() {
    // assert lock.getHoldCount() == 1;
    // assert items[takeIndex] != null;
    final Object[] items = this.items;
    @SuppressWarnings("unchecked")
    E x = (E) items[takeIndex];
    items[takeIndex] = null;
    if (++takeIndex == items.length)
        takeIndex = 0;
    count--;
    if (itrs != null)
        itrs.elementDequeued();
    notFull.signal();
    return x;
}

这是一个Java代码段,它实现了阻塞式队列的take方法。在队列为空时,该方法会阻塞等待直到队列中有元素可取。当队列不为空时,它会从队列中取出一个元素并返回。

这段代码使用了ReentrantLock来实现锁机制,首先获取队列的ReentrantLock对象,然后使用lockInterruptibly方法获取锁,该方法在获取锁的过程中可以响应中断。接着通过while循环判断队列是否为空,如果为空则调用notEmpty.await()方法进行等待,直到队列不为空。最后使用dequeue方法取出元素并返回,在finally块中释放锁。

需要注意的是,该方法在遇到中断时会抛出 InterruptedException 异常。

Leave a Comment

您的电子邮箱地址不会被公开。 必填项已用*标注

close
arrow_upward