Files
www/blogs/java/java8/java8-3.md
zhuhjay 22e48d9558 build(www): 添加 Drone CI 流水线配置
- 新增 .drone.yml 文件用于定义 CI/CD 流程
- 配置了基于 Docker 的部署步骤
- 设置了工作区和卷映射以支持持久化数据
- 添加了构建准备阶段和 Docker 部署阶段
- 定义了环境变量和代理设置
- 配置了 artifacts 目录的处理逻辑
- 添加了 timezone 映射以确保时间同步
- 设置了 docker.sock 映射以支持 Docker in Docker
2025-11-01 13:36:00 +08:00

259 lines
10 KiB
Markdown
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

---
title: JDK8 特性(三)
date: 2022-05-25
sidebar: 'auto'
tags:
- JDK8
categories:
- Java
---
## 1 Stream流-续
### 1.7 并行的Stream流
- 串行的Stream流
- 目前我们使用的Stream流是串行的就是在一个线程上执行。
- 并行的Stream流
- parallelStream其实就是一个并行执行的流。它通过默认的ForkJoinPool可能提高多线程任务的速度。
#### 获取并行Stream流的两种方式
1. 直接获取并行的流
```java
public class StreamParallelTest {
@Test
public void parallel(){
ArrayList<Integer> list = new ArrayList<>();
// 直接获取并行的流
Stream<Integer> stream = list.parallelStream();
}
}
```
2. 将串行流转成并行流
```java
public class StreamParallelTest {
@Test
public void serialToParallel(){
Stream.of(9, 1, 34, 5, 3, 21, 56, 9)
// 转成并行流
.parallel()
.filter(i -> {
System.out.println(Thread.currentThread() + "::" + i);
return i > 50;
})
.forEach(System.out::println);
}
}
```
#### 串行和并行效率的比较
- 使用for循环串行Stream流并行Stream流来对10亿个数字求和。看消耗的时间。
```java
public class StreamParallelTest {
private static final int TIMES = 1000000000;
long start;
@Before
public void init(){
start = System.currentTimeMillis();
}
@Test
public void useFor(){
// 消耗时间: 339ms
int sum = 0;
for (int i = 0; i < TIMES; i++) {
sum += i;
}
}
@Test
public void useSerialStream(){
// 获取足够长度的串行流进行加和
// 消耗时间: 631ms
LongStream.rangeClosed(0, TIMES)
.reduce(0, Long::sum);
}
@Test
public void useParallelStream(){
// 获取足够长度的并行流进行加和
// 消耗时间: 317ms
LongStream.rangeClosed(0, TIMES)
.parallel()
.reduce(0, Long::sum);
}
@After
public void destroy(){
System.out.println("消耗时间: " +
(System.currentTimeMillis() - start) +
"ms");
}
}
```
- 我们可以看到parallelStream的效率是最高的。
- Stream并行处理的过程会分而治之也就是将一个大任务切分成多个小任务这表示每个任务都是一个操作。
#### ParallelStream线程安全问题
```java
public class StreamParallelTest {
@Test
public void parallelStreamNotice(){
List<Integer> list = new ArrayList<>(1000);
// 并行线程不安全
// list.size() = 947
IntStream.range(0, 1000)
.parallel()
.forEach(i -> list.add(i));
System.out.println("list.size() = " + list.size());
// 解决线程安全问题方案一: 使用同步代码块
Object lock = new Object();
IntStream.range(0, 1000)
.parallel()
.forEach(i -> {
synchronized (lock){
list.add(i);
}
});
System.out.println("list.size() = " + list.size());
// 解决线程安全问题方案二: 使用线程安全的集合
// 使用 Vector<Integer> 集合
Vector<Integer> v = new Vector<>();
IntStream.range(0, 1000)
.parallel()
.forEach(i -> v.add(i));
System.out.println("v.size() = " + v.size());
// 解决线程安全问题方案二: 使用线程安全的集合
// 使用集合工具类提供的线程安全的集合
List<Integer> synchronizedList = Collections.synchronizedList(list);
IntStream.range(0, 1000)
.parallel()
.forEach(i -> synchronizedList.add(i));
System.out.println("synchronizedList.size() = " + synchronizedList.size());
// 解决线程安全问题方案三: 调用Stream流的collect/toArray
// 使用Stream流的collect
List<Integer> collect = IntStream.range(0, 1000)
.parallel()
.boxed()
.collect(Collectors.toList());
System.out.println("collect.size() = " + collect.size());
// 解决线程安全问题方案三: 调用Stream流的collect/toArray
// 使用Stream流的toArray
Integer[] integers = IntStream.range(0, 1000)
.parallel()
.boxed()
.toArray(Integer[]::new);
System.out.println("integers.length = " + integers.length);
}
}
```
### 1.8 Fork/Join框架介绍
- parallelStream使用的是Fork/Join框架。Fork/Join框架自JDK 7引入。Fork/Join框架可以将一个大任务拆分为很多小 任务来异步执行。 Fork/Join框架主要包含三个模块
1. 线程池ForkJoinPool
2. 任务对象ForkJoinTask
3. 执行任务的线程ForkJoinWorkerThread
![](/java8/ForkJoin一览.png)
#### Fork/Join原理-分治法
- ForkJoinPool主要用来使用分治法(Divide-and-Conquer Algorithm)来解决问题。典型的应用比如快速排序算法ForkJoinPool需要使用相对少的线程来处理大量的任务。比如要对1000万个数据进行排序那么会将这个任务分割成 两个500万的排序任务和一个针对这两组500万数据的合并任务。以此类推对于500万的数据也会做出同样的分割处理到最后会设置一个阈值来规定当数据规模到多少时停止这样的分割处理。比如当元素的数量小于10时会停止分割转而使用插入排序对它们进行排序。那么到最后所有的任务加起来会有大概2000000+个。问题的关键在于,对于一个任务而言,只有当它所有的子任务完成之后,它才能够被执行。
![](/java8/ForkJoin-分治法.png)
#### Fork/Join原理-工作窃取算法
- Fork/Join最核心的地方就是利用了现代硬件设备多核在一个操作时候会有空闲的cpu那么如何利用好这个空闲的cpu就成了提高性能的关键而这里我们要提到的工作窃取work-stealing算法就是整个Fork/Join框架的核心理念 Fork/Join工作窃取work-stealing算法是指某个线程从其他队列里窃取任务来执行。
![](/java8/ForkJoin-工作窃取算法.png)
- 那么为什么需要使用工作窃取算法呢假如我们需要做一个比较大的任务我们可以把这个任务分割为若干互不依赖的子任务为了减少线程间的竞争于是把这些子任务分别放到不同的队列里并为每个队列创建一个单独的线程来执行队列里的任务线程和队列一一对应比如A线程负责处理A队列里的任务。但是有的线程会先把自己队列里的 任务干完,而其他线程对应的队列里还有任务等待处理。干完活的线程与其等着,不如去帮其他线程干活,于是它就去其他线程的队列里窃取一个任务来执行。而在这时它们会访问同一个队列,所以为了减少窃取任务线程和被窃取任 务线程之间的竞争,通常会使用双端队列,被窃取任务线程永远从双端队列的头部拿任务执行,而窃取任务的线程永 远从双端队列的尾部拿任务执行。
- 工作窃取算法的优点是充分利用线程进行并行计算,并减少了线程间的竞争,其缺点是在某些情况下还是存在竞争,比如双端队列里只有一个任务时。并且消耗了更多的系统资源,比如创建多个线程和多个双端队列。
- 上文中已经提到了在Java 8引入了自动并行化的概念。它能够让一部分Java代码自动地以并行的方式执行也就是我们使用了ForkJoinPool的ParallelStream。
- 对于ForkJoinPool通用线程池的线程数量通常使用默认值就可以了即运行时计算机的处理器数量。可以通过设置系统属性`java.util.concurrent.ForkJoinPool.common.parallelism=N`N为线程数量来调整ForkJoinPool的线程数量可以尝试调整成不同的参数来观察每次的输出结果。
#### Fork/Join案例
- 需求使用Fork/Join计算1-10000的和当一个任务的计算数量大于3000时拆分任务数量小于3000时计算。
![](/java8/ForkJoin案例.png)
```java
public class ForkJoinDemo {
public static void main(String[] args) {
long start = System.currentTimeMillis();
ForkJoinPool pool = new ForkJoinPool();
SumRecursiveTask task = new SumRecursiveTask(1, 10000L);
Long result = pool.invoke(task);
System.out.println("result = " + result);
long end = System.currentTimeMillis();
System.out.println("消耗的时间为: " + (end - start));
}
}
class SumRecursiveTask extends RecursiveTask<Long> {
private static final long THRESHOLD = 3000L;
private final long start;
private final long end;
public SumRecursiveTask(long start, long end){
this.start = start;
this.end = end;
}
@Override
protected Long compute() {
long length = end - start;
if (length <= THRESHOLD) {
// 任务不用再拆分了.可以计算了
long sum = 0;
for (long i = start; i <= end; i++) {
sum += i;
}
return sum;
} else {
// 数量大于预定的数量,任务还需要再拆分
long middle = (start + end) / 2;
SumRecursiveTask left = new SumRecursiveTask(start, middle);
left.fork();
SumRecursiveTask right = new SumRecursiveTask(middle + 1, end);
right.fork();
return left.join() + right.join();
}
}
}
```
### 1.9 小结
1. parallelStream是线程不安全的
2. parallelStream适用的场景是CPU密集型的只是做到别浪费CPU假如本身电脑CPU的负载很大那还到处用并行流那并不能起到作用
3. I/O密集型磁盘I/O、网络I/O都属于I/O操作这部分操作是较少消耗CPU资源一般并行流中不适用于I/O密集型的操作就比如使用并流行进行大批量的消息推送涉及到了大量I/O使用并行流反而慢了很多
4. 在使用并行流的时候是无法保证元素的顺序的,也就是即使你用了同步集合也只能保证元素都正确但无法保证其中的顺序