public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
// 队列满了, 写进入等待
while (count == items.length)
notFull.await();
insert(e);
} finally {
lock.unlock();
}
}
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
// 队列空的, 读进入等待
while (count == 0)
notEmpty.await();
return extract();
} finally {
lock.unlock();
}
}
因为BlockingQueue在put take等操作有锁, 因此非高性能容器,
如果需要高并发支持的队列, 则可以使用ConcurrentLinkedQueue. 他内部也是运用了大量无锁操作.
CopyOnWriteArrayListCopyOnWriteArrayList通过在新增元素时, 复制一份新的数组出来, 并在其中写入数据, 之后将原数组引用指向到新数组.
其Add操作是在内部通过ReentrantLock进行锁保护, 防止多线程场景复制多份数组.
而Read操作内部无锁, 直接返回数组引用, 并发下效率高, 因此适用于读多写少的场景.
源码
public boolean add(E e) {
final ReentrantLock lock = this.lock;
// 写数据的锁
lock.lock();
try {
Object[] elements = getArray();
int len = elements.length;
// 复制到新的数组
Object[] newElements = Arrays.copyOf(elements, len + 1);
// 加入新元素
newElements[len] = e;
// 修改引用
setArray(newElements);
return true;
} finally {
lock.unlock();
}
}
final void setArray(Object[] a) {
array = a;
}
// 读的时候无锁
public E get(int index) {
return get(getArray(), index);
}
示例
使用10个读线程, 100个写线程. 如果使用ArrayList实现, 那么有可能是在运行过程中抛出ConcurrentModificationException.
原因很简单, ArrayList在遍历的时候会check modCount是否发生变化, 如果一边读一边写就会抛异常.
public class CopyOnWriteListDemo {
static List<UUID> list = new CopyOnWriteArrayList<UUID>();
// static List<UUID> list = new ArrayList<UUID>();
// 往list中写数据
public static class AddThread implements Runnable {
@Override
public void run() {
UUID uuid = UUID.randomUUID();
list.add(uuid);
System.out.println("++Add uuid : " + uuid);
}
}
// 从list中读数据
public static class ReadThread implements Runnable {
@Override
public void run() {
System.out.println("start read size: " + list.size() + " thread : " + Thread.currentThread().getName());
for (UUID uuid : list) {
System.out.println("Read uuid : " + uuid + " size : " + list.size() + "thread: " + Thread.currentThread().getName());
}
}
}