JUC
线程创建方式:
- 第一种是通过继承 Thread 类并重写其 run() 方法来创建线程。在run() 方法中定义线程需要执行的任务逻辑,然后,创建该类的实例,调用 start() 方法启动线程,start() 方法会自动调用 run() 方法中的代码逻辑。这种方式简单直观,但由于 Java 不支持多重继承,因此限制了类的扩展性。
- 第二种是实现 Runnable 接口并将其传递给 Thread 构造器来创建线程。Runnable 是一个函数式接口,其中的 run() 方法定义了任务逻辑。这种方式更加灵活,因为它不占用类的继承关系,同时可以更好地支持资源共享,可以让多个线程共享同一个 Runnable 实例。这种方式适用于需要解耦任务逻辑与线程管理的场景。
- 第三种是通过实现 Callable 接口来创建有返回值的线程。Callable 接口类似于 Runnable,但它可以返回结果并抛出异常。Callable 的 call() 方法需要通过 FutureTask 包装后传递给 Thread 构造器。通过 Future 对象可以获取线程执行的结果或捕获异常。这种方式适用于需要获取线程执行结果或处理复杂任务的场景。
- 第四种是通过 Executor 框架创建线程池来管理线程。Executor 框架提供了更高级的线程管理功能,例如线程复用、任务调度等。通过 submit() 或 execute() 方法提交任务,避免频繁创建和销毁线程的开销。它作为最常被使用的方式,广泛用于需要高效管理大量线程的场景。
线程的生命周期:
- NEW: 初始状态,线程被创建出来但没有被调用
start()
。 - RUNNABLE: 运行状态,线程被调用了
start()
等待运行的状态。 - BLOCKED:阻塞状态,需要等待锁释放。
- WAITING:等待状态,表示该线程需要等待其他线程做出一些特定动作(通知或中断)。
- TIME_WAITING:超时等待状态,可以在指定的时间后自行返回而不是像 WAITING 那样一直等待。
- TERMINATED:终止状态,表示该线程已经运行完毕。
- 当线程执行
wait()
方法之后,线程进入 WAITING(等待) 状态。进入等待状态的线程需要依靠其他线程的通知才能够返回到运行状态。 - TIMED_WAITING(超时等待) 状态相当于在等待状态的基础上增加了超时限制,比如通过
sleep(long millis)
方法或wait(long millis)
方法可以将线程置于 TIMED_WAITING 状态。当超时时间结束后,线程将会返回到 RUNNABLE 状态。 - 当线程进入
synchronized
方法/块或者调用wait
后(被notify
)重新进入synchronized
方法/块,但是锁被其它线程占有,这个时候线程就会进入 BLOCKED(阻塞) 状态。 - 线程在执行完了
run()
方法之后将会进入到 TERMINATED(终止) 状态。
线程的上下文切换:
什么时线程的上下文切换?
它是指当 CPU 从一个线程切换到另一个线程时,操作系统需要保存当前线程的执行状态,并加载下一个线程的执行状态,以便它们能够正确地继续运行。执行状态主要包括:寄存器状态、程序计数器(PC)、栈信息、线程的优先级等。
发生时机
- 主动让出 CPU,比如调用了
sleep()
,wait()
等。 - 时间片用完,因为操作系统要防止一个线程或者进程长时间占用 CPU 导致其他线程或者进程饿死。
- 调用了阻塞类型的系统中断,比如请求 IO,线程被阻塞。
- 被终止或结束运行
线程上下文切换的过程
- 第一步是保存当前线程的上下文,将当前线程的寄存器状态、程序计数器、栈信息等保存到内存中。
- 第二步是根据线程调度算法,如:时间片轮转、优先级调度等,选择下一个要运行的线程。
- 第三步是加载下一个线程的上下文,从内存中恢复所选线程的寄存器状态、程序计数器和栈信息。
- 第四步是 CPU 开始执行被加载的线程的代码。
Thread的sleep()和Object的wait()对比
共同点:两者都可以暂停线程的执行。
区别:
sleep()
方法没有释放锁,而wait()
方法释放了锁 。wait()
通常被用于线程间交互/通信,sleep()
通常被用于暂停执行。wait()
方法被调用后,线程不会自动苏醒,需要别的线程调用同一个对象上的notify()
或者notifyAll()
方法。sleep()
方法执行完成后,线程会自动苏醒,或者也可以使用wait(long timeout)
超时后线程会自动苏醒。sleep()
是Thread
类的静态本地方法,wait()
则是Object
类的本地方法。
并发和并行的区别:
- 并发指的是多个任务在同一时间段内交替执行的能力。换句话说,并发并不一定要求任务同时进行,而是通过快速切换任务来实现“看起来同时运行”的效果。
- 并行指的是多个任务在同一时刻真正同时执行的能力。并行通常需要多核 CPU 的支持,每个核心独立处理一个任务,从而实现真正的并行计算。
同步和异步
- 同步指的是任务按照顺序依次执行的方式。在这种模式下,调用者会阻塞等待任务完成并返回结果后,才会继续执行后续的操作。
- 异步指的是任务无需等待立即返回,调用方可以继续执行其他操作,而任务的结果会在稍后通过如回调函数、事件通知或 Future 对象等机制传递给调用方。
什么是线程死锁:
它是指两个或多个线程在执行过程中,因为争夺资源而相互等待对方释放资源,从而导致所有相关线程都无法继续执行的情况。例如,线程 A 持有资源 1 并等待资源 2,而线程 B 持有资源 2 并等待资源 1,这样两个线程就会陷入互相等待的状态,形成死锁。
线程死锁的产生需要同时满足以下四个必要条件:
- 第一个是互斥条件,资源只能被一个线程占用,其他线程必须等待资源释放后才能使用。
- 第二个是请求与保持条件:一个线程因请求资源而阻塞时,对已获得的资源保持不放。
- 第三个是不可剥夺条件,线程持有的资源不能被强制剥夺,只有线程自己可以释放资源。
- 第四个是循环等待条件,存在一组线程形成循环等待,每个线程都在等待下一个线程所占有的资源。
如何检测死锁:
- 使用
jmap
、jstack
等命令查看 JVM 线程栈和堆内存的情况。如果有死锁,jstack
的输出中通常会有Found one Java-level deadlock:
的字样,后面会跟着死锁相关的线程信息。另外,实际项目中还可以搭配使用top
、df
、free
等命令查看操作系统的基本情况,出现死锁可能会导致 CPU、内存等资源消耗过高。 - 采用 VisualVM、JConsole 等工具进行排查。
JMM
Java 内存模型(Java Memory Model,JMM)定义了 Java 程序中的变量、线程如何和主存以及工作内存进行交互的规则。它主要涉及到多线程环境下的共享变量可见性、指令重排等问题,是理解并发编程中的关键概念。
Java 线程之间的通信由 Java 内存模型(简称 JMM)控制,从抽象的角度来说,JMM 定义了线程和主存之间的抽象关系。JMM 的抽象示意图如图所示:
注意,根据 JMM 的规定,线程对共享变量的所有操作都必须在自己的本地内存中进行,不能直接从主存中读取。
如何保证内容可见性
那么怎么知道这个共享变量的被其他线程更新了呢?这就是 JMM 的功劳了,也是 JMM 存在的必要性之一。JMM 通过控制主存与每个线程的本地内存之间的交互,来提供内存可见性保证。
Java 中的volatile关键字可以保证多线程操作共享变量的可见性以及禁止指令重排序,synchronized关键字不仅保证可见性,同时也保证了原子性(互斥性)。
在更底层,JMM 通过内存屏障来实现内存的可见性以及禁止重排序。为了程序员更方便地理解,设计者提出了 happens-before 的概念。(下面会讲)
JMM和happens-before
一方面,我们开发者需要 JMM 提供一个强大的内存模型来编写代码;另一方面,编译器和处理器希望 JMM 对它们的束缚越少越好,这样它们就可以尽可能多的做优化来提高性能,希望的是一个弱的内存模型。
JMM 考虑了这两种需求,并且找到了平衡点,对编译器和处理器来说,只要不改变程序的执行结果(单线程程序和正确同步了的多线程程序),编译器和处理器怎么优化都行。
happens-before 关系的定义如下:
- 如果一个操作 happens-before 另一个操作,那么第一个操作的执行结果将对第二个操作可见,而且第一个操作的执行顺序排在第二个操作之前。
- 两个操作之间存在 happens-before 关系,并不意味着 Java 平台的具体实现必须要按照 happens-before 关系指定的顺序来执行。如果重排序之后的执行结果,与按 happens-before 关系来执行的结果一致,那么 JMM 也允许这样的重排序。
Volatile关键字
- 在 Java 中,
volatile
关键字可以保证变量的可见性,如果我们将变量声明为volatile
,这就指示 JVM,这个变量是共享且不稳定的,每次使用它都到主存中进行读取。 volatile
关键字其实并非是 Java 语言特有的,在 C 语言里也有,它最原始的意义就是禁用 CPU 缓存。如果我们将一个变量使用volatile
修饰,这就指示 编译器,这个变量是共享且不稳定的,每次使用它都到主存中进行读取。volatile
关键字能保证数据的可见性,但不能保证数据的原子性。synchronized
关键字两者都能保证。
如何禁止指令重排序:
在 Java 中,volatile
关键字除了可以保证变量的可见性,还有一个重要的作用就是防止 JVM 的指令重排序。 如果我们将变量声明为 volatile
,在对这个变量进行读写操作的时候,会通过插入特定的 内存屏障 的方式来禁止指令重排序。
*示例:
面试中面试官经常会说:“单例模式了解吗?来给我手写一下!给我解释一下双重检验锁方式实现单例模式的原理呗!”
双重校验锁实现对象单例(线程安全):
public class Singleton {
private volatile static Singleton uniqueInstance;
private Singleton() {
}
public static Singleton getUniqueInstance() {
//先判断对象是否已经实例过,没有实例化过才进入加锁代码
if (uniqueInstance == null) {
//类对象加锁
synchronized (Singleton.class) {
//第二次判断是为了防止多线程情况下多次创建实例,违反单例模式
if (uniqueInstance == null) {
uniqueInstance = new Singleton();
}
}
}
return uniqueInstance;
}
}
uniqueInstance
采用 volatile
关键字修饰也是很有必要的, uniqueInstance = new Singleton();
这段代码其实是分为三步执行:
- 1.为
uniqueInstance
分配内存空间 - 2.初始化
uniqueInstance
- 3.将
uniqueInstance
指向分配的内存地址
但是由于 JVM 具有指令重排的特性,执行顺序有可能变成 1->3->2。指令重排在单线程环境下不会出现问题,但是在多线程环境下会导致一个线程获得还没有初始化的实例。例如,线程 T1 执行了 1 和 3,此时 T2 调用 getUniqueInstance
() 后发现 uniqueInstance
不为空,因此返回 uniqueInstance
,但此时 uniqueInstance
还未被初始化。
CAS
-
CAS 的全称是 Compare And Swap(比较与交换) ,用于实现乐观锁,被广泛应用于各大框架中。CAS 的思想很简单,就是用一个预期值和要更新的变量值进行比较,两值相等才会进行更新。
-
CAS 是一个原子操作,底层依赖于一条 CPU 的原子指令。
-
CAS 涉及到三个操作数:
- V:要更新的变量值(Var)
- E:预期值(Expected)
- N:拟写入的新值(New)
当且仅当 V 的值等于 E 时,CAS 通过原子方式用新值 N 来更新 V 的值。如果不等,说明已经有其它线程更新了 V,则当前线程放弃更新。
-
AtomicInteger源码中:在
compareAndSwapInt
操作失败时,会不断重试直到成功。也就是说,getAndAddInt
方法会通过compareAndSwapInt
方法来尝试更新value
的值,如果更新失败(当前值在此期间被其他线程修改),它会重新获取当前值并再次尝试更新,直到操作成功。
CAS算法存在哪些问题
ABA问题
如果一个变量 V 初次读取的时候是 A 值,并且在准备赋值的时候检查到它仍然是 A 值,那我们就能说明它的值没有被其他线程修改过了吗?很明显是不能的,因为在这段时间它的值可能被改为其他值,然后又改回 A,那 CAS 操作就会误认为它从来没有被修改过。这个问题被称为 CAS 操作的 “ABA”问题。
解决方案:
ABA 问题的解决思路是在变量前面追加上版本号或者时间戳。JDK 1.5 以后的 AtomicStampedReference
类就是用来解决 ABA 问题的,其中的 compareAndSet()
方法就是首先检查当前引用是否等于预期引用,并且当前标志是否等于预期标志,如果全部相等,则以原子方式将该引用和该标志的值设置为给定的更新值。
循环时间开销大
CAS 经常会用到自旋操作来进行重试,也就是不成功就一直循环执行直到成功。如果长时间不成功,会给 CPU 带来非常大的执行开销。
解决方案:
- 限制自旋次数:设置最大重试次数,超过后切换到阻塞模式(如使用 Lock)。
- 使用更高级的同步机制:例如 ReentrantLock 或 Semaphore,它们可以在竞争激烈时挂起线程,减少 CPU 开销。
只能保证一个共享变量的原子操作
CAS 操作仅能对单个共享变量有效。当需要操作多个共享变量时,CAS 就显得无能为力。
解决方案:
- 使用锁:通过显式锁(如 ReentrantLock)或内置锁(synchronized)来保证多个变量的原子性。
- 封装成对象:将多个变量封装到一个对象中,使用 AtomicReference 对整个对象进行 CAS 操作。
Synchronized
synchronized
关键字的使用方式主要有下面 3 种:
- 修饰实例方法(锁当前对象实例)
- 修饰静态方法(锁当前类)
- 修饰代码块(锁指定对象/类)
总结:
synchronized
关键字加到static
静态方法和synchronized(class)
代码块上都是是给 Class 类上锁;synchronized
关键字加到实例方法上是给对象实例上锁;- 尽量不要使用
synchronized(String a)
因为 JVM 中,字符串常量池具有缓存功能。
synchronized底层实现
它依赖于 JVM 的监视器锁(Monitor)机制,每个对象有一个监视器锁(monitor)。当 monitor 被占用时就会处于锁定状态,线程执行 monitorenter 指令时尝试获取锁,会判断 monitor 的进入数是否为 0 ,如果为 0 则该线程进入monitor,然后将进入数设置为 1,该线程即为monitor的所有者;如果不为 0,说明已有线程占有该monitor,那么线程就会进入并处于阻塞状态,直到monitor的进入数为 0,才会重新尝试获取monitor的所有权。
退出同步代码块时,线程会执行 monitorexit,该线程必须是 objectref 所对应的 monitor 的所有者。指令执行时,monitor 的进入数减 1,如果减 1 后进入数为 0,那线程退出 monitor,不再是这个 monitor 的所有者。其他被这个 monitor 阻塞的线程可以尝试去获取这个 monitor 的所有权。
synchronized和volatile的区别:
synchronized
关键字和 volatile
关键字是两个互补的存在,而不是对立的存在!
volatile
关键字是线程同步的轻量级实现,所以volatile
性能肯定比synchronized
关键字要好 。但是volatile
关键字只能用于变量而synchronized
关键字可以修饰方法以及代码块 。volatile
关键字能保证数据的可见性,但不能保证数据的原子性。synchronized
关键字两者都能保证。volatile
关键字主要用于解决变量在多个线程之间的可见性,而synchronized
关键字解决的是多个线程之间访问资源的同步性。
ReentrantLock
ReentrantLock
实现了Lock
接口,是一个可重入且独占式的锁,和synchronized
关键字类似。不过,ReentrantLock
更灵活、更强大,增加了轮询、超时、中断、公平锁和非公平锁等高级功能。ReentrantLock
里面有一个内部类Sync
,Sync
继承 AQS(AbstractQueuedSynchronizer
),添加锁和释放锁的大部分操作实际上都是在Sync
中实现的。Sync
有公平锁FairSync
和非公平锁NonfairSync
两个子类。(默认使用非公平锁,也可以通过构造器来显式的指定使用公平锁。)
Synchronized和ReentrantLock的异同
相同点:
两者都是可重入锁
不同点:
- synchronized依赖于jvm,而reentrantLock依赖于API
synchronized
是 Java 的内置关键字,它是隐式的,通过 JVM 提供的监视器锁机制实现同步,使用简单,无需手动管理锁的获取和释放ReentrantLock
是 java.util.concurrent.locks 包中的一个类,它是显式的,提供了更灵活的锁机制,需要开发者手动调用 lock() 和 unlock() 方法来控制锁的生命周期。
- 相比
synchronized
,ReentrantLock
增加了一些高级功能。- 等待可中断 :
ReentrantLock
提供了一种能够中断等待锁的线程的机制,通过lock.lockInterruptibly()
来实现这个机制。也就是说当前线程在等待获取锁的过程中,如果其他线程中断当前线程「interrupt()
」,当前线程就会抛出InterruptedException
异常,可以捕捉该异常进行相应处理。 - 可实现公平锁 :
ReentrantLock
可以指定是公平锁还是非公平锁。而synchronized
只能是非公平锁。所谓的公平锁就是先等待的线程先获得锁。ReentrantLock
默认情况是非公平的,可以通过ReentrantLock
类的ReentrantLock(boolean fair)
构造方法来指定是否是公平的。 - 可实现选择性通知(锁可以绑定多个条件):
synchronized
关键字与wait()
和notify()
/notifyAll()
方法相结合可以实现等待/通知机制。ReentrantLock
类当然也可以实现,但是需要借助于Condition
接口与newCondition()
方法。 - 支持超时 :
ReentrantLock
提供了tryLock(timeout)
的方法,可以指定等待获取锁的最长等待时间,如果超过了等待时间,就会获取锁失败,不会一直等待。
- 等待可中断 :
synchronized
在退出同步代码块时会自动释放锁,即使发生异常也不会导致死锁;而ReentrantLock
需要开发者手动调用 unlock() 方法释放锁,因此必须在 finally 块中确保锁的释放,否则可能导致死锁。
*补充
锁的状态有:
- 无锁状态(Unlocked):当一个对象或资源没有被任何线程持有锁时,它处于无锁状态。此时,多个线程可以自由访问该资源。
- 偏向锁(Biased Locking):偏向锁是一种优化机制,用于减少无竞争情况下的同步开销。当一个线程第一次获取锁时,JVM会将锁标记为偏向该线程,并记录线程ID。如果后续该线程再次尝试获取锁,无需进行额外的同步操作,直接判断线程ID是否匹配即可。偏向锁适用于只有一个线程访问同步块的场景。
- 轻量级锁(Lightweight Locking):当有第二个线程尝试获取已经被偏向的锁时,偏向锁会升级为轻量级锁。轻量级锁通过CAS(Compare-And-Swap)操作来尝试获取锁。如果CAS操作成功,则线程获取锁;如果失败,则进入自旋等待状态,尝试多次获取锁。
- 重量级锁(Heavyweight Locking):当多个线程竞争锁且自旋等待无法快速获取锁时,轻量级锁会升级为重量级锁。重量级锁会将未获取锁的线程挂起(进入阻塞状态),并由操作系统调度。这种方式会带来较大的性能开销,因为线程的挂起和唤醒需要上下文切换。
锁的升级过程:
锁的升级过程是一个从低开销到高开销的逐步演化过程,目的是在不同竞争程度下选择最优的锁实现。以下是锁升级的具体流程:
- 初始状态:无锁,对象刚创建时,没有任何线程竞争锁,处于无锁状态。
- 偏向锁,第一个线程尝试获取锁时,JVM会将锁标记为偏向锁,并记录线程ID。后续该线程再次尝试获取锁时,只需检查线程ID是否匹配,无需额外操作。
- 轻量级锁,当第二个线程尝试获取锁时,偏向锁失效,升级为轻量级锁。轻量级锁通过CAS操作尝试获取锁。如果CAS操作失败,线程会进入自旋状态,反复尝试获取锁。
- 重量级锁,如果自旋一定次数后仍然无法获取锁,或者系统检测到锁竞争激烈,轻量级锁会升级为重量级锁。重量级锁会将未获取锁的线程挂起,避免CPU资源浪费。
需要注意的是,锁的升级是单向的,即从无锁 → 偏向锁 → 轻量级锁 → 重量级锁。一旦锁升级为重量级锁,就不会再降级为轻量级锁或偏向锁。
ThreadLocal
ThreadLocal为变量在每个线程中都创建了一个副本,那么每个线程可以访问自己内部的副本变量。
ThreadLocal原理:
threadlocal的set()方法如下:
public void set(T value) {
//获取当前请求的线程
Thread t = Thread.currentThread();
//取出 Thread 类内部的 threadLocals 变量(哈希表结构)
ThreadLocalMap map = getMap(t);
if (map != null)
// 将需要存储的值放入到这个哈希表中
map.set(this, value);
else
createMap(t, value);
}
ThreadLocalMap getMap(Thread t) {
return t.threadLocals;
}
- 通过上面这些内容,我们足以通过猜测得出结论:最终的变量是放在了当前线程的
ThreadLocalMap
中,并不是存在ThreadLocal
上,ThreadLocal
可以理解为只是ThreadLocalMap
的封装,传递了变量值。ThrealLocal
类中可以通过Thread.currentThread()
获取到当前线程对象后,直接通过getMap(Thread t)
可以访问到该线程的ThreadLocalMap
对象。 - 每个
Thread
中都具备一个ThreadLocalMap
,而ThreadLocalMap
可以存储以ThreadLocal
为 key ,Object 对象为 value 的键值对。
threadlocal的数据结构如下图:
ThreadLocal是怎么导致内存泄漏问题的
通过上面的内容我们已经知道:每个线程维护一个名为 ThreadLocalMap
的 map。 当你使用 ThreadLocal
存储值时,实际上是将值存储在当前线程的 ThreadLocalMap
中,其中 ThreadLocal
实例本身作为 key,而你要存储的值作为 value。
ThreadLocalMap
的 key
和 value
引用机制:
- key 是弱引用:
ThreadLocalMap
中的 key 是ThreadLocal
的弱引用 (WeakReference<ThreadLocal<?>>
)。 这意味着,如果ThreadLocal
实例不再被任何强引用指向,垃圾回收器会在下次 GC 时回收该实例,导致ThreadLocalMap
中对应的 key 变为null
。 - value 是强引用:
ThreadLocalMap
中的 value 是强引用。 即使 key 被回收(变为null
),value 仍然存在于ThreadLocalMap
中,被强引用,不会被回收。
当 ThreadLocal
实例失去强引用后,其对应的 value 仍然存在于 ThreadLocalMap
中,因为 Entry
对象强引用了它。如果线程持续存活(例如线程池中的线程),ThreadLocalMap
也会一直存在,导致 key 为 null
的 entry 无法被垃圾回收,机会造成内存泄漏。
如何避免内存泄漏的发生?
- 在使用完
ThreadLocal
后,务必调用remove()
方法。 这是最安全和最推荐的做法。remove()
方法会从ThreadLocalMap
中显式地移除对应的 entry,彻底解决内存泄漏的风险。 即使将ThreadLocal
定义为static final
,也强烈建议在每次使用后调用remove()
。 - 在线程池等线程复用的场景下,使用
try-finally
块可以确保即使发生异常,remove()
方法也一定会被执行。
如何跨线程传递ThreadLocal的值
如果想要在异步场景下传递 ThreadLocal
值,有两种解决方案:
InheritableThreadLocal
:InheritableThreadLocal
是 JDK1.2 提供的工具,继承自ThreadLocal
。使用InheritableThreadLocal
时,会在创建子线程时,令子线程继承父线程中的ThreadLocal
值,但是无法支持线程池场景下的ThreadLocal
值传递。TransmittableThreadLocal
:TransmittableThreadLocal
(简称 TTL) 是阿里巴巴开源的工具类,继承并加强了InheritableThreadLocal
类,可以在线程池的场景下支持ThreadLocal
值传递。
线程池
线程池就是管理一系列线程的资源池。当有任务要处理时,直接从线程池中获取线程来处理,处理完之后线程并不会立即被销毁,而是等待下一个任务
为什么要使用线程池
- 降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
- 提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行。
- 提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。
如何创建线程池
方式一:通过ThreadPoolExecutor
构造函数来创建(推荐)。
方式二:通过 Executor
框架的工具类 Executors
来创建。
Executors
工具类可以创建多种类型的线程池,包括:
FixedThreadPool
:固定线程数量的线程池。该线程池中的线程数量始终不变。当有一个新的任务提交时,线程池中若有空闲线程,则立即执行。若没有,则新的任务会被暂存在一个任务队列中,待有线程空闲时,便处理在任务队列中的任务。SingleThreadExecutor
: 只有一个线程的线程池。若多余一个任务被提交到该线程池,任务会被保存在一个任务队列中,待线程空闲,按先入先出的顺序执行队列中的任务。CachedThreadPool
: 可根据实际情况调整线程数量的线程池。线程池的线程数量不确定,但若有空闲线程可以复用,则会优先使用可复用的线程。若所有线程均在工作,又有新的任务提交,则会创建新的线程处理任务。所有线程在当前任务执行完毕后,将返回线程池进行复用。ScheduledThreadPool
:给定的延迟后运行任务或者定期执行任务的线程池。
为什么不推荐使用Executors来创建线程池
Executors
返回线程池对象的弊端如下:
FixedThreadPool
和SingleThreadExecutor
:使用的是有界阻塞队列是LinkedBlockingQueue
,其任务队列的最大长度为Integer.MAX_VALUE
,可能堆积大量的请求,从而导致 OOM。CachedThreadPool
:使用的是同步队列SynchronousQueue
, 允许创建的线程数量为Integer.MAX_VALUE
,如果任务数量过多且执行速度较慢,可能会创建大量的线程,从而导致 OOM。ScheduledThreadPool
和SingleThreadScheduledExecutor
:使用的无界的延迟阻塞队列DelayedWorkQueue
,任务队列最大长度为Integer.MAX_VALUE
,可能堆积大量的请求,从而导致 OOM。
Runnable和Callable的区别
- 1、最大的区别,runnable没有返回值,而实现callable接口的任务线程能返回执行结果
- 2、callable接口实现类中的run方法允许异常向上抛出,可以在内部处理,try catch,但是runnable接口实现类中run方法的异常必须在内部处理,不能抛出
线程池的常见参数有哪些
/**
* 用给定的初始参数创建一个新的ThreadPoolExecutor。
*/
public ThreadPoolExecutor(int corePoolSize,//线程池的核心线程数量
int maximumPoolSize,//线程池的最大线程数
long keepAliveTime,//当线程数大于核心线程数时,多余的空闲线程存活的最长时间
TimeUnit unit,//时间单位
BlockingQueue<Runnable> workQueue,//任务队列,用来储存等待执行任务的队列
ThreadFactory threadFactory,//线程工厂,用来创建线程,一般默认即可
RejectedExecutionHandler handler//拒绝策略,当提交的任务过多而不能及时处理时,我们可以定制策略来处理任务
) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
ThreadPoolExecutor
3 个最重要的参数:
corePoolSize
: 任务队列未达到队列容量时,最大可以同时运行的线程数量。maximumPoolSize
: 任务队列中存放的任务达到队列容量的时候,当前可以同时运行的线程数量变为最大线程数。workQueue
: 新任务来的时候会先判断当前运行的线程数量是否达到核心线程数,如果达到的话,新任务就会被存放在队列中。
ThreadPoolExecutor
其他常见参数 :
keepAliveTime
:当线程池中的线程数量大于corePoolSize
,即有非核心线程(线程池中核心线程以外的线程)时,这些非核心线程空闲后不会立即销毁,而是会等待,直到等待的时间超过了keepAliveTime
才会被回收销毁。unit
:keepAliveTime
参数的时间单位。threadFactory
:executor 创建新线程的时候会用到。handler
:拒绝策略
线程池的拒绝策略有哪些
如果当前同时运行的线程数量达到最大线程数量并且队列也已经被放满了任务时,ThreadPoolExecutor
定义一些策略:
ThreadPoolExecutor.AbortPolicy
:抛出RejectedExecutionException
来拒绝新任务的处理。ThreadPoolExecutor.CallerRunsPolicy
:调用执行者自己的线程运行任务,也就是直接在调用execute
方法的线程中运行(run
)被拒绝的任务,如果执行程序已关闭,则会丢弃该任务。因此这种策略会降低对于新任务提交速度,影响程序的整体性能。如果你的应用程序可以承受此延迟并且你要求任何一个任务请求都要被执行的话,你可以选择这个策略。ThreadPoolExecutor.DiscardPolicy
:不处理新任务,直接丢弃掉。ThreadPoolExecutor.DiscardOldestPolicy
:此策略将丢弃最早的未处理的任务请求。
如果不允许丢弃任务的话,选择CallerRunsPolicy,只要当前程序不关闭就会使用执行execute
方法的线程执行该任务。
CallerRunsPolicy有什么风险
如果走到CallerRunsPolicy
的任务是个非常耗时的任务,且处理提交任务的线程是主线程,可能会导致主线程阻塞,影响程序的正常运行。
解决方案:
我们从问题的本质入手,调用者采用CallerRunsPolicy
是希望所有的任务都能够被执行,暂时无法处理的任务又被保存在阻塞队列BlockingQueue
中。这样的话,在内存允许的情况下,我们可以增加阻塞队列BlockingQueue
的大小并调整堆内存以容纳更多的任务,确保任务能够被准确执行。
为了充分利用 CPU,我们还可以调整线程池的maximumPoolSize
(最大线程数)参数,这样可以提高任务处理速度,避免累计在 BlockingQueue
的任务过多导致内存用完。
如何彻底解决?
如果服务器资源已达到可利用的极限,这就意味我们要在设计策略上改变线程池的调度了,我们都知道,导致主线程卡死的本质就是因为我们不希望任何一个任务被丢弃。换个思路,有没有办法既能保证任务不被丢弃且在服务器有余力时及时处理呢?
这里提供的一种任务持久化的思路,这里所谓的任务持久化,包括但不限于:
- 设计一张任务表将任务存储到 MySQL 数据库中。
- Redis 缓存任务。
- 将任务提交到消息队列中。
这里以mysql为例,简单介绍一下实现逻辑:
- 实现
RejectedExecutionHandler
接口自定义拒绝策略,自定义拒绝策略负责将线程池暂时无法处理(此时阻塞队列已满)的任务入库(保存到 MySQL 中)。注意:线程池暂时无法处理的任务会先被放在阻塞队列中,阻塞队列满了才会触发拒绝策略。 - 继承
BlockingQueue
实现一个混合式阻塞队列,该队列包含 JDK 自带的ArrayBlockingQueue
。另外,该混合式阻塞队列需要修改取任务处理的逻辑,也就是重写take()
方法,取任务时优先从数据库中读取最早的任务,数据库中无任务时再从ArrayBlockingQueue
中去取任务。
线程池中的阻塞队列
- 容量为
Integer.MAX_VALUE
的LinkedBlockingQueue
(有界阻塞队列):FixedThreadPool
和SingleThreadExecutor
。FixedThreadPool
最多只能创建核心线程数的线程(核心线程数和最大线程数相等),SingleThreadExecutor
只能创建一个线程(核心线程数和最大线程数都是 1),二者的任务队列永远不会被放满。 SynchronousQueue
(同步队列):CachedThreadPool
。SynchronousQueue
没有容量,不存储元素,目的是保证对于提交的任务,如果有空闲线程,则使用空闲线程来处理;否则新建一个线程来处理任务。也就是说,CachedThreadPool
的最大线程数是Integer.MAX_VALUE
,可以理解为线程数是可以无限扩展的,可能会创建大量线程,从而导致 OOM。-
DelayedWorkQueue
(延迟队列):ScheduledThreadPool
和SingleThreadScheduledExecutor
。DelayedWorkQueue
的内部元素并不是按照放入的时间排序,而是会按照延迟的时间长短对任务进行排序,内部采用的是“堆”的数据结构,可以保证每次出队的任务都是当前队列中执行时间最靠前的。DelayedWorkQueue
添加元素满了之后会自动扩容,增加原来容量的 50%,即永远不会阻塞,最大扩容可达Integer.MAX_VALUE
,所以最多只能创建核心线程数的线程。 ArrayBlockingQueue
(有界阻塞队列):底层由数组实现,容量一旦创建,就不能修改。
线程池处理任务的流程
再提一个有意思的小问题:线程池在提交任务前,可以提前创建线程吗?
答案是可以的!ThreadPoolExecutor
提供了两个方法帮助我们在提交任务之前,完成核心线程的创建,从而实现线程池预热的效果:
prestartCoreThread()
:启动一个线程,等待任务,如果已达到核心线程数,这个方法返回 false,否则返回 true;prestartAllCoreThreads()
:启动所有的核心线程,并返回启动成功的核心线程数。
线程池中的线程异常后,销毁还是复用
- 使用
execute()
提交任务:当任务通过execute()
提交到线程池并在执行过程中抛出异常时,如果这个异常没有在任务内被捕获,那么该异常会导致当前线程终止,并且异常会被打印到控制台或日志文件中。线程池会检测到这种线程终止,并创建一个新线程来替换它,从而保持配置的线程数不变。 - 使用
submit()
提交任务:对于通过submit()
提交的任务,如果在任务执行中发生异常,这个异常不会直接打印出来。相反,异常会被封装在由submit()
返回的Future
对象中。当调用Future.get()
方法时,可以捕获到一个ExecutionException
。在这种情况下,线程不会因为异常而终止,它会继续存在于线程池中,准备执行后续的任务。
简单来说:使用execute()
时,未捕获异常导致线程终止,线程池创建新线程替代;使用submit()
时,异常被封装在Future
中,线程继续复用。
如何设置线程池的大小
线程池大小设置过大或者过小都会有问题,合适的才是最好。
- 如果我们设置的线程池数量太小的话,如果同一时间有大量任务/请求需要处理,可能会导致大量的请求/任务在任务队列中排队等待执行,甚至会出现任务队列满了之后任务/请求无法处理的情况,或者大量任务堆积在任务队列导致 OOM。这样很明显是有问题的,CPU 根本没有得到充分利用。
- 如果我们设置线程数量太大,大量线程可能会同时在争取 CPU 资源,这样会导致大量的上下文切换,从而增加线程的执行时间,影响了整体执行效率。
有一个简单并且适用面比较广的公式:
- CPU 密集型任务(N+1): 这种任务消耗的主要是 CPU 资源,可以将线程数设置为 N(CPU 核心数)+1。比 CPU 核心数多出来的一个线程是为了防止线程偶发的缺页中断,或者其它原因导致的任务暂停而带来的影响。一旦任务暂停,CPU 就会处于空闲状态,而在这种情况下多出来的一个线程就可以充分利用 CPU 的空闲时间。
- I/O 密集型任务(2N): 这种任务应用起来,系统会用大部分的时间来处理 I/O 交互,而线程在处理 I/O 的时间段内不会占用 CPU 来处理,这时就可以将 CPU 交出给其它线程使用。因此在 I/O 密集型任务的应用中,我们可以多配置一些线程,具体的计算方法是 2N。
如何设计一个根据任务优先级来执行的线程池
假如我们需要实现一个优先级任务线程池的话,那可以考虑使用 PriorityBlockingQueue
(优先级阻塞队列)作为任务队列(ThreadPoolExecutor
的构造函数有一个 workQueue
参数可以传入任务队列)。
PriorityBlockingQueue
是一个支持优先级的无界阻塞队列,可以看作是线程安全的 PriorityQueue
,两者底层都是使用小顶堆形式的二叉堆,即值最小的元素优先出队。不过,PriorityQueue
不支持阻塞操作。
要想让 PriorityBlockingQueue
实现对任务的排序,传入其中的任务必须是具备排序能力的,方式有两种:
- 提交到线程池的任务实现
Comparable
接口,并重写compareTo
方法来指定任务之间的优先级比较规则。 - 创建
PriorityBlockingQueue
时传入一个Comparator
对象来指定任务之间的排序规则(推荐)。
存在的问题和解决方案:
这存在一些风险和问题,比如:
PriorityBlockingQueue
是无界的,可能堆积大量的请求,从而导致 OOM。- 可能会导致饥饿问题,即低优先级的任务长时间得不到执行。
- 由于需要对队列中的元素进行排序操作以及保证线程安全(并发控制采用的是可重入锁
ReentrantLock
),因此会降低性能。
解决方案:
- 对于 OOM 这个问题的解决比较简单粗暴,就是继承
PriorityBlockingQueue
并重写一下offer
方法(入队)的逻辑,当插入的元素数量超过指定值就返回 false 。 - 饥饿问题这个可以通过优化设计来解决(比较麻烦),比如等待时间过长的任务会被移除并重新添加到队列中,但是优先级会被提升。
- 对于性能方面的影响,是没办法避免的,毕竟需要对任务进行排序操作。并且,对于大部分业务场景来说,这点性能影响是可以接受的
Future
Future类有什么用
Future
类是异步思想的典型运用,主要用在一些需要执行耗时任务的场景,避免程序一直原地等待耗时任务执行完成,执行效率太低。具体来说是这样的:当我们执行某一耗时的任务时,可以将这个耗时任务交给一个子线程去异步执行,同时我们可以干点其他事情,不用傻傻等待耗时任务执行完成。等我们的事情干完后,我们再通过 Future
类获取到耗时任务的执行结果。这样一来,程序的执行效率就明显提高了。
这其实就是多线程中经典的 Future 模式,你可以将其看作是一种设计模式,核心思想是异步调用,主要用在多线程领域,并非 Java 语言独有。
在 Java 中,Future
类只是一个泛型接口,位于 java.util.concurrent
包下,其中定义了 5 个方法,主要包括下面这 4 个功能:
- 取消任务;
- 判断任务是否被取消;
- 判断任务是否已经执行完成;
- 获取任务执行结果。
Callable和Future有什么关系
我们可以通过 FutureTask
来理解 Callable
和 Future
之间的关系。
FutureTask
提供了 Future
接口的基本实现,常用来封装 Callable
和 Runnable
,具有取消任务、查看任务是否执行完成以及获取任务执行结果的方法。ExecutorService.submit()
方法返回的其实就是 Future
的实现类 FutureTask
。
<T> Future<T> submit(Callable<T> task);
Future<?> submit(Runnable task);
FutureTask
有两个构造函数,可传入 Callable
或者 Runnable
对象。实际上,传入 Runnable
对象也会在方法内部转换为Callable
对象。
FutureTask
相当于对Callable
进行了封装,管理着任务执行的情况,存储了 Callable
的 call
方法的任务执行结果。
CompletableFuture类有什么用
Future
在实际使用过程中存在一些局限性比如不支持异步任务的编排组合、获取计算结果的 get()
方法为阻塞调用。
Java 8 才被引入CompletableFuture
类可以解决Future
的这些缺陷。CompletableFuture
除了提供了更为好用和强大的 Future
特性之外,还提供了函数式编程、异步任务编排组合(可以将多个异步任务串联起来,组成一个完整的链式调用)等能力。
CompletableFuture支持回调函数,也就是说我们可以在异步计算完成后自动执行某些操作。
一个任务需要依赖另外两个任务执行完之后再执行,怎么设计
这种任务编排场景非常适合通过CompletableFuture
实现。这里假设要实现 T3 在 T2 和 T1 执行完后执行。
// T1
CompletableFuture<Void> futureT1 = CompletableFuture.runAsync(() -> {
System.out.println("T1 is executing. Current time:" + DateUtil.now());
// 模拟耗时操作
ThreadUtil.sleep(1000);
});
// T2
CompletableFuture<Void> futureT2 = CompletableFuture.runAsync(() -> {
System.out.println("T2 is executing. Current time:" + DateUtil.now());
ThreadUtil.sleep(1000);
});
// 使用allOf()方法合并T1和T2的CompletableFuture,等待它们都完成
CompletableFuture<Void> bothCompleted = CompletableFuture.allOf(futureT1, futureT2);
// 当T1和T2都完成后,执行T3
bothCompleted.thenRunAsync(() -> System.out.println("T3 is executing after T1 and T2 have completed.Current time:" + DateUtil.now()));
// 等待所有任务完成,验证效果
ThreadUtil.sleep(3000);
使用completableFuture有一个任务失败怎么处理
使用 CompletableFuture
的时候一定要以正确的方式进行异常处理,避免异常丢失或者出现不可控问题。
下面是一些建议:
- 使用
whenComplete
方法可以在任务完成时触发回调函数,并正确地处理异常,而不是让异常被吞噬或丢失。 - 使用
exceptionally
方法可以处理异常并重新抛出,以便异常能够传播到后续阶段,而不是让异常被忽略或终止。 - 使用
handle
方法可以处理正常的返回结果和异常,并返回一个新的结果,而不是让异常影响正常的业务逻辑。 - 使用
CompletableFuture.allOf
方法可以组合多个CompletableFuture
,并统一处理所有任务的异常,而不是让异常处理过于冗长或重复。 - ……
在使用completableFuture的时候为什么要使用自定义的线程池
CompletableFuture
默认使用全局共享的 ForkJoinPool.commonPool()
作为执行器,所有未指定执行器的异步任务都会使用该线程池。这意味着应用程序、多个库或框架(如 Spring、第三方库)若都依赖 CompletableFuture
,默认情况下它们都会共享同一个线程池。
虽然 ForkJoinPool
效率很高,但当同时提交大量任务时,可能会导致资源竞争和线程饥饿,进而影响系统性能。
为避免这些问题,建议为 CompletableFuture
提供自定义线程池,带来以下优势:
- 隔离性:为不同任务分配独立的线程池,避免全局线程池资源争夺。
- 资源控制:根据任务特性调整线程池大小和队列类型,优化性能表现。
- 异常处理:通过自定义
ThreadFactory
更好地处理线程中的异常情况。
AQS原理
AQS 是基于 CLH 锁 (Craig, Landin, and Hagersten locks) 进一步优化实现的。
CLH 锁 对自旋锁进行了改进,是基于单链表的自旋锁。在多线程场景下,会将请求获取锁的线程组织成一个单向队列,每个等待的线程会通过自旋访问前一个线程节点的状态,前一个节点释放锁之后,当前节点才可以获取锁。CLH 锁 的队列结构如下图所示。
AQS 的 CLH 变体队列是一个双向队列,会暂时获取不到锁的线程将被加入到该队列中,CLH 变体队列和原本的 CLH 锁队列的区别主要有两点:
- 由 自旋 优化为 自旋 + 阻塞 :自旋操作的性能很高,但大量的自旋操作比较占用 CPU 资源,因此在 CLH 变体队列中会先通过自旋尝试获取锁,如果失败再进行阻塞等待。(自旋修改state,否则加入阻塞队列)
- 由 单向队列 优化为 双向队列 :在 CLH 变体队列中,会对等待的线程进行阻塞操作,当队列前边的线程释放锁之后,需要对后边的线程进行唤醒,因此增加了
next
指针,成为了双向队列。
AQS(AbstractQueuedSynchronizer
)的核心原理图:
AQS 使用 int 成员变量 state
表示同步状态,通过内置的 线程等待队列 来完成获取资源线程的排队工作。
state
变量由 volatile
修饰,用于展示当前临界资源的获锁情况。
- 以
ReentrantLock
为例,state
初始值为 0,表示未锁定状态。A 线程lock()
时,会调用tryAcquire()
独占该锁并将state+1
。此后,其他线程再tryAcquire()
时就会失败,直到 A 线程unlock()
到state=
0(即释放锁)为止,其它线程才有机会获取该锁。当然,释放锁之前,A 线程自己是可以重复获取此锁的(state
会累加),这就是可重入的概念。但要注意,获取多少次就要释放多少次,这样才能保证 state 是能回到零态的。 - 再以
CountDownLatch
以例,任务分为 N 个子线程去执行,state
也初始化为 N(注意 N 要与线程个数一致)。这 N 个子线程是并行执行的,每个子线程执行完后countDown()
一次,state 会 CAS(Compare and Swap) 减 1。等到所有子线程都执行完后(即state=0
),会unpark()
主调用线程,然后主调用线程就会从await()
函数返回,继续后续动作。
Semaphore
synchronized
和ReentrantLock
都是一次只允许一个线程访问某个资源,而Semaphore
(信号量)可以用来控制同时访问特定资源的线程数量。- 当初始的资源个数为 1 的时候,
Semaphore
退化为排他锁。 Semaphore
有两种模式:- 公平模式: 调用
acquire()
方法的顺序就是获取许可证的顺序,遵循 FIFO; - 非公平模式: 抢占式的。(默认)
- 公平模式: 调用
Semaphore
通常用于那些资源有明确访问数量限制的场景比如限流(仅限于单机模式,实际项目中推荐使用 Redis +Lua 来做限流)。
原理
Semaphore
是共享锁的一种实现,它默认构造 AQS 的state
值为permits
,你可以将permits
的值理解为许可证的数量,只有拿到许可证的线程才能执行。- 调用
semaphore.acquire()
,线程尝试获取许可证,如果state >= 0
的话,则表示可以获取成功。如果获取成功的话,使用 CAS 操作去修改state
的值state=state-1
。如果state<0
的话,则表示许可证数量不足。此时会创建一个 Node 节点加入阻塞队列,挂起当前线程。 - 调用
semaphore.release();
,线程尝试释放许可证,并使用 CAS 操作去修改state
的值state=state+1
。释放许可证成功之后,同时会唤醒同步队列中的一个线程。被唤醒的线程会重新尝试去修改state
的值state=state-1
,如果state>=0
则获取令牌成功,否则重新进入阻塞队列,挂起线程。
CountDownLatch
CountDownLatch
允许 count
个线程阻塞在一个地方,直至所有线程的任务都执行完毕。
CountDownLatch
是一次性的,计数器的值只能在构造方法中初始化一次,之后没有任何机制再次对其设置值,当 CountDownLatch
使用完毕后,它不能再次被使用。
原理
CountDownLatch
是共享锁的一种实现,它默认构造 AQS 的 state
值为 count
。当线程使用 countDown()
方法时,其实使用了tryReleaseShared
方法以 CAS 的操作来减少 state
,直至 state
为 0 。当调用 await()
方法的时候,如果 state
不为 0,那就证明任务还没有执行完毕,await()
方法就会一直阻塞,也就是说 await()
方法之后的语句不会被执行。直到count
个线程调用了countDown()
使 state 值被减为 0,或者调用await()
的线程被中断,该线程才会从阻塞中被唤醒,await()
方法之后的语句得到执行。