GVKun编程网logo

Java高并发BlockingQueue重要的实现类详解(java高并发解决方案)

12

在本文中,您将会了解到关于Java高并发BlockingQueue重要的实现类详解的新资讯,同时我们还将为您解释java高并发解决方案的相关在本文中,我们将带你探索Java高并发BlockingQue

在本文中,您将会了解到关于Java高并发BlockingQueue重要的实现类详解的新资讯,同时我们还将为您解释java高并发解决方案的相关在本文中,我们将带你探索Java高并发BlockingQueue重要的实现类详解的奥秘,分析java高并发解决方案的特点,并给出一些关于(java多线程与并发)java并发库中的阻塞队列--BlockingQueue、20. 并发容器之 ArrayBlockingQueue 和 LinkedBlockingQueue、20. 并发容器之 ArrayBlockingQueue 和 LinkedBlockingQueue 实现原理详解、ArrayBlockingQueue 与 LinkedBlockingQueue 比较分析的实用技巧。

本文目录一览:

Java高并发BlockingQueue重要的实现类详解(java高并发解决方案)

Java高并发BlockingQueue重要的实现类详解(java高并发解决方案)

这篇文章主要给大家介绍了关于Java高并发BlockingQueue重要的实现类的相关资料,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧

ArrayBlockingQueue

有界的阻塞队列,内部是一个数组,有边界的意思是:容量是有限的,必须进行初始化,指定它的容量大小,以先进先出的方式存储数据,最新插入的在对尾,最先移除的对象在头部。public class ArrayBlockingQueue extends AbstractQueue implements BlockingQueue, java.io.Serializable { /** 队列元素 */ final Object[] items; /** 下一次读取操作的位置, poll, peek or remove */ int takeIndex; /** 下一次写入操作的位置, offer, or add */ int putIndex; /** 元素数量 */ int count; /* * Concurrency control uses the classic two-condition algorithm * found in any textbook. * 它采用一个 reentrantlock 和相应的两个 Condition 来实现。 */ /** Main lock guarding all access */ final reentrantlock lock; /** Condition for waiting takes */ private final Condition notEmpty; /** Condition for waiting puts */ private final Condition notFull; /** 指定大小 */ public ArrayBlockingQueue(int capacity) { this(capacity, false); } /** * 指定容量大小与指定访问策略 * @param fair 指定独占锁是公平锁还是非公平锁。非公平锁的吞吐量比较高,公平锁可以保证每次都是等待最久的线程获取到锁; */ public ArrayBlockingQueue(int capacity, boolean fair) {} /** * 指定容量大小、指定访问策略与最初包含给定集合中的元素 * @param c 将此集合中的元素在构造方法期间就先添加到队列中 */ public ArrayBlockingQueue(int capacity, boolean fair, Collection extends E> c) {} }ArrayBlockingQueue 在生产者放入数据和消费者获取数据,都是共用一个锁对象,由此也意味着两者无法真正并行运行。按照实现原理来分析, ArrayBlockingQueue 完全可以采用分离锁,从而实现生产者和消费者操作的完全并行运行。然而事实上并没有如此,因为 ArrayBlockingQueue 的数据写入已经足够轻巧,以至于引入独立的锁机制,除了给代码带来额外的复杂性外,其在性能上完全占不到任何便宜。通过构造函数得知,参数 fair 控制对象内部是否采用公平锁,默认采用非公平锁。items、takeIndex、putIndex、count 等属性并没有使用 volatile 修饰,这是因为访问这些变量(通过方法获取)使用都在锁内,并不存在可见性问题,如 size() 。另外有个独占锁 lock 用来对出入对操作加锁,这导致同时只有一个线程可以访问入队出队。

Put 源码分析

/** 进行入队操作 */ public void put(E e) throws InterruptedException { //e为null,则抛出NullPointerException异常 checkNotNull(e); //获取独占锁 final reentrantlock lock = this.lock; /** * lockInterruptibly() * 获取锁定,除非当前线程为interrupted * 如果锁没有被另一个线程占用并且立即返回,则将锁定计数设置为1。 * 如果当前线程已经保存此锁,则保持计数将递增1,该方法立即返回。 * 如果锁被另一个线程保持,则当前线程将被禁用以进行线程调度,并且处于休眠状态 * */ lock.lockInterruptibly(); try { //空队列 while (count == items.length) //进行条件等待处理 notFull.await(); //入队操作 enqueue(e); } finally { //释放锁 lock.unlock(); } } /** 真正的入队 */ private void enqueue(E x) { // assert lock.getHoldCount() == 1; // assert items[putIndex] == null; //获取当前元素 final Object[] items = this.items; //按下一个插入索引进行元素添加 items[putIndex] = x; // 计算下一个元素应该存放的下标,可以理解为循环队列 if (++putIndex == items.length) putIndex = 0; count++; //唤起消费者 notEmpty.signal(); }

这里由于在操作共享变量前加了锁,所以不存在内存不可见问题,加锁后获取的共享变量都是从主内存中获取的,而不是在cpu缓存或者寄存器里面的值,释放锁后修改的共享变量值会刷新到主内存。

另外这个队列使用循环数组实现,所以在计算下一个元素存放下标时候有些特殊。另外 insert 后调用 notEmpty.signal() ;是为了激活调用 notEmpty.await(); 阻塞后放入 notEmpty 条件队列的线程。

Take 源码分析

public E take() throws InterruptedException { final reentrantlock lock = this.lock; lock.lockInterruptibly(); try { while (count == 0) notEmpty.await(); return dequeue(); } finally { lock.unlock(); } } private E dequeue() { // assert lock.getHoldCount() == 1; // assert items[takeIndex] != null; final Object[] items = this.items; @SuppressWarnings("unchecked") E x = (E) items[takeIndex]; items[takeIndex] = null; if (++takeIndex == items.length) takeIndex = 0; count--; //这里有些特殊 if (itrs != null) //保持队列中的元素和迭代器的元素一致 itrs.elementDequeued(); notFull.signal(); return x; }

Take 操作和 Put 操作很类似

//该类的迭代器,所有的迭代器共享数据,队列改变会影响所有的迭代器 transient Itrs itrs = null; //其存放了目前所创建的所有迭代器。 /** * 迭代器和它们的队列之间的共享数据,允许队列元素被删除时更新迭代器的修改。 */ class Itrs { void elementDequeued() { // assert lock.getHoldCount() == 1; if (count == 0) //队列中数量为0的时候,队列就是空的,会将所有迭代器进行清理并移除 queueIsEmpty(); //takeIndex的下标是0,意味着队列从尾中取完了,又回到头部获取 else if (takeIndex == 0) takeIndexWrapped(); } /** * 当队列为空的时候做的事情 * 1. 通知所有迭代器队列已经为空 * 2. 清空所有的弱引用,并且将迭代器置空 */ void queueIsEmpty() {} /** * 将takeIndex包装成0 * 并且通知所有的迭代器,并且删除已经过期的任何对象(个人理解是置空对象) * 也直接的说就是在Blocking队列进行出队的时候,进行迭代器中的数据同步,保持队列中的元素和迭代器的元素是一致的。 */ void takeIndexWrapped() {} }

Itrs迭代器创建的时机

//从这里知道,在ArrayBlockingQueue对象中调用此方法,才会生成这个对象 //那么就可以理解为,只要并未调用此方法,则ArrayBlockingQueue对象中的Itrs对象则为空 public Iterator iterator() { return new Itr(); } private class Itr implements Iterator { Itr() { //这里就是生产它的地方 //count等于0的时候,创建的这个迭代器是个无用的迭代器,可以直接移除,进入detach模式。 //否则就把当前队列的读取位置给迭代器当做下一个元素,cursor存储下个元素的位置。 if (count == 0) { // assert itrs == null; cursor = NONE; nextIndex = NONE; prevTakeIndex = DETACHED; } else { final int takeIndex = ArrayBlockingQueue.this.takeIndex; prevTakeIndex = takeIndex; nextItem = itemAt(nextIndex = takeIndex); cursor = incCursor(takeIndex); if (itrs == null) { itrs = new Itrs(this); } else { itrs.register(this); // in this order itrs.doSomeSweeping(false); } prevCycles = itrs.cycles; // assert takeIndex >= 0; // assert prevTakeIndex == takeIndex; // assert nextIndex >= 0; // assert nextItem != null; } } }

代码演示

package com.rumenz.task; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /** * @className: BlockingQuqueExample * @description: Todo 类描述 * @author: mac * @date: 2021/1/20 **/ public class BlockingQueueExample { private static volatile Boolean flag=false; public static void main(String[] args) { BlockingQueue blockingQueue=new ArrayBlockingQueue(1024); ExecutorService executorService = Executors.newFixedThreadPool(2); executorService.execute(()->{ try{ blockingQueue.put(1); Thread.sleep(2000); blockingQueue.put(3); flag=true; }catch (Exception e){ e.printstacktrace(); } }); executorService.execute(()->{ try { while (!flag){ Integer i = (Integer) blockingQueue.take(); System.out.println(i); } }catch (Exception e){ e.printstacktrace(); } }); executorService.shutdown(); } }

LinkedBlockingQueue

基于链表的阻塞队列,通 ArrayBlockingQueue 类似,其内部也维护这一个数据缓冲队列(该队列由一个链表构成),当生产者往队列放入一个数据时,队列会从生产者手上获取数据,并缓存在队列的内部,而生产者立即返回,只有当队列缓冲区到达最大值容量时(LinkedBlockingQueue可以通过构造函数指定该值),才会阻塞队列,直到消费者从队列中消费掉一份数据,生产者会被唤醒,反之对于消费者这端的处理也基于同样的原理。

LinkedBlockingQueue 之所以能够高效的处理并发数据,还因为其对于生产者和消费者端分别采用了独立的锁来控制数据同步,这也意味着在高并发的情况下生产者和消费者可以并行的操作队列中的数据,以调高整个队列的并发能力。

如果构造一个 LinkedBlockingQueue 对象,而没有指定容量大小, LinkedBlockingQueue 会默认一个类似无限大小的容量 Integer.MAX_VALUE ,这样的话,如果生产者的速度一旦大于消费者的速度,也许还没有等到队列满阻塞产生,系统内存就有可能已经被消耗殆尽了。

LinkedBlockingQueue 是一个使用链表完成队列操作的阻塞队列。链表是单向链表,而不是双向链表。

public class LinkedBlockingQueue extends AbstractQueue implements BlockingQueue, java.io.Serializable { //队列的容量,指定大小或为默认值Integer.MAX_VALUE private final int capacity; //元素的数量 private final AtomicInteger count = new AtomicInteger(); //队列头节点,始终满足head.item==null transient Node head; //队列的尾节点,始终满足last.next==null private transient Node last; /** Lock held by take, poll, etc */ //出队的锁:take, poll, peek 等读操作的方法需要获取到这个锁 private final reentrantlock takeLock = new reentrantlock(); /** Wait queue for waiting takes */ //当队列为空时,保存执行出队的线程:如果读操作的时候队列是空的,那么等待 notEmpty 条件 private final Condition notEmpty = takeLock.newCondition(); /** Lock held by put, offer, etc */ //入队的锁:put, offer 等写操作的方法需要获取到这个锁 private final reentrantlock putLock = new reentrantlock(); /** Wait queue for waiting puts */ //当队列满时,保存执行入队的线程:如果写操作的时候队列是满的,那么等待 notFull 条件 private final Condition notFull = putLock.newCondition(); //传说中的无界队列 public LinkedBlockingQueue() {} //传说中的有界队列 public LinkedBlockingQueue(int capacity) { if (capacity (null); } //传说中的无界队列 public LinkedBlockingQueue(Collection extends E> c){} /** * 链表节点类 */ static class Node { E item; /** * One of: * - 真正的继任者节点 * - 这个节点,意味着继任者是head.next * - 空,意味着没有后继者(这是最后一个节点) */ Node next; Node(E x) { item = x; } } }

通过其构造函数,得知其可以当做无界队列也可以当做有界队列来使用。

这里用了两把锁分别是 takeLock 和 putLock ,而 Condition 分别是 notEmpty 和 notFull ,它们是这样搭配的。

takeLock

putLock

从上面的构造函数中可以看到,这里会初始化一个空的头结点,那么第一个元素入队的时候,队列中就会有两个元素。读取元素时,也是获取头结点后面的一个元素。count的计数值不包含这个头结点。

Put源码分析

public class LinkedBlockingQueue extends AbstractQueue implements BlockingQueue, java.io.Serializable { /** * 将指定元素插入到此队列的尾部,如有必要,则等待空间变得可用。 */ public void put(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); // 如果你纠结这里为什么是 -1,可以看看 offer 方法。这就是个标识成功、失败的标志而已。 int c = -1; //包装成node节点 Node node = new Node(e); final reentrantlock putLock = this.putLock; final AtomicInteger count = this.count; //获取锁定 putLock.lockInterruptibly(); try { /** 如果队列满,等待 notFull 的条件满足。 */ while (count.get() == capacity) { notFull.await(); } //入队 enqueue(node); //原子性自增 c = count.getAndIncrement(); // 如果这个元素入队后,还有至少一个槽可以使用,调用 notFull.signal() 唤醒等待线程。 // 哪些线程会等待在 notFull 这个 Condition 上呢? if (c + 1 node) { // assert putLock.isHeldByCurrentThread(); // assert last.next == null; // 入队的代码非常简单,就是将 last 属性指向这个新元素,并且让原队尾的 next 指向这个元素 //last.next = node; //last = node; // 这里入队没有并发问题,因为只有获取到 putLock 独占锁以后,才可以进行此操作 last = last.next = node; } /** * 等待PUT信号 * 仅在 take/poll 中调用 * 也就是说:元素入队后,如果需要,则会调用这个方法唤醒读线程来读 */ private void signalNotFull() { final reentrantlock putLock = this.putLock; putLock.lock(); try { notFull.signal();//唤醒 } finally { putLock.unlock(); } } }

Take源码分析

public class LinkedBlockingQueue extends AbstractQueue implements BlockingQueue, java.io.Serializable { public E take() throws InterruptedException { E x; int c = -1; final AtomicInteger count = this.count; final reentrantlock takeLock = this.takeLock; //首先,需要获取到 takeLock 才能进行出队操作 takeLock.lockInterruptibly(); try { // 如果队列为空,等待 notEmpty 这个条件满足再继续执行 while (count.get() == 0) { notEmpty.await(); } //// 出队 x = dequeue(); //count 进行原子减 1 c = count.getAndDecrement(); // 如果这次出队后,队列中至少还有一个元素,那么调用 notEmpty.signal() 唤醒其他的读线程 if (c > 1) notEmpty.signal(); } finally { takeLock.unlock(); } if (c == capacity) signalNotFull(); return x; } /** * 出队 */ private E dequeue() { // assert takeLock.isHeldByCurrentThread(); // assert head.item == null; Node h = head; Node first = h.next; h.next = h; // help GC head = first; E x = first.item; first.item = null; return x; } /** * Signals a waiting put. Called only from take/poll. */ private void signalNotFull() { final reentrantlock putLock = this.putLock; putLock.lock(); try { notFull.signal(); } finally { putLock.unlock(); } } }

与 ArrayBlockingQueue 对比

ArrayBlockingQueue和LinkedBlockingQueue间还有一个明显的不同之处在于,前者在插入或删除元素时不会产生或销毁任何额外的对象实例,而后者则会生成一个额外的Node对象。这在长时间内需要高效并发地处理大批量数据的系统中,其对于GC的影响还是存在一定的区别。

LinkedBlockingQueue 实现一个线程添加文件对象,四个线程读取文件对象

package concurrent; import java.io.File; import java.io.FileFilter; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicInteger; public class TestBlockingQueue { static long randomTime() { return (long) (Math.random() * 1000); } public static void main(String[] args) { // 能容纳100个文件 final BlockingQueue queue = new LinkedBlockingQueue(100); // 线程池 final ExecutorService exec = Executors.newFixedThreadPool(5); final File root = new File("F:\JavaLib"); // 完成标志 final File exitFile = new File(""); // 读个数 final AtomicInteger rc = new AtomicInteger(); // 写个数 final AtomicInteger wc = new AtomicInteger(); // 读线程 Runnable read = new Runnable() { public void run() { scanFile(root); scanFile(exitFile); } public void scanFile(File file) { if (file.isDirectory()) { File[] files = file.listFiles(new FileFilter() { public boolean accept(File pathname) { return pathname.isDirectory() || pathname.getPath().endsWith(".java"); } }); for (File one : files) scanFile(one); } else { try { int index = rc.incrementAndGet(); System.out.println("Read0: " + index + " " + file.getPath()); queue.put(file); } catch (InterruptedException e) { } } } }; exec.submit(read); // 四个写线程 for (int index = 0; index

总结

到此这篇关于Java高并发BlockingQueue重要实现类的文章就介绍到这了,更多相关Java高并发BlockingQueue实现类内容请搜索小编以前的文章或继续浏览下面的相关文章希望大家以后多多支持小编!

(java多线程与并发)java并发库中的阻塞队列--BlockingQueue

(java多线程与并发)java并发库中的阻塞队列--BlockingQueue

1.简介
   信号量(Semaphore),有时被称为信号灯,是在多线程环境下使用的一种设施, 它负责协调各个线程, 以保证它们能够正确、合理的使用公共资源。

2.概念
    Semaphore分为单值和多值两种,前者只能被一个线程获得,后者可以被若干个线程获得。

以一个停车场运作为例。为了简单起见,假设停车场只有三个车位,一开始三个车位都是空的。这时如果同时来了五辆车,看门人允许其中三辆不受阻碍的进入,然后放下车拦,剩下的车则必须在入口等待,此后来的车也都不得不在入口处等待。这时,有一辆车离开停车场,看门人得知后,打开车拦,放入一辆,如果又离开两辆,则又可以放入两辆,如此往复。

在这个停车场系统中,车位是公共资源,每辆车好比一个线程,看门人起的就是信号量的作用。

     更进一步,信号量的特性如下:信号量是一个非负整数(车位数),所有通过它的线程(车辆)都会将该整数减一(通过它当然是为了使用资源),当该整数值为零时,所有试图通过它的线程都将处于等待状态。在信号量上我们定义两种操作: Wait(等待) 和 Release(释放)。 当一个线程调用Wait(等待)操作时,它要么通过然后将信号量减一,要么一直等下去,直到信号量大于一或超时。Release(释放)实际上是在信号量上执行加操作,对应于车辆离开停车场,该操作之所以叫做“释放”是因为加操作实际上是释放了由信号量守护的资源。

      在Java中,还可以设置该信号量是否采用公平模式,如果以公平方式执行,则线程将会按到达的顺序(FIFO)执行,如果是非公平,则可以后请求的有可能排在队列的头部。
JDK中定义如下:
Semaphore(int permits, boolean fair)
  创建具有给定的许可数和给定的公平设置的Semaphore。

     Semaphore当前在多线程环境下被扩放使用,操作系统的信号量是个很重要的概念,在进程控制方面都有应用。Java并发库Semaphore 可以很轻松完成信号量控制,Semaphore可以控制某个资源可被同时访问的个数,通过 acquire() 获取一个许可,如果没有就等待,而 release() 释放一个许可。比如在Windows下可以设置共享文件的最大客户端访问个数。

     Semaphore实现的功能就类似厕所有5个坑,假如有10个人要上厕所,那么同时只能有多少个人去上厕所呢?同时只能有5个人能够占用,当5个人中 的任何一个人让开后,其中等待的另外5个人中又有一个人可以占用了。另外等待的5个人中可以是随机获得优先机会,也可以是按照先来后到的顺序获得机会,这取决于构造Semaphore对象时传入的参数选项。单个信号量的Semaphore对象可以实现互斥锁的功能,并且可以是由一个线程获得了“锁”,再由另一个线程释放“锁”,这可应用于死锁恢复的一些场合。

3.案例一:

package SemaPhore;  
  
import java.util.Random;  
import java.util.concurrent.*;  
public class Test {  
  
  
  
    public static void main(String[] args) {  
        //线程池  
        ExecutorService executor = Executors.newCachedThreadPool();  
        //定义信号量,只能5个线程同时访问  
        final Semaphore semaphore = new Semaphore(5);  
        //模拟20个线程同时访问  
        for (int i = 0; i < 20; i++) {  
             final int NO = i;  
             Runnable runnable = new Runnable() {  
                public void run() {  
                    try {  
                        //获取许可  
                        semaphore.acquire();  
                        //availablePermits()指的是当前信号灯库中有多少个可以被使用  
                        System.out.println("线程" + Thread.currentThread().getName() +"进入,当前已有" + (5-semaphore.availablePermits()) + "个并发");  
                        System.out.println("index:"+NO);  
                        Thread.sleep(new Random().nextInt(1000)*10);  
                          
                        System.out.println("线程" + Thread.currentThread().getName() + "即将离开");     
                        //访问完后,释放  
                        semaphore.release();  
  
                      
                    } catch (Exception e) {  
                        e.printStackTrace();  
                    }  
                }  
            };  
              
            executor.execute(runnable);  
        }  
        // 退出线程池  
        executor.shutdown();  
    }  
  
}  

4.案例二:

下面是模拟一个连接池,控制同一时间最多只能有50个线程访问。

import java.util.UUID;  
import java.util.concurrent.Semaphore;  
import java.util.concurrent.TimeUnit;  
  
public class TestSemaphore extends Thread {  
    public static void main(String[] args) {  
        int i = 0;  
        while (i < 500) {  
            i++;  
            new TestSemaphore().start();  
            try {  
                Thread.sleep(1);  
            } catch (InterruptedException e) {  
                e.printStackTrace();  
            }  
        }  
    }  
  
    /** 
     * 控制某资源同时被访问的个数的类 控制同一时间最后只能有50个访问 
     */  
    static Semaphore semaphore = new Semaphore(50);  
    static int timeout = 500;  
  
    public void run() {  
        try {  
            Object connec = getConnection();  
            System.out.println("获得一个连接" + connec);  
            Thread.sleep(300);  
            releaseConnection(connec);  
        } catch (InterruptedException e) {  
            e.printStackTrace();  
        }  
    }  
  
    public void releaseConnection(Object connec) {  
        /* 释放许可 */  
        semaphore.release();  
        System.out.println("释放一个连接" + connec);  
    }  
  
    public Object getConnection() {  
        try {/* 获取许可 */  
            boolean getAccquire = semaphore.tryAcquire(timeout, TimeUnit.MILLISECONDS);  
            if (getAccquire) {  
                return UUID.randomUUID().toString();  
            }  
        } catch (InterruptedException e) {  
            e.printStackTrace();  
        }  
        throw new IllegalArgumentException("timeout");  
    }  
}  

 

 

 

 

 

 

20. 并发容器之 ArrayBlockingQueue 和 LinkedBlockingQueue

20. 并发容器之 ArrayBlockingQueue 和 LinkedBlockingQueue

OSC 请你来轰趴啦!1028 苏州源创会,一起寻宝 AI 时代

1. ArrayBlockingQueue 简介

在多线程编程过程中,为了业务解耦和架构设计,经常会使用并发容器用于存储多线程间的共享数据,这样不仅可以保证线程安全,还可以简化各个线程操作。例如在 “生产者 - 消费者” 问题中,会使用阻塞队列(BlockingQueue)作为数据容器,关于 BlockingQueue 可以看这篇文章。为了加深对阻塞队列的理解,唯一的方式是对其实验原理进行理解,这篇文章就主要来看看 ArrayBlockingQueue 和 LinkedBlockingQueue 的实现原理。

2. ArrayBlockingQueue 实现原理

阻塞队列最核心的功能是,能够可阻塞式的插入和删除队列元素。当前队列为空时,会阻塞消费数据的线程,直至队列非空时,通知被阻塞的线程;当队列满时,会阻塞插入数据的线程,直至队列未满时,通知插入数据的线程(生产者线程)。那么,多线程中消息通知机制最常用的是 lock 的 condition 机制,关于 condition 可以看这篇文章的详细介绍。那么 ArrayBlockingQueue 的实现是不是也会采用 Condition 的通知机制呢?下面来看看。

2.1 ArrayBlockingQueue 的主要属性

ArrayBlockingQueue 的主要属性如下:

/** The queued items */
final Object[] items;

/** items index for next take, poll, peek or remove */
int takeIndex;

/** items index for next put, offer, or add */
int putIndex;

/** Number of elements in the queue */
int count;

/*
 * Concurrency control uses the classic two-condition algorithm
 * found in any textbook.
 */

/** Main lock guarding all access */
final ReentrantLock lock;

/** Condition for waiting takes */
private final Condition notEmpty;

/** Condition for waiting puts */
private final Condition notFull;

从源码中可以看出 ArrayBlockingQueue 内部是采用数组进行数据存储的(属性items),为了保证线程安全,采用的是 ReentrantLock lock,为了保证可阻塞式的插入删除数据利用的是 Condition,当获取数据的消费者线程被阻塞时会将该线程放置到 notEmpty 等待队列中,当插入数据的生产者线程被阻塞时,会将该线程放置到 notFull 等待队列中。而 notEmpty 和 notFull 等中要属性在构造方法中进行创建:

public ArrayBlockingQueue(int capacity, boolean fair) {
    if (capacity <= 0)
        throw new IllegalArgumentException();
    this.items = new Object[capacity];
    lock = new ReentrantLock(fair);
    notEmpty = lock.newCondition();
    notFull =  lock.newCondition();
}

接下来,主要看看可阻塞式的 put 和 take 方法是怎样实现的。

2.2 put 方法详解

put(E e) 方法源码如下:

public void put(E e) throws InterruptedException {
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
		//如果当前队列已满,将线程移入到notFull等待队列中
        while (count == items.length)
            notFull.await();
		//满足插入数据的要求,直接进行入队操作
        enqueue(e);
    } finally {
        lock.unlock();
    }
}

该方法的逻辑很简单,当队列已满时(count == items.length)将线程移入到 notFull 等待队列中,如果当前满足插入数据的条件,就可以直接调用 enqueue(e) 插入数据元素。enqueue 方法源码为:

private void enqueue(E x) {
    // assert lock.getHoldCount() == 1;
    // assert items[putIndex] == null;
    final Object[] items = this.items;
	//插入数据
    items[putIndex] = x;
    if (++putIndex == items.length)
        putIndex = 0;
    count++;
	//通知消费者线程,当前队列中有数据可供消费
    notEmpty.signal();
}

enqueue 方法的逻辑同样也很简单,先完成插入数据,即往数组中添加数据(items[putIndex] = x),然后通知被阻塞的消费者线程,当前队列中有数据可供消费(notEmpty.signal())。

2.3 take 方法详解

take 方法源码如下:

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
		//如果队列为空,没有数据,将消费者线程移入等待队列中
        while (count == 0)
            notEmpty.await();
		//获取数据
        return dequeue();
    } finally {
        lock.unlock();
    }
}

take 方法也主要做了两步:1. 如果当前队列为空的话,则将获取数据的消费者线程移入到等待队列中;2. 若队列不为空则获取数据,即完成出队操作 dequeue。dequeue 方法源码为:

private E dequeue() {
    // assert lock.getHoldCount() == 1;
    // assert items[takeIndex] != null;
    final Object[] items = this.items;
    @SuppressWarnings("unchecked")
	//获取数据
    E x = (E) items[takeIndex];
    items[takeIndex] = null;
    if (++takeIndex == items.length)
        takeIndex = 0;
    count--;
    if (itrs != null)
        itrs.elementDequeued();
    //通知被阻塞的生产者线程
	notFull.signal();
    return x;
}

dequeue 方法也主要做了两件事情:1. 获取队列中的数据,即获取数组中的数据元素((E) items[takeIndex]);2. 通知 notFull 等待队列中的线程,使其由等待队列移入到同步队列中,使其能够有机会获得 lock,并执行完成功退出。

从以上分析,可以看出 put 和 take 方法主要是通过 condition 的通知机制来完成可阻塞式的插入数据和获取数据。在理解 ArrayBlockingQueue 后再去理解 LinkedBlockingQueue 就很容易了。

3. LinkedBlockingQueue 实现原理

LinkedBlockingQueue 是用链表实现的有界阻塞队列,当构造对象时为指定队列大小时,队列默认大小为 Integer.MAX_VALUE。从它的构造方法可以看出:

public LinkedBlockingQueue() {
    this(Integer.MAX_VALUE);
}

3.1 LinkedBlockingQueue 的主要属性

LinkedBlockingQueue 的主要属性有:

/** Current number of elements */
private final AtomicInteger count = new AtomicInteger();

/**
 * Head of linked list.
 * Invariant: head.item == null
 */
transient Node<E> head;

/**
 * Tail of linked list.
 * Invariant: last.next == null
 */
private transient Node<E> last;

/** Lock held by take, poll, etc */
private final ReentrantLock takeLock = new ReentrantLock();

/** Wait queue for waiting takes */
private final Condition notEmpty = takeLock.newCondition();

/** Lock held by put, offer, etc */
private final ReentrantLock putLock = new ReentrantLock();

/** Wait queue for waiting puts */
private final Condition notFull = putLock.newCondition();

可以看出与 ArrayBlockingQueue 主要的区别是,LinkedBlockingQueue 在插入数据和删除数据时分别是由两个不同的 lock(takeLockputLock)来控制线程安全的,因此,也由这两个 lock 生成了两个对应的 condition(notEmptynotFull)来实现可阻塞的插入和删除数据。并且,采用了链表的数据结构来实现队列,Node 结点的定义为:

static class Node<E> {
    E item;

    /**
     * One of:
     * - the real successor Node
     * - this Node, meaning the successor is head.next
     * - null, meaning there is no successor (this is the last node)
     */
    Node<E> next;

    Node(E x) { item = x; }
}

接下来,我们也同样来看看 put 方法和 take 方法的实现。

3.2 put 方法详解

put 方法源码为:

public void put(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException();
    // Note: convention in all put/take/etc is to preset local var
    // holding count negative to indicate failure unless set.
    int c = -1;
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    final AtomicInteger count = this.count;
    putLock.lockInterruptibly();
    try {
        /*
         * Note that count is used in wait guard even though it is
         * not protected by lock. This works because count can
         * only decrease at this point (all other puts are shut
         * out by lock), and we (or some other waiting put) are
         * signalled if it ever changes from capacity. Similarly
         * for all other uses of count in other wait guards.
         */
		//如果队列已满,则阻塞当前线程,将其移入等待队列
        while (count.get() == capacity) {
            notFull.await();
        }
		//入队操作,插入数据
        enqueue(node);
        c = count.getAndIncrement();
		//若队列满足插入数据的条件,则通知被阻塞的生产者线程
        if (c + 1 < capacity)
            notFull.signal();
    } finally {
        putLock.unlock();
    }
    if (c == 0)
        signalNotEmpty();
}

put 方法的逻辑也同样很容易理解,可见注释。基本上和 ArrayBlockingQueue 的 put 方法一样。take 方法的源码如下:

public E take() throws InterruptedException {
    E x;
    int c = -1;
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lockInterruptibly();
    try {
		//当前队列为空,则阻塞当前线程,将其移入到等待队列中,直至满足条件
        while (count.get() == 0) {
            notEmpty.await();
        }
		//移除队头元素,获取数据
        x = dequeue();
        c = count.getAndDecrement();
        //如果当前满足移除元素的条件,则通知被阻塞的消费者线程
		if (c > 1)
            notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
    if (c == capacity)
        signalNotFull();
    return x;
}

take 方法的主要逻辑请见于注释,也很容易理解。

4. ArrayBlockingQueue 与 LinkedBlockingQueue 的比较

相同点:ArrayBlockingQueue 和 LinkedBlockingQueue 都是通过 condition 通知机制来实现可阻塞式插入和删除元素,并满足线程安全的特性;

不同点:1. ArrayBlockingQueue 底层是采用的数组进行实现,而 LinkedBlockingQueue 则是采用链表数据结构; 2. ArrayBlockingQueue 插入和删除数据,只采用了一个 lock,而 LinkedBlockingQueue 则是在插入和删除分别采用了 putLocktakeLock,这样可以降低线程由于线程无法获取到 lock 而进入 WAITING 状态的可能性,从而提高了线程并发执行的效率。

20. 并发容器之 ArrayBlockingQueue 和 LinkedBlockingQueue 实现原理详解

20. 并发容器之 ArrayBlockingQueue 和 LinkedBlockingQueue 实现原理详解

1. ArrayBlockingQueue 简介

在多线程编程过程中,为了业务解耦和架构设计,经常会使用并发容器用于存储多线程间的共享数据,这样不仅可以保证线程安全,还可以简化各个线程操作。例如在 “生产者 - 消费者” 问题中,会使用阻塞队列(BlockingQueue)作为数据容器,关于 BlockingQueue 可以看这篇文章。为了加深对阻塞队列的理解,唯一的方式是对其实验原理进行理解,这篇文章就主要来看看 ArrayBlockingQueue 和 LinkedBlockingQueue 的实现原理。

2. ArrayBlockingQueue 实现原理

阻塞队列最核心的功能是,能够可阻塞式的插入和删除队列元素。当前队列为空时,会阻塞消费数据的线程,直至队列非空时,通知被阻塞的线程;当队列满时,会阻塞插入数据的线程,直至队列未满时,通知插入数据的线程(生产者线程)。那么,多线程中消息通知机制最常用的是 lock 的 condition 机制,关于 condition 可以看这篇文章的详细介绍。那么 ArrayBlockingQueue 的实现是不是也会采用 Condition 的通知机制呢?下面来看看。

2.1 ArrayBlockingQueue 的主要属性

ArrayBlockingQueue 的主要属性如下:

/** The queued items */
final Object[] items;

/** items index for next take, poll, peek or remove */
int takeIndex;

/** items index for next put, offer, or add */
int putIndex;

/** Number of elements in the queue */
int count;

/*
* Concurrency control uses the classic two-condition algorithm
* found in any textbook.
*/

/** Main lock guarding all access */
final ReentrantLock lock;

/** Condition for waiting takes */
private final Condition notEmpty; //消费者等待线程

/** Condition for waiting puts */
private final Condition notFull; //生产者等待线程

从源码中可以看出 ArrayBlockingQueue 内部是采用数组进行数据存储的(属性items),为了保证线程安全,采用的是 ReentrantLock lock,为了保证可阻塞式的插入删除数据利用的是 Condition,当获取数据的消费者线程被阻塞时会将该线程放置到 notEmpty 等待队列中,当插入数据的生产者线程被阻塞时,会将该线程放置到 notFull 等待队列中。而 notEmpty 和 notFull 等中要属性在构造方法中进行创建:

public ArrayBlockingQueue(int capacity, boolean fair) {
   if (capacity <= 0)
       throw new IllegalArgumentException();
   this.items = new Object[capacity];
   lock = new ReentrantLock(fair);
   notEmpty = lock.newCondition();
   notFull =  lock.newCondition();
}

接下来,主要看看可阻塞式的 put 和 take 方法是怎样实现的。

2.2 put 方法详解

put(E e)方法源码如下:

public void put(E e) throws InterruptedException {
   checkNotNull(e);
   final ReentrantLock lock = this.lock;
   lock.lockInterruptibly();
   try {
//如果当前队列已满,将线程移入到notFull等待队列中
       while (count == items.length)
           notFull.await(); //移入生产者等待线程
//满足插入数据的要求,直接进行入队操作
       enqueue(e);
  } finally {
       lock.unlock();
  }
}

该方法的逻辑很简单,当队列已满时(count == items.length)将线程移入到 notFull 等待队列中,如果当前满足插入数据的条件,就可以直接调用enqueue(e)插入数据元素。enqueue 方法源码为:

private void enqueue(E x) {
   // assert lock.getHoldCount() == 1;
   // assert items[putIndex] == null;
   final Object[] items = this.items;
//插入数据
   items[putIndex] = x;
   if (++putIndex == items.length)
       putIndex = 0;
   count++;
//通知消费者线程,当前队列中有数据可供消费
   notEmpty.signal();
}

enqueue 方法的逻辑同样也很简单,先完成插入数据,即往数组中添加数据(items[putIndex] = x),然后通知被阻

塞的消费者线程,当前队列中有数据可供消费(notEmpty.signal())。

2.3 take 方法详解

take 方法源码如下:

public E take() throws InterruptedException {
   final ReentrantLock lock = this.lock;
   lock.lockInterruptibly();
   try {
//如果队列为空,没有数据,将消费者线程移入等待队列中
       while (count == 0)
           notEmpty.await();//移入消费者等待线程
//获取数据
       return dequeue();
  } finally {
       lock.unlock();
  }
}

take 方法也主要做了两步:1. 如果当前队列为空的话,则将获取数据的消费者线程移入到等待队列中;2. 若队列不为空则获取数据,即完成出队操作dequeue。dequeue 方法源码为:

private E dequeue() {
   // assert lock.getHoldCount() == 1;
   // assert items[takeIndex] != null;
   final Object[] items = this.items;
   @SuppressWarnings("unchecked")
//获取数据
   E x = (E) items[takeIndex];
   items[takeIndex] = null;
   if (++takeIndex == items.length)
       takeIndex = 0;
   count--;
   if (itrs != null)
       itrs.elementDequeued();
   //通知被阻塞的生产者线程
notFull.signal();
   return x;
}

dequeue 方法也主要做了两件事情:1. 获取队列中的数据,即获取数组中的数据元素((E) items[takeIndex]);2. 通知 notFull 等待队列中的线程,使其由等待队列移入到同步队列中,使其能够有机会获得 lock,并执行完成功退出。

从以上分析,可以看出 put 和 take 方法主要是通过 condition 的通知机制来完成可阻塞式的插入数据和获取数据。在理解 ArrayBlockingQueue 后再去理解 LinkedBlockingQueue 就很容易了。

3. LinkedBlockingQueue 实现原理

LinkedBlockingQueue 是用链表实现的有界阻塞队列,当构造对象时为指定队列大小时,队列默认大小为Integer.MAX_VALUE。从它的构造方法可以看出:

public LinkedBlockingQueue() {
   this(Integer.MAX_VALUE);
}

3.1 LinkedBlockingQueue 的主要属性

LinkedBlockingQueue 的主要属性有:

/** Current number of elements */
private final AtomicInteger count = new AtomicInteger();

/**
* Head of linked list.
* Invariant: head.item == null
*/
transient Node<E> head;//头

/**
* Tail of linked list.
* Invariant: last.next == null
*/
private transient Node<E> last; //尾

/** Lock held by take, poll, etc */
private final ReentrantLock takeLock = new ReentrantLock();  //消费者可重入锁

/** Wait queue for waiting takes */
private final Condition notEmpty = takeLock.newCondition();//消费者线程

/** Lock held by put, offer, etc */
private final ReentrantLock putLock = new ReentrantLock(); //生产者可重入锁

/** Wait queue for waiting puts */
private final Condition notFull = putLock.newCondition();//生产者线程

可以看出与 ArrayBlockingQueue 主要的区别是,LinkedBlockingQueue 在插入数据和删除数据时分别是由两个不同的 lock(takeLockputLock)来控制线程安全的,因此,也由这两个 lock 生成了两个对应的 condition(notEmptynotFull)来实现可阻塞的插入和删除数据。并且,采用了链表的数据结构来实现队列,这样设计的目的是因为插入数据

是往队尾进行的,删除数据是往队头进行的,从而可以降低获取不到 Lock 锁而进入 Waiting 状态,提高并发

Node 结点的定义为:

static class Node<E> {
    E item;

    /**
     * One of:
     * - the real successor Node
     * - this Node, meaning the successor is head.next
     * - null, meaning there is no successor (this is the last node)
     */
    Node<E> next;

    Node(E x) { item = x; }
}

接下来,我们也同样来看看 put 方法和 take 方法的实现。

3.2 put 方法详解

put 方法源码为:

public void put(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException();
    // Note: convention in all put/take/etc is to preset local var
    // holding count negative to indicate failure unless set.
    int c = -1;
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    final AtomicInteger count = this.count;
    putLock.lockInterruptibly();//与lock方法类似,优先考虑响应中断
    try {
        /*
         * Note that count is used in wait guard even though it is
         * not protected by lock. This works because count can
         * only decrease at this point (all other puts are shut
         * out by lock), and we (or some other waiting put) are
         * signalled if it ever changes from capacity. Similarly
         * for all other uses of count in other wait guards.
         */
		//如果队列已满,则阻塞当前线程,将其移入等待队列
        while (count.get() == capacity) {
            notFull.await(); //生产者线程阻塞
        }
		//入队操作,向队尾插入数据
        enqueue(node);
        c = count.getAndIncrement();//索引后移
		//若队列满足插入数据的条件,则通知被阻塞的生产者线程
        if (c + 1 < capacity)
            notFull.signal();
    } finally {
        putLock.unlock();
    }
    if (c == 0)
        signalNotEmpty();
}

put 方法的逻辑也同样很容易理解,可见注释。基本上和 ArrayBlockingQueue 的 put 方法一样。take 方法的源码如下:

public E take() throws InterruptedException {
    E x;
    int c = -1;
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lockInterruptibly();
    try {
		//当前队列为空,则阻塞当前线程,将其移入到等待队列中,直至满足条件
        while (count.get() == 0) {
            notEmpty.await();
        }
		//移除队头元素,获取数据
        x = dequeue();
        c = count.getAndDecrement();
        //如果当前满足移除元素的条件,则通知被阻塞的消费者线程
		if (c > 1)
            notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
    if (c == capacity)
        signalNotFull();
    return x;
}

take 方法的主要逻辑请见于注释,也很容易理解。

4. ArrayBlockingQueue 与 LinkedBlockingQueue 的比较

相同点:ArrayBlockingQueue 和 LinkedBlockingQueue 都是通过 condition 通知机制来实现可阻塞式插入和删除元素,并满足线程安全的特性;

不同点:1. ArrayBlockingQueue 底层是采用的数组进行实现,而 LinkedBlockingQueue 则是采用链表数据结构;

  1. ArrayBlockingQueue 插入和删除数据,只采用了一个 lock,而 LinkedBlockingQueue 则是在插入和删除分别采用了putLock和 `takeLock`,这样可以降低线程由于线程无法获取到 lock 而进入 WAITING 状态的可能性,从而提高了线程并发执行的效率

ArrayBlockingQueue 与 LinkedBlockingQueue 比较分析

ArrayBlockingQueue 与 LinkedBlockingQueue 比较分析

ArrayBlockingQueue:基于数组实现,使用一把锁,2 个信号量,一个数组,一个放数据 index,一个取数据 index,存放数据时,每次放完数据就会通知取阻塞的取线程,每次取完数据就会通知阻塞的放线程,请注意:取和放是互斥的。

LinkedBlockingQueue:基于单链表实现,使用 2 把锁,每一把锁一个信号量,一个 node 链表,一个指向放数据的 node 节点 last,一个指向取数据的 node 节点 head,存放数据时,每次放完数据,若容器未满,会继续通知放线程,放数据,直到容器放满,每次取数据时,取完数据,若容器还有数据,都会通知取线程继续取数据,直到容器内数据为空,请注意:取和取是互斥的,放和放是互斥的,取和放可以同时进行。

代码测试,可以在 debug 模式下,查看数据存取的过程

package test;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
// 对比阻塞队列的不同点
public class Container {

public static void main(String\[\] args) {  
    ThreadPoolExecutor tp = new ThreadPoolExecutor(5, 10, 50l,  
            TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(10));  
    Container container = new Container();  

    final ArrayBlockingQueue<Integer> a = new ArrayBlockingQueue<Integer>(  
            10);  
    final LinkedBlockingQueue<Integer> l = new LinkedBlockingQueue<Integer>(  
            10);  
    producer p = container.new producer(l);  
    consumer c = container.new consumer(l);  
    tp.submit(p);  
    tp.submit(c);

}

/*  
 \* static class source { static int\[\] a = new int\[10\]; static int take;  
 \* static int put; static int count; final static ReentrantLock lock = new  
 \* ReentrantLock(); final static Condition notFull = lock.newCondition();  
 \* final static Condition notEmpty = lock.newCondition(); static void  
 \* put(int i) { lock.lock(); try{ if (count == 10) { try { notEmpty.await();  
 *   
 \* } catch (InterruptedException e) { // TODO Auto-generated catch block  
 \* e.printStackTrace(); }  
 *   
 \* } a\[put\] = i; System.out.println(a); put++; count++; if (put >= 10) { put  
 \* = 0; } notFull.signal(); }finally{ lock.unlock(); }  
 *   
 \* }  
 *   
 \* static synchronized int get() { lock.lock(); try { int tar; if (count ==  
 \* 0) { try { notFull.await(); } catch (InterruptedException e) { // TODO  
 \* Auto-generated catch block e.printStackTrace(); } } tar = a\[take\];  
 \* take++; count--; if (take >= 10) { take = 0; } notEmpty.signal();  
 \* System.out.println(a.toString()); return tar; }finally{ lock.unlock(); }  
 \* }  
 *   
 \* public static void set(int i) { put(i);  
 *   
 \* }  
 *   
 \* }  
 */  
//生产者线程  
class producer implements Runnable {  
    BlockingQueue<Integer> a;

    public producer(BlockingQueue a) {  
        this.a = a;  
    }

    @Override  
    public void run() {  
        try {  
            doRun();  
        } catch (InterruptedException e) {  
            e.printStackTrace();  
        }  
    }

    private void doRun() throws InterruptedException {  
        int i = 0;  
        for (;;) {

            // Thread.currentThread().sleep(1000);  
            a.put(i++);

        }

    }

}  
//消费者线程  
class consumer implements Runnable {  
    BlockingQueue<Integer> a;

    public consumer(BlockingQueue a) {  
        this.a = a;  
    }

    @Override  
    public void run() {  
        try {  
            doRun();  
        } catch (InterruptedException e) {  
            // TODO Auto-generated catch block  
            e.printStackTrace();  
        }  
    }

    private void doRun() throws InterruptedException {  
        for (;;) {  
            try {  
                /*  
                 \* System.out.println(a.toString());  
                 \* System.out.println(a.take());  
                 */  
                a.take();  
            } catch (InterruptedException e) {  
                // TODO Auto-generated catch block  
                e.printStackTrace();  
            }

        }

    }

}

}

我们今天的关于Java高并发BlockingQueue重要的实现类详解java高并发解决方案的分享就到这里,谢谢您的阅读,如果想了解更多关于(java多线程与并发)java并发库中的阻塞队列--BlockingQueue、20. 并发容器之 ArrayBlockingQueue 和 LinkedBlockingQueue、20. 并发容器之 ArrayBlockingQueue 和 LinkedBlockingQueue 实现原理详解、ArrayBlockingQueue 与 LinkedBlockingQueue 比较分析的相关信息,可以在本站进行搜索。

本文标签: