Skip to the content.

ReentrantLock 为重入锁,继承自 Lock 接口,内部定义了公平同步器和非公平同步器的实现,两个同步器都继承自同步器抽象类,而同步器抽象类又继承自 AQS 抽象类。

ReentrantLock 内部的方法很少,主要的几个:lock()、 tryLock()、 unlock,这些方法的内部调用的也是同步器的 acquired() 和 release() 方法。

构造方法 如果是通过无参构造函数创建的 ReentrantLock 对象,内部的同步器实现为非公平同步器。如果构造对象的时候你指定了是否公平,而你又选择了公平,那么同步器的实现就为公平同步器。

    public ReentrantLock() {
        sync = new NonfairSync();
    }

    public ReentrantLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
    }

实例方法 类中的实例方法也是相当的简单,实现全丢在 Sync 中了。

注意一下下面这三个方法。lock() 和 unlock() 是跟着 Sync 的实现走的,而 tryLock 调用的这个方法则是定义在 Sync 抽象类中。

    public void lock() {
        sync.lock();
    }

    public boolean tryLock() {
        return sync.nonfairTryAcquire(1);
    }

    public void unlock() {
        sync.release(1);
    }

Sync

Sync 类继承自 AQS,它内部实现了获取临界区和释放锁的方法。

其实你可以把这个 nonfairTryAcquire 方法就看做是一个大的 CAS 判断,作用就是讲当前线程设置为头结点。

    abstract static class Sync extends AbstractQueuedSynchronizer {

        abstract void lock();

        final boolean nonfairTryAcquire(int acquires) {
            /* 省略方法实现*/
        }

        protected final boolean tryRelease(int releases) {
            /* 省略方法实现*/
        }
    }

try开头的方法表示尝试获取,它只竞争一次,竞争成功那我就获得锁,返回true;竞争失败,纳表示没有拿到锁,则返回 false。

那么没有使用 try 开头的方法他们的含义是怎么样的呢?那 ReentrantLock 中的 lock 方法来举例,你仔细看一下,这个方法的返回值为 void,你应该感到好奇,返回值应该为 true 或 false 吗?表示是获取到了锁,难道这个方法不关心结果?还是它的结果永远都只有一个?后一种猜测是对的,这个方法的结束只有一种情况,那就是获取到锁,第一次进入的时候它会试图进行一次 CAS判断,如果失败它会进入 AQS 的队列然后被挂起,当前一个等待节点释放了锁,这时候当前线程就会被唤醒,拿到锁进入临界区。

NonFairSync

非公平同步器,后进来的线程可能先获得锁。所以这个方法先进行了一次 CAS 判断,这儿体现了它的非公平。

    static final class NonfairSync extends Sync {
        private static final long serialVersionUID = 7316153563782823691L;

        final void lock() {
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1);
        }

        protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }
    }

FairSync

公平的同步器,后进来的线程一定更后地获取到锁。因此这个实现类的 lock 方法没有进行 CAS 的尝试。

看这个类的 tryAcquire() 它用的不是父类的 nonfairTryAcquire() 而是自己定义的。你拿这个方法和父类中定义的方法比对一下,再想一下这个类的 FairSync,看看能想通为什么吗?

hasQueuedPredecessors() :如果当前线程的前面还有线程在等待获取锁。

这个类实现两个方法全都体现了公平、排队思想。

    static final class FairSync extends Sync {
        private static final long serialVersionUID = -3000897897090466540L;

        final void lock() {
            acquire(1);
        }

        protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }
    }

acquire()

Sync 抽象类中定义了抽象方法 lock(),而 NonFairSync 和 FairSync 这两个抽象类中又实现了这个抽象方法,ReentranLock 的 lock 方法调用也是 Sync 实现类里实现的方法。 这两个实现类中的 lock 方法又有些许区别,NonFairSync 中首先进行了一次 CAS 的判断,CAS 失败再调用 acquire(),而 FairSync 则是直接调用 acquire() 。

acquire 的实现在 AQS,换言之这是 AQS 层面的获取锁。这个层面的逻辑是,首先调用 Sync 实现类的 tryAcquire 方法去直接获取锁,如果失败的话,那就创建一个 Waiter 对象加入等待队列。

既然 arg 总是 1,为什么还要这样一个参数,写死不好吗?如果说是为了扩展性,那么还有别的什么地方用了?

    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

tail 为 null 什么情况下会发生?初始的等待队列是空的,随着元素一个一个进入,队列的变化是怎样的?

既然已经到了 addWaiter 方法,那么这里的乐观尝试措施就不再是将自己 CAS 为头结点,而是将自己 CAS 为等待队列的最后一个节点。如果这个 CAS 失败的话,那就调用 enq,使用自旋的方式入队。这个方法里面也罢前后的这种联系建立起来了。

这个 enq 方法的自旋里面就两个操作,初始化 tail 变量;以 CAS 的方式入队。这个方法 else 区块的代码很有味道,如果你看不懂可以试着画图去理解,挺有意思的。

这个 addWaiter 方法无论如何都会将这个 Node 入队的

    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }
    
    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }
    

回看 acquireQueued() ,入参是一个 Node,这时候的 node 已经入队,arg 为 1。下面这段代码的第一个判断仍然是一个乐观的尝试措施,如果尝试失败那就调用接下来的方法,将线程挂起。

shouldParkAfterFailedAcquire() 根据前驱节点的状态来进行对应处理。如果前驱节点为 SINGAL(-1),表示当前节点需要阻塞;如果前驱节点的 waitStatus 大于 0,其实只能为 1 ,这表示前驱节点为取消状态,那么我们往前找一直找到一个小于等于 0 的,并联系起来;最后一种情况 waitStatus 为 0,这时候会将前驱节点的 waitStatus 置位 SINGAL,然后返回 true。下一次的外圈循环会判断,如果前驱节点为 SINGAL,那么返回 true,当前节点上的线程会被挂起。

处于等待链表中 Node 节点的 waitStatus 大多数情况下都是 0,一个 Node 被初始化 waitStatus 也默认为 0。

节点的 waitStatus 什么情况下会被设置为 0 ?

    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
    
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            return true;
        if (ws > 0) {
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }
    
    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }

release

relase() 方法的实现仅仅存在于 AQS 类,不过其中的 tryRelease() 方法的实现却在子类中,例如 Sync。

首先调子类的 tryRelease() 方法递减重入次数,只有当重入次数减为 0 时这个方法才会返回 true,其它情况这个方法只会返回 false。

当 tryRelease() 方法返回 true时,表示 AQS 的 state 方法被减到了 0,这时候就可以去唤醒等待队列中下一个可以被唤醒的线程了。

    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
    
    protected final boolean tryRelease(int releases) {
        int c = getState() - releases;
        if (Thread.currentThread() != getExclusiveOwnerThread())
            throw new IllegalMonitorStateException();
        boolean free = false;
        if (c == 0) {
            free = true;
            setExclusiveOwnerThread(null);
        }
        setState(c);
        return free;
    }
    

当锁被释放之后会调用 unparkSuccessor() 去唤醒后续的线程。

里面这个循环会从 tail 开始从后往前找,找到一个离 node 最近的 waitStatus 为 SINGAL 或 NORMAL 的 node,如果那个 node 不为 null,唤醒它。这是时候那个线程又会接着 acquireQueued() 去重新获取锁。

    private void unparkSuccessor(Node node) {

        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }