Java8的Stream是时候用起来了

引言

最近对ReactiveX的RxJava觉得很赞,但其实,Java8的Stream难道就不赞了吗,Java的不断发展,Stream出现了,用了一下,的确觉得很赞,感觉RxJava和Java8的Stream在使用上特别像,很多操作符都是一样的,使用下来,就感觉数据对于我们来说就是流水(River),这是我自己的比喻哈,对流水各种处理,最后得到我们想要的。我不知道两者到底谁借鉴谁,但是觉得属于一派,好了,逼逼半天,结合网上看到的别人的博客,总结一下,写文章不是为了装逼,我认为我的博客是我的笔记本,记录一些我想翻阅的资料,至少它不会随着时间的流逝而消失。

JAVA8中的stream API与JAVA I/O中的InputStream和OutputStream在名字上比较类似,但是其实是另外一个东西,Stream API是JAVA函数式编程中的一个重要组成部分。

本文描述如何使用JAVA8的Stream API。通过本文,你可以了解Stream API的执行顺序,不同的执行顺序会对stream api的执行效率有较大的影响。本文会详细描述Stream API中的reduce,collect,flatMap等操作,结尾部分会深入讲解parallel streams。

Streams如何工作?

stream是一个可以对个序列中的元素执行各种计算操作的一个元素序列。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
List<String> myList =
Arrays.asList("a1", "a2", "b1", "c2", "c1");
myList
.stream()
.filter(s -> s.startsWith("c"))
.map(String::toUpperCase)
.sorted()
.forEach(System.out::println);
输出结果
----------------
// C1
// C2
----------------

stream包含中间(intermediate operations)和最终(terminal operation)两种形式的操作。中间操作(intermediate operations)的返回值还是一个stream,因此可以通过链式调用将中间操作(intermediate operations)串联起来。最终操作(terminal operation)只能返回void或者一个非stream的结果。在上述例子中:filter, map ,sorted是中间操作,而forEach是一个最终操作。更多关于stream的中可用的操作可以查看java doc。上面例子中的链式调用也被称为操作管道流。

大多stream操作接受某种形式的lambda表达式作为参数,通过方法接口的形式指定操作的具体行为,这些方法接口的行为基本上都是无干扰(non-interfering)和无状态(stateless)。无干扰(non-interfering)的方法的定义是:该方法不修改stream的底层数据源,比如上述例子中:没有lambda表达式添加或者删除myList中的元素。无状态(stateless)方法的定义:操作的执行是独立的,比如上述例子中,没有lambda表达式在执行中依赖可能发生变化的外部变量或状态。

streams分类

可以从不同的数据源创建stream。java collection包中的Collections,Lists,Sets这些类中新增stream()和parallelStream()方法,通过这些方法可以创建一个顺序stream(sequential streams)或者一个并发的stream(Parallel streams)。并发stream(Parallel streams)更适合在多线程中使用,本文先介绍顺序流(sequential streams)在结尾会描述并发stream(Parallel streams).

1
2
3
4
5
6
7
8
Arrays.asList("a1", "a2", "a3")
.stream()
.findFirst()
.ifPresent(System.out::println);
输出结果
----------------
// a1

List对象上调用stream()方法可以返回一个常规的对象流。在下面的例子中我们不需要创建一个collection对象也可以使用stream:

1
2
3
4
5
6
7
Stream.of("a1", "a2", "a3")
.findFirst()
.ifPresent(System.out::println);
输出结果
----------------
// a1

直接使用Stream.of()方法就能从一组对象创建一个stream对象,

除了常规的对象流,JAVA 8中的IntStream,LongStream,DoubleStream这些流能够处理基本数据类型如:int,long,double。比如:IntStream可以使用range()方法能够替换掉传统的for循环

1
2
3
4
5
6
7
8
IntStream.range(1, 4)
.forEach(System.out::println);
输出结果
----------------
// 1
// 2
// 3

基本类型流(primitive streams)使用方式与常规对象流类型(regular object streams)大部分相同,但是基本类型流(primitive streams)能使用一些特殊的lambda表达式,比如:用IntFunction代替Function,用IntPredicate代替Predicate,同时基本类型流(primitive streams)中可以支持一些聚合方法,如:sum(),average()等。

1
2
3
4
5
6
7
8
Arrays.stream(new int[] {1, 2, 3})
.map(n -> 2 * n + 1)
.average()
.ifPresent(System.out::println);
输出结果
----------------
// 5.0

可以通过常规对象流(regular object stream)的mapToInt(), mapToLong(),mapToDouble(),基本类型对象流(primitive streams)中的mapToObj()等方法完成常规对象流和基本类型流之间的相互转换

1
2
3
IntStream.range(1, 4)
.mapToObj(i -> "a" + i)
.forEach(System.out::println);

下面这个例子中doubles stream先被映射成int stream,然后又被映射成String类型的对象流:

1
2
3
4
5
6
7
8
9
10
Stream.of(1.0, 2.0, 3.0)
.mapToInt(Double::intValue)
.mapToObj(i -> "a" + i)
.forEach(System.out::println);
输出结果
----------------
// a1
// a2
// a3

处理顺序

前面描述了如何创建和使用各种stream,现在开始深入了解stream执行引擎的工作原理。

Laziness(延迟加载)是中间操作(intermediate operations)的一个重要特性。如下面这个例子:中间操作(terminal operation)缺失,当执行这个代码片段的时候,并不会在控制台打印相应的内容,这是因为只有最终操作(terminal operation)存在的时候,中间操作(intermediate operations)才会执行。

1
2
3
4
5
Stream.of("d2", "a2", "b1", "b3", "c")
.filter(s -> {
System.out.println("filter: " + s);
return true;
});

给上面的例子添加最终操作(terminal operation)forEach:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
Stream.of("d2", "a2", "b1", "b3", "c")
.filter(s -> {
System.out.println("filter: " + s);
return true;
})
.forEach(s -> System.out.println("forEach: " + s));
输出结果
----------------
filter: d2
forEach: d2
filter: a2
forEach: a2
filter: b1
forEach: b1
filter: b3
forEach: b3
filter: c
forEach: c

执行结果比较让人惊奇,想当然的做法是水平执行此流上的所有元素。但是实际上是每一个元素沿着链垂直移动,第一个字符串”d2”执行完filter和forEach后第二个元素”a2”才开始执行。

这种沿着链垂直移动的行为可以降低每一个元素上进行操作的数量,如我们在下面的例子中所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
Stream.of("d2", "a2", "b1", "b3", "c")
.map(s -> {
System.out.println("map: " + s);
return s.toUpperCase();
})
.anyMatch(s -> {最终操作
System.out.println("anyMatch: " + s);
return s.startsWith("A");
});
输出结果
----------------
// map: d2
// anyMatch: D2
// map: a2
// anyMatch: A2

当对给定元素执行判断为真时anyMatch操作会立刻返回true,在上面例子中执行到元素“A2”的时候,元素判断为真anyMatch立刻返回true,由于流是沿着链垂直移动的,因此上面的map操作只会执行两次。

执行效率与steream执行链顺序的关系

下面的例子由两个中间操作(intermediate operations)map和filter以及一个最终操作(terminal operation)forEach构成,我们观察这些动作是如何执行的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
Stream.of("d2", "a2", "b1", "b3", "c")
.map(s -> {
System.out.println("map: " + s);
return s.toUpperCase();
})
.filter(s -> {
System.out.println("filter: " + s);
return s.startsWith("A");
})
.forEach(s -> System.out.println("forEach: " + s));
输出结果
----------------
// map: d2
// filter: D2
// map: a2
// filter: A2
// forEach: A2
// map: b1
// filter: B1
// map: b3
// filter: B3
// map: c
// filter: C

你可能已经猜想到:map和filter操作被执行了5次,但是forEach操作只被执行了1次。我们可以通过修改操作的执行顺序(如:将filter操作移到操作链的头部),大幅度降低执行次数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
Stream.of("d2", "a2", "b1", "b3", "c")
.filter(s -> {
System.out.println("filter: " + s);
return s.startsWith("a");
})
.map(s -> {
System.out.println("map: " + s);
return s.toUpperCase();
})
.forEach(s -> System.out.println("forEach: " + s));
输出结果
----------------
// filter: d2
// filter: a2
// map: a2
// forEach: A2
// filter: b1中间操作
// filter: b3
// filter: c

修改后map只被执行了1次,如果此时数据量比较大则操作管道的执行效率会有较大的提升,在处理复杂方法链的时候需要注意执行顺序对执行效率的影响。

给上面的例子添加sort操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
Stream.of("d2", "a2", "b1", "b3", "c")
.sorted((s1, s2) -> {
System.out.printf("sort: %s; %s\n", s1, s2);
return s1.compareTo(s2);
})
.filter(s -> {
System.out.println("filter: " + s);
return s.startsWith("a");
})
.map(s -> {
System.out.println("map: " + s);
return s.toUpperCase();
})
.forEach(s -> System.out.println("forEach: " + s));
输出结果
----------------
//sort: a2; d2
//sort: b1; a2
//sort: b1; d2
//sort: b1; a2
//sort: b3; b1
//sort: b3; d2
//sort: c; b3
//sort: c; d2
//filter: a2
//map: a2
//forEach: A2
//filter: b1
//filter: b3
//filter: c
//filter: d2

Sorting 是一种特殊的中间操作(intermediate operation),在对集合中元素进行排序过程中需要保存元素的状态,因此Sorting 是一种有状态的操作(stateful operation)。

首先,在整个输入集上执行排序操作(即先对集合进行水平操作),由于输入集合中的元素间存在多种组合,因此上面的例子中sorted操作被执行了8次。

可以通过对执行链重排序的方式,提升stream的执行效率。修改执行链顺序之后由于filter操作的过滤,导致sorted操作的输入集只有一个元素,在大数据量的情况下能够大幅度提高执行效率。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
Stream.of("d2", "a2", "b1", "b3", "c")
.filter(s -> {
System.out.println("filter: " + s);
return s.startsWith("a");
})
.sorted((s1, s2) -> {
System.out.printf("sort: %s; %s\n", s1, s2);
return s1.compareTo(s2);
})
.map(s -> {
System.out.println("map: " + s);
return s.toUpperCase();
})
.forEach(s -> System.out.println("forEach: " + s));
输出结果
----------------
// filter: d2
// filter: a2
// filter: b1
// filter: b3
// filter: c
// map: a2
// forEach: A2

流复用

Java 8 streams不能被复用,当你执行完任何一个最终操作(terminal operation)的时候流就被关闭了。

1
2
3
4
5
6
Stream<String> stream =
Stream.of("d2", "a2", "b1", "b3", "c")
.filter(s -> s.startsWith("a"));
stream.anyMatch(s -> true); // ok
stream.noneMatch(s -> true); // exception

在同一个stream中执行完anyMatch后再执行noneMatch就会抛出如下异常:

1
2
3
4
5
java.lang.IllegalStateException: stream has already been operated upon or closed
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:229)
at java.util.stream.ReferencePipeline.noneMatch(ReferencePipeline.java:459)
at com.winterbe.java8.Streams5.test7(Streams5.java:38)
at com.winterbe.java8.Streams5.main(Streams5.java:28)

可以通过为每个最终操作(terminal operation)创建一个新的stream链的方式来解决上面的重用问题,Stream api中已经提供了一个stream supplier类来在已经存在的中间操作(intermediate operations )的stream基础上构建一个新的stream。

1
2
3
4
5
6
Supplier<Stream<String>> streamSupplier =
() -> Stream.of("d2", "a2", "b1", "b3", "c")
.filter(s -> s.startsWith("a"));
streamSupplier.get().anyMatch(s -> true); // ok
streamSupplier.get().noneMatch(s -> true); // ok

streamSupplier的每个get()方法会构造一个新的stream,我们可以在这个stream上执行期望的最终操作(terminal operation)。

高级操作

Streams支持多种不同的操作(operations),我们已经了解过filter,map等比较重要的操作。你可以通过Stream Javadoc进一步了解更多的操作。现在我们开始深入探讨更复杂的操作:collect flatMap reduce。
假设存在如下的用户列表:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
class Person {
String name;
int age;
Person(String name, int age) {
this.name = name;
this.age = age;
}
@Override
public String toString() {
return name;
}
}
List<Person> persons =
Arrays.asList(
new Person("Max", 18),
new Person("Peter", 23),
new Person("Pamela", 23),
new Person("David", 12));

Collect(收集)

Collect(收集)是一种是十分有用的最终操作,它可以把stream中的元素转换成另外一种形式,比如;list,set,map。Collect使用Collector作为参数,Collector包含四种不同的操作:supplier(初始构造器), accumulator(累加器), combiner(组合器), finisher(终结者)。这听起来很复杂,但是一个好消息是java 8通过Collectors类内置了各种复杂的收集操作,因此对于大部分常用的操作来说,你不需要自己去实现collector类。

从一个十分常见的用类开始:

1
2
3
4
5
6
7
8
9
10
11
List<Person> filtered =
persons
.stream()
.filter(p -> p.name.startsWith("P"))
.collect(Collectors.toList());
System.out.println(filtered);
输出结果
----------------
// [Peter, Pamela]

通过上面的demo可以看出,将stream转换为List十分简单,如果想转换为Set的话,只需使用Collectors.toSet()就可以了。

下面的例子暂时将用户按年龄分组:

1
2
3
4
5
6
7
8
9
10
11
12
Map<Integer, List<Person>> personsByAge = persons
.stream()
.collect(Collectors.groupingBy(p -> p.age));
personsByAge
.forEach((age, p) -> System.out.format("age %s: %s\n", age, p));
输出结果
----------------
// age 18: [Max]
// age 23: [Peter, Pamela]
// age 12: [David]

Collectors类功能繁多,你可以通过Collectors对stream中的元素进行汇聚,比如:计算所有用户的年纪。

1
2
3
4
5
6
7
8
9
Double averageAge = persons
.stream()
.collect(Collectors.averagingInt(p -> p.age));
System.out.println(averageAge);
输出结果
----------------
// 19.0

可以通过summarizing collectors能返回一个内置的统计对象,通过这个对象能够获取更加全面的统计信息,比如用户年纪中的最大值,最小值,平均年纪等结果。

1
2
3
4
5
6
7
8
9
10
IntSummaryStatistics ageSummary =
persons
.stream()
.collect(Collectors.summarizingInt(p -> p.age));
System.out.println(ageSummary);
输出结果
----------------
// IntSummaryStatistics{count=4, sum=76, min=12, average=19.000000, max=23}

下面的例子展示如何将所有用户连接成一个字符串:

1
2
3
4
5
6
7
8
9
10
11
String phrase = persons
.stream()
.filter(p -> p.age >= 18)
.map(p -> p.name)
.collect(Collectors.joining(" and ", "In Germany ", " are of legal age."));
System.out.println(phrase);
输出结果
----------------
// In Germany Max and Peter and Pamela are of legal age.

join collector的三个参数分别表示:连接符,字符串前缀,字符串后缀(可选)。

将一个stream转换为map,我们必须指定map的key和value如何映射。要注意的是key的值必须是唯一性的,否则会抛出IllegalStateException,但是可以通过使用合并函数(可选)绕过这个IllegalStateException异常:

1
2
3
4
5
6
7
8
9
10
11
12
Map<Integer, String> map = persons
.stream()
.collect(Collectors.toMap(
p -> p.age,
p -> p.name,
(name1, name2) -> name1 + ";" + name2));
System.out.println(map);
输出结果
----------------
// {18=Max, 23=Peter;Pamela, 12=David}

前文已经介绍了jdk内置的一些很有用的collectors,接下来开始介绍如何构造我们自己所需的collector,我们的目标是将stream中所有用户的用户名变成大写并用”|”符号连接成一个字符串。为了达成这个目标我们通过Collector.of()方法创建了一个新的collector,我们必须给这个collector提供四种功能:supplier, accumulator, combiner,finisher.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
Collector<Person, StringJoiner, String> personNameCollector =
Collector.of(
() -> new StringJoiner(" | "), // supplier
(j, p) -> j.add(p.name.toUpperCase()), // accumulator
(j1, j2) -> j1.merge(j2), // combiner
StringJoiner::toString); // finisher
String names = persons
.stream()
.collect(personNameCollector);
System.out.println(names);
输出结果
----------------
// MAX | PETER | PAMELA | DAVID

由于JAVA中String是一个不可变对象,因此我们需要一个辅助类(比如StringJoiner)来帮助collect构造我们的字符串。supplier创建了一个包含适当分隔符的StringJoiner对象,accumulator用来将每个用户名转为大写并添加到supplier创建的StringJoiner中,combiner将两个StringJoiners对象连接成一个,最后一步的finisher从StringJoiner中构建出所希望的得到的string对象。

FlatMap(平铺变换,我自己这么叫的)

我们已经了解:通过map方法可以将stream中的一种对象转换成另外一种对象。但是map方法还是有使用场景限制,只能将一种对象映射为另外一种特定的已经存在的对象。是否能够将一个对象映射为多种对象,或者映射成一个根本不存在的对象呢。这就是flatMap方法出现的目的。

FlatMap方法可以将一个stream中的每一个元素对象转换为另一个stream中的另一种元素对象,因此可以将stream中的每个对象改造成零,一个或多个。flatMap操作的返回流包含这些改造后的对象。

为了演示flatMap,定义一个继承关系如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
class Foo {
String name;
List<Bar> bars = new ArrayList<>();
Foo(String name) {
this.name = name;
}
}
class Bar {
String name;
Bar(String name) {
this.name = name;
}
}

通过流实例化一队对象:

1
2
3
4
5
6
7
8
9
10
11
12
List<Foo> foos = new ArrayList<>();
// create foos
IntStream
.range(1, 4)
.forEach(i -> foos.add(new Foo("Foo" + i)));
// create bars
foos.forEach(f ->
IntStream
.range(1, 4)
.forEach(i -> f.bars.add(new Bar("Bar" + i + " <- " + f.name))));

完成上述操作之后我们得到三个foos,每个foos包含三个bars。

FlatMap接收一个返回值为stream的函数做参数,通过传递合适的函数,就可以解析每一个foo下对应的bar对象

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
foos.stream()
.flatMap(f -> f.bars.stream())
.forEach(b -> System.out.println(b.name));
输出结果
----------------
// Bar1 <- Foo1
// Bar2 <- Foo1
// Bar3 <- Foo1
// Bar1 <- Foo2
// Bar2 <- Foo2
// Bar3 <- Foo2
// Bar1 <- Foo3
// Bar2 <- Foo3
// Bar3 <- Foo3

正如所见,我们成功地将三个对象的stream转换成一个包含九个对象的stream

最后,上面的示例代码可以简化为一个单一管道流:

1
2
3
4
5
6
7
IntStream.range(1, 4)
.mapToObj(i -> new Foo("Foo" + i))
.peek(f -> IntStream.range(1, 4)
.mapToObj(i -> new Bar("Bar" + i + " <- " f.name))
.forEach(f.bars::add))
.flatMap(f -> f.bars.stream())
.forEach(b -> System.out.println(b.name));

FlatMap也支持JAVA8中新引入的Optional类,Optionals flatMap能返回一个另外的类的optional包装类,可以用来减少对null的检查。

假设有如下这种多层级结构:

1
2
3
4
5
6
7
8
9
10
11
class Outer {
Nested nested;
}
class Nested {
Inner inner;
}
class Inner {
String foo;
}

为了获取内部outer实例的内部foo对象,需要添加一系列空指针判断

1
2
3
4
Outer outer = new Outer();
if (outer != null && outer.nested != null && outer.nested.inner != null) {
System.out.println(outer.nested.inner.foo);
}

可以采用optionals flatMap 操作获得相同的结果:

1
2
3
4
5
Optional.of(new Outer())
.flatMap(o -> Optional.ofNullable(o.nested))
.flatMap(n -> Optional.ofNullable(n.inner))
.flatMap(i -> Optional.ofNullable(i.foo))
.ifPresent(System.out::println);

上面的例子中flatMap的每次调用都会返回一个用Optional对象,如果有返回值则这个Optional对象是这个返回值的包装类,如果返回值不存在则返回null。

Reduce(归并)

reduce操作可以将stream中所有元素组合起来得到一个元素,JAVA8支持三中不同的reduce方法。

第一种能从stream元素序列中提取一个特定的元素。比如下面的从用户列表中选择年纪最大的用户操作:

1
2
3
4
5
6
7
8
persons
.stream()
.reduce((p1, p2) -> p1.age > p2.age ? p1 : p2)
.ifPresent(System.out::println);
输出结果
----------------
// Pamela

上面的实例中reduce方法接收一个二元累加计算函数(BinaryOperator accumulator function)作为参数,二元操作(BinaryOperator)实际就是上在两个操作数共享同一类型。示例中函数比较两人年龄,返回的最大年龄的人。

第二种reduce操作接收一个标识值和一个二元操作累加器作为参数,这个reduce方法可以把stream中所有用户的名字和年龄汇总得到一个新用户。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
Person result =
persons
.stream()
.reduce(new Person("", 0), (p1, p2) -> {
p1.age += p2.age;
p1.name += p2.name;
return p1;
});
System.out.format("name=%s; age=%s", result.name, result.age);
输出结果
----------------
// name=MaxPeterPamelaDavid; age=76

第三种reduce方法,接收三个参数:一个标示值(identity value),一个二元操作累加器(BiFunction accumulator),一个二元组合方法。由于标识符参数未被严格限制为person类型,因此我们可以用这个reduce方法来获取用户的总年龄。

1
2
3
4
5
6
7
8
9
Integer ageSum = persons
.stream()
.reduce(0, (sum, p) -> sum += p.age, (sum1, sum2) -> sum1 + sum2);
System.out.println(ageSum);
输出结果
----------------
// 76

计算的结果是76,通过添加调试输出,我们可以详细地了解执行引擎中发生了什么。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
Integer ageSum = persons
.stream()
.reduce(0,
(sum, p) -> {
System.out.format("accumulator: sum=%s; person=%s\n", sum, p);
return sum += p.age;
},
(sum1, sum2) -> {
System.out.format("combiner: sum1=%s; sum2=%s\n", sum1, sum2);
return sum1 + sum2;
});
输出结果
----------------
// accumulator: sum=0; person=Max
// accumulator: sum=18; person=Peter
// accumulator: sum=41; person=Pamela
// accumulator: sum=64; person=David

从调试输出中可以看到,累加器做了所有的工作,它首先获取值为0的标示值和第一个用户Max,接下来的三步中持续sum值由于累加不断变大,在最后一步汇总的年纪增长到76。

注意,上面的调试输出中combiner没有执行,通过parallel执行上面相同stream。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
Integer ageSum = persons
.parallelStream()
.reduce(0,
(sum, p) -> {
System.out.format("accumulator: sum=%s; person=%s\n", sum, p);
return sum += p.age;
},
(sum1, sum2) -> {
System.out.format("combiner: sum1=%s; sum2=%s\n", sum1, sum2);
return sum1 + sum2;
});
输出结果
----------------
// accumulator: sum=0; person=Pamela
// accumulator: sum=0; person=David
// accumulator: sum=0; person=Max
// accumulator: sum=0; person=Peter
// combiner: sum1=18; sum2=23
// combiner: sum1=23; sum2=12
// combiner: sum1=41; sum2=35

通过并行的方式执行上面的stream操作,得到的是另外一种完全不相同的执行动作。在并行stream中combiner方法会被调用。这是由于累加器是被并行调用的,因此组合器需要对分开的累加操作进行求和。

Parallel Streams(并行流)

为了提高大量输入时的执行效率,stream可以采用并行的放行执行。并行流(Parallel Streams)通过ForkJoinPool.commonPool() 方法获取一个可用的ForkJoinPool。这个ForkJoinPool使用5个线程(实际上是由底层可用的物理cpu核数决定的)。

1
2
3
4
5
6
ForkJoinPool commonPool = ForkJoinPool.commonPool();
System.out.println(commonPool.getParallelism());
输出结果
----------------
// 3

在我的机器上公共池初始化为每个默认3并行,这个值可以通过调整jvm参数来修改:

-Djava.util.concurrent.ForkJoinPool.common.parallelism=5
Collections中包含parallelStream()方法,通过这个方法能够为Collections中的元素创建并行流。另外也可以调用stream的parallel()方法将一个顺序流转变为一个并行流的拷贝。

为了了解并行流的执行动作,下面的例子会打印当前线程的执行信息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
Arrays.asList("a1", "a2", "b1", "c2", "c1")
.parallelStream()
.filter(s -> {
System.out.format("filter: %s [%s]\n",
s, Thread.currentThread().getName());
return true;
})
.map(s -> {
System.out.format("map: %s [%s]\n",
s, Thread.currentThread().getName());
return s.toUpperCase();
})
.forEach(s -> System.out.format("forEach: %s [%s]\n",
s, Thread.currentThread().getName()));
输出结果
----------------
filter: b1 [main]
filter: a2 [ForkJoinPool.commonPool-worker-1]
map: a2 [ForkJoinPool.commonPool-worker-1]
filter: c2 [ForkJoinPool.commonPool-worker-3]
map: c2 [ForkJoinPool.commonPool-worker-3]
filter: c1 [ForkJoinPool.commonPool-worker-2]
map: c1 [ForkJoinPool.commonPool-worker-2]
forEach: C2 [ForkJoinPool.commonPool-worker-3]
forEach: A2 [ForkJoinPool.commonPool-worker-1]
map: b1 [main]
forEach: B1 [main]
filter: a1 [ForkJoinPool.commonPool-worker-3]
map: a1 [ForkJoinPool.commonPool-worker-3]
forEach: A1 [ForkJoinPool.commonPool-worker-3]
forEach: C1 [ForkJoinPool.commonPool-worker-2]

通过分析调试输出,我们可以更好地了解哪一个线程执行了哪些stream操作。从上面的输出中我们可以看到parallel stream使用了ForkJoinPool提供的所有可用的线程来执行流的各种操作。由于不能确定哪个线程会执行并行流的哪个操作,因此反复执行上面的代码,打印的结果会不同。

扩充上面的例子,添加sort操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
Arrays.asList("a1", "a2", "b1", "c2", "c1")
.parallelStream()
.filter(s -> {
System.out.format("filter: %s [%s]\n",
s, Thread.currentThread().getName());
return true;
})
.map(s -> {
System.out.format("map: %s [%s]\n",
s, Thread.currentThread().getName());
return s.toUpperCase();
})
.sorted((s1, s2) -> {
System.out.format("sort: %s <> %s [%s]\n",
s1, s2, Thread.currentThread().getName());
return s1.compareTo(s2);
})
.forEach(s -> System.out.format("forEach: %s [%s]\n",
s, Thread.currentThread().getName()));
输出结果
----------------
filter: c2 [ForkJoinPool.commonPool-worker-3]
filter: c1 [ForkJoinPool.commonPool-worker-2]
map: c1 [ForkJoinPool.commonPool-worker-2]
filter: a2 [ForkJoinPool.commonPool-worker-1]
map: a2 [ForkJoinPool.commonPool-worker-1]
filter: b1 [main]
map: b1 [main]
filter: a1 [ForkJoinPool.commonPool-worker-2]
map: a1 [ForkJoinPool.commonPool-worker-2]
map: c2 [ForkJoinPool.commonPool-worker-3]
sort: A2 <> A1 [main]
sort: B1 <> A2 [main]
sort: C2 <> B1 [main]
sort: C1 <> C2 [main]
sort: C1 <> B1 [main]
sort: C1 <> C2 [main]
forEach: A1 [ForkJoinPool.commonPool-worker-1]
forEach: C2 [ForkJoinPool.commonPool-worker-3]
forEach: B1 [main]
forEach: A2 [ForkJoinPool.commonPool-worker-2]
forEach: C1 [ForkJoinPool.commonPool-worker-1]

这个执行结果看起来比较奇怪,看起来sort操作只是在main线程中顺序执行的。实际上,parallel stream中的sort操作使用了JAVA 8的一个新方法:Arrays.parallelSort()。JAVA doc中是这样描述Arrays.parallelSort()的:待排序数组的长度决定了排序操作是顺序执行还是并行执行。java doc 描述如下:

If the length of the specified array is less than the minimum granularity, then it is sorted using the appropriate Arrays.sort method.
回到上一章的例子,我们已经了解combiner方法只能在parallel streams中调用,让我们来看下那些线程被实际调用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
List<Person> persons = Arrays.asList(
new Person("Max", 18),
new Person("Peter", 23),
new Person("Pamela", 23),
new Person("David", 12));
persons
.parallelStream()
.reduce(0,
(sum, p) -> {
System.out.format("accumulator: sum=%s; person=%s [%s]\n",
sum, p, Thread.currentThread().getName());
return sum += p.age;
},
(sum1, sum2) -> {
System.out.format("combiner: sum1=%s; sum2=%s [%s]\n",
sum1, sum2, Thread.currentThread().getName());
return sum1 + sum2;
});
输出结果
----------------
accumulator: sum=0; person=Pamela; [main]
accumulator: sum=0; person=Max; [ForkJoinPool.commonPool-worker-3]
accumulator: sum=0; person=David; [ForkJoinPool.commonPool-worker-2]
accumulator: sum=0; person=Peter; [ForkJoinPool.commonPool-worker-1]
combiner: sum1=18; sum2=23; [ForkJoinPool.commonPool-worker-1]
combiner: sum1=23; sum2=12; [ForkJoinPool.commonPool-worker-2]
combiner: sum1=41; sum2=35; [ForkJoinPool.commonPool-worker-2]

从控制台输出可以看到accumulator和combiner操作都被可用的线程并行执行了。

总结起来:在大数据量输入的时候,parallel streams可以带来比较大的性能提升。但是应该记住,一些并行操作,比如:reduce,collect需要额外的计算(组合操作),但是在顺序流中,这些组合操作是不需要的。

另外,我们知道所有的parallel stream操作共享一个jvm范围内的ForkJoinPool,所以你应该注意避免在parallel stream上执行慢阻塞流操作,因为这些操作可能导致你应用中依赖parallel streams操作的其他部分也会响应变慢。

总结

Java8的Stream我认为是借鉴了别的语言的优点,换种思路在写程序,考虑到各种各样的场景,对于我来说,我觉得Stream可以有效的处理数据,新东西的引进总归是经过深思熟虑的,希望大家一起玩起来。