线程池

1. 线程池

线程池的工作就是控制运行的线程数量,处理过程中将任务放入队列,然后在线程创建后启动这些任务,如果线程数量超过了最大数量,超出数量的线程排队等候,等其他线程执行完毕,再从队列中取出任务来执行。

线程池的主要特点:线程复用;控制并发;线程管理

优势有以下几点:

  1. 降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
  2. 提高响应速度。当任务到达时,任务可以无需等待线程创建就能立即执行。
  3. 提高线程的可管理性:线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源。还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。

1.1 架构图

Java中的线程池是通过Executor框架实现的,该框架中用到了Executor,Executors,ExecutorService,ThreadPoolExecutor这几个类。

线程池

1.2 线程池三大方法

1.2.1 固定线程池

通过Executors.newFixedThreadPool(int)创建一个指定线程数量的线程池,执行长期任务性能好。

  • 代码示例:
// 一池子5个工作线程,类似一个银行有5个受理窗口
ExecutorService threadPool = Executors.newFixedThreadPool(5);
try{
    // 模拟有10个顾客来银行办理业务,目前池子里有5个工作人员提供服务
    for (int i = 1; i <= 10; i++) {
        int finalI = i;
        threadPool.execute(()-> System.out.println(Thread.currentThread().getName()+"\t 办理业务"+ finalI));
    }            
}catch(Exception e){
    e.printStackTrace();
}finally{
    threadPool.shutdown();
}
  • 底层代码
public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}

newFixedThreadPool创建的线程池corePoolSizemaximumPoolSize值是相等的,它使用的是LinkedBlockingQueue

1.2.2 单一线程池

通过Executors.newSingleThreadExecutor()来创建一个单一线程的线程池,一个任务一个任务的执行。

  • 代码示例:
// 一池1个工作线程,类似一个银行有1个受理窗口
ExecutorService threadPool = Executors.newSingleThreadExecutor();
try{
    // 10个顾客来银行,只有一个窗口开放
    for (int i = 1; i <= 10; i++) {
        int finalI = i;
        threadPool.execute(()-> System.out.println(Thread.currentThread().getName()+"\t 办理业务"+ finalI));
    }            
}catch(Exception e){
    e.printStackTrace();
}finally{
    threadPool.shutdown();
}
  • 底层代码
public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}

newSingleThreadExecutor创建的线程池corePoolSizemaximumPoolSize值都是1,它使用的也是LinkedBlockingQueue

1.2.3 自动扩容线程池

通过Executors.newCachedThreadPool();创建一个可自动扩容的线程池,适合执行很多短期异步任务,线程池会根据需求创建新线程。

  • 代码示例
// 该线程池会根据线程执行时间,自动扩容
ExecutorService threadPool = Executors.newCachedThreadPool();
try{
    // 模拟有10个顾客来银行办理业务
    for (int i = 1; i <= 10; i++) {
        TimeUnit.SECONDS.sleep(1);
        int finalI = i;
        threadPool.execute(()-> System.out.println(Thread.currentThread().getName()+"\t 办理业务"+ finalI));
    }            
}catch(Exception e){
    e.printStackTrace();
}finally{
    threadPool.shutdown();
}
  • 底层代码
public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

newCachedThreadPool创建的线程池将corePoolSize设置为0,将maximumPoolSize设置为Integer.MAX_VALUE,它使用的是SynchronousQueue;当来任务时就创建线程运行,当线程空闲超过60秒,就销毁线程。

1.3 线程池七大参数

线程池全部共有7个参数,下面将会详解这些参数

  • 源码如下:
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

1.3.1 corePoolSize

表示线程池中的常驻核心线程数

1.3.1 maximumPoolSize

表示线程池中能够容纳同时执行的最大线程数,此值必须大于等于1

1.3.1 keepAliveTime

多余的空闲线程的存活时间;就是当线程池中线程数量超过常驻核心线程数时,同时这些线程处于空闲状态,当空闲时间达到keepAliveTime时,多余的线程就会被销毁直到剩下常驻核心线程数量。

1.3.1 unit

keepAliveTime的时间单位

1.3.1 workQueue

任务队列,用于存放已提交但尚未被执行的任务。

1.3.1 threadFactory

用于生产创建线程池中工作线程的线程工厂,一般默认即可。

1.3.1 handler

一个拒绝策略,表示当任务队列满了,并且工作线程已经大于等于线程池中最大线程数时,应该如果来拒绝后续请求执行的runnable的策略。

1.4 线程池底层工作原理

  • 线程池工作流程图如下:

线程池工作原理

1.5 线程池使用细节

  • 一般生产环境中都是自定义线程池,禁止使用Executors来创建线程池!

from 阿里Java开发手册

  • CPU密集型线程任务,最大线程数maximumPoolSize一般定义为CPU核心数+1,Java中代码获取CPU核心数:Runtime.getRuntime().availableProcessors()
  • 自定义线程池代码示例
ExecutorService threadPool = new ThreadPoolExecutor(
    2,
    Runtime.getRuntime().availableProcessors(),
    2,TimeUnit.SECONDS,
    new LinkedBlockingQueue<Runnable>(3),
    new ThreadPoolExecutor.AbortPolicy());

1.6 线程池的拒绝策略

自定义线程池时,可以选择使用ThreadPoolExecutor内置的拒绝策略,有如下四种:

  1. AbortPolicy(默认):直接抛出RejectedExecutionException异常阻止系统正常运行
  2. CallerRunsPolicy:“调用者运行”的一种调节机制,该策略既不会抛弃任务,也不会抛出异常,而是将某些任务回退到调用者,从而降低新任务的流量。
  3. DiscardOldestPolicy:抛弃队列中等待最久的任务,然后把当前任务加人队列中尝试再次提交当前任务。
  4. DiscardPolicy:该策略默默地丢弃无法处理的任务,不予任何处理也不抛出异常。如果允许任务丢失,这是最好的一种策略。

以上内置拒绝策略均实现了RejectedExecutionHandle接口

2. Stream流式计算

Stream流式数据渠道,用于操作数据源(集合、数组等)所生成的元素序列。

集合讲的是数据,流讲的是计算!

Stream流具有以下特点:

  1. 不会存储元素;
  2. 不会改变源对象,相反会返回一个持有结果的新Stream;
  3. 延迟执行,意味着会等到需要结果时才执行。

学习Stream流之前先复习下函数式接口。

2.1 四大函数式接口

函数式接口 参数类型 返回类型 用途
Function<T,R>函数型接口 T R 对类型为T的对象应用操作,返回结果为R类型的对象;包含方法:R apply(T t);
Predicate<T>断定型接口 T boolean 确定类型为T的对象是否满足某约束,并返回布尔值;包含方法:boolean test(T t);
Consumer<T>消费型接口 T void 对类型为T的对象应用操作,包含方法:void accept(T t);
Supplier<T>供给型接口 T 返回类型为T的对象,包含方法:T get();
  1. 函数型接口,操作T,返回R,方法apply
// 匿名内部类写法
Function<String,Integer> function = new Function<String, Integer>() {
    @Override
    public Integer apply(String s) {
        return s.length();
    }
};
// lambda写法
Function<String,Integer> function = s -> {return s.length();};
System.out.println(function.apply("hello"));
  1. 断定型接口,操作T,返回布尔,方法test
Predicate<String> predicate = new Predicate<String>() {
    @Override
    public boolean test(String s) {
        return s.isEmpty();
    }
};
Predicate<String> predicate = s -> { return s.isEmpty(); };
System.out.println(predicate.test("hello"));
  1. 消费型接口,操作S,无返回,方法accept
Consumer<String> consumer = new Consumer<String>() {
    @Override
    public void accept(String s) {
        System.out.println(s);
    }
};
Consumer<String> consumer = s -> { System.out.println(s); };
consumer.accept("hello");
  1. 供给型接口,无输入,返回T,方法get
Supplier<String> supplier = new Supplier<String>() {
    @Override
    public String get() {
        return "hello";
    }
};
Supplier<String> supplier = () -> {return "hello";};
System.out.println(supplier.get());

2.2 Stream流

使用流的基本步骤如下:

  1. 创建一个Stream:一个数据源,可以是数组、集合;
  2. 中间操作:一个中间操作,处理数据源数据;
  3. 终止操作:一个终止操作,执行中间操作链,产生结果。

练习题:请按照给出数据,找出同时满足以下全部条件的用户:偶数ID且年龄大于24且用户名转为大写且用户名字母倒序,最后只输出一个用户名。

public class StreamDemo {
    public static void main(String[] args) {
        User u1 = new User(11,"a",23);
        User u2 = new User(12,"b",24);
        User u3 = new User(13,"c",22);
        User u4 = new User(14,"d",28);
        User u5 = new User(16,"e",26);
        List<User> list = Arrays.asList(u1, u2, u3, u4, u5);
        Predicate<User> judgment = user -> { return ((user.getId())%2==0 && user.getAge() > 24); };

        list.stream()
            .filter(user -> { return (user.getId() % 2 == 0 && user.getAge() > 24);})
            .map(user -> {return user.getName().toUpperCase();})
            .sorted((o1,o2)->{return o2.compareTo(o1);})
            .limit(1)
            .forEach(System.out::println);
    }
}

3. 分支合并框架

分支合并框架

分支合并的原理在于Fork将一个复杂任务进行拆分,拆分成多个小任务,Join将拆分任务的结果进行合并。

代码示例如下:

// 这是一个递归任务,继承后可以实现递归(自己调自己)调用的任务
class MyTask extends RecursiveTask<Integer>{
    private static final Integer ADJUST_VALUE = 10;
    private int begin;
    private int end;
    private int result;

    public MyTask(int begin,int end){
        this.begin = begin;
        this.end = end;
    }

    @Override
    protected Integer compute() {
        // 拆分为10个一组
        if ((end-begin) <= ADJUST_VALUE){
            for (int i = begin; i <= end; i++) {
                result = result + i;
            }
        }else {
            // 获取中间值
            int middle = (end + begin) / 2;
            // 新建两个任务,分配任务需求,一人处理一半
            MyTask task01 = new MyTask(begin,middle);
            MyTask task02 = new MyTask(middle + 1,end);
            // 继续分支
            task01.fork();
            task02.fork();
            // 合并拆分任务的结果值返回
            result = task01.join() + task02.join();
        }

        return result;
    }
}

/**
 * @Author: zero <[email] 1490829140@qq.com>
 * @Date: Create in 2020/5/15 9:42
 * @Description: 分支合并框架
 *
 *  ForkJoinPool
 *  ForkJoinTask
 *  RecursiveTask
 */

public class ForkJoinDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // 创建任务
        MyTask myTask = new MyTask(0,100);
        // 创建连接池
        ForkJoinPool threadPool = new ForkJoinPool();
        // 提交任务
        ForkJoinTask<Integer> forkJoinTask = threadPool.submit(myTask);
        // 汇总结果并输出
        System.out.println(forkJoinTask.get());
        // 关闭连接池
        threadPool.shutdown();
    }
}

4. 异步回调

代码示例:

public class CompletableFutureDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // 异步调用,无返回值
        CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(() -> System.out.println(Thread.currentThread().getName() + "\t无返回"));
        completableFuture.get();
        //异步回调
        CompletableFuture<Integer> completableFuture2 = CompletableFuture.supplyAsync(()->{
            System.out.println(Thread.currentThread().getName()+"\t completableFuture2");
            int i = 10/0;
            return 1024;
        });

        Integer result = completableFuture2.whenComplete((t, u) -> {
            System.out.println(">>>>t: " + t); // 正常执行
            System.out.println(">>>>u: " + u); // 异常执行
        }).exceptionally(f -> { // 如果出现异常会执行的函数
            System.out.println(">>>>exception:" + f.getMessage());
            return 444;
        }).get();
        System.out.println(result);
    }
}

相关文章推荐:

https://www.jianshu.com/p/b3c4dd85901e

https://www.jianshu.com/p/11327ad1d645

JUC相关代码见Github:JUC-Demos


  转载请注明: Zero的博客 线程池

 上一篇
NoSQL概述 NoSQL概述
1. NoSQL入门概述1.1 NoSQL数据库的发展历程互联网时代背景下,也并非直接出现NoSQL数据库的,而是经过了一些演变过程,随着数据量的不断增大,诞生了许多的技术。 1.1.1 单机MySQL的时代90年代,大部分网站访问量都不大
2020-06-14
下一篇 
JUC JUC
1. JUCJUC是java.util.concurrent工具包的简称,JDK1.5开始出现的,这是一个处理线程的工具包,在此包中有很多在并发编程中很常用的工具类。 主要由三大包构成: java.util.concurrent:并发包
2020-05-11
  目录