侧边栏壁纸
博主头像
soulballad博主等级

技术文章记录及总结

  • 累计撰写 169 篇文章
  • 累计创建 26 个标签
  • 累计收到 4 条评论

目 录CONTENT

文章目录

【源码分析-JDK】-3.ReadWriteLock读写锁

soulballad
2021-04-13 / 0 评论 / 0 点赞 / 76 阅读 / 4,382 字
温馨提示:
本文最后更新于 2022-04-19,若内容或图片失效,请留言反馈。部分素材来自网络,若不小心影响到您的利益,请联系我们删除。

ReentrantReadWriteLock

ReentrantReadWriteLock 类图

1559457988728

从类图上看 ReentrantReadWriteLock 实现了ReadWriteLock接口,这个接口提供了一个readLock 和一个 writeLock方法,分别返回一个 ReadLock 和 WriteLock 对象。

ReadLock 和 WriteLock都是对Lock的实现。

ReentrantReadWriteLock 中还有抽象静态内部类Sync以及它的实现FairSync 和NonFaiySync。

在调用构造函数的时候,会对这些对象进行初始化。

public ReentrantReadWriteLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
    readerLock = new ReadLock(this);
    writerLock = new WriteLock(this);
}

ReadLock

public void lock() {
    sync.acquireShared(1);
}
public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    }
protected final int tryAcquireShared(int unused) {
    Thread current = Thread.currentThread();
    //获取状态
    int c = getState();
     //如果存在写锁,且锁的持有者不是当前线程,直接返回-1
    if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current)
        return -1;
    //计算有多少线程获取了读锁,读锁是一种共享锁
    int r = sharedCount(c);
     //readerShouldBlock():读锁是否需要等待(公平锁原则)
     //r < MAX_COUNT:持有线程小于最大数(65535)
     // compareAndSetState(c, c + SHARED_UNIT):设置读取锁状态
    if(!readerShouldBlock() && r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)){
        if (r == 0) {
            // 如果没有线程获取到读锁,则当前线程为 firstReader
            // 重入次数为 1
            firstReader = current;
            firstReaderHoldCount = 1;
        } else if (firstReader == current) {
            // 如果当前线程已经获取了读锁,重入次数增加
            firstReaderHoldCount++;
        } else {
            //如果有线程获取锁,且不是当前线程
            //获取最后一个获取读锁的 HoldCounter
            HoldCounter rh = cachedHoldCounter;
            if (rh == null || rh.tid != getThreadId(current))
                // threadLocal中获取
                cachedHoldCounter = rh = readHolds.get();
            else if (rh.count == 0)
                // 更新threadLocal
                readHolds.set(rh);
            rh.count++;
        }
        return 1;
    }
    return fullTryAcquireShared(current);
}
final int fullTryAcquireShared(Thread current) {
    /*
     * This code is in part redundant with that in
     * tryAcquireShared but is simpler overall by not
     * complicating tryAcquireShared with interactions between
     * retries and lazily reading hold counts.
     */
    HoldCounter rh = null;
    for (;;) {
        int c = getState();
        // 已有线程获取写锁
        if (exclusiveCount(c) != 0) {
            //获取写锁的线程不是当前线程
            if (getExclusiveOwnerThread() != current)
                return -1;
        } else if (readerShouldBlock()) {
            // Make sure we're not acquiring read lock reentrantly
            if (firstReader == current) {
                // assert firstReaderHoldCount > 0;
            } else {
                if (rh == null) {
                    rh = cachedHoldCounter;
                    if (rh == null || rh.tid != getThreadId(current)) {
                        rh = readHolds.get();
                        if (rh.count == 0)
                            readHolds.remove();
                    }
                }
                if (rh.count == 0)
                    return -1;
            }
        }
        //超出最大范围
        if (sharedCount(c) == MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
        //CAS设置状态
        if (compareAndSetState(c, c + SHARED_UNIT)) {
            if (sharedCount(c) == 0) {
                firstReader = current;
                firstReaderHoldCount = 1;
            } else if (firstReader == current) {
                firstReaderHoldCount++;
            } else {
                if (rh == null)
                    rh = cachedHoldCounter;
                if (rh == null || rh.tid != getThreadId(current))
                    rh = readHolds.get();
                else if (rh.count == 0)
                    readHolds.set(rh);
                rh.count++;
                cachedHoldCounter = rh; // cache for release
            }
            return 1;
        }
    }
}
private void doAcquireShared(int arg) {
    // 添加一个共享类型的节点
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        boolean interrupted = false;
        //自旋
        for (;;) {
            //获取前一个节点
            final Node p = node.predecessor();
            if (p == head) {
                //获取锁
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    // 设置传播
                    setHeadAndPropagate(node, r);
                    // 移除节点
                    p.next = null; // help GC
                    if (interrupted)
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

WriteLock

public void lock() {
    sync.acquire(1);
}
public final void acquire(int arg) {
    if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}
protected final boolean tryAcquire(int acquires) {
    /*
     * Walkthrough:
     * 1. If read count nonzero or write count nonzero
     *    and owner is a different thread, fail.
     * 2. If count would saturate, fail. (This can only
     *    happen if count is already nonzero.)
     * 3. Otherwise, this thread is eligible for lock if
     *    it is either a reentrant acquire or
     *    queue policy allows it. If so, update state
     *    and set owner.
     */
    Thread current = Thread.currentThread();
    int c = getState();
    // 获取写锁的线程数量
    int w = exclusiveCount(c);
    if (c != 0) {
        // 没有线程获取或者不是当前线程获取
        if (w == 0 || current != getExclusiveOwnerThread())
            return false;
        // 超过最大值
        if (w + exclusiveCount(acquires) > MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
        //设置状态
        setState(c + acquires);
        return true;
    }
    if (writerShouldBlock() || !compareAndSetState(c, c + acquires))
        return false;
    //当前线程获取锁
    setExclusiveOwnerThread(current);
    return true;
}
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        //自旋
        for (;;) {
            final Node p = node.predecessor();
            // 如果p是头结点并且获取了锁
            if (p == head && tryAcquire(arg)) {
                // 把node作为头结点
                setHead(node);
                // 移除p
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
0

评论区