GVKun编程网logo

Java中的StampedLock是什么?(java stampedlock)

15

在本文中,我们将带你了解Java中的StampedLock是什么?在这篇文章中,我们将为您详细介绍Java中的StampedLock是什么?的方方面面,并解答javastampedlock常见的疑惑,

在本文中,我们将带你了解Java中的StampedLock是什么?在这篇文章中,我们将为您详细介绍Java中的StampedLock是什么?的方方面面,并解答java stampedlock常见的疑惑,同时我们还将给您一些技巧,以帮助您实现更有效的brew install example -fs 中 -fs 用法是什么,bottle block 是什么?、Java 8新特性探究(十)StampedLock将是解决同步问题的新宠、Java 并发编程笔记之 StampedLock 锁源码探究、Java 并发(8)- 读写锁中的性能之王:StampedLock

本文目录一览:

Java中的StampedLock是什么?(java stampedlock)

Java中的StampedLock是什么?(java stampedlock)

我正在研究Java代码,我需要在其中实现线程。我正在通过JAVA 8 API,我了解了Stamped
Locks。谁能告诉我为什么在多线程中使用StampedLocks?

提前致谢。

答案1

小编典典

StampedLock是使用ReadWriteLock(由ReentrantReadWriteLock实现)的替代方法。StampedLock和ReentrantReadWriteLock之间的主要区别在于:

  • StampedLocks允许乐观锁定读取操作
  • ReentrantLocks是可重入的(StampedLocks不是)

因此,如果您遇到争用的场景(否则,您不妨使用synchronized或简单的方法Lock)并且读取者多于写入者,那么使用StampedLock可以显着提高性能。

但是,在得出结论之前,您应该基于特定的用例来衡量性能。

亨氏·卡布兹(Heinz
Kabutz)在他的时事通讯中写了有关StampedLocks的文章,并发表了有关性能的演讲。

brew install example -fs 中 -fs 用法是什么,bottle block 是什么?

brew install example -fs 中 -fs 用法是什么,bottle block 是什么?

因为要安装 mcrypt ,所以使用 brew install php56-mcrypt,但是安装完后,查看phpinfo()中没有加载mcrypt,坚持下来发现:brew install example -fs 中 -fs 用法是什么,bottle block 是什么?

安装的php开启了debug模式,而mcrypt没有,在github上找到一个issue:
https://github.com/Homebrew/h...
中 用 brew install php56-mcrypt -fs 就可以解决,实际操作确实可以解决,

issue中的解释是 Just pass -fs to your install arguments. brew install example -fs. It''ll skip the bottle block entirely.
但是没有弄清楚,加上-fs参数就是跳过`bottle block` 吗?`bottle block` 是什么?

回复内容:

因为要安装 mcrypt ,所以使用 brew install php56-mcrypt,但是安装完后,查看phpinfo()中没有加载mcrypt,坚持下来发现:brew install example -fs 中 -fs 用法是什么,bottle block 是什么?

安装的php开启了debug模式,而mcrypt没有,在github上找到一个issue:
https://github.com/Homebrew/h...
中 用 brew install php56-mcrypt -fs 就可以解决,实际操作确实可以解决,

issue中的解释是 Just pass -fs to your install arguments. brew install example -fs. It''ll skip the bottle block entirely.
但是没有弄清楚,加上-fs参数就是跳过`bottle block` 吗?`bottle block` 是什么?

bottle block是指bottle代码块,是brew用来直接下载二进制包进行安装的代码,加上-fs的意思是--build-from-source,也就是从源代码编译安装

Java 8新特性探究(十)StampedLock将是解决同步问题的新宠

Java 8新特性探究(十)StampedLock将是解决同步问题的新宠

Java8就像一个宝藏,一个小的API改进,也足与写一篇文章,比如同步,一直是多线程并发编程的一个老话题,相信没有人喜欢同步的代码,这会降低应用的吞吐量等性能指标,最坏的时候会挂起死机,但是即使这样你也没得选择,因为要保证信息的正确性。所以本文决定将从synchronized、Lock到Java8新增的StampedLock进行对比分析,相信StampedLock不会让大家失望。

synchronized

在java5之前,实现同步主要是使用synchronized。它是Java语言的关键字,当它用来修饰一个方法或者一个代码块的时候,能够保证在同一时刻最多只有一个线程执行该段代码。

有四种不同的同步块:

  1. 实例方法

  2. 静态方法

  3. 实例方法中的同步块

  4. 静态方法中的同步块

大家对此应该不陌生,所以不多讲了,以下是代码示例

synchronized(this)
// do operation
}

小结:在多线程并发编程中Synchronized一直是元老级角色,很多人都会称呼它为重量级锁,但是随着Java SE1.6对Synchronized进行了各种优化之后,性能上也有所提升。

Lock

它是Java 5在java.util.concurrent.locks新增的一个API。

Lock是一个接口,核心方法是lock(),unlock(),tryLock(),实现类有ReentrantLock, ReentrantReadWriteLock.ReadLock, ReentrantReadWriteLock.WriteLock;

ReentrantReadWriteLock, ReentrantLock 和synchronized锁都有相同的内存语义。

与synchronized不同的是,Lock完全用Java写成,在java这个层面是无关JVM实现的。Lock提供更灵活的锁机制,很多synchronized 没有提供的许多特性,比如锁投票,定时锁等候和中断锁等候,但因为lock是通过代码实现的,要保证锁定一定会被释放,就必须将unLock()放到finally{}中

下面是Lock的一个代码示例

rwlock.writeLock().lock();
try {
// do operation
} finally {
rwlock.writeLock().unlock();
}

小结:比synchronized更灵活、更具可伸缩性的锁定机制,但不管怎么说还是synchronized代码要更容易书写些

StampedLock

它是java8在java.util.concurrent.locks新增的一个API。

ReentrantReadWriteLock 在沒有任何读写锁时,才可以取得写入锁,这可用于实现了悲观读取(Pessimistic Reading),即如果执行中进行读取时,经常可能有另一执行要写入的需求,为了保持同步,ReentrantReadWriteLock 的读取锁定就可派上用场。

然而,如果读取执行情况很多,写入很少的情况下,使用 ReentrantReadWriteLock 可能会使写入线程遭遇饥饿(Starvation)问题,也就是写入线程吃吃无法竞争到锁定而一直处于等待状态。

StampedLock控制锁有三种模式(写,读,乐观读),一个StampedLock状态是由版本和模式两个部分组成,锁获取方法返回一个数字作为票据stamp,它用相应的锁状态表示并控制访问,数字0表示没有写锁被授权访问。在读锁上分为悲观锁和乐观锁。

所谓的乐观读模式,也就是若读的操作很多,写的操作很少的情况下,你可以乐观地认为,写入与读取同时发生几率很少,因此不悲观地使用完全的读取锁定,程序可以查看读取资料之后,是否遭到写入执行的变更,再采取后续的措施(重新读取变更信息,或者抛出异常) ,这一个小小改进,可大幅度提高程序的吞吐量!!

下面是java doc提供的StampedLock一个例子

class Point {
   private double x, y;
   private final StampedLock sl = new StampedLock();
   void move(double deltaX, double deltaY) { // an exclusively locked method
     long stamp = sl.writeLock();
     try {
       x += deltaX;
       y += deltaY;
     } finally {
       sl.unlockWrite(stamp);
     }
   }
  //下面看看乐观读锁案例
   double distanceFromOrigin() { // A read-only method
     long stamp = sl.tryOptimisticRead(); //获得一个乐观读锁
     double currentX = x, currentY = y; //将两个字段读入本地局部变量
     if (!sl.validate(stamp)) { //检查发出乐观读锁后同时是否有其他写锁发生?
        stamp = sl.readLock(); //如果没有,我们再次获得一个读悲观锁
        try {
          currentX = x; // 将两个字段读入本地局部变量
          currentY = y; // 将两个字段读入本地局部变量
        } finally {
           sl.unlockRead(stamp);
        }
     }
     return Math.sqrt(currentX * currentX + currentY * currentY);
   }
//下面是悲观读锁案例
   void moveIfAtOrigin(double newX, double newY) { // upgrade
     // Could instead start with optimistic, not read mode
     long stamp = sl.readLock();
     try {
       while (x == 0.0 && y == 0.0) { //循环,检查当前状态是否符合
         long ws = sl.tryConvertToWriteLock(stamp); //将读锁转为写锁
         if (ws != 0L) { //这是确认转为写锁是否成功
           stamp = ws; //如果成功 替换票据
           x = newX; //进行状态改变
           y = newY; //进行状态改变
           break;
         }
         else { //如果不能成功转换为写锁
           sl.unlockRead(stamp); //我们显式释放读锁
           stamp = sl.writeLock(); //显式直接进行写锁 然后再通过循环再试
         }
       }
     } finally {
       sl.unlock(stamp); //释放读锁或写锁
     }
   }
 }

小结:

StampedLock要比ReentrantReadWriteLock更加廉价,也就是消耗比较小。

StampedLock与ReadWriteLock性能对比

下图是和ReadWritLock相比,在一个线程情况下,是读速度其4倍左右,写是1倍。

下图是六个线程情况下,读性能是其几十倍,写性能也是近10倍左右:

下图是吞吐量提高:

总结

1、synchronized是在JVM层面上实现的,不但可以通过一些监控工具监控synchronized的锁定,而且在代码执行时出现异常,JVM会自动释放锁定;

2、ReentrantLock、ReentrantReadWriteLock,、StampedLock都是对象层面的锁定,要保证锁定一定会被释放,就必须将unLock()放到finally{}中;

3、StampedLock 对吞吐量有巨大的改进,特别是在读线程越来越多的场景下;

4、StampedLock有一个复杂的API,对于加锁操作,很容易误用其他方法;

5、当只有少量竞争者的时候,synchronized是一个很好的通用的锁实现;

6、当线程增长能够预估,ReentrantLock是一个很好的通用的锁实现;

StampedLock 可以说是Lock的一个很好的补充,吞吐量以及性能上的提升足以打动很多人了,但并不是说要替代之前Lock的东西,毕竟他还是有些应用场景的,起码API比StampedLock容易入手,下篇博文争取更新快一点,可能会是Nashorn的内容,这里允许我先卖个关子。。。





Java 并发编程笔记之 StampedLock 锁源码探究

Java 并发编程笔记之 StampedLock 锁源码探究

StampedLock 是 JUC 并发包里面 JDK1.8 版本新增的一个锁,该锁提供了三种模式的读写控制,当调用获取锁的系列函数的时候,会返回一个 long 型的变量,该变量被称为戳记(stamp), 这个戳记代表了锁的状态。

try 系列获取锁的函数,当获取锁失败后会返回为 0 的 stamp 值。当调用释放锁和转换锁的方法时候需要传入获取锁时候返回的 stamp 值。

StampedLockd 的内部实现是基于 CLH 锁的,CLH 锁原理:锁维护着一个等待线程队列,所有申请锁且失败的线程都记录在队列。一个节点代表一个线程,保存着一个标记位 locked, 用以判断当前线程是否已经释放锁。当一个线程试图获取锁时,从队列尾节点作为前序节点,循环判断所有的前序节点是否已经成功释放锁。

如下图所示:

 

我们首先看 Stampedlock 有哪些属性先,源码如下:

  private static final long serialVersionUID = -6001602636862214147L;

    /** 获取服务器CPU核数 */
    private static final int NCPU = Runtime.getRuntime().availableProcessors();

    /** 线程入队列前自旋次数 */
    private static final int SPINS = (NCPU > 1) ? 1 << 6 : 0;

    /** 队列头结点自旋获取锁最大失败次数后再次进入队列 */
    private static final int HEAD_SPINS = (NCPU > 1) ? 1 << 10 : 0;

    /** 重新阻塞前的最大重试次数 */
    private static final int MAX_HEAD_SPINS = (NCPU > 1) ? 1 << 16 : 0;

    /** The period for yielding when waiting for overflow spinlock */
    private static final int OVERFLOW_YIELD_RATE = 7; // must be power 2 - 1

    /** 溢出之前用于阅读器计数的位数 */
    private static final int LG_READERS = 7;

    // 锁定状态和stamp操作的值
    private static final long RUNIT = 1L;
    private static final long WBIT  = 1L << LG_READERS;
    private static final long RBITS = WBIT - 1L;
    private static final long RFULL = RBITS - 1L;
    private static final long ABITS = RBITS | WBIT;   //前8位都为1
    private static final long SBITS = ~RBITS; // 1 1000 0000

    //锁state初始值,第9位为1,避免算术时和0冲突
    private static final long ORIGIN = WBIT << 1;

    // 来自取消获取方法的特殊值,因此调用者可以抛出IE
    private static final long INTERRUPTED = 1L;

    // WNode节点的status值
    private static final int WAITING   = -1;
    private static final int CANCELLED =  1;

    // WNode节点的读写模式
    private static final int RMODE = 0;
    private static final int WMODE = 1;

    /** Wait nodes */
    static final class WNode {
        volatile WNode prev;
        volatile WNode next;
        volatile WNode cowait;    // 读模式使用该节点形成栈
        volatile Thread thread;   // non-null while possibly parked
        volatile int status;      // 0, WAITING, or CANCELLED
        final int mode;           // RMODE or WMODE
        WNode(int m, WNode p) { mode = m; prev = p; }
    }

    /** CLH队头节点 */
    private transient volatile WNode whead;
    /** CLH队尾节点 */
    private transient volatile WNode wtail;

    // views
    transient ReadLockView readLockView;
    transient WriteLockView writeLockView;
    transient ReadWriteLockView readWriteLockView;

    /** 锁队列状态, 当处于写模式时第8位为1,读模式时前7为为1-126(附加的readerOverflow用于当读者超过126时) */
    private transient volatile long state;
    /** 将state超过 RFULL=126的值放到readerOverflow字段中 */
    private transient int readerOverflow;

StampedLockd 源码中的 WNote 就是等待链表队列,每一个 WNode 标识一个等待线程,whead 为 CLH 队列头,wtail 为 CLH 队列尾,state 为锁的状态。long 型即 64 位,倒数第八位标识写锁状态,如果为 1,标识写锁占用!下面围绕这个 state 来讲述锁操作。

首先是常量标识:

WBIT=1000 0000(即 - 128)

RBIT =0111 1111(即 127) 

SBIT =1000 0000(后 7 位表示当前正在读取的线程数量,清 0)

 

 

StampedLock 给我们提供了 3 种读写模式的锁,如下:

  1. 写锁 writeLock 是一个独占锁,同时只有一个线程可以获取该锁,当一个线程获取该锁后,其他请求读锁和写锁的线程必须等待,这跟 ReentrantReadWriteLock 的写锁很相似,不过要注意的是 StampedLock 的写锁是不可重入锁,

当目前没有线程持有读锁或者写锁的时候才可以获取到该锁,请求该锁成功后会返回一个 stamp 票据变量来表示该锁的版本,如下源码所示:

  /**
     * 
     *获取写锁,获取失败会一直阻塞,直到获得锁成功
     * @return 可以用来解锁或转换模式的戳记(128的整数)
     */
    public long writeLock() {
        long s, next;  
        return ((((s = state) & ABITS) == 0L &&   // 完全没有任何锁(没有读锁和写锁)的时候可以通过
                 U.compareAndSwapLong(this, STATE, s, next = s + WBIT)) ? //第8位置为1
                next : acquireWrite(false, 0L));
    }

 

 writeLock ():典型的 cas 操作,如果 STATE 等于 s, 设置写锁位为 1(s+WBIT)。acquireWrite 跟 acquireRead 逻辑类似,先自旋尝试、加入等待队列、直至最终 Unsafe.park () 挂起线程。

 

private long acquireWrite(boolean interruptible, long deadline) {
        WNode node = null, p;
        for (int spins = -1;;) { // 入队时自旋
            long m, s, ns;
            //无锁
            if ((m = (s = state) & ABITS) == 0L) {
                if (U.compareAndSwapLong(this, STATE, s, ns = s + WBIT))
                    return ns;
            }
            else if (spins < 0)
                //持有写锁,并且队列为空
                spins = (m == WBIT && wtail == whead) ? SPINS : 0;
            else if (spins > 0) {
                //恒成立
                if (LockSupport.nextSecondarySeed() >= 0)
                    --spins;
            }
            else if ((p = wtail) == null) { 
                //初始化队列,写锁入队列
                WNode hd = new WNode(WMODE, null);
                if (U.compareAndSwapObject(this, WHEAD, null, hd))
                    wtail = hd;
            }
            else if (node == null)
                //不为空,写锁入队列
                node = new WNode(WMODE, p);
            else if (node.prev != p)
                node.prev = p;
            else if (U.compareAndSwapObject(this, WTAIL, p, node)) {
                p.next = node;
                break;//入队列成功退出循环
            }
        }

        for (int spins = -1;;) {
            WNode h, np, pp; int ps;
            //前驱节点为头节点
            if ((h = whead) == p) {
                if (spins < 0)
                    spins = HEAD_SPINS;
                else if (spins < MAX_HEAD_SPINS)
                    spins <<= 1;
                for (int k = spins;;) { // spin at head
                    long s, ns;
                    //无锁
                    if (((s = state) & ABITS) == 0L) {
                        if (U.compareAndSwapLong(this, STATE, s,
                                                 ns = s + WBIT)) {
                            //当前节点设置为头结点
                            whead = node;
                            node.prev = null;
                            return ns;
                        }
                    }
                    else if (LockSupport.nextSecondarySeed() >= 0 &&
                             --k <= 0)
                        break;
                }
            }
            else if (h != null) { // help release stale waiters
                WNode c; Thread w;
                //头结点为读锁将栈中所有读锁线程唤醒
                while ((c = h.cowait) != null) {
                    if (U.compareAndSwapObject(h, WCOWAIT, c, c.cowait) &&
                        (w = c.thread) != null)
                        U.unpark(w);
                }
            }
            //
            if (whead == h) {
                if ((np = node.prev) != p) {
                    if (np != null)
                        (p = np).next = node;   // stale
                }
                else if ((ps = p.status) == 0)
                    //前驱节点置为等待状态
                    U.compareAndSwapInt(p, WSTATUS, 0, WAITING);
                else if (ps == CANCELLED) {
                    if ((pp = p.prev) != null) {
                        node.prev = pp;
                        pp.next = node;
                    }
                }
                else {
                    long time; // 0 argument to park means no timeout
                    if (deadline == 0L)
                        time = 0L;
                    else if ((time = deadline - System.nanoTime()) <= 0L)
                        return cancelWaiter(node, node, false);
                    Thread wt = Thread.currentThread();
                    U.putObject(wt, PARKBLOCKER, this);
                    node.thread = wt;
                    if (p.status < 0 && (p != h || (state & ABITS) != 0L) &&
                        whead == h && node.prev == p)
                        U.park(false, time);  // emulate LockSupport.park
                    node.thread = null;
                    U.putObject(wt, PARKBLOCKER, null);
                    if (interruptible && Thread.interrupted())
                        return cancelWaiter(node, node, true);
                }
            }
        }
    }

 

并且 StampedLock 还提供了非阻塞 tryWriteLock 方法,源码如下:

 /**
     * 没有任何锁时则获取写锁,否则返回0
     *
     * @return 可以用来解锁或转换模式的戳记(128的整数),获取失败返回0
     */
    public long tryWriteLock() {
        long s, next;
        return ((((s = state) & ABITS) == 0L &&
                 U.compareAndSwapLong(this, STATE, s, next = s + WBIT)) ?
                next : 0L);
    }
  /**
     * unit时间内获得写锁成功返回状态值,失败返回0,或抛出InterruptedException
     * @return 0:获得锁失败
     * @throws InterruptedException 线程获得锁之前调用interrupt()方法抛出的异常
     */
    public long tryWriteLock(long time, TimeUnit unit)
        throws InterruptedException {
        long nanos = unit.toNanos(time);
        if (!Thread.interrupted()) {
            long next, deadline;
            if ((next = tryWriteLock()) != 0L)
                //获得锁成功
                return next;
            if (nanos <= 0L)
                //超时返回0
                return 0L;
            if ((deadline = System.nanoTime() + nanos) == 0L)
                deadline = 1L;
            if ((next = acquireWrite(true, deadline)) != INTERRUPTED)
                //规定时间内获得锁结果
                return next;
        }
        throw new InterruptedException();
    }

 

当释放该锁的时候需要调用 unlockWrite 方法并传递获取锁的时候的 stamp 参数。源码如下:

  /**
     * state匹配stamp则释放写锁,
     * @throws IllegalMonitorStateException  不匹配则抛出异常
     */
    public void unlockWrite(long stamp) {
        WNode h;
        //state不匹配stamp  或者 没有写锁
        if (state != stamp || (stamp & WBIT) == 0L)
            throw new IllegalMonitorStateException();
        //state += WBIT, 第8位置为0,但state & SBITS 会循环,一共有4个值
        state = (stamp += WBIT) == 0L ? ORIGIN : stamp;
        if ((h = whead) != null && h.status != 0)
            //唤醒继承者节点线程
            release(h);
    }

  unlockWrite (): 释放锁与加锁动作相反。将写标记位清零,如果 state 溢出,则退回到初始值;

 

  2. 悲观锁 readLock,是个共享锁,在没有线程获取独占写锁的情况下,同时多个线程可以获取该锁;如果已经有线程持有写锁,其他线程请求获取该锁会被阻塞,这类似 ReentrantReadWriteLock 的读锁(不同在于这里的读锁是不可重入锁)。

这里说的悲观是指在具体操作数据前,悲观的认为其他线程可能要对自己操作的数据进行修改,所以需要先对数据加锁,这是在读少写多的情况下的一种考虑,请求该锁成功后会返回一个 stamp 票据变量来表示该锁的版本,源码如下:

  /**
     *  悲观读锁,非独占锁,为获得锁一直处于阻塞状态,直到获得锁为止
     */
    public long readLock() {
        long s = state, next;  
        // 队列为空   && 没有写锁同时读锁数小于126  && CAS修改状态成功      则状态加1并返回,否则自旋获取读锁
        return ((whead == wtail && (s & ABITS) < RFULL &&
                 U.compareAndSwapLong(this, STATE, s, next = s + RUNIT)) ?
                next : acquireRead(false, 0L));
    }

 乐观锁失败后锁升级为 readLock ():尝试 state+1, 用于统计读线程的数量,如果失败,进入 acquireRead () 进行自旋,通过 CAS 获取锁。

如果自旋失败,入 CLH 队列,然后再自旋,如果成功获得读锁,则激活 cowait 队列中的读线程 Unsafe.unpark (), 如果最终依然失败,则 Unsafe ().park () 挂起当前线程。

  /**
     * @param interruptible 是否允许中断
     * @param 标识超时限时(0代表不限时),然后进入循环。
     * @return next state, or INTERRUPTED
     */
    private long acquireRead(boolean interruptible, long deadline) {
        WNode node = null, p;
        //自旋
        for (int spins = -1;;) {
            WNode h;
            //判断队列为空
            if ((h = whead) == (p = wtail)) {
                //定义 long m,s,ns,并循环
                for (long m, s, ns;;) {
                    //将state超过 RFULL=126的值放到readerOverflow字段中
                    if ((m = (s = state) & ABITS) < RFULL ?
                        U.compareAndSwapLong(this, STATE, s, ns = s + RUNIT) :
                        (m < WBIT && (ns = tryIncReaderOverflow(s)) != 0L))
                        //获取锁成功返回
                        return ns;
                    //state高8位大于0,那么说明当前锁已经被写锁独占,那么我们尝试自旋  + 随机的方式来探测状态
                    else if (m >= WBIT) {
                        if (spins > 0) {
                            if (LockSupport.nextSecondarySeed() >= 0)
                                --spins;
                        }
                        else {
                            if (spins == 0) {
                                WNode nh = whead, np = wtail;
                                //一直获取锁失败,或者有线程入队列了退出内循环自旋,后续进入队列
                                if ((nh == h && np == p) || (h = nh) != (p = np))
                                    break;
                            }
                            //自旋 SPINS 次
                            spins = SPINS;
                        }
                    }
                }
            }
            if (p == null) { 
                //初始队列
                WNode hd = new WNode(WMODE, null);
                if (U.compareAndSwapObject(this, WHEAD, null, hd))
                    wtail = hd;
            }
            //当前节点为空则构建当前节点,模式为RMODE,前驱节点为p即尾节点。
            else if (node == null)
                node = new WNode(RMODE, p);
            //当前队列为空即只有一个节点(whead=wtail)或者当前尾节点的模式不是RMODE,那么我们会尝试在尾节点后面添加该节点作为尾节点,然后跳出外层循环
            else if (h == p || p.mode != RMODE) {
                if (node.prev != p)
                    node.prev = p;
                else if (U.compareAndSwapObject(this, WTAIL, p, node)) {
                    p.next = node;
                    //入队列成功,退出自旋
                    break;
                }
            }
            //队列不为空并且是RMODE模式, 添加该节点到尾节点的cowait链(实际上构成一个读线程stack)中
            else if (!U.compareAndSwapObject(p, WCOWAIT,
                                             node.cowait = p.cowait, node))
                //失败处理
                node.cowait = null;
            else {
                //通过CAS方法将该节点node添加至尾节点的cowait链中,node成为cowait中的顶元素,cowait构成了一个LIFO队列。
                //循环
                for (;;) {
                    WNode pp, c; Thread w;
                    //尝试unpark头元素(whead)的cowait中的第一个元素,假如是读锁会通过循环释放cowait链
                    if ((h = whead) != null && (c = h.cowait) != null &&
                        U.compareAndSwapObject(h, WCOWAIT, c, c.cowait) &&
                        (w = c.thread) != null) 

                        U.unpark(w);
                    //node所在的根节点p的前驱就是whead或者p已经是whead或者p的前驱为null
                    if (h == (pp = p.prev) || h == p || pp == null) {
                        long m, s, ns;
                        do {
                            //根据state再次积极的尝试获取锁
                            if ((m = (s = state) & ABITS) < RFULL ?
                                U.compareAndSwapLong(this, STATE, s,
                                                     ns = s + RUNIT) :
                                (m < WBIT &&
                                 (ns = tryIncReaderOverflow(s)) != 0L))
                                return ns;
                        } while (m < WBIT);//条件为读模式
                    }
                    if (whead == h && p.prev == pp) {
                        long time;
                        if (pp == null || h == p || p.status > 0) {
                            //这样做的原因是被其他线程闯入夺取了锁,或者p已经被取消
                            node = null; // throw away
                            break;
                        }
                        if (deadline == 0L)
                            time = 0L;
                        else if ((time = deadline - System.nanoTime()) <= 0L)
                            return cancelWaiter(node, p, false);
                        Thread wt = Thread.currentThread();
                        U.putObject(wt, PARKBLOCKER, this);
                        node.thread = wt;
                        if ((h != pp || (state & ABITS) == WBIT) &&
                            whead == h && p.prev == pp)
                            U.park(false, time);
                        node.thread = null;
                        U.putObject(wt, PARKBLOCKER, null);
                        //出现的中断情况下取消当前节点的cancelWaiter操作
                        if (interruptible && Thread.interrupted())
                            return cancelWaiter(node, p, true);
                    }
                }
            }
        }

        for (int spins = -1;;) {
            WNode h, np, pp; int ps;
            if ((h = whead) == p) {
                if (spins < 0)
                    spins = HEAD_SPINS;
                else if (spins < MAX_HEAD_SPINS)
                    spins <<= 1;
                for (int k = spins;;) { // spin at head
                    long m, s, ns;
                    if ((m = (s = state) & ABITS) < RFULL ?
                        U.compareAndSwapLong(this, STATE, s, ns = s + RUNIT) :
                        (m < WBIT && (ns = tryIncReaderOverflow(s)) != 0L)) {
                        WNode c; Thread w;
                        whead = node;
                        node.prev = null;
                        while ((c = node.cowait) != null) {
                            if (U.compareAndSwapObject(node, WCOWAIT,
                                                       c, c.cowait) &&
                                (w = c.thread) != null)
                                U.unpark(w);
                        }
                        return ns;
                    }
                    else if (m >= WBIT &&
                             LockSupport.nextSecondarySeed() >= 0 && --k <= 0)
                        break;
                }
            }
            else if (h != null) {
                WNode c; Thread w;
                while ((c = h.cowait) != null) {
                    if (U.compareAndSwapObject(h, WCOWAIT, c, c.cowait) &&
                        (w = c.thread) != null)
                        U.unpark(w);
                }
            }
            if (whead == h) {
                if ((np = node.prev) != p) {
                    if (np != null)
                        (p = np).next = node;   // stale
                }
                else if ((ps = p.status) == 0)
                    U.compareAndSwapInt(p, WSTATUS, 0, WAITING);
                else if (ps == CANCELLED) {
                    if ((pp = p.prev) != null) {
                        node.prev = pp;
                        pp.next = node;
                    }
                }
                else {
                    long time;
                    if (deadline == 0L)
                        time = 0L;
                    else if ((time = deadline - System.nanoTime()) <= 0L)
                        return cancelWaiter(node, node, false);
                    Thread wt = Thread.currentThread();
                    U.putObject(wt, PARKBLOCKER, this);
                    node.thread = wt;
                    if (p.status < 0 &&
                        (p != h || (state & ABITS) == WBIT) &&
                        whead == h && node.prev == p)
                        U.park(false, time);
                    node.thread = null;
                    U.putObject(wt, PARKBLOCKER, null);
                    if (interruptible && Thread.interrupted())
                        return cancelWaiter(node, node, true);
                }
            }
        }
    }

 

 

并且 StampedLock 还提供了非阻塞 tryReadLock 方法,源码如下:

  /**
     * 可以立即获得锁,则获取读锁,否则返回0
     */
    public long tryReadLock() {
        for (;;) {
            long s, m, next;
            //持有写锁返回0
            if ((m = (s = state) & ABITS) == WBIT)
                return 0L;
            //读线程数 < RFULL,CAS变更状态
            else if (m < RFULL) {
                if (U.compareAndSwapLong(this, STATE, s, next = s + RUNIT))
                    return next;
            }
            //将state超过 RFULL的值放到readerOverflow字段
            else if ((next = tryIncReaderOverflow(s)) != 0L)
                return next;
        }
    }
  /**
     * unit时间内获得读锁成功返回状态值,失败返回0,或抛出InterruptedException
     */
    public long tryReadLock(long time, TimeUnit unit)
        throws InterruptedException {
        long s, m, next, deadline;
        long nanos = unit.toNanos(time);
        if (!Thread.interrupted()) {
            if ((m = (s = state) & ABITS) != WBIT) {
                if (m < RFULL) {
                    if (U.compareAndSwapLong(this, STATE, s, next = s + RUNIT))
                        return next;
                }
                else if ((next = tryIncReaderOverflow(s)) != 0L)
                    return next;
            }
            if (nanos <= 0L)
                return 0L;
            if ((deadline = System.nanoTime() + nanos) == 0L)
                deadline = 1L;
            if ((next = acquireRead(true, deadline)) != INTERRUPTED)
                return next;
        }
        throw new InterruptedException();
    }

 

StampedLock 的悲观读锁 readLock 当释放该锁时候需要 unlockRead 并传递参数 stamp。源码如下:

  /**
     * state匹配stamp则释放读锁,
     */
    public void unlockRead(long stamp) {
        long s, m; WNode h;
        for (;;) {
            //不匹配抛出异常
            if (((s = state) & SBITS) != (stamp & SBITS) ||
                (stamp & ABITS) == 0L || (m = s & ABITS) == 0L || m == WBIT)
                throw new IllegalMonitorStateException();
            //小于最大记录数值
            if (m < RFULL) {
                if (U.compareAndSwapLong(this, STATE, s, s - RUNIT)) {
                    if (m == RUNIT && (h = whead) != null && h.status != 0)
                        release(h);
                    break;
                }
            }
            //否则readerOverflow减一
            else if (tryDecReaderOverflow(s) != 0L)
                break;
        }
    }

 

  3. 乐观读锁 tryOptimisticRead,是相对于悲观锁来说的,在操作数据前并没有通过 CAS 设置锁的状态,仅仅是通过位运算测试;如果当前没有线程持有写锁,则简单的返回一个非 0 的 stamp 版本信息,

获取该 stamp 后在具体操作数据前还需要调用 validate 验证下该 stamp 是否已经不可用,也就是看当调用 tryOptimisticRead 返回 stamp 后,到当前时间是否有其它线程持有了写锁,如果是那么 validate 会返回 0,

否者就可以使用该 stamp 版本的锁对数据进行操作。由于 tryOptimisticRead 并没有使用 CAS 设置锁状态,所以不需要显示的释放该锁。

该锁的一个特点是适用于读多写少的场景,因为获取读锁只是使用位操作进行检验,不涉及 CAS 操作,所以效率会高很多,但是同时由于没有使用真正的锁,在保证数据一致性上需要拷贝一份要操作的变量到方法栈,并且在操作数据时候可能其它写线程已经修改了数据,

而我们操作的是方法栈里面的数据,也就是一个快照,所以最多返回的不是最新的数据,但是一致性还是得到保障的。源码如下:

  /**
     * 获取乐观读锁,返回邮票stamp
     */
    public long tryOptimisticRead() {
        long s;  //有写锁返回0.   否则返回256
        return (((s = state) & WBIT) == 0L) ? (s & SBITS) : 0L;
    }

 tryOptimisticRead ():如果当前没有写锁占用,返回 state (后 7 位清 0,即清 0 读线程数),如果有写锁,返回 0,即失败。

 

  /**
     * 验证从调用tryOptimisticRead开始到现在这段时间内有无写锁占用过锁资源,有写锁获得过锁资源则返回false. stamp为0返回false.
     * @return 从返回stamp开始,没有写锁获得过锁资源返回true,否则返回false
     */
    public boolean validate(long stamp) {
        //强制读取操作和验证操作在一些情况下的内存排序问题
        U.loadFence();
        //当持有写锁后再释放写锁,该校验也不成立,返回false
        return (stamp & SBITS) == (state & SBITS);
    }

 

StamedLock 还支持这三种锁在一定条件下进行相互转换,例如 long tryConvertToWriteLock (long stamp) 期望把 stamp 标示的锁升级为写锁,这个函数会在下面几种情况下返回一个有效的 stamp(也就是晋升写锁成功): 

  1. 当前锁已经是写锁模式了。

  2. 当前锁处于读锁模式,并且没有其他线程是读锁模式

  3. 当前处于乐观读模式,并且当前写锁可用。

源码如下:

  /**
     * state匹配stamp时, 执行下列操作之一. 
     *   1、stamp 已经持有写锁,直接返回.  
     *   2、读模式,但是没有更多的读取者,并返回一个写锁stamp.
     *   3、有一个乐观读锁,只在即时可用的前提下返回一个写锁stamp
     *   4、其他情况都返回0
     */
    public long tryConvertToWriteLock(long stamp) {
        long a = stamp & ABITS, m, s, next;
        //state匹配stamp
        while (((s = state) & SBITS) == (stamp & SBITS)) {
            //没有锁
            if ((m = s & ABITS) == 0L) {
                if (a != 0L)
                    break;
                //CAS修改状态为持有写锁,并返回
                if (U.compareAndSwapLong(this, STATE, s, next = s + WBIT))
                    return next;
            }
            //持有写锁
            else if (m == WBIT) {
                if (a != m)
                    //其他线程持有写锁
                    break;
                //当前线程已经持有写锁
                return stamp;
            }
            //有一个读锁
            else if (m == RUNIT && a != 0L) {
                //释放读锁,并尝试持有写锁
                if (U.compareAndSwapLong(this, STATE, s,
                                         next = s - RUNIT + WBIT))
                    return next;
            }
            else
                break;
        }
        return 0L;
    }
/**
    *   state匹配stamp时, 执行下列操作之一.
        1、stamp 表示持有写锁,释放写锁,并持有读锁
        2 stamp 表示持有读锁 ,返回该读锁
        3 有一个乐观读锁,只在即时可用的前提下返回一个读锁stamp
        4、其他情况都返回0,表示失败
     *
     */
    public long tryConvertToReadLock(long stamp) {
        long a = stamp & ABITS, m, s, next; WNode h;
        //state匹配stamp
        while (((s = state) & SBITS) == (stamp & SBITS)) {
            //没有锁
            if ((m = s & ABITS) == 0L) {
                if (a != 0L)
                    break;
                else if (m < RFULL) {
                    if (U.compareAndSwapLong(this, STATE, s, next = s + RUNIT))
                        return next;
                }
                else if ((next = tryIncReaderOverflow(s)) != 0L)
                    return next;
            }
            //写锁
            else if (m == WBIT) {
                //非当前线程持有写锁
                if (a != m)
                    break;
                //释放写锁持有读锁
                state = next = s + (WBIT + RUNIT);
                if ((h = whead) != null && h.status != 0)
                    release(h);
                return next;
            }
            //持有读锁
            else if (a != 0L && a < WBIT)
                return stamp;
            else
                break;
        }
        return 0L;
    }

校验这个戳是否有效 validate ():比较当前 stamp 和发生乐观锁得到的 stamp 比较,不一致则失败。

还有一个转换成乐观锁 tryConvertToOptimisticRead(long stamp) ,这里就不讲了,道理都差不多。

另外 StampedLock 的读写锁都是不可重入锁,所以当获取锁后释放锁前,不应该再调用会获取锁的操作,以避免产生死锁。

当多个线程同时尝试获取读锁和写锁的时候,谁先获取锁没有一定的规则,完全都是尽力而为,是随机的,并且该锁不是直接实现 Lock 或 ReadWriteLock 接口,而是内部自己维护了一个双向阻塞队列。

 

下面通过 JDK8 里面提供的一个管理二维点的例子讲解来加深对上面讲解的理解。代码如下所示:

package com.hjc;

import java.util.concurrent.locks.StampedLock;

/**
 * Created by cong on 2018/6/16.
 */
public class Point {

    // 成员变量
    private double x, y;

    // 锁实例
    private final StampedLock sl = new StampedLock();

    // 排它锁-写锁(writeLock)
    void move(double deltaX, double deltaY) {
        long stamp = sl.writeLock();
        try {
            x += deltaX;
            y += deltaY;
        } finally {
            sl.unlockWrite(stamp);
        }
    }

    // 乐观读锁(tryOptimisticRead)
    double distanceFromOrigin() {

        // 尝试获取乐观读锁(1)
        long stamp = sl.tryOptimisticRead();
        // 将全部变量拷贝到方法体栈内(2)
        double currentX = x, currentY = y;
        // 检查在(1)获取到读锁票据后,锁有没被其它写线程排它性抢占(3)
        if (!sl.validate(stamp)) {
            // 如果被抢占则获取一个共享读锁(悲观获取)(4)
            stamp = sl.readLock();
            try {
                // 将全部变量拷贝到方法体栈内(5)
                currentX = x;
                currentY = y;
            } finally {
                // 释放共享读锁(6)
                sl.unlockRead(stamp);
            }
        }
        // 返回计算结果(7)
        return Math.sqrt(currentX * currentX + currentY * currentY);
    }

    // 使用悲观锁获取读锁,并尝试转换为写锁
    void moveIfAtOrigin(double newX, double newY) {
        // 这里可以使用乐观读锁替换(1)
        long stamp = sl.readLock();
        try {
            // 如果当前点在原点则移动(2)
            while (x == 0.0 && y == 0.0) {
                // 尝试将获取的读锁升级为写锁(3)
                long ws = sl.tryConvertToWriteLock(stamp);
                // 升级成功,则更新票据,并设置坐标值,然后退出循环(4)
                if (ws != 0L) {
                    stamp = ws;
                    x = newX;
                    y = newY;
                    break;
                } else {
                    // 读锁升级写锁失败则释放读锁,显示获取独占写锁,然后循环重试(5)
                    sl.unlockRead(stamp);
                    stamp = sl.writeLock();
                }
            }
        } finally {
            // 释放锁(6)
            sl.unlock(stamp);
        }
    }
    
}

如上代码 Point 类里面有两个成员变量(x,y) 来标示一个点的二维坐标,和三个操作坐标变量的方法,另外实例化了一个 StampedLock 对象用来保证操作的原子性。

首先分析下 move 方法,该函数作用是使用参数的增量值,改变当前 point 坐标的位置;代码先获取到了写锁,然后对 point 坐标进行修改,然后释放锁。该锁是排它锁,这保证了其它线程调用 move 函数时候会被阻塞,也保证了其它线程不能获取读锁,读取坐标的值,直到当前线程显示释放了写锁,

也就是保证了对变量 x,y 操作的原子性和数据一致性。

 

接下来再看 distanceFromOrigin 方法,该方法作用是计算当前位置到原点(坐标为 0,0)的距离,代码(1)首先尝试获取乐观读锁,如果当前没有其它线程获取到了写锁,那么(1)会返回一个非 0 的 stamp 用来表示版本信息,代码(2)拷贝坐标变量到本地方法栈里面。

代码(3)检查在(1)获取到的 stamp 值是否还有效,之所以还要在此校验是因为代码(1)获取读锁时候并没有通过 CAS 操作修改锁的状态,而是简单的通过与或操作返回了一个版本信息,这里校验是看在在获取版本信息到现在的时间段里面是否有其它线程持有了写锁,如果有则之前获取的版本信息就无效了。

这里如果校验成功则执行(7)使用本地方法栈里面的值进行计算然后返回。需要注意的是在代码(3) 校验成功后,代码(7)计算期间,其它线程可能获取到了写锁并且修改了 x,y 的值,而当前线程执行代码(7)进行计算时候采用的还是修改前值的拷贝,也就是操作的值是对之前值的一个拷贝,一个快照,并不是最新的值。

 

也许我们会想,代码(2) 和(3)能否互换?。

  答案是明显不能的,如果位置换了,那么首先执行 validate ,假设验证通过了,要拷贝 x,y 值到本地方法栈,而在拷贝的过程中很有可能其他线程已经修改过了 x,y 中的一个,这就造成了数据的不一致性了。

 

那么你可能还会这样会想,即使不交换代码 (2) 和(3),在拷贝 x,y 值到本地方法栈里面时,也会存在其他线程修改了 x,y 中的一个值,这不也会存在问题吗?

这个确实会存在,但是别忘记了拷贝后还有一道 validate,如果这时候有线程修改了 x,y 中的值,那么肯定是有线程在调用 validate 前,调用 sl.tryOptimisticRead 后获取了写锁,那么进行 validate 时候就会失败。

 

好了知道这么多原理后,我们就会惊叹这也是乐观读设计的精妙之处也是使用时候容易出问题的地方。下面继续分析 validate 失败后会执行代码(4)获取悲观读锁,如果这时候其他线程持有写锁则代码(4)会导致的当前线程阻塞直到其它线程释放了写锁。

如果这时候没有其他线程获取到写锁,那么当前线程就可以获取到读锁,然后执行代码(5)重新拷贝新的坐标值到本地方法栈,然后就是代码(6)释放了锁,拷贝的时候由于加了读锁,所以拷贝期间其它线程获取写锁时候会被阻塞,

这保证了数据的一致性,另外这里 x,y 没有被声明为 volatie,会不会存在内存不可见性问题那?答案是不会,因为加锁的语义保证了内存可见性,

 

最后代码(7)使用方法栈里面数据计算返回,同理这里在计算时候使用的数据也可能不是最新的,其它写线程可能已经修改过原来的 x,y 值了。

 

最后一个方法 moveIfAtOrigin 作用是如果当前坐标为原点则移动到指定的位置。代码(1)获取悲观读锁,保证其它线程不能获取写锁修改 x,y 值,然后代码(2)判断如果当前点在原点则更新坐标,

代码(3) 尝试升级读锁为写锁,这里升级不一定成功,因为多个线程都可以同时获取悲观读锁,当多个线程都执行到(3)时候只有一个可以升级成功,升级成功则返回非 0 的 stamp,否非返回 0。

这里假设当前线程升级成功,然后执行步骤(4)更新 stamp 值和坐标值,然后退出循环。如果升级失败则执行步骤(5)首先释放读锁然后申请写锁,获取到写锁后在循环重新设置坐标值。最后步骤(6) 释放锁。

 

使用乐观读锁还是很容易犯错误的,必须要严谨,必须要保证如下的使用顺序,用伪代码作为讲解,如下:

long stamp = lock.tryOptimisticRead(); //非阻塞获取版本信息
copyVaraibale2ThreadMemory();//拷贝变量到线程本地堆栈
if(!lock.validate(stamp)){ // 校验
    long stamp = lock.readLock();//获取读锁
    try {
        copyVaraibale2ThreadMemory();//拷贝变量到线程本地堆栈
     } finally {
       lock.unlock(stamp);//释放悲观锁
    }

}

useThreadMemoryVarables();//使用线程本地堆栈里面的数据进行操作

 

总结:StampedLock 提供的读写锁与 ReentrantReadWriteLock 类似,只是前者的都是不可重入锁。但是前者通过提供乐观读锁在多线程多读的情况下提供更好的性能,这是因为获取乐观读锁时候不需要进行 CAS 操作设置锁的状态,而只是简单的测试状态。

 

Java 并发(8)- 读写锁中的性能之王:StampedLock

Java 并发(8)- 读写锁中的性能之王:StampedLock

在上一篇《你真的懂 ReentrantReadWriteLock 吗?》中我给大家留了一个引子,一个更高效同时可以避免写饥饿的读写锁 ---StampedLock。StampedLock 实现了不仅多个读不互相阻塞,同时在读操作时不会阻塞写操作。

为什么 StampedLock 这么神奇?能够达到这种效果,它的核心思想在于,在读的时候如果发生了写,应该通过重试的方式来获取新的值,而不应该阻塞写操作。这种模式也就是典型的无锁编程思想,和 CAS 自旋的思想一样。这种操作方式决定了 StampedLock 在读线程非常多而写线程非常少的场景下非常适用,同时还避免了写饥饿情况的发生。这篇文章将通过以下几点来分析 StampedLock。

  • StampedLock 的官方使用示例分析
  • 源码分析:读写锁共享的状态量
  • 源码分析:写锁的释放和获取
  • 源码分析:悲观读锁的释放和获取
  • 性能测试

StampedLock 的官方使用示例分析

先来看一个官方给出的 StampedLock 使用案例:

public class Point {

	private double x, y;
	
	private final StampedLock stampedLock = new StampedLock();
	
	//写锁的使用
	void move(double deltaX, double deltaY){
		
		long stamp = stampedLock.writeLock(); //获取写锁
		try {
			x += deltaX;
			y += deltaY;
		} finally {
			stampedLock.unlockWrite(stamp); //释放写锁
		}
	}
	
	//乐观读锁的使用
	double distanceFromOrigin() {
		
		long stamp = stampedLock.tryOptimisticRead(); //获得一个乐观读锁
		double currentX = x;
		double currentY = y;
		if (!stampedLock.validate(stamp)) { //检查乐观读锁后是否有其他写锁发生,有则返回false
			
			stamp = stampedLock.readLock(); //获取一个悲观读锁
			
			try {
				currentX = x;
			} finally {
				stampedLock.unlockRead(stamp); //释放悲观读锁
			}
		} 
		return Math.sqrt(currentX*currentX + currentY*currentY);
	}
	
	//悲观读锁以及读锁升级写锁的使用
	void moveIfAtOrigin(double newX,double newY) {
		
		long stamp = stampedLock.readLock(); //悲观读锁
		try {
			while (x == 0.0 && y == 0.0) {
				long ws = stampedLock.tryConvertToWriteLock(stamp); //读锁转换为写锁
				if (ws != 0L) { //转换成功
					
					stamp = ws; //票据更新
					x = newX;
					y = newY;
					break;
				} else {
					stampedLock.unlockRead(stamp); //转换失败释放读锁
					stamp = stampedLock.writeLock(); //强制获取写锁
				}
			}
		} finally {
			stampedLock.unlock(stamp); //释放所有锁
		}
	}
}

首先看看第一个方法 move,可以看到它和 ReentrantReadWriteLock 写锁的使用基本一样,都是简单的获取释放,可以猜测这里也是一个独占锁的实现。需要注意的是 在获取写锁是会返回个只 long 类型的 stamp,然后在释放写锁时会将 stamp 传入进去。这个 stamp 是做什么用的呢?如果我们在中间改变了这个值又会发生什么呢?这里先暂时不做解释,后面分析源码时会解答这个问题。

第二个方法 distanceFromOrigin 就比较特别了,它调用了 tryOptimisticRead,根据名字判断这是一个乐观读锁。首先什么是乐观锁?乐观锁的意思就是先假定在乐观锁获取期间,共享变量不会被改变,既然假定不会被改变,那就不需要上锁。在获取乐观读锁之后进行了一些操作,然后又调用了 validate 方法,这个方法就是用来验证 tryOptimisticRead 之后,是否有写操作执行过,如果有,则获取一个读锁,这里的读锁和 ReentrantReadWriteLock 中的读锁类似,猜测也是个共享锁。

第三个方法 moveIfAtOrigin,它做了一个锁升级的操作,通过调用 tryConvertToWriteLock 尝试将读锁转换为写锁,转换成功后相当于获取了写锁,转换失败相当于有写锁被占用,这时通过调用 writeLock 来获取写锁进行操作。

看过了上面的三个方法,估计大家对怎么使用 StampedLock 有了一个初步的印象。下面就通过对 StampedLock 源码的分析来一步步了解它背后是怎么解决锁饥饿问题的。

源码分析:读写锁共享的状态量

从上面的使用示例中我们看到,在 StampedLock 中,除了提供了类似 ReentrantReadWriteLock 读写锁的获取释放方法,还提供了一个乐观读锁的获取方式。那么这三种方式是如何交互的呢?根据 AQS 的经验,StampedLock 中应该也是使用了一个状态量来标志锁的状态。通过下面的源码可以证明这点:

// 用于操作state后获取stamp的值
private static final int LG_READERS = 7;
private static final long RUNIT = 1L;               //0000 0000 0001
private static final long WBIT  = 1L << LG_READERS; //0000 1000 0000
private static final long RBITS = WBIT - 1L;        //0000 0111 1111
private static final long RFULL = RBITS - 1L;       //0000 0111 1110
private static final long ABITS = RBITS | WBIT;     //0000 1111 1111
private static final long SBITS = ~RBITS;           //1111 1000 0000

//初始化时state的值
private static final long ORIGIN = WBIT << 1;       //0001 0000 0000

//锁共享变量state
private transient volatile long state;
//读锁溢出时用来存储多出的毒素哦
private transient int readerOverflow;

上面的源码中除了定义 state 变量外,还提供了一系列变量用来操作 state,用来表示读锁和写锁的各种状态。为了方便理解,我将他们都表示成二进制的值,长度有限,这里用低 12 位来表示 64 的 long,高位自动用 0 补齐。要理解这些状态的作用,就需要具体分析三种锁操作方式是怎么通过 state 这一个变量来表示的,首先来看看获取写锁和释放写锁。

源码分析:写锁的释放和获取

public StampedLock() {
    state = ORIGIN; //初始化state为 0001 0000 0000
}

public long writeLock() {
    long s, next; 
    return ((((s = state) & ABITS) == 0L && //没有读写锁
                U.compareAndSwapLong(this, STATE, s, next = s + WBIT)) ? //cas操作尝试获取写锁
            next : acquireWrite(false, 0L));    //获取成功后返回next,失败则进行后续处理,排队也在后续处理中
}

public void unlockWrite(long stamp) {
    WNode h;
    if (state != stamp || (stamp & WBIT) == 0L) //stamp值被修改,或者写锁已经被释放,抛出错误
        throw new IllegalMonitorStateException();
    state = (stamp += WBIT) == 0L ? ORIGIN : stamp; //加0000 1000 0000来记录写锁的变化,同时改变写锁状态
    if ((h = whead) != null && h.status != 0)
        release(h);
}

这里先说明两点结论:读锁通过前 7 位来表示,每获取一个读锁,则加 1。写锁通过除前 7 位后剩下的位来表示,每获取一次写锁,则加 1000 0000,这两点在后面的源码中都可以得倒证明。 初始化时将 state 变量设置为 0001 0000 0000。写锁获取通过 ((s = state) & ABITS) 操作等于 0 时默认没有读锁和写锁。写锁获取分三种情况:

  • 没有读锁和写锁时,state 为 0001 0000 0000 0001 0000 0000 & 0000 1111 1111 = 0000 0000 0000 // 等于 0L,可以尝试获取写锁

  • 有一个读锁时,state 为 0001 0000 0001 0001 0000 0001 & 0000 1111 1111 = 0000 0000 0001 // 不等于 0L

  • 有一个写锁,state 为 0001 1000 0000 0001 1000 0000 & 0000 1111 1111 = 0000 1000 0000 // 不等于 0L

获取到写锁,需要将 s + WBIT 设置到 state,也就是说每次获取写锁,都需要加 0000 1000 0000。同时返回 s + WBIT 的值 0001 0000 0000 + 0000 1000 0000 = 0001 1000 0000

释放写锁首先判断 stamp 的值有没有被修改过或者多次释放,之后通过 state = (stamp += WBIT) == 0L ? ORIGIN : stamp 来释放写锁,位操作表示如下: stamp += WBIT 0010 0000 0000 = 0001 1000 0000 + 0000 1000 0000 这一步操作是重点!!!写锁的释放并不是像 ReentrantReadWriteLock 一样 + 1 然后 - 1,而是通过再次加 0000 1000 0000 来使高位每次都产生变化,为什么要这样做?直接减掉 0000 1000 0000 不就可以了吗?这就是为了后面乐观锁做铺垫,让每次写锁都留下痕迹。

大家可以想象这样一个场景,字母 A 变化为 B 能看到变化,如果在一段时间内从 A 变到 B 然后又变到 A,在内存中自会显示 A,而不能记录变化的过程,这也就是 CAS 中的 ABA 问题。在 StampedLock 中就是通过每次对高位加 0000 1000 0000 来达到记录写锁操作的过程,可以通过下面的步骤理解: 第一次获取写锁: 0001 0000 0000 + 0000 1000 0000 = 0001 1000 0000 第一次释放写锁: 0001 1000 0000 + 0000 1000 0000 = 0010 0000 0000 第二次获取写锁: 0010 0000 0000 + 0000 1000 0000 = 0010 1000 0000 第二次释放写锁: 0010 1000 0000 + 0000 1000 0000 = 0011 0000 0000 第 n 次获取写锁: 1110 0000 0000 + 0000 1000 0000 = 1110 1000 0000 第 n 次释放写锁: 1110 1000 0000 + 0000 1000 0000 = 1111 0000 0000 可以看到第 8 位在获取和释放写锁时会产生变化,也就是说第 8 位是用来表示写锁状态的,前 7 位是用来表示读锁状态的,8 位之后是用来表示写锁的获取次数的。这样就有效的解决了 ABA 问题,留下了每次写锁的记录,也为后面乐观锁检查变化提供了基础。

关于 acquireWrite 方法这里不做具体分析,方法非常复杂,感兴趣的同学可以网上搜索相关资料。这里只对该方法做下简单总结,该方法分两步来进行线程排队,首先通过随机探测的方式多次自旋尝试获取锁,然后自旋一定次数失败后再初始化节点进行插入。

源码分析:悲观读锁的释放和获取

public long readLock() {
    long s = state, next;  
    return ((whead == wtail && (s & ABITS) < RFULL && //队列为空,无写锁,同时读锁未溢出,尝试获取读锁
                U.compareAndSwapLong(this, STATE, s, next = s + RUNIT)) ?   //cas尝试获取读锁+1
            next : acquireRead(false, 0L));     //获取读锁成功,返回s + RUNIT,失败进入后续处理,类似acquireWrite
}

public void unlockRead(long stamp) {
    long s, m; WNode h;
    for (;;) {
        if (((s = state) & SBITS) != (stamp & SBITS) ||
            (stamp & ABITS) == 0L || (m = s & ABITS) == 0L || m == WBIT)
            throw new IllegalMonitorStateException();
        if (m < RFULL) {    //小于最大记录值(最大记录值127超过后放在readerOverflow变量中)
            if (U.compareAndSwapLong(this, STATE, s, s - RUNIT)) {  //cas尝试释放读锁-1
                if (m == RUNIT && (h = whead) != null && h.status != 0)
                    release(h);
                break;
            }
        }
        else if (tryDecReaderOverflow(s) != 0L) //readerOverflow - 1
            break;
    }
}

悲观读锁的获取和 ReentrantReadWriteLock 类似,不同在于StampedLock 的读锁很容易溢出,最大只有 127,超过后通过一个额外的变量 readerOverflow 来存储,这是为了给写锁留下更大的空间,因为写锁是在不停增加的。悲观读锁获取分下面四种情况:

  • 没有读锁和写锁时,state 为 0001 0000 0000 // 小于 0000 0111 1110,可以尝试获取读锁 0001 0000 0000 & 0000 1111 1111 = 0000 0000 0000

  • 有一个读锁时,state 为 0001 0000 0001 // 小于 0000 0111 1110,可以尝试获取读锁 0001 0000 0001 & 0000 1111 1111 = 0000 0000 0001

  • 有一个写锁,state 为 0001 1000 0000 // 大于 0000 0111 1110,不可以获取读锁 0001 1000 0000 & 0000 1111 1111 = 0000 1000 0000

  • 读锁溢出,state 为 0001 0111 1110 // 等于 0000 0111 1110,不可以获取读锁 0001 0111 1110 & 0000 1111 1111 = 0000 0111 1110 读锁的释放过程在没有溢出的情况下是通过 s - RUNIT 操作也就是 - 1 来释放的,当溢出后则将 readerOverflow 变量 - 1。

乐观读锁的获取和验证

乐观读锁因为实际上没有获取过锁,所以也就没有释放锁的过程,只是在操作后通过验证检查和获取前的变化。源码如下:

//尝试获取乐观锁
public long tryOptimisticRead() {
    long s;
    return (((s = state) & WBIT) == 0L) ? (s & SBITS) : 0L;
}

//验证乐观锁获取之后是否有过写操作
public boolean validate(long stamp) {
    //该方法之前的所有load操作在内存屏障之前完成,对应的还有storeFence()及fullFence()
    U.loadFence();  
    return (stamp & SBITS) == (state & SBITS);  //比较是否有过写操作
}

乐观锁基本原理就时获取锁时记录 state 的写状态,然后在操作完成之后检查写状态是否有变化,因为写状态每次都会在高位留下记录,这样就避免了写锁获取又释放后得不到准确数据。获取写锁记录有三种情况:

  • 没有读锁和写锁时,state 为 0001 0000 0000 //((s = state) & WBIT) == 0L) true 0001 0000 0000 & 0000 1000 0000 = 0000 0000 0000 //(s & SBITS) 0001 0000 0000 & 1111 1000 0000 = 0001 0000 0000

  • 有一个读锁时,state 为 0001 0000 0001 //((s = state) & WBIT) == 0L) true 0001 0000 0001 & 0000 1000 0000 = 0000 0000 0000 //(s & SBITS) 0001 0000 0001 & 1111 1000 0000 = 0001 0000 0000

  • 有一个写锁,state 为 0001 1000 0000 //((s = state) & WBIT) == 0L) false 0001 1000 0000 & 0000 1000 0000 = 0000 1000 0000 //0L 0000 0000 0000

验证过程中是否有过写操作,分四种情况

  • 写过一次 0001 0000 0000 & 1111 1000 0000 = 0001 0000 0000 0010 0000 0000 & 1111 1000 0000 = 0010 0000 0000 //false

  • 未写过,但读过 0001 0000 0000 & 1111 1000 0000 = 0001 0000 0000 0001 0000 1111 & 1111 1000 0000 = 0001 0000 0000 //true

  • 正在写 0001 0000 0000 & 1111 1000 0000 = 0001 0000 0000 0001 1000 0000 & 1111 1000 0000 = 0001 1000 0000 //false

  • 之前正在写,无论是否写完都不会为 0L 0000 0000 0000 & 1111 1000 0000 = 0000 0000 0000 //false

性能测试

分析完了 StampedLock 的实现原理,这里对 StampedLock、ReentrantReadWriteLock 以及 Synchronized 分别在各种场景下进行性能测试,测试的基准代码采用 https://blog.takipi.com/java-8-stampedlocks-vs-readwritelocks-and-synchronized/ 文章中的代码,首先贴出上述博客中的测试结果,文章中的 OPTIMISTIC 模式由于采用了 “脏读” 模式,这里不采用 OPTIMISTIC 的测试结果,只比较 StampedLock、ReentrantReadWriteLock 以及 Synchronized。

5 个读线程和 5 个写线程场景:表现最好的是 StampedLock 的正常模式以及 ReentrantReadWriteLock。 10 个读线程和 10 个写线程场景:表现最好的是 StampedLock 的正常模式以及 Synchronized。 16 个读线程和 4 个写线程场景:表现最好的是 StampedLock 的正常模式以及 Synchronized。 19 个读线程和 1 个写线程场景:表现最好的是 Synchronized。 博客评论中还有一种测试场景 2000 读线程和 1 个写线程,测试结果如下: StampedLock ... 12814.2 ReentrantReadWriteLock ... 18882.8 Synchronized ... 22696.4 表现最好的是 StampedLock。

看完了上面的测试,前面 3 种场景表现最好的都为 StampedLock,但第 4 种情况下 StampedLock 表现很差,于是我自己对代码又进行了一遍测试,同时鉴于读写锁的大量应用在缓存场景下,读写差距极大,我增加了 100 个读和 1 个写的场景。

测试机器:MAC OS (10.12.6),CPU : 2.4 GHz Intel Core i5, 内存:8G 软件版本:JDK1.8 测试结果如下: 19 个读线程和 1 个写线程场景:表现最好的是 StampedLock 以及 Synchronized。 读线程: 19. 写线程: 1. 循环次数: 5. 计算总和: 1000000 100 个读线程和 1 个写线程场景:表现最好的是 StampedLock 以及 Synchronized。 读线程: 100. 写线程: 1. 循环次数: 5. 计算总和: 100000 通过上述测试,可以发现整体性能平均而言 StampedLock 和 Synchronized 相差不大,StampedLock 在读写差距加大时稍微有点优势。而 ReentrantReadWriteLock 性能之差有点出乎意料,基本可以达到抛弃使用的地步了,不知道大家对 ReentrantReadWriteLock 的使用场景有什么建议?

同时鉴于原生的 Synchronized 后期可优化空间比较大,而且在代码复杂性以及安全性上面都具有一定优势,因此在绝大多数场景可以使用 Synchronized 来进行同步,对性能有一定要求的在某些特定场景下可以使用 StampedLock。测试所用代码在我所引用的博客中都可以找到,大家可以自行尝试测试,如果对结果有什么疑问,欢迎在评论中提出。

参考资料: https://blog.takipi.com/java-8-stampedlocks-vs-readwritelocks-and-synchronized/

关于Java中的StampedLock是什么?java stampedlock的问题就给大家分享到这里,感谢你花时间阅读本站内容,更多关于brew install example -fs 中 -fs 用法是什么,bottle block 是什么?、Java 8新特性探究(十)StampedLock将是解决同步问题的新宠、Java 并发编程笔记之 StampedLock 锁源码探究、Java 并发(8)- 读写锁中的性能之王:StampedLock等相关知识的信息别忘了在本站进行查找喔。

本文标签: