三. Stream API

过滤

1
2
3
4
5
6
7
8
9
10
record Fruit(String cname, String name, String category, String color) { }

Stream.of(
new Fruit("草莓", "Strawberry", "浆果", "红色"),
new Fruit("桑葚", "Mulberry", "浆果", "紫色"),
new Fruit("杨梅", "Waxberry", "浆果", "红色"),
new Fruit("核桃", "Walnut", "坚果", "棕色"),
new Fruit("草莓", "Peanut", "坚果", "棕色"),
new Fruit("蓝莓", "Blueberry", "浆果", "蓝色")
)

找到所有浆果

1
.filter(f -> f.category.equals("浆果"))

找到蓝色的浆果

方法1:

1
.filter(f -> f.category().equals("浆果") && f.color().equals("蓝色"))

方法2:让每个 lambda 只做一件事,两次 filter 相对于并且关系

1
2
.filter(f -> f.category.equals("浆果"))
.filter(f -> f.color().equals("蓝色"))

方法3:让每个 lambda 只做一件事,不过比方法2强的地方可以 or,and,nagate 运算

1
.filter(((Predicate<Fruit>) f -> f.category.equals("浆果")).and(f -> f.color().equals("蓝色")))

映射

1
.map(f -> f.cname() + "酱")

降维

例1

image-20240301105938703

1
2
3
4
5
6
7
8
9
10
11
12
13
14
Stream.of(
List.of(
new Fruit("草莓", "Strawberry", "浆果", "红色"),
new Fruit("桑葚", "Mulberry", "浆果", "紫色"),
new Fruit("杨梅", "Waxberry", "浆果", "红色"),
new Fruit("蓝莓", "Blueberry", "浆果", "蓝色")
),
List.of(
new Fruit("核桃", "Walnut", "坚果", "棕色"),
new Fruit("草莓", "Peanut", "坚果", "棕色")
)
)

.flatMap(Collection::stream)
  • 这样把坚果和浆果两个集合变成了含六个元素的水果流

例2:

1
2
3
4
5
6
7
8
9
10
11
12
Stream.of(
new Order(1, List.of(
new Item(6499, 1, "HUAWEI MateBook 14s"),
new Item(6999, 1, "HUAWEI Mate 60 Pro"),
new Item(1488, 1, "HUAWEI WATCH GT 4")
)),
new Order(1, List.of(
new Item(8999, 1, "Apple MacBook Air 13"),
new Item(7999, 1, "Apple iPhone 15 Pro"),
new Item(2999, 1, "Apple Watch Series 9")
))
)

想逐一处理每个订单中的商品

1
.flatMap(order -> order.items().stream())

这样把一个有两个元素的订单流,变成了一个有六个元素的商品流

构建

根据已有的数组构建流

1
Arrays.stream(array)

根据已有的 Collection 构建流(包括 List,Set 等)

1
List.of("a","b","c").stream()

把一个对象变成流

1
Stream.of("d")

把多个对象变成流

1
Stream.of("x", "y")

拼接

两个流拼接

1
Stream.concat(Stream.of("a","b","c"), Stream.of("d"))

截取

1
2
3
Stream.concat(Stream.of("a", "b", "c"), Stream.of("d"))
.skip(1)
.limit(2)
  • skip 是跳过几个元素

  • limit 是限制处理的元素个数

  • dropWhile 是 drop 流中元素,直到条件不成立,留下剩余元素

  • takeWhile 是 take 流中元素,直到条件不成立,舍弃剩余元素

生成

生成从 0 ~ 9 的数字

1
IntStream.range(0, 10)

或者

1
IntStream.rangeClosed(0, 9)

如果想订制,可以用 iterate 方法,例如下面生成奇数序列

1
IntStream.iterate(1, x -> x + 2)
  • 参数1 是初始值
  • 参数2 是一个特殊 Function,即参数类型与返回值相同,它会根据上一个元素 x 的值计算出当前元素
  • 需要用 limit 限制元素个数

也可以用 iterate 的重载方法

1
IntStream.iterate(1, x -> x < 10, x -> x + 2)
  • 参数1 是初始值
  • 参数2 用来限制元素个数,一旦不满足此条件,流就结束
  • 参数3 相当于上个方法的参数2

iterate 的特点是根据上一个元素计算当前元素,如果不需要依赖上一个元素,可以改用 generate 方法

例如下面是生成 5 个随机 int

1
Stream.generate(()-> ThreadLocalRandom.current().nextInt()).limit(5)

不过如果只是生成随机数的话,有更简单的办法

1
ThreadLocalRandom.current().ints(5)

如果要指定上下限,例如下面是生成从 0~9 的100个随机数

1
ThreadLocalRandom.current().ints(100, 0, 10)

查找与判断

下面的代码找到流中任意(Any)一个偶数

1
2
3
4
5
6
int[] array = {1, 3, 5, 4, 7, 6, 9};

Arrays.stream(array)
.filter(x -> (x & 1) == 0)
.findAny()
.ifPresent(System.out::println);
  • 注意 findAny 返回的是 OptionalInt 对象,因为可能流中不存在偶数
  • 对于 OptionalInt 对象,一般需要用 ifPresent 或 orElse(提供默认值)来处理

与 findAny 比较类似的是 firstFirst,它俩的区别

  • findAny 是找在流中任意位置的元素,不需要考虑顺序,对于上例返回 6 也是可以的
  • findFirst 是找第一个出现在元素,需要考虑顺序,对于上例只能返回 4
  • findAny 在顺序流中与 findFirst 表现相同,区别在于并行流下会更快

判断流中是否存在任意一个偶数

1
Arrays.stream(array).anyMatch(x -> (x & 1) == 0)
  • 它返回的是 boolean 值,可以直接用来判断

判断流是否全部是偶数

1
Arrays.stream(array).allMatch(x -> (x & 1) == 0)
  • 同样,它返回的是 boolean 值,可以直接用来判断

判断流是否全部不是偶数

1
Arrays.stream(array).noneMatch(x -> (x & 1) == 0)
  • noneMatch 与 allMatch 含义恰好相反

排序与去重

已知有数据

1
2
3
4
5
6
7
8
9
10
11
12
13
record Hero(String name, int strength) { }

Stream.of(
new Hero("独孤求败", 100),
new Hero("令狐冲", 90),
new Hero("风清扬", 98),
new Hero("东方不败", 98),
new Hero("方证", 92),
new Hero("任我行", 92),
new Hero("冲虚", 90),
new Hero("向问天", 88),
new Hero("不戒", 88)
)

要求,首先按 strength 武力排序(逆序),武力相同的,按姓名长度排序(正序)

仅用 lambda 来解

1
2
3
4
.sorted((a,b)-> {
int res = Integer.compare(b.strength(), a.strength());
return (res == 0) ? Integer.compare(a.nameLength(), b.nameLength()) : res;
})

方法引用改写

1
2
3
4
5
.sorted(
Comparator.comparingInt(Hero::strength)
.reversed()
.thenComparingInt(Hero::nameLength)
)

其中:

  • comparingInt 接收一个 key 提取器(说明按对象中哪部分来比较),返回一个比较器
  • reversed 返回一个顺序相反的比较器
  • thenComparingInt 接收一个 key 提取器,返回一个新比较器,新比较器在原有比较器结果相等时执行新的比较逻辑

增加一个辅助方法

1
2
3
4
5
record Hero(String name, int strength) {
int nameLength() {
return this.name.length();
}
}

原理:

1
2
3
4
5
6
7
8
.sorted((e, f) -> {
int res =
((Comparator<Hero>) (c, d) ->
((Comparator<Hero>) (a, b) -> Integer.compare(a.strength(), b.strength()))
.compare(d, c))
.compare(e, f);
return (res == 0) ? Integer.compare(e.nameLength(), f.nameLength()) : res;
})

如果不好看,改成下面的代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
.sorted(step3(step2(step1())))

static Comparator<Hero> step1() {
return (a, b) -> Integer.compare(a.strength(), b.strength());
}

static Comparator<Hero> step2(Comparator<Hero> step1) {
return (c, d) -> step1.compare(d, c);
}

static Comparator<Hero> step3(Comparator<Hero> step2) {
return (e, f) -> {
int res = step2.compare(e, f);
return (res == 0) ? Integer.compare(e.nameLength(), f.nameLength()) : res;
};
}

化简

reduce(init, (p,x) -> r)

  • init 代表初始值
  • (p,x) -> r 是一个 BinaryOperator,作用是根据上次化简结果 p 和当前元素 x,得到本次化简结果 r

这样两两化简,可以将流中的所有元素合并成一个结果

收集

collect( supplier, accumulator, combiner)

  • supplier 是描述如何创建收集容器 c :()-> c
  • accumulator 是描述如何向容器 c 添加元素 x:(c, x) -> void
  • combiner 是描述如何合并两个容器:(c1, c2) -> void
    • 串行流下不需要合并容器
    • 并行流如果用的是并发容器,也不需要合并

收集器

Collectors 类中提供了很多现成的收集器,详情见网页

下游收集器

做 groupingBy 分组收集时,组内可能需要进一步的数据收集,称为下游收集器,详情见网页

基本流

基本类型流指 IntStream、LongStream 和 DoubleStream,它们在做数值计算时有更好的性能。

转换成基本流

  • mapToInt
  • mapToLong
  • mapToDouble
  • flatMapToInt
  • flatMapToLong
  • flatMapToDouble
  • mapMultiToInt
  • mapMultiToLong
  • mapMultiToDouble

基本流转对象流

  • mapToObj
  • boxed

特性

  1. 一次使用:流只能使用一次(终结方法只能调用一次)
  2. 两类操作:
    1. 中间操作,lazy 懒惰的
    2. 终结操作,eager 迫切的

并行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
Stream.of(1, 2, 3, 4)
.parallel()
.collect(Collector.of(
() -> {
System.out.printf("%-12s %s%n",simple(),"create");
return new ArrayList<Integer>();
},
(list, x) -> {
List<Integer> old = new ArrayList<>(list);
list.add(x);
System.out.printf("%-12s %s.add(%d)=>%s%n",simple(), old, x, list);
},
(list1, list2) -> {
List<Integer> old = new ArrayList<>(list1);
list1.addAll(list2);
System.out.printf("%-12s %s.add(%s)=>%s%n", simple(),old, list2, list1);
return list1;
},
list -> list,
Collector.Characteristics.IDENTITY_FINISH
));

效率

数组求和

其中

  • primitive 用 loop 循环对 int 求和
  • intStream 用 IntStream 对 int 求和
  • boxed 用 loop 循环对 Integer 求和
  • stream 用 Stream 对 Integer 求和

元素个数 100

Benchmark Mode Cnt Score (ns/op) Error (ns/op) Units
T01Sum.primitive avgt 5 25.424 ± 0.782 ns/op
T01Sum.intStream avgt 5 47.482 ± 1.145 ns/op
T01Sum.boxed avgt 5 72.457 ± 4.136 ns/op
T01Sum.stream avgt 5 465.141 ± 4.891 ns/op

元素个数 1000

Benchmark Mode Cnt Score (ns/op) Error (ns/op) Units
T01Sum.primitive avgt 5 270.556 ± 1.277 ns/op
T01Sum.intStream avgt 5 292.467 ± 10.987 ns/op
T01Sum.boxed avgt 5 583.929 ± 57.338 ns/op
T01Sum.stream avgt 5 5948.294 ± 2209.211 ns/op

元素个数 10000

Benchmark Mode Cnt Score (ns/op) Error (ns/op) Units
T01Sum.primitive avgt 5 2681.651 ± 12.614 ns/op
T01Sum.intStream avgt 5 2718.408 ± 52.418 ns/op
T01Sum.boxed avgt 5 6391.285 ± 358.154 ns/op
T01Sum.stream avgt 5 44414.884 ± 3213.055 ns/op

结论:

  • 做数值计算,优先挑选基本流(IntStream 等)在数据量较大时,它的性能已经非常接近普通 for 循环
  • 做数值计算,应当避免普通流(Stream)性能与其它几种相比,慢一个数量级

求最大值

其中(原始数据都是 int,没有包装类)

  • custom 自定义多线程并行求最大值
  • parallel 并行流求最大值
  • sequence 串行流求最大值
  • primitive loop 循环求最大值

元素个数 100

Benchmark Mode Cnt Score (ns/op) Error (ns/op) Units
T02Parallel.custom avgt 5 39619.796 ± 1263.036 ns/op
T02Parallel.parallel avgt 5 6754.239 ± 79.894 ns/op
T02Parallel.primitive avgt 5 29.538 ± 3.056 ns/op
T02Parallel.sequence avgt 5 80.170 ± 1.940 ns/op

元素个数 10000

Benchmark Mode Cnt Score (ns/op) Error (ns/op) Units
T02Parallel.custom avgt 5 41656.093 ± 1537.237 ns/op
T02Parallel.parallel avgt 5 11218.573 ± 1994.863 ns/op
T02Parallel.primitive avgt 5 2217.562 ± 80.981 ns/op
T02Parallel.sequence avgt 5 5682.482 ± 264.645 ns/op

元素个数 1000000

Benchmark Mode Cnt Score (ns/op) Error (ns/op) Units
T02Parallel.custom avgt 5 194984.564 ± 25794.484 ns/op
T02Parallel.parallel avgt 5 298940.794 ± 31944.959 ns/op
T02Parallel.primitive avgt 5 325178.873 ± 81314.981 ns/op
T02Parallel.sequence avgt 5 618274.062 ± 5867.812 ns/op

结论:

  • 并行流相对自己用多线程实现分而治之更简洁
  • 并行流只有在数据量非常大时,才能充分发力,数据量少,还不如用串行流

并行(发)收集

元素个数 100

Benchmark Mode Cnt Score (ns/op) Error (ns/op) Units
loop1 avgt 5 1312.389 ± 90.683 ns/op
loop2 avgt 5 1776.391 ± 255.271 ns/op
sequence avgt 5 1727.739 ± 28.821 ns/op
parallelNoConcurrent avgt 5 27654.004 ± 496.970 ns/op
parallelConcurrent avgt 5 16320.113 ± 344.766 ns/op

元素个数 10000

Benchmark Mode Cnt Score (ns/op) Error (ns/op) Units
loop1 avgt 5 211526.546 ± 13549.703 ns/op
loop2 avgt 5 203794.146 ± 3525.972 ns/op
sequence avgt 5 237688.651 ± 7593.483 ns/op
parallelNoConcurrent avgt 5 527203.976 ± 3496.107 ns/op
parallelConcurrent avgt 5 369630.728 ± 20549.731 ns/op

元素个数 1000000

Benchmark Mode Cnt Score (ms/op) Error (ms/op) Units
loop1 avgt 5 69.154 ± 3.456 ms/op
loop2 avgt 5 83.815 ± 2.307 ms/op
sequence avgt 5 103.585 ± 0.834 ns/op
parallelNoConcurrent avgt 5 167.032 ± 15.406 ms/op
parallelConcurrent avgt 5 52.326 ± 1.501 ms/op

结论:

  • sequence 是一个容器单线程收集,数据量少时性能占优
  • parallelNoConcurrent 是多个容器多线程并行收集,时间应该花费在合并容器上,性能最差
  • parallelConcurrent 是一个容器多线程并发收集,在数据量大时性能较优

MethodHandle 性能

正常方法调用、反射、MethodHandle、Lambda 的性能对比

Benchmark Mode Cnt Score Error Units
Sample2.lambda thrpt 5 389307532.881 ± 332213073.039 ops/s
Sample2.method thrpt 5 157556577.611 ± 4048306.620 ops/s
Sample2.origin thrpt 5 413287866.949 ± 65182730.966 ops/s
Sample2.reflection thrpt 5 91640751.456 ± 37969233.369 ops/s

综合练习

  1. 将 filter 的课堂例题修改为方法引用方式实现

  2. takeWhile 与 filter 的区别

  3. 三级排序

  4. 包含 null 值的排序

  5. 二维流扁平映射

  6. 三维流扁平映射

  7. 用 stream 打印九九乘法表

  8. 用 stream 生成斐波那契数列的前 10 项

    1
    2
    3
    Stream.iterate(new int[]{1, 1}, x -> new int[]{x[1], x[0] + x[1]})
    .map(x -> x[0])
    .limit(10)
  9. 自定义 Collector 求平均

函数编程-语法 函数编程-实际应用