一、锁框架结构

Lock接口:Lock接口支持那些语义不同(重入、公平等)的锁规则。

ReadWriteLock接口:ReadWriteLock接口定义了一些读取者可以共享而写入者独占的锁。

AbstractOwnableSynchronizer/AbstractQueuedSynchronizer/AbstractQueuedLongSynchronizer:AbstractQueuedSynchronizer就是被称之为AQS的类,它是一个非常有用的超类,可用来定义锁以及依赖于排队阻塞线程的其他同步器;ReentrantLock,ReentrantReadWriteLock,CountDownLatch,CyclicBarrier和Semaphore等这些类都是基于AQS类实现的。AbstractQueuedLongSynchronizer 类提供相同的功能但扩展了对同步状态的 64 位的支持。两者都扩展了类 AbstractOwnableSynchronizer(一个帮助记录当前保持独占同步的线程的简单类)。

LockSupport:LockSupport提供“创建锁”和“其他同步类的基本线程阻塞原语”。

Condition:Condition需要和Lock联合使用,它的作用是代替Object监视器方法,可以通过await(),signal()来休眠/唤醒线程。

ReentrantLock:ReentrantLock是独占锁。

ReentrantReadWriteLock:读写锁接口ReadWriteLock的实现类,它包括子类ReadLock和WriteLock。ReentrantLock是共享锁,而WriteLock是独占锁。

CountDownLatch是一个同步辅助类,在完成一组正在其他线程中执行的操作之前,它允许一个或多个线程一直等待。

CyclicBarrier是一个同步辅助类,允许一组线程互相等待,直到到达某个公共屏障点 (common barrier point)。

Semaphore是一个计数信号量,它的本质是一个”共享锁”。

二、ReentrantLock(reentrant:可重入的)

ReentrantLock是一个可重入的互斥锁,又被称为“独占锁”。

顾名思义,ReentrantLock锁在同一个时间点只能被一个线程锁持有;而可重入的意思是,ReentrantLock锁,可以被单个线程多次获取。
ReentrantLock分为“公平锁”和“非公平锁”。它们的区别体现在获取锁的机制上是否公平。“锁”是为了保护竞争资源,防止多个线程同时操作线程而出错,ReentrantLock在同一个时间点只能被一个线程获取(当某线程获取到“锁”时,其它线程就必须等待);ReentraantLock是通过一个FIFO的等待队列来管理获取该锁所有线程的。在“公平锁”的机制下,线程依次排队获取锁;而“非公平锁”在锁是可获取状态时,不管自己是不是在队列的开头都会获取锁。

源码

构造器源码如下,可以看到,ReentrantLock默认实现的是非公平锁。

    public ReentrantLock() {
        sync = new NonfairSync();
    }
    public ReentrantLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
    }

下面是获取锁和释放锁的方法源码,其内部实现都依赖于sync这个内部对象。

    public void lock() {
        sync.lock();
    }
    public void unlock() {
        sync.release(1);
    }

下面来看一下sync这个对象,可以发现其继承了AbstractQueuedSynchronizer类(AQS)

    /** Synchronizer providing all implementation mechanics */
    private final Sync sync;

    /**
     * Base of synchronization control for this lock. Subclassed
     * into fair and nonfair versions below. Uses AQS state to
     * represent the number of holds on the lock.
     */
    abstract static class Sync extends AbstractQueuedSynchronizer {
        // 省略
    }
    /**
     * Sync object for non-fair locks
     */
    static final class NonfairSync extends Sync {
        // 省略
    }

    /**
     * Sync object for fair locks
     */
    static final class FairSync extends Sync {
        // 省略 
    }

下面以公平锁为例简单看一下获取锁以及释放锁的过程。

    final void lock() {
        acquire(1);
    }
    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

获取锁的实现主要包括四个过程

  1. “当前线程”首先通过tryAcquire()尝试获取锁。获取成功的话,直接返回;尝试失败的话,进入到等待队列排序等待(前面还有可能有需要线程在等待该锁)。
  2. “当前线程”尝试失败的情况下,先通过addWaiter(Node.EXCLUSIVE)来将“当前线程”加入到”CLH队列(非阻塞的FIFO队列)”末尾。CLH队列就是线程等待队列。
  3. 再执行完addWaiter(Node.EXCLUSIVE)之后,会调用acquireQueued()来获取锁。由于此时ReentrantLock是公平锁,它会根据公平性原则来获取锁。
  4. “当前线程”在执行acquireQueued()时,会进入到CLH队列中休眠等待,直到获取锁了才返回!如果“当前线程”在休眠等待过程中被中断过,acquireQueued会返回true,此时”当前线程”会调用selfInterrupt()来自己给自己产生一个中断。
    protected final boolean tryAcquire(int acquires) {
        // 获取当前线程
        final Thread current = Thread.currentThread();
        // 获取状态
        int c = getState();
        if (c == 0) {
            // 若锁没有被任何线程锁拥有,
            // 则判断“当前线程”是不是CLH队列中的第一个线程线程,
            // 若是的话,则获取该锁,设置锁的状态,并切设置锁的拥有者为“当前线程”。
            if (!hasQueuedPredecessors() &&
                compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        else if (current == getExclusiveOwnerThread()) {
            // 如果“独占锁”的拥有者已经为“当前线程”,
            // 则将更新锁的状态。
            int nextc = c + acquires;
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {// 自旋
                // node是“当前线程”对应的节点,这里就意味着“获取上一个等待锁的线程”。
                final Node p = node.predecessor();
                // 再次尝试获取锁
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                // 当前线程是否需要阻塞
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt()) // 阻塞当前线程
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
    private Node addWaiter(Node mode) {
        // 新建一个Node节点,节点对应的线程是“当前线程”,“当前线程”的锁的模型是mode。
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        // 若CLH队列不为空,则将“当前线程”添加到CLH队列末尾
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        // 若CLH队列为空,则调用enq()新建CLH队列,然后再将“当前线程”添加到CLH队列中。
        enq(node);
        return node;
    }
    private static void selfInterrupt() {
        // 产生一个中断
        Thread.currentThread().interrupt();
    }

再来总结下它的流程吧:

  1. 调用自定义同步器的tryAcquire()尝试直接去获取资源,如果成功则直接返回;
  2. 没成功,则addWaiter()将该线程加入等待队列的尾部,并标记为独占模式;
  3. acquireQueued()使线程在等待队列中休息,有机会时(轮到自己,会被unpark())会去尝试获取资源。获取到资源后才返回。如果在整个等待过程中被中断过,则返回true,否则返回false。
  4. 如果线程在等待过程中被中断过,它是不响应的。只是获取资源后才再进行自我中断selfInterrupt(),将中断补上。

接下来再看一下释放锁的过程

    public void unlock() {
        sync.release(1);
    }
    public final boolean release(int arg) {
        if (tryRelease(arg)) {//尝试释放锁
            Node h = head;
            if (h != null && h.waitStatus != 0)
                // 唤醒当前线程的后继线程
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

release()会先调用tryRelease()来尝试释放当前线程锁持有的锁。成功的话,则唤醒后继等待线程,并返回true。否则,直接返回false。

    protected final boolean tryRelease(int releases) {
        // c是本次释放锁之后的状态
        int c = getState() - releases;
        // 如果“当前线程”不是“锁的持有者”,则抛出异常!
        if (Thread.currentThread() != getExclusiveOwnerThread())
            throw new IllegalMonitorStateException();
        boolean free = false;
        // 如果“锁”已经被当前线程彻底释放,则设置“锁”的持有者为null,即锁是可获取状态。
        if (c == 0) {
            free = true;
            setExclusiveOwnerThread(null);
        }
        // 设置当前线程的锁的状态
        setState(c);
        return free;
    }
    private void unparkSuccessor(Node node) {
        /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
        // 获取状态
        int ws = node.waitStatus;
        // 如果状态<0,则设置状态=0
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
        // 获取后继的有效线程(t.waitStatus <= 0)
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        // 唤醒“后继节点对应的线程”
        if (s != null)
            LockSupport.unpark(s.thread);
    }

总结

  1. 使用内部的类sync(实现AQS接口)来对外服务。
  2. 在AQS类中,通过对状态的判断来进行锁的获取以及释放。
  3. 对状态的设置使用了CAS算法。
  4. 在对线程进行阻塞时使用了LockSupport类。
  5. 公平锁在尝试获取锁时,即使“锁”没有被任何线程锁持有,它也会判断自己是不是CLH等待队列的表头;是的话,才获取锁。而非公平锁在尝试获取锁时,如果“锁”没有被任何线程持有,则不管它在CLH队列的何处,它都直接获取锁。

三、Condition

Condition的作用是对锁进行更精确的控制。

Condition中的await()方法相当于Object的wait()方法,Condition中的signal()方法相当于Object的notify()方法,Condition中的signalAll()相当于Object的notifyAll()方法。

不同的是,Object中的wait(),notify(),notifyAll()方法是和”同步锁”(synchronized关键字)捆绑使用的;而Condition是需要与”互斥锁”/“共享锁”捆绑使用的。

四、LockSupport

LockSupport是用来创建锁和其他同步类的基本线程阻塞原语。

LockSupport中的park() 和 unpark() 的作用分别是阻塞线程和解除阻塞线程,而且park()和unpark()不会遇到“Thread.suspend 和 Thread.resume所可能引发的死锁”问题。

因为park() 和 unpark()有许可的存在;调用 park() 的线程和另一个试图将其 unpark() 的线程之间的竞争将保持活性。

五、共享锁ReentrantReadWriteLock

ReadWriteLock,顾名思义,是读写锁。它维护了一对相关的锁 — — “读取锁”和“写入锁”,一个用于读取操作,另一个用于写入操作。
“读取锁”用于只读操作,它是“共享锁”,能同时被多个线程获取。
“写入锁”用于写入操作,它是“独占锁”,写入锁只能被一个线程锁获取。
注意:不能同时存在读取锁和写入锁!

六、CountDownLatch

CountDownLatch是一个同步辅助类,在完成一组正在其他线程中执行的操作之前,它允许一个或多个线程一直等待。

七、CyclicBarrier

CyclicBarrier是一个同步辅助类,允许一组线程互相等待,直到到达某个公共屏障点 (common barrier point)。因为该 barrier 在释放等待线程后可以重用,所以称它为循环 的 barrier。

CountDownLatch和CyclicBarrier的不同:

  1. CountDownLatch的作用是允许1或N个线程等待其他线程完成执行;而CyclicBarrier则是允许N个线程相互等待。
  2. CountDownLatch的计数器无法被重置;CyclicBarrier的计数器可以被重置后使用,因此它被称为是循环的barrier。

八、Semaphore

Semaphore是一个计数信号量,它的本质是一个”共享锁”。

信号量维护了一个信号量许可集。线程可以通过调用acquire()来获取信号量的许可;当信号量中有可用的许可时,线程能获取该许可;否则线程必须等待,直到有可用的许可为止。 线程可以通过release()来释放它所持有的信号量许可。

参考

http://www.cnblogs.com/skywang12345/p/java_threads_category.html