JDK8 JUC阅读(3)

Posted by Jqy on January 10, 2020

LockSupport

工具类,主要作用是挂起,唤醒线程,为创建锁和其他同步类的基础。基于Unsafe实现

抽象同步队列AQS

实现同步器的基本组件,锁的底层使用AQS实现。 AQS本质是一个FIFO双端队列,队列类型为内部类Node

AQS维持了一个单一的状态信息state,对于ReentarantLock,state表示当前线程获取锁的可重入次数;对于读写锁,高16位表示读状态(获取锁的次数),低16位表示获取到写锁的线程的可重入次数;对于semaphore,state表示可用信号的个数;对于CountDownLatch,state表示计数器当前的值。 内部类ConditionObject,用来结合锁实现线程同步。 线程同步的关键是对state的操作,操作state分为独占和共享两种方式。 独占方式下,获取释放资源过程如下

public final void acquire(int arg) {
        if (!tryAcquire(arg) &&     //尝试获取资源,成功则返回
        //失败将当前线程封装为Node.EXCLUSIVE类型后插入到AQS尾部,
        //并通过LockSupport.pack(this)挂起自己
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

public final boolean release(int arg) {
        if (tryRelease(arg)) {  //尝试释放资源
            Node h = head;
            if (h != null && h.waitStatus != 0)
            //通过LockSupport.unpack(thread)方式激活AQS队列队首线程
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

其中tryAcquire,tryRelease0只是提供了一个基础框架,需要结合情景进行定制

共享方式下,获取释放资源过程如下:

public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)  //尝试获取资源,成功则直接返回
        //失败将当前线程封装为Node.SHARED类型后插入到AQS尾部,
        //并通过LockSupport.pack(this)挂起自己
            doAcquireShared(arg);
    }

public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }

AQS内部的队列

以入队操作为例,当一个线程获取锁失败后,线程会转化为Node节点,入队至AQS阻塞队列

private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

ConditionObject

AQS内部类,可以访问AQS内部的变量,每个条件变量内部维护了一个条件队列,存放调用条件变量await方法时被阻塞的线程(该队列不是AQS队列)

public final void await() throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            //创建新Node节点并入队列尾部
            Node node = addConditionWaiter();
            //释放当前线程取得的锁
            int savedState = fullyRelease(node);
            int interruptMode = 0;
            while (!isOnSyncQueue(node)) {
                //通过park挂起线程
                LockSupport.park(this);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null) // clean up if cancelled
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
        }

public final void signal() {
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            Node first = firstWaiter;
            if (first != null)
            //将队头元素移动至AQS
                doSignal(first);
        }

一个锁对应一个AQS阻塞队列,对应多个条件变量,每个条件变量有自己的一个条件队列。

独占锁ReentrantLock

可重入独占锁,同时只有一个线程可以拥有该锁,其他尝试获取的线程会阻塞在AQS队列内。默认为非公平锁

public ReentrantLock() {
        sync = new NonfairSync();
    }

public ReentrantLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
    }

Sync类直接继承与AQS,子类NonFairSync和FairSync分别实现获取锁的公平和非公平策略。 AQS的state代表了线程获取该锁的可重入次数,为0后释放。

锁的获取

通过lock方法

public void lock() {
        sync.lock();
    }

//非公平策略
final void lock() {
            //CAS设置状态值
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            //通过AQS的acquire
            else
                acquire(1);
        }

//非公平提供的tryAcquire()
protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }

final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }//当前线程是该锁的持有者
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            //否则放入AQS阻塞队列
            return false;
        }

非公平策略:先尝试获取锁的线程不一定比后尝试获取锁的线程优先获取锁。 公平策略的实现如下:

protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                //公平性策略
                if (!hasQueuedPredecessors() &&     //检查当前线程节点有前驱则返回
                    compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }

lockInterruptibly方法和lock类似,但它可以对中断进行相应,即当前线程调用该方法时,其他线程调用了当前线程的interrupt,则当前线程会抛出InterruptedException异常后返回。

public void lockInterruptibly() throws InterruptedException {
        sync.acquireInterruptibly(1);
    }

public final void acquireInterruptibly(int arg)
            throws InterruptedException {
        //检测到当前线程被中断,直接抛出异常
        if (Thread.interrupted())
            throw new InterruptedException();
        if (!tryAcquire(arg))
            doAcquireInterruptibly(arg);
    }

tryLock方法则是先尝试获取锁,如果失败,则返回false,不会引起当前线程的阻塞。

public boolean tryLock(long timeout, TimeUnit unit)
            throws InterruptedException {
        return sync.tryAcquireNanos(1, unit.toNanos(timeout));
    }

释放锁

若当前线程持有锁,调用该方法使得state值-1,变为0后释放该锁。无持有锁调用该方法会引发IllegalMonitorStateException

public void unlock() {
        sync.release(1);
    }

protected final boolean tryRelease(int releases) {
            int c = getState() - releases;
            //未持有锁,抛出异常
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            //可重入次数为0,清空持有线程
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            //设置可重入次数为原始值-1
            setState(c);
            return free;
        }

读写锁ReentrantReadWriteLock

采用读写分离策略,允许多个线程同时获得读锁。 内部维护了一个ReadLock和一个WriteLock,依赖于Sync(继承自AQS)实现具体的功能,为了用一个state表示读写两种状态,读写锁用state高16位表示读状态,即获取到读锁的次数;低16位表示获取到写锁的次数的线程的可重入次数。

static final int SHARED_SHIFT   = 16;
static final int SHARED_UNIT    = (1 << SHARED_SHIFT);
//共享锁线程最大个数65535
static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;

/** Returns the number of shared holds represented in count  */
static int sharedCount(int c)    { return c >>> SHARED_SHIFT; }
/** Returns the number of exclusive holds represented in count  */
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }

写锁(WriteLock)的释放和获取

public void lock() {
            sync.acquire(1);
        }

protected final boolean tryAcquire(int acquires) {
            /*
             * Walkthrough:
             * 1. If read count nonzero or write count nonzero
             *    and owner is a different thread, fail.
             * 2. If count would saturate, fail. (This can only
             *    happen if count is already nonzero.)
             * 3. Otherwise, this thread is eligible for lock if
             *    it is either a reentrant acquire or
             *    queue policy allows it. If so, update state
             *    and set owner.
             */
            Thread current = Thread.currentThread();
            int c = getState();
            int w = exclusiveCount(c);
            if (c != 0) {
                // (Note: if c != 0 and w == 0 then shared count != 0)
                if (w == 0 || current != getExclusiveOwnerThread())
                    return false;
                if (w + exclusiveCount(acquires) > MAX_COUNT)
                    throw new Error("Maximum lock count exceeded");
                // Reentrant acquire
                setState(c + acquires);
                return true;
            }
            if (writerShouldBlock() ||
                !compareAndSetState(c, c + acquires))
                return false;
            setExclusiveOwnerThread(current);
            return true;
        }