GVKun编程网logo

ReentrantReadWriteLock:ReadLock和WriteLock有什么区别?(readwritelockslim)

22

在本文中,您将会了解到关于ReentrantReadWriteLock:ReadLock和WriteLock有什么区别?的新资讯,同时我们还将为您解释readwritelockslim的相关在本文中,

在本文中,您将会了解到关于ReentrantReadWriteLock:ReadLock和WriteLock有什么区别?的新资讯,同时我们还将为您解释readwritelockslim的相关在本文中,我们将带你探索ReentrantReadWriteLock:ReadLock和WriteLock有什么区别?的奥秘,分析readwritelockslim的特点,并给出一些关于7. ReadWriteLock 接口及其实现 ReentrantReadWriteLock、J.U.C|读-写锁ReentrantReadWriteLock、Java Lock接口分析之ReentantReadWriteLock、Java 多线程:Lock 接口(接口方法分析,ReentrantLock,ReadWriteLock的实用技巧。

本文目录一览:

ReentrantReadWriteLock:ReadLock和WriteLock有什么区别?(readwritelockslim)

ReentrantReadWriteLock:ReadLock和WriteLock有什么区别?(readwritelockslim)

我所知道的是:

  • ReadLockWriteLock相互影响莫名其妙
  • WriteLock就像 同步
  • ReadLock 似乎无法独自工作

答案1

小编典典

readLock.lock();

  • 这意味着,如果有任何其他线程在 (即持有写锁),则在此处停止直到没有其他线程在写。
  • 一旦授予 了该锁,在释放该锁之前,将不允许其他线程进行 (即获取写锁)。

writeLock.lock();

  • 这意味着,如果有任何其他线程正在 读取 写入,请在此处停止并等待,直到没有其他线程在读取或写入。
  • 授予锁后,在释放该锁之前,将不允许其他线程 读取 写入(即获取读取或写入锁)。

结合使用这些功能,您一次只能安排一个线程进行写访问,但是,除了一个线程正在写时,您可以同时阅读任意数量的阅读器。

换一种方式。你想每次 读取 从结构,采取了 锁。每次您 要写时 ,都要
一个锁。这样一来,只要发生写操作,就不会有人在读(您可以想象您具有独占访问权),但是只要没有人在写,就会有许多读者同时阅读。

7. ReadWriteLock 接口及其实现 ReentrantReadWriteLock

7. ReadWriteLock 接口及其实现 ReentrantReadWriteLock

Java 并发包的 locks 包里的锁基本上已经介绍的差不多了,ReentrantLock 重入锁是个关键,在清楚的了解了同步器 AQS 的运行机制后,实际上再分析这些锁就会显得容易的多,这章节主讲另外一个重要的锁 ---ReentrantReadWriteLock 读写锁。

ReentrantLock 是一个独占锁,也就是说只能由一个线程获取锁,但如果场景是线程只做读的操作呢?这样 ReentrantLock 就不是很合适,读的线程并不需要保证其线程的安全性,任何一个线程都能去获取锁,只有这样才能尽可能地保证性能和效率。ReentrantReadWriteLock 就是这样的一个锁,在其内部分为读锁和写锁,可以有 N 个读操作线程获取到读锁,但是只能有 1 个写操作线程获取到写锁,那么可以预见的是读锁是共享锁(AQS 中的共享模式),写锁是独占所(AQS 中的独占模式)。首先来看读写锁的接口类:

public interface ReadWriteLock {    
    Lock readLock();        //获取读锁
    Lock writeLock();        //获取写锁
 }

可以看到 ReadWriteLock 接口只定义了两个方法,获取读锁和获取写锁的方法。下面是 ReadWriteLock 的实现类 ---ReentrantReadWriteLock。

和 ReentrantLock 类似,ReentrantReadWriteLock 在其内部也是通过一个内部类 Sync 实现同步器 AQS,同样也是通过实现 Sync 实现公平锁和非公平锁,这一点的思路和 ReentrantLock 类似。在 ReadWriteLock 接口中获取的读锁和写锁是怎么实现的呢?

//ReentrantReadWriteLock
private final ReentrantReadWriteLock.ReadLock readerLock;
private final ReentrantReadWriteLock.WriteLock writerLock;
final Sync sync;
public ReentrantReadWriteLock(){
    this(false);    //默认非公平锁
}
public ReentrantReadWriteLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();    //锁类型(公平/非公平)
    readerLock = new ReadLock(this);    //构造读锁
    writerLock = new WriteLock(this);    //构造写锁
}
……
public ReentrantReadWriteLock.WriteLock writeLock0{return writerLock;}
public ReentrantReadWriteLock.ReadLock readLock0{return ReaderLock;}
//ReentrantReadWriteLock$ReadLock
public static class ReadLock implements Lock {
    protected ReadLock(ReentrantReadwritLock lock) {
        sync = lock.sync;        //最后还是通过Sync内部类实现锁
  }
    ……    //它实现的是Lock接口,其余的实现可以和ReentrantLock作对比,获取锁、释放锁等等
}
//ReentrantReadWriteLock$WriteLock
public static class WriteLock implemnts Lock {
    protected WriteLock(ReentrantReadWriteLock lock) {
        sync = lock.sync;
  }
……    //它实现的是Lock接口,其余的实现可以和ReentrantLock作对比,获取锁、释放锁等等
}

上面是对 ReentrantReadWriteLock 做了一个大致的介绍,可以看到在其内部有好几个内部类,实际上读写锁有两个锁 ---ReadLock、WriteLock,这两个锁都是实现自 Lock 接口,可以和 ReentrantLock 对比,而这两个锁的内部实现则通过 Sync,也就是同步器 AQS 实现的,这也可以和 ReentrantLock 中的 Sync 对比。

回顾一下 AQS,其内部有两个重要的数据结构 --- 一个是同步队列、一个则是同步状态,这个同步状态应用到读写锁中也就是读写状态,但 AQS 中只有一个 state 整形来标识同步状态,读写锁中则有读、写两个同步状态需要记录。所以,读写锁将 AQS 中的 state 整型做了一个处理,它是一个 int 型变量,一共 4 个字节 32 位,那么可以读写状态就各占 16 位 ---- 高 16 位标识度,低 16 位标识写。 image

先在有一个疑问如果 state 的值为 5,二进制为(00000000000000000000000000000101),如何快速确定读和写各自的状态呢?这就要用到位移运算了。计算方式为:写状态 = state & 0x0000FFFF,读状态 = stae >>> 16。写状态增加 1:state + 1,读状态增加 1=state + (1 << 16)。有关位移运算可以参考《<<、>>、>>> 位移操作》

1. 写锁的获取与释放

根据我们之前的经验可以得知:AQS 已经将获取锁的算法骨架搭好了,只需子类实现 tryAcquire(独占锁),故我们只需要看 tryAcquire。

//ReentrantReadWriteLock$Sync
protected final boolean tryAcquire(int acquires) {
    Thread current = Thread.currentThread;
    int c = getState();    //获取state状态
    int w = exclusiveCount(c);    //获取写状态,即 state & 0x00001111
    if (c != 0) {    //存在同步状态(读或写),作下一步判断
        if (w == 0 || current != getExclusiveOwnerThread())     //写状态为0,但同步状态不为0表示有读状态,此时获取锁失败,或者当前已经有其他写线程获取了锁此时也获取锁失败
            return false;
        if (w + exclusiveCount(acquire) > MAX_COUNT)    //锁重入是否超过限制
            throw new Error(“Maxium lock count exceeded”);
        setState(c + acquire);    //记录锁状态
        return true;
  }
  if (writerShouldBlock() || !compareAndSetState(c, c + acquires))
      return false;        //writerShouldBlock对于非公平锁总是返回false,对于公平锁则判断同步队列中是否有前驱节点
  setExclusiveOwnerThread(current);
  return true;
}

上面是写锁的状态获取,不好理解的是 writeShouleBlock 方法,此方法上面有描述,非公平锁直接返回 false,而对于公平锁则是调用 hasQueuedPredecessors,方法如下:

//ReentrantReadWriteLock$FairSync
final boolean writerShouldBlock() {
    return hasQueuedPredecessors();
}

原因是为什么呢?这就要回到非公平锁和公平锁的区别上来了。对于非公平锁,每次线程获取锁时,首先会强行进行锁获取操作而不管同步队列中是否有线程,当获取不到锁才会将线程构造至队尾;对于公平锁来讲,只要同步队列中存在线程,就不会去获取锁,而是将线程构造至队尾。所以重新回到写状态的获取上,tryAcquire 方法里,前面发现没有线程持有锁,但是此时会根据锁的不同做响应操作,对于非公平锁 --- 抢锁,对公平锁 --- 同步队列中有线程,不抢锁,添加至队尾排队。

写锁的释放和 ReentrantLock 的释放过程基本类似,毕竟都是独占锁,每次释放减少写的状态,直道减小到 0 就表示写锁已经完全释放。

读锁的获取与释放

同理,根据我们之前的经验可以得知:AQS 已经将获取锁的算法骨架搭好了,只需子类实现 tryAcquireShared(共享锁),故我们只需要查看 tryAcquireShared。我们知道对于共享模式下的锁,他能狗被多个线程同时获取,现在问题来了,T1 线程获取了锁,同步状态 state=1,此时 T2 也获取了锁,state=2,接着 T1 线程重入 state=3,也就是说读状态是所有线程读锁次数的总和,而每个线程各自获取读锁的次数只能选择保存在 ThreadLocal 中,由线程自身维护,所以这个地方要做一些复杂处理,源码有点长,但复杂就在于每个线程保存自身获取读锁的次数,具体参照源码的 tryAcquireShared,仔细阅读并结合上面对写锁获取锁的分析不难读懂。

读锁的释放值得注意的地方在于自身维护的获取锁的次数,以及通过移位操作减少状态 state-(1 << 16)。

J.U.C|读-写锁ReentrantReadWriteLock

J.U.C|读-写锁ReentrantReadWriteLock

一、写在前面


在上篇我们聊到了可重入锁(排它锁)ReentrantLcok ,具体参见《J.U.C|可重入锁ReentrantLock》

ReentrantLcok 属于排它锁,本章我们再来一起聊聊另一个我们工作中出镜率很高的读-写锁。

二、简介


重入锁ReentrantLock是排他锁(互斥锁),排他锁在同一时刻仅有一个线程可访问,但是在大多数场景下,大部分时间都是提供读服务的,而写服务占用极少的时间,然而读服务不存在数据竞争的问题,如果一个线程在读时禁止其他线程读势必会降低性能。所以就有了读写锁。

读写锁内部维护着一对锁,一个读锁和一个写锁。通过分离读锁和写锁,使得并发性比一般排他锁有着显著的提升。

读写锁在同一时间可以允许多个读线程同时访问,但是写线程访问时,所有的读线程和写线程都会阻塞。

主要有以下特征:

  • 公平性选择:支持非公平(默认)和公平的锁获取方式,吞吐量还是非公平优于公平。
  • 重进入:该锁支持重进入,以读写线程为列,读线程在获取到读锁之后,能再次获取读锁。而写线程在获取写锁后能够再次获取写锁,同时也可以获取读锁。
  • 锁降级:遵循获取写锁、读锁再释放写锁的次序,写锁能够降级成为读锁。
读写锁最多支持65535个递归写入锁和65535个递归读取锁。 锁降级:遵循获取写锁、获取读锁在释放写锁的次序,写锁能够降级成为读锁 读写锁ReentrantReadWriteLock实现接口ReadWriteLock,该接口维护了一对相关的锁,一个用于只读操作,另一个用于写入操作。只要没有 writer,读取锁可以由多个 reader 线程同时保持。

图片描述

三、主要方法介绍


读写锁ReentrantReadWriteLock 实现了ReadWriteLock 接口,该接口维护一对相关的锁即读锁和写锁。

public interface ReadWriteLock {
    /**
     * Returns the lock used for reading.
     *
     * @return the lock used for reading
     */
    Lock readLock();

    /**
     * Returns the lock used for writing.
     *
     * @return the lock used for writing
     */
    Lock writeLock();
}

ReadWriteLock定义了两个方法。readLock()返回用于读操作的锁,writeLock()返回用于写操作的锁。ReentrantReadWriteLock定义如下:

/** 内部类 读锁*/
private final ReentrantReadWriteLock.ReadLock readerLock;
 /** 内部类 写锁*/
private final ReentrantReadWriteLock.WriteLock writerLock;
/** 执行所有同步机制 */
final Sync sync;

// 默认实现非公平锁
public ReentrantReadWriteLock() {
    this(false);
}
// 利用给定的公平策略初始化ReentrantReadWriteLock
public ReentrantReadWriteLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
        readerLock = new ReadLock(this);
        writerLock = new WriteLock(this);
 }
 
 // 返回写锁
 public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; }
 //返回读锁
 public ReentrantReadWriteLock.ReadLock  readLock()  { return readerLock; }
 
 // 实现同步器,也是实现锁的核心
 abstract static class Sync extends AbstractQueuedSynchronizer {
    // 省略实现代码
 }
 
 // 公平锁的实现
 static final class FairSync extends Sync {
    // 省略实现代码
 }
 // 非公平锁的实现
 static final class NonfairSync extends Sync {
    // 省略实现代码
 }
 
 // 读锁实现
 public static class ReadLock implements Lock, java.io.Serializable {
    // 省略实现代码
 }
 // 写锁实现
 public static class WriteLock implements Lock, java.io.Serializable {
    // 省略实现代码
 }

ReentrantReadWriteLock 和 ReentrantLock 其实都一样,锁核心都是Sync, 读锁和写锁都是基于Sync来实现的。从这分析其实ReentrantReadWriteLock就是一个锁,只不过内部根据不同的场景设计了两个不同的实现方式。其读写锁为两个内部类: ReadLock、WriteLock 都实现了Lock 接口。

读写锁同样依赖自定义同步器来实现同步状态的, 而读写状态就是其自定义同步器的状态。回想ReentantLock 中自定义同步器的实现,同步状态表示锁被一个线程重复获取的次数,而读写锁中的自定义同步器需要在一个同步状态(一个整型变量)上维护多个读线程和写线程的状况,而该状态的设计成为关键。

如何在一个整型上维护多种状态,那就需要‘按位切割使用’这个变量,读写锁将变量切割成两部分,高16位表示读,低16位表示写。

分割之后,读写锁是如何迅速确定读锁和写锁的状态呢?通过位运算,假如当前同步状态为S,那么写状态等于 S & 0x0000FFFF(将高16位全部抹去),读状态等于S >>> 16(无符号补0右移16位)。代码如下:

static final int SHARED_SHIFT   = 16;
static final int SHARED_UNIT    = (1 << SHARED_SHIFT);
static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;

/** Returns the number of shared holds represented in count  */
static int sharedCount(int c)    { return c >>> SHARED_SHIFT; }
/** Returns the number of exclusive holds represented in count  */
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }

四、写锁的获取与释放


写锁是一个支持重入的排他锁,如果当前线程已经获取了写锁,则增加写状态。如果当前线程获取写锁时读锁已经被获取或者该线程不是已经获取写锁的线程,则当前线程进入等待状态。

  • 写锁的获取

写锁的获取入口通过WriteLock的lock方法

public void lock() {
   sync.acquire(1);
}

Sync的acquire(1)方法 定义在AQS中

public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
}

tryAcquire(arg) 方法除了重入方法外,还增加了是否存在读锁的判断,如果读锁存在、则不能获取写锁。原因在于写操作要对所有的读操作的可见性。

protected final boolean tryAcquire(int acquires) {
            Thread current = Thread.currentThread();
            // 获取同步状态
            int c = getState();
            // 获取写锁的获取次数
            int w = exclusiveCount(c);
             // 已有线程获取锁
            if (c != 0) {
                // (Note: if c != 0 and w == 0 then shared count != 0)
                /**
                 * w == 0 表示存在读锁(同步状态不等于0说明已有线程获取到锁(读/写)
                 * 而写锁状态为0则说明不存在写锁,所以只能是读锁了)
                 * current != getExclusiveOwnerThread()) 不是自己获取的写锁
                 * 
                 * 如果存在读锁或者持有写锁的线程不是自己,直接返回false
                 */
                if (w == 0 || current != getExclusiveOwnerThread())
                    return false;
                    
                 // 如果获取写锁的数量超过最大值65535 ,直接异常
                if (w + exclusiveCount(acquires) > MAX_COUNT)
                    throw new Error("Maximum lock count exceeded");
                // Reentrant acquire
                // 设置共享状态
                setState(c + acquires);
                return true;
            }
             
             /**
             * writerShouldBlock() 是否需要阻塞写锁,这里直接返回的是false
             * compareAndSetState(c, c + acquires) 设置写锁的状态
             */
            
            if (writerShouldBlock() ||
                !compareAndSetState(c, c + acquires))
                return false;
            setExclusiveOwnerThread(current);
            return true;   
    }
小结

写锁的获取基本和RenntrantLock 类似
判断当前是否有线程持有写锁(写锁的状态是否为0)
写锁的状态不为0,如果存在读锁或者写锁不是自己持有则直接返回fasle。
判断申请写锁数量是否超标(> 65535),超标则直接异常,反之则设置共享状态。
写锁状态为0,如果写锁需要阻塞或者CAS设置共享状态失败,则直接返回false,否则获取锁成功,设置持锁线程为自己。

来张图加深下理解

图片描述

  • 写锁的释放

写锁的释放和ReentrantLock 极为相似, 每次释放就是状态减1 ,当状态为0表示释放成功。

写锁释放的入口WriteLock中的unlock方法

public void unlock() {
      sync.release(1)
 }

Sync 中release方法由AQS中实现的

public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

tryRelease(arg) 方法释放共享状态,非常简单就是共享状态减1,为0表示释放成功

protected final boolean tryRelease(int releases) {
            // 判断锁持有者是否是自己
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
             // 共享状态值 - release
            int nextc = getState() - releases;
            // 判断写锁数量是否为0
            boolean free = exclusiveCount(nextc) == 0;
            if (free)
                setExclusiveOwnerThread(null);
            setState(nextc);
            return free;
        }
小结

写锁的释放很简单

  • 首先判断锁持有者不是自己则直接异常
  • 是自己则将共享状态 -1
  • 判断写锁数量是否为0,如果为0将持有锁线程变量设为null
  • 设置共享状态

来张图加深下理解

图片描述

五、读锁的获取与释放


读锁为一个可重入的共享锁,它能够被多个线程同时持有,在没有其他写线程访问时,读锁总是获取成功,所需要的也就是(线程安全的)增加读状态。

  • 读锁的获取

读锁的获取可以通过ReadLock.lock()方法。

public void lock() {
    //读锁是一个可重入共享锁,委托给内部类Sync实现
     sync.acquireShared(1);
}

Sync的acquireShared(1)方法定义在AQS中

public final void acquireShared(int arg) {
        // AQS 中 尝试获取共享状态,如果共享状态大于等于0则说明获取锁成功,否则加入同步队列。
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    }

tryAcquireShared(int unused)方法中,如果其他线程获取了写锁,则读锁获取失败线程将进入等待状态,如果当前线程获取写锁或者写锁未被获取则利用CAS(线程安全的)增加同步状态,成功则获取锁。

protected final int tryAcquireShared(int unused) {
            
            Thread current = Thread.currentThread();
            // 获取共享状态
            int c = getState();
            // 判断是否有写锁 && 持有写锁的线程是否是自己,为true直接返回-1 
            if (exclusiveCount(c) != 0 &&
                getExclusiveOwnerThread() != current)
                return -1;
             //获取共享资源的数量
            int r = sharedCount(c);
           
           /**
              * readerShouldBlock():判断锁是否需要等待(公平锁原则)
              * r < MAX_COUNT:判断锁的数量是否超过最大值65535
              * compareAndSetState(c, c + SHARED_UNIT): 设置共享状态(读锁状态)
              */
              
            if (!readerShouldBlock() &&
                r < MAX_COUNT &&
                compareAndSetState(c, c + SHARED_UNIT)) {
                // r==0 :当前没有任何线程获取读锁
                if (r == 0) {
                    // 设置当前线程为第一个获取读锁的线程
                    firstReader = current;
                    // 计数设置为1
                    firstReaderHoldCount = 1;
                } else if (firstReader == current) {
                    // 表示重入锁,在计数其上+1
                    firstReaderHoldCount++;
                } else {
                    
                    /**
                     *  HoldCounter 主要是一个类来记录线程获取锁的数量
                     *  cachedHoldCounter 缓存的是最后一个获取锁线程的HoldCounter对象
                     */
                     
                    HoldCounter rh = cachedHoldCounter;
                    // 如果缓存不存在,或者线程不是自己
                    if (rh == null || rh.tid != getThreadId(current))
                        // 从当前线程本地变量ThreadLocalHoldCounter 中获取HoldCounter 并赋值给 cachedHoldCounter, rh
                        cachedHoldCounter = rh = readHolds.get();
                     // 如果缓存的HoldCounter 是当前的线程的,且计数为0 
                    else if (rh.count == 0)
                        // 将rh 存到ThreadLocalHoldCounter 中,将计数+1 
                        readHolds.set(rh);
                    rh.count++;
                }
                return 1;
            }
            /**
             * 进入fullTryAcquireShared(current) 条件
             * 1: readerShouldBlock()  = true  
             * 2: r < MAX_COUNT = false  读锁达到最大
             * 3: 设置共享状态失败
            return fullTryAcquireShared(current);
        }

NonfairSync 中的 readerShouldBlock() 方法判断当前申请读锁的线程是否需要阻塞

final boolean readerShouldBlock() {
            return apparentlyFirstQueuedIsExclusive();
        }

apparentlyFirstQueuedIsExclusive() 判断同步队列中老二节点是否是独占式(获取写锁请求)是返回ture 否则返回false

final boolean apparentlyFirstQueuedIsExclusive() {
        Node h, s;
        // 主要条件判断下一个节点是否是获取写锁线程在排队
        return (h = head) != null &&
            (s = h.next)  != null &&
            !s.isShared()         &&
            s.thread != null;
    }

自旋来获取读锁,个人感觉对tryAcquireShared(int unused) 方法获取读锁失败的一种补救,其实现逻辑基本相同。

final int fullTryAcquireShared(Thread current) {
            // 线程内部计数器
            HoldCounter rh = null;
            // 自旋
            for (;;) {
                // 获取共享状态
                int c = getState();
               
               /**
                 * exclusiveCount(c) !=0:存在独占锁(写锁)
                 * getExclusiveOwnerThread() != current 判断是否是自己持有写锁
                 * 再次是写锁是否是自己
                 */
                 
                if (exclusiveCount(c) != 0) {
                    if (getExclusiveOwnerThread() != current)
                        return -1;
                   
                } else if (readerShouldBlock()) {//判断读锁是否需要阻塞
                    // 如果需要阻塞,表示除了当前线程持有写锁外,还有其他线程在等待获取写锁,故,即使申请读锁的线程已经持有写锁(写锁内部再次申请读锁,俗称锁降级)还是会失败,因为有其他线程也在申请写锁,此时,只能结束本次申请读锁的请求,转而去排队,否则,将造成死锁
                    if (firstReader == current) {
                        // assert firstReaderHoldCount > 0;
                    } else {
                        // 到这里其实就写锁的一个让步, 清楚HoldCounter 缓存
                        if (rh == null) {
                            rh = cachedHoldCounter;
                            if (rh == null || rh.tid != getThreadId(current)) {
                                rh = readHolds.get();
                                if (rh.count == 0)
                                    readHolds.remove();
                            }
                        }
                        if (rh.count == 0)
                            return -1;
                    }
                }
                // 下面逻辑和tryAcquireShared(int unused) 基本相同不再解释了
                if (sharedCount(c) == MAX_COUNT)
                    throw new Error("Maximum lock count exceeded");
                if (compareAndSetState(c, c + SHARED_UNIT)) {
                    if (sharedCount(c) == 0) {
                        firstReader = current;
                        firstReaderHoldCount = 1;
                    } else if (firstReader == current) {
                        firstReaderHoldCount++;
                    } else {
                        if (rh == null)
                            rh = cachedHoldCounter;
                        if (rh == null || rh.tid != getThreadId(current))
                            rh = readHolds.get();
                        else if (rh.count == 0)
                            readHolds.set(rh);
                        rh.count++;
                        cachedHoldCounter = rh; // cache for release
                    }
                    return 1;
                }
            }
        }
小结

读锁的获取稍微有点复杂,整个过程如下

  • 如果其他线程获取了写锁、则获取读锁失败。
  • 如果当前线程获取到了写锁或者写锁未被获取则利用CAS(线程安全的)增加读锁状态
  • 否则 fullTryAcquireShared(Thread current) 自旋方式再次来尝试获取。

读锁获取流程图如下

图片描述

  • 读锁的释放

读锁的释放通过ReadLock的unlock()方式释放的。

public void unlock() {
            sync.releaseShared(1);
        }

Sync的releaseShared(1)同样定义在AQS中

public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }

调用tryReleaseShared(int unused) 方法来释放共享状态。

protected final boolean tryReleaseShared(int unused) {
            Thread current = Thread.currentThread();
            //判断当前线程释放是第一个获取读锁的线程
            if (firstReader == current) {
                // assert firstReaderHoldCount > 0;
                // 判断获取锁的次数释放为1,如果为1说明没有重入情况,直接释放firstReader = null;否则将该线程持有锁的数量 -1
                if (firstReaderHoldCount == 1)
                    firstReader = null;
                else
                    firstReaderHoldCount--;
            } else {
                // 如果当前线程不是第一个获取读锁的线程。
                
                // 获取缓存中的HoldCounter
                HoldCounter rh = cachedHoldCounter;
                // 如果缓存中的HoldCounter 不属于当前线程则获取当前线程的HoldCounter。
                if (rh == null || rh.tid != getThreadId(current))
                    rh = readHolds.get();
                int count = rh.count;
                if (count <= 1) {
                    // 如果线程持有锁的数量小于等1 直接删除HoldCounter
                    readHolds.remove();
                    if (count <= 0)
                        throw unmatchedUnlockException();
                }
                // 持有锁数量大于1 则执行 - 1操作
                --rh.count;
            }
            // 自旋释放同步状态
            for (;;) {
                int c = getState();
                int nextc = c - SHARED_UNIT;
                if (compareAndSetState(c, nextc))
                    // Releasing the read lock has no effect on readers,
                    // but it may allow waiting writers to proceed if
                    // both read and write locks are now free.
                    return nextc == 0;
            }
        }
小结

锁的释放比较简单,

首先看当前线程是否是第一个获取读锁的线程,如果是并且没有发生重入,则将首次获取读锁变量设为null, 如果发生重入,则将首次获取读锁计数器 -1

其次 查看缓存中计数器是否为空或者是否是当前线程,如果为空或者不是则获取当前线程的计数器,如果计数器个数小于等1, 从ThreadLocl 中删除计数器,并计数器值-1,如果小于等于0异常 。

最后自旋修改同步状态。

读锁释放流程图如下

图片描述

六、总结


通过上面的源码分析,我们来总结下:

在线程持有读锁的情况下,该线程不能取得写锁(为了保证写操作对后续所有的读操作保持可见性)。

在线程持有写锁的情况下,该线程可以继续获取读锁(获取读锁时如果发现写锁被占用,只有写锁没有被当前线程占用的情况才会获取失败)。

仔细想想,这个设计是合理的:因为当线程获取读锁的时候,可能有其他线程同时也在持有读锁,因此不能把获取读锁的线程“升级”为写锁;而对于获得写锁的线程,它一定独占了读写锁,因此可以继续让它获取读锁,当它同时获取了写锁和读锁后,还可以先释放写锁继续持有读锁,这样一个写锁就“降级”为了读

一个线程要想同时持有写锁和读锁,必须先获取写锁再获取读锁;写锁可以“降级”为读锁;读锁不能“升级”为写锁。

因技术水平有限,如有不对的地方,欢迎拍砖

Java Lock接口分析之ReentantReadWriteLock

Java Lock接口分析之ReentantReadWriteLock

ReentantReadWriteLock读写锁,在读线程多余写线程的并发环境中能体现出优异的性能,相比于synchronized与ReentrantLock这种独占式锁的模型,ReentantReadWriteLock采用独占式写锁与共享式读锁的方式,大大提高了针对于读多写少的多线程环境的系统性能。

在分析ReentantReadWriteLock源码前,我们需要先熟悉下独占锁与共享锁的基本模型。

独占锁与共享锁的基本模型

共享式取与独占式取最主要的区在于同一刻能否有多个线程同时获取到同步状。以文件的例,如果一个程序在文件操作,那么文件的写操作均被阻塞,而操作能时进行。写操作要求对资源的独占式访问,而操作可以是共享式访问,两种不同的访问模式在同一文件或源的访问情况

中,左半部分,共享式访问资,其他共享式的访问均被允,而独占式访问被阻塞,右半部分是独占式访问资,同一刻其他访问均被阻塞。

ReentrantReadWriteLock的结构

下面我们来消息分析下ReentantReadWriteLock的实现过程:

ReentrantReadWriteLock并没有继承ReentrantLock,也并没有实现Lock接口,而是实现了ReadWriteLock接口,该接口提供readLock()方法获取读锁,writeLock()获取写锁。

public class ReentrantReadWriteLock
        implements ReadWriteLock, java.io.Serializable {

    private final ReentrantReadWriteLock.ReadLock readerLock;
    private final ReentrantReadWriteLock.WriteLock writerLock;

    public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; }
    public ReentrantReadWriteLock.ReadLock  readLock()  { return readerLock; }
}

public interface ReadWriteLock {
    
    Lock readLock();

    Lock writeLock();
}

默认构造方法为非公平模式 ,开发者也可以通过指定fair为true设置为 公平模式。

    public ReentrantReadWriteLock() {
        this(false);
    }

    public ReentrantReadWriteLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
        readerLock = new ReadLock(this);
        writerLock = new WriteLock(this);
    }

    public static class ReadLock implements Lock, java.io.Serializable {}
    public static class WriteLock implements Lock, java.io.Serializable {}

而公平模式和非公平模式分别由内部类FairSync和NonfairSync实现,这两个类继承自另一个内部类Sync,该Sync继承自AbstractQueuedSynchronizer(以后简称 AQS ),这里基本同ReentrantLock的内部实现一致。

abstract static class Sync extends AbstractQueuedSynchronizer {
}

static final class FairSync extends Sync {
}

static final class NonfairSync extends Sync {
}

而ReentrantReadWriteLock针对FairSync于NonfairSync也有着自己的内部类实现,与ReentrantLock一样。

ReentrantReadWriteLock的锁标记实现state

在ReentrantLock的分析中得知,其独占性和重入性都是通过CAS操作维护AQS内部的state变量实现的。而对于ReentantReadWriteLock因为要维护两个锁(读/写),但是同步状态state只有一个,所以ReentantReadWriteLock采用“按位切割”的方式,所谓“按位切割”就是将这个32位的int型state变量分为高16位和低16位来使用,高16位代表读状态,低16位代表写状态。

高16位 低16位
读状态 写状态
0000 0000 0000 0011 0000 0000 0000 0000

上面的一个32位的int表示有3个线程获取了读锁,0个线程获取了写锁

读/写锁如何确定和改变状态的呢?答案是位运算 !
假设当前同步状态为state

//读状态:无符号右移16位
state >>> 16
//写状态:高16位都和0按位与运算,抹去高16位
state & Ox0000FFFF
//读状态加1
state + (1 << 16)
//写状态加1
state + 1
//判断写状态大于0,也就是写锁是已经获取
state & Ox0000FFFF > 0
//判断读状态大于0,也就是读锁是已经获取
state != 0 && (state & Ox0000FFFF == 0)

在ReentrantReadWriteLock的源码中我们能看到关于锁标记位state的变量

//定义一个偏移量
static final int SHARED_SHIFT   = 16;
//1个读锁读锁单位
static final int SHARED_UNIT    = (1 << SHARED_SHIFT);  //Ox00010000
static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1; //Ox0000FFFF读锁上限
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1; //Ox0000FFFF用于抹去高16位

//返回读状态
static int sharedCount(int c)    { return c >>> SHARED_SHIFT; }
//返回写状态
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }

首先,我们先来分析写锁的获取与释放

写锁的获取

首先,写锁WriteLock的lock方法如下

public void lock() {
    sync.acquire(1);
}

 acquire方法实现如下,通过远吗我们可以看出,acquire方法首先会调用tryAcquire方法尝试获取写锁,如果获取成功则返回,失败则会调用addWaiter方法将线程添加到CLH队列末尾,并调用acquireQueued方法阻塞当前线程,我们可以看到addWaiter依旧是一独占的模式添加的节点,此处与ReentrantLock实现一样具体addWaiter方法与acquireQueued方法独占式的实现可详见我的一篇博客Synchronized与Lock的底层实现解析

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

这里主要讲解tryAcquire方法,代码如下:

protected final boolean tryAcquire(int acquires) {
    //获取当前线程
    Thread current = Thread.currentThread();
    //获取当前同步状态
    int c = getState();
    //获取写锁状态(w>0表示已经有线程获取写锁)
    int w = exclusiveCount(c);
    //如果同步状态不为0,说明有线程已经获取到了同步状态,可能是读锁,可能是写锁
    if (c != 0) {
        //写锁状态0(表示有线程已经获取读锁(共享锁获取时阻塞独占锁))或者当前线程不是已经获取写锁的线程(独占锁只允许自己持有锁)
        //返回false
	    //此处的处理逻辑也间接验证了获取了读锁的线程不能同时获取写锁
        if (w == 0 || current != getExclusiveOwnerThread())
            return false;
        //大于最大线程数则抛出错误
        if (w + exclusiveCount(acquires) > MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
        //如果写锁状态>0并且当前线程为尺有所线程,则表示写锁重入,以重入锁的方式设置同步状态(写状态直接加)
        //返回true
        setState(c + acquires);
        return true;
    }
    //如果同步状态等于0
    //在尝试获取同步状态之前先调用writerShouldBlock()写等待策略
    //ReentrantReadWriteLock中通过FairSync(公平锁)和NonfairSync(非公品锁)重写writerShouldBlock()方法来达到公平与非公平的实现
    //NonfairSync(非公品锁)中直接返回false表示不进行阻塞直接获取
    //FairSync(公平锁)中需调用hasQueuedPredecessors()方法判断当前线程节点是否为等待队列的head结点的后置节点,是才可以获取锁
    //获取成功则将当前线程设置为持有锁线程,并返回true
    if (writerShouldBlock() ||
        !compareAndSetState(c, c + acquires))
        return false;
    setExclusiveOwnerThread(current);
    return true;
}

其实写锁的实现与ReentrantLock差别不大,主要区别在于state的判断,在获取写锁的时候需要判断读锁的状态,并且

if (w == 0 || current != getExclusiveOwnerThread())
            return false;

这段代码的逻辑也验证了读锁不能升级写锁的原因,当写锁个数不为0,并且持有写锁的线程不是当前线程,直接返回false,阻塞了其他线程抢占写锁(当然包括持有读锁的线程)

写锁的释放

public final boolean release(int arg) {
	if (tryRelease(arg)) {
	    Node h = head;
	    if (h != null && h.waitStatus != 0)
		unparkSuccessor(h);
	    return true;
	}
	return false;
}

 这里写锁的释放逻辑与ReentrantLock一样,先tryRelease释放锁,直到写锁state等于0(所有重入锁都释放),唤醒后续节点线程

protected final boolean tryRelease(int releases) {
    //当前线程不是获取了同步状态的线程则抛出异常
    if (!isHeldExclusively())
	    throw new IllegalMonitorStateException();
    //释放一次锁,则将状态-1
    int nextc = getState() - releases;
    //判断写状态是否为0,当写锁的所有重入都释放完锁后,状态归为0
    boolean free = exclusiveCount(nextc) == 0;
    //如果释放完成,将持有锁的线程设置为null
    if (free)
	    setExclusiveOwnerThread(null);
    setState(nextc);
    return free;
}

写锁的读取和释放还是很好理解的,尤其针对比较熟悉ReentrantLock逻辑的人更好理解,但是读锁的实现相对来说就比较复杂了

读锁的获取

读锁ReadLock的lock方法

public void lock() {
    sync.acquireShared(1);
}

 acquireShared方法实现如下,可以看到读锁的lock方法是基于AQS共享锁实现的

public final void acquireShared(int arg) {
	if (tryAcquireShared(arg) < 0)
	    doAcquireShared(arg);
	}
}

acquireShared方法的逻辑为首先调用tryAcquireShared尝试获取写锁,如果获取失败(返回结果<0)则调用doAcquireShared阻塞该线程,tryAcquireShared方法与doAcquireShared方法我们逐个分析

首先我们需要先了解ReentrantReadWriteLock中的一些变量,方便我们后续理解代码

读锁的获取相对比较复杂,因为读锁还有类似于获取当前线程获得的锁的个数等方法。

//用于记录每个线程获取到的锁的数量
//使用id和count记录
static final class HoldCounter {
    int count = 0;
    final long tid = getThreadId(Thread.currentThread());
}
//这里使用了ThreadLocal为每个线程都单独维护了一个HoldCounter来记录获取的锁的数量
static final class ThreadLocalHoldCounter
    extends ThreadLocal<HoldCounter> {
    public HoldCounter initialValue() {
        return new HoldCounter();
    }
}

//获取到的读锁的数量
private transient ThreadLocalHoldCounter readHolds;
//最后一次成功获取到读锁的线程的HoldCounter对象
private transient HoldCounter cachedHoldCounter;
//第一个获取到读锁的线程
private transient Thread firstReader = null;
//第一个获取到读锁的线程拥有的读锁数量
private transient int firstReaderHoldCount;

上面的4个变量,其实就是完成一件事情,将获取读锁的线程放入线程本地变量(ThreadLocal),方便从整个上 下文,根据当前线程获取持有锁的次数信息。其实 firstReader,firstReaderHoldCount ,cachedHoldCounter 这三个变量就是为readHolds变量服务的,是一个优化手段,尽量减少直接使用readHolds.get方法的次数,firstReader与firstReadHoldCount保存第一个获取读锁的线程,也就是readHolds中并不会保存第一个获取读锁的线程;cachedHoldCounter 缓存的是最后一个获取线程的HolderCount信息,该变量主要是在如果当前线程多次获取读锁时,减少从readHolds中获取HoldCounter的次数。

tryAcquireShared方法

protected final int tryAcquireShared(int unused) {
    //获取当前线程
    Thread current = Thread.currentThread();
    //获取同步状态
    int c = getState();
    //如果已经有写锁被获取并且获取写锁的线程不是当前线程则获取失败
    //此处判断逻辑隐含了一个条件,就是当有写锁获取并且是获取写锁的是当前线程,那么不返回-1,允许此写锁获取读锁
    if (exclusiveCount(c) != 0 &&
        getExclusiveOwnerThread() != current)
        return -1;
    //获取读状态(读锁被获取的数量)
    int r = sharedCount(c);
    //根据是否是公平锁来判断是否需要进入阻塞,同时判断当前读锁数量是否小于读锁允许最大数量(0xFFFF个),并进行一次CAS获取
    //一个线程获取也好,多个线程争抢也好,if中的一次CAS获取能够保证只有一个线程获取到读锁
    if (!readerShouldBlock() &&
        r < MAX_COUNT &&
        compareAndSetState(c, c + SHARED_UNIT)) {
        //如果是第一个获取读状态的线程
        if (r == 0) {
            //设置firstReader和firstReaderHoldCount
            firstReader = current;
            firstReaderHoldCount = 1;
        //如果当前线程和第一个获取读锁的线程是同一个线程那么它的获取的读锁数量加1(可见读锁也是一个重入锁)
        } else if (firstReader == current) {
            firstReaderHoldCount++;
        //是别的线程
        } else {
            //获取最后一次获取到读状态的线程
            HoldCounter rh = cachedHoldCounter;
            //rh == null(当前线程是第二个获取的),或者当前线程和rh不是同一个
	    //那么获取到当前线程的HoldCounter并设置到cachedHoldCounter
            if (rh == null || rh.tid != getThreadId(current))
                cachedHoldCounter = rh = readHolds.get();
            //如果rh就是当前线程的HoldCounter并且当前线程获取到的读状态位0那么给当前线程的HoldCounter设置为rh
            else if (rh.count == 0)
                readHolds.set(rh);
            //获取到的读锁数加1
            rh.count++;
        }
        return 1;
    }
    //获取读锁失败调用该方法进行CAS循环获取
    return fullTryAcquireShared(current);
}

fullTryAcquireShared方法的解读

//第一次获取读锁失败,有两种情况:
//1)没有写锁被占用时,尝试通过一次CAS去获取锁时,更新失败(说明有其他读锁在申请)
//2)当前线程占有读锁,并且有其他写锁在当前线程的下一个节点等待获取写锁,最后在fullTryAcquireShared中获取到读锁
//3)当前线程占有写锁,并且有其他写锁在当前线程的下一个节点等待获取写锁,除非当前线程的下一个节点被取消,否则fullTryAcquireShared也获取不到读锁fullTryAcquireShared也获取不到读锁
final int fullTryAcquireShared(Thread current) {

    HoldCounter rh = null;
    //自旋
    for (;;) {
        int c = getState();
        //如果已经有写锁被获取
        if (exclusiveCount(c) != 0) {
            //如果获取写锁的线程不是当前线程则获取失败
            if (getExclusiveOwnerThread() != current)
                return -1;
            //如果获取写锁的线程是当前线程则继续保持这个写锁,并且这个写锁可以在获取读锁
        //如果此时应该进入阻塞
        } else if (readerShouldBlock()) {
            // Make sure we''re not acquiring read lock reentrantly
            if (firstReader == current) {
                // assert firstReaderHoldCount > 0;
            } else {
                //第一次循环
                if (rh == null) {
		    //获取最后一次获取到读状态的线程
                    rh = cachedHoldCounter;
		    //rh == null(当前线程是第二个获取的),或者当前线程和rh不是同一个
		    //那么获取到当前线程的HoldCounter并设置到cachedHoldCounter
                    if (rh == null || rh.tid != getThreadId(current)) {
                        rh = readHolds.get();
                        //如果当前线程的读锁为0就remove,因为后面会set
                        if (rh.count == 0)
                            readHolds.remove();
                    }
                }
                //不是第一次循环
                if (rh.count == 0)
                    return -1;
            }
        }
        if (sharedCount(c) == MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
        //尝试CAS设置同步状态
        //后续操作和tryAquireShared基本一致
        if (compareAndSetState(c, c + SHARED_UNIT)) {
            if (sharedCount(c) == 0) {
                firstReader = current;
                firstReaderHoldCount = 1;
            } else if (firstReader == current) {
                firstReaderHoldCount++;
            } else {
                if (rh == null)
                    rh = cachedHoldCounter;
                if (rh == null || rh.tid != getThreadId(current))
                    rh = readHolds.get();
                else if (rh.count == 0)
                    readHolds.set(rh);
                rh.count++;
                cachedHoldCounter = rh; // cache for release
            }
            return 1;
        }
    }
}

readerShouldBlock阻塞判断方法的解读

/**
*  非公平锁的读锁获取策略
*/ 
final boolean readerShouldBlock() {
    //如果当前线程的后续节点为独占式写线程,则返回true(表示当前线程在tryAcquireShared方法中不能立刻获取读锁,需要后续通过fullTryAcquireShared方法取判断是否需要阻塞线程)
    //在fullTryAcquireShared方法中会通过判断当前获取读锁线程的读锁数量来判断当前尝试获取读锁的线程是否持有写锁,如果持有写锁辨明所降级,需要将当前锁降级的线程添加到阻塞队列中重新获取读锁
    //这么做是为了让后续的写线程有抢占写锁的机会,不会因为一直有读线程或者锁降级情况的存在而造成后续写线程的饥饿等待
    return apparentlyFirstQueuedIsExclusive();
}

final boolean apparentlyFirstQueuedIsExclusive() {
    Node h, s;
    return (h = head) != null &&
        (s = h.next)  != null &&
        !s.isShared()         &&
        s.thread != null;
}

/**
*  公平锁的读锁获取策略
*/  
final boolean readerShouldBlock() {
    //如果当前线程不是同步队列头结点的next节点(head.next) (判断是否有前驱节点,如果有则返回false,否则返回true。遵循FIFO)
    //则阻塞当前线程 
    return hasQueuedPredecessors();
}

public final boolean hasQueuedPredecessors() {
    Node t = tail; // Read fields in reverse initialization order
    Node h = head;
    Node s;
    return h != t &&
        ((s = h.next) == null || s.thread != Thread.currentThread());
}

doAcquireShared方法

/**
* 跟独占锁很像,只不过共享锁初始化时有传入一个count,count为1
*/
private void doAcquireShared(int arg) {
	//把当前线程封装到一个SHARE类型Node中,添加到SyncQueue尾巴上
	final Node node = addWaiter(Node.SHARED);
	try {
	    boolean interrupted = false;
	    for (;;) {
		final Node p = node.predecessor();
		if (p == head) {//前继节点是head节点,下一个就到自己了
		    int r = tryAcquireShared(arg);//非公平锁实现,再尝试获取锁
		    //state==0时tryAcquireShared会返回>=0(CountDownLatch中返回的是1)。state为0说明共享次数已经到了,可以获取锁了
		    //注意上面说的, 等于0表示不用唤醒后继节点,大于0需要
		    if (r >= 0) {//r>0表示state==0,前继节点已经释放锁,锁的状态为可被获取
			setHeadAndPropagate(node, r);//这一步设置node为head节点设置node.waitStatus->Node.PROPAGATE,然后唤醒node.thread
			//唤醒head节点线程后,从这里开始继续往下走
			p.next = null; //head已经指向node节点,oldHead.next索引置空,方便p节点对象回收
			if (interrupted)
			    selfInterrupt();
			return;
		    }
		}
		//前继节点非head节点,将前继节点状态设置为SIGNAL,通过park挂起node节点的线程
		if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
		    interrupted = true;
	    }
	} catch (Throwable t) {
	    cancelAcquire(node);
	    throw t;
	}
}

setHeadAndPropagate方法

/**
 * 把node节点设置成head节点,且node.waitStatus->Node.PROPAGATE
 */
private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head;//h用来保存旧的head节点
    setHead(node);//head引用指向node节点
    /* 这里意思有两种情况是需要执行唤醒操作
     * 1.propagate > 0 表示调用方指明了后继节点需要被唤醒
     * 2.头节点后面的节点需要被唤醒(waitStatus<0),不论是老的头结点还是新的头结点*/
    if (propagate > 0 || h == null || h.waitStatus < 0 ||
        (h = head) == null || h.waitStatus < 0) {
        Node s = node.next;	
        if (s == null || s.isShared())//node是最后一个节点或者 node的后继节点是共享节点
	    /* 如果head节点状态为SIGNAL,唤醒head节点线程,重置head.waitStatus->0
	     * head节点状态为0(第一次添加时是0),设置head.waitStatus->Node.PROPAGATE表示状态需要向后继节点传播
	     */
	    doReleaseShared();//对于这个方法,其实就是把node节点设置成Node.PROPAGATE状态
    }
}

doReleaseShared方法

/** 
 * 把当前结点设置为SIGNAL或者PROPAGATE
 * 唤醒head.next(B节点),B节点唤醒后可以竞争锁,成功后head->B,然后又会唤醒B.next,一直重复直到共享节点都唤醒
 * head节点状态为SIGNAL,重置head.waitStatus->0,唤醒head节点线程,唤醒后线程去竞争共享锁
 * head节点状态为0,将head.waitStatus->Node.PROPAGATE传播状态,表示需要将状态向后继节点传播
 */
private void doReleaseShared() {
	for (;;) {
		Node h = head;
		if (h != null && h != tail) {
			int ws = h.waitStatus;
			if (ws == Node.SIGNAL) {//head是SIGNAL状态
			   /* head状态是SIGNAL,重置head节点waitStatus为0,这里不直接设为Node.PROPAGATE,
			    * 是因为unparkSuccessor(h)中,如果ws < 0会设置为0,所以ws先设置为0,再设置为PROPAGATE
			    * 这里需要控制并发,因为入口有setHeadAndPropagate跟release两个,避免两次unpark
			    */
			    if (!h.compareAndSetWaitStatus(Node.SIGNAL, 0))
				    continue;//设置失败,重新循环
				    /* head状态为SIGNAL,且成功设置为0之后,唤醒head.next节点线程
				     * 此时head、head.next的线程都唤醒了,head.next会去竞争锁,成功后head会指向获取锁的节点,
				     * 也就是head发生了变化。看最底下一行代码可知,head发生变化后会重新循环,继续唤醒head的下一个节点
				     */
				    unparkSuccessor(h);
			/*
			 * 如果本身头节点的waitStatus是出于重置状态(waitStatus==0)的,将其设置为“传播”状态。
			 * 意味着需要将状态向后一个节点传播
			 */
			} 
			else if (ws == 0 && !h.compareAndSetWaitStatus(0, Node.PROPAGATE))
			    continue;
		}
		if (h == head)//如果head变了,重新循环
		break;
	}
}

读锁的释放

releaseShared方法

public final boolean releaseShared(int arg) {
	//只有共享锁完全释放,才能调用下面的doReleaseShared方法唤醒head节点的后继节点
	//加入多个读锁同时在获取到了读锁,即使不按顺序释放锁,也不会影响head后继节点的唤醒
	//因为共享锁可以有多个线程组成,但是释放锁的条件只有一个,就是读锁标记为0,
	//即使最后释放锁的节点不是head,但是也能保证head后继节点正常被唤醒
	if (tryReleaseShared(arg)) {
	    //此处的doReleaseShared方法与setHeadAndPropagate方法中锁唤醒的节点有所差别
	    //setHeadAndPropagate方法只唤醒head后继的共享锁节点
	    //doReleaseShared方法则会唤醒head后继的独占锁或共享锁
	    doReleaseShared();
	    return true;
	}
	return false;
}

 releaseShared方法首先会调用tryReleaseShared方法尝试释放共享锁,成功的话会唤醒head节点的后继节点

tryReleaseShared方法

protected final boolean tryReleaseShared(int unused) {
    Thread current = Thread.currentThread();
    //如果当前线程是第一个获取读锁的线程
    if (firstReader == current) {
        // assert firstReaderHoldCount > 0;
        //如果第一个获取读锁的线程只获取了一个锁那么firstReader=null
        //否则firstReaderHoldCount--
        if (firstReaderHoldCount == 1)
            firstReader = null;
        else
            firstReaderHoldCount--;
    } else {
        //如果当前线程不是第一个获取读锁的线程
        HoldCounter rh = cachedHoldCounter;
        //获取当前线程的HoldCounter
        if (rh == null || rh.tid != getThreadId(current))
            rh = readHolds.get();
        int count = rh.count;
        if (count <= 1) {
            //当前线程获取的读锁小于等于1那么就将remove当前线程的HoldCounter
            readHolds.remove();
            //当前线程获取的读锁小于等于0抛出异常
            if (count <= 0)
                throw unmatchedUnlockException();
        }
        //当前线程拥有的读锁数量减1
        --rh.count;
    }
    //自旋
    for (;;) {
        int c = getState();
        //释放后的同步状态
        int nextc = c - SHARED_UNIT;
        //CAS设置同步状态,成功则返回是否同步状态为0
        if (compareAndSetState(c, nextc))
            return nextc == 0;
    }
}

tryReleaseShared方法的逻辑在于将读锁的标记位-1,并且同时将firstReader和非firstReader的获取锁的次数-1,直到读锁标记位为0时表示读锁释放完毕

锁降级

锁降级指的是先获取到写锁,然后获取到读锁,然后释放了写锁的过程。 
因为在获取读锁的时候的判断条件是:

if (exclusiveCount(c) != 0 &&
        getExclusiveOwnerThread() != current)
        return -1;

所以当前线程是可以在获取了写锁的情况下再去获取读锁的。 
那么在写锁释放了之后应该还能继续持有读锁。

最后:锁是不支持锁升级的(先获取写锁,再获取读锁然后释放读锁), 
因为第一步获取读锁的时候可能有多个线程获取了读锁,这样如果锁升级的话将会导致写操作对其他已经获取了读锁的线程不可见。
 

参考:

https://blog.csdn.net/LightOfMiracle/article/details/73184755

https://www.cnblogs.com/zaizhoumo/p/7782941.html

https://blog.csdn.net/u010577768/article/details/79995811

Java 多线程:Lock 接口(接口方法分析,ReentrantLock,ReadWriteLock

Java 多线程:Lock 接口(接口方法分析,ReentrantLock,ReadWriteLock

前言


当我们了解了多线程生成的原因之后,会有相应的解决办法,最典型的就是 synchronized 和 lock。lock 可以说是 synchronized 的一个替代品,synchronized 能做的事,lock 基本都可以做,而且能做得更好。他们的一些区别是:

  • lock 在获取锁的过程可以被中断。
  • lock 可以尝试获取锁,如果锁被其他线程持有,则返回 false,不会使当前线程休眠。
  • lock 在尝试获取锁的时候,传入一个时间参数,如果在这个时间范围内,没有获得锁,那么就是终止请求。
  • synchronized 会自动释放锁,lock 则不会自动释放锁。

这样可以看到,lock 比起 synchronized 具有更细粒度的控制。但是也不是说 lock 就完全可以取代 synchronized,因为 lock 的学习成本,复杂度等方面要比 synchronized 高,对于初级 java 程序员,使用 synchronized 的风险要比 lock 低。

目录


  • Lock 接口方法分析
  • RentrantLock
  • ReadWriteLock

Java Lock 接口源码分析


Lock 接口方法如下:

public interface Lock {
    void lock();
    void lockInterruptibly() throws InterruptedException;
    boolean tryLock();
    boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
    void unlock();
    Condition newCondition();
}

lock,unlock 方法

lock () 可以用于对一段代码进行加锁,这样别的代码在锁释放之前需要进行等待,需要注意,lock 不会像 synchronized 那样自动释放锁,所以:一定要放在 try-finally 块中,保证锁的释放。 例如:

try {
    lock.lock();
    ......
} finally {
    lock.unlock();  
}

tryLock 方法

  • tryLock ():尝试获得锁,如果成功,返回 true,否则,返回 false。
  • tryLock (long time,TimeUnit unit):在一定的时间内尝试获得锁,并且在这段时间直接可以被打断。如果成功获得,那么将返回 true,否则,返回 false。

lockInterruptibly 方法

这里首先需要了解两个概念才能更好的理解这个方法:

  • 线程的打扰机制
  • Thread 类的 interrupt,interrupted,isInterrupted 方法的区别

对于线程的打扰机制,每个线程都有一个打扰标志。

  • 如果线程在 sleep 或 wait,join,此时如果别的进程调用此进程的 interrupt()方法,此线程会被唤醒并被要求处理 InterruptedException;
  • 如果线程在运行,则不会收到提醒。但是此线程的 “打扰标志” 会被设置。

所以说,对于 interrupt () 方法:不会中断一个正在运行的线程。

对于 interrupt,interrupted,isInterrupted 方法的区别:

interrupt 方法上面有说到了。对于 interrupted 和 isInterrupted 方法,stackoverflow 说得很好了:

interrupted() is static and checks the current thread. isInterrupted() is an instance method which checks the Thread object that it is called on.

A common error is to call a static method on an instance.

Thread myThread = ...; if (myThread.interrupted()) {} // WRONG! This might not be checking myThread. if (myThread.isInterrupted()) {} // Right!

Another difference is that interrupted() also clears the status of the current thread. In other words, if you call it twice in a row and the thread is not interrupted between the two calls, the second call will return false even if the first call returned true.

The Javadocs tell you important things like this; use them often!

下面再介绍下 lockInterruptibly 方法:

当通过这个方法去获取锁时,如果线程正在等待获取锁,则这个线程能够响应中断,即中断线程的等待状 态。例如当两个线程同时通过 lock.lockInterruptibly () 想获取某个锁时,假若此时线程 A 获取到了锁,而线程 B 只有在等待,那么对线程 B 调用 threadB.interrupt () 方法能够中断线程 B 的等待过程。

newCondition()

用于获取一个 Conodition 对象。Condition 对象是比 Lock 更细粒度的控制。要很好的理解 condition,个人觉得必须要知道,生产者消费者问题。

简单来说就是,我们都了解生产者在缓冲区满了的时候需要休眠,此时会再唤起一个线程,那么你此时唤醒的是生成者还是消费者呢,如果是消费者,很好;但是如果是唤醒生产者,那还要再休眠,此时就浪费资源了。condition 就可以用来解决这个问题,能保证每次唤醒的都是消费者。具体参考:Java 多线程:condition 关键字

lock 方法大体就介绍到这里。

ReentrantLock


可重入锁:指同一个线程,外层函数获得锁之后,内层递归函数仍有获得该锁的代码,但是不受影响。

** 可重入锁的最大作用就是 可以避免死锁。** 例如:A 线程有两个方法 a 和 b,其中 a 方法会调用 b 方法,假如 a,b 两个方法都需要获得锁,那么首先 a 方法先执行,会获得锁,此时 b 方法将永远获得不了锁,b 方法将一直阻塞住, a 方法由于 b 方法没有执行完,它本身也 不释放锁,此时就会造成一个死锁。 ReentrantLock 就是一个可重入锁。真正使用锁的时候,一般是 Lock lock = new ReentrantLock ();然后 使用 Lock 接口方法。

ReadWriteLock


接口代码如下:

public interface ReadWriteLock {  
    Lock readLock();  
    Lock writeLock();  
} 

ReadWriteLock 可以算是 Lock 的一个细分,合理使用有利于提高效率。比如说, 对于一个变量 i, A,B 线程同时读,那么不会造成错误的结果,所以此时是允许并发,但是如果是同时写操作,那么则是有可能造成错误。所以真正使用的时候,可以使用细分需要的是读锁还是写锁,再相应地进行加锁。

Ps:从代码也可以看出,ReadWriteLock 和 Lock 没有关系,既不继承,也不是实现。

关于ReentrantReadWriteLock:ReadLock和WriteLock有什么区别?readwritelockslim的问题就给大家分享到这里,感谢你花时间阅读本站内容,更多关于7. ReadWriteLock 接口及其实现 ReentrantReadWriteLock、J.U.C|读-写锁ReentrantReadWriteLock、Java Lock接口分析之ReentantReadWriteLock、Java 多线程:Lock 接口(接口方法分析,ReentrantLock,ReadWriteLock等相关知识的信息别忘了在本站进行查找喔。

本文标签: