引言
最近对ReactiveX的RxJava觉得很赞,但其实,Java8的Stream难道就不赞了吗,Java的不断发展,Stream出现了,用了一下,的确觉得很赞,感觉RxJava和Java8的Stream在使用上特别像,很多操作符都是一样的,使用下来,就感觉数据对于我们来说就是流水(River),这是我自己的比喻哈,对流水各种处理,最后得到我们想要的。我不知道两者到底谁借鉴谁,但是觉得属于一派,好了,逼逼半天,结合网上看到的别人的博客,总结一下,写文章不是为了装逼,我认为我的博客是我的笔记本,记录一些我想翻阅的资料,至少它不会随着时间的流逝而消失。
JAVA8中的stream API与JAVA I/O中的InputStream和OutputStream在名字上比较类似,但是其实是另外一个东西,Stream API是JAVA函数式编程中的一个重要组成部分。
本文描述如何使用JAVA8的Stream API。通过本文,你可以了解Stream API的执行顺序,不同的执行顺序会对stream api的执行效率有较大的影响。本文会详细描述Stream API中的reduce,collect,flatMap等操作,结尾部分会深入讲解parallel streams。
Streams如何工作?
stream是一个可以对个序列中的元素执行各种计算操作的一个元素序列。
stream包含中间(intermediate operations)和最终(terminal operation)两种形式的操作。中间操作(intermediate operations)的返回值还是一个stream,因此可以通过链式调用将中间操作(intermediate operations)串联起来。最终操作(terminal operation)只能返回void或者一个非stream的结果。在上述例子中:filter, map ,sorted是中间操作,而forEach是一个最终操作。更多关于stream的中可用的操作可以查看java doc。上面例子中的链式调用也被称为操作管道流。
大多stream操作接受某种形式的lambda表达式作为参数,通过方法接口的形式指定操作的具体行为,这些方法接口的行为基本上都是无干扰(non-interfering)和无状态(stateless)。无干扰(non-interfering)的方法的定义是:该方法不修改stream的底层数据源,比如上述例子中:没有lambda表达式添加或者删除myList中的元素。无状态(stateless)方法的定义:操作的执行是独立的,比如上述例子中,没有lambda表达式在执行中依赖可能发生变化的外部变量或状态。
streams分类
可以从不同的数据源创建stream。java collection包中的Collections,Lists,Sets这些类中新增stream()和parallelStream()方法,通过这些方法可以创建一个顺序stream(sequential streams)或者一个并发的stream(Parallel streams)。并发stream(Parallel streams)更适合在多线程中使用,本文先介绍顺序流(sequential streams)在结尾会描述并发stream(Parallel streams).
List对象上调用stream()方法可以返回一个常规的对象流。在下面的例子中我们不需要创建一个collection对象也可以使用stream:
|
|
直接使用Stream.of()方法就能从一组对象创建一个stream对象,
除了常规的对象流,JAVA 8中的IntStream,LongStream,DoubleStream这些流能够处理基本数据类型如:int,long,double。比如:IntStream可以使用range()方法能够替换掉传统的for循环
基本类型流(primitive streams)使用方式与常规对象流类型(regular object streams)大部分相同,但是基本类型流(primitive streams)能使用一些特殊的lambda表达式,比如:用IntFunction代替Function,用IntPredicate代替Predicate,同时基本类型流(primitive streams)中可以支持一些聚合方法,如:sum(),average()等。
|
|
可以通过常规对象流(regular object stream)的mapToInt(), mapToLong(),mapToDouble(),基本类型对象流(primitive streams)中的mapToObj()等方法完成常规对象流和基本类型流之间的相互转换
下面这个例子中doubles stream先被映射成int stream,然后又被映射成String类型的对象流:
处理顺序
前面描述了如何创建和使用各种stream,现在开始深入了解stream执行引擎的工作原理。
Laziness(延迟加载)是中间操作(intermediate operations)的一个重要特性。如下面这个例子:中间操作(terminal operation)缺失,当执行这个代码片段的时候,并不会在控制台打印相应的内容,这是因为只有最终操作(terminal operation)存在的时候,中间操作(intermediate operations)才会执行。
给上面的例子添加最终操作(terminal operation)forEach:
|
|
执行结果比较让人惊奇,想当然的做法是水平执行此流上的所有元素。但是实际上是每一个元素沿着链垂直移动,第一个字符串”d2”执行完filter和forEach后第二个元素”a2”才开始执行。
这种沿着链垂直移动的行为可以降低每一个元素上进行操作的数量,如我们在下面的例子中所示:
|
|
当对给定元素执行判断为真时anyMatch操作会立刻返回true,在上面例子中执行到元素“A2”的时候,元素判断为真anyMatch立刻返回true,由于流是沿着链垂直移动的,因此上面的map操作只会执行两次。
执行效率与steream执行链顺序的关系
下面的例子由两个中间操作(intermediate operations)map和filter以及一个最终操作(terminal operation)forEach构成,我们观察这些动作是如何执行的。
你可能已经猜想到:map和filter操作被执行了5次,但是forEach操作只被执行了1次。我们可以通过修改操作的执行顺序(如:将filter操作移到操作链的头部),大幅度降低执行次数
修改后map只被执行了1次,如果此时数据量比较大则操作管道的执行效率会有较大的提升,在处理复杂方法链的时候需要注意执行顺序对执行效率的影响。
给上面的例子添加sort操作。
Sorting 是一种特殊的中间操作(intermediate operation),在对集合中元素进行排序过程中需要保存元素的状态,因此Sorting 是一种有状态的操作(stateful operation)。
首先,在整个输入集上执行排序操作(即先对集合进行水平操作),由于输入集合中的元素间存在多种组合,因此上面的例子中sorted操作被执行了8次。
可以通过对执行链重排序的方式,提升stream的执行效率。修改执行链顺序之后由于filter操作的过滤,导致sorted操作的输入集只有一个元素,在大数据量的情况下能够大幅度提高执行效率。
|
|
流复用
Java 8 streams不能被复用,当你执行完任何一个最终操作(terminal operation)的时候流就被关闭了。
在同一个stream中执行完anyMatch后再执行noneMatch就会抛出如下异常:
|
|
可以通过为每个最终操作(terminal operation)创建一个新的stream链的方式来解决上面的重用问题,Stream api中已经提供了一个stream supplier类来在已经存在的中间操作(intermediate operations )的stream基础上构建一个新的stream。
|
|
streamSupplier的每个get()方法会构造一个新的stream,我们可以在这个stream上执行期望的最终操作(terminal operation)。
高级操作
Streams支持多种不同的操作(operations),我们已经了解过filter,map等比较重要的操作。你可以通过Stream Javadoc进一步了解更多的操作。现在我们开始深入探讨更复杂的操作:collect flatMap reduce。
假设存在如下的用户列表:
Collect(收集)
Collect(收集)是一种是十分有用的最终操作,它可以把stream中的元素转换成另外一种形式,比如;list,set,map。Collect使用Collector作为参数,Collector包含四种不同的操作:supplier(初始构造器), accumulator(累加器), combiner(组合器), finisher(终结者)。这听起来很复杂,但是一个好消息是java 8通过Collectors类内置了各种复杂的收集操作,因此对于大部分常用的操作来说,你不需要自己去实现collector类。
从一个十分常见的用类开始:
|
|
通过上面的demo可以看出,将stream转换为List十分简单,如果想转换为Set的话,只需使用Collectors.toSet()就可以了。
下面的例子暂时将用户按年龄分组:
|
|
Collectors类功能繁多,你可以通过Collectors对stream中的元素进行汇聚,比如:计算所有用户的年纪。
|
|
可以通过summarizing collectors能返回一个内置的统计对象,通过这个对象能够获取更加全面的统计信息,比如用户年纪中的最大值,最小值,平均年纪等结果。
|
|
下面的例子展示如何将所有用户连接成一个字符串:
join collector的三个参数分别表示:连接符,字符串前缀,字符串后缀(可选)。
将一个stream转换为map,我们必须指定map的key和value如何映射。要注意的是key的值必须是唯一性的,否则会抛出IllegalStateException,但是可以通过使用合并函数(可选)绕过这个IllegalStateException异常:
前文已经介绍了jdk内置的一些很有用的collectors,接下来开始介绍如何构造我们自己所需的collector,我们的目标是将stream中所有用户的用户名变成大写并用”|”符号连接成一个字符串。为了达成这个目标我们通过Collector.of()方法创建了一个新的collector,我们必须给这个collector提供四种功能:supplier, accumulator, combiner,finisher.
|
|
由于JAVA中String是一个不可变对象,因此我们需要一个辅助类(比如StringJoiner)来帮助collect构造我们的字符串。supplier创建了一个包含适当分隔符的StringJoiner对象,accumulator用来将每个用户名转为大写并添加到supplier创建的StringJoiner中,combiner将两个StringJoiners对象连接成一个,最后一步的finisher从StringJoiner中构建出所希望的得到的string对象。
FlatMap(平铺变换,我自己这么叫的)
我们已经了解:通过map方法可以将stream中的一种对象转换成另外一种对象。但是map方法还是有使用场景限制,只能将一种对象映射为另外一种特定的已经存在的对象。是否能够将一个对象映射为多种对象,或者映射成一个根本不存在的对象呢。这就是flatMap方法出现的目的。
FlatMap方法可以将一个stream中的每一个元素对象转换为另一个stream中的另一种元素对象,因此可以将stream中的每个对象改造成零,一个或多个。flatMap操作的返回流包含这些改造后的对象。
为了演示flatMap,定义一个继承关系如下:
|
|
通过流实例化一队对象:
完成上述操作之后我们得到三个foos,每个foos包含三个bars。
FlatMap接收一个返回值为stream的函数做参数,通过传递合适的函数,就可以解析每一个foo下对应的bar对象
|
|
正如所见,我们成功地将三个对象的stream转换成一个包含九个对象的stream
最后,上面的示例代码可以简化为一个单一管道流:
FlatMap也支持JAVA8中新引入的Optional类,Optionals flatMap能返回一个另外的类的optional包装类,可以用来减少对null的检查。
假设有如下这种多层级结构:
|
|
为了获取内部outer实例的内部foo对象,需要添加一系列空指针判断
可以采用optionals flatMap 操作获得相同的结果:
上面的例子中flatMap的每次调用都会返回一个用Optional对象,如果有返回值则这个Optional对象是这个返回值的包装类,如果返回值不存在则返回null。
Reduce(归并)
reduce操作可以将stream中所有元素组合起来得到一个元素,JAVA8支持三中不同的reduce方法。
第一种能从stream元素序列中提取一个特定的元素。比如下面的从用户列表中选择年纪最大的用户操作:
上面的实例中reduce方法接收一个二元累加计算函数(BinaryOperator accumulator function)作为参数,二元操作(BinaryOperator)实际就是上在两个操作数共享同一类型。示例中函数比较两人年龄,返回的最大年龄的人。
第二种reduce操作接收一个标识值和一个二元操作累加器作为参数,这个reduce方法可以把stream中所有用户的名字和年龄汇总得到一个新用户。
|
|
第三种reduce方法,接收三个参数:一个标示值(identity value),一个二元操作累加器(BiFunction accumulator),一个二元组合方法。由于标识符参数未被严格限制为person类型,因此我们可以用这个reduce方法来获取用户的总年龄。
|
|
计算的结果是76,通过添加调试输出,我们可以详细地了解执行引擎中发生了什么。
|
|
从调试输出中可以看到,累加器做了所有的工作,它首先获取值为0的标示值和第一个用户Max,接下来的三步中持续sum值由于累加不断变大,在最后一步汇总的年纪增长到76。
注意,上面的调试输出中combiner没有执行,通过parallel执行上面相同stream。
|
|
通过并行的方式执行上面的stream操作,得到的是另外一种完全不相同的执行动作。在并行stream中combiner方法会被调用。这是由于累加器是被并行调用的,因此组合器需要对分开的累加操作进行求和。
Parallel Streams(并行流)
为了提高大量输入时的执行效率,stream可以采用并行的放行执行。并行流(Parallel Streams)通过ForkJoinPool.commonPool() 方法获取一个可用的ForkJoinPool。这个ForkJoinPool使用5个线程(实际上是由底层可用的物理cpu核数决定的)。
在我的机器上公共池初始化为每个默认3并行,这个值可以通过调整jvm参数来修改:
-Djava.util.concurrent.ForkJoinPool.common.parallelism=5
Collections中包含parallelStream()方法,通过这个方法能够为Collections中的元素创建并行流。另外也可以调用stream的parallel()方法将一个顺序流转变为一个并行流的拷贝。
为了了解并行流的执行动作,下面的例子会打印当前线程的执行信息。
通过分析调试输出,我们可以更好地了解哪一个线程执行了哪些stream操作。从上面的输出中我们可以看到parallel stream使用了ForkJoinPool提供的所有可用的线程来执行流的各种操作。由于不能确定哪个线程会执行并行流的哪个操作,因此反复执行上面的代码,打印的结果会不同。
扩充上面的例子,添加sort操作
这个执行结果看起来比较奇怪,看起来sort操作只是在main线程中顺序执行的。实际上,parallel stream中的sort操作使用了JAVA 8的一个新方法:Arrays.parallelSort()。JAVA doc中是这样描述Arrays.parallelSort()的:待排序数组的长度决定了排序操作是顺序执行还是并行执行。java doc 描述如下:
If the length of the specified array is less than the minimum granularity, then it is sorted using the appropriate Arrays.sort method.
回到上一章的例子,我们已经了解combiner方法只能在parallel streams中调用,让我们来看下那些线程被实际调用:
从控制台输出可以看到accumulator和combiner操作都被可用的线程并行执行了。
总结起来:在大数据量输入的时候,parallel streams可以带来比较大的性能提升。但是应该记住,一些并行操作,比如:reduce,collect需要额外的计算(组合操作),但是在顺序流中,这些组合操作是不需要的。
另外,我们知道所有的parallel stream操作共享一个jvm范围内的ForkJoinPool,所以你应该注意避免在parallel stream上执行慢阻塞流操作,因为这些操作可能导致你应用中依赖parallel streams操作的其他部分也会响应变慢。
总结
Java8的Stream我认为是借鉴了别的语言的优点,换种思路在写程序,考虑到各种各样的场景,对于我来说,我觉得Stream可以有效的处理数据,新东西的引进总归是经过深思熟虑的,希望大家一起玩起来。