同步器AbstractQueuedSynchronizer实现原理及应用

概述

AbstractQueuedSynchronizer提供了一个基于FIFO队列,可以用于构建锁或者其他相关同步装置的基础框架。

该同步器(以下简称同步器)利用了一个int(volatile变量)来表示状态,期望它能够成为实现大部分同步需求的基础。

同步器的主要使用方式是继承,子类通过继承同步器并实现它的抽象方法来管理同步状态,在抽象方法的实现过程中免不了要对同步状态进行更改,这时就需要使用同步器提供的3个方法


protected final int getState()

protected final void setState(int newState)

protected final boolean compareAndSetState(int expect, int update)

子类推荐被定义为自定义同步装置的内部类,同步器自身没有实现任何同步接口,它仅仅是定义了若干acquire之类的方法来供使用。

该同步器即可以作为排他模式也可以作为共享模式,当它被定义为一个排他模式时,其他线程对其的获取就被阻止,而共享模式对于多个线程获取都可以成功。

同步器是实现锁(也可以是任意同步组件)的关键,在锁的实现中聚合同步器,利用同步器实现锁的语义。

可以这样理解二者的关系:锁是面向使用者的,它定义了使用者与锁交互的接口(比如允许两个线程并行访问),隐藏了实现细节;同步器是面向锁的实现者,它简化了锁的实现方式,屏蔽了同步状态管理,线程的排队,等待与唤醒等底层操作。锁与同步器很好的隔离了使用者和实现者所需关注的领域。

同步器的接口与示例

同步器的设计是基于模板方法模式的,也就是说,使用者需要继承同步器并重写指定的方法,随后将同步器组合在自定义的同步组件的实现中,并调用同步器指定的模板方法,而这些模板方法将调用使用者重写的的方法

重写同步器指定方法时,需要使用同步器提供的如下3个方法来访问或修改同步状态。

(1)getState() //获取当前同步状态

(2)setState(int newSate)//设置当前同步状态

(3)compareAndSetState(int expect,int update)//使用CAS设置当前状态,该方法能够保证状态设置的原子性。

同步器可重写的方法与描述

1. protected boolean tryAcquire(int arg)

独占式获取同步状态,实现该方法需要查询当前状态并判断同步状态是否符合预期,然后再进行CAS设置同步状态

2. protected boolean tryRelease(int arg)

独占式释放同步状态,等待获取同步状态的线程将有机会获取同步状态

3. protected int tryAcquireShared(int arg)

共享式获取同步状态,返回大于等于0的值表示获取成功,反之表示获取失败

4. protected boolean tryReleaseShared(int arg)

共享式释放同步状态

5.protected boolean isHeldExclusively()

当前同步器是否在独占模式下被线程占用,一般改方法表示是否被当前线程占用。

同步器实现的模板方法

调用自定义同步组件时,将会调用同步器提供的模板方法,这些模板方法与描述如下: 

1. public final void acquire(int arg)

独占式获取同步状态,如果当前线程获取同步状态成功,则由该方法返回,否则将会进入同步队列等待,该方法将会调用重写的tryAcquire(int arg)方法,源码如下:

    /**
     * Acquires in exclusive mode, ignoring interrupts.  Implemented
     * by invoking at least once {@link #tryAcquire},
     * returning on success.  Otherwise the thread is queued, possibly
     * repeatedly blocking and unblocking, invoking {@link
     * #tryAcquire} until success.  This method can be used
     * to implement method {@link Lock#lock}.
     *
     * @param arg the acquire argument.  This value is conveyed to
     *        {@link #tryAcquire} but is otherwise uninterpreted and
     *        can represent anything you like.
     */
    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

2. public final void acquireInterruptibly(int arg)

与acquire(int arg)相同,但是该方法响应中断,当线程未获取到同步状态而进入同步队列中,如果当前线程被中断,则改方法会抛出InterruptedException并返回,源码如下:

    /**
     * Acquires in exclusive mode, aborting if interrupted.
     * Implemented by first checking interrupt status, then invoking
     * at least once {@link #tryAcquire}, returning on
     * success.  Otherwise the thread is queued, possibly repeatedly
     * blocking and unblocking, invoking {@link #tryAcquire}
     * until success or the thread is interrupted.  This method can be
     * used to implement method {@link Lock#lockInterruptibly}.
     *
     * @param arg the acquire argument.  This value is conveyed to
     *        {@link #tryAcquire} but is otherwise uninterpreted and
     *        can represent anything you like.
     * @throws InterruptedException if the current thread is interrupted
     */
    public final void acquireInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (!tryAcquire(arg))
            doAcquireInterruptibly(arg);
    }
3. public final boolean tryAcquireNanos(int arg, long nanosTimeout)

在acquireInterruptibly(int arg)基础上增加了超时限制,如果当前线程在超时时间内没有获取同步状态,那么将返回false,如果获取到了同步状元就返回true,源码如下:

    /**
     * Attempts to acquire in exclusive mode, aborting if interrupted,
     * and failing if the given timeout elapses.  Implemented by first
     * checking interrupt status, then invoking at least once {@link
     * #tryAcquire}, returning on success.  Otherwise, the thread is
     * queued, possibly repeatedly blocking and unblocking, invoking
     * {@link #tryAcquire} until success or the thread is interrupted
     * or the timeout elapses.  This method can be used to implement
     * method {@link Lock#tryLock(long, TimeUnit)}.
     *
     * @param arg the acquire argument.  This value is conveyed to
     *        {@link #tryAcquire} but is otherwise uninterpreted and
     *        can represent anything you like.
     * @param nanosTimeout the maximum number of nanoseconds to wait
     * @return {@code true} if acquired; {@code false} if timed out
     * @throws InterruptedException if the current thread is interrupted
     */
    public final boolean tryAcquireNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        return tryAcquire(arg) ||
            doAcquireNanos(arg, nanosTimeout);
    }

4.protected int tryAcquireShared(int arg)

共享方法获取同步状态,如果当前线程没有获取到同步状态,将会进入同步队列等待,与独占式获取同步状态的区别是在同一时刻可以有多个线程获取到同步状态。

 /**
     * Attempts to acquire in shared mode. This method should query if
     * the state of the object permits it to be acquired in the shared
     * mode, and if so to acquire it.
     *
     * <p>This method is always invoked by the thread performing
     * acquire.  If this method reports failure, the acquire method
     * may queue the thread, if it is not already queued, until it is
     * signalled by a release from some other thread.
     *
     * <p>The default implementation throws {@link
     * UnsupportedOperationException}.
     *
     * @param arg the acquire argument. This value is always the one
     *        passed to an acquire method, or is the value saved on entry
     *        to a condition wait.  The value is otherwise uninterpreted
     *        and can represent anything you like.
     * @return a negative value on failure; zero if acquisition in shared
     *         mode succeeded but no subsequent shared-mode acquire can
     *         succeed; and a positive value if acquisition in shared
     *         mode succeeded and subsequent shared-mode acquires might
     *         also succeed, in which case a subsequent waiting thread
     *         must check availability. (Support for three different
     *         return values enables this method to be used in contexts
     *         where acquires only sometimes act exclusively.)  Upon
     *         success, this object has been acquired.
     * @throws IllegalMonitorStateException if acquiring would place this
     *         synchronizer in an illegal state. This exception must be
     *         thrown in a consistent fashion for synchronization to work
     *         correctly.
     * @throws UnsupportedOperationException if shared mode is not supported
     */
    protected int tryAcquireShared(int arg) {
        throw new UnsupportedOperationException();
    }

5. public final void acquireSharedInterruptibly(int arg)

与acquireInterruptibly方法相同,该方法响应中断。源码如下:

/**
     * Acquires in shared mode, aborting if interrupted.  Implemented
     * by first checking interrupt status, then invoking at least once
     * {@link #tryAcquireShared}, returning on success.  Otherwise the
     * thread is queued, possibly repeatedly blocking and unblocking,
     * invoking {@link #tryAcquireShared} until success or the thread
     * is interrupted.
     * @param arg the acquire argument.
     * This value is conveyed to {@link #tryAcquireShared} but is
     * otherwise uninterpreted and can represent anything
     * you like.
     * @throws InterruptedException if the current thread is interrupted
     */
    public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }

6.public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)

在acquireSharedInterruptibly(int arg)基础上增加了超时限制。源码如下:

  /**
     * Attempts to acquire in shared mode, aborting if interrupted, and
     * failing if the given timeout elapses.  Implemented by first
     * checking interrupt status, then invoking at least once {@link
     * #tryAcquireShared}, returning on success.  Otherwise, the
     * thread is queued, possibly repeatedly blocking and unblocking,
     * invoking {@link #tryAcquireShared} until success or the thread
     * is interrupted or the timeout elapses.
     *
     * @param arg the acquire argument.  This value is conveyed to
     *        {@link #tryAcquireShared} but is otherwise uninterpreted
     *        and can represent anything you like.
     * @param nanosTimeout the maximum number of nanoseconds to wait
     * @return {@code true} if acquired; {@code false} if timed out
     * @throws InterruptedException if the current thread is interrupted
     */
    public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        return tryAcquireShared(arg) >= 0 ||
            doAcquireSharedNanos(arg, nanosTimeout);
    }

7. public final boolean release(int arg)

独占式释放同步状态,该方法会在释放同步状态之后,降同步队列中的第一个结点包含的线程唤醒。源码如下:

   /**
     * Releases in exclusive mode.  Implemented by unblocking one or
     * more threads if {@link #tryRelease} returns true.
     * This method can be used to implement method {@link Lock#unlock}.
     *
     * @param arg the release argument.  This value is conveyed to
     *        {@link #tryRelease} but is otherwise uninterpreted and
     *        can represent anything you like.
     * @return the value returned from {@link #tryRelease}
     */
    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

8. protected boolean tryReleaseShared(int arg)

共享式释放同步结点,源码如下:

    /**
     * Attempts to set the state to reflect a release in shared mode.
     *
     * <p>This method is always invoked by the thread performing release.
     *
     * <p>The default implementation throws
     * {@link UnsupportedOperationException}.
     *
     * @param arg the release argument. This value is always the one
     *        passed to a release method, or the current state value upon
     *        entry to a condition wait.  The value is otherwise
     *        uninterpreted and can represent anything you like.
     * @return {@code true} if this release of shared mode may permit a
     *         waiting acquire (shared or exclusive) to succeed; and
     *         {@code false} otherwise
     * @throws IllegalMonitorStateException if releasing would place this
     *         synchronizer in an illegal state. This exception must be
     *         thrown in a consistent fashion for synchronization to work
     *         correctly.
     * @throws UnsupportedOperationException if shared mode is not supported
     */
    protected boolean tryReleaseShared(int arg) {
        throw new UnsupportedOperationException();
    }

9. public final Collection<Thread> getQueuedThreads()

获取等待在同步队列上的线程集合,源码如下:

  /**
     * Returns a collection containing threads that may be waiting to
     * acquire.  Because the actual set of threads may change
     * dynamically while constructing this result, the returned
     * collection is only a best-effort estimate.  The elements of the
     * returned collection are in no particular order.  This method is
     * designed to facilitate construction of subclasses that provide
     * more extensive monitoring facilities.
     *
     * @return the collection of threads
     */
    public final Collection<Thread> getQueuedThreads() {
        ArrayList<Thread> list = new ArrayList<Thread>();
        for (Node p = tail; p != null; p = p.prev) {
            Thread t = p.thread;
            if (t != null)
                list.add(t);
        }
        return list;
    }

 

同步器提供的模板方法基本上可以分为3类:(1)独占式获取与释放同步状态(2)共享式获取与释放同步状态(3)查询同步队列中等待线程情况。

自定义的同步组件将使用同步器提供的模板方法实现自己的同步语义。

下面通过一个独占锁的示例来进一步了解同步器的工作原理。

/**
 * 实现排他锁
 * 可以看到Mutex将Lock接口均代理给了同步器的实现
 * @author perist
 * @date 2016/12/8
 * @time 10:51
 */
public class Mutex implements Lock, Serializable {
//内部类,自定义同步器
private static class Sync extends AbstractQueuedSynchronizer {

    //是否处于占用状态
    @Override
    protected boolean isHeldExclusively() {
        return getState() == 1;
    }

    //当状态为0时获取锁
    @Override
    protected boolean tryAcquire(int arg) {
        assert arg == 1;// Otherwise unused
        if (compareAndSetState(0, 1)) {
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }

    //释放锁,将状态设置为0
    @Override
    protected boolean tryRelease(int arg) {
        assert arg == 1;//Otherwise unused
        if (getState() == 0)
            throw new IllegalMonitorStateException();
        setExclusiveOwnerThread(null);
        setState(0);
        return true;
    }

    //返回一个Condition,每个Condition都包含一个Condition队列
    Condition newCondition() {
        return new ConditionObject();
    }
}


//仅需将操作代理到sync上即可
private final Sync sync = new Sync();

public boolean isLocked() {
    return sync.isHeldExclusively();
}

public boolean hasQueuedThreads() {
    return sync.hasQueuedThreads();
}

@Override
public void lock() {
    sync.acquire(1);
}

@Override
public void lockInterruptibly() throws InterruptedException {
    sync.acquireInterruptibly(1);
}

@Override
public boolean tryLock() {
    return sync.tryAcquire(1);
}

@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
    return sync.tryAcquireNanos(1, unit.toNanos(time));
}

@Override
public void unlock() {
    sync.release(1);
}

@Override
public Condition newCondition() {
    return sync.newCondition();
}

}

上述示例中,Mutex是一个自定义的同步组件,它在同一时刻只允许一个线程占用锁。Mutex中定义了一个静态内部类,该内部类继承了同步器并实现了独占式获取和释放同步状态。在tryAcquire(int acquires)中,如果经过CAS设置成功(同步状态设置为1),则代表获取了同步状态,而在tryRelease(int releases)方法中只是将同步状态重置为0。

用户使用mutex时并不会直接和内部同步器打交道,而是调用mutex提供的方法,在mutex的实现中,以获取锁lock()为例,只需要在方法实现中调用模板方法acquire(int args)即可,当前线程调用方法获取同步状态失败后会被加入同步队列进行等待,这样就大大的降低了实现一个可靠的自定义同步组件的门槛。

 

 
0 0 投票数
Article Rating
订阅评论
提醒
guest
0 评论
最旧
最新 最多投票
内联反馈
查看所有评论
0
希望看到您的想法,请您发表评论x