基于哔哩哔哩视频学习笔记

JUC并发编程

并发

单个处理器同时来回进行多个操作

并行

多个处理器同时进行多个操作

栈与栈帧

每个线程启动后 虚拟机就会为其分配一块栈内存

每个栈由多个栈帧组成,对应着每次方法调用时所占用的内存

每个线程只能有一个活动栈帧,对应着当前正在 执行的那个方法

线程状态
/**
* Thread state for a thread which has not yet started.
新建
*/
NEW,

    /**
* Thread state for a runnable thread.  A thread in the runnable
* state is executing in the Java virtual machine but it may
* be waiting for other resources from the operating system
* such as processor.
运行中
*/
    RUNNABLE,

    /**
* Thread state for a thread blocked waiting for a monitor lock.
* A thread in the blocked state is waiting for a monitor lock
* to enter a synchronized block/method or
* reenter a synchronized block/method after calling
* {@link Object#wait() Object.wait}.
无法获取锁/锁排队
*/
    BLOCKED,

    /**
* Thread state for a waiting thread.
* A thread is in the waiting state due to calling one of the
* following methods:
* <ul>
*   <li>{@link Object#wait() Object.wait} with no timeout</li>
*   <li>{@link #join() Thread.join} with no timeout</li>
*   <li>{@link LockSupport#park() LockSupport.park}</li>
* </ul>
*
* <p>A thread in the waiting state is waiting for another thread to
* perform a particular action.
*
* For example, a thread that has called {@code Object.wait()}
* on an object is waiting for another thread to call
* {@code Object.notify()} or {@code Object.notifyAll()} on
* that object. A thread that has called {@code Thread.join()}
* is waiting for a specified thread to terminate.
无限等待
*/
    WAITING,

    /**
* Thread state for a waiting thread with a specified waiting time.
* A thread is in the timed waiting state due to calling one of
* the following methods with a specified positive waiting time:
* <ul>
*   <li>{@link #sleep Thread.sleep}</li>
*   <li>{@link Object#wait(long) Object.wait} with timeout</li>
*   <li>{@link #join(long) Thread.join} with timeout</li>
*   <li>{@link LockSupport#parkNanos LockSupport.parkNanos}</li>
*   <li>{@link LockSupport#parkUntil LockSupport.parkUntil}</li>
* </ul>
超时等待
*/
    TIMED_WAITING,

    /**
* Thread state for a terminated thread.
* The thread has completed execution.
结束
*/
    TERMINATED;

创建线程

通过任务创建线程

Runnable runnable = new Runnable() {
    @Override
    public void run() {
        log.info("hello");
    }
};
//线程任务,线程名称
Thread thread = new Thread(runnable,"t1");
thread.start();

通过线程创建

//线程名称
Thread thread = new Thread("t1"){
    //线程任务
    @Override
    public void run(){
        log.info("hello");
    }
};
thread.start();

FutureTask

FetureTask 接收Callable类型线程任务 用来处理有返回结果的线程

    //创建带有返回值的线程任务
   	FutureTask<Integer> task = new FutureTask<>(() -> {
   	    log.debug("running");
   	    Thread.sleep(2000);
   	    return 100;
   	});

   	Thread thread = new Thread(task, "t1");
   	thread.start();

   	//获取线程返回值 如果该线程任务未结束 会一直阻塞至线程结束获取到值或抛出异常
   	try {
   	    log.debug("{}", task.get());
   	} catch (Exception e) {
   	    throw new RuntimeException(e);
   	}

线程方法

获取当前线程

currentThread();

等待线程

//等待线程运行结束
join()
//等待线程运行结束 最多等待n毫秒
join(long n)

修改线程名

setName(String name);

获取线程名

getName();

获取线程优先级

getPriority();

设置线程优先级

java中规定线程优先级是1~10整数,较大的优先级能提高该线程被CPU调度的几率

setPriority(int);

获取线程状态

getState();

睡眠线程

停止调度该线程(Timed Waiting)

在无限循环中避免cpu占用率到达100% 可以使用yield或sleep来让出cpu的使用权给其他程序

sleep(long);

//使用TimeUnit
TimeUnit.DAYS.sleep(1);

打断标记

将打断标记设置为true

如果线程正在sleep或wait将叫醒睡眠的线程 并抛出InterruptedException异常

睡眠线程 sleep wait join 打断标记为false

interrupt();

判断打断标记

如果不是打断sleep类线程 打断标记将会设置为true

可以做来判断当前线程是否被打断

//不会清除打断标记
thread.isInterrupted();

//会清除打断标记 static方法
Thread.interrupted();

让出线程

将线程从Runing 进入 Runnable 就绪状态 , 然后调度执行其他线程

yield();

守护线程

守护线程 所有非守护线程运行结束后 即使守护线程代码没有执行完,也会强制结束

//t1 为守护线程对象
t1.setDaemon(true);

线程锁

临界区: 多个线程对一个公共资源数据进行操作的代码块

synchronized 锁住任意对象,其他线程想要对其加锁需等待(阻塞)至释放对象锁

加锁成功将执行临界区代码

synchronized(object){
    //临界区
}


//锁住对象this
public synchronized void a(){
    
}

//在静态方法中使用 锁住的为类对象
public static synchronized void a(){
    
}

public static void a(){
    synchronized(object.class){
    //临界区
	}
}

Monitor重量锁

synchronized底层原理

jvm 会为加锁的对象分配一个Monitor 由jvm对象头Mark Word指向该对象地址

当中包含 EntryList Owner WaitSet

EntryList 中存储着没有拿到锁等着排队的线程 当释放锁的时候随机从中获取线程再进行上锁操作(链表)

Owner 当前锁的拥有者

WaitSet 存储者当前wait线程,当再次被唤醒时会进入EntryList

轻量级锁

轻量级锁能够提升程序性能的依据是“对绝大部分的锁,在整个同步周期内都不存在竞争”,注意这是经验数据。需要了解的是,轻量级锁所适应的场景是线程交替执行同步块的场合,如果存在同一时间访问同一锁的场合,就会导致轻量级锁膨胀为重量级锁。

优点

竞争的线程不会阻塞,使用自选,提高程序响应速度。

缺点

如果一直不能获取到锁,长时间的自旋会造成CPU消耗。

当对象已经被轻量级锁定的时候,会判断是否是锁重入,如果是重入的话,会记录一条Displaced Mark Word为空的Lock Record。如果不是重入,会膨胀为重量级锁。需要注意的是,即使膨胀为重量级锁,没有获取到锁的线程也不会马上阻塞,而是通过适应性自旋尝试获取锁,当自旋次数达到临界值后,才会阻塞未获取到的线程。JVM认为获取到锁的线程大概率会很快的释放锁,这样做是为了尽可能的避免用户态到内核态的切换。

synchronized默认会创建一个轻量级锁在栈中建立一个Lock Record,将无锁状态的Mark Word拷贝到锁记录的Displaced Mark Word中,将owner指向当前对象。

锁膨胀

多个线程对同一个锁操作后会从默认的轻量级锁膨胀为重量级锁

即为对象申请Monitor锁,让指向栈轻量锁地址值的MarkWord更改为指向Monitor对象地址

然后进入线程EntryList 等待BLOCKED

偏向锁

(java6之后引入自动开启)

在轻量锁中 进行锁重入都会再次创建一个栈获取锁记录并尝试进行替换

而偏向锁则进行锁重入之后会从Mark word中获取线程id

(mark word中替换hashcode 存储为线程id、而hashcode则无处存储 所以调用hashcode会撤销偏向锁)

判断是否为当前线程的锁

线程锁默认为偏向锁 当其他线程在不同调用该锁时会转换为轻量锁

当临界区只有一个线程使用时着重于偏向锁

当其他线程调用偏向锁对象时 将转换为轻量锁

Jvm禁用偏向锁

Vm options

-XX: -UseBiasedLocking

偏向锁延时

偏向锁默认是延迟的,不会在线程锁启动时立即生效,如果要避免延迟

Vm options

-XX:BiasedLockingStartupDelay=0

当偏向锁对象调用了hashcode将撤销偏向锁状态 因为mark word中hashcode被替换为线程id

没有地方再进行存储hashcode

一个对象在调用原生hashCode方法后(来自Object的,未被重写过的),该对象将无法进入偏向锁状态,起步就会是轻量级锁。若hashCode方法的调用是在对象已经处于偏向锁状态时调用,它的偏向状态会被立即撤销,并且锁会升级为重量级锁。

重偏向

当撤销偏向锁阈值超过20次后,jvm会批量给对象加锁时重新偏向至加锁线程

当撤销偏向锁阈值超过40次后,jvm会批量给对象加锁时会取消偏向锁 转换为轻量锁,并且该类新建对象也不再是偏向锁

自旋优化

当对象被重量锁之后 线程在进入队列之前会尝试自旋重试拿锁 当指定次数之后没有拿到锁才会进入阻塞队列

否则直接拿到锁,不再进入阻塞队列 上下文切换取阻塞队列也会造成资源消耗,这样可以起到优化

单核CPU自旋就是浪费

wait notify

指定锁线程进入WaitSet等待 并释放对象锁

只有获得对象锁才可以使用该方法,否则抛出异常

可以用作判断其他线程某件事情是否完成 如果没有则wait 如果完成其他线程调用notify唤醒

被唤醒后将延下继续执行

//无限制等待
object.wait();

//只等待1000毫秒
object.wait(1000);

随机唤醒一个WaitSet中object对象等待的线程(如果唤醒多个object等待线程需使用 notifyAll )

object.notify()

虚假唤醒(错误唤醒)

当有多个需要唤醒的线程 想要指定唤醒某个线程时可以使用notifyAll()方法唤醒所有

在被唤醒的线程中使用while来循环判断自己是否是需要被唤醒的线程

synchronized(lock){
    while(条件不成立){
        lock.wait();
    }
    //干活
}

//另一个线程
synchronized(lock){
    lock.notifyAll();
}

//将值设置为true以达到唤醒标准
boolean hasTakeout = false;
	//如果没有被唤醒 等待下次唤醒所有再次判断
	//如果已唤醒跳过循环代码
     while (!hasTakeout){
         try {
             Main.class.wait();
         } catch (InterruptedException e) {
             throw new RuntimeException(e);
         }
     }

阻塞当前线程(park/unpark)

wait、notify和notifyAll 必须 配合Object Monitor一起使用 (线程锁synchronized)

而park和unpark不必

如果是Monitor则不会释放锁 wait会

可以使用打断标记进行打断

如果使用isInterrupted后面的park都将失效 因为打断标记没有进行清除

需要当前线程调用interrupted清除打断标记

每个线程都关联着一个park对象 当使用park时如果permit为1将消耗permit为0 不暂停线程 当为0时暂停

使用unpark补给permit为1

interrupt()方法会设置线程的中断状态为true,并调用unpark()方法,使得线程的许可permit为1。park()方法在执行时,会先检查线程的中断状态和许可permit,如果中断状态为true或许可permit为1,就不会阻塞线程,而是直接返回。park()方法返回后,会消耗掉许可permit,但不会消耗掉中断状态。因此,如果线程被interrupt()打断后,再调用park()方法,就会因为中断状态为true而失效。

LockSupport.park();

LockSupport.unpark(Thread thread);

//等待纳秒
LockSupport.parkNanos(long nanos);

//在毫秒值时间之前停止调度
LockSupport.parkUntil(long millis);

死锁

两个线程持有两个不同的锁 想要进行运行 必须获得对方正在锁住的对象

因而导致两个或多个线程都无法进行执行

活锁

两个线程在改变对方的线程结束条件 因而导致线程一直持续运行

定位死锁

通过java命令 jps查看所有java进程

再通过jstack 进程id查看具体java进程

在Found one java -level deadlock中jvm列举了发现死锁的线程

线程饥饿

在多个线程中 某个线程获取cpu调度率极低 称之为线程饥饿

可以使用ReentrantLock的公平锁解决

ReentrantLock ReentrantLock = new ReentrantLock(true);

ReentrantLock线程锁

与Synchronized它具有

可中断 (Synchronized一旦锁住除非执行完毕 否则当调用时会一直锁住)

可设置超时时间 (一旦超过某个时间,释放锁对象)

可设置为公平锁 (公平竞争锁)

支持多个条件变量(多个WaitSet对象)

//获取锁
reentrantLock.lock();
try{
    //临界区
}finally{
    //释放锁
    reentrantLock.unlock();
}

阻塞锁打断

可被打断锁

thread.interrupt();

   ReentrantLock reentrantLock = new ReentrantLock();
        try {
            //在锁竞争阻塞时 可被interrupt打断
            reentrantLock.lockInterruptibly();
        } catch (InterruptedException e) {
            //被打断抛出的异常
            log.debug("没有获得锁");
            throw new RuntimeException(e);
        }
        try {
            log.debug("临界区");
        }finally {
            reentrantLock.unlock();
        }
	

阻塞锁超时/尝试获取锁

无参方法 尝试获取锁,已获取锁返回true 否则false

reentrantLock.tryLock()

  ReentrantLock reentrantLock = new ReentrantLock();
        reentrantLock.lock();
        new Thread(() -> {
            //无参方法 尝试获取锁,已获取锁返回true 否则false
            if (!reentrantLock.tryLock()){
                log.debug("未获取锁");
                return;
            }
            try {
                log.debug("临界区");
            }finally {
                reentrantLock.unlock();
            }
        }).start();

有参方法 在一定时间内为获取到锁 则超时结束

tryLock(long timeout, TimeUnit unit)

    ReentrantLock reentrantLock = new ReentrantLock();
        reentrantLock.lock();
        //有参方法 一定时间内尝试获取锁,已获取锁返回true 否则false
        try {
            if (!reentrantLock.tryLock(1, TimeUnit.SECONDS)){
                log.debug("未获取锁");
                return;
            }
        } catch (InterruptedException e) {
            log.debug("获取超时");
            throw new RuntimeException(e);
        }
        try {
            log.debug("临界区");
        }finally {
            reentrantLock.unlock();
        }

公平锁

在创建ReentrantLock对象时构造函数为True则为公平锁 默认为false

一般情况没有必要使用公平锁,会降低并发度

ReentrantLock ReentrantLock = new ReentrantLock(true);

条件变量(线程睡眠/WaitSet)

创建条件变量对象
ReentrantLock reentrantLock = new ReentrantLock();        
//通过reentrantLock对象锁创建条件变量(WaitSet)       
Condition condition1 = reentrantLock.newCondition(); 
Condition condition2 = reentrantLock.newCondition();

使用条件变量对象睡眠当前线程
//线程睡眠、使用前需获取ReentrantLock锁
condition1.await();

唤醒条件变量对象中的线程
condition1.signal();

特殊域变量-易变(volatile)

volatile

能保证可见性

以及保证不被JVM重排序

(该变量在写之前的代码都不会被重排序至后面,以及读之后的代码都不会重排序至前面)

不能保证原子性

可见性

这样的一段代码 本应该再一秒后停止但实际上在一直运行

因为status对象 被高速缓存至线程内存 (当线程中多次循环运行同一个对象 将被缓存至线程内存)

对主内存修改了status 但线程缓存中读取的值没有发生变化

使用 volatile 修饰符

将从主内存中获取值(当变量全部在synchronized中使用 也可以防止被重排序)

但效率上会有所损失

private volatile static boolean status = true;

 private static boolean status = true;
    public static void main(String[] args){
        new Thread(()->{
            while (status){

            }
        }).start();
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        log.debug("stop");
        status = false;
    }

保证可见性

写屏障

写屏障保证在volition共享变量之前对共享变量的改动 都不会被重排序至后面

读屏障

读屏障保证在volition共享变量之后 对共享变量的读取 都是主存最新数据

并且读之后的代码都不会排序到前面

线程安全单例模式

单例模式有很多实现方法,饿汉式、懒汉式、静态内部类、枚举类、

饿汉式:

public final class Singleton implements Serializable{
    //禁止被创建Singleton对象
	private Singleton(){}
    //饿汉式
	private static final Singleton INSTANCE = new Singleton();

    //使用方法提供对象 可以提供更多封、控制、支持泛型
    public static Singleton getInstance(){
        return INSTANCE;
    }

    //防止反序列化破坏单例模式
    public Object readResovle(){
        return INSTANCE;
    }
}

枚举单例

枚举类天生就是一个单例模式 所以只需调用INSTANCE调用对象即可

public enum Sigleton{
    INSTANCE;
}

无锁同步-原子操作(Atomic)

 //创建对象并为其赋值
    private final AtomicInteger atomicInteger = new AtomicInteger(1000);
    public void withdraw(int i){
        while (true){
            int prew = atomicInteger.get();
            int result = prew - i;
            //原始值 修改后的值 当成功后返回true 否则为false
            if(atomicInteger.compareAndSet(prew,result)){
                break;
            }
        }
    }

//使用也可以达到相同效果
atomicInteger.addAndGet(-1 * amount);

compareAndSet比较并设置值 当前值符合预期值时设置值并返回true

Java CAS是一种无锁的并发编程技术,它的全称是Compare And Swap,即比较和交换。它的基本思想是,通过比较一个变量的当前值和期望值,如果相同,就用一个新值替换它,否则就重试或者放弃。

CAS基于乐观锁思想 就算别的线程修改了值 再重试修改

无锁并发、无阻塞并发

在线程数少于CPU核心数时 使用CAS能大大提升效率

原子整数

AtomicInteger

AtomicLong

AtomicBoolean

自增并获取

 //自增并获取值 ++i       
atomicInteger.incrementAndGet();

//获取值并自增 i++
atomicInteger.getAndIncrement();

//获取值并自增指定参数
atomicInteger.getAndAdd(5);

//相反
atomicInteger.addAndGet(5);

更新并获取(对原有数据进行操作)

//IntUnaryOperator 对原有数据进行操作后获取
atomicInteger.updateAndGet(rawInteger -> rawInteger * 10);

//相反
atomicInteger.getAndUpdate(rawInteger -> rawInteger * 10);

原子引用

AtomicReference

AtomicMarkableReference

AtomicStampedReference

AtomicReference

对引用对象进行原子操作(对对象引用进行原子修改)

 final AtomicReference<String> atomicReference = new AtomicReference<>("123");
 //底层使用==进行比较 String是不可变所以每次创建都是新的对象
 atomicReference.updateAndGet(string -> string+456);

//如果不是数据安全类使用方法
  atomicReference.updateAndGet(a -> {
      Integer a1 = a.getA();
      return new A<>(a1 - 10);
  });

AtomicStampedReference

AtomicReference只能判断出共享变量的值与最初值A是否相同,当其他线程将其修改至B

再修改会A是不知情的 也会允许其修改

AtomicStampedReference可以通过修改标记感知其他线程对引用对象的修改

当其他线程动过当前引用对象,则算cas修改失败

//创建对象 0位标记(stamp)
AtomicStampedReference<String> atomicStampedReference = new AtomicStampedReference<>("a",0);

//获取引用对象
String reference = atomicStampedReference.getReference();

//获取标记
int stamp = atomicStampedReference.getStamp();

//当对象与标记都与其相同时修改 对象及标记 成功返回true/失败false
atomicStampedReference.compareAndSet(reference,reference+"b",stamp,stamp+1);

AtomicStampedReference

AtomicStampedReference只是单纯的检查在此之前版本号有没有被更改过 如果有则进行boolean记录

//创建对象和默认值
AtomicMarkableReference<String> atomicMarkableReference = new AtomicMarkableReference<>("满",true);
//获取对象引用
String reference = atomicMarkableReference.getReference();

//当为满时并且为true创建新的对象并将其替换为空 修改为false
atomicMarkableReference.compareAndSet(reference,"空",true,false);

原子数组

AtomicIntegerArray

AtomicLongArray

AtomicReferenceArray

原子字段更新器

AtomicIntegerFieldUpdater

AtomicLongFieldUpdater

AtomicReferenceFieldUpdater

原子累加器(效率更高)

LongAdder

A<Integer> a = new A<>(123);    
AtomicReferenceFieldUpdater<A, Integer>  atomicReferenceFieldUpdater
//参数引用对象类 字段类 字段名
= AtomicReferenceFieldUpdater.newUpdater(A.class,Integer.class,"a");
//修改的字段必须为private及volatile
atomicReferenceFieldUpdater.compareAndSet(a,a.getA(),1);   
System.out.println(a.getA()); 

防止CPU缓存行伪共享

jdk8中@Contended 能有效防止字段-对象伪共享(对其进行缓存行填充)

线程池

构造方法

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler)

corePoolSize 核心线程数(最多保留线程数)

maximumPoolSize 最大线程数 在任务队列已满情况下才会创建 - 救济线程(maximumPoolSize-corePoolSize)

keepAliveTime 生存时间 - 针对救急线程

workQueue 阻塞队列

threadFactory 线程工厂 - 可以为线程创建起名

handler 拒绝策略

JDK拒绝策略RejectedExecutionHandler

AbortPolicy 让调用者抛出RejectedExceutionException异常(默认)

CallerRunsPolicy 让调用者运行任务

DiscardPolicy 放弃本次任务

DiscardOldestPolicy 放弃列表中最早的任务,本任务取而代之

固定线程池

创建固定线程池 无急救线程

 //固定线程  无限任务队列   无超时时间 
//形参:线程池固定线程数
ExecutorService executors = Executors.newFixedThreadPool(1);


 AtomicInteger integer = new AtomicInteger(1);
//int nThreads, ThreadFactory threadFactory	
 ExecutorService executors = Executors.newFixedThreadPool(1,(runnable)->{
            return new Thread(runnable,"自定义线程名 - 线程:"+integer.getAndIncrement());

急救线程池(缓存线程池)

创建0核心线程 所有的线程都是急救线程(最大线程数为Integer.MAX_VALUE) 使用synchronizeQueue队列

线程数根据任务不断增长 急救线程池默认会在60秒后关闭

ExecutorService executorService = Executors.newCachedThreadPool();
synchronizeQueue

synchronizeQueue队列当添加数据时 没有另外线程调用时(take)

put线程将会被阻塞 直至被take(一手交钱一手交货)

单线程池

相比自己new的线程 在抛出异常后线程不会结束 与固定线程池相比 不能在对象创建后更换线程数

ExecutorService executorService = Executors.newSingleThreadScheduledExecutor();

线程池方法

带有返回值的线程(FutureTask)
executorService.submit();

批量运行带有返回值线程
 <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)

带有超时时间

<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                  long timeout, TimeUnit unit)

批量运行带有返回值线程-返回先运行完的线程

未运行完的则取消

<T> T invokeAny(Collection<? extends Callable<T>> tasks)
        throws InterruptedException, ExecutionException;

带超时时间

 <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                    long timeout, TimeUnit unit)

关闭线程池

直至所有任务运行完结束

不再接收新的任务

已提交的任务会执行完

此方法不会阻塞调用的线程

void shutdown();

立即结束当前线程池

尝试终止正在执行的任务(如果没有sleep wait等会直至执行结束)

返回所有未执行的任务

此方法不会阻塞调用的线程

List<Runnable> shutdownNow();

阻塞线程直至线程池关闭

超时则返回false

boolean awaitTermination(long timeout, TimeUnit unit)

线程池大小

CPU密集型运算

通常采用cpu核数+1 能够实现最优的CPU利用率 +1 保证当前线程由于页缺失故障(操作系统) 或其他原因暂停

I/O密集型运行

线程写 = 核数 希望CPU利用率 总时间(CPU等待时间+CPU计算时间) / CPU计算时间

定时任务

延时任务Timer

jdk1.5版本后加入Timer 但是它的缺点所有任务都是在同一个线程运行(串行)

当有一个任务发生异常 后续的任务都将无法执行

Timer timer = new Timer();
        TimerTask task1 = new TimerTask() {
            @Override
            public void run() {
                log.debug("1");
            }
        };

        TimerTask task2 = new TimerTask() {
            @Override
            public void run() {
                log.debug("2");
            }
        };
        timer.schedule(task1,1000);
        timer.schedule(task2,2000);

相反ScheduledThreadPool则没有此类问题

//线程数
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(2);
	//参数:Runnable/Callable	延时时间		时间单位
     scheduledExecutorService.schedule(()->{
         log.debug("1");
     },1,TimeUnit.SECONDS);

     scheduledExecutorService.schedule(()->{
         log.debug("2");
     },1,TimeUnit.SECONDS);

     scheduledExecutorService.shutdown();

定时任务

scheduleAtFixedRate 在任务进行时进行定时计算

//线程数
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(2);
    //定时任务
    //参数:Runnable/Callable 初始延时  定时延时  时间单位
    scheduledExecutorService.scheduleAtFixedRate(()->{
        log.debug("1");
    },1,1,TimeUnit.SECONDS);

scheduleWithFixedDelay 在任务进行完进行定时计算

//定时任务
//参数:Runnable/Callable 初始延时  定时延时  时间单位
scheduledExecutorService.scheduleWithFixedDelay(()->{
    log.debug("1");
},1,1,TimeUnit.SECONDS);

定时时间

每周四18点执行一次

  ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
        LocalDateTime now = LocalDateTime.now();
        //设置时间
        LocalDateTime time = now.withHour(18).withMinute(0).withSecond(0).withNano(0).with(DayOfWeek.THURSDAY);
        //如果现在时间在指定日期之后则添加七天
        if (now.isAfter(time)){
             time = time.plusDays(7);
        }
        //获取两个时间之间的差值
        long initailDelay = Duration.between(now,time).toMinutes();
        //每七天执行一次
        long period = 1000 * 60 * 60 * 24 * 7;
        scheduledExecutorService.scheduleWithFixedDelay(()->{
            log.debug("1");
        },initailDelay,period,TimeUnit.MILLISECONDS);

拆分递归线程池

Stream流的并行操作parallel 基于ForkJoinPool

ForkJoinPool

public static void main(String[] args) {
        //默认根据CPU核心数创建线程
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        Integer invoke = forkJoinPool.invoke(new MyTask(5));
        System.out.println(invoke);
    }

	//实现RecursiveTask抽象类方法
    static class MyTask extends RecursiveTask<Integer>{
        private int n;

        public MyTask(int n) {
            this.n = n;
        }

        @Override
        protected Integer compute() {
            if (n == 1){
                return n;
            }
            MyTask myTask = new MyTask(n-1);
            myTask.fork();
            return myTask.join() + n;
        }
    }

AQS自定义锁

public class TestLock implements Lock {

    class TestSyn extends AbstractQueuedSynchronizer{

        //尝试加锁 arg为重入锁参数
        @Override
        protected boolean tryAcquire(int arg) {
            boolean b = compareAndSetState(0, 1);
            if (b){
                setExclusiveOwnerThread(Thread.currentThread());
            }

            return b;
        }

        //尝试解锁
        @Override
        protected boolean tryRelease(int arg) {
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        //是否为锁的持有者
        @Override
        protected boolean isHeldExclusively() {
            return getExclusiveOwnerThread() == Thread.currentThread();
        }

        //条件变量
        public Condition newCondition(){
            return new ConditionObject();
        }
    }

    private final TestSyn testSyn = new TestSyn();
    @Override
    public void lock() {
        testSyn.acquire(1);
    }

    @Override
    public void lockInterruptibly() throws InterruptedException {
        testSyn.acquireInterruptibly(1);
    }

    @Override
    public boolean tryLock() {
      return testSyn.tryAcquire(1);
    }

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(time);
        return testSyn.tryAcquireNanos(1, nanos);
    }

    @Override
    public void unlock() {
        testSyn.release(0);
    }

    @Override
    public Condition newCondition() {
        return testSyn.newCondition();
    }
}

ReentrantReadWriteLock读写锁

读操作可并发 写操作互斥

创建对象

private static final ReentrantReadWriteLock rw = new ReentrantReadWriteLock();
//读锁
private static final ReentrantReadWriteLock.ReadLock r = rw.readLock();
//写锁
private static final ReentrantReadWriteLock.WriteLock w = rw.writeLock();

在获取读锁的情况下不可以再获取写锁 会导致死锁(永久等待)

但是可以在写锁的情况下获取读锁

读写锁
public static Object read(){
        r.lock();
        try {
            try {
                log.debug("读");
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            return data;
        } finally {
            r.unlock();
        }
    }

 public static void write(){
        w.lock();
        try {
            try {
                log.debug("写");
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        } finally {
            w.unlock();
        }
    }

高性能读写锁StampedLock

缺点:不支持条件变量、锁重入

先乐观读 当乐观读失败后升级锁

StampedLock stampedLock = new StampedLock();
   //获取戳
   long stamp = stampedLock.tryOptimisticRead();
   //数据没有被修改
   if (stampedLock.validate(stamp)){
       //返回数据
       return;
   }
   //否则升级锁
   try {
       //获取戳
       stamp = stampedLock.readLock();
       return;
   } finally {
       //通过戳解锁
       stampedLock.unlockRead(stamp);
   }

共享资源线程限制

对同一资源同时线程访问量的限制

//限制量,是否为公平锁
Semaphore semaphore = new Semaphore(3,false);
for (int i = 0; i < 5; i++) {
    try {
        //获取信号量
        semaphore.acquire();
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    }finally {
        //释放信号量
        semaphore.release();
    }
}

CountDownLatch线程同步协作

等待所有线程完成倒计时

相比join 是用于等待至线程结束 而线程池无法使用此方法

CountDownLatch不可重用 每次使用都需要重新创建构造对象(可以使用CyclicBarrier)

    //计数
  	CountDownLatch countDownLatch = new CountDownLatch(5);
  	for (int i = 0; i < 5; i++) {
  	    //countDown()用来让计数减一
  	    new Thread(countDownLatch::countDown).start();
  	}
  	//等待至计数归零 
  	countDownLatch.await();

CyclicBarrier

    //计数器,当计数量为0时执行Runnable接口方法
    CyclicBarrier cyclicBarrier = new CyclicBarrier(2,()->{
        log.debug("end");
    });

     try {
         //计数量减一 并阻塞 当计数量0时释放(如果计数量为0再次调用时计数量会回复默认值)
         cyclicBarrier.await();
     } catch (BrokenBarrierException e) {
         throw new RuntimeException(e);
     }

线程安全集合类

Blocking 大部分实现基于悲观锁,并提供用来阻塞的方法

CopyOnWrite 在写时复制 类似于String创建新的对象并将值传递(读取遍历时弱一致性)

Concurrent 基于乐观锁

内部使用很多cas优化,可以提供较高的吞吐量

缺点:弱一致性

遍历时弱一致性 当迭代器遍历时,如果容器发生修改,迭代器仍然可以继续遍历,这时内容是旧的

求大小弱一致性,size的操作未必是准确的

读取弱一致性

遍历时发生修改,对非安全容器使用fail-fast机制也就是让遍历立刻失败,抛出异常

数据库的MVCC都是一致性的表现

并发高和一致性是矛盾的,需要权衡

Concurrent

 ConcurrentMap<String, LongAdder> concurrentMap = new ConcurrentHashMap<>();
//computeIfAbsent方法的返回值是key对应的value,如果key不存在,则返回mappingFunction生成的新值。
LongAdder longAdder = concurrentMap.computeIfAbsent("123", value -> new LongAdder());
longAdder.increment();

线程安全类

String

Integer

StringBuffer

Random

Vector

Hashtable

FastDateFormat

Random

JUC

Hi~