JUC

JUC

new CountDownLatch(1).await();

threadLocal

CompletableFuture

  1. 特点
    1. 可编排式异步工具
  2. 例子
    ConcurrentHashMap<String, CompletableFuture<String>> cacheMap = new ConcurrentHashMap();
    public void asyncQueryRpcFactors() {
        List<String> factors = Arrays.asList("factor1", "factor2", "factor1", "factor3");
        factors.forEach(factor -> {
            // 复用future,只有第一次会发起实际调用
            CompletableFuture<String> future = cacheMap.computeIfAbsent(
                    factor,
                    k -> CompletableFuture.supplyAsync(() -> invokerAsync(k))
            );
            // 所有回调注册都OK
            future.thenAccept(data -> doBusiness(factor, data));
        });
        // 等待所有异步结束,仅为演示
        cacheMap.values().forEach(f -> {
            try {
                f.get(1, TimeUnit.SECONDS);
            } catch (Exception ignored) {

            }
        });
    }
  1. 解释
    1. computeIfAbsent先判断有没有这个key,没有的话就计算函数,然后把这个key加入到Map里面
    2. CompletableFuture.supplyAsync(() -> invokerAsync(k))
      1. 通过工厂方法直接生成一个

线程池

核心参数

使用ThreadPoolExecutor创建线程池的参数有什么

  1. corePoolSize核心线程数,默认不可回收,可通过allowCoreThreadTimeOut方法设置为可回收
  2. maximumPoolSize最大线程数
  3. keepAliveTime非核心线程空闲销毁时间
  4. workQueue任务队列,可以自定义,没空闲线程就将任务存在队列里面

核心线程状态

  1. 空闲时: watting
  2. 当队列中有可用任务被唤醒时: runable
  3. 被设置为可回收且回收时: terminated

线程获取任务的不同行为

  1. 设置核心线程可回收或者线程数超过了核心线程数
    1. 使用对队列使用poll(timeout,unit)方法,如果获取线程超时失败,那么线程就退出执行,进行terminated状态,并进行资源回收
  2. 核心线程不可回收线程数没有超过核心线程数
  3. 调用take()方法,并一直阻塞当前线程,即转入watting状态等待任务
  4. 总结
    1. 如果线程可回收就使用poll(timeout,unit)进行超时等待
    2. 如果不可回收就用take()阻塞等待
    3. 控制是否可回收的参数叫做timed
  5. 核心代码
// ThreadPoolExecutor
private Runnable getTask() {
    boolean timedOut = false;
    for (;;) {
        // ...

        // 1、如果「设置了核心线程的存活时间」或者是「线程数量超过了核心线程数量」,则 timed 为 true。
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
        // 2、扣减线程数量。
        // wc > maximuimPoolSize:线程池中的线程数量超过最大线程数量。其中 wc 为线程池中的线程数量。
        // timed && timeOut:timeOut 表示获取任务超时。
        // 分为两种情况:核心线程设置了存活时间 && 获取任务超时,则扣减线程数量;线程数量超过了核心线程数量 && 获取任务超时,则扣减线程数量。
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }
        try {
            // 3、如果 timed 为 true,则使用 poll() 获取任务;否则,使用 take() 获取任务。
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            // 4、获取任务之后返回。
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

拒绝策略

拒绝策略为当线程池处理不过来新提交的任务的时候会采取什么策略,处理不过来是指没有空闲队列来处理新提交的任务

  1. ThreadPoolExecutor.AbortPolicy
    1. 抛出一个RejectedExecutionException来拒绝执行这个任务
    2. 异常还是会抛出到调用线程
public class Main {
    public static void main(String[] args) {
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 2, 1, TimeUnit.SECONDS, new LinkedBlockingDeque<>(1),new ThreadPoolExecutor.AbortPolicy());


        Arrays.asList("t1","t2","t3","t4","t5").forEach((task)->{

            Runnable work=()->{
                System.out.printf("当前的线程是 - %s, id是 - %s,任务是 - %s \n",
                        Thread.currentThread().getName(),
                        Thread.currentThread().getId(),
                        task);
            };

            threadPoolExecutor.submit(work);
        });
        threadPoolExecutor.shutdown();
    }
}
D:\software\Java\jdk1.8.0_181\bin\java.exe -javaagent:D:\software\idea\idea2025\lib\idea_rt.jar=13076 -Dfile.encoding=UTF-8 -classpath D:\software\Java\jdk1.8.0_181\jre\lib\charsets.jar;D:\software\Java\jdk1.8.0_181\jre\lib\deploy.jar;D:\software\Java\jdk1.8.0_181\jre\lib\ext\access-bridge-64.jar;D:\software\Java\jdk1.8.0_181\jre\lib\ext\cldrdata.jar;D:\software\Java\jdk1.8.0_181\jre\lib\ext\dnsns.jar;D:\software\Java\jdk1.8.0_181\jre\lib\ext\jaccess.jar;D:\software\Java\jdk1.8.0_181\jre\lib\ext\jfxrt.jar;D:\software\Java\jdk1.8.0_181\jre\lib\ext\localedata.jar;D:\software\Java\jdk1.8.0_181\jre\lib\ext\nashorn.jar;D:\software\Java\jdk1.8.0_181\jre\lib\ext\sunec.jar;D:\software\Java\jdk1.8.0_181\jre\lib\ext\sunjce_provider.jar;D:\software\Java\jdk1.8.0_181\jre\lib\ext\sunmscapi.jar;D:\software\Java\jdk1.8.0_181\jre\lib\ext\sunpkcs11.jar;D:\software\Java\jdk1.8.0_181\jre\lib\ext\zipfs.jar;D:\software\Java\jdk1.8.0_181\jre\lib\javaws.jar;D:\software\Java\jdk1.8.0_181\jre\lib\jce.jar;D:\software\Java\jdk1.8.0_181\jre\lib\jfr.jar;D:\software\Java\jdk1.8.0_181\jre\lib\jfxswt.jar;D:\software\Java\jdk1.8.0_181\jre\lib\jsse.jar;D:\software\Java\jdk1.8.0_181\jre\lib\management-agent.jar;D:\software\Java\jdk1.8.0_181\jre\lib\plugin.jar;D:\software\Java\jdk1.8.0_181\jre\lib\resources.jar;D:\software\Java\jdk1.8.0_181\jre\lib\rt.jar;D:\project\mianshi\target\classes threadPoolRejectPolicyTry.Main
Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@7ba4f24f rejected from java.util.concurrent.ThreadPoolExecutor@3b9a45b3[Running, pool size = 2, active threads = 2, queued tasks = 1, completed tasks = 0]
	at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
	at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
	at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
	at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:112)
	at threadPoolRejectPolicyTry.Main.lambda$main$1(Main.java:28)
	at java.util.Arrays$ArrayList.forEach(Arrays.java:3880)
	at threadPoolRejectPolicyTry.Main.main(Main.java:19)
当前的线程是 - pool-1-thread-2, id是 - 21,任务是 - t3 
当前的线程是 - pool-1-thread-2, id是 - 21,任务是 - t2 
当前的线程是 - pool-1-thread-1, id是 - 20,任务是 - t1 

  1. ThreadPoolExecutor.CallerRunsPolicy
    1. 调用调用者线程来执行这个任务,会造成调用者线程的阻塞,但是会保证任务一定会被执行
  2. ThreadPoolExecutor.DiscardPolicy
    1. 直接不执行新提交的任务,并且也不会有任何反应
  3. ThreadPoolExecutor.DiscardOldestPolicy
    1. 淘汰掉任务队列中等待最久的任务(最快可以被执行的任务),然后再将这个新提交的任务入队

线程池任务处理流程

1759566872678-a564c06b-b3e0-439e-8649-401b0eb3ea99.png

  • 总结
    • 首先去使用核心线程,如果核心线程用完了,就让任务加入队列等一下,如果队列也满了,再考虑去创建一个新的非核心线程
    • 因为线程池就是解决资源创建销毁的消耗问题的,如果频繁创建线程反而违背了这个初衷
  • tip
    • 核心线程可以提前启动,通过线程池的prestartAllCoreThreads()prestartAllCoreThreads()方法

线程和线程池启动任务的不同方法

线程池:

  1. execute() 调用线程池中的线程去提交任务,然后交给线程来运行,
    1. 特点
      1. 当出现异常的时候,如果线程池中的线程没有进行异常捕获,直接在控制台打印错误,然后销毁执行任务的线程
  2. submit() 也是提交任务,返回一个Future对象,可以通过Future.get()方法来获取返回值,包括异常

线程(Thread对象):

  1. run() 相当于一个方法,直接调用这个方法相当于在本线程 本方法中再调用了一个方法,而不是交给另一个线程去异步执行
  2. start() 通知线程开始执行方法,然后通过本地方法start0调用run()方法,如果出现异常,不会管
  3. show the code
    1. 1759826012023-3d812de3-fecc-4b2a-9abd-33e142a4f730.png
    2. 1759825637926-67549671-e8bc-4e24-bda8-384aaa08dd53.png

如何给线程池命名?

核心:创建自定义的ThreadFactory类

  1. 利用外部工具
    1. Guava
      1. 代码
ThreadFactory threadFactory = new ThreadFactoryBuilder()
                        .setNameFormat(threadNamePrefix + "-%d")
                        .setDaemon(true).build();
ExecutorService threadPool = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory);
  1. 自己继承并一个ThreadFactory类,然后在创建线程的时候手动给线程设置名字
    1. 主要是重写newThread方法
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * 线程工厂,它设置线程名称,有利于我们定位问题。
 */
public final class NamingThreadFactory implements ThreadFactory {

    private final AtomicInteger threadNum = new AtomicInteger();
    private final String name;

    /**
     * 创建一个带名字的线程池生产工厂
     */
    public NamingThreadFactory(String name) {
        this.name = name;
    }

    @Override
    public Thread newThread(Runnable r) {
        Thread t = new Thread(r);
        t.setName(name + " [#" + threadNum.incrementAndGet() + "]");
        return t;
    }
}

ReentrantLock

如何进行加锁

  • 获取同一个创建的ReentrantLock对象,然后在调用Lock方法
public class SharedLockExample {
    public static void main(String[] args) {

        // 关键中的关键:整个程序只 new 了一次 Counter!
        Counter sharedCounter = new Counter();

        // ... 创建线程池 ...

        // 创建任务
        Runnable task = () -> {
            // 所有线程执行的任务,都是去操作那个唯一的 sharedCounter 对象
            sharedCounter.increment(); 
        };

        // 提交任务给多个线程去执行
        executor.submit(task);
        executor.submit(task);
        // ...
    }
}

Optional

这是一个

更新: 2025-10-08 12:44:19
原文: https://www.yuque.com/duifangzhengzaishuru-rqbua/axyc58/cgolul9e09co3gu0