ArrayBlockingQueue和LinkedBlockingQueue的区别
您目前处于:编程  2014-10-11

BlockingQueue的核心方法

boolean add(E e) // 把e添加到BlockingQueue里。如果BlockingQueue可以容纳,则返回true,否则抛出异常。
boolean offer(E e) // 表示如果可能的话,将e加到BlockingQueue里,即如果BlockingQueue可以容纳,则返回true,否则返回false。
void put(E e) // 把e添加到BlockingQueue里,如果BlockQueue没有空间,则调用此方法的线程被阻塞直到BlockingQueue里面有空间再继续。
E poll(long timeout, TimeUnit unit)  // 取走BlockingQueue里排在首位的对象,若不能立即取出,则可以等time参数规定的时间,取不到时返回null。
E take() // 取走BlockingQueue里排在首位的对象,若BlockingQueue为空,则调用此方法的线程被阻塞直到BlockingQueue有新的数据被加入。
int drainTo(Collection c)
int drainTo(Collection c, int maxElements)   // 一次性从BlockingQueue获取所有可用的数据对象(还可以指定获取数据的个数),通过该方法,可以提升获取数据效率,不需要多次分批加锁或释放锁。

ArrayBlockingQueue源码分析

ArrayBlockingQueue创建的时候需要指定容量capacity(可以存储的最大的元素个数,因为它不会自动扩容),其中一个构造方法为:

public ArrayBlockingQueue(int capacity, boolean fair) {
    if (capacity <= 0)
        throw new IllegalArgumentException();
    this.items = (E[]) new Object[capacity];
    lock = new ReentrantLock(fair);
    notEmpty = lock.newCondition();
    notFull = lock.newCondition();
}

ArrayBlockingQueue类中定义的变量有:

    /** The queued items  */
    private final E[] items;
    /** items index for next take, poll or remove */
    private int takeIndex;
    /** items index for next put, offer, or add. */
    private int putIndex;
    /** Number of items in the queue */
    private int count;

    /*
     * Concurrency control uses the classic two-condition algorithm
     * found in any textbook.
     */

    /** Main lock guarding all access */
    private final ReentrantLock lock;
    /** Condition for waiting takes */
    private final Condition notEmpty;
    /** Condition for waiting puts */
    private final Condition notFull;

使用数组items来存储元素,由于是循环队列,使用takeIndex和putIndex来标记put和take的位置。可以看到,该类中只定义了一个锁ReentrantLock,定义两个Condition对象:notEmputy和notFull,分别用来对take和put操作进行所控制。

put(E e)方法的源码如下。进行put操作之前,必须获得锁并进行加锁操作,以保证线程安全性。加锁后,若发现队列已满,则调用notFull.await()方法,如当前线程陷入等待。直到其他线程take走某个元素后,会调用notFull.signal()方法来激活该线程。激活之后,继续下面的插入操作。

/**
  * Inserts the specified element at the tail of this queue, waiting
  * for space to become available if the queue is full.
  *
  */
public void put(E e) throws InterruptedException {
    //不能存放 null  元素
    if (e == null) throw new NullPointerException();
    final E[] items = this.items;	//数组队列
    final ReentrantLock lock = this.lock;
    //加锁
    lock.lockInterruptibly();
    try {
        try {
            //当队列满时,调用notFull.await()方法,使该线程阻塞。
            //直到take掉某个元素后,调用notFull.signal()方法激活该线程。
            while (count == items.length)
                notFull.await();
        } catch (InterruptedException ie) {
            notFull.signal(); // propagate to non-interrupted thread
            throw ie;
        }
        //把元素 e 插入到队尾
        insert(e);
    } finally {
        //解锁
        lock.unlock();
    }
}

insert(E e) 方法如下:

/**
  * Inserts element at current put position, advances, and signals.
  * Call only when holding lock.
  */
private void insert(E x) {
    items[putIndex] = x;  
    //下标加1或者等于0
    putIndex = inc(putIndex);
    ++count;  //计数加1
    //若有take()线程陷入阻塞,则该操作激活take()线程,继续进行取元素操作。
    //若没有take()线程陷入阻塞,则该操作无意义。
    notEmpty.signal();
}
	
/**
  * Circularly increment i.
  */
final int inc(int i) {
    //此处可以看到使用了循环队列
    return (++i == items.length)? 0 : i;
}

take()方法代码如下。take操作和put操作相反:

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();  //加锁
    try {
        try {
            //当队列空时,调用notEmpty.await()方法,使该线程阻塞。
            //直到take掉某个元素后,调用notEmpty.signal()方法激活该线程。
            while (count == 0)
                notEmpty.await();
        } catch (InterruptedException ie) {
            notEmpty.signal(); // propagate to non-interrupted thread
            throw ie;
        }
        //取出队头元素
        E x = extract();
        return x;
    } finally {
        lock.unlock();	//解锁
    }
}

extract() 方法如下:

/**
  * Extracts element at current take position, advances, and signals.
  * Call only when holding lock.
  */
private E extract() {
    final E[] items = this.items;
    E x = items[takeIndex];
    items[takeIndex] = null;
    takeIndex = inc(takeIndex);
    --count;
    notFull.signal();
    return x;
}

LinkedBlockingQueue 源码分析

LinkedBlockingQueue 类中定义的变量有:

    /** The capacity bound, or Integer.MAX_VALUE if none */
    private final int capacity;

    /** Current number of elements */
    private final AtomicInteger count = new AtomicInteger(0);

    /** Head of linked list */
    private transient Node<E> head;

    /** Tail of linked list */
    private transient Node<E> last;

    /** Lock held by take, poll, etc */
    private final ReentrantLock takeLock = new ReentrantLock();

    /** Wait queue for waiting takes */
    private final Condition notEmpty = takeLock.newCondition();

    /** Lock held by put, offer, etc */
    private final ReentrantLock putLock = new ReentrantLock();

    /** Wait queue for waiting puts */
    private final Condition notFull = putLock.newCondition();

该类中定义了两个ReentrantLock锁:putLock和takeLock,分别用于put端和take端。也就是说,生成端和消费端各自独立拥有一把锁,避免了读(take)写(put)时互相竞争锁的情况。

/**
  * Inserts the specified element at the tail of this queue, waiting if
  * necessary for space to become available.
  */
public void put(E e) throws InterruptedException {
    if (e == null)
        throw new NullPointerException();
    // Note: convention in all put/take/etc is to preset local var
    // holding count negative to indicate failure unless set.
    int c = -1;
    final ReentrantLock putLock = this.putLock;
    final AtomicInteger count = this.count;
    putLock.lockInterruptibly(); //加 putLock 锁
    try {
        /*
         * Note that count is used in wait guard even though it is
         * not protected by lock. This works because count can
         * only decrease at this point (all other puts are shut
         * out by lock), and we (or some other waiting put) are
         * signalled if it ever changes from
         * capacity. Similarly for all other uses of count in
         * other wait guards.
         */
        //当队列满时,调用notFull.await()方法释放锁,陷入等待状态。
        //有两种情况会激活该线程
        //第一、 某个put线程添加元素后,发现队列有空余,就调用notFull.signal()方法激活阻塞线程
        //第二、 take线程取元素时,发现队列已满。则其取出元素后,也会调用notFull.signal()方法激活阻塞线程
        while (count.get() == capacity) { 
            notFull.await();
        }
        // 把元素 e 添加到队列中(队尾)
        enqueue(e);
        c = count.getAndIncrement();
        //发现队列未满,调用notFull.signal()激活阻塞的put线程(可能存在)
        if (c + 1 < capacity)
            notFull.signal();
    } finally {
        putLock.unlock();
    }
    if (c == 0)
        //队列空,说明已经有take线程陷入阻塞,故调用signalNotEmpty激活阻塞的take线程
        signalNotEmpty();
}

enqueue(E e)方法如下:

/**
 * Creates a node and links it at end of queue.
 * @param x the item
 */
private void enqueue(E x) {
    // assert putLock.isHeldByCurrentThread();
    last = last.next = new Node<E>(x);
}

take()方法代码如下。take操作和put操作相反:

 public E take() throws InterruptedException {
    E x;
    int c = -1;
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lockInterruptibly();
    try {
        while (count.get() == 0) {
            notEmpty.await();
        }
        x = dequeue();
        c = count.getAndDecrement();
        if (c > 1)
        notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
    if (c == capacity)
        signalNotFull();
    return x;
}

dequeue()方法如下:

/**
  * Removes a node from head of queue.
  * @return the node
  */
private E dequeue() {
    // assert takeLock.isHeldByCurrentThread();
    Node<E> h = head;
    Node<E> first = h.next;
    h.next = h; // help GC
    head = first;
    E x = first.item;
    first.item = null;
    return x;
}

ArrayBlockingQueue和LinkedBlockingQueue的区别

1. 队列中锁的实现不同

ArrayBlockingQueue实现的队列中的锁是没有分离的,即生产和消费用的是同一个锁;
LinkedBlockingQueue实现的队列中的锁是分离的,即生产用的是putLock,消费是takeLock。

2. 在生产或消费时操作不同

ArrayBlockingQueue实现的队列中在生产和消费的时候,是直接将枚举对象插入或移除的;
LinkedBlockingQueue实现的队列中在生产和消费的时候,需要把枚举对象转换为Node<E>进行插入或移除,会影响性能。

3. 队列大小初始化方式不同

ArrayBlockingQueue实现的队列中必须指定队列的大小;
LinkedBlockingQueue实现的队列中可以不指定队列的大小,但是默认是Integer.MAX_VALUE。

注意:

1. 在使用LinkedBlockingQueue时,若用默认大小且当生产速度大于消费速度时候,有可能会内存溢出;
2. 在使用ArrayBlockingQueue和LinkedBlockingQueue分别对1000000个简单字符做入队操作时,LinkedBlockingQueue的消耗是ArrayBlockingQueue消耗的10倍左右,即LinkedBlockingQueue消耗在1500毫秒左右,而ArrayBlockingQueue只需150毫秒左右。


转载请并标注: “本文转载自 linkedkeeper.com ”