diff --git "a/02.\347\272\277\347\250\213\347\232\204\347\212\266\346\200\201\350\275\254\346\215\242\344\273\245\345\217\212\345\237\272\346\234\254\346\223\215\344\275\234/\347\272\277\347\250\213\347\212\266\346\200\201\350\275\254\346\215\242\344\273\245\345\217\212\345\237\272\346\234\254\346\223\215\344\275\234.md" "b/02.\347\272\277\347\250\213\347\232\204\347\212\266\346\200\201\350\275\254\346\215\242\344\273\245\345\217\212\345\237\272\346\234\254\346\223\215\344\275\234/\347\272\277\347\250\213\347\212\266\346\200\201\350\275\254\346\215\242\344\273\245\345\217\212\345\237\272\346\234\254\346\223\215\344\275\234.md" index 7458d85..c7a1468 100644 --- "a/02.\347\272\277\347\250\213\347\232\204\347\212\266\346\200\201\350\275\254\346\215\242\344\273\245\345\217\212\345\237\272\346\234\254\346\223\215\344\275\234/\347\272\277\347\250\213\347\212\266\346\200\201\350\275\254\346\215\242\344\273\245\345\217\212\345\237\272\346\234\254\346\223\215\344\275\234.md" +++ "b/02.\347\272\277\347\250\213\347\232\204\347\212\266\346\200\201\350\275\254\346\215\242\344\273\245\345\217\212\345\237\272\346\234\254\346\223\215\344\275\234/\347\272\277\347\250\213\347\212\266\346\200\201\350\275\254\346\215\242\344\273\245\345\217\212\345\237\272\346\234\254\346\223\215\344\275\234.md" @@ -52,7 +52,7 @@ - 由于java不能多继承可以实现多个接口,因此,在创建线程的时候尽量多考虑采用实现接口的形式; - 实现callable接口,提交给ExecutorService返回的是异步执行的结果,另外,通常也可以利用FutureTask(Callable callable)将callable进行包装然后FeatureTask提交给ExecutorsService。如图, -![FutureTask接口实现关系](https://github.com/CL0610/Java-concurrency/blob/master/2.%E7%BA%BF%E7%A8%8B%E7%9A%84%E7%8A%B6%E6%80%81%E8%BD%AC%E6%8D%A2%E4%BB%A5%E5%8F%8A%E5%9F%BA%E6%9C%AC%E6%93%8D%E4%BD%9C/futureTask%E6%8E%A5%E5%8F%A3%E5%AE%9E%E7%8E%B0%E5%85%B3%E7%B3%BB.png) +![FutureTask接口实现关系](https://github.com/CL0610/Java-concurrency/blob/master/02.%E7%BA%BF%E7%A8%8B%E7%9A%84%E7%8A%B6%E6%80%81%E8%BD%AC%E6%8D%A2%E4%BB%A5%E5%8F%8A%E5%9F%BA%E6%9C%AC%E6%93%8D%E4%BD%9C/futureTask%E6%8E%A5%E5%8F%A3%E5%AE%9E%E7%8E%B0%E5%85%B3%E7%B3%BB.png) 另外由于FeatureTask也实现了Runable接口也可以利用上面第二种方式(实现Runable接口)来新建线程; @@ -61,7 +61,7 @@ # 2. 线程状态转换 # -![线程状态转换图](https://github.com/CL0610/Java-concurrency/blob/master/2.%E7%BA%BF%E7%A8%8B%E7%9A%84%E7%8A%B6%E6%80%81%E8%BD%AC%E6%8D%A2%E4%BB%A5%E5%8F%8A%E5%9F%BA%E6%9C%AC%E6%93%8D%E4%BD%9C/%E7%BA%BF%E7%A8%8B%E7%8A%B6%E6%80%81%E8%BD%AC%E6%8D%A2%E5%85%B3%E7%B3%BB.png) +![线程状态转换图](https://github.com/CL0610/Java-concurrency/blob/master/02.%E7%BA%BF%E7%A8%8B%E7%9A%84%E7%8A%B6%E6%80%81%E8%BD%AC%E6%8D%A2%E4%BB%A5%E5%8F%8A%E5%9F%BA%E6%9C%AC%E6%93%8D%E4%BD%9C/%E7%BA%BF%E7%A8%8B%E7%8A%B6%E6%80%81%E8%BD%AC%E6%8D%A2%E5%85%B3%E7%B3%BB.png) @@ -71,7 +71,7 @@ 用一个表格将上面六种状态进行一个总结归纳。 -![JAVA线程的状态](https://github.com/CL0610/Java-concurrency/blob/master/2.%E7%BA%BF%E7%A8%8B%E7%9A%84%E7%8A%B6%E6%80%81%E8%BD%AC%E6%8D%A2%E4%BB%A5%E5%8F%8A%E5%9F%BA%E6%9C%AC%E6%93%8D%E4%BD%9C/%E7%BA%BF%E7%A8%8B%E7%8A%B6%E6%80%81.png) +![JAVA线程的状态](https://github.com/CL0610/Java-concurrency/blob/master/02.%E7%BA%BF%E7%A8%8B%E7%9A%84%E7%8A%B6%E6%80%81%E8%BD%AC%E6%8D%A2%E4%BB%A5%E5%8F%8A%E5%9F%BA%E6%9C%AC%E6%93%8D%E4%BD%9C/%E7%BA%BF%E7%A8%8B%E7%8A%B6%E6%80%81.png) # 3. 线程状态的基本操作 # @@ -83,7 +83,7 @@ isInterrupted()来感知其他线程对其自身的中断操作,从而做出响应。另外,同样可以调用Thread的静态方法 interrupted()对当前线程进行中断操作,该方法会清除中断标志位。**需要注意的是,当抛出InterruptedException时候,会清除中断标志位,也就是说在调用isInterrupted会返回false。** -![线程中断的方法](https://github.com/CL0610/Java-concurrency/blob/master/2.%E7%BA%BF%E7%A8%8B%E7%9A%84%E7%8A%B6%E6%80%81%E8%BD%AC%E6%8D%A2%E4%BB%A5%E5%8F%8A%E5%9F%BA%E6%9C%AC%E6%93%8D%E4%BD%9C/%E4%B8%AD%E6%96%AD%E7%BA%BF%E7%A8%8B%E6%96%B9%E6%B3%95.png) +![线程中断的方法](https://github.com/CL0610/Java-concurrency/blob/master/02.%E7%BA%BF%E7%A8%8B%E7%9A%84%E7%8A%B6%E6%80%81%E8%BD%AC%E6%8D%A2%E4%BB%A5%E5%8F%8A%E5%9F%BA%E6%9C%AC%E6%93%8D%E4%BD%9C/%E4%B8%AD%E6%96%AD%E7%BA%BF%E7%A8%8B%E6%96%B9%E6%B3%95.png) diff --git "a/08.\345\210\235\350\257\206Lock\344\270\216AbstractQueuedSynchronizer(AQS)/\345\210\235\350\257\206Lock\344\270\216AbstractQueuedSynchronizer(AQS).md" "b/08.\345\210\235\350\257\206Lock\344\270\216AbstractQueuedSynchronizer(AQS)/\345\210\235\350\257\206Lock\344\270\216AbstractQueuedSynchronizer(AQS).md" index 329ff6f..dc88175 100644 --- "a/08.\345\210\235\350\257\206Lock\344\270\216AbstractQueuedSynchronizer(AQS)/\345\210\235\350\257\206Lock\344\270\216AbstractQueuedSynchronizer(AQS).md" +++ "b/08.\345\210\235\350\257\206Lock\344\270\216AbstractQueuedSynchronizer(AQS)/\345\210\235\350\257\206Lock\344\270\216AbstractQueuedSynchronizer(AQS).md" @@ -27,9 +27,13 @@ 我们现在就来看看lock接口定义了哪些方法: > void lock(); //获取锁 +> > void lockInterruptibly() throws InterruptedException;//获取锁的过程能够响应中断 +> > boolean tryLock();//非阻塞式响应中断能立即返回,获取锁放回true反之返回fasle +> > boolean tryLock(long time, TimeUnit unit) throws InterruptedException;//超时获取锁,在超时内或者未中断的情况下能够获取锁 +> > Condition newCondition();//获取与lock绑定的等待通知组件,当前线程必须获得了锁才能进行等待,进行等待时会先释放锁,当再次获取锁时才能从等待中返回 上面是lock接口下的五个方法,也只是从源码中英译中翻译了一遍,感兴趣的可以自己的去看看。那么在locks包下有哪些类实现了该接口了?先从最熟悉的ReentrantLock说起。 diff --git "a/09.\346\267\261\345\205\245\347\220\206\350\247\243AbstractQueuedSynchronizer(AQS)/\346\267\261\345\205\245\347\220\206\350\247\243AbstractQueuedSynchronizer(AQS).md" "b/09.\346\267\261\345\205\245\347\220\206\350\247\243AbstractQueuedSynchronizer(AQS)/\346\267\261\345\205\245\347\220\206\350\247\243AbstractQueuedSynchronizer(AQS).md" index f76488d..3f3e1a2 100644 --- "a/09.\346\267\261\345\205\245\347\220\206\350\247\243AbstractQueuedSynchronizer(AQS)/\346\267\261\345\205\245\347\220\206\350\247\243AbstractQueuedSynchronizer(AQS).md" +++ "b/09.\346\267\261\345\205\245\347\220\206\350\247\243AbstractQueuedSynchronizer(AQS)/\346\267\261\345\205\245\347\220\206\350\247\243AbstractQueuedSynchronizer(AQS).md" @@ -8,14 +8,20 @@ **独占式锁:** > void acquire(int arg):独占式获取同步状态,如果获取失败则插入同步队列进行等待; +> > void acquireInterruptibly(int arg):与acquire方法相同,但在同步队列中进行等待的时候可以检测中断; +> > boolean tryAcquireNanos(int arg, long nanosTimeout):在acquireInterruptibly基础上增加了超时等待功能,在超时时间内没有获得同步状态返回false; +> > boolean release(int arg):释放同步状态,该方法会唤醒在同步队列中的下一个节点 **共享式锁:** > void acquireShared(int arg):共享式获取同步状态,与独占式的区别在于同一时刻有多个线程获取同步状态; +> > void acquireSharedInterruptibly(int arg):在acquireShared方法基础上增加了能响应中断的功能; +> > boolean tryAcquireSharedNanos(int arg, long nanosTimeout):在acquireSharedInterruptibly基础上增加了超时等待的功能; +> > boolean releaseShared(int arg):共享式释放同步状态 @@ -27,41 +33,49 @@ 在AQS有一个静态内部类Node,其中有这样一些属性: > volatile int waitStatus //节点状态 +> > volatile Node prev //当前节点/线程的前驱节点 +> > volatile Node next; //当前节点/线程的后继节点 +> > volatile Thread thread;//加入同步队列的线程引用 +> > Node nextWaiter;//等待队列中的下一个节点 节点的状态有以下这些: > int CANCELLED = 1//节点从同步队列中取消 +> > int SIGNAL = -1//后继节点的线程处于等待状态,如果当前节点释放同步状态会通知后继节点,使得后继节点的线程能够运行; +> > int CONDITION = -2//当前节点进入等待队列中 +> > int PROPAGATE = -3//表示下一次共享式同步状态获取将会无条件传播下去 +> > int INITIAL = 0;//初始状态 现在我们知道了节点的数据结构类型,并且每个节点拥有其前驱和后继节点,很显然这是**一个双向队列**。同样的我们可以用一段demo看一下。 - - public class LockDemo { - private static ReentrantLock lock = new ReentrantLock(); - - public static void main(String[] args) { - for (int i = 0; i < 5; i++) { - Thread thread = new Thread(() -> { - lock.lock(); - try { - Thread.sleep(10000); - } catch (InterruptedException e) { - e.printStackTrace(); - } finally { - lock.unlock(); - } - }); - thread.start(); - } - } - } - +```java +public class LockDemo { + private static ReentrantLock lock = new ReentrantLock(); + + public static void main(String[] args) { + for (int i = 0; i < 5; i++) { + Thread thread = new Thread(() -> { + lock.lock(); + try { + Thread.sleep(10000); + } catch (InterruptedException e) { + e.printStackTrace(); + } finally { + lock.unlock(); + } + }); + thread.start(); + } + } +} +``` 实例代码中开启了5个线程,先获取锁之后再睡眠10S中,实际上这里让线程睡眠是想模拟出当线程无法获取锁时进入同步队列的情况。通过debug,当Thread-4(在本例中最后一个线程)获取锁失败后进入同步时,AQS时现在的同步队列如图所示: @@ -69,10 +83,10 @@ Thread-0先获得锁后进行睡眠,其他线程(Thread-1,Thread-2,Thread-3,Thread-4)获取锁失败进入同步队列,同时也可以很清楚的看出来每个节点有两个域:prev(前驱)和next(后继),并且每个节点用来保存获取同步状态失败的线程引用以及等待状态等信息。另外AQS中有两个重要的成员变量: - - private transient volatile Node head; - private transient volatile Node tail; - +```java +private transient volatile Node head; +private transient volatile Node tail; +``` 也就是说AQS实际上通过头尾指针来管理同步队列,同时实现包括获取锁失败的线程进行入队,释放锁时对同步队列中的线程进行通知等核心方法。其示意图如下: @@ -93,96 +107,96 @@ Thread-0先获得锁后进行睡眠,其他线程(Thread-1,Thread-2,Thread-3, ## 3.1 独占锁的获取(acquire方法) 我们继续通过看源码和debug的方式来看,还是以上面的demo为例,调用lock()方法是获取独占式锁,获取失败就将当前线程加入同步队列,成功则线程执行。而lock()方法实际上会调用AQS的**acquire()**方法,源码如下 - - public final void acquire(int arg) { - //先看同步状态是否获取成功,如果成功则方法结束返回 - //若失败则先调用addWaiter()方法再调用acquireQueued()方法 - if (!tryAcquire(arg) && - acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) - selfInterrupt(); - } - +```java +public final void acquire(int arg) { + //先看同步状态是否获取成功,如果成功则方法结束返回 + //若失败则先调用addWaiter()方法再调用acquireQueued()方法 + if (!tryAcquire(arg) && + acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) + selfInterrupt(); +} +``` 关键信息请看注释,acquire根据当前获得同步状态成功与否做了两件事情:1. 成功,则方法结束返回,2. 失败,则先调用addWaiter()然后在调用acquireQueued()方法。 > **获取同步状态失败,入队操作** 当线程获取独占式锁失败后就会将当前线程加入同步队列,那么加入队列的方式是怎样的了?我们接下来就应该去研究一下addWaiter()和acquireQueued()。addWaiter()源码如下: - - private Node addWaiter(Node mode) { - // 1. 将当前线程构建成Node类型 - Node node = new Node(Thread.currentThread(), mode); - // Try the fast path of enq; backup to full enq on failure - // 2. 当前尾节点是否为null? - Node pred = tail; - if (pred != null) { - // 2.2 将当前节点尾插入的方式插入同步队列中 - node.prev = pred; - if (compareAndSetTail(pred, node)) { - pred.next = node; - return node; - } - } - // 2.1. 当前同步队列尾节点为null,说明当前线程是第一个加入同步队列进行等待的线程 - enq(node); - return node; - } - +```java +private Node addWaiter(Node mode) { + // 1. 将当前线程构建成Node类型 + Node node = new Node(Thread.currentThread(), mode); + // Try the fast path of enq; backup to full enq on failure + // 2. 当前尾节点是否为null? + Node pred = tail; + if (pred != null) { + // 2.2 将当前节点尾插入的方式插入同步队列中 + node.prev = pred; + if (compareAndSetTail(pred, node)) { + pred.next = node; + return node; + } + } + // 2.1. 当前同步队列尾节点为null,说明当前线程是第一个加入同步队列进行等待的线程 + enq(node); + return node; +} +``` 分析可以看上面的注释。程序的逻辑主要分为两个部分:**1. 当前同步队列的尾节点为null,调用方法enq()插入;2. 当前队列的尾节点不为null,则采用尾插入(compareAndSetTail()方法)的方式入队。**另外还会有另外一个问题:如果 `if (compareAndSetTail(pred, node))`为false怎么办?会继续执行到enq()方法,同时很明显compareAndSetTail是一个CAS操作,通常来说如果CAS操作失败会继续自旋(死循环)进行重试。因此,经过我们这样的分析,enq()方法可能承担两个任务:**1. 处理当前同步队列尾节点为null时进行入队操作;2. 如果CAS尾插入节点失败后负责自旋进行尝试。**那么是不是真的就像我们分析的一样了?只有源码会告诉我们答案:),enq()源码如下: - - private Node enq(final Node node) { - for (;;) { - Node t = tail; - if (t == null) { // Must initialize - //1. 构造头结点 - if (compareAndSetHead(new Node())) - tail = head; - } else { - // 2. 尾插入,CAS操作失败自旋尝试 - node.prev = t; - if (compareAndSetTail(t, node)) { - t.next = node; - return t; - } - } - } - } - +```java +private Node enq(final Node node) { + for (;;) { + Node t = tail; + if (t == null) { // Must initialize + //1. 构造头结点 + if (compareAndSetHead(new Node())) + tail = head; + } else { + // 2. 尾插入,CAS操作失败自旋尝试 + node.prev = t; + if (compareAndSetTail(t, node)) { + t.next = node; + return t; + } + } + } +} +``` 在上面的分析中我们可以看出在第1步中会先创建头结点,说明同步队列是**带头结点的链式存储结构**。带头结点与不带头结点相比,会在入队和出队的操作中获得更大的便捷性,因此同步队列选择了带头结点的链式存储结构。那么带头节点的队列初始化时机是什么?自然而然是在**tail为null时,即当前线程是第一次插入同步队列**。compareAndSetTail(t, node)方法会利用CAS操作设置尾节点,如果CAS操作失败会在`for (;;)`for死循环中不断尝试,直至成功return返回为止。因此,对enq()方法可以做这样的总结: 1. **在当前线程是第一个加入同步队列时,调用compareAndSetHead(new Node())方法,完成链式队列的头结点的初始化**; 2. **自旋不断尝试CAS尾插入节点直至成功为止**。 现在我们已经很清楚获取独占式锁失败的线程包装成Node然后插入同步队列的过程了?那么紧接着会有下一个问题?在同步队列中的节点(线程)会做什么事情了来保证自己能够有机会获得独占式锁了?带着这样的问题我们就来看看acquireQueued()方法,从方法名就可以很清楚,这个方法的作用就是排队获取锁的过程,源码如下: - - final boolean acquireQueued(final Node node, int arg) { - boolean failed = true; - try { - boolean interrupted = false; - for (;;) { - // 1. 获得当前节点的先驱节点 - final Node p = node.predecessor(); - // 2. 当前节点能否获取独占式锁 - // 2.1 如果当前节点的先驱节点是头结点并且成功获取同步状态,即可以获得独占式锁 - if (p == head && tryAcquire(arg)) { - //队列头指针用指向当前节点 - setHead(node); - //释放前驱节点 - p.next = null; // help GC - failed = false; - return interrupted; - } - // 2.2 获取锁失败,线程进入等待状态等待获取独占式锁 - if (shouldParkAfterFailedAcquire(p, node) && - parkAndCheckInterrupt()) - interrupted = true; - } - } finally { - if (failed) - cancelAcquire(node); - } - } - +```java +final boolean acquireQueued(final Node node, int arg) { + boolean failed = true; + try { + boolean interrupted = false; + for (;;) { + // 1. 获得当前节点的先驱节点 + final Node p = node.predecessor(); + // 2. 当前节点能否获取独占式锁 + // 2.1 如果当前节点的先驱节点是头结点并且成功获取同步状态,即可以获得独占式锁 + if (p == head && tryAcquire(arg)) { + //队列头指针用指向当前节点 + setHead(node); + //释放前驱节点 + p.next = null; // help GC + failed = false; + return interrupted; + } + // 2.2 获取锁失败,线程进入等待状态等待获取独占式锁 + if (shouldParkAfterFailedAcquire(p, node) && + parkAndCheckInterrupt()) + interrupted = true; + } + } finally { + if (failed) + cancelAcquire(node); + } +} +``` 程序逻辑通过注释已经标出,整体来看这是一个这又是一个自旋的过程(for (;;)),代码首先获取当前节点的先驱节点,**如果先驱节点是头结点的并且成功获得同步状态的时候(if (p == head && tryAcquire(arg))),当前节点所指向的线程能够获取锁**。反之,获取锁失败进入等待状态。整体示意图为下图: @@ -192,22 +206,22 @@ Thread-0先获得锁后进行睡眠,其他线程(Thread-1,Thread-2,Thread-3, > **获取锁成功,出队操作** 获取锁的节点出队的逻辑是: - +```java //队列头结点引用指向当前节点 setHead(node); //释放前驱节点 p.next = null; // help GC failed = false; return interrupted; - +``` setHead()方法为: - +```java private void setHead(Node node) { head = node; node.thread = null; node.prev = null; } - +``` 将当前节点通过setHead()方法设置为队列的头结点,然后将之前的头结点的next域设置为null并且pre域也为null,即与队列断开,无任何引用方便GC时能够将内存进行回收。示意图如下: @@ -218,45 +232,45 @@ setHead()方法为: 那么当获取锁失败的时候会调用shouldParkAfterFailedAcquire()方法和parkAndCheckInterrupt()方法,看看他们做了什么事情。shouldParkAfterFailedAcquire()方法源码为: - - private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { - int ws = pred.waitStatus; - if (ws == Node.SIGNAL) - /* - * This node has already set status asking a release - * to signal it, so it can safely park. - */ - return true; - if (ws > 0) { - /* - * Predecessor was cancelled. Skip over predecessors and - * indicate retry. - */ - do { - node.prev = pred = pred.prev; - } while (pred.waitStatus > 0); - pred.next = node; - } else { - /* - * waitStatus must be 0 or PROPAGATE. Indicate that we - * need a signal, but don't park yet. Caller will need to - * retry to make sure it cannot acquire before parking. - */ - compareAndSetWaitStatus(pred, ws, Node.SIGNAL); - } - return false; +```java +private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { + int ws = pred.waitStatus; + if (ws == Node.SIGNAL) + /* + * This node has already set status asking a release + * to signal it, so it can safely park. + */ + return true; + if (ws > 0) { + /* + * Predecessor was cancelled. Skip over predecessors and + * indicate retry. + */ + do { + node.prev = pred = pred.prev; + } while (pred.waitStatus > 0); + pred.next = node; + } else { + /* + * waitStatus must be 0 or PROPAGATE. Indicate that we + * need a signal, but don't park yet. Caller will need to + * retry to make sure it cannot acquire before parking. + */ + compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } - + return false; +} +``` shouldParkAfterFailedAcquire()方法主要逻辑是使用`compareAndSetWaitStatus(pred, ws, Node.SIGNAL)`使用CAS将节点状态由INITIAL设置成SIGNAL,表示当前线程阻塞。当compareAndSetWaitStatus设置失败则说明shouldParkAfterFailedAcquire方法返回false,然后会在acquireQueued()方法中for (;;)死循环中会继续重试,直至compareAndSetWaitStatus设置节点状态位为SIGNAL时shouldParkAfterFailedAcquire返回true时才会执行方法parkAndCheckInterrupt()方法,该方法的源码为: - - private final boolean parkAndCheckInterrupt() { - //使得该线程阻塞 - LockSupport.park(this); - return Thread.interrupted(); - } - +```java +private final boolean parkAndCheckInterrupt() { + //使得该线程阻塞 + LockSupport.park(this); + return Thread.interrupted(); +} +``` 该方法的关键是会调用LookSupport.park()方法(关于LookSupport会在以后的文章进行讨论),该方法是用来阻塞当前线程的。因此到这里就应该清楚了,acquireQueued()在自旋过程中主要完成了两件事情: 1. **如果当前节点的前驱节点是头节点,并且能够获得同步状态的话,当前线程能够获得锁该方法执行结束退出**; @@ -274,49 +288,49 @@ shouldParkAfterFailedAcquire()方法主要逻辑是使用`compareAndSetWaitStatu ## 3.2 独占锁的释放(release()方法) ## 独占锁的释放就相对来说比较容易理解了,废话不多说先来看下源码: - - public final boolean release(int arg) { - if (tryRelease(arg)) { - Node h = head; - if (h != null && h.waitStatus != 0) - unparkSuccessor(h); - return true; - } - return false; - } - +```java +public final boolean release(int arg) { + if (tryRelease(arg)) { + Node h = head; + if (h != null && h.waitStatus != 0) + unparkSuccessor(h); + return true; + } + return false; +} +``` 这段代码逻辑就比较容易理解了,如果同步状态释放成功(tryRelease返回true)则会执行if块中的代码,当head指向的头结点不为null,并且该节点的状态值不为0的话才会执行unparkSuccessor()方法。unparkSuccessor方法源码: - - private void unparkSuccessor(Node node) { - /* - * If status is negative (i.e., possibly needing signal) try - * to clear in anticipation of signalling. It is OK if this - * fails or if status is changed by waiting thread. - */ - int ws = node.waitStatus; - if (ws < 0) - compareAndSetWaitStatus(node, ws, 0); - - /* - * Thread to unpark is held in successor, which is normally - * just the next node. But if cancelled or apparently null, - * traverse backwards from tail to find the actual - * non-cancelled successor. - */ - - //头节点的后继节点 - Node s = node.next; - if (s == null || s.waitStatus > 0) { - s = null; - for (Node t = tail; t != null && t != node; t = t.prev) - if (t.waitStatus <= 0) - s = t; - } - if (s != null) - //后继节点不为null时唤醒该线程 - LockSupport.unpark(s.thread); - } - +```java +private void unparkSuccessor(Node node) { + /* + * If status is negative (i.e., possibly needing signal) try + * to clear in anticipation of signalling. It is OK if this + * fails or if status is changed by waiting thread. + */ + int ws = node.waitStatus; + if (ws < 0) + compareAndSetWaitStatus(node, ws, 0); + + /* + * Thread to unpark is held in successor, which is normally + * just the next node. But if cancelled or apparently null, + * traverse backwards from tail to find the actual + * non-cancelled successor. + */ + + //头节点的后继节点 + Node s = node.next; + if (s == null || s.waitStatus > 0) { + s = null; + for (Node t = tail; t != null && t != node; t = t.prev) + if (t.waitStatus <= 0) + s = t; + } + if (s != null) + //后继节点不为null时唤醒该线程 + LockSupport.unpark(s.thread); +} +``` 源码的关键信息请看注释,首先获取头节点的后继节点,当后继节点的时候会调用LookSupport.unpark()方法,该方法会唤醒该节点的后继节点所包装的线程。因此,**每一次锁释放后就会唤醒队列中该节点的后继节点所引用的线程,从而进一步可以佐证获得锁的过程是一个FIFO(先进先出)的过程。** 到现在我们终于啃下了一块硬骨头了,通过学习源码的方式非常深刻的学习到了独占式锁的获取和释放的过程以及同步队列。可以做一下总结: @@ -332,45 +346,45 @@ shouldParkAfterFailedAcquire()方法主要逻辑是使用`compareAndSetWaitStatu ## 3.3 可中断式获取锁(acquireInterruptibly方法) ## 我们知道lock相较于synchronized有一些更方便的特性,比如能响应中断以及超时等待等特性,现在我们依旧采用通过学习源码的方式来看看能够响应中断是怎么实现的。可响应中断式锁可调用方法lock.lockInterruptibly();而该方法其底层会调用AQS的acquireInterruptibly方法,源码为: - - public final void acquireInterruptibly(int arg) - throws InterruptedException { - if (Thread.interrupted()) - throw new InterruptedException(); - if (!tryAcquire(arg)) - //线程获取锁失败 - doAcquireInterruptibly(arg); - } - +```java +public final void acquireInterruptibly(int arg) + throws InterruptedException { + if (Thread.interrupted()) + throw new InterruptedException(); + if (!tryAcquire(arg)) + //线程获取锁失败 + doAcquireInterruptibly(arg); +} +``` 在获取同步状态失败后就会调用doAcquireInterruptibly方法: - - private void doAcquireInterruptibly(int arg) - throws InterruptedException { - //将节点插入到同步队列中 - final Node node = addWaiter(Node.EXCLUSIVE); - boolean failed = true; - try { - for (;;) { - final Node p = node.predecessor(); - //获取锁出队 - if (p == head && tryAcquire(arg)) { - setHead(node); - p.next = null; // help GC - failed = false; - return; - } - if (shouldParkAfterFailedAcquire(p, node) && - parkAndCheckInterrupt()) - //线程中断抛异常 - throw new InterruptedException(); - } - } finally { - if (failed) - cancelAcquire(node); - } - } - +```java +private void doAcquireInterruptibly(int arg) + throws InterruptedException { + //将节点插入到同步队列中 + final Node node = addWaiter(Node.EXCLUSIVE); + boolean failed = true; + try { + for (;;) { + final Node p = node.predecessor(); + //获取锁出队 + if (p == head && tryAcquire(arg)) { + setHead(node); + p.next = null; // help GC + failed = false; + return; + } + if (shouldParkAfterFailedAcquire(p, node) && + parkAndCheckInterrupt()) + //线程中断抛异常 + throw new InterruptedException(); + } + } finally { + if (failed) + cancelAcquire(node); + } +} +``` 关键信息请看注释,现在看这段代码就很轻松了吧:),与acquire方法逻辑几乎一致,唯一的区别是当**parkAndCheckInterrupt**返回true时即线程阻塞时该线程被中断,代码抛出被中断异常。 ## 3.4 超时等待式获取锁(tryAcquireNanos()方法) @@ -381,55 +395,55 @@ shouldParkAfterFailedAcquire()方法主要逻辑是使用`compareAndSetWaitStatu 3. 超时时间结束,仍未获得锁返回false。 我们仍然通过采取阅读源码的方式来学习底层具体是怎么实现的,该方法会调用AQS的方法tryAcquireNanos(),源码为: - - public final boolean tryAcquireNanos(int arg, long nanosTimeout) - throws InterruptedException { - if (Thread.interrupted()) - throw new InterruptedException(); - return tryAcquire(arg) || - //实现超时等待的效果 - doAcquireNanos(arg, nanosTimeout); - } - +```java +public final boolean tryAcquireNanos(int arg, long nanosTimeout) + throws InterruptedException { + if (Thread.interrupted()) + throw new InterruptedException(); + return tryAcquire(arg) || + //实现超时等待的效果 + doAcquireNanos(arg, nanosTimeout); +} +``` 很显然这段源码最终是靠doAcquireNanos方法实现超时等待的效果,该方法源码如下: - - private boolean doAcquireNanos(int arg, long nanosTimeout) - throws InterruptedException { - if (nanosTimeout <= 0L) - return false; - //1. 根据超时时间和当前时间计算出截止时间 - final long deadline = System.nanoTime() + nanosTimeout; - final Node node = addWaiter(Node.EXCLUSIVE); - boolean failed = true; - try { - for (;;) { - final Node p = node.predecessor(); - //2. 当前线程获得锁出队列 - if (p == head && tryAcquire(arg)) { - setHead(node); - p.next = null; // help GC - failed = false; - return true; - } - // 3.1 重新计算超时时间 - nanosTimeout = deadline - System.nanoTime(); - // 3.2 已经超时返回false - if (nanosTimeout <= 0L) - return false; - // 3.3 线程阻塞等待 - if (shouldParkAfterFailedAcquire(p, node) && - nanosTimeout > spinForTimeoutThreshold) - LockSupport.parkNanos(this, nanosTimeout); - // 3.4 线程被中断抛出被中断异常 - if (Thread.interrupted()) - throw new InterruptedException(); - } - } finally { - if (failed) - cancelAcquire(node); - } - } - +```java +private boolean doAcquireNanos(int arg, long nanosTimeout) + throws InterruptedException { + if (nanosTimeout <= 0L) + return false; + //1. 根据超时时间和当前时间计算出截止时间 + final long deadline = System.nanoTime() + nanosTimeout; + final Node node = addWaiter(Node.EXCLUSIVE); + boolean failed = true; + try { + for (;;) { + final Node p = node.predecessor(); + //2. 当前线程获得锁出队列 + if (p == head && tryAcquire(arg)) { + setHead(node); + p.next = null; // help GC + failed = false; + return true; + } + // 3.1 重新计算超时时间 + nanosTimeout = deadline - System.nanoTime(); + // 3.2 已经超时返回false + if (nanosTimeout <= 0L) + return false; + // 3.3 线程阻塞等待 + if (shouldParkAfterFailedAcquire(p, node) && + nanosTimeout > spinForTimeoutThreshold) + LockSupport.parkNanos(this, nanosTimeout); + // 3.4 线程被中断抛出被中断异常 + if (Thread.interrupted()) + throw new InterruptedException(); + } + } finally { + if (failed) + cancelAcquire(node); + } +} +``` 程序逻辑如图所示: ![超时等待式获取锁(doAcquireNanos()方法)](http://upload-images.jianshu.io/upload_images/2615789-a80779d4736afb87.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240) @@ -444,89 +458,89 @@ shouldParkAfterFailedAcquire()方法主要逻辑是使用`compareAndSetWaitStatu # 4. 共享锁 # ## 4.1 共享锁的获取(acquireShared()方法) ## 在聊完AQS对独占锁的实现后,我们继续一鼓作气的来看看共享锁是怎样实现的?共享锁的获取方法为acquireShared,源码为: - - public final void acquireShared(int arg) { - if (tryAcquireShared(arg) < 0) - doAcquireShared(arg); - } - +```java +public final void acquireShared(int arg) { + if (tryAcquireShared(arg) < 0) + doAcquireShared(arg); +} +``` 这段源码的逻辑很容易理解,在该方法中会首先调用tryAcquireShared方法,tryAcquireShared返回值是一个int类型,当返回值为大于等于0的时候方法结束说明获得成功获取锁,否则,表明获取同步状态失败即所引用的线程获取锁失败,会执行doAcquireShared方法,该方法的源码为: - - private void doAcquireShared(int arg) { - final Node node = addWaiter(Node.SHARED); - boolean failed = true; - try { - boolean interrupted = false; - for (;;) { - final Node p = node.predecessor(); - if (p == head) { - int r = tryAcquireShared(arg); - if (r >= 0) { - // 当该节点的前驱节点是头结点且成功获取同步状态 - setHeadAndPropagate(node, r); - p.next = null; // help GC - if (interrupted) - selfInterrupt(); - failed = false; - return; - } - } - if (shouldParkAfterFailedAcquire(p, node) && - parkAndCheckInterrupt()) - interrupted = true; - } - } finally { - if (failed) - cancelAcquire(node); - } - } - +```java +private void doAcquireShared(int arg) { + final Node node = addWaiter(Node.SHARED); + boolean failed = true; + try { + boolean interrupted = false; + for (;;) { + final Node p = node.predecessor(); + if (p == head) { + int r = tryAcquireShared(arg); + if (r >= 0) { + // 当该节点的前驱节点是头结点且成功获取同步状态 + setHeadAndPropagate(node, r); + p.next = null; // help GC + if (interrupted) + selfInterrupt(); + failed = false; + return; + } + } + if (shouldParkAfterFailedAcquire(p, node) && + parkAndCheckInterrupt()) + interrupted = true; + } + } finally { + if (failed) + cancelAcquire(node); + } +} +``` 现在来看这段代码会不会很容易了?逻辑几乎和独占式锁的获取一模一样,这里的自旋过程中能够退出的条件**是当前节点的前驱节点是头结点并且tryAcquireShared(arg)返回值大于等于0即能成功获得同步状态**。 ## 4.2 共享锁的释放(releaseShared()方法) ## 共享锁的释放在AQS中会调用方法releaseShared: - - public final boolean releaseShared(int arg) { - if (tryReleaseShared(arg)) { - doReleaseShared(); - return true; - } - return false; - } - +```java +public final boolean releaseShared(int arg) { + if (tryReleaseShared(arg)) { + doReleaseShared(); + return true; + } + return false; +} +``` 当成功释放同步状态之后即tryReleaseShared会继续执行doReleaseShared方法: +```java +private void doReleaseShared() { + /* + * Ensure that a release propagates, even if there are other + * in-progress acquires/releases. This proceeds in the usual + * way of trying to unparkSuccessor of head if it needs + * signal. But if it does not, status is set to PROPAGATE to + * ensure that upon release, propagation continues. + * Additionally, we must loop in case a new node is added + * while we are doing this. Also, unlike other uses of + * unparkSuccessor, we need to know if CAS to reset status + * fails, if so rechecking. + */ + for (;;) { + Node h = head; + if (h != null && h != tail) { + int ws = h.waitStatus; + if (ws == Node.SIGNAL) { + if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) + continue; // loop to recheck cases + unparkSuccessor(h); + } + else if (ws == 0 && + !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) + continue; // loop on failed CAS + } + if (h == head) // loop if head changed + break; + } +} - private void doReleaseShared() { - /* - * Ensure that a release propagates, even if there are other - * in-progress acquires/releases. This proceeds in the usual - * way of trying to unparkSuccessor of head if it needs - * signal. But if it does not, status is set to PROPAGATE to - * ensure that upon release, propagation continues. - * Additionally, we must loop in case a new node is added - * while we are doing this. Also, unlike other uses of - * unparkSuccessor, we need to know if CAS to reset status - * fails, if so rechecking. - */ - for (;;) { - Node h = head; - if (h != null && h != tail) { - int ws = h.waitStatus; - if (ws == Node.SIGNAL) { - if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) - continue; // loop to recheck cases - unparkSuccessor(h); - } - else if (ws == 0 && - !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) - continue; // loop on failed CAS - } - if (h == head) // loop if head changed - break; - } - } - - +``` 这段方法跟独占式锁释放过程有点点不同,在共享式锁的释放过程中,对于能够支持多个线程同时访问的并发组件,必须保证多个线程能够安全的释放同步状态,这里采用的CAS保证,当CAS操作失败continue,在下一次循环中进行重试。 ## 4.3 可中断(acquireSharedInterruptibly()方法),超时等待(tryAcquireSharedNanos()方法) ## diff --git "a/10.\345\275\273\345\272\225\347\220\206\350\247\243ReentrantLock/\345\275\273\345\272\225\347\220\206\350\247\243ReentrantLock.md" "b/10.\345\275\273\345\272\225\347\220\206\350\247\243ReentrantLock/\345\275\273\345\272\225\347\220\206\350\247\243ReentrantLock.md" index 59fe6f4..8b6af75 100644 --- "a/10.\345\275\273\345\272\225\347\220\206\350\247\243ReentrantLock/\345\275\273\345\272\225\347\220\206\350\247\243ReentrantLock.md" +++ "b/10.\345\275\273\345\272\225\347\220\206\350\247\243ReentrantLock/\345\275\273\345\272\225\347\220\206\350\247\243ReentrantLock.md" @@ -7,7 +7,7 @@ ReentrantLock重入锁,是实现Lock接口的一个类,也是在实际编程 要想支持重入性,就要解决两个问题:**1. 在线程获取锁的时候,如果已经获取锁的线程是当前线程的话则直接再次获取成功;2. 由于锁会被获取n次,那么只有锁在被释放同样的n次之后,该锁才算是完全释放成功。**通过[这篇文章](https://juejin.im/post/5aeb07ab6fb9a07ac36350c8),我们知道,同步组件主要是通过重写AQS的几个protected方法来表达自己的同步语义。针对第一个问题,我们来看看ReentrantLock是怎样实现的,以非公平锁为例,判断当前线程能否获得锁为例,核心方法为nonfairTryAcquire: - +```java final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); @@ -29,9 +29,9 @@ ReentrantLock重入锁,是实现Lock接口的一个类,也是在实际编程 } return false; } - +``` 这段代码的逻辑也很简单,具体请看注释。为了支持重入性,在第二步增加了处理逻辑,如果该锁已经被线程所占有了,会继续检查占有线程是否为当前线程,如果是的话,同步状态加1返回true,表示可以再次获取成功。每次重新获取都会对同步状态进行加一的操作,那么释放的时候处理思路是怎样的了?(依然还是以非公平锁为例)核心方法为tryRelease: - +```java protected final boolean tryRelease(int releases) { //1. 同步状态减1 int c = getState() - releases; @@ -47,24 +47,24 @@ ReentrantLock重入锁,是实现Lock接口的一个类,也是在实际编程 setState(c); return free; } - +``` 代码的逻辑请看注释,需要注意的是,重入锁的释放必须得等到同步状态为0时锁才算成功释放,否则锁仍未释放。如果锁被获取n次,释放了n-1次,该锁未完全释放返回false,只有被释放n次才算成功释放,返回true。到现在我们可以理清ReentrantLock重入性的实现了,也就是理解了同步语义的第一条。 # 3. 公平锁与公平锁 # ReentrantLock支持两种锁:**公平锁**和**非公平锁**。**何谓公平性,是针对获取锁而言的,如果一个锁是公平的,那么锁的获取顺序就应该符合请求上的绝对时间顺序,满足FIFO**。ReentrantLock的构造方法无参时是构造非公平锁,源码为: - +```java public ReentrantLock() { sync = new NonfairSync(); } - +``` 另外还提供了另外一种方式,可传入一个boolean值,true时为公平锁,false时为非公平锁,源码为: - +```java public ReentrantLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); } - +``` 在上面非公平锁获取时(nonfairTryAcquire方法)只是简单的获取了一下当前状态做了一些逻辑处理,并没有考虑到当前同步队列中线程等待的情况。我们来看看公平锁的处理逻辑是怎样的,核心方法为: - +```java protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); @@ -85,7 +85,7 @@ ReentrantLock支持两种锁:**公平锁**和**非公平锁**。**何谓公平 return false; } } - +``` 这段代码的逻辑与nonfairTryAcquire基本上一直,唯一的不同在于增加了hasQueuedPredecessors的逻辑判断,方法名就可知道该方法用来判断当前节点在同步队列中是否有前驱节点的判断,如果有前驱节点说明有线程比当前线程更早的请求资源,根据公平性,当前线程请求资源失败。如果当前节点没有前驱节点的话,再才有做后面的逻辑判断的必要性。**公平锁每次都是从同步队列中的第一个节点获取到锁,而非公平性锁则不一定,有可能刚释放锁的线程能再次获取到锁**。 > **公平锁 VS 非公平锁** diff --git "a/11.\346\267\261\345\205\245\347\220\206\350\247\243\350\257\273\345\206\231\351\224\201ReentrantReadWriteLock/\346\267\261\345\205\245\347\220\206\350\247\243\350\257\273\345\206\231\351\224\201ReentrantReadWriteLock.md" "b/11.\346\267\261\345\205\245\347\220\206\350\247\243\350\257\273\345\206\231\351\224\201ReentrantReadWriteLock/\346\267\261\345\205\245\347\220\206\350\247\243\350\257\273\345\206\231\351\224\201ReentrantReadWriteLock.md" index 18eba37..72341d9 100644 --- "a/11.\346\267\261\345\205\245\347\220\206\350\247\243\350\257\273\345\206\231\351\224\201ReentrantReadWriteLock/\346\267\261\345\205\245\347\220\206\350\247\243\350\257\273\345\206\231\351\224\201ReentrantReadWriteLock.md" +++ "b/11.\346\267\261\345\205\245\347\220\206\350\247\243\350\257\273\345\206\231\351\224\201ReentrantReadWriteLock/\346\267\261\345\205\245\347\220\206\350\247\243\350\257\273\345\206\231\351\224\201ReentrantReadWriteLock.md" @@ -13,7 +13,7 @@ ## 2.1.写锁的获取 ## 同步组件的实现聚合了同步器(AQS),并通过重写重写同步器(AQS)中的方法实现同步组件的同步语义(关于同步组件的实现层级结构可以[看这篇文章](https://juejin.im/post/5aeb055b6fb9a07abf725c8c),AQS的底层实现分析可以[看这篇文章](https://juejin.im/post/5aeb07ab6fb9a07ac36350c8))。因此,写锁的实现依然也是采用这种方式。在同一时刻写锁是不能被多个线程所获取,很显然写锁是独占式锁,而实现写锁的同步语义是通过重写AQS中的tryAcquire方法实现的。源码为: - +```java protected final boolean tryAcquire(int acquires) { /* * Walkthrough: @@ -51,14 +51,14 @@ setExclusiveOwnerThread(current); return true; } - +``` 这段代码的逻辑请看注释,这里有一个地方需要重点关注,exclusiveCount(c)方法,该方法源码为: static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; } 其中**EXCLUSIVE_MASK**为: `static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;` EXCLUSIVE _MASK为1左移16位然后减1,即为0x0000FFFF。而exclusiveCount方法是将同步状态(state为int类型)与0x0000FFFF相与,即取同步状态的低16位。那么低16位代表什么呢?根据exclusiveCount方法的注释为独占式获取的次数即写锁被获取的次数,现在就可以得出来一个结论**同步状态的低16位用来表示写锁的获取次数**。同时还有一个方法值得我们注意: - +```java static int sharedCount(int c) { return c >>> SHARED_SHIFT; } - +``` 该方法是获取读锁被获取的次数,是将同步状态(int c)右移16次,即取同步状态的高16位,现在我们可以得出另外一个结论**同步状态的高16位用来表示读锁被获取的次数**。现在还记得我们开篇说的需要弄懂的第一个问题吗?读写锁是怎样实现分别记录读锁和写锁的状态的,现在这个问题的答案就已经被我们弄清楚了,其示意图如下图所示: ![读写锁的读写状态设计.png](http://upload-images.jianshu.io/upload_images/2615789-6af1818bbfa83051.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240) @@ -68,7 +68,7 @@ ## 2.2.写锁的释放 ## 写锁释放通过重写AQS的tryRelease方法,源码为: - +```java protected final boolean tryRelease(int releases) { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); @@ -82,13 +82,13 @@ setState(nextc); return free; } - +``` 源码的实现逻辑请看注释,不难理解与ReentrantLock基本一致,这里需要注意的是,减少写状态` int nextc = getState() - releases;`只需要用**当前同步状态直接减去写状态的原因正是我们刚才所说的写状态是由同步状态的低16位表示的**。 # 3.读锁详解 # ## 3.1.读锁的获取 ## 看完了写锁,现在来看看读锁,读锁不是独占式锁,即同一时刻该锁可以被多个读线程获取也就是一种共享式锁。按照之前对AQS介绍,实现共享式同步组件的同步语义需要通过重写AQS的tryAcquireShared方法和tryReleaseShared方法。读锁的获取实现方法为: - +```java protected final int tryAcquireShared(int unused) { /* * Walkthrough: @@ -137,12 +137,12 @@ //4. 处理在第二步中CAS操作失败的自旋已经实现重入性 return fullTryAcquireShared(current); } - +``` 代码的逻辑请看注释,需要注意的是 **当写锁被其他线程获取后,读锁获取失败**,否则获取成功利用CAS更新同步状态。另外,当前同步状态需要加上SHARED_UNIT(`(1 << SHARED_SHIFT)`即0x00010000)的原因这是我们在上面所说的同步状态的高16位用来表示读锁被获取的次数。如果CAS失败或者已经获取读锁的线程再次获取读锁时,是靠fullTryAcquireShared方法实现的,这段代码就不展开说了,有兴趣可以看看。 ## 3.2.读锁的释放 ## 读锁释放的实现主要通过方法tryReleaseShared,源码如下,主要逻辑请看注释: - +```java protected final boolean tryReleaseShared(int unused) { Thread current = Thread.currentThread(); // 前面还是为了实现getReadHoldCount等新功能 @@ -175,10 +175,10 @@ return nextc == 0; } } - +``` # 4.锁降级 # 读写锁支持锁降级,**遵循按照获取写锁,获取读锁再释放写锁的次序,写锁能够降级成为读锁**,不支持锁升级,关于锁降级下面的示例代码摘自ReentrantWriteReadLock源码中: - +```java void processCachedData() { rwl.readLock().lock(); if (!cacheValid) { @@ -205,4 +205,5 @@ rwl.readLock().unlock(); } } - } \ No newline at end of file + } +``` diff --git "a/12.\350\257\246\350\247\243Condition\347\232\204await\345\222\214signal\347\255\211\345\276\205\351\200\232\347\237\245\346\234\272\345\210\266/\350\257\246\350\247\243Condition\347\232\204await\345\222\214signal\347\255\211\345\276\205\351\200\232\347\237\245\346\234\272\345\210\266.md" "b/12.\350\257\246\350\247\243Condition\347\232\204await\345\222\214signal\347\255\211\345\276\205\351\200\232\347\237\245\346\234\272\345\210\266/\350\257\246\350\247\243Condition\347\232\204await\345\222\214signal\347\255\211\345\276\205\351\200\232\347\237\245\346\234\272\345\210\266.md" index df47e54..31ddd24 100644 --- "a/12.\350\257\246\350\247\243Condition\347\232\204await\345\222\214signal\347\255\211\345\276\205\351\200\232\347\237\245\346\234\272\345\210\266/\350\257\246\350\247\243Condition\347\232\204await\345\222\214signal\347\255\211\345\276\205\351\200\232\347\237\245\346\234\272\345\210\266.md" +++ "b/12.\350\257\246\350\247\243Condition\347\232\204await\345\222\214signal\347\255\211\345\276\205\351\200\232\347\237\245\346\234\272\345\210\266/\350\257\246\350\247\243Condition\347\232\204await\345\222\214signal\347\255\211\345\276\205\351\200\232\347\237\245\346\234\272\345\210\266.md" @@ -24,18 +24,18 @@ # 2.Condition实现原理分析 # ## 2.1 等待队列 ## 要想能够深入的掌握condition还是应该知道它的实现原理,现在我们一起来看看condiiton的源码。创建一个condition对象是通过`lock.newCondition()`,而这个方法实际上是会new出一个**ConditionObject**对象,该类是AQS([AQS的实现原理的文章](https://juejin.im/post/5aeb07ab6fb9a07ac36350c8))的一个内部类,有兴趣可以去看看。前面我们说过,condition是要和lock配合使用的也就是condition和Lock是绑定在一起的,而lock的实现原理又依赖于AQS,自然而然ConditionObject作为AQS的一个内部类无可厚非。我们知道在锁机制的实现上,AQS内部维护了一个同步队列,如果是独占式锁的话,所有获取锁失败的线程的尾插入到**同步队列**,同样的,condition内部也是使用同样的方式,内部维护了一个 **等待队列**,所有调用condition.await方法的线程会加入到等待队列中,并且线程状态转换为等待状态。另外注意到ConditionObject中有两个成员变量: - +```java /** First node of condition queue. */ private transient Node firstWaiter; /** Last node of condition queue. */ private transient Node lastWaiter; - +``` 这样我们就可以看出来ConditionObject通过持有等待队列的头尾指针来管理等待队列。主要注意的是Node类复用了在AQS中的Node类,其节点状态和相关属性可以去看[AQS的实现原理的文章](https://juejin.im/post/5aeb07ab6fb9a07ac36350c8),如果您仔细看完这篇文章对condition的理解易如反掌,对lock体系的实现也会有一个质的提升。Node类有这样一个属性: //后继节点 Node nextWaiter; 进一步说明,**等待队列是一个单向队列**,而在之前说AQS时知道同步队列是一个双向队列。接下来我们用一个demo,通过debug进去看是不是符合我们的猜想: - +```java public static void main(String[] args) { for (int i = 0; i < 10; i++) { Thread thread = new Thread(() -> { @@ -51,6 +51,7 @@ thread.start(); } } +``` 这段代码没有任何实际意义,甚至很臭,只是想说明下我们刚才所想的。新建了10个线程,没有线程先获取锁,然后调用condition.await方法释放锁将当前线程加入到等待队列中,通过debug控制当走到第10个线程的时候查看`firstWaiter`即等待队列中的头结点,debug模式下情景图如下: ![debug模式下情景图](http://upload-images.jianshu.io/upload_images/2615789-67a211209835e36d.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240) @@ -74,7 +75,7 @@ ## 2.2 await实现原理 ## **当调用condition.await()方法后会使得当前获取lock的线程进入到等待队列,如果该线程能够从await()方法返回的话一定是该线程获取了与condition相关联的lock**。接下来,我们还是从源码的角度去看,只有熟悉了源码的逻辑我们的理解才是最深的。await()方法源码为: - +```java public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); @@ -98,10 +99,10 @@ if (interruptMode != 0) reportInterruptAfterWait(interruptMode); } - +``` 代码的主要逻辑**请看注释**,我们都知道**当当前线程调用condition.await()方法后,会使得当前线程释放lock然后加入到等待队列中,直至被signal/signalAll后会使得当前线程从等待队列中移至到同步队列中去,直到获得了lock后才会从await方法返回,或者在等待时被中断会做中断处理**。那么关于这个实现过程我们会有这样几个问题:1. 是怎样将当前线程添加到等待队列中去的?2.释放锁的过程?3.怎样才能从await方法退出?而这段代码的逻辑就是告诉我们这三个问题的答案。具体**请看注释**,在第1步中调用addConditionWaiter将当前线程添加到等待队列中,该方法源码为: - +```java private Node addConditionWaiter() { Node t = lastWaiter; // If lastWaiter is cancelled, clean out. @@ -120,9 +121,9 @@ lastWaiter = node; return node; } - +``` 这段代码就很容易理解了,将当前节点包装成Node,如果等待队列的firstWaiter为null的话(等待队列为空队列),则将firstWaiter指向当前的Node,否则,更新lastWaiter(尾节点)即可。就是**通过尾插入的方式将当前线程封装的Node插入到等待队列中即可**,同时可以看出等待队列是一个**不带头结点的链式队列**,之前我们学习AQS时知道同步队列**是一个带头结点的链式队列**,这是两者的一个区别。将当前节点插入到等待对列之后,会使当前线程释放lock,由fullyRelease方法实现,fullyRelease源码为: - +```java final int fullyRelease(Node node) { boolean failed = true; try { @@ -140,16 +141,16 @@ node.waitStatus = Node.CANCELLED; } } - +``` 这段代码就很容易理解了,**调用AQS的模板方法release方法释放AQS的同步状态并且唤醒在同步队列中头结点的后继节点引用的线程**,如果释放成功则正常返回,若失败的话就抛出异常。到目前为止,这两段代码已经解决了前面的两个问题的答案了,还剩下第三个问题,怎样从await方法退出?现在回过头再来看await方法有这样一段逻辑: - +```java while (!isOnSyncQueue(node)) { // 3. 当前线程进入到等待状态 LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } - +``` 很显然,当线程第一次调用condition.await()方法时,会进入到这个while()循环中,然后通过LockSupport.park(this)方法使得当前线程进入等待状态,那么要想退出这个await方法第一个前提条件自然而然的是要先退出这个while循环,出口就只剩下两个地方:**1. 逻辑走到break退出while循环;2. while循环中的逻辑判断为false**。再看代码出现第1种情况的条件是当前等待的线程被中断后代码会走到break退出,第二种情况是当前节点被移动到了同步队列中(即另外线程调用的condition的signal或者signalAll方法),while中逻辑判断为false后结束while循环。总结下,就是**当前线程被中断或者调用condition.signal/condition.signalAll方法当前节点移动到了同步队列后** ,这是当前线程退出await方法的前提条件。当退出while循环后就会调用`acquireQueued(node, savedState)`,这个方法在介绍AQS的底层实现时说过了,若感兴趣的话可以去[看这篇文章](https://juejin.im/post/5aeb07ab6fb9a07ac36350c8),该方法的作用是在**自旋过程中线程不断尝试获取同步状态,直至成功(线程获取到lock)**。这样也说明了**退出await方法必须是已经获得了condition引用(关联)的lock**。到目前为止,开头的三个问题我们通过阅读源码的方式已经完全找到了答案,也对await方法的理解加深。await方法示意图如下图: ![await方法示意图](http://upload-images.jianshu.io/upload_images/2615789-1cb1c2fe3c1ddf38.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240) @@ -166,7 +167,7 @@ condition还额外支持了超时机制,使用者可调用方法awaitNanos,awa > 不响应中断的支持 要想不响应中断可以调用condition.awaitUninterruptibly()方法,该方法的源码为: - +```java public final void awaitUninterruptibly() { Node node = addConditionWaiter(); int savedState = fullyRelease(node); @@ -179,12 +180,12 @@ condition还额外支持了超时机制,使用者可调用方法awaitNanos,awa if (acquireQueued(node, savedState) || interrupted) selfInterrupt(); } - +``` 这段方法与上面的await方法基本一致,只不过减少了对中断的处理,并省略了reportInterruptAfterWait方法抛被中断的异常。 ## 2.3 signal/signalAll实现原理 ## **调用condition的signal或者signalAll方法可以将等待队列中等待时间最长的节点移动到同步队列中**,使得该节点能够有机会获得lock。按照等待队列是先进先出(FIFO)的,所以等待队列的头节点必然会是等待时间最长的节点,也就是每次调用condition的signal方法是将头节点移动到同步队列中。我们来通过看源码的方式来看这样的猜想是不是对的,signal方法源码为: - +```java public final void signal() { //1. 先检测当前线程是否已经获取lock if (!isHeldExclusively()) @@ -194,9 +195,9 @@ condition还额外支持了超时机制,使用者可调用方法awaitNanos,awa if (first != null) doSignal(first); } - +``` signal方法首先会检测当前线程是否已经获取lock,如果没有获取lock会直接抛出异常,如果获取的话再得到等待队列的头指针引用的节点,之后的操作的doSignal方法也是基于该节点。下面我们来看看doSignal方法做了些什么事情,doSignal方法源码为: - +```java private void doSignal(Node first) { do { if ( (firstWaiter = first.nextWaiter) == null) @@ -207,9 +208,9 @@ signal方法首先会检测当前线程是否已经获取lock,如果没有获 } while (!transferForSignal(first) && (first = firstWaiter) != null); } - +``` 具体逻辑请看注释,真正对头节点做处理的逻辑在**transferForSignal**放,该方法源码为: - +```java final boolean transferForSignal(Node node) { /* * If cannot change waitStatus, the node has been cancelled. @@ -231,7 +232,7 @@ signal方法首先会检测当前线程是否已经获取lock,如果没有获 LockSupport.unpark(node.thread); return true; } - +``` 关键逻辑请看注释,这段代码主要做了两件事情1.将头结点的状态更改为CONDITION;2.调用enq方法,将该节点尾插入到同步队列中,关于enq方法请看AQS的底层实现这篇文章。现在我们可以得出结论:**调用condition的signal的前提条件是当前线程已经获取了lock,该方法会使得等待队列中的头节点即等待时间最长的那个节点移入到同步队列,而移入到同步队列后才有机会使得等待线程被唤醒,即从await方法中的LockSupport.park(this)方法中返回,从而才有机会使得调用await方法的线程成功退出**。signal执行示意图如下图: ![signal执行示意图](http://upload-images.jianshu.io/upload_images/2615789-3750f5baf7995623.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240) @@ -241,7 +242,7 @@ signal方法首先会检测当前线程是否已经获取lock,如果没有获 > signalAll sigllAll与sigal方法的区别体现在doSignalAll方法上,前面我们已经知道d**oSignal方法只会对等待队列的头节点进行操作,**,而doSignalAll的源码为: - +```java private void doSignalAll(Node first) { lastWaiter = firstWaiter = null; do { @@ -251,7 +252,7 @@ sigllAll与sigal方法的区别体现在doSignalAll方法上,前面我们已 first = next; } while (first != null); } - +``` 该方法只不过时间等待队列中的每一个节点都移入到同步队列中,即“通知”当前调用condition.await()方法的每一个线程。 # 3. await与signal/signalAll的结合思考 # @@ -264,7 +265,7 @@ sigllAll与sigal方法的区别体现在doSignalAll方法上,前面我们已 # 3. 一个例子 # 我们用一个很简单的例子说说condition的用法: - +```java public class AwaitSignal { private static ReentrantLock lock = new ReentrantLock(); @@ -313,7 +314,7 @@ sigllAll与sigal方法的区别体现在doSignalAll方法上,前面我们已 } } } - +``` 输出结果为: Thread-0当前条件不满足等待 @@ -325,4 +326,4 @@ sigllAll与sigal方法的区别体现在doSignalAll方法上,前面我们已 > 参考文献 -《java并发编程的艺术》 \ No newline at end of file +《java并发编程的艺术》 diff --git "a/14.\345\271\266\345\217\221\345\256\271\345\231\250\344\271\213ConcurrentHashMap(JDK 1.8\347\211\210\346\234\254)/\345\271\266\345\217\221\345\256\271\345\231\250\344\271\213ConcurrentHashMap(JDK 1.8\347\211\210\346\234\254).md" "b/14.\345\271\266\345\217\221\345\256\271\345\231\250\344\271\213ConcurrentHashMap(JDK 1.8\347\211\210\346\234\254)/\345\271\266\345\217\221\345\256\271\345\231\250\344\271\213ConcurrentHashMap(JDK 1.8\347\211\210\346\234\254).md" index fb39832..388865d 100644 --- "a/14.\345\271\266\345\217\221\345\256\271\345\231\250\344\271\213ConcurrentHashMap(JDK 1.8\347\211\210\346\234\254)/\345\271\266\345\217\221\345\256\271\345\231\250\344\271\213ConcurrentHashMap(JDK 1.8\347\211\210\346\234\254).md" +++ "b/14.\345\271\266\345\217\221\345\256\271\345\231\250\344\271\213ConcurrentHashMap(JDK 1.8\347\211\210\346\234\254)/\345\271\266\345\217\221\345\256\271\345\231\250\344\271\213ConcurrentHashMap(JDK 1.8\347\211\210\346\234\254).md" @@ -2,11 +2,11 @@ # 1.ConcurrentHashmap简介 # 在使用HashMap时在多线程情况下扩容会出现CPU接近100%的情况,因为hashmap并不是线程安全的,通常我们可以使用在java体系中古老的hashtable类,该类基本上所有的方法都采用synchronized进行线程安全的控制,可想而知,在高并发的情况下,每次只有一个线程能够获取对象监视器锁,这样的并发性能的确不令人满意。另外一种方式通过Collections的`Map synchronizedMap(Map m)`将hashmap包装成一个线程安全的map。比如SynchronzedMap的put方法源码为: - +```java public V put(K key, V value) { synchronized (mutex) {return m.put(key, value);} } - +``` 实际上SynchronizedMap实现依然是采用synchronized独占式锁进行线程安全的并发控制的。同样,这种方案的性能也是令人不太满意的。针对这种境况,Doug Lea大师不遗余力的为我们创造了一些线程安全的并发容器,让每一个java开发人员倍感幸福。相对于hashmap来说,ConcurrentHashMap就是线程安全的map,其中**利用了锁分段的思想提高了并发度**。 @@ -40,6 +40,7 @@ volatile int sizeCtl; 4. **sun.misc.Unsafe U** 在ConcurrentHashMapde的实现中可以看到大量的U.compareAndSwapXXXX的方法去修改ConcurrentHashMap的一些属性。这些方法实际上是利用了CAS算法保证了线程安全性,这是一种乐观策略,假设每一次操作都不会产生冲突,当且仅当冲突发生的时候再去尝试。而CAS操作依赖于现代处理器指令集,通过底层**CMPXCHG**指令实现。CAS(V,O,N)核心思想为:**若当前变量实际值V与期望的旧值O相同,则表明该变量没被其他线程进行修改,因此可以安全的将新值N赋值给变量;若当前变量实际值V与期望的旧值O不相同,则表明该变量已经被其他线程做了处理,此时将新值N赋给变量操作就是不安全的,在进行重试**。而在大量的同步组件和并发容器的实现中使用CAS是通过`sun.misc.Unsafe`类实现的,该类提供了一些可以直接操控内存和线程的底层操作,可以理解为java中的“指针”。该成员变量的获取是在静态代码块中: +```java static { try { U = sun.misc.Unsafe.getUnsafe(); @@ -48,13 +49,14 @@ volatile int sizeCtl; throw new Error(e); } } - +``` > **ConcurrentHashMap中关键内部类** 1. **Node** Node类实现了Map.Entry接口,主要存放key-value对,并且具有next域 +```java static class Node implements Map.Entry { final int hash; final K key; @@ -62,12 +64,14 @@ Node类实现了Map.Entry接口,主要存放key-value对,并且具有next域 volatile Node next; ...... } +``` 另外可以看出很多属性都是用volatile进行修饰的,也就是为了保证内存可见性。 2. **TreeNode** 树节点,继承于承载数据的Node类。而红黑树的操作是针对TreeBin类的,从该类的注释也可以看出,也就是TreeBin会将TreeNode进行再一次封装 +```java ** * Nodes for use in TreeBins */ @@ -79,10 +83,12 @@ Node类实现了Map.Entry接口,主要存放key-value对,并且具有next域 boolean red; ...... } +``` 3. **TreeBin** 这个类并不负责包装用户的key、value信息,而是包装的很多TreeNode节点。实际的ConcurrentHashMap“数组”中,存放的是TreeBin对象,而不是TreeNode对象。 +```java static final class TreeBin extends Node { TreeNode root; volatile TreeNode first; @@ -94,10 +100,12 @@ Node类实现了Map.Entry接口,主要存放key-value对,并且具有next域 static final int READER = 4; // increment value for setting read lock ...... } +``` 4. **ForwardingNode** 在扩容时才会出现的特殊节点,其key,value,hash全部为null。并拥有nextTable指针引用新的table数组。 +```java static final class ForwardingNode extends Node { final Node[] nextTable; ForwardingNode(Node[] tab) { @@ -106,7 +114,7 @@ Node类实现了Map.Entry接口,主要存放key-value对,并且具有next域 } ..... } - +``` > **CAS关键操作** @@ -115,25 +123,32 @@ Node类实现了Map.Entry接口,主要存放key-value对,并且具有next域 1. **tabAt** +```java static final Node tabAt(Node[] tab, int i) { return (Node)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE); } +``` + 该方法用来获取table数组中索引为i的Node元素。 2. **casTabAt** +```java static final boolean casTabAt(Node[] tab, int i, Node c, Node v) { return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v); } +``` 利用CAS操作设置table数组中索引为i的元素 3. **setTabAt** +```java static final void setTabAt(Node[] tab, int i, Node v) { U.putObjectVolatile(tab, ((long)i << ASHIFT) + ABASE, v); } +``` 该方法用来设置table数组中索引为i的元素 @@ -143,6 +158,7 @@ Node类实现了Map.Entry接口,主要存放key-value对,并且具有next域 ## 3.1 实例构造器方法 ## 在使用ConcurrentHashMap第一件事自然而然就是new 出来一个ConcurrentHashMap对象,一共提供了如下几个构造器方法: +```java // 1. 构造一个空的map,即table数组还未初始化,初始化放在第一次插入数据时,默认大小为16 ConcurrentHashMap() // 2. 给定map的大小 @@ -153,9 +169,11 @@ Node类实现了Map.Entry接口,主要存放key-value对,并且具有next域 ConcurrentHashMap(int initialCapacity, float loadFactor) // 5. 给定map大小,加载因子以及并发度(预计同时操作数据的线程) ConcurrentHashMap(int initialCapacity,float loadFactor, int concurrencyLevel) +``` ConcurrentHashMap一共给我们提供了5中构造器方法,具体使用请看注释,我们来看看第2种构造器,传入指定大小时的情况,该构造器源码为: +```java public ConcurrentHashMap(int initialCapacity) { //1. 小于0直接抛异常 if (initialCapacity < 0) @@ -167,9 +185,11 @@ ConcurrentHashMap一共给我们提供了5中构造器方法,具体使用请 //3. 赋值给sizeCtl this.sizeCtl = cap; } +``` 这段代码的逻辑请看注释,很容易理解,如果小于0就直接抛出异常,如果指定值大于了所允许的最大值的话就取最大值,否则,在对指定值做进一步处理。最后将cap赋值给sizeCtl,关于sizeCtl的说明请看上面的说明,**当调用构造器方法之后,sizeCtl的大小应该就代表了ConcurrentHashMap的大小,即table数组长度**。tableSizeFor做了哪些事情了?源码为: +```java /** * Returns a power of two table size for the given desired capacity. * See Hackers Delight, sec 3.2 @@ -183,12 +203,14 @@ ConcurrentHashMap一共给我们提供了5中构造器方法,具体使用请 n |= n >>> 16; return (n < 0) ? 1 : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1; } +``` 通过注释就很清楚了,该方法会将调用构造器方法时指定的大小转换成一个2的幂次方数,也就是说ConcurrentHashMap的大小一定是2的幂次方,比如,当指定大小为18时,为了满足2的幂次方特性,实际上concurrentHashMapd的大小为2的5次方(32)。另外,需要注意的是,**调用构造器方法的时候并未构造出table数组(可以理解为ConcurrentHashMap的数据容器),只是算出table数组的长度,当第一次向ConcurrentHashMap插入数据的时候才真正的完成初始化创建table数组的工作**。 ## 3.2 initTable方法 ## 直接上源码: +```java private final Node[] initTable() { Node[] tab; int sc; while ((tab = table) == null || tab.length == 0) { @@ -215,12 +237,14 @@ ConcurrentHashMap一共给我们提供了5中构造器方法,具体使用请 } return tab; } +``` 代码的逻辑请见注释,有可能存在一个情况是多个线程同时走到这个方法中,为了保证能够正确初始化,在第1步中会先通过if进行判断,若当前已经有一个线程正在初始化即sizeCtl值变为-1,这个时候其他线程在If判断为true从而调用Thread.yield()让出CPU时间片。正在进行初始化的线程会调用U.compareAndSwapInt方法将sizeCtl改为-1即正在初始化的状态。另外还需要注意的事情是,在第四步中会进一步计算数组中可用的大小即为数组实际大小n乘以加载因子0.75.可以看看这里乘以0.75是怎么算的,0.75为四分之三,这里`n - (n >>> 2)`是不是刚好是n-(1/4)n=(3/4)n,挺有意思的吧:)。如果选择是无参的构造器的话,这里在new Node数组的时候会使用默认大小为`DEFAULT_CAPACITY`(16),然后乘以加载因子0.75为12,也就是说数组的可用大小为12。 ## 3.3 put方法 ## 使用ConcurrentHashMap最长用的也应该是put和get方法了吧,我们先来看看put方法是怎样实现的。调用put方法时实际具体实现是putVal方法,源码如下: +```java /** Implementation for put and putIfAbsent */ final V putVal(K key, V value, boolean onlyIfAbsent) { if (key == null || value == null) throw new NullPointerException(); @@ -293,6 +317,7 @@ ConcurrentHashMap一共给我们提供了5中构造器方法,具体使用请 addCount(1L, binCount); return null; } +``` put方法的代码量有点长,我们按照上面的分解的步骤一步步来看。**从整体而言,为了解决线程安全的问题,ConcurrentHashMap使用了synchronzied和CAS的方式**。在之前了解过HashMap以及1.8版本之前的ConcurrenHashMap都应该知道ConcurrentHashMap结构图,为了方面下面的讲解这里先直接给出,如果对这有疑问的话,可以在网上随便搜搜即可。 @@ -310,9 +335,11 @@ put方法的代码量有点长,我们按照上面的分解的步骤一步步 我们知道对于一个hash表来说,hash值分散的不够均匀的话会大大增加哈希冲突的概率,从而影响到hash表的性能。因此通过spread方法进行了一次重hash从而大大减小哈希冲突的可能性。spread方法为: +```java static final int spread(int h) { return (h ^ (h >>> 16)) & HASH_BITS; } +``` 该方法主要是**将key的hashCode的低16位于高16位进行异或运算**,这样不仅能够使得hash值能够分散能够均匀减小hash冲突的概率,另外只用到了异或运算,在性能开销上也能兼顾,做到平衡的trade-off。 @@ -330,13 +357,15 @@ put方法的代码量有点长,我们按照上面的分解的步骤一步步 如果当前节点不为null,且该节点为特殊节点(forwardingNode)的话,就说明当前concurrentHashMap正在进行扩容操作,关于扩容操作,下面会作为一个具体的方法进行讲解。那么怎样确定当前的这个Node是不是特殊的节点了?是通过判断该节点的hash值是不是等于-1(MOVED),代码为(fh = f.hash) == MOVED,对MOVED的解释在源码上也写的很清楚了: +```java static final int MOVED = -1; // hash for forwarding nodes - +``` > 5.当table[i]为链表的头结点,在链表中插入新值 在table[i]不为null并且不为forwardingNode时,并且当前Node f的hash值大于0(fh >= 0)的话说明当前节点f为当前桶的所有的节点组成的链表的头结点。那么接下来,要想向ConcurrentHashMap插入新值的话就是向这个链表插入新值。通过synchronized (f)的方式进行加锁以实现线程安全性。往链表中插入节点的部分代码为: +```java if (fh >= 0) { binCount = 1; for (Node e = f;; ++binCount) { @@ -359,6 +388,7 @@ put方法的代码量有点长,我们按照上面的分解的步骤一步步 } } } +``` 这部分代码很好理解,就是两种情况:1. 在链表中如果找到了与待插入的键值对的key相同的节点,就直接覆盖即可;2. 如果直到找到了链表的末尾都没有找到的话,就直接将待插入的键值对追加到链表的末尾即可 @@ -366,6 +396,7 @@ put方法的代码量有点长,我们按照上面的分解的步骤一步步 按照之前的数组+链表的设计方案,这里存在一个问题,即使负载因子和Hash算法设计的再合理,也免不了会出现拉链过长的情况,一旦出现拉链过长,甚至在极端情况下,查找一个节点会出现时间复杂度为O(n)的情况,则会严重影响ConcurrentHashMap的性能,于是,在JDK1.8版本中,对数据结构做了进一步的优化,引入了红黑树。而当链表长度太长(默认超过8)时,链表就转换为红黑树,利用红黑树快速增删改查的特点提高ConcurrentHashMap的性能,其中会用到红黑树的插入、删除、查找等算法。当table[i]为红黑树的树节点时的操作为: +```java if (f instanceof TreeBin) { Node p; binCount = 2; @@ -376,6 +407,7 @@ put方法的代码量有点长,我们按照上面的分解的步骤一步步 p.val = value; } } +``` 首先在if中通过`f instanceof TreeBin`判断当前table[i]是否是树节点,这下也正好验证了我们在最上面介绍时说的TreeBin会对TreeNode做进一步封装,对红黑树进行操作的时候针对的是TreeBin而不是TreeNode。这段代码很简单,调用putTreeVal方法完成向红黑树插入新节点,同样的逻辑,**如果在红黑树中存在于待插入键值对的Key相同(hash值相等并且equals方法判断为true)的节点的话,就覆盖旧值,否则就向红黑树追加新节点**。 @@ -383,6 +415,7 @@ put方法的代码量有点长,我们按照上面的分解的步骤一步步 当完成数据新节点插入之后,会进一步对当前链表大小进行调整,这部分代码为: +```java if (binCount != 0) { if (binCount >= TREEIFY_THRESHOLD) treeifyBin(tab, i); @@ -390,6 +423,7 @@ put方法的代码量有点长,我们按照上面的分解的步骤一步步 return oldVal; break; } +``` 很容易理解,如果当前链表节点个数大于等于8(TREEIFY_THRESHOLD)的时候,就会调用treeifyBin方法将tabel[i](第i个散列桶)拉链转换成红黑树。 @@ -411,6 +445,7 @@ put方法的代码量有点长,我们按照上面的分解的步骤一步步 ## 3.4 get方法 ## 看完了put方法再来看get方法就很容易了,用逆向思维去看就好,这样存的话我反过来这么取就好了。get方法源码为: +```java public V get(Object key) { Node[] tab; Node e, p; int n, eh; K ek; // 1. 重hash @@ -434,6 +469,7 @@ put方法的代码量有点长,我们按照上面的分解的步骤一步步 } return null; } +``` 代码的逻辑请看注释,首先先看当前的hash桶数组节点即table[i]是否为查找的节点,若是则直接返回;若不是,则继续再看当前是不是树节点?通过看节点的hash值是否为小于0,如果小于0则为树节点。如果是树节点在红黑树中查找节点;如果不是树节点,那就只剩下为链表的形式的一种可能性了,就向后遍历查找节点,若查找到则返回节点的value即可,若没有找到就返回null。 @@ -443,6 +479,7 @@ put方法的代码量有点长,我们按照上面的分解的步骤一步步 当ConcurrentHashMap容量不足的时候,需要对table进行扩容。这个方法的基本思想跟HashMap是很像的,但是由于它是支持并发扩容的,所以要复杂的多。原因是它支持多线程进行扩容操作,而并没有加锁。我想这样做的目的不仅仅是为了满足concurrent的要求,而是希望利用并发处理去减少扩容带来的时间影响。transfer方法源码为: +```java private final void transfer(Node[] tab, Node[] nextTab) { int n = tab.length, stride; if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE) @@ -588,6 +625,7 @@ put方法的代码量有点长,我们按照上面的分解的步骤一步步 } } } +``` 代码逻辑请看注释,整个扩容操作分为**两个部分**: @@ -612,6 +650,7 @@ put方法的代码量有点长,我们按照上面的分解的步骤一步步 为了统计元素个数,ConcurrentHashMap定义了一些变量和一个内部类 +```java /** * A padded cell for distributing counts. Adapted from LongAdder * and Striped64. See their internal docs for explanation. @@ -638,12 +677,13 @@ put方法的代码量有点长,我们按照上面的分解的步骤一步步 * Table of counter cells. When non-null, size is a power of 2. */ private transient volatile CounterCell[] counterCells; - +``` > **mappingCount与size方法** **mappingCount**与**size**方法的类似 从给出的注释来看,应该使用mappingCount代替size方法 两个方法都没有直接返回basecount 而是统计一次这个值,而这个值其实也是一个大概的数值,因此可能在统计的时候有其他线程正在执行插入或删除操作。 +```java public int size() { long n = sumCount(); return ((n < 0L) ? 0 : @@ -676,7 +716,7 @@ put方法的代码量有点长,我们按照上面的分解的步骤一步步 } return sum; } - +``` @@ -684,6 +724,7 @@ put方法的代码量有点长,我们按照上面的分解的步骤一步步 在put方法结尾处调用了addCount方法,把当前ConcurrentHashMap的元素个数+1这个方法一共做了两件事,更新baseCount的值,检测是否进行扩容。 +```java private final void addCount(long x, int check) { CounterCell[] as; long b, s; //利用CAS方法更新baseCount的值 @@ -726,7 +767,7 @@ put方法的代码量有点长,我们按照上面的分解的步骤一步步 } } } - +``` @@ -756,4 +797,4 @@ JDK6,7中的ConcurrentHashmap主要使用Segment来实现减小锁粒度,分 1.8版本的HashMap -[http://www.importnew.com/20386.html](http://www.importnew.com/20386.html) \ No newline at end of file +[http://www.importnew.com/20386.html](http://www.importnew.com/20386.html)