基于哔哩哔哩视频学习笔记
JUC并发编程
并发
单个处理器同时来回进行多个操作
并行
多个处理器同时进行多个操作
栈与栈帧
每个线程启动后 虚拟机就会为其分配一块栈内存
每个栈由多个栈帧组成,对应着每次方法调用时所占用的内存
每个线程只能有一个活动栈帧,对应着当前正在 执行的那个方法
线程状态
/**
* Thread state for a thread which has not yet started.
新建
*/
NEW,
/**
* Thread state for a runnable thread. A thread in the runnable
* state is executing in the Java virtual machine but it may
* be waiting for other resources from the operating system
* such as processor.
运行中
*/
RUNNABLE,
/**
* Thread state for a thread blocked waiting for a monitor lock.
* A thread in the blocked state is waiting for a monitor lock
* to enter a synchronized block/method or
* reenter a synchronized block/method after calling
* {@link Object#wait() Object.wait}.
无法获取锁/锁排队
*/
BLOCKED,
/**
* Thread state for a waiting thread.
* A thread is in the waiting state due to calling one of the
* following methods:
* <ul>
* <li>{@link Object#wait() Object.wait} with no timeout</li>
* <li>{@link #join() Thread.join} with no timeout</li>
* <li>{@link LockSupport#park() LockSupport.park}</li>
* </ul>
*
* <p>A thread in the waiting state is waiting for another thread to
* perform a particular action.
*
* For example, a thread that has called {@code Object.wait()}
* on an object is waiting for another thread to call
* {@code Object.notify()} or {@code Object.notifyAll()} on
* that object. A thread that has called {@code Thread.join()}
* is waiting for a specified thread to terminate.
无限等待
*/
WAITING,
/**
* Thread state for a waiting thread with a specified waiting time.
* A thread is in the timed waiting state due to calling one of
* the following methods with a specified positive waiting time:
* <ul>
* <li>{@link #sleep Thread.sleep}</li>
* <li>{@link Object#wait(long) Object.wait} with timeout</li>
* <li>{@link #join(long) Thread.join} with timeout</li>
* <li>{@link LockSupport#parkNanos LockSupport.parkNanos}</li>
* <li>{@link LockSupport#parkUntil LockSupport.parkUntil}</li>
* </ul>
超时等待
*/
TIMED_WAITING,
/**
* Thread state for a terminated thread.
* The thread has completed execution.
结束
*/
TERMINATED;
创建线程
通过任务创建线程
Runnable runnable = new Runnable() {
@Override
public void run() {
log.info("hello");
}
};
//线程任务,线程名称
Thread thread = new Thread(runnable,"t1");
thread.start();
通过线程创建
//线程名称
Thread thread = new Thread("t1"){
//线程任务
@Override
public void run(){
log.info("hello");
}
};
thread.start();
FutureTask
FetureTask 接收Callable类型线程任务 用来处理有返回结果的线程
//创建带有返回值的线程任务
FutureTask<Integer> task = new FutureTask<>(() -> {
log.debug("running");
Thread.sleep(2000);
return 100;
});
Thread thread = new Thread(task, "t1");
thread.start();
//获取线程返回值 如果该线程任务未结束 会一直阻塞至线程结束获取到值或抛出异常
try {
log.debug("{}", task.get());
} catch (Exception e) {
throw new RuntimeException(e);
}
线程方法
获取当前线程
currentThread();
等待线程
//等待线程运行结束
join()
//等待线程运行结束 最多等待n毫秒
join(long n)
修改线程名
setName(String name);
获取线程名
getName();
获取线程优先级
getPriority();
设置线程优先级
java中规定线程优先级是1~10整数,较大的优先级能提高该线程被CPU调度的几率
setPriority(int);
获取线程状态
getState();
睡眠线程
停止调度该线程(Timed Waiting)
在无限循环中避免cpu占用率到达100% 可以使用yield或sleep来让出cpu的使用权给其他程序
sleep(long);
//使用TimeUnit
TimeUnit.DAYS.sleep(1);
打断标记
将打断标记设置为true
如果线程正在sleep或wait将叫醒睡眠的线程 并抛出InterruptedException异常
睡眠线程 sleep wait join 打断标记为false
interrupt();
判断打断标记
如果不是打断sleep类线程 打断标记将会设置为true
可以做来判断当前线程是否被打断
//不会清除打断标记
thread.isInterrupted();
//会清除打断标记 static方法
Thread.interrupted();
让出线程
将线程从Runing 进入 Runnable 就绪状态 , 然后调度执行其他线程
yield();
守护线程
守护线程 所有非守护线程运行结束后 即使守护线程代码没有执行完,也会强制结束
//t1 为守护线程对象
t1.setDaemon(true);
线程锁
临界区: 多个线程对一个公共资源数据进行操作的代码块
synchronized 锁住任意对象,其他线程想要对其加锁需等待(阻塞)至释放对象锁
加锁成功将执行临界区代码
synchronized(object){
//临界区
}
//锁住对象this
public synchronized void a(){
}
//在静态方法中使用 锁住的为类对象
public static synchronized void a(){
}
public static void a(){
synchronized(object.class){
//临界区
}
}
Monitor重量锁
synchronized底层原理
jvm 会为加锁的对象分配一个Monitor 由jvm对象头Mark Word指向该对象地址
当中包含 EntryList Owner WaitSet
EntryList 中存储着没有拿到锁等着排队的线程 当释放锁的时候随机从中获取线程再进行上锁操作(链表)
Owner 当前锁的拥有者
WaitSet 存储者当前wait线程,当再次被唤醒时会进入EntryList
轻量级锁
轻量级锁能够提升程序性能的依据是“对绝大部分的锁,在整个同步周期内都不存在竞争”,注意这是经验数据。需要了解的是,轻量级锁所适应的场景是线程交替执行同步块的场合,如果存在同一时间访问同一锁的场合,就会导致轻量级锁膨胀为重量级锁。
优点
竞争的线程不会阻塞,使用自选,提高程序响应速度。
缺点
如果一直不能获取到锁,长时间的自旋会造成CPU消耗。
当对象已经被轻量级锁定的时候,会判断是否是锁重入,如果是重入的话,会记录一条Displaced Mark Word为空的Lock Record。如果不是重入,会膨胀为重量级锁。需要注意的是,即使膨胀为重量级锁,没有获取到锁的线程也不会马上阻塞,而是通过适应性自旋尝试获取锁,当自旋次数达到临界值后,才会阻塞未获取到的线程。JVM认为获取到锁的线程大概率会很快的释放锁,这样做是为了尽可能的避免用户态到内核态的切换。
synchronized默认会创建一个轻量级锁在栈中建立一个Lock Record,将无锁状态的Mark Word拷贝到锁记录的Displaced Mark Word中,将owner指向当前对象。
锁膨胀
多个线程对同一个锁操作后会从默认的轻量级锁膨胀为重量级锁
即为对象申请Monitor锁,让指向栈轻量锁地址值的MarkWord更改为指向Monitor对象地址
然后进入线程EntryList 等待BLOCKED
偏向锁
(java6之后引入自动开启)
在轻量锁中 进行锁重入都会再次创建一个栈获取锁记录并尝试进行替换
而偏向锁则进行锁重入之后会从Mark word中获取线程id
(mark word中替换hashcode 存储为线程id、而hashcode则无处存储 所以调用hashcode会撤销偏向锁)
判断是否为当前线程的锁
线程锁默认为偏向锁 当其他线程在不同调用该锁时会转换为轻量锁
当临界区只有一个线程使用时着重于偏向锁
当其他线程调用偏向锁对象时 将转换为轻量锁
Jvm禁用偏向锁
Vm options
-XX: -UseBiasedLocking
偏向锁延时
偏向锁默认是延迟的,不会在线程锁启动时立即生效,如果要避免延迟
Vm options
-XX:BiasedLockingStartupDelay=0
当偏向锁对象调用了hashcode将撤销偏向锁状态 因为mark word中hashcode被替换为线程id
没有地方再进行存储hashcode
一个对象在调用原生hashCode方法后(来自Object的,未被重写过的),该对象将无法进入偏向锁状态,起步就会是轻量级锁。若hashCode方法的调用是在对象已经处于偏向锁状态时调用,它的偏向状态会被立即撤销,并且锁会升级为重量级锁。
重偏向
当撤销偏向锁阈值超过20次后,jvm会批量给对象加锁时重新偏向至加锁线程
当撤销偏向锁阈值超过40次后,jvm会批量给对象加锁时会取消偏向锁 转换为轻量锁,并且该类新建对象也不再是偏向锁
自旋优化
当对象被重量锁之后 线程在进入队列之前会尝试自旋重试拿锁 当指定次数之后没有拿到锁才会进入阻塞队列
否则直接拿到锁,不再进入阻塞队列 上下文切换取阻塞队列也会造成资源消耗,这样可以起到优化
单核CPU自旋就是浪费
wait notify
指定锁线程进入WaitSet等待 并释放对象锁
只有获得对象锁才可以使用该方法,否则抛出异常
可以用作判断其他线程某件事情是否完成 如果没有则wait 如果完成其他线程调用notify唤醒
被唤醒后将延下继续执行
//无限制等待
object.wait();
//只等待1000毫秒
object.wait(1000);
随机唤醒一个WaitSet中object对象等待的线程(如果唤醒多个object等待线程需使用 notifyAll )
object.notify()
虚假唤醒(错误唤醒)
当有多个需要唤醒的线程 想要指定唤醒某个线程时可以使用notifyAll()方法唤醒所有
在被唤醒的线程中使用while来循环判断自己是否是需要被唤醒的线程
synchronized(lock){
while(条件不成立){
lock.wait();
}
//干活
}
//另一个线程
synchronized(lock){
lock.notifyAll();
}
//将值设置为true以达到唤醒标准
boolean hasTakeout = false;
//如果没有被唤醒 等待下次唤醒所有再次判断
//如果已唤醒跳过循环代码
while (!hasTakeout){
try {
Main.class.wait();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
阻塞当前线程(park/unpark)
wait、notify和notifyAll 必须 配合Object Monitor一起使用 (线程锁synchronized)
而park和unpark不必
如果是Monitor则不会释放锁 wait会
可以使用打断标记进行打断
如果使用isInterrupted后面的park都将失效 因为打断标记没有进行清除
需要当前线程调用interrupted清除打断标记
每个线程都关联着一个park对象 当使用park时如果permit为1将消耗permit为0 不暂停线程 当为0时暂停
使用unpark补给permit为1
interrupt()方法会设置线程的中断状态为true,并调用unpark()方法,使得线程的许可permit为1。park()方法在执行时,会先检查线程的中断状态和许可permit,如果中断状态为true或许可permit为1,就不会阻塞线程,而是直接返回。park()方法返回后,会消耗掉许可permit,但不会消耗掉中断状态。因此,如果线程被interrupt()打断后,再调用park()方法,就会因为中断状态为true而失效。
LockSupport.park();
LockSupport.unpark(Thread thread);
//等待纳秒
LockSupport.parkNanos(long nanos);
//在毫秒值时间之前停止调度
LockSupport.parkUntil(long millis);
死锁
两个线程持有两个不同的锁 想要进行运行 必须获得对方正在锁住的对象
因而导致两个或多个线程都无法进行执行
活锁
两个线程在改变对方的线程结束条件 因而导致线程一直持续运行
定位死锁
通过java命令 jps查看所有java进程
再通过jstack 进程id查看具体java进程
在Found one java -level deadlock中jvm列举了发现死锁的线程
线程饥饿
在多个线程中 某个线程获取cpu调度率极低 称之为线程饥饿
可以使用ReentrantLock的公平锁解决
ReentrantLock ReentrantLock = new ReentrantLock(true);
ReentrantLock线程锁
与Synchronized它具有
可中断 (Synchronized一旦锁住除非执行完毕 否则当调用时会一直锁住)
可设置超时时间 (一旦超过某个时间,释放锁对象)
可设置为公平锁 (公平竞争锁)
支持多个条件变量(多个WaitSet对象)
//获取锁
reentrantLock.lock();
try{
//临界区
}finally{
//释放锁
reentrantLock.unlock();
}
阻塞锁打断
可被打断锁
thread.interrupt();
ReentrantLock reentrantLock = new ReentrantLock();
try {
//在锁竞争阻塞时 可被interrupt打断
reentrantLock.lockInterruptibly();
} catch (InterruptedException e) {
//被打断抛出的异常
log.debug("没有获得锁");
throw new RuntimeException(e);
}
try {
log.debug("临界区");
}finally {
reentrantLock.unlock();
}
阻塞锁超时/尝试获取锁
无参方法 尝试获取锁,已获取锁返回true 否则false
reentrantLock.tryLock()
ReentrantLock reentrantLock = new ReentrantLock();
reentrantLock.lock();
new Thread(() -> {
//无参方法 尝试获取锁,已获取锁返回true 否则false
if (!reentrantLock.tryLock()){
log.debug("未获取锁");
return;
}
try {
log.debug("临界区");
}finally {
reentrantLock.unlock();
}
}).start();
有参方法 在一定时间内为获取到锁 则超时结束
tryLock(long timeout, TimeUnit unit)
ReentrantLock reentrantLock = new ReentrantLock();
reentrantLock.lock();
//有参方法 一定时间内尝试获取锁,已获取锁返回true 否则false
try {
if (!reentrantLock.tryLock(1, TimeUnit.SECONDS)){
log.debug("未获取锁");
return;
}
} catch (InterruptedException e) {
log.debug("获取超时");
throw new RuntimeException(e);
}
try {
log.debug("临界区");
}finally {
reentrantLock.unlock();
}
公平锁
在创建ReentrantLock对象时构造函数为True则为公平锁 默认为false
一般情况没有必要使用公平锁,会降低并发度
ReentrantLock ReentrantLock = new ReentrantLock(true);
条件变量(线程睡眠/WaitSet)
创建条件变量对象
ReentrantLock reentrantLock = new ReentrantLock();
//通过reentrantLock对象锁创建条件变量(WaitSet)
Condition condition1 = reentrantLock.newCondition();
Condition condition2 = reentrantLock.newCondition();
使用条件变量对象睡眠当前线程
//线程睡眠、使用前需获取ReentrantLock锁
condition1.await();
唤醒条件变量对象中的线程
condition1.signal();
特殊域变量-易变(volatile)
volatile
能保证可见性
以及保证不被JVM重排序
(该变量在写之前的代码都不会被重排序至后面,以及读之后的代码都不会重排序至前面)
不能保证原子性
可见性
这样的一段代码 本应该再一秒后停止但实际上在一直运行
因为status对象 被高速缓存至线程内存 (当线程中多次循环运行同一个对象 将被缓存至线程内存)
对主内存修改了status 但线程缓存中读取的值没有发生变化
使用 volatile 修饰符
将从主内存中获取值(当变量全部在synchronized中使用 也可以防止被重排序)
但效率上会有所损失
private volatile static boolean status = true;
private static boolean status = true;
public static void main(String[] args){
new Thread(()->{
while (status){
}
}).start();
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
log.debug("stop");
status = false;
}
保证可见性
写屏障
写屏障保证在volition共享变量之前对共享变量的改动 都不会被重排序至后面
读屏障
读屏障保证在volition共享变量之后 对共享变量的读取 都是主存最新数据
并且读之后的代码都不会排序到前面
线程安全单例模式
单例模式有很多实现方法,饿汉式、懒汉式、静态内部类、枚举类、
饿汉式:
public final class Singleton implements Serializable{
//禁止被创建Singleton对象
private Singleton(){}
//饿汉式
private static final Singleton INSTANCE = new Singleton();
//使用方法提供对象 可以提供更多封、控制、支持泛型
public static Singleton getInstance(){
return INSTANCE;
}
//防止反序列化破坏单例模式
public Object readResovle(){
return INSTANCE;
}
}
枚举单例
枚举类天生就是一个单例模式 所以只需调用INSTANCE调用对象即可
public enum Sigleton{
INSTANCE;
}
无锁同步-原子操作(Atomic)
//创建对象并为其赋值
private final AtomicInteger atomicInteger = new AtomicInteger(1000);
public void withdraw(int i){
while (true){
int prew = atomicInteger.get();
int result = prew - i;
//原始值 修改后的值 当成功后返回true 否则为false
if(atomicInteger.compareAndSet(prew,result)){
break;
}
}
}
//使用也可以达到相同效果
atomicInteger.addAndGet(-1 * amount);
compareAndSet比较并设置值 当前值符合预期值时设置值并返回true
Java CAS是一种无锁的并发编程技术,它的全称是Compare And Swap,即比较和交换。它的基本思想是,通过比较一个变量的当前值和期望值,如果相同,就用一个新值替换它,否则就重试或者放弃。
CAS基于乐观锁思想 就算别的线程修改了值 再重试修改
无锁并发、无阻塞并发
在线程数少于CPU核心数时 使用CAS能大大提升效率
原子整数
AtomicInteger
AtomicLong
AtomicBoolean
自增并获取
//自增并获取值 ++i
atomicInteger.incrementAndGet();
//获取值并自增 i++
atomicInteger.getAndIncrement();
//获取值并自增指定参数
atomicInteger.getAndAdd(5);
//相反
atomicInteger.addAndGet(5);
更新并获取(对原有数据进行操作)
//IntUnaryOperator 对原有数据进行操作后获取
atomicInteger.updateAndGet(rawInteger -> rawInteger * 10);
//相反
atomicInteger.getAndUpdate(rawInteger -> rawInteger * 10);
原子引用
AtomicReference
AtomicMarkableReference
AtomicStampedReference
AtomicReference
对引用对象进行原子操作(对对象引用进行原子修改)
final AtomicReference<String> atomicReference = new AtomicReference<>("123");
//底层使用==进行比较 String是不可变所以每次创建都是新的对象
atomicReference.updateAndGet(string -> string+456);
//如果不是数据安全类使用方法
atomicReference.updateAndGet(a -> {
Integer a1 = a.getA();
return new A<>(a1 - 10);
});
AtomicStampedReference
AtomicReference只能判断出共享变量的值与最初值A是否相同,当其他线程将其修改至B
再修改会A是不知情的 也会允许其修改
AtomicStampedReference可以通过修改标记感知其他线程对引用对象的修改
当其他线程动过当前引用对象,则算cas修改失败
//创建对象 0位标记(stamp)
AtomicStampedReference<String> atomicStampedReference = new AtomicStampedReference<>("a",0);
//获取引用对象
String reference = atomicStampedReference.getReference();
//获取标记
int stamp = atomicStampedReference.getStamp();
//当对象与标记都与其相同时修改 对象及标记 成功返回true/失败false
atomicStampedReference.compareAndSet(reference,reference+"b",stamp,stamp+1);
AtomicStampedReference
AtomicStampedReference只是单纯的检查在此之前版本号有没有被更改过 如果有则进行boolean记录
//创建对象和默认值
AtomicMarkableReference<String> atomicMarkableReference = new AtomicMarkableReference<>("满",true);
//获取对象引用
String reference = atomicMarkableReference.getReference();
//当为满时并且为true创建新的对象并将其替换为空 修改为false
atomicMarkableReference.compareAndSet(reference,"空",true,false);
原子数组
AtomicIntegerArray
AtomicLongArray
AtomicReferenceArray
原子字段更新器
AtomicIntegerFieldUpdater
AtomicLongFieldUpdater
AtomicReferenceFieldUpdater
原子累加器(效率更高)
LongAdder
A<Integer> a = new A<>(123);
AtomicReferenceFieldUpdater<A, Integer> atomicReferenceFieldUpdater
//参数引用对象类 字段类 字段名
= AtomicReferenceFieldUpdater.newUpdater(A.class,Integer.class,"a");
//修改的字段必须为private及volatile
atomicReferenceFieldUpdater.compareAndSet(a,a.getA(),1);
System.out.println(a.getA());
防止CPU缓存行伪共享
jdk8中@Contended 能有效防止字段-对象伪共享(对其进行缓存行填充)
线程池
构造方法
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
corePoolSize 核心线程数(最多保留线程数)
maximumPoolSize 最大线程数 在任务队列已满情况下才会创建 - 救济线程(maximumPoolSize-corePoolSize)
keepAliveTime 生存时间 - 针对救急线程
workQueue 阻塞队列
threadFactory 线程工厂 - 可以为线程创建起名
handler 拒绝策略
JDK拒绝策略RejectedExecutionHandler
AbortPolicy 让调用者抛出RejectedExceutionException异常(默认)
CallerRunsPolicy 让调用者运行任务
DiscardPolicy 放弃本次任务
DiscardOldestPolicy 放弃列表中最早的任务,本任务取而代之
固定线程池
创建固定线程池 无急救线程
//固定线程 无限任务队列 无超时时间
//形参:线程池固定线程数
ExecutorService executors = Executors.newFixedThreadPool(1);
AtomicInteger integer = new AtomicInteger(1);
//int nThreads, ThreadFactory threadFactory
ExecutorService executors = Executors.newFixedThreadPool(1,(runnable)->{
return new Thread(runnable,"自定义线程名 - 线程:"+integer.getAndIncrement());
急救线程池(缓存线程池)
创建0核心线程 所有的线程都是急救线程(最大线程数为Integer.MAX_VALUE) 使用synchronizeQueue队列
线程数根据任务不断增长 急救线程池默认会在60秒后关闭
ExecutorService executorService = Executors.newCachedThreadPool();
synchronizeQueue
synchronizeQueue队列当添加数据时 没有另外线程调用时(take)
put线程将会被阻塞 直至被take(一手交钱一手交货)
单线程池
相比自己new的线程 在抛出异常后线程不会结束 与固定线程池相比 不能在对象创建后更换线程数
ExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
线程池方法
带有返回值的线程(FutureTask)
executorService.submit();
批量运行带有返回值线程
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
带有超时时间
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
批量运行带有返回值线程-返回先运行完的线程
未运行完的则取消
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException;
带超时时间
<T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
关闭线程池
直至所有任务运行完结束
不再接收新的任务
已提交的任务会执行完
此方法不会阻塞调用的线程
void shutdown();
立即结束当前线程池
尝试终止正在执行的任务(如果没有sleep wait等会直至执行结束)
返回所有未执行的任务
此方法不会阻塞调用的线程
List<Runnable> shutdownNow();
阻塞线程直至线程池关闭
超时则返回false
boolean awaitTermination(long timeout, TimeUnit unit)
线程池大小
CPU密集型运算
通常采用cpu核数+1 能够实现最优的CPU利用率 +1 保证当前线程由于页缺失故障(操作系统) 或其他原因暂停
I/O密集型运行
线程写 = 核数 希望CPU利用率 总时间(CPU等待时间+CPU计算时间) / CPU计算时间
定时任务
延时任务Timer
jdk1.5版本后加入Timer 但是它的缺点所有任务都是在同一个线程运行(串行)
当有一个任务发生异常 后续的任务都将无法执行
Timer timer = new Timer();
TimerTask task1 = new TimerTask() {
@Override
public void run() {
log.debug("1");
}
};
TimerTask task2 = new TimerTask() {
@Override
public void run() {
log.debug("2");
}
};
timer.schedule(task1,1000);
timer.schedule(task2,2000);
相反ScheduledThreadPool则没有此类问题
//线程数
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(2);
//参数:Runnable/Callable 延时时间 时间单位
scheduledExecutorService.schedule(()->{
log.debug("1");
},1,TimeUnit.SECONDS);
scheduledExecutorService.schedule(()->{
log.debug("2");
},1,TimeUnit.SECONDS);
scheduledExecutorService.shutdown();
定时任务
scheduleAtFixedRate 在任务进行时进行定时计算
//线程数
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(2);
//定时任务
//参数:Runnable/Callable 初始延时 定时延时 时间单位
scheduledExecutorService.scheduleAtFixedRate(()->{
log.debug("1");
},1,1,TimeUnit.SECONDS);
scheduleWithFixedDelay 在任务进行完进行定时计算
//定时任务
//参数:Runnable/Callable 初始延时 定时延时 时间单位
scheduledExecutorService.scheduleWithFixedDelay(()->{
log.debug("1");
},1,1,TimeUnit.SECONDS);
定时时间
每周四18点执行一次
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
LocalDateTime now = LocalDateTime.now();
//设置时间
LocalDateTime time = now.withHour(18).withMinute(0).withSecond(0).withNano(0).with(DayOfWeek.THURSDAY);
//如果现在时间在指定日期之后则添加七天
if (now.isAfter(time)){
time = time.plusDays(7);
}
//获取两个时间之间的差值
long initailDelay = Duration.between(now,time).toMinutes();
//每七天执行一次
long period = 1000 * 60 * 60 * 24 * 7;
scheduledExecutorService.scheduleWithFixedDelay(()->{
log.debug("1");
},initailDelay,period,TimeUnit.MILLISECONDS);
拆分递归线程池
Stream流的并行操作parallel 基于ForkJoinPool
ForkJoinPool
public static void main(String[] args) {
//默认根据CPU核心数创建线程
ForkJoinPool forkJoinPool = new ForkJoinPool();
Integer invoke = forkJoinPool.invoke(new MyTask(5));
System.out.println(invoke);
}
//实现RecursiveTask抽象类方法
static class MyTask extends RecursiveTask<Integer>{
private int n;
public MyTask(int n) {
this.n = n;
}
@Override
protected Integer compute() {
if (n == 1){
return n;
}
MyTask myTask = new MyTask(n-1);
myTask.fork();
return myTask.join() + n;
}
}
AQS自定义锁
public class TestLock implements Lock {
class TestSyn extends AbstractQueuedSynchronizer{
//尝试加锁 arg为重入锁参数
@Override
protected boolean tryAcquire(int arg) {
boolean b = compareAndSetState(0, 1);
if (b){
setExclusiveOwnerThread(Thread.currentThread());
}
return b;
}
//尝试解锁
@Override
protected boolean tryRelease(int arg) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
//是否为锁的持有者
@Override
protected boolean isHeldExclusively() {
return getExclusiveOwnerThread() == Thread.currentThread();
}
//条件变量
public Condition newCondition(){
return new ConditionObject();
}
}
private final TestSyn testSyn = new TestSyn();
@Override
public void lock() {
testSyn.acquire(1);
}
@Override
public void lockInterruptibly() throws InterruptedException {
testSyn.acquireInterruptibly(1);
}
@Override
public boolean tryLock() {
return testSyn.tryAcquire(1);
}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
long nanos = unit.toNanos(time);
return testSyn.tryAcquireNanos(1, nanos);
}
@Override
public void unlock() {
testSyn.release(0);
}
@Override
public Condition newCondition() {
return testSyn.newCondition();
}
}
ReentrantReadWriteLock读写锁
读操作可并发 写操作互斥
创建对象
private static final ReentrantReadWriteLock rw = new ReentrantReadWriteLock();
//读锁
private static final ReentrantReadWriteLock.ReadLock r = rw.readLock();
//写锁
private static final ReentrantReadWriteLock.WriteLock w = rw.writeLock();
在获取读锁的情况下不可以再获取写锁 会导致死锁(永久等待)
但是可以在写锁的情况下获取读锁
读写锁
public static Object read(){
r.lock();
try {
try {
log.debug("读");
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return data;
} finally {
r.unlock();
}
}
public static void write(){
w.lock();
try {
try {
log.debug("写");
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
} finally {
w.unlock();
}
}
高性能读写锁StampedLock
缺点:不支持条件变量、锁重入
先乐观读 当乐观读失败后升级锁
StampedLock stampedLock = new StampedLock();
//获取戳
long stamp = stampedLock.tryOptimisticRead();
//数据没有被修改
if (stampedLock.validate(stamp)){
//返回数据
return;
}
//否则升级锁
try {
//获取戳
stamp = stampedLock.readLock();
return;
} finally {
//通过戳解锁
stampedLock.unlockRead(stamp);
}
共享资源线程限制
对同一资源同时线程访问量的限制
//限制量,是否为公平锁
Semaphore semaphore = new Semaphore(3,false);
for (int i = 0; i < 5; i++) {
try {
//获取信号量
semaphore.acquire();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}finally {
//释放信号量
semaphore.release();
}
}
CountDownLatch线程同步协作
等待所有线程完成倒计时
相比join 是用于等待至线程结束 而线程池无法使用此方法
CountDownLatch不可重用 每次使用都需要重新创建构造对象(可以使用CyclicBarrier)
//计数
CountDownLatch countDownLatch = new CountDownLatch(5);
for (int i = 0; i < 5; i++) {
//countDown()用来让计数减一
new Thread(countDownLatch::countDown).start();
}
//等待至计数归零
countDownLatch.await();
CyclicBarrier
//计数器,当计数量为0时执行Runnable接口方法
CyclicBarrier cyclicBarrier = new CyclicBarrier(2,()->{
log.debug("end");
});
try {
//计数量减一 并阻塞 当计数量0时释放(如果计数量为0再次调用时计数量会回复默认值)
cyclicBarrier.await();
} catch (BrokenBarrierException e) {
throw new RuntimeException(e);
}
线程安全集合类
Blocking 大部分实现基于悲观锁,并提供用来阻塞的方法
CopyOnWrite 在写时复制 类似于String创建新的对象并将值传递(读取遍历时弱一致性)
Concurrent 基于乐观锁
内部使用很多cas优化,可以提供较高的吞吐量
缺点:弱一致性
遍历时弱一致性 当迭代器遍历时,如果容器发生修改,迭代器仍然可以继续遍历,这时内容是旧的
求大小弱一致性,size的操作未必是准确的
读取弱一致性
遍历时发生修改,对非安全容器使用fail-fast机制也就是让遍历立刻失败,抛出异常
数据库的MVCC都是一致性的表现
并发高和一致性是矛盾的,需要权衡
Concurrent
ConcurrentMap<String, LongAdder> concurrentMap = new ConcurrentHashMap<>();
//computeIfAbsent方法的返回值是key对应的value,如果key不存在,则返回mappingFunction生成的新值。
LongAdder longAdder = concurrentMap.computeIfAbsent("123", value -> new LongAdder());
longAdder.increment();
线程安全类
String
Integer
StringBuffer
Random
Vector
Hashtable
FastDateFormat
Random
JUC