当前位置: 首页 > news >正文

网站技术开发文档模板特色产品推广方案

网站技术开发文档模板,特色产品推广方案,注册域名后怎么建站,延庆网站建设今天我们来学一下锁 会持续保持更新 欢迎追更哈 Java - 多线程 - 锁和提升 第1篇 首先强调一点#xff1a;Java多线程的锁都是基于对象的#xff0c;Java中的每一个对象都可以作为一个锁。同时#xff0c;类锁也是对象锁#xff0c;类是Class对象 Java8锁 核心思想 关键… 今天我们来学一下锁 会持续保持更新 欢迎追更哈 Java - 多线程 - 锁和提升 第1篇 首先强调一点Java多线程的锁都是基于对象的Java中的每一个对象都可以作为一个锁。同时类锁也是对象锁类是Class对象 Java8锁 核心思想 关键字在实例方法上锁为当前实例关键字在静态方法上锁为当前Class对象关键字在代码块上锁为括号里面的对象 在进行线程执行顺序的时候如果添加了线程睡眠那么就要看锁的对象是谁同一把锁 / 非同一把锁是不一样的 Synchronized synchronized 是Java提供的关键字用来保证原子性的 synchronized的作用域如下 作用在普通方法上此方法为原子方法也就是说同一个时刻只有一个线程可以进入其他线程必须在方法外等待此时锁是对象作用在静态方法上此方法为原子方法也就是说同一个时刻只有一个线程可以进入其他线程必须在方法外等待此时锁是当前的Class对象作用在代码块上此代码块是原子操作也就是说同一个时刻只有线程可以进入其他线程必须在方法外等待锁是 synchronized(XXX) 里面的 XXX 先看一段简单的代码 public class SynchronizedTest {public static void main(String[] args) {test1();test2();}// 使用synchronized修饰的方法public synchronized static void test1() {System.out.println(SynchronizedTest.test1);}// 使用synchronized修饰的代码块public static void test2() {synchronized (SynchronizedTest.class) {System.out.println(SynchronizedTest.test2);}} }执行之后对其进行执行javap -v命令反编译 // 省略啰嗦的代码 public class cn.zq.sync.SynchronizedTestminor version: 0major version: 52flags: ACC_PUBLIC, ACC_SUPER {// 源码public cn.zq.sync.SynchronizedTest();descriptor: ()Vflags: ACC_PUBLIC// main 方法public static void main(java.lang.String[]);descriptor: ([Ljava/lang/String;)Vflags: ACC_PUBLIC, ACC_STATIC// synchronized 修饰的静态方法 test1()public static synchronized void test1();descriptor: ()V// 在这里我们可以看到 flags 中有一个 ACC_SYNCHRONIZED// 这个就是一个标记符这是 保证原子性的关键// 当方法调用的时候调用指令将会检查方法的 ACC_SYNCHRONIZED 访问标记符是否被设置// 如果设置了线程将先获取 monitor获取成功之后才会执行方法体方法执行之后释放monitor// 在方法执行期间其他任何线程都无法在获得一个 monitor 对象本质上没区别。flags: ACC_PUBLIC, ACC_STATIC, ACC_SYNCHRONIZEDCode:stack2, locals0, args_size00: getstatic #4 // Field java/lang/System.out:Ljava/io/PrintStream;3: ldc #5 // String SynchronizedTest.test15: invokevirtual #6 // Method java/io/PrintStream.println:(Ljava/lang/String;)V8: returnLineNumberTable:line 17: 0line 18: 8// 代码块使用的 synchronizedpublic static void test2();descriptor: ()Vflags: ACC_PUBLIC, ACC_STATICCode:stack2, locals2, args_size00: ldc #7 // class cn/zq/sync/SynchronizedTest2: dup3: astore_0// 这个 monitorenter 是一个指令// 每个对象都有一个监视器锁(monitor)当monitor被占用的时候就会处于锁定状态// 线程执行monitorenter的时候尝试获取monitor的锁。过程如下// 1.任何monitor进入数为0则线程进入并设置为1此线程就是monitor的拥有者// 2.如果线程已经占用当前线程再次进入的时候会将monitor的次数1// 3.如何其他的线程已经占用了monitor则线程进阻塞状态直到monitor的进入数为0// 4.此时其他线程才能获取当前代码块的执行权4: monitorenter5: getstatic #4 // Field java/lang/System.out:Ljava/io/PrintStream;8: ldc #8 // String SynchronizedTest.test210: invokevirtual #6 // Method java/io/PrintStream.println:(Ljava/lang/String;)V13: aload_0// 执行monitorexit这条指令的线程必须是拥有monitor的// 执行的之后monitor的进入数-1.如果为0那么线程就退出 monitor不再是此代码块的执行者// 此时再由其他的线程获得所有权// 其实 wait/notify 等方法也依赖于monitor对象// 所以只有在同步方法或者同步代码块中才可以使用否则会报错 java.lang.IllegalMonitorstateException 异常14: monitorexit15: goto 2318: astore_119: aload_020: monitorexit21: aload_122: athrow23: returnException table:from to target type5 15 18 any18 21 18 anyLineNumberTable:line 21: 0line 22: 5line 23: 13line 24: 23StackMapTable: number_of_entries 2frame_type 255 /* full_frame */offset_delta 18locals [ class java/lang/Object ]stack [ class java/lang/Throwable ]frame_type 250 /* chop */offset_delta 4 } SourceFile: SynchronizedTest.java 总结 使用synchronized修饰的同步方法 通过反编译我们可以看到被synchronized修饰的方法其中的 flags中有一个标记ACC_SYNCHRONIZED当线程执行方法的时候会先去检查是否有这样的一个标记如果有的话说明就是一个同步方法此时会为当前线程设置 monitor 获取成功之后才会去执行方法体执行完毕之后释放monitor 使用synchronized修饰的代码块 通过反编译我们看到在代码块的两侧有JVM指令在进入代码块之前指令是 monitorenter当线程执行到代码块的时候会先拿到monitor(初始值为0)然后线程将其设置为1此时当前线程独占monitor如果当前持有monitor的线程再次进入monitor则monitor的值1当其退出的时候monitor的次数-1当线程线程退出一次monitor的时候会执行monitorexit指令但是只有持有monitor的线程才能获取并执行monitorexit指令当当前线程monitor为0的时候当前线程退出持有锁此时其他线程再来争抢但是为什么要有两个 monitorexit呢 这个时候我们会发现synchronized是可重入锁其实现原理就是monitor的个数增加和减少同时wait / notify方法的执行也会依赖 monitor所以wait和notify方法必须放在同步代码块中否则会报错 java.lang.IllegalMonitorstateException因为方法区域很大所以设置一个标记现在执行完判断之后就全部锁起来而代码块不确定大小就需要细化monitor的范围 ReentrantLock ReentrantLock是Lock接口的一个实现类 在ReentrantLock内部有一个抽象静态内部类Sync 其中一个是 NonfairSync非公平锁另外一个是 FairSync 公平锁二者都实现了此抽象内部类SyncReentrantLock默认使用的是 非公平锁 我们看一下源码 public class ReentrantLock implements Lock, java.io.Serializable {// 锁的类型private final Sync sync;// 抽象静态类Sync继承了AbstractQueueSynchroniser [这个在下面进行解释]abstract static class Sync extends AbstractQueuedSynchronizer {private static final long serialVersionUID -5179523762034025860L;// 抽象加锁方法abstract void lock();// 不公平的 tryLock 也就是不公平的尝试获取final boolean nonfairTryAcquire(int acquires) {// 获取当前线程对象final Thread current Thread.currentThread();// 获取线程的状态int c getState();// 根据线程的不同状态执行不同的逻辑if (c 0) {if (compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);return true;}}// 获取独占模式的线程的当前锁的状态else if (current getExclusiveOwnerThread()) {// 获取新的层级大小int nextc c acquires;if (nextc 0) // overflowthrow new Error(Maximum lock count exceeded);// 设置锁的状态setState(nextc);return true;}return false;}// 尝试释放方法protected final boolean tryRelease(int releases) {int c getState() - releases;if (Thread.currentThread() ! getExclusiveOwnerThread())throw new IllegalMonitorStateException();boolean free false;if (c 0) {free true;setExclusiveOwnerThread(null);}setState(c);return free;}// 返回当前线程是不是独占的protected final boolean isHeldExclusively() {return getExclusiveOwnerThread() Thread.currentThread();}// 返回 ConditionObject 对象final ConditionObject newCondition() {return new ConditionObject();}// 获得独占的线程final Thread getOwner() {return getState() 0 ? null : getExclusiveOwnerThread();}// 获得独占线程的状态final int getHoldCount() {return isHeldExclusively() ? getState() : 0;}// 判断是否是加锁的final boolean isLocked() {return getState() ! 0;}// 序列化private void readObject(java.io.ObjectInputStream s)throws java.io.IOException, ClassNotFoundException {s.defaultReadObject();setState(0); }}// 非公平锁继承了Syncstatic final class NonfairSync extends Sync {private static final long serialVersionUID 7316153563782823691L;// 加锁操作final void lock() {// 判断是不是第一次加锁 底层调用 Unsafe的compareAndSwapInt()方法if (compareAndSetState(0, 1))// 设置为独占锁setExclusiveOwnerThread(Thread.currentThread());// 如果不是第一次加锁则调用 acquire 方法在加一层锁elseacquire(1);}// 返回尝试加锁是否成功protected final boolean tryAcquire(int acquires) {return nonfairTryAcquire(acquires);}}// 公平锁static final class FairSync extends Sync {private static final long serialVersionUID -3000897897090466540L;// 加锁操作直接设置为1final void lock() {acquire(1);}// 尝试加锁protected final boolean tryAcquire(int acquires) {final Thread current Thread.currentThread();int c getState();if (c 0) {if (!hasQueuedPredecessors() compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);return true;}}else if (current getExclusiveOwnerThread()) {int nextc c acquires;if (nextc 0)throw new Error(Maximum lock count exceeded);setState(nextc);return true;}return false;}} }Lock接口 public interface Lock {// 加锁void lock();// 不断加锁void lockInterruptibly() throws InterruptedException;// 尝试加锁boolean tryLock();// 尝试加锁具有超时时间boolean tryLock(long time, TimeUnit unit) throws InterruptedException;// 释放锁void unlock();// Condition 对象Condition newCondition(); }Condition接口 public interface Condition {// 等待void await() throws InterruptedException;// 超时等待boolean await(long time, TimeUnit unit) throws InterruptedException;// 超时纳秒等待long awaitNanos(long nanosTimeout) throws InterruptedException;// 可中断等待void awaitUninterruptibly();// 等待死亡boolean awaitUntil(Date deadline) throws InterruptedException;// 指定唤醒void signal();// 唤醒所有void signalAll(); }为什么官方提供的是非公平锁因为如果是公平锁假如一个线程需要执行很久那执行效率会大大降低 ReentrantLock的其他方法 public class ReentrantLock implements Lock, java.io.Serializable {// 锁的类型private final Sync sync;// 默认是非公平锁public ReentrantLock() {sync new NonfairSync();}// 有参构造可以设置锁的类型public ReentrantLock(boolean fair) {sync fair ? new FairSync() : new NonfairSync();}// 加锁public void lock() {sync.lock();}public void lockInterruptibly() throws InterruptedException {sync.acquireInterruptibly(1);}public boolean tryLock() {return sync.nonfairTryAcquire(1);}public boolean tryLock(long timeout, TimeUnit unit)throws InterruptedException {return sync.tryAcquireNanos(1, unit.toNanos(timeout));}// 解锁 调用release() 因为是重入锁所以需要减少重入的层数public void unlock() {sync.release(1);}// 返回Condition对象 用来执行线程的唤醒等待等操作public Condition newCondition() {return sync.newCondition();}// 获取锁的层数public int getHoldCount() {return sync.getHoldCount();}public boolean isHeldByCurrentThread() {return sync.isHeldExclusively();}// 是否加锁public boolean isLocked() {return sync.isLocked();}// 是否是公平锁public final boolean isFair() {return sync instanceof FairSync;}// 获取独占锁protected Thread getOwner() {return sync.getOwner();}// 查询是否有任何线程正在等待获取此锁public final boolean hasQueuedThreads() {return sync.hasQueuedThreads();}// 查询给定线程是否正在等待获取此锁public final boolean hasQueuedThread(Thread thread) {return sync.isQueued(thread);}// 获取队列的长度public final int getQueueLength() {return sync.getQueueLength();}// 返回一个包含可能正在等待获取该锁的线程的集合protected CollectionThread getQueuedThreads() {return sync.getQueuedThreads();}// 判断是否等待public boolean hasWaiters(Condition condition) {if (condition null)throw new NullPointerException();if (!(condition instanceof AbstractQueuedSynchronizer.ConditionObject))throw new IllegalArgumentException(not owner);return sync.hasWaiters((AbstractQueuedSynchronizer.ConditionObject)condition);}// 获得等待队列的长度public int getWaitQueueLength(Condition condition) {if (condition null)throw new NullPointerException();if (!(condition instanceof AbstractQueuedSynchronizer.ConditionObject))throw new IllegalArgumentException(not owner);return sync.getWaitQueueLength((AbstractQueuedSynchronizer.ConditionObject)condition);}// 获取正在等待的线程集合protected CollectionThread getWaitingThreads(Condition condition) {if (condition null)throw new NullPointerException();if (!(condition instanceof AbstractQueuedSynchronizer.ConditionObject))throw new IllegalArgumentException(not owner);return sync.getWaitingThreads((AbstractQueuedSynchronizer.ConditionObject)condition);}// toString()public String toString() {Thread o sync.getOwner();return super.toString() ((o null) ?[Unlocked] :[Locked by thread o.getName() ]);} }总结 ReentrantLock是独占锁ReentrantLock是可重入锁底层使用AbstractQueuedSynchronizer实现synchronized 和 ReentrantLock的区别 synchronized是是关键字可以作用在静态方法、普通方法、静态代码块底层使用monitor实现synchronized是内置锁是悲观锁其发生异常会中断锁所以不会发生死锁。是非中断锁ReentrantLock是类作用在方法中其比synchronized更加灵活但是必须手动加锁释放锁是乐观锁发生异常不会中断锁必须在finally中释放锁是可中断的使用Lock的读锁可以提供效率 AQS AQSAbstractQueueSynchronizer 抽象队列同步器 AQS定义了一套多线程访问共享资源的同步器框架很多同步器的实现都依赖AQS。如ReentrantLock、Semaphore、CountDownLatch … 首先看一下AQS队列的框架 它维护了一个volatile int state 代表共享资源和一个FIFO线程等待队列多线程争抢资源被阻塞的时候会先进进入此队列这里的volatile是核心。在下个部分进行讲解~ state的访问方式有三种 getState()setState()compareAndSetState() AQS定义了两种资源共享方式Exclusive独占只有一个线程可以执行如ReentrantLock和Share共享多个线程可同时执行如Semaphore、CountdownLatch 不同的自定义同步器争用共享资源的方式也不同。自定义的同步器在实现的时候只需要实现共享资源的获取和释放方式即可至于具体线程等待队列的维护如获取资源失败入队/唤醒出队AQS在顶层已经实现好了。 自定义同步器时需要实现以下方法即可 isHeldExclusively()该线程是否正在独占资源。只有用的Condition才需要去实现它tryAcquire(int)独占方式。尝试获取资源成功返回true否则返回falsetryRelease(int)独占方式。尝试释放资源成功返回true否则返回falsetryAcquireShared(int)共享方式。尝试获取资源。负数表示失败0表示成功但没有剩余可用资源正数表示成功且还有剩余资源tryReleaseShared(int)共享方式。尝试释放资源如果释放后允许唤醒后续等待节点返回true否则返回fasle 以ReentrantLock为例state初始化为0表示未锁定状态。A线程lock()时会调用tryAcquire()独占该锁然后将state1此后其他线程在调用tryAcquire()就会失败直到A线程unlock()到state为0为止其他线程才有机会获取该锁。当前在A释放锁之前A线程是可以重复获取此锁的(state)会累加。这就是可重入但是获取多少次就要释放多少次。 再和CountdownLock为例任务分为N个子线程去执行state也初始化为N(注意N要与线程的个数一致)。这N个子线程是并行执行的每个子线程执行完之后countDown一次。state会CAS-1。等到所有的子线程都执行完后(即state0)会upark()主调用线程然后主调用线程就会从await()函数返回继续剩余动作 一般来说自定义同步器要么是独占方法要么是共享方式也只需要实现tryAcquire - tryReleasetryAcquireShared - tryReleaseShared 中的一组即可但是AQS也支持自定义同步器同时实现独占锁和共享锁两种方式如ReentrantReadWriteLock AQS的源码 AbstractQueueSynchronizer 继承了 AbstractOwnableSynchronizer AbstractOwnableSynchronizer类 public abstract class AbstractOwnableSynchronizerimplements java.io.Serializable {private static final long serialVersionUID 3737899427754241961L;protected AbstractOwnableSynchronizer() { }// 独占模式当前的拥有者private transient Thread exclusiveOwnerThread;// 设置独占模式当前的拥有者protected final void setExclusiveOwnerThread(Thread thread) {exclusiveOwnerThread thread;}// 得到独占模式当前的拥有者protected final Thread getExclusiveOwnerThread() {return exclusiveOwnerThread;} } AbstractQueueSynchronizer类 public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {private static final long serialVersionUID 7373984972572414691L;protected AbstractQueuedSynchronizer() { }// AbstractQueueSynchronizer 中的静态内部类 Node 节点static final class Node {// 指示节点正在以共享模式等待的标记static final Node SHARED new Node();// 指示节点正在以独占模式等待的标记static final Node EXCLUSIVE null;// 表示线程已经取消static final int CANCELLED 1;// 表示线程之后需要释放static final int SIGNAL -1;// 表示线程正在等待条件static final int CONDITION -2;// 指示下一个 acquireShared 应该无条件传播static final int PROPAGATE -3;// 状态标记volatile int waitStatus;// 队列的前一个节点volatile Node prev;// 队列的后一个节点volatile Node next;// 线程volatile Thread thread;// 下一个正在等待的节点Node nextWaiter;// 判断是否时共享的final boolean isShared() {return nextWaiter SHARED;}// 返回上一个节点不能为null为null抛出空指针异常final Node predecessor() throws NullPointerException {Node p prev;if (p null)throw new NullPointerException();elsereturn p;}// 构造Node() { // Used to establish initial head or SHARED marker}// 有参构造用来添加线程的队列Node(Thread thread, Node mode) { // Used by addWaiterthis.nextWaiter mode;this.thread thread;}// 有参构造根据等待条件使用Node(Thread thread, int waitStatus) { // Used by Conditionthis.waitStatus waitStatus;this.thread thread;}}// 头节点private transient volatile Node head;// 尾节点private transient volatile Node tail;// 状态private volatile int state;// 获取当前的状态protected final int getState() {return state;}//设置当前的状态protected final void setState(int newState) {state newState;}// 比较设置当前的状态protected final boolean compareAndSetState(int expect, int update) {// See below for intrinsics setup to support thisreturn unsafe.compareAndSwapInt(this, stateOffset, expect, update);}// 纳秒数使之更快的旋转static final long spinForTimeoutThreshold 1000L;// 将节点插入队列private Node enq(final Node node) {for (;;) {Node t tail;if (t null) { // Must initializeif (compareAndSetHead(new Node()))tail head;} else {node.prev t;if (compareAndSetTail(t, node)) {t.next node;return t;}}}}// 加一个等待节点private Node addWaiter(Node mode) {Node node new Node(Thread.currentThread(), mode);Node pred tail;if (pred ! null) {node.prev pred;if (compareAndSetTail(pred, node)) {pred.next node;return node;}}enq(node);return node;}// 设置头节点private void setHead(Node node) {head node;node.thread null;node.prev null;}// 如果存在后继节点就唤醒private void unparkSuccessor(Node node) {// 获得节点的状态int ws node.waitStatus;// 如果为负数就执行比较并设置方法设置状态if (ws 0)compareAndSetWaitStatus(node, ws, 0);// 唤醒后面的节点Node s node.next;if (s null || s.waitStatus 0) {s null;for (Node t tail; t ! null t ! node; t t.prev)if (t.waitStatus 0)s t;}if (s ! null)LockSupport.unpark(s.thread);}// 共享模式的释放动作并且向后继节点发出信号private void doReleaseShared() {for (;;) {Node h head;if (h ! null h ! tail) {int ws h.waitStatus;if (ws Node.SIGNAL) {if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))continue; // loop to recheck casesunparkSuccessor(h);}else if (ws 0 !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))continue; // loop on failed CAS}if (h head) // loop if head changedbreak;}}// 设置队列的头并检查后继者能否在共享模式下等待如果可以就是否传播设置为0或者propagate状态private void setHeadAndPropagate(Node node, int propagate) {Node h head; // Record old head for check belowsetHead(node);if (propagate 0 || h null || h.waitStatus 0 ||(h head) null || h.waitStatus 0) {Node s node.next;if (s null || s.isShared())doReleaseShared();}}// 取消正在进行的尝试private void cancelAcquire(Node node) {// 节点为null直接返回if (node null)return;node.thread null;// 跳过已经取消的前一个节点Node pred node.prev;while (pred.waitStatus 0)node.prev pred pred.prev;Node predNext pred.next;node.waitStatus Node.CANCELLED;if (node tail compareAndSetTail(node, pred)) {compareAndSetNext(pred, predNext, null);} else {int ws;if (pred ! head ((ws pred.waitStatus) Node.SIGNAL ||(ws 0 compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) pred.thread ! null) {Node next node.next;if (next ! null next.waitStatus 0)compareAndSetNext(pred, predNext, next);} else {unparkSuccessor(node);}node.next node; // help GC}}// 还有好多方法... 其实本质就是基于 队列的判断和操作AQS提供了独占锁和共享锁的设计// 在AQS中使用到了Unsafe类所以AQS其实就是基于CAS算法的// AQS的一些方法就是直接调用 Unsafe 的方法 如下所示private static final Unsafe unsafe Unsafe.getUnsafe();private static final long stateOffset;private static final long headOffset;private static final long tailOffset;private static final long waitStatusOffset;private static final long nextOffset;static {try {stateOffset unsafe.objectFieldOffset(AbstractQueuedSynchronizer.class.getDeclaredField(state));headOffset unsafe.objectFieldOffset(AbstractQueuedSynchronizer.class.getDeclaredField(head));tailOffset unsafe.objectFieldOffset(AbstractQueuedSynchronizer.class.getDeclaredField(tail));waitStatusOffset unsafe.objectFieldOffset(Node.class.getDeclaredField(waitStatus));nextOffset unsafe.objectFieldOffset(Node.class.getDeclaredField(next));} catch (Exception ex) { throw new Error(ex); }}// 比较并设置头private final boolean compareAndSetHead(Node update) {return unsafe.compareAndSwapObject(this, headOffset, null, update);}// 比较并设置尾private final boolean compareAndSetTail(Node expect, Node update) {return unsafe.compareAndSwapObject(this, tailOffset, expect, update);}// 比较并设置状态private static final boolean compareAndSetWaitStatus(Node node,int expect,int update) {return unsafe.compareAndSwapInt(node, waitStatusOffset,expect, update);}// 比较并设置下一个节点private static final boolean compareAndSetNext(Node node,Node expect,Node update) {return unsafe.compareAndSwapObject(node, nextOffset, expect, update);}// 除此之外 AQS 还有一个实现了Condition的类 如下public class ConditionObject implements Condition, java.io.Serializable {private static final long serialVersionUID 1173984872572414699L;// 条件队列的第一个节点private transient Node firstWaiter;// 条件队列的最后一个节点private transient Node lastWaiter;public ConditionObject() { }// 在等待队列中添加一个新的节点private Node addConditionWaiter() {// 获取最后一个节点Node t lastWaiter;// 如果最后一个节点被取消了就清除它if (t ! null t.waitStatus ! Node.CONDITION) {unlinkCancelledWaiters();t lastWaiter;}Node node new Node(Thread.currentThread(), Node.CONDITION);if (t null)firstWaiter node;elset.nextWaiter node;lastWaiter node;return node;}// 删除并转移节点直到它没有取消或者不为nullprivate void doSignal(Node first) {do {if ( (firstWaiter first.nextWaiter) null)lastWaiter null;first.nextWaiter null;} while (!transferForSignal(first) (first firstWaiter) ! null);}// 删除所有的节点private void doSignalAll(Node first) {lastWaiter firstWaiter null;do {Node next first.nextWaiter;first.nextWaiter null;transferForSignal(first);first next;} while (first ! null);}// 取消节点的连接private void unlinkCancelledWaiters() {Node t firstWaiter;Node trail null;while (t ! null) {Node next t.nextWaiter;if (t.waitStatus ! Node.CONDITION) {t.nextWaiter null;if (trail null)firstWaiter next;elsetrail.nextWaiter next;if (next null)lastWaiter trail;}elsetrail t;t next;}}// 将等待最长的线程唤醒public final void signal() {if (!isHeldExclusively())throw new IllegalMonitorStateException();Node first firstWaiter;if (first ! null)doSignal(first);}// 唤醒所有的等待线程public final void signalAll() {if (!isHeldExclusively())throw new IllegalMonitorStateException();Node first firstWaiter;if (first ! null)doSignalAll(first);}// 实现不间断的条件等待public final void awaitUninterruptibly() {Node node addConditionWaiter();int savedState fullyRelease(node);boolean interrupted false;while (!isOnSyncQueue(node)) {LockSupport.park(this);if (Thread.interrupted())interrupted true;}if (acquireQueued(node, savedState) || interrupted)selfInterrupt();}// 模式意味着在退出等待时重新中断private static final int REINTERRUPT 1;// 模式的含义是在退出等待时抛出InterruptedException异常private static final int THROW_IE -1;// 检查中断如果在信号通知之前被中断则返回THROW_IE// 如果在信号通知之后则返回REINTERRUPT如果未被中断则返回 0private int checkInterruptWhileWaiting(Node node) {return Thread.interrupted() ?(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :0;}// 抛出InterruptedException重新中断当前线程// 或不执行任何操作具体取决于模式。private void reportInterruptAfterWait(int interruptMode)throws InterruptedException {if (interruptMode THROW_IE)throw new InterruptedException();else if (interruptMode REINTERRUPT)selfInterrupt();}// 实现不可中断的条件等待public final void await() throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();Node node addConditionWaiter();int savedState fullyRelease(node);int interruptMode 0;while (!isOnSyncQueue(node)) {LockSupport.park(this);if ((interruptMode checkInterruptWhileWaiting(node)) ! 0)break;}if (acquireQueued(node, savedState) interruptMode ! THROW_IE)interruptMode REINTERRUPT;if (node.nextWaiter ! null) // clean up if cancelledunlinkCancelledWaiters();if (interruptMode ! 0)reportInterruptAfterWait(interruptMode);}// 纳秒级别的等待public final long awaitNanos(long nanosTimeout)throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();Node node addConditionWaiter();int savedState fullyRelease(node);final long deadline System.nanoTime() nanosTimeout;int interruptMode 0;while (!isOnSyncQueue(node)) {if (nanosTimeout 0L) {transferAfterCancelledWait(node);break;}if (nanosTimeout spinForTimeoutThreshold)LockSupport.parkNanos(this, nanosTimeout);if ((interruptMode checkInterruptWhileWaiting(node)) ! 0)break;nanosTimeout deadline - System.nanoTime();}if (acquireQueued(node, savedState) interruptMode ! THROW_IE)interruptMode REINTERRUPT;if (node.nextWaiter ! null)unlinkCancelledWaiters();if (interruptMode ! 0)reportInterruptAfterWait(interruptMode);return deadline - System.nanoTime();}// 绝对定时等待public final boolean awaitUntil(Date deadline)throws InterruptedException {long abstime deadline.getTime();if (Thread.interrupted())throw new InterruptedException();Node node addConditionWaiter();int savedState fullyRelease(node);boolean timedout false;int interruptMode 0;while (!isOnSyncQueue(node)) {if (System.currentTimeMillis() abstime) {timedout transferAfterCancelledWait(node);break;}LockSupport.parkUntil(this, abstime);if ((interruptMode checkInterruptWhileWaiting(node)) ! 0)break;}if (acquireQueued(node, savedState) interruptMode ! THROW_IE)interruptMode REINTERRUPT;if (node.nextWaiter ! null)unlinkCancelledWaiters();if (interruptMode ! 0)reportInterruptAfterWait(interruptMode);return !timedout;}// 超时等待public final boolean await(long time, TimeUnit unit)throws InterruptedException {long nanosTimeout unit.toNanos(time);if (Thread.interrupted())throw new InterruptedException();Node node addConditionWaiter();int savedState fullyRelease(node);final long deadline System.nanoTime() nanosTimeout;boolean timedout false;int interruptMode 0;while (!isOnSyncQueue(node)) {if (nanosTimeout 0L) {timedout transferAfterCancelledWait(node);break;}if (nanosTimeout spinForTimeoutThreshold)LockSupport.parkNanos(this, nanosTimeout);if ((interruptMode checkInterruptWhileWaiting(node)) ! 0)break;nanosTimeout deadline - System.nanoTime();}if (acquireQueued(node, savedState) interruptMode ! THROW_IE)interruptMode REINTERRUPT;if (node.nextWaiter ! null)unlinkCancelledWaiters();if (interruptMode ! 0)reportInterruptAfterWait(interruptMode);return !timedout;}// 判断是不是独占的final boolean isOwnedBy(AbstractQueuedSynchronizer sync) {return sync AbstractQueuedSynchronizer.this;}// 返回是否有正在等待的protected final boolean hasWaiters() {if (!isHeldExclusively())throw new IllegalMonitorStateException();for (Node w firstWaiter; w ! null; w w.nextWaiter) {if (w.waitStatus Node.CONDITION)return true;}return false;}// 获得等待队列的长度protected final int getWaitQueueLength() {if (!isHeldExclusively())throw new IllegalMonitorStateException();int n 0;for (Node w firstWaiter; w ! null; w w.nextWaiter) {if (w.waitStatus Node.CONDITION)n;}return n;}// 获取所有正在等待的线程集合protected final CollectionThread getWaitingThreads() {if (!isHeldExclusively())throw new IllegalMonitorStateException();ArrayListThread list new ArrayListThread();for (Node w firstWaiter; w ! null; w w.nextWaiter) {if (w.waitStatus Node.CONDITION) {Thread t w.thread;if (t ! null)list.add(t);}}return list;}} } 总结 AQS为我们提供了很多实现。AQS内部有两个内部类ConditionObject和Node节点 和开头说的一样其维护了一个state和一个队列也提供了独占和共享的实现 总结一下流程 调用自定义同步器的tryAcquire()尝试直接去获取资源如果成功就直接返回没成功则addWaiter()将该线程加入等待队列的尾部并标记为独占模式acquireQueued()使得线程在队列中休息有机会轮到自己会被unpark()会去尝试获取资源。获取到资源之后才会返回。如果在整个等待过程中被中断过就返回true否则返回false如果线程在等待过程中被中断过它不是响应的。只是获取资源之后才再进行自我中断selfInterrupt()将中断补上 release() 是独占模式下线程共享资源的底层入口它会释放指定量的资源如果彻底释放了(state 0) 如果获取锁的线程在release时异常了没有unpark队列中的其他结点这时队列中的其他结点会怎么办是不是没法再被唤醒了 这时队列中等待锁的线程将永远处于park状态无法再被唤醒 获取锁的线程在什么情形下会release抛出异常呢 ? 线程突然死掉了可以通过thread.stop来停止线程的执行但该函数的执行条件要严苛的多而且函数注明是非线程安全的已经标明Deprecated线程被interupt了线程在运行态是不响应中断的所以也不会抛出异常 acquireShared()的流程 tryAcquireShared()尝试获取资源成功则直接返回失败则通过doAcquireShared()进入等待队列park()直到被unpark()/interrupt()并成功获取到资源才返回。整个等待过程也是忽略中断的。 releaseShared() 释放掉资源之后唤醒和后继 不同的自定义同步器争用共享资源的方式也不同。自定义同步器在实现时只需要实现共享资源state的获取与释放方式即可至于具体线程等待队列的维护如获取资源失败入队/唤醒出队等AQS已经在顶层实现好了。自定义同步器实现时主要实现以下几种方法 isHeldExclusively()该线程是否正在独占资源。只有用到condition才需要去实现它。tryAcquire(int)独占方式。尝试获取资源成功则返回true失败则返回false。tryRelease(int)独占方式。尝试释放资源成功则返回true失败则返回false。tryAcquireShared(int)共享方式。尝试获取资源。负数表示失败0表示成功但没有剩余可用资源正数表示成功且有剩余资源。tryReleaseShared(int)共享方式。尝试释放资源如果释放后允许唤醒后续等待结点返回true否则返回false。 volatile volatile是Java提供的关键字是轻量级的同步机制 JSR133提出Java5增强了语义 volatile关键字有三个重要的特点 保证内存可见性不保证原子性禁止指令重排序 提到volatile就要提到JMM - 什么是JMM JMMJava Memory Model 本身就是一种抽象的概念并不真实存在它描述的是一组规范和规则通过这种规则定义了程序的各个变量(包括实例字段、静态字段、和构造数组对象的元素)的访问方式 JMM关于同步的规定 线程解锁前必须把共享变量的值刷新到主内存线程加锁前必须读取主内存的最新的值到自己的工作内存加锁和解锁必须是同一把锁 happens-before 规则 前一个操作对下一个操作是完全可见的如果下一个操作对下下一个操作完全可见那么前一个操作也对下下个操作可见 重排序 JVM对指令的执行会进行优化重新排序可以发生在编译重排序、CPU重排序 什么是内存屏障 内存屏障分为2种 读屏障LoadBarrier写屏障Store Barrier 内存屏障的作用 阻止屏障两侧的指令重排序强制把缓冲区 / 高速缓存中的脏数据写回主内存或者让缓存中相应的的数据失效 编译器生成字节码的时候会在指令序列中插入内存屏障来禁止特定类型的处理器重排序。编译器选择了一个比较保守的JMM内存屏障插入策略这样就可以保证在任何处理器平台任何程序中都有正确的volatile语义 在每个volatile写操作之前插入一个StoreStore屏障在每个volatile写操作之后入一个StoreLoad屏障在每个volatile读操作之前插入一个LoadLoad屏障在每个volatile读操作之前插入一个LoadStore屏障 原子性 问i为什么不是线程安全的 因为 i 不是原子操作i有三个操作 如何解决 使用 synchronized使用AtomicInteger [JUC下的原子类] 有序性 计算机在执行程序的时候为了提高性能编译器和处理器通常会对指令重排序一般分为3种- 源代码 - 编译器优化的重排 - 指令并行的重排 - 内存系统的重排 - 最终执行的指令单线程环境里面确保程序最终执行结果和代码顺序执行的结果一致处理器在执行重排序之前必须考虑指令之间的数据依赖性多线程环境种线程交替执行由于编译器优化重排序的存在两个线程中使用的变量能否保证一致性是无法确定的结果无法预测 指令重排序 多线程环境种线程交替执行由于编译器优化重排序的存在两个线程中使用的变量能否保证一致性是无法确定的结果无法预测此时使用volatile禁用指令重排序就可以解决这个问题 volatile的使用 单例设计模式中的 安全的双重检查锁 volatile的底层实现 根据JMM所有线程拿到的都是主内存的副本然后存储到各自线程的空间当某一线程修改之后立即修改主内存然后主内存通知其他线程修改Java代码 instance new Singleton();//instance 是 volatile 变量 汇编代码:0x01a3de1d: movb $0x0,0x1104800(%esi);0x01a3de24: lock addl $0x0,(%esp); 有 volatile 变量修饰的共享变量进行写操作的时候会多第二行汇编代码通过查 IA-32 架构软件开发者手册可知lock 前缀的指令在多核处理器下会引发了两件事情。将当前处理器缓存行的数据会写回到系统内存。这个写回内存的操作会引起在其他 CPU 里缓存了该内存地址的数据无效。如果对声明了volatile变量进行写操作JVM就会向处理器发送一条Lock前缀的指令将这个变量所在缓存行的数据写回到系统内存。但是就算写回到内存如果其他处理器缓存的值还是旧的再执行计算操作就会有问题所以在多处理器下为了保证各个处理器的缓存是一致的就会实现缓存一致性协议每个处理器通过嗅探在总线上传播的数据来检查自己缓存的值是不是过期了当处理器发现自己缓存行对应的内存地址被修改就会将当前处理器的缓存行设置成无效状态当处理器要对这个数据进行修改操作的时候会强制重新从系统内存里把数据读到处理器缓存里。 自旋锁 自旋锁的其他种类 CAS 自旋锁 CASCompare And Swap比较并替换是线程并发运行时用到的一种技术 CAS是原子操作保证并发安全而不能保证并发同步 CAS是CPU的一个指令需要JNI调用Native方法才能调用CPU的指令 CAS是非阻塞的、轻量级的乐观锁 我们可以实现通过手写代码完成CAS自旋锁 CAS包括三个操作数 内存位置 - V期望值- A新值 - B 如果内存位置的值与期望值匹配那么处理器会自动将该位置的值设置为新值否则不做改变。无论是哪种情况都会在CAS指令之前返回该位置的值。 public class Demo {volatile static int count 0;public static void request() throws Exception {TimeUnit.MILLISECONDS.sleep(5);// 表示期望值int expectedCount;while (!compareAndSwap(expectedCount getCount(), expectedCount 1)) {}}public static synchronized boolean compareAndSwap(int expectedCount, int newValue) {if (expectedCount getCount()) {count newValue;return true;}return false;}public static int getCount() {return count;}public static void main(String[] args) throws Exception {long start System.currentTimeMillis();int threadSize 100;CountDownLatch countDownLatch new CountDownLatch(threadSize);for (int i 0; i threadSize; i) {new Thread(() - {try {for (int j 0; j 10; j) {request();}} catch (Exception e) {e.printStackTrace();} finally {countDownLatch.countDown();}}).start();}countDownLatch.await();long end System.currentTimeMillis();System.out.println(count : count 耗时 (end - start));} } 上述是我们自己书写的CAS自旋锁但是JDK已经提供了响应的方法 Java提供了 CAS 的支持在 sun.misc.Unsafe 类中如下 public final native boolean compareAndSwapObject(Object var1, long var2, Object var4, Object var5);public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);public final native boolean compareAndSwapLong(Object var1, long var2, long var4, long var6);参数说明 var1表示要操作的对象var2表示要操作对象中属性地址的偏移量var4表示需要修改数据的期望的值var5表示需要修改为的新值 CAS的实现原理 CAS通过调用JNI的代码实现JNIJava Native Interface 允许Java调用其他语言 而CompareAndSwapXxx系列的方法就是借助“C语言”CPU底层指令实现的 以常用的 Inter x86来说最后映射到CPU的指令为“cmpxchg”这个是一个原子指令CPU执行此命令的时候实现比较并替换的操作 cmpxchg 如何保证多核心下的线程安全 系统底层进行CAS操作的时候会判断当前操作系统是否为多核心如果是就给“总线”加锁只有一个线程对总线加锁保证只有一个线程进行操作加锁之后会执行CAS操作也就是说CAS的原子性是平台级别的 CAS这么强有没有什么问题 高并发情况下CAS会一直重试会损耗性能 CAS的ABA问题 CAS需要在操作值得时候检查下值有没有变化如果没有发生变化就更新但是如果原来一个值为A经过一轮操作之后变成了B然后又是一轮操作又变成了A此时这个位置有没有发生改变改变了的因为不是一直是A这就是ABA问题 如何解决ABA问题 解决ABA问题就是给值增加一个修改版本号每次值的变化都会修改它的版本号CAS在操作的时候都会去对比此版本号。 下面给出一个ABA的案例 public class CasAbaDemo {public static AtomicInteger a new AtomicInteger(1);public static void main(String[] args) {Thread main new Thread(() - {System.out.println(CasAbaDemo.main Thread.currentThread().getName() ,初始值 a.get());try {int executedNum a.get();int newNum executedNum 1;TimeUnit.SECONDS.sleep(3);boolean isCasSuccess a.compareAndSet(executedNum, newNum);System.out.println(Thread.currentThread().getName() ,CAS 操作: isCasSuccess);} catch (InterruptedException e) {e.printStackTrace();}}, 主线程);Thread thread new Thread(() - {try {TimeUnit.SECONDS.sleep(2);a.incrementAndGet();System.out.println(Thread.currentThread().getName() ,incrementAndGet,之后 a.get());a.decrementAndGet();System.out.println(Thread.currentThread().getName() ,decrementAndGet,之后 a.get());} catch (Exception e) {e.printStackTrace();}}, 干扰线程);main.start();thread.start();} } Java中ABA解决办法(AtomicStampedReference) AtomicStampedReference 主要包含一个引用对象以及一个自动更新的整数 “stamp”的pair对象来解决ABA问题 public class AtomicStampedReferenceV {private static class PairT {// 数据引用final T reference;// 版本号final int stamp;private Pair(T reference, int stamp) {this.reference reference;this.stamp stamp;}static T PairT of(T reference, int stamp) {return new PairT(reference, stamp);}}private volatile PairV pair;/*** 期望引用* param expectedReference the expected value of the reference* 新值引用* param newReference the new value for the reference* 期望引用的版本号* param expectedStamp the expected value of the stamp* 新值的版本号* param newStamp the new value for the stamp* return {code true} if successful*/public boolean compareAndSet(V expectedReference,V newReference,int expectedStamp,int newStamp) {PairV current pair;return// 期望引用与当前引用一致expectedReference current.reference // 期望版本与当前版本一致expectedStamp current.stamp // 数据一致((newReference current.reference newStamp current.stamp) ||// 数据不一致casPair(current, Pair.of(newReference, newStamp)));} }修改之后完成ABA问题 public class CasAbaDemo02 {public static AtomicStampedReferenceInteger a new AtomicStampedReference(new Integer(1), 1);public static void main(String[] args) {Thread main new Thread(() - {System.out.println(CasAbaDemo.main Thread.currentThread().getName() ,初始值 a.getReference());try {Integer executedReference a.getReference();Integer newReference executedReference 1;Integer expectStamp a.getStamp();Integer newStamp expectStamp 1;TimeUnit.SECONDS.sleep(3);boolean isCasSuccess a.compareAndSet(executedReference, newReference, expectStamp, newStamp);System.out.println(Thread.currentThread().getName() ,CAS 操作: isCasSuccess);} catch (InterruptedException e) {e.printStackTrace();}}, 主线程);Thread thread new Thread(() - {try {TimeUnit.SECONDS.sleep(2);a.compareAndSet(a.getReference(), a.getReference() 1, a.getStamp(), a.getStamp() 1);System.out.println(Thread.currentThread().getName() ,incrementAndGet,之后 a.getReference());a.compareAndSet(a.getReference(), a.getReference() - 1, a.getStamp(), a.getStamp() - 1);System.out.println(Thread.currentThread().getName() ,decrementAndGet,之后 a.getReference());} catch (Exception e) {e.printStackTrace();}}, 干扰线程);main.start();thread.start();} }
http://www.w-s-a.com/news/452349/

相关文章:

  • 网站建设合同封面模板做代炼的网站
  • 外贸网站建站要多少钱南昌优化排名推广
  • 做公司网站的尺寸一般是多大企业管理网站
  • 苏州网站设计公司兴田德润i简介做签证宾馆订单用啥网站
  • 网站页面设计工具做网站租空间
  • 做智能网站系统百度提交入口
  • 网站建设代理商电话网站规划和建设方案
  • 双桥区网站制作seo 首页
  • 电子商务网站建设前期准备wordpress域名指向二级目录
  • 汕头建站网站模板淮北做网站电话
  • 手机做logo用什么网站服务器安全防护
  • 课程分销的网站怎么做北京企业网站建设方案
  • 吴兴区建设局网站湖北企业网站建设
  • 网页与网站的区别是什么2023年8月份新冠
  • 唐山网站建设外包公司安卓手机怎么搭建网页
  • 国内做网站最大的公司计量检测网站平台建设方案
  • 重庆沛宣网站建设网页制作初学者
  • php网站漂浮广告代码网络营销跟网站推广有啥区别
  • wordpress调用图片优化型网站建设的基本要求
  • 郑州模板网站建设策划公司做网站怎么赚钱滑县电
  • 东昌府聊城网站优化秦皇岛市妇幼保健院
  • 做网站能赚钱吗网页升级访问通知天天更新
  • 做网站使用什么软件的免费招聘网
  • 宁波网站建设公司推荐哪家淄博网站制作公司服务
  • 做网站网页挣钱不免费主题wordpress
  • 如何提高你的网站的粘性手机网站整站模板下载
  • 学校网站建设制度网站相关推荐怎么做
  • 昌图网站wordpress 视频外链
  • 企业网站要怎么建设重庆住房城乡建设部网站
  • html5网站特点seo教程培训班