现实中有这样一种场景:对共享资源有读和写的操作,且写操作没有读操作那么频繁。在没有写操作的时候,多个线程同时读一个资源没有任何问题,所以应该允许多个线程同时读取共享资源;但是如果一个线程想去写这些共享资源,就不应该允许其他线程对该资源进行读和写的操作了。
针对这种场景,JAVA的并发包提供了读写锁ReentrantReadWriteLock,它表示两个锁,一个是读操作相关的锁,称为共享锁;一个是写相关的锁,称为排他锁
类图如下:
说明:如上图所示Sync为ReentrantReadWriteLock内部类,Sync继承自AQS、NonfairSync继承自Sync类、FairSync继承自Sync类(通过构造函数传入的布尔值决定要构造哪一种Sync实例);ReadLock实现了Lock接口、WriteLock也实现了Lock接口;
AQS定义了独占模式的acquire()和release()方法,共享模式的acquireShared()和releaseShared()方法.还定义了抽象方法tryAcquire()、tryAcquiredShared()、tryRelease()和tryReleaseShared()由子类实现,tryAcquire()和tryAcquiredShared()分别对应独占模式和共享模式下的锁的尝试获取,就是通过这两个方法来实现公平性和非公平性,在尝试获取中,如果新来的线程必须先入队才能获取锁就是公平的,否则就是非公平的。这里可以看出AQS定义整体的同步器框架,具体实现放手交由子类实现。
通过类图我们知道一些核心操作由Sync类实现
Sync类内部存在两个内部类,分别为HoldCounter和ThreadLocalHoldCounter,其中HoldCounter主要与读锁配套使用;
abstract static class Sync extends AbstractQueuedSynchronizer { static final int SHARED_SHIFT = 16; // 读锁单位 static final int SHARED_UNIT = (1 << SHARED_SHIFT); //锁持有的最大数量 static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1; //排它锁持有的最大数量 static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1; /** 返回count中表示的共享持有的数量 */ static int sharedCount(int c) { return c >>> SHARED_SHIFT; } /** 返回count中表示的独占持有的数量 */ static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; } // 计数器 static final class HoldCounter { // 计数 int count = 0; // Use id, not reference, to avoid garbage retention // 获取当前线程的TID属性的值 final long tid = getThreadId(Thread.currentThread()); } // 本地线程计数器 static final class ThreadLocalHoldCounter extends ThreadLocal<HoldCounter> { // 重写初始化方法,在没有进行set的情况下,获取的都是该HoldCounter值 public HoldCounter initialValue() { return new HoldCounter(); } } // 本地线程计数器 private transient ThreadLocalHoldCounter readHolds; // 缓存的计数器 private transient HoldCounter cachedHoldCounter; /记录第一个持有共享锁线程的持有共享锁的数量,作者认为大多数情况下不会有并发,更多的是线程交替持有锁 private transient Thread firstReader = null; // 第一个读线程的计数 private transient int firstReaderHoldCount; //构造器 Sync() { // 本地线程计数器 readHolds = new ThreadLocalHoldCounter(); // 设置AQS的状态 setState(getState()); // ensures visibility of readHolds } }
一.写锁过程
获取写锁:
ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock(); ReentrantReadWriteLock.WriteLock writeLock = readWriteLock.writeLock(); writeLock.lock(); //ReentrantReadWriteLock 內部类Sync 继承自AQS,这里是调用aqs中的acquire方法 public void lock() { sync.acquire(1); } //AQS中定义了tryAcquire抽象方法,具体的实现由子类去实现 //这里除tryAcquire方法和Reentrantlock 略有不同,后续操作一样一样的, public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
说明:除了aqs中的tryAcquire由具体的实现类来实现,其他部分和ReetrantLock获取锁的过程一样的,这里就不絮叨了,下边主要看下tryAcquire方法的具体实现。(可以参考我写的这篇ReentrantLock详解,或者不清楚的直接留言我)
protected final boolean tryAcquire(int acquires) { Thread current = Thread.currentThread(); //获取当前锁对象状态 int c = getState(); // 返回count中表示排它锁的数量 int w = exclusiveCount(c); //说明锁被占有(共享锁或者排它锁) if (c != 0) { // (Note: if c != 0 and w == 0 then shared count != 0) //说明现在有共享锁被别的线程占有,尝试获取锁失败 if (w == 0 || current != getExclusiveOwnerThread()) return false; //说明当前线程持有排它锁或者共享锁,这里是判断有没有超出重入次数 if (w + exclusiveCount(acquires) > MAX_COUNT) throw new Error("Maximum lock count exceeded"); // 排它锁重入,直接获取锁 setState(c + acquires); return true; } //如果当前为非公平锁: writerShouldBlock 方法直接返回 false,然后去争抢锁 /**如果当前为公平的写锁 writerShouldBlock 该方法调动 AQS的hasQueuedPredecessors 方法, 判断当前同步队列有没有等待的线程,如果有返回true,没有等待的线程在返回false 然后去争抢锁**/ if (writerShouldBlock() || !compareAndSetState(c, c + acquires)) return false; //成功获取锁,把当钱锁设置为当前线程占有 setExclusiveOwnerThread(current); return true; }
这里获取锁失败的情况主要有
失败后的具体操作见ReentrantLock详解
写锁的释放
ReentrantReadWriteLock.WriteLock writeLock = readWriteLock.writeLock(); writeLock.unlock(); public void unlock() { sync.release(1); } //AQS中定义了tryAcquire抽象方法,具体的实现由子类去实现 public final boolean release(int arg) { //tryRelease尝试释放锁(锁status-arg),如果当前线程没有占有的锁(锁status=0) 返回true if (tryRelease(arg)) { //当前线程释放掉了所有锁 Node h = head; //如果等待队列第一个结点有挂起的线程,将它唤醒去争抢 if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; protected final boolean tryRelease(int releases) { //判断当前线程是否为持有锁的线程 if (!isHeldExclusively()) throw new IllegalMonitorStateException(); int nextc = getState() - releases; //判断是否已经全部释放写锁 boolean free = exclusiveCount(nextc) == 0; if (free) setExclusiveOwnerThread(null); setState(nextc); return free; }
读锁获取过程:
ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock(); ReentrantReadWriteLock.ReadLock readLock = readWriteLock.readLock(); readLock.lock(); public void lock() { sync.acquireShared(1); } public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0) doAcquireShared(arg); }
主要来看下aqs定义的抽象方法tryAcquireShared (sync具体实现的)
获取读锁失败的情况有 :
(1)有其他线程持有排它锁,获取锁失败。
(2)公平锁:同步队列有等待节点;非公平锁:同步队列头节点为排它锁同步队列(防止写锁饥饿)
(3)读锁数量达到最多,抛出异常。
除了以上三种情况,该线程会循环尝试获取读锁直到成功。
protected final int tryAcquireShared(int unused) { Thread current = Thread.currentThread(); int c = getState(); //如果当前排他锁被占有,判断是不是当前线程占有的 if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) return -1; //共享锁占有的数量 int r = sharedCount(c); //readerShouldBlock 方法 判断同步队列中第一个节点是 什么状态 //如果是公平锁:同步队列有节点就返回true,有可能是共享锁也有可能是排它锁的节点, //如果是非公平锁:同步队列第一个节点是等待排它锁 就返回true,防止排它锁出现饥饿状态 //readerShouldBlock 为false就直接获取锁 if (!readerShouldBlock() && //c +的是 1<<16,读锁为高16位表示 r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)) { //r == 0,表示第一个读锁线程,第一个读锁firstRead是不会加入到readHolds中 if (r == 0) { // 设置第一个读线程 firstReader = current; // 读线程占用的资源数为1 firstReaderHoldCount = 1; } else if (firstReader == current) {// 当前线程为第一个读线程,表示第一个读锁线程重入 // 占用资源数加1 firstReaderHoldCount++; } else { // 获取计数器 //如果共享锁是被第2+n个线程占有,则使用threadlocal 记录每个线程持有的线程数量 HoldCounter rh = cachedHoldCounter; // 计数器为空或者计数器的tid不为当前正在运行的线程的tid if (rh == null || rh.tid != getThreadId(current)) // 获取当前线程对应的计数器 cachedHoldCounter = rh = readHolds.get(); else if (rh.count == 0) // 计数为0 //加入到readHolds中 readHolds.set(rh); //计数+1 rh.count++; } return 1; } //获取锁失败,放到循环里重试 return fullTryAcquireShared(current); } final int fullTryAcquireShared(Thread current) { HoldCounter rh = null; for (;;) { int c = getState(); //有线程持有写锁,且该线程不是当前线程,获取锁失败 if (exclusiveCount(c) != 0) { if (getExclusiveOwnerThread() != current) return -1; else{ //有线程持有写锁,且该线程是当前线程,则应该放行让其重入获取锁,否则会造成死锁 } //没有线程持有排它锁,判断获取共享锁是否应该被阻塞 //readerShouldBlock 方法 判断同步队列中第一个节点是 什么状态 //如果是公平锁:同步队列有节点就返回true,有可能是共享锁也有可能是排它锁的节点, //如果是非公平锁:同步队列第一个节点是等待排它锁 就返回true,防止排它锁出现饥饿状态 //readerShouldBlock 为false就直接获取锁 //注:如果为读锁重入的话是允许获取读锁的,该情况会引起写锁饥饿 } else if (readerShouldBlock()) { // 确保获取的不是 读重入锁 if (firstReader == current) { // assert firstReaderHoldCount > 0; } else { if (rh == null) { rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) { rh = readHolds.get(); if (rh.count == 0) readHolds.remove(); } } //如果当前锁不是 读读重入,且应该阻塞,那么获取锁失败 if (rh.count == 0) return -1; } } //判断当前线程有没有超过最大数量限制 if (sharedCount(c) == MAX_COUNT) throw new Error("Maximum lock count exceeded"); //再次尝试获取锁~~(可能为写读重入) if (compareAndSetState(c, c + SHARED_UNIT)) { if (sharedCount(c) == 0) { firstReader = current; firstReaderHoldCount = 1; } else if (firstReader == current) { firstReaderHoldCount++; } else { if (rh == null) rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); rh.count++; cachedHoldCounter = rh; // cache for release } return 1; } } }
读锁获取失败,调用aqs的doAcquireShared方法尝试将当前线程任务节点加入到同步队列中(加入同步队列的具体细节见ReentrantLock详解)
private void doAcquireShared(int arg) { // 将当前线程任务添加到同步队列中 final Node node = addWaiter(Node.SHARED); boolean failed = true; try { boolean interrupted = false; for (;;) { // 获取当前节点的前继节点 final Node p = node.predecessor(); // 判断前继节点是否是head节点 if (p == head) { //如果前置节点为head说明,他当前线程是等待队列中的第一个,那么就尝试获取锁(这里可能是避免线程的上下文切换) int r = tryAcquireShared(arg); if (r >= 0) { // 获取 lock 成功, 设置新的 head, 并唤醒后继获取 readLock 的节点 setHeadAndPropagate(node, r); p.next = null; // help GC // 该线程有可能是被中断唤醒,也有可能是被其他线程唤醒,这里设置下中断状态 if (interrupted) selfInterrupt(); failed = false; return; } } //普通锁的情况下:,然后返回false继续自旋 尝试获取锁 //shouldParkAfterFailedAcquire 只有发现当前节点不是首节点才会返回true ,然后挂起当前线程, if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) //如果该线程是被中断唤醒的,用于辅助后续操作判断当前线程是被中断唤醒的 interrupted = true; } } finally { //如果该方法因为某些特殊情况意外的退出(没有获取锁就退出了),那么就取消尝试获取锁 if (failed) cancelAcquire(node); } }
如果读锁获取失败后,尝试将当前线程节点加入到同步队列中。
如果该节点为头节点,那么就自旋争抢锁(避免上下文切换),获取锁成功的话,调用setHeadAndPropagate方法继续唤醒后续节点(如果后续节点为读锁等待节点的话);
如果该节点不为头节点,将当前线程挂起;
我们来看下setHeadAndPropagate方法
// 如果读锁(共享锁)获取成功,或头部节点为空,或头节点取消,或刚获取读锁的线程的下一个节点为空,或在节点的下个节点也在申请读锁, //则在CLH队列中传播下去唤醒线程 private void setHeadAndPropagate(Node node, int propagate) { Node h = head; // Record old head for check below setHead(node); if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { Node s = node.next; if (s == null || s.isShared()) //下面具体分析 doReleaseShared(); } }
注:后续读锁的释放操作也调用的这个doReleaseShared 方法
private void doReleaseShared() { for (;;) { //唤醒操作由头结点开始,注意这里的头节点已经是上面新设置的头结点了 //其实就是唤醒上面新获取到共享锁的节点的后继节点 Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; //表示后继节点需要被唤醒 if (ws == Node.SIGNAL) { //这里需要控制并发,因为入口有setHeadAndPropagate跟releaseShared两个,避免两次unpark if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; //执行唤醒操作 unparkSuccessor(h); } //如果后继节点暂时不需要唤醒,则把当前节点状态设置为PROPAGATE (这里不是很明白,为什么不需要唤醒的节点要设置这个状态,哪个老铁知道为什么的话指点下) else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; } //如果头结点没有发生变化,表示设置完成,退出循环 //如果头结点发生变化,比如说其他线程获取到了锁,为了使自己的唤醒动作可以传递,必须进行重试 if (h == head) break; }
怎么理解这个传播呢:
就是只要获取成功到读锁,那就要传播到下一个节点(如果一下个节点继续是读锁的申请,只要成功获取,就再下一个节点,直到队列尾部或为写锁的申请,停止传播)。
ReentrantReadWriteLock.ReadLock readLock = readWriteLock.readLock(); readLock.unlock(); public void unlock() { sync.releaseShared(1); } public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { //上文有讲 doReleaseShared(); return true; } return false; } //该方法的主要作用就是用来维护下当前线程读锁的重入数量; //如果没有线程占有读锁,就返回true 唤醒后续节点 protected final boolean tryReleaseShared(int unused) { Thread current = Thread.currentThread(); if (firstReader == current) { // assert firstReaderHoldCount > 0; if (firstReaderHoldCount == 1) firstReader = null; else firstReaderHoldCount--; } else { HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) rh = readHolds.get(); int count = rh.count; if (count <= 1) { readHolds.remove(); if (count <= 0) throw unmatchedUnlockException(); } --rh.count; } //保证一定能释放掉读锁的占有 for (;;) { int c = getState(); int nextc = c - SHARED_UNIT; if (compareAndSetState(c, nextc)) // Releasing the read lock has no effect on readers, // but it may allow waiting writers to proceed if // both read and write locks are now free. return nextc == 0; } }
读锁的释放过程比较简单,这里就不做过多的解释了
这里思考一个问题,获取读锁的时候我们讲到读锁传播的概念,为什么在读锁释放的时候,如果还有别的线程占有读锁就不用传播了呢?
因为在现在获取读锁的时候 已经完成读线程唤醒的传播了~~
获取写锁:获取写锁的过程总体和ReentrantLock详解流程一样;
公平锁:先判断同步队列中是否有等待节点在等待获取锁
非公平锁:上来就直接争抢锁
写锁的释放:
注:写锁的释放并没有进行读锁的释放传播,读锁的传播是有读锁成功获取读锁以后进行的
读锁的获取:
读锁的释放:
本网页所有视频内容由 imoviebox边看边下-网页视频下载, iurlBox网页地址收藏管理器 下载并得到。
ImovieBox网页视频下载器 下载地址: ImovieBox网页视频下载器-最新版本下载
本文章由: imapbox邮箱云存储,邮箱网盘,ImageBox 图片批量下载器,网页图片批量下载专家,网页图片批量下载器,获取到文章图片,imoviebox网页视频批量下载器,下载视频内容,为您提供.
阅读和此文章类似的: 全球云计算