|
您目前处于:Development
2014-10-11
|
|
BlockingQueue的核心方法 boolean add(E e) // 把e添加到BlockingQueue里。如果BlockingQueue可以容纳,则返回true,否则抛出异常。 boolean offer(E e) // 表示如果可能的话,将e加到BlockingQueue里,即如果BlockingQueue可以容纳,则返回true,否则返回false。 void put(E e) // 把e添加到BlockingQueue里,如果BlockQueue没有空间,则调用此方法的线程被阻塞直到BlockingQueue里面有空间再继续。 E poll(long timeout, TimeUnit unit) // 取走BlockingQueue里排在首位的对象,若不能立即取出,则可以等time参数规定的时间,取不到时返回null。 E take() // 取走BlockingQueue里排在首位的对象,若BlockingQueue为空,则调用此方法的线程被阻塞直到BlockingQueue有新的数据被加入。 int drainTo(Collection c) int drainTo(Collection c, int maxElements) // 一次性从BlockingQueue获取所有可用的数据对象(还可以指定获取数据的个数),通过该方法,可以提升获取数据效率,不需要多次分批加锁或释放锁。 ArrayBlockingQueue源码分析 ArrayBlockingQueue创建的时候需要指定容量capacity(可以存储的最大的元素个数,因为它不会自动扩容),其中一个构造方法为: public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = (E[]) new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}ArrayBlockingQueue类中定义的变量有: /** The queued items */ private final E[] items; /** items index for next take, poll or remove */ private int takeIndex; /** items index for next put, offer, or add. */ private int putIndex; /** Number of items in the queue */ private int count; /* * Concurrency control uses the classic two-condition algorithm * found in any textbook. */ /** Main lock guarding all access */ private final ReentrantLock lock; /** Condition for waiting takes */ private final Condition notEmpty; /** Condition for waiting puts */ private final Condition notFull; 使用数组items来存储元素,由于是循环队列,使用takeIndex和putIndex来标记put和take的位置。可以看到,该类中只定义了一个锁ReentrantLock,定义两个Condition对象:notEmputy和notFull,分别用来对take和put操作进行所控制。 put(E e)方法的源码如下。进行put操作之前,必须获得锁并进行加锁操作,以保证线程安全性。加锁后,若发现队列已满,则调用notFull.await()方法,如当前线程陷入等待。直到其他线程take走某个元素后,会调用notFull.signal()方法来激活该线程。激活之后,继续下面的插入操作。 /**
* Inserts the specified element at the tail of this queue, waiting
* for space to become available if the queue is full.
*
*/
public void put(E e) throws InterruptedException {
//不能存放 null 元素
if (e == null) throw new NullPointerException();
final E[] items = this.items; //数组队列
final ReentrantLock lock = this.lock;
//加锁
lock.lockInterruptibly();
try {
try {
//当队列满时,调用notFull.await()方法,使该线程阻塞。
//直到take掉某个元素后,调用notFull.signal()方法激活该线程。
while (count == items.length)
notFull.await();
} catch (InterruptedException ie) {
notFull.signal(); // propagate to non-interrupted thread
throw ie;
}
//把元素 e 插入到队尾
insert(e);
} finally {
//解锁
lock.unlock();
}
}insert(E e) 方法如下: /**
* Inserts element at current put position, advances, and signals.
* Call only when holding lock.
*/
private void insert(E x) {
items[putIndex] = x;
//下标加1或者等于0
putIndex = inc(putIndex);
++count; //计数加1
//若有take()线程陷入阻塞,则该操作激活take()线程,继续进行取元素操作。
//若没有take()线程陷入阻塞,则该操作无意义。
notEmpty.signal();
}
/**
* Circularly increment i.
*/
final int inc(int i) {
//此处可以看到使用了循环队列
return (++i == items.length)? 0 : i;
}take()方法代码如下。take操作和put操作相反: public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly(); //加锁
try {
try {
//当队列空时,调用notEmpty.await()方法,使该线程阻塞。
//直到take掉某个元素后,调用notEmpty.signal()方法激活该线程。
while (count == 0)
notEmpty.await();
} catch (InterruptedException ie) {
notEmpty.signal(); // propagate to non-interrupted thread
throw ie;
}
//取出队头元素
E x = extract();
return x;
} finally {
lock.unlock(); //解锁
}
}extract() 方法如下: /**
* Extracts element at current take position, advances, and signals.
* Call only when holding lock.
*/
private E extract() {
final E[] items = this.items;
E x = items[takeIndex];
items[takeIndex] = null;
takeIndex = inc(takeIndex);
--count;
notFull.signal();
return x;
}LinkedBlockingQueue 源码分析 LinkedBlockingQueue 类中定义的变量有: /** The capacity bound, or Integer.MAX_VALUE if none */ private final int capacity; /** Current number of elements */ private final AtomicInteger count = new AtomicInteger(0); /** Head of linked list */ private transient Node<E> head; /** Tail of linked list */ private transient Node<E> last; /** Lock held by take, poll, etc */ private final ReentrantLock takeLock = new ReentrantLock(); /** Wait queue for waiting takes */ private final Condition notEmpty = takeLock.newCondition(); /** Lock held by put, offer, etc */ private final ReentrantLock putLock = new ReentrantLock(); /** Wait queue for waiting puts */ private final Condition notFull = putLock.newCondition(); 该类中定义了两个ReentrantLock锁:putLock和takeLock,分别用于put端和take端。也就是说,生成端和消费端各自独立拥有一把锁,避免了读(take)写(put)时互相竞争锁的情况。 /**
* Inserts the specified element at the tail of this queue, waiting if
* necessary for space to become available.
*/
public void put(E e) throws InterruptedException {
if (e == null)
throw new NullPointerException();
// Note: convention in all put/take/etc is to preset local var
// holding count negative to indicate failure unless set.
int c = -1;
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly(); //加 putLock 锁
try {
/*
* Note that count is used in wait guard even though it is
* not protected by lock. This works because count can
* only decrease at this point (all other puts are shut
* out by lock), and we (or some other waiting put) are
* signalled if it ever changes from
* capacity. Similarly for all other uses of count in
* other wait guards.
*/
//当队列满时,调用notFull.await()方法释放锁,陷入等待状态。
//有两种情况会激活该线程
//第一、 某个put线程添加元素后,发现队列有空余,就调用notFull.signal()方法激活阻塞线程
//第二、 take线程取元素时,发现队列已满。则其取出元素后,也会调用notFull.signal()方法激活阻塞线程
while (count.get() == capacity) {
notFull.await();
}
// 把元素 e 添加到队列中(队尾)
enqueue(e);
c = count.getAndIncrement();
//发现队列未满,调用notFull.signal()激活阻塞的put线程(可能存在)
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 0)
//队列空,说明已经有take线程陷入阻塞,故调用signalNotEmpty激活阻塞的take线程
signalNotEmpty();
}enqueue(E e)方法如下: /**
* Creates a node and links it at end of queue.
* @param x the item
*/
private void enqueue(E x) {
// assert putLock.isHeldByCurrentThread();
last = last.next = new Node<E>(x);
}take()方法代码如下。take操作和put操作相反: public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
while (count.get() == 0) {
notEmpty.await();
}
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}dequeue()方法如下: /**
* Removes a node from head of queue.
* @return the node
*/
private E dequeue() {
// assert takeLock.isHeldByCurrentThread();
Node<E> h = head;
Node<E> first = h.next;
h.next = h; // help GC
head = first;
E x = first.item;
first.item = null;
return x;
}ArrayBlockingQueue和LinkedBlockingQueue的区别 1. 队列中锁的实现不同 ArrayBlockingQueue实现的队列中的锁是没有分离的,即生产和消费用的是同一个锁; 2. 在生产或消费时操作不同 ArrayBlockingQueue实现的队列中在生产和消费的时候,是直接将枚举对象插入或移除的; 3. 队列大小初始化方式不同 ArrayBlockingQueue实现的队列中必须指定队列的大小; 注意: 1. 在使用LinkedBlockingQueue时,若用默认大小且当生产速度大于消费速度时候,有可能会内存溢出; 转载请并标注: “本文转载自 linkedkeeper.com ” ©著作权归作者所有 |