Stream流——串行版
Stream流是java8引入的特性,极大的方便了我们对于程序内数据的操作,提高了性能。通过函数式编程解决复杂问题。
1.BaseStream<T,S extense BaseStream<T,S>>
他是流处理的基石概念,重点不在于这个接口定义了什么方法,而是它独特的参数类型。
首先约定好:T——入参类型 S——出参类型
S是继承于自己的同样的类型,从而形成一种递归,每一次返回的结果类型都是自己或它的子类。这样做是因为我们在流处理时,不会在原有的流上进行操作,而是形成新的流返回会去。这样设计免去了类型转换出错和增强了灵活性
2.Stream extends BaseStream<T, Stream>
BaseStream有4大子类,我们讲一个使用范围最广的——Stream
它定义了我们常用的一些方法如
Stream filter (Predicate<? super T> predicate)
这里的Predicate就是一个函数式例如判断对象是否为空 s->s!=null
中间操作 (Intermediate operations)
无状态 (Stateless) | 有状态 (Stateful) |
---|---|
unordered() | distinct() 去重 |
filter() 过滤元素 | sorted() 排序 |
map() 转换元素类型 | limit() |
mapToInt() | skip() 跳过前n个元素 |
mapToLong() | |
mapToDouble() | |
flatMap() | |
flatMapToInt() | |
flatMapToLong() | |
flatMapToDouble() | |
peek() |
结束操作 (Terminal operations)
非短路操作 | 短路操作 (short-circuiting) |
---|---|
forEach() | anyMatch() |
forEachOrdered() | allMatch() |
toArray() | noneMatch() |
reduce() 归约 | findFirst() |
collect() | findAny() |
max() | |
min() | |
count() |
咱们这里通过它的一个实现类ReferencePipeline来举个例子来体验一下
List<Integer> numbers = Arrays.asList(2, 1, 3, 8, 5, 6, 7, 4, 9, 10);
List<String> evenNumbers = numbers.stream().map(o->o.toString())//将元素转为字符串.filter(n -> n.length() == 1)//剔除大于两位数的元素.sorted()//排序.collect(Collectors.toList());//整合出一个新的流返回System.out.println(evenNumbers);
先定义了一个List,通过.stream()新建一个流管道,函数式编程的好处就是他可以把操作整合到一起,这里的 o->o.toString()和n->n.length()==1会被Java整合为
(o -> o.toString()) -> (n -> n.length() == 1)一个操作链
接下来,您是否好奇这个链条是如何组装的,反正我很好奇,let's dive into water
3.ReferencePipeline
它继承了AbstractPipeline,而在其中保存了三个引用,类型都是自己,分别是sourceStage指向第一个Sink(后续展开),接下来就是previousStage和nextStage分别链接上下Sink。
下图给出了具体的流程
中间操作是一种烂加载处理,只有当触发了collect()方法才会真正的调用每个Steam流中的wrapSink方法去处理数据。之后调用sort()进行排序。我们来具体看一个方法是如何处理的
public final <R> Stream<R> map(Function<? super P_OUT, ? extends R> mapper) {Objects.requireNonNull(mapper);return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE,StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {@OverrideSink<P_OUT> opWrapSink(int flags, Sink<R> sink) {return new Sink.ChainedReference<P_OUT, R>(sink) {@Overridepublic void accept(P_OUT u) {downstream.accept(mapper.apply(u));
在ReferencePipeline中它定义三个静态内部类——StatelessOp,StatafulOp,Head。先是对函数式判空,然后返回一个无状态流。关键在于它内部定义的opWrapSinlk
通过返回一个Sink类并在其中定义了具体的操作accept()。然后调用函数式,并通过accept()触发下游Stream进行进一步处理。
4.Sink
抽象的来讲,上面所说的ReferencePipeline就像是流水线上不停流动的传输带,而真正在加工物品的就是我们的Sink类
这里的Consumer接口就是我们将不同的流处理函数式拼接起来的关键
public interface Consumer<T> {void accept(T t);default Consumer<T> andThen(Consumer<? super T> after) {Objects.requireNonNull(after);return (T t) -> { accept(t); after.accept(t); };}
}
andThen()方法,首先是自己调用自己的accept(),再调用下游的accept()
像我们前面写到的例子,map(o->o.toString)和filter(n->n.length)中的参数就是一个Consumer
5.归约
我们将最后一步collect()定义为归约,将流中的所有元素归约为一个最终的结果。
它通过操作一个Collector进行操作,包含三个步骤
累加(accumulation):将流中的每个元素依次累加到一个容器中。
合并(combining):如果存在并行流,多个部分的结果需要合并。
完成(finishing):在所有元素处理完后,生成最终的结果。
public interface Collector<T, A, R> {Supplier<A> supplier(); // 提供一个容器,容器类型是 ABiConsumer<A, T> accumulator(); // 累加器,负责将元素添加到容器BinaryOperator<A> combiner(); // 合并器,用于并行处理时合并多个容器Function<A, R> finisher(); // 结果转换器,返回最终的结果Set<Collector.Characteristics> characteristics(); // 一些特征,指示这个 Collector 是否具有某些优化特性
}
例如我们最常用的toList()将流中的所有元素收集到一个Lsit中。
public static <T>Collector<T, ?, List<T>> toList() {return new CollectorImpl<>(ArrayList::new, List::add,(left, right) -> { left.addAll(right); return left; },CH_ID);}
ArrayList::new,返回一个List容器,通过List::add方法添加进入容器,后续处理并行流。
以下是关于 Java Stream API 中 collect
归约操作的表格,其中总结了常见的归约操作、作用说明及示例:
归约操作 | 作用说明 | 示例代码 |
---|---|---|
toList() | 将流中的元素收集到一个 List 中。 | List<String> result = names.stream().collect(Collectors.toList()); |
toSet() | 将流中的元素收集到一个 Set 中,自动去重。 | Set<String> result = names.stream().collect(Collectors.toSet()); |
joining() | 将流中的元素连接成一个字符串,支持指定分隔符、前缀和后缀。 | String result = names.stream().collect(Collectors.joining(", ", "[", "]")); |
groupingBy() | 根据某个条件将流中的元素分组,返回一个 Map 。 | Map<Integer, List<String>> groupedByLength = names.stream().collect(Collectors.groupingBy(String::length)); |
partitioningBy() | 将流中的元素分成两组,通常用于二元分类。 | Map<Boolean, List<String>> partitioned = names.stream().collect(Collectors.partitioningBy(name -> name.length() > 3)); |
summarizingInt() | 对流中的元素进行统计,返回 IntSummaryStatistics ,包括计数、求和、最小值、最大值、平均值等。 | IntSummaryStatistics stats = names.stream().collect(Collectors.summarizingInt(String::length)); |
reducing() | 对流中的元素进行归约操作(例如累加、求最大值等),返回一个单一结果。 | Optional<String> result = names.stream().collect(Collectors.reducing((s1, s2) -> s1.length() > s2.length() ? s1 : s2)); |
toMap() | 将流中的元素根据某个键值映射规则收集到一个 Map 中。 | Map<Integer, String> map = names.stream().collect(Collectors.toMap(String::length, name -> name)); |
| 将流中的元素根据某个键值映射规则收集到一个 `Map` 中。 | `Map<Integer, String> map = names.stream().collect(Collectors.toMap(String::length, name -> name));` |