Lock接口
Published:
使用方式
Lock lock = new ReentrantLock();
lock.lock();
try{
//to do
}finally{
lock.unlock(); //确保即使发生了异常也能正常释放锁
}
具有synchronized不具有的特性
- 尝试非阻塞地获取锁。当前线程尝试获取锁,如果这一时刻锁没有被其他线程获取到,则成功获取并持有锁。
- 能被中断地获取锁。获取锁的线程能够相应中断,当获取到锁的线程被中断时,中断异常将会被抛出,同时锁会被释放。
- 超时获取锁。在指定的截止之间之前获取锁,如果截止时间到了仍旧无法获取锁,则返回。
API
void lock();
获取锁,调用该方法当前线程将会获取锁,当锁获得后,从该方法返回
void lockInterruptibly() throws InterruptedException;
可中断地获取锁,和lock()方法的不同之处在于该方法会响应中断,即在锁的获取中可以中断当前线程
boolean tryLock();
尝试非阻塞的获取锁,调用该方法后立刻返回,如果能够获取则返回true,否则返回false
boolean tryLock(Long time,TimeUnit unit) throws InterruptedException;
超时的获取锁,当前线程在一下三种情况中会返回
①当前线程在超时时间内获得了锁
②当前线程在超时时间内被中断
③超时时间结束,返回false
void unlock();
释放锁
Condition newCondition();
获取等待通知组件,该组件和当前的锁绑定,当前线程只有获得了锁,才能调用该组件的wait()方法,而调用后,当前线程将释放锁
队列同步器AbstractQueuedSynchronized
队列同步器AbstractQueuedSynchronized,是用来构建锁或者其他同步组件的基础框架,它使用了一个int成员变量来表示同步状态,通过内置的FIFO队列来完成资源获取线程的工作。
同步器的主要使用方式是继承,子类通过继承同步器并实现它的抽象方法来管理同步状态,子类推荐被定义为自定义同步组件的静态内部类,同步器自身没有实现任何同步接口,它仅仅是定义了若干同步状态获取和释放的方法来供自定义同步组件使用,同步器既可以支持独占式地获取同步状态,也可以支持共享式地获取同步状态。
同步器是实现锁(也可以是任意同步组件)的关键,在锁的实现中聚合同步器,利用同步器实现锁的语义。锁是面向使用者的,它定义了使用者与锁交互的接口,隐藏了实现细节;同步器面向的是锁的实现者,它简化了锁的实现方式,屏蔽了同步状态管理、线程的排队、等待与唤醒等底层操作。
同步器的设计是基于模板方法模式的,使用者继承同步器并重写指定的方法。
为了将此类用作同步器的基础,需要适当地重新定义以下方法,这是通过使用 getState()
、setState(int newState)
和/或 compareAndSetState(int expect, int update)
方法来检查和/或修改同步状态来实现的。
同步器可重写的方法
protected boolean tryAcquire(int)
独占式获取同步状态,实现该方法需要查询当前状态并判断同步状态是否符合预期,然后再进行CAS设置同步状态
protected boolean tryRelease(int)
独占式释放同步状态,等待获取同步状态的线程将有机会获取同步状态
protected int tryAcquireShared(int)
共享式获取同步状态,返回大于等于0的值,表示获取成功,反之获取失败
protected boolean tryReleaseShared(int)
共享式释放同步状态
protected boolean isHeldExclusively()
当前同步器是否在独占模式下被线程占用,一般该方法表示是否被当前线程所独占
同步器提供的模板方法
①独占式获取与释放同步状态 ②共享式获取与释放同步状态 ③查询同步队列中等待的线程
void acquire(int arg)
独占式获取同步状态,如果当前线程获取同步状态成功,则由该方法返回,否则,将会进入同步队列等待,该方法将会调用重写的tryAcquire(int arg)方法
void acquireInterruptibly(int arg)
与acquire(int arg)相同,但是该方法响应中断,当前线程未获取到同步状态而进入同步队列中,如果当前线程被中断,则该方法会抛出InterruptedException并返回
boolean tryAcquireNanos(int arg,long nanos)
在acquireInterruptibly(int arg)基础上增加了超时限制,如果当前线程在超时时间内没有获取到同步状态,那么将会返回false,如果获取到了返回true
void acquireShared(int arg)
共享式的获取同步状态,如果当前线程未获取到同步状态,将会进入同步队列等待,与独占式获取的主要区别是在同一时刻可以有多个线程获取到同步状态
void acquireSharedInterruptibly(int arg)
与acquireShared(int arg)相同,该方法响应中断
boolean tryAcquireSharedNanos(int arg,long nanos)
acquireSharedInterruptibly(int arg)基础上增加了超时限制
boolean release(int arg)
独占式的释放同步状态,该方法会在释放同步状态之后,将同步队列第一个节点包含的线程唤醒
boolean releaseShared(int arg)
共享式的释放同步状态
Collection<Thread> getQueuedThreads()
获取等待在同步队列上的线程集合
使用示例
以下是一个非再进入的互斥锁类,它使用值 0 表示未锁定状态,使用 1 表示锁定状态。当非重入锁定不严格地需要当前拥有者线程的记录时,此类使得使用监视器更加方便。它还支持一些条件并公开了一个检测方法:
class Mutex implements Lock, java.io.Serializable {
// Our internal helper class
private static class Sync extends AbstractQueuedSynchronizer {
// Report whether in locked state
protected boolean isHeldExclusively() {
return getState() == 1;
}
// Acquire the lock if state is zero
public boolean tryAcquire(int acquires) {
assert acquires == 1; // Otherwise unused
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
// Release the lock by setting state to zero
protected boolean tryRelease(int releases) {
assert releases == 1; // Otherwise unused
if (getState() == 0) throw new IllegalMonitorStateException();
setExclusiveOwnerThread(null);
setState(0);
return true;
}
// Provide a Condition
Condition newCondition() { return new ConditionObject(); }
// Deserialize properly
private void readObject(ObjectInputStream s)
throws IOException, ClassNotFoundException {
s.defaultReadObject();
setState(0); // reset to unlocked state
}
}
// The sync object does all the hard work. We just forward to it.
private final Sync sync = new Sync();
public void lock() { sync.acquire(1); }
public boolean tryLock() { return sync.tryAcquire(1); }
public void unlock() { sync.release(1); }
public Condition newCondition() { return sync.newCondition(); }
public boolean isLocked() { return sync.isHeldExclusively(); }
public boolean hasQueuedThreads() { return sync.hasQueuedThreads(); }
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
public boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
}
同步器的底层实现
1.同步队列 同步队列依赖内部的同步队列(一个FIFO双向队列、CLH队列
)来完成同步状态的管理,当前线程获取同步状态失败时,同步器会将当前线程以及等待状态等信息构造成为一个节点(Node)并将其加入同步队列,同时会阻塞当前线程,当同步状态释放时,会把首节点中的线程唤醒,使其再次尝试获取同步状态。
static final class Node {
/** Marker to indicate a node is waiting in shared mode */
static final Node SHARED = new Node();
/** Marker to indicate a node is waiting in exclusive mode */
static final Node EXCLUSIVE = null;
/** waitStatus value to indicate thread has cancelled */
static final int CANCELLED = 1;
/** waitStatus value to indicate successor's thread needs unparking */
static final int SIGNAL = -1;
/** waitStatus value to indicate thread is waiting on condition */
static final int CONDITION = -2;
/**
* waitStatus value to indicate the next acquireShared should
* unconditionally propagate
*/
static final int PROPAGATE = -3;
/**
* Status field, taking on only the values:
* SIGNAL: The successor of this node is (or will soon be)
* blocked (via park), so the current node must
* unpark its successor when it releases or
* cancels. To avoid races, acquire methods must
* first indicate they need a signal,
* then retry the atomic acquire, and then,
* on failure, block.
* CANCELLED: This node is cancelled due to timeout or interrupt.
* Nodes never leave this state. In particular,
* a thread with cancelled node never again blocks.
* CONDITION: This node is currently on a condition queue.
* It will not be used as a sync queue node
* until transferred, at which time the status
* will be set to 0. (Use of this value here has
* nothing to do with the other uses of the
* field, but simplifies mechanics.)
* PROPAGATE: A releaseShared should be propagated to other
* nodes. This is set (for head node only) in
* doReleaseShared to ensure propagation
* continues, even if other operations have
* since intervened.
* 0: None of the above
*
* The values are arranged numerically to simplify use.
* Non-negative values mean that a node doesn't need to
* signal. So, most code doesn't need to check for particular
* values, just for sign.
*
* The field is initialized to 0 for normal sync nodes, and
* CONDITION for condition nodes. It is modified using CAS
* (or when possible, unconditional volatile writes).
*/
volatile int waitStatus;
/**
* Link to predecessor node that current node/thread relies on
* for checking waitStatus. Assigned during enqueuing, and nulled
* out (for sake of GC) only upon dequeuing. Also, upon
* cancellation of a predecessor, we short-circuit while
* finding a non-cancelled one, which will always exist
* because the head node is never cancelled: A node becomes
* head only as a result of successful acquire. A
* cancelled thread never succeeds in acquiring, and a thread only
* cancels itself, not any other node.
*/
volatile Node prev;
/**
* Link to the successor node that the current node/thread
* unparks upon release. Assigned during enqueuing, adjusted
* when bypassing cancelled predecessors, and nulled out (for
* sake of GC) when dequeued. The enq operation does not
* assign next field of a predecessor until after attachment,
* so seeing a null next field does not necessarily mean that
* node is at end of queue. However, if a next field appears
* to be null, we can scan prev's from the tail to
* double-check. The next field of cancelled nodes is set to
* point to the node itself instead of null, to make life
* easier for isOnSyncQueue.
*/
volatile Node next;
/**
* The thread that enqueued this node. Initialized on
* construction and nulled out after use.
*/
volatile Thread thread;
/**
* Link to next node waiting on condition, or the special
* value SHARED. Because condition queues are accessed only
* when holding in exclusive mode, we just need a simple
* linked queue to hold nodes while they are waiting on
* conditions. They are then transferred to the queue to
* re-acquire. And because conditions can only be exclusive,
* we save a field by using special value to indicate shared
* mode.
*/
Node nextWaiter;
/**
* Returns true if node is waiting in shared mode.
*/
final boolean isShared() {
return nextWaiter == SHARED;
}
/**
* Returns previous node, or throws NullPointerException if null.
* Use when predecessor cannot be null. The null check could
* be elided, but is present to help the VM.
*
* @return the predecessor of this node
*/
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
Node() { // Used to establish initial head or SHARED marker
}
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}
- CAS设置尾节点
- 设置首节点是通过获取同步状态成功的线程来完成的,由于只有一个线程能够成功获取到同步状态,因此设置头节点的方法并不需要使用CAS保证,它只需要将首节点设置成为原首节点的后继节点并断开原首节点的next引用即可。
2.独占式同步状态获取与释放
可能调用的方法
acquire(int arg);
tryAcquire(int arg);
setHead(node);
acquireQueued(addWait(NOde.EXCLUSIVE,arg));
enq(final Node node);
release(int arg);
tryRelease(int arg);
unparkSuccessor(NOde node);
总结 在获取同步状态时,同步器维护了一个同步队列,获取状态失败的线程都会被加入到队列中进行自旋;移出队列(或停止自旋)的条件是前驱节点为头节点且当前节点成功获取了同步状态。在释放同步状态时,同步器调用tryRelease(int arg)方法来释放同步状态,然后唤醒头节点的后继节点
3.共享式同步状态获取与释放
共享式与独占式最主要的区别在于同一时刻能否有多个线程同时获取到同步状态
可能调用的方法
acquireShared(int arg);
doAcquireShared(int arg);
addWaiter(Node.SHARED)
tryAcquireShared(int arg);
setHeadAndPropagate(node,r);
releaseShared(int arg);
tryReleaseShared(int arg);
重入锁
ReentrantLock
公平锁 锁的获取顺序符合请求的绝对时间顺序,即FIFO
非公平锁 锁的获取顺序和请求时间无直接关系,容易造成“饥饿”
synchronized
读写锁
ReentrantReadWriteLock
- 公平性选择
- 重进入
- 锁降级,遵循获取写锁,获取读锁再释放写锁的次序,写锁能够降级成为读锁
LockSupport工具
void park(); //阻塞当前线程,如果调用unpark(Thread thread)方法或者当前线程被中断,才能从park()方法返回
void parkNanos(long nanos); //阻塞当前线程,最长不超过nanos纳秒,返回条件在park()的基础上增加了超时返回
void parkUtil(long deadline); //阻塞当前线程,直到deadline时间
void unpark(Thread thread); //唤醒处于阻塞状态的线程thread
Condition接口
使用实例
Condition condition = lock.newCondition();
condition.await();
condition.signal();
部分方法
void await() throws InterruptedException;
当前线程进入等待状态直到被通知或中断,当前线程进入运行状态且从await()方法返回的情况,包括:
- 其他线程调用该Condition的signal()或signalALl()方法,而当前线程被选中唤醒
- 其他线程中断当前线程
- 如果当前等待线程从await()方法返回,那么表明该线程已经获取了Condition对象所对应的锁
void awaitUninterruptibly(); //对中断不敏感
long awaitNanos(long nanosTimeout);
boolean awaitUntil(Date deadline) throws InterruptedException; //如果没有到指定时间就被通知返回true,否则返回false
void signal();
void signalAll();
并发包中的Lock拥有一个同步队列和多个等待队列 同步器内部存在一个实现Condition接口的内部类,因此每个Condition实例能够访问同步器提供的方法,相当于每个Condition都拥有所属同步器的引用