1. 线程池
线程池的工作就是控制运行的线程数量,处理过程中将任务放入队列,然后在线程创建后启动这些任务,如果线程数量超过了最大数量,超出数量的线程排队等候,等其他线程执行完毕,再从队列中取出任务来执行。
线程池的主要特点:线程复用;控制并发;线程管理
优势有以下几点:
- 降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
- 提高响应速度。当任务到达时,任务可以无需等待线程创建就能立即执行。
- 提高线程的可管理性:线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源。还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。
1.1 架构图
Java中的线程池是通过Executor框架实现的,该框架中用到了Executor,Executors,ExecutorService,ThreadPoolExecutor这几个类。
1.2 线程池三大方法
1.2.1 固定线程池
通过Executors.newFixedThreadPool(int)
创建一个指定线程数量的线程池,执行长期任务性能好。
- 代码示例:
// 一池子5个工作线程,类似一个银行有5个受理窗口
ExecutorService threadPool = Executors.newFixedThreadPool(5);
try{
// 模拟有10个顾客来银行办理业务,目前池子里有5个工作人员提供服务
for (int i = 1; i <= 10; i++) {
int finalI = i;
threadPool.execute(()-> System.out.println(Thread.currentThread().getName()+"\t 办理业务"+ finalI));
}
}catch(Exception e){
e.printStackTrace();
}finally{
threadPool.shutdown();
}
- 底层代码
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
newFixedThreadPool
创建的线程池corePoolSize
和maximumPoolSize
值是相等的,它使用的是LinkedBlockingQueue
。
1.2.2 单一线程池
通过Executors.newSingleThreadExecutor()
来创建一个单一线程的线程池,一个任务一个任务的执行。
- 代码示例:
// 一池1个工作线程,类似一个银行有1个受理窗口
ExecutorService threadPool = Executors.newSingleThreadExecutor();
try{
// 10个顾客来银行,只有一个窗口开放
for (int i = 1; i <= 10; i++) {
int finalI = i;
threadPool.execute(()-> System.out.println(Thread.currentThread().getName()+"\t 办理业务"+ finalI));
}
}catch(Exception e){
e.printStackTrace();
}finally{
threadPool.shutdown();
}
- 底层代码
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
newSingleThreadExecutor
创建的线程池corePoolSize
和maximumPoolSize
值都是1,它使用的也是LinkedBlockingQueue
1.2.3 自动扩容线程池
通过Executors.newCachedThreadPool();
创建一个可自动扩容的线程池,适合执行很多短期异步任务,线程池会根据需求创建新线程。
- 代码示例
// 该线程池会根据线程执行时间,自动扩容
ExecutorService threadPool = Executors.newCachedThreadPool();
try{
// 模拟有10个顾客来银行办理业务
for (int i = 1; i <= 10; i++) {
TimeUnit.SECONDS.sleep(1);
int finalI = i;
threadPool.execute(()-> System.out.println(Thread.currentThread().getName()+"\t 办理业务"+ finalI));
}
}catch(Exception e){
e.printStackTrace();
}finally{
threadPool.shutdown();
}
- 底层代码
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
newCachedThreadPool
创建的线程池将corePoolSize
设置为0,将maximumPoolSize
设置为Integer.MAX_VALUE
,它使用的是SynchronousQueue
;当来任务时就创建线程运行,当线程空闲超过60秒,就销毁线程。
1.3 线程池七大参数
线程池全部共有7个参数,下面将会详解这些参数
- 源码如下:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
1.3.1 corePoolSize
表示线程池中的常驻核心线程数
1.3.1 maximumPoolSize
表示线程池中能够容纳同时执行的最大线程数,此值必须大于等于1
1.3.1 keepAliveTime
多余的空闲线程的存活时间;就是当线程池中线程数量超过常驻核心线程数时,同时这些线程处于空闲状态,当空闲时间达到keepAliveTime时,多余的线程就会被销毁直到剩下常驻核心线程数量。
1.3.1 unit
keepAliveTime的时间单位。
1.3.1 workQueue
任务队列,用于存放已提交但尚未被执行的任务。
1.3.1 threadFactory
用于生产创建线程池中工作线程的线程工厂,一般默认即可。
1.3.1 handler
一个拒绝策略,表示当任务队列满了,并且工作线程已经大于等于线程池中最大线程数时,应该如果来拒绝后续请求执行的runnable的策略。
1.4 线程池底层工作原理
- 线程池工作流程图如下:
1.5 线程池使用细节
- 一般生产环境中都是自定义线程池,禁止使用
Executors
来创建线程池!
- CPU密集型线程任务,最大线程数
maximumPoolSize
一般定义为CPU核心数+1,Java中代码获取CPU核心数:Runtime.getRuntime().availableProcessors()
- 自定义线程池代码示例
ExecutorService threadPool = new ThreadPoolExecutor(
2,
Runtime.getRuntime().availableProcessors(),
2,TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(3),
new ThreadPoolExecutor.AbortPolicy());
1.6 线程池的拒绝策略
自定义线程池时,可以选择使用ThreadPoolExecutor
内置的拒绝策略,有如下四种:
- AbortPolicy(默认):直接抛出RejectedExecutionException异常阻止系统正常运行
- CallerRunsPolicy:“调用者运行”的一种调节机制,该策略既不会抛弃任务,也不会抛出异常,而是将某些任务回退到调用者,从而降低新任务的流量。
- DiscardOldestPolicy:抛弃队列中等待最久的任务,然后把当前任务加人队列中尝试再次提交当前任务。
- DiscardPolicy:该策略默默地丢弃无法处理的任务,不予任何处理也不抛出异常。如果允许任务丢失,这是最好的一种策略。
以上内置拒绝策略均实现了
RejectedExecutionHandle
接口
2. Stream流式计算
Stream流式数据渠道,用于操作数据源(集合、数组等)所生成的元素序列。
集合讲的是数据,流讲的是计算!
Stream流具有以下特点:
- 不会存储元素;
- 不会改变源对象,相反会返回一个持有结果的新Stream;
- 延迟执行,意味着会等到需要结果时才执行。
学习Stream流之前先复习下函数式接口。
2.1 四大函数式接口
函数式接口 | 参数类型 | 返回类型 | 用途 |
---|---|---|---|
Function<T,R> 函数型接口 |
T | R | 对类型为T的对象应用操作,返回结果为R类型的对象;包含方法:R apply(T t); |
Predicate<T> 断定型接口 |
T | boolean | 确定类型为T的对象是否满足某约束,并返回布尔值;包含方法:boolean test(T t); |
Consumer<T> 消费型接口 |
T | void | 对类型为T的对象应用操作,包含方法:void accept(T t); |
Supplier<T> 供给型接口 |
无 | T | 返回类型为T的对象,包含方法:T get(); |
- 函数型接口,操作T,返回R,方法apply
// 匿名内部类写法
Function<String,Integer> function = new Function<String, Integer>() {
@Override
public Integer apply(String s) {
return s.length();
}
};
// lambda写法
Function<String,Integer> function = s -> {return s.length();};
System.out.println(function.apply("hello"));
- 断定型接口,操作T,返回布尔,方法test
Predicate<String> predicate = new Predicate<String>() {
@Override
public boolean test(String s) {
return s.isEmpty();
}
};
Predicate<String> predicate = s -> { return s.isEmpty(); };
System.out.println(predicate.test("hello"));
- 消费型接口,操作S,无返回,方法accept
Consumer<String> consumer = new Consumer<String>() {
@Override
public void accept(String s) {
System.out.println(s);
}
};
Consumer<String> consumer = s -> { System.out.println(s); };
consumer.accept("hello");
- 供给型接口,无输入,返回T,方法get
Supplier<String> supplier = new Supplier<String>() {
@Override
public String get() {
return "hello";
}
};
Supplier<String> supplier = () -> {return "hello";};
System.out.println(supplier.get());
2.2 Stream流
使用流的基本步骤如下:
- 创建一个Stream:一个数据源,可以是数组、集合;
- 中间操作:一个中间操作,处理数据源数据;
- 终止操作:一个终止操作,执行中间操作链,产生结果。
练习题:请按照给出数据,找出同时满足以下全部条件的用户:偶数ID且年龄大于24且用户名转为大写且用户名字母倒序,最后只输出一个用户名。
public class StreamDemo {
public static void main(String[] args) {
User u1 = new User(11,"a",23);
User u2 = new User(12,"b",24);
User u3 = new User(13,"c",22);
User u4 = new User(14,"d",28);
User u5 = new User(16,"e",26);
List<User> list = Arrays.asList(u1, u2, u3, u4, u5);
Predicate<User> judgment = user -> { return ((user.getId())%2==0 && user.getAge() > 24); };
list.stream()
.filter(user -> { return (user.getId() % 2 == 0 && user.getAge() > 24);})
.map(user -> {return user.getName().toUpperCase();})
.sorted((o1,o2)->{return o2.compareTo(o1);})
.limit(1)
.forEach(System.out::println);
}
}
3. 分支合并框架
分支合并的原理在于Fork将一个复杂任务进行拆分,拆分成多个小任务,Join将拆分任务的结果进行合并。
代码示例如下:
// 这是一个递归任务,继承后可以实现递归(自己调自己)调用的任务
class MyTask extends RecursiveTask<Integer>{
private static final Integer ADJUST_VALUE = 10;
private int begin;
private int end;
private int result;
public MyTask(int begin,int end){
this.begin = begin;
this.end = end;
}
@Override
protected Integer compute() {
// 拆分为10个一组
if ((end-begin) <= ADJUST_VALUE){
for (int i = begin; i <= end; i++) {
result = result + i;
}
}else {
// 获取中间值
int middle = (end + begin) / 2;
// 新建两个任务,分配任务需求,一人处理一半
MyTask task01 = new MyTask(begin,middle);
MyTask task02 = new MyTask(middle + 1,end);
// 继续分支
task01.fork();
task02.fork();
// 合并拆分任务的结果值返回
result = task01.join() + task02.join();
}
return result;
}
}
/**
* @Author: zero <[email] 1490829140@qq.com>
* @Date: Create in 2020/5/15 9:42
* @Description: 分支合并框架
*
* ForkJoinPool
* ForkJoinTask
* RecursiveTask
*/
public class ForkJoinDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 创建任务
MyTask myTask = new MyTask(0,100);
// 创建连接池
ForkJoinPool threadPool = new ForkJoinPool();
// 提交任务
ForkJoinTask<Integer> forkJoinTask = threadPool.submit(myTask);
// 汇总结果并输出
System.out.println(forkJoinTask.get());
// 关闭连接池
threadPool.shutdown();
}
}
4. 异步回调
代码示例:
public class CompletableFutureDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 异步调用,无返回值
CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(() -> System.out.println(Thread.currentThread().getName() + "\t无返回"));
completableFuture.get();
//异步回调
CompletableFuture<Integer> completableFuture2 = CompletableFuture.supplyAsync(()->{
System.out.println(Thread.currentThread().getName()+"\t completableFuture2");
int i = 10/0;
return 1024;
});
Integer result = completableFuture2.whenComplete((t, u) -> {
System.out.println(">>>>t: " + t); // 正常执行
System.out.println(">>>>u: " + u); // 异常执行
}).exceptionally(f -> { // 如果出现异常会执行的函数
System.out.println(">>>>exception:" + f.getMessage());
return 444;
}).get();
System.out.println(result);
}
}
相关文章推荐:
https://www.jianshu.com/p/b3c4dd85901e
https://www.jianshu.com/p/11327ad1d645
JUC相关代码见Github:JUC-Demos