JUC之ArrayBlockingQueue
1.初识
ArrayBlockingQueue的原理就是使用一个可重入锁和这个锁生成的两个条件对象进行并发控制(classic two-condition algorithm)。
ArrayBlockingQueue是一个带有长度的阻塞队列,初始化的时候必须要指定队列长度,且指定长度之后不允许进行修改。
2.属性
/** The queued items 存储队列元素的数组*/ final Object[] items;
/** items index for next take, poll, peek or remove 拿数据的索引,用于take,poll,peek,remove方法*/ int takeIndex;
/** items index for next put, offer, or add 放数据的索引*/ int putIndex;
/** Number of elements in the queue 元素个数*/ int count;
/* * Concurrency control uses the classic two-condition algorithm * found in any textbook. */
/** Main lock guarding all access 可重入锁*/ final ReentrantLock lock;
/** Condition for waiting takes 条件对象*/ private final Condition notEmpty;
/** Condition for waiting puts 条件对象*/ private final Condition notFull;
3.常用API
添加数据add、offer、put
add()
public boolean add(E e) {
if (offer(e)) return true;
else throw new IllegalStateException("Queue full"); }
实际调用的是offer()方法:
public boolean offer(E e) { checkNotNull(e); // 不允许元素为空
final ReentrantLock lock = this.lock;
lock.lock(); // 加锁,保证调用offer方法的时候只有1个线程进入
try { if (count == items.length) // 如果队列已满 return false; // 返回false,添加失败
else { insert(e); // 没满的话调用insert方法 return true; // 返回true,添加成功 } }
finally { lock.unlock(); // 释放锁,让其他线程可以调用offer方法 } }
insert()
private void insert(E x) { items[putIndex] = x; // 元素添加到数组里
putIndex = inc(putIndex); // 存放数据索引+1,当索引满了变成0
++count; // 元素个数+1 notEmpty.signal(); // 使用条件对象notEmpty通知,比如使用take方法的时候队列里没有数据,被阻塞。这个时候队列insert了一条数据,需要调用signal进行通知 }
put()
public void put(E e) throws InterruptedException {
checkNotNull(e); // 不允许元素为空
final ReentrantLock lock = this.lock;
lock.lockInterruptibly(); // 加锁,保证调用put方法的时候只有1个线程
try { while (count == items.length) // 如果队列满了,阻塞当前线程,并加入到条件对象notFull的等待队列里 notFull.await(); // 线程阻塞并被挂起,同时释放锁
insert(e); // 调用insert方法 }
finally { lock.unlock(); // 释放锁,让其他线程可以调用put方法 } }
删除数据poll、take、remove
poll()
public E poll() { final ReentrantLock lock = this.lock; lock.lock(); // 加锁,保证调用poll方法时只有1个线程进入
try { //可以灵活使用三目操作符
return (count == 0) ? null : extract(); // 如果队列里没元素了,返回null,否则调用extract方法 }
finally { lock.unlock(); // 释放锁,让其他线程可以调用poll方法 } }
上述中的extract():
private E extract() {
final Object[] items = this.items;
E x = this.<E>cast(items[takeIndex]); // 得到取索引位置上的元素
items[takeIndex] = null; // 对应取索引上的数据清空
takeIndex = inc(takeIndex); // 取数据索引+1,当索引满置0
--count; // 元素个数-1
notFull.signal(); // 使用条件对象notFull通知,比如使用put方法放数据的时候队列已满,被阻塞。这个时候消费了一条数据,队列没满了,就需要调用signal进行通知
return x; // 返回元素 }
take()
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly(); // 加锁,保证调用take方法的时候只有1个线程 try {
while (count == 0) // 如果队列空,阻塞当前线程,并加入到条件对象notEmpty的等待里
notEmpty.await(); // 线程阻塞并被挂起,同时释放锁
return extract(); // 调用extract方法 }
finally { lock.unlock(); // 释放锁,让其他线程可以调用take方法 } }
4.小结
添加数据方法有add,put,offer这3个方法,总结如下:
1)add方法内部调用offer方法,如果队列满了,抛出IllegalStateException异常,否则返回true
2)offer方法如果队列满了,返回false,否则返回true
3)add方法和offer方法不会阻塞线程,put方法如果队列满了会阻塞线程,直到有线程消费了队列里的数据才有可能被唤醒。
这3个方法内部都会使用可重入锁保证原子性。
删除数据方法有poll,take,remove,这3个方法,这里只分析了两个,后续补上:
1)poll方法对于队列为空的情况,返回null,否则返回队列头部元素
2)remove方法取的元素是基于对象的下标值,删除成功返回true,否则返回false
3)poll方法和remove方法不会阻塞线程
4)take方法对于队列为空的情况,会阻塞并挂起当前线程,直到有数据加入到队列中
这3个方法内部都会调用notFull.signal方法通知正在等待队列满情况下的阻塞线程。
学无止境 天道酬勤
站在巨人的肩膀上能看的更远
本篇知识笔记参考:
1.jdk 8 源码
2.Demo见:
package top.dbwxd.juc.queue;
import java.util.concurrent.ArrayBlockingQueue;
/**
* @author xiaodongw Date: 2019-03-16 Time: 19:09
* @version $Id$
*/
public class ArrayBlockingQueueTest {
public static void main(String[] args) throws Exception {
System.out.println("begin");
ArrayBlockingQueue<String> abq = new ArrayBlockingQueue<>(10);
System.out.println("begin insert");
// insertBlocking();
// fetchBlocking();
testProducerConsumer(abq);
}
public static void insertBlocking() throws Exception {
ArrayBlockingQueue<String> names = new ArrayBlockingQueue<>(2);
names.put("a");
System.out.println("put a 执行完毕");
names.put("b");
System.out.println("put b 执行完毕");//执行不到 阻塞
System.out.println("insertBlocking ended====");
}
public static void fetchBlocking() throws Exception {
ArrayBlockingQueue<String> names = new ArrayBlockingQueue<>(2);
names.put("a");
System.out.println("put a end");
names.remove();
System.out.println("first time remove over");
names.remove(); //报错 java.util.NoSuchElementException
System.out.println("second time remove over");
names.put("b");
System.out.println("fetchBlocking end ===");
}
public static void testProducerConsumer(ArrayBlockingQueue<String> abq) {
Thread tConsumer = new Consumer(abq);
Thread tProducer = new Producer(abq);
tConsumer.start();
tProducer.start();
}
}