|
您目前处于:Development
2017-06-26
|
系列文章:什么是阻塞队列 BlockingQueue 队列是一种数据结构,它的特点是先进先出(First In First Out),它有两个基本操作:在队列尾部加入一个元素,从队列头部移除一个元素。队列在多线程应用中,常用于生产-消费场景。
BlockingQueue 是 Java util.concurrent 包下重要的数据结构,BlockingQueue 提供了线程安全的队列访问方式:当阻塞队列进行插入数据时,如果队列已满,线程将会阻塞等待直到队列非满;从阻塞队列取数据时,如果队列已空,线程将会阻塞等待直到队列非空。并发包下很多高级同步类的实现都是基于 BlockingQueue 实现的。 BlockingQueue 具有 4 组不同的方法用于插入、移除以及对队列中的元素进行检查。如果请求的操作不能得到立即执行的话,每个方法的表现也不同。这些方法如下:
BlockingQueue 是个接口,你需要使用它的实现之一来使用 BlockingQueue,Java.util.concurrent 包下具有以下 BlockingQueue 接口的实现类:
下面用 BlockQueue 技术来实现一下: /** 定义一个盘子类,可以放鸡蛋和取鸡蛋 */
public class BigPlate {
/** 装鸡蛋的盘子,大小为5 */
private BlockingQueue<Object> eggs = new ArrayBlockingQueue<Object>(5);
/** 放鸡蛋 */
public void putEgg(Object egg) {
try {
eggs.put(egg);// 向盘子末尾放一个鸡蛋,如果盘子满了,当前线程阻塞
} catch (InterruptedException e) {
e.printStackTrace();
}
// 下面输出有时不准确,因为与put操作不是一个原子操作
System.out.println("放入鸡蛋");
}
/** 取鸡蛋 */
public Object getEgg() {
Object egg = null;
try {
egg = eggs.take();// 从盘子开始取一个鸡蛋,如果盘子空了,当前线程阻塞
} catch (InterruptedException e) {
e.printStackTrace();
}
// 下面输出有时不准确,因为与take操作不是一个原子操作
System.out.println("拿到鸡蛋");
return egg;
}
/** 放鸡蛋线程 */
static class AddThread extends Thread {
private BigPlate plate;
private Object egg = new Object();
public AddThread(BigPlate plate) {
this.plate = plate;
}
public void run() {
plate.putEgg(egg);
}
}
/** 取鸡蛋线程 */
static class GetThread extends Thread {
private BigPlate plate;
public GetThread(BigPlate plate) {
this.plate = plate;
}
public void run() {
plate.getEgg();
}
}
public static void main(String[] args) {
BigPlate plate = new BigPlate();
// 先启动10个放鸡蛋线程
for(int i = 0; i < 10; i++) {
new Thread(new AddThread(plate)).start();
}
// 再启动10个取鸡蛋线程
for(int i = 0; i < 10; i++) {
new Thread(new GetThread(plate)).start();
}
}
}利用 Condition 来实现阻塞队列 Java 1.5 之后新增了显式锁的接口 java.util.concurrent.locks.Lock 接口,同样提供了显式的条件接口 Condition,并对条件队列进行了增强。 Condition 对象可以提供和 Object 的 wait 和 notify 一样的行为,但是后者必须使用 synchronized 这个内置的monitor锁,而 Condition 使用的是 RenentranceLock 。这两种方式在阻塞等待时都会将相应的锁释放掉,但是 Condition 的等待可以中断,这是二者唯一的区别。 下面就用 Condition 技术来实现一下: class Buffer {
final Lock lock = new ReentrantLock(); //定义一个锁
final Condition notFull = lock.newCondition(); //定义阻塞队列满了的Condition
final Condition notEmpty = lock.newCondition();//定义阻塞队列空了的Condition
final Object[] items = new Object[10]; //为了下面模拟,设置阻塞队列的大小为10,不要设太大
int putptr, takeptr, count; //数组下标,用来标定位置的
//往队列中存数据
public void put(Object x) throws InterruptedException {
lock.lock(); //上锁
try {
while (count == items.length) {
System.out.println(Thread.currentThread().getName() + " 被阻塞了,暂时无法存数据!");
notFull.await(); //如果队列满了,那么阻塞存数据这个线程,等待被唤醒
}
//如果没满,按顺序往数组中存
items[putptr] = x;
if (++putptr == items.length) //这是到达数组末端的判断,如果到了,再回到始端
putptr = 0;
++count; //消息数量
System.out.println(Thread.currentThread().getName() + " 存好了值: " + x);
notEmpty.signal(); //好了,现在队列中有数据了,唤醒队列空的那个线程,可以取数据啦
} finally {
lock.unlock(); //放锁
}
}
//从队列中取数据
public Object take() throws InterruptedException {
lock.lock(); //上锁
try {
while (count == 0) {
System.out.println(Thread.currentThread().getName() + " 被阻塞了,暂时无法取数据!");
notEmpty.await(); //如果队列是空,那么阻塞取数据这个线程,等待被唤醒
}
//如果没空,按顺序从数组中取
Object x = items[takeptr];
if (++takeptr == items.length) //判断是否到达末端,如果到了,再回到始端
takeptr = 0;
--count; //消息数量
System.out.println(Thread.currentThread().getName() + " 取出了值: " + x);
notFull.signal(); //好了,现在队列中有位置了,唤醒队列满的那个线程,可以存数据啦
return x;
} finally {
lock.unlock(); //放锁
}
}
}转载请并标注: “本文转载自 linkedkeeper.com ” ©著作权归作者所有 |