# 多线程
# 并发与并行
- 并发: 多个任务在同一CPU,按照细分的时间片轮流交替进行。由于时间很短,看上去好像同时进行;
- 并行: 单位时间内,多个处理器,或多核处理器同时处理对各任务,是真正意义上的同时进行;
# 进程
内存中运行的应用程序,每一个进程都有自己独立的内存空间;
# 线程
进程中的一个控制单元,负责当前进程中的程序执行;
一个进程至少有一个线程,可运行多个线程,多个线程间可共享数据;
# 为什么使用多线程
- 充分利用多核CPU的计算能力;
- 提高系统并发能力和性能;
# 多线程的缺点
- 线程越多,内存占用也越多
- 多线程需要协调和管理
- 线程之间的共享资源访问会相互影响
### 并发编程的三要素
- 原子性
- 可见性: 一个线程对共享变量的修改,另一个线程能看到(synchronized,volatile)
- 有序性:程序的执行顺序按照代码的先后顺序
# 线程安全问题
- 线程切换带来的原子性问题
- 缓存导致的可见性问题
- 编译优化带来的有序性问题
# 线程安全问题的应对方案
- 原子性问题:
- JDK Atomic开头的原子类
- synchronized关键字
- lock锁机制
- 可见性问题
- synchronized关键字
- volatile描述符
- lock锁机制
- 有序性问题
- happens-before规则
# 线程死锁
两个或以上线程在执行过程中,由于竞争资源或彼此通信造成的一种堵塞现象.
# 线程死锁的必要条件
- 互斥条件
- 请求保持
- 不剥夺
- 循环等待
# 死锁的解决方案
- 破坏请求保持条件: 一次申请所有资源
- 破坏不剥夺条件: 占有部分资源尝试申请其它资源时,申请不到则释放占有的资源;
- 破坏循环等待条件: 按序申请资源
# java线程理论知识
# 线程状态
public
class Thread implements Runnable {
public enum State {
/*xxx: 新建*/
NEW,
/*xxx: 可运行(就绪状态)*/
RUNNABLE,
/*xxx: 锁阻塞*/
BLOCKED,
/*xxx: 无限等待*/
WAITING,
/*xxx: 计时等待*/
TIMED_WAITING,
/*xxx: 被终止*/
TERMINATED;
}
}
# 线程状态流转
- 新建->就绪: start()方法
- 就绪->阻塞:
- 运行的线程执行 object.wait()方法
- 运行的线程未获取到synchronized锁时
- 运行的线程执行了 Thread.sleep()方法,子线程的join()方法,或者发出IO请求时
- 阻塞->就绪:
- 其他线程notify(),本线程获取到锁对象
- sleep()时间到,或者wait()时间到并获取到锁
- join()等待的线程终止或者超时,或者IO操作完成
- 就绪->死亡:
- 执行完run()方法正常退出
- 因未捕获的异常,导致run()方法异常退出
# 线程调度
线程有两种调度模型,分时调度和抢占式调度
- 分时调度: 让所有的线程轮流获得CPU的使用权,并且平均分配各个线程占有CPU的时间片;
- 抢占式调度: 指让线池中,优先级高的线程首先占用CPU,优先级高的线程分配的CPU时间片相对多一些;对于线程池中,优先级相同的线程,随机选择; java虚拟机采用的是抢占式调度模型
# java线程调度策略
线程调度优先选择优先级高的运行,出现以下情况时,会暂停运行
- 线程调用yield()方法,让出CPU使用权
- 线程调用slepp()方法,进入计时状态
- 线程IO受阻
- 另一个更高优先级的线程出现
- 该线程的时间片用完
# 线程调度和时间分片
线程调度负责为Runnable状态的线程分配CPU时间片;
时间分片指分配给线程的计算资源;
# java线程同步和线程调度相关的方法
- wait(): 进入阻塞状态,并释放锁
- sleep(): 进入阻塞状态,不释放锁
- yield(): 让出CPU时间分片
- notify(): 幻想一个处于等待状态的线程,与线程的优先级有关
- notifyAll(): 唤醒所有处于阻塞的线程,让它们去竞争锁;
# wait与sleep的区别
- 类不同: sleep是thread的方法,wait是object的方法
- 是否释放锁不同: sleep不释放锁,wait释放锁
- 用处不同: wait用于线程间通信,sleep用于暂停执行;
- 用法不同: wait需要与notify 或notifyAll配合执行; sleep经过一段时间后自动苏醒; wait也可传参使其自动苏醒,但其苏醒后需要获取到锁才能进入就绪就绪状态,否则处于阻塞状态;
# wait的正确使用方式:if还是循环
处于 阻塞状态的线程,可能会受到错误警告或者伪唤醒,因此需要在循环中检查等待条件;
class Test{
void myFunction1(){
synchronized (monitor) {
// 判断条件谓词是否得到满足
while(!locked) {
// 等待唤醒
monitor.wait();
}
// 处理其他的业务逻辑
}
}
void myFunction2(){
/*xxx: 错误唤醒后,如果条件谓语没满足,将不再进行检查*/
synchronized (monitor) {
// 判断条件谓词是否得到满足
if(!locked) {
// 等待唤醒
monitor.wait();
}
// 处理其他的业务逻辑
}
}
}
# 为什么线程通信方法wait()、notify()、notifyAll()方法要定义在 Object类中
java中任何对象都可以被当作锁对象;
一个线程可以持有多个锁对象,直接定义在Thread类中管理起来比较麻烦;
综上,wait()、notify()、notifyAll()定义在 object类中更好;
# 为什么线程通信方法wait()、notify()、notifyAll()要在同步代码块中调用
这些方法会释放锁对象,要求这些方法需要线程持有锁对象,只能通过同步来实现;
因此,它们只能在 同步块 或 同步方法中被调用;
# Thread的yield与sleep方法为什么是静态的
sleep() 和 yield()方法作用于当前正在运行的线程,其他处于等待状态的线程调用它们没有意义,因此设置为静态比较合适;
# 如何停止正在运行的线程
- 使用interrupt()方法终止线程
- run方法执行完毕,正常退出
# 线程间数据共享
两个线程间共享变量,即可实现共享数据;
一般而言,共享的变量要求变量本身是线程安全的;
# 同步代码块与同步方法哪个更好
同步方法会锁住整个对象,同步块不会;
根据同步范围越小越好的原则,同步块更好;
# 线程安全
指某个方法在多线程环境下被调用时,能够正确处理多线程之间的共享变量,程序能够正确完成;
# servlet是否是线程安全的
servlet时单实例多线程,不是线程安全的;
springMVC的Controller和Servlet一样,也是属于单实例多线程的,不能保障共享变量的安全性;
# 新线程的构造方法、静态块由谁调用
该线程的构造方法、静态块由new它出来的那个线程调用;
run()方法由它自身调用;
# Java如何保证多线程安全
- 使用安全类,如 java.util.concurrent下的类,原子类atomicInteger等
- 使用自动锁-synchronized同步锁
- 使用手动锁Lock(如 reentrantlock重入锁);
# 实现线程同步的方法
- 同步代码块: sychronized(对象){} 修饰块
- 同步方法: sychronized 修饰方法
- 重入锁实现同步 reentrantlock类的锁有互斥功能
# 线程优先级的理解
线程优先级由1-10个等级,1代表最低,10代表最高;
如非特别需要,一般不去修改优先级;
# 乐观锁与悲观锁
乐观锁: 每次拿数据的时候认为别人不会修改,所以不上锁。只有在更新时才判断下此期间数据有没有被更新。适合多读操作的场合;
悲观锁: 每次拿数据都认为别人修改过,所以会上锁。别的对象拿数据时,会阻塞。
# volatile与sychronized的区别
线程在具体执行时,会先拷贝主存数据到线程本地(CPU缓存),操作完成后,再把本地缓存的数据刷到主存
synchronized解决的时执行控制的问题,使得sychronized保护的代码块无法被并发执行。它可以保证变量的修改可见性和原子性;
volatile关键字解决的时内存可见性问题,它仅能实现对原始变量操作的原子性,不能保证符合操作的原子性;
volatile主要用于满足对变量可见性有要求,而对读取顺序没有要求的场景;
# java多线程
# 用户线程与守护线程
- 用户线程: 运行在前台,执行具体任务;
- 守护线程: 运行在后台,为其他前台线程服务; 当所有用户线程执行结束后,守护线程会随着jvm一起结束;常见的如垃圾回收线程;
# 实现多线程的方式
- 通过继承Thread类
public class Test {
public static void main(String[] args) {
MyThread t = new MyThread();
t.start();
System.out.println("主线程执行完毕");
}
static class MyThread extends Thread{
@Override
public void run() {
System.out.println("线程:" + Thread.currentThread().getName() + "执行开始");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("线程:" + Thread.currentThread().getName() + "执行完毕");
}
}
}
- 通过重写Runnable接口
public class Test {
public static void main(String[] args) {
Thread t = new Thread(() -> {
System.out.println("线程:" + Thread.currentThread().getName() + "执行开始");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("线程:" + Thread.currentThread().getName() + "执行完毕");
});
t.start();
System.out.println("主线程执行完毕");
}
}
- 通过实现Callable接口
- FutureTask体系源码
public interface Future<V> {
boolean cancel(boolean mayInterruptIfRunning);
V get() throws InterruptedException, ExecutionException;
}
public interface RunnableFuture<V> extends Runnable, Future<V> {
void run();
}
public class FutureTask<V> implements RunnableFuture<V> {
private Callable<V> callable;
private Object outcome;
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW;
}
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW;
}
public void run() {
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
if (ran)
set(result);
}
} finally {
//省略...
}
}
protected void set(V v) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = v;
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
finishCompletion();
}
}
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)
s = awaitDone(false, 0L);
return report(s);
}
private V report(int s) throws ExecutionException {
Object x = outcome;
if (s == NORMAL)
return (V)x;
if (s >= CANCELLED)
throw new CancellationException();
throw new ExecutionException((Throwable)x);
}
//等待完成或在中断或超时时中止
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
final long deadline = timed ? System.nanoTime() + nanos : 0L;
WaitNode q = null;
boolean queued = false;
for (;;) {
if (Thread.interrupted()) {
removeWaiter(q);
throw new InterruptedException();
}
int s = state;
if (s > COMPLETING) {
if (q != null)
q.thread = null;
return s;
}
else if (s == COMPLETING) // cannot time out yet
Thread.yield();
else if (q == null)
q = new WaitNode();
else if (!queued)
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q);
else if (timed) {
/*xxx: 超时中断*/
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
removeWaiter(q);
return state;
}
LockSupport.parkNanos(this, nanos);
}
else
LockSupport.park(this);
}
}
}
- Callable实现多线程的方式
public class Test {
public static void main(String[] args) throws ExecutionException, InterruptedException {
FutureTask<String> ft= new FutureTask<>(()->{
System.out.println("线程:" + Thread.currentThread().getName() + "执行开始");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("线程:" + Thread.currentThread().getName() + "执行完毕");
return "success";
});
Thread t = new Thread(ft);
t.start();
String result=ft.get();
System.out.println("子线程执行完毕,结果是:"+result);
System.out.println("主线程执行完毕");
}
}
- 通过Executors工具类创建线程池
public class Test {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService executorService= Executors.newCachedThreadPool();
executorService.submit(()->{
System.out.println("线程:" + Thread.currentThread().getName() + "执行开始");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("线程:" + Thread.currentThread().getName() + "执行完毕");
return "success";
});
System.out.println("主线程执行完毕");
}
}
# 线程start方法和run方法的区别
start()方法用于启动线程,只能被调用一次;调用该方法后,线程进入就绪状态;
run()方法用于执行线程的运行代码,也成为线程体,可以反复调用;
# 启动新线程不能直接调用run()方法的原因
直接执行run()方法,该方法会被当成 main线程下的一个普通方法执行,而非多线程执行;
# Callable与Future接口的含义
Callable类似于Runnable接口,但是Callable被线程执行后,可以接收返回值;即Callable用于产生结果;
Future表示异步任务,是一个可能没有完成的异步任务结果;即Future用于接收结果;
# FutureTask
代表一个异步运算的任务,传入Callable实现类作为构造参数;
是Runnable接口的实现类,与Thread相结合实现多线程;
可以对异步运算任务的结果进行等待获取、判断是否已完成、取消任务等操作;
# 线程池
# 线程池的概念
java中开辟出的一种管理线程的概念;
线程池的好处,就是可以方便的管理线程,也可以减少内存的消耗;尤其时频繁的创建并销毁线程的场景;
# 线程池体系抽象结构
public interface Executor {
void execute(Runnable command);
}
public interface ExecutorService extends Executor {
void shutdown();
<T> Future<T> submit(Callable<T> task);
//xxx: 省略其他抽象...
}
public abstract class AbstractExecutorService implements ExecutorService {
/*xxx: submit本质上调用的execute*/
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
}
public class ThreadPoolExecutor extends AbstractExecutorService {
/*xxx: 任务队列,遵循先进先出的规则*/
private final BlockingQueue<Runnable> workQueue;
/*xxx: 线程工厂,用于创建新线程*/
private volatile ThreadFactory threadFactory;
/*xxx: 核心线程数,核心线程会一直存活*/
private volatile int corePoolSize;
/*xxx: 非核心线程的存活时间*/
private volatile long keepAliveTime;
/*xxx: 线程池的最大线程数*/
private volatile int maximumPoolSize;
/*xxx: 线程实例集合*/
private final HashSet<Worker> workers = new HashSet<Worker>();
/*xxx: 当 线程池达到了最大线程数,且任务队列也满了后的拒绝策略*/
private volatile RejectedExecutionHandler handler;
/*xxx: 默认的拒绝策略是抛出异常*/
private static final RejectedExecutionHandler defaultHandler =
new AbortPolicy();
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
/*xxx: 如果核心线程数没达到阈值,则直接创建一个新的线程,并将当前任务作为该线程的第一个任务,准备参与调度*/
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
/*xxx: 如果核心线程已经达到阈值,且将该任务入队,且入队成功的情况下*/
if (isRunning(c) && workQueue.offer(command)) {
/*xxx: 进行二次检查*/
int recheck = ctl.get();
/*xxx: 如果pool已经被shutdown,或者任务已经被消费的情况下,执行拒绝策略*/
if (! isRunning(recheck) && remove(command))
reject(command);
/*xxx: 否则,创建新的线程,参与任务调度*/
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
/*xxx: 如果排队失败,则创建新线程*/
else if (!addWorker(command, false))
/*xxx: 创建失败,则执行拒绝策略*/
reject(command);
}
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
public void run() {
runWorker(this);
}
final void runWorker(Worker w) {
Runnable task = w.firstTask;
w.firstTask = null;
//消费任务队列中的任务
while (task != null || (task = getTask()) != null) {
task.run();
}
}
}
}
# 四种常见的线程池
- 可缓存的线程池-CachedThreadPool: 没有核心线程,非核心线程的数量为无限大。适用于耗时少,任务量大的场景
- 在存活期内,非核心线程可以继续消费任务;
public class Executors {
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
}
- 周期性执行任务的线程池-ScheduledThreadPool: 适用于执行周期性任务的场景
public class Executors {
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
}
public interface ScheduledExecutorService extends ExecutorService {
public ScheduledFuture<?> schedule(Runnable command,
long delay, TimeUnit unit);
}
public class ScheduledThreadPoolExecutor
extends ThreadPoolExecutor
implements ScheduledExecutorService {
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
public void execute(Runnable command) {
schedule(command, 0, NANOSECONDS);
}
}
- 单一线程池-SingleThreadPool: 适用于有顺序任务的执行场景
public class Executors {
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
}
- 定长线程池-FixedThread: 没有非核心线程
public class Executors {
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
}
}
# java多线程协作
# CountDownLatch工具
# 使用场景
- 使得当前线程阻塞,等待其它线程完成后才继续执行
- 比如大任务拆解场景,如多线程下载图片;
# 使用介绍
- 接收一个int类型的参数作为计数器
await()
阻塞当前线程,直到计数器为零await(timeout)
线程阻塞,并指定阻塞时常(可防止死锁等场景)countDown()
计数器减1,计数器为0时释放所有等待的线程getCount()
返回当前计数
# 与原始的线程协同的区别
Thread.join()
可完成单个线程协同,但使用ExecutorService的时候,不能使用Thread.join(),必须使用CountDownLatchsyncronize
配合Object
的wait
和notifyAll
也可以实现线程协同- 调用join()方法,需要等待子线程执行完毕后,才能继续向下执行。而CountDownLatch只需要计数器为0即可,更加灵活
# 锁机制Condition
# 使用场景
与
synchronized+wait+nofity+notifyAll
机制相似,Condition为锁机制下的线程协作方案Condition
由Lock
对象产生,并且一个Lock对象可以产生多个Condition
# 使用介绍
lock.newCondition()
产生这把锁的Condition,多次调用可以产生多个condition.await()
阻塞等待操作,同时会释放锁,继续执行时,必须要获取锁,否则继续阻塞condition.signal()
唤醒等待操作,但是不会释放锁,需要手动释放锁
# synchronized同步
- 略