这里写目录标题Gatherer、Collector 自定义 Stream 的中间操作、最终操作
- collect(Collector) 自定义终止操作
- 泛型
- 方法
- 示例
- 实现 toList
- Collectors.CollectorImpl 与工厂方法 Collector.of()
- gather(Gatherer) 自定义中间操作
- 泛型
- Downstream
- Integrator
- Gatherer 的实例方法(4 + 1 个)
- Gatherers.GathererImpl 与工厂方法 of、ofSequential
- 示例
- 实现 map(mapper) (无状态)
- 实现 limit(maxSize) (有状态)
- 实现滑动窗口 windowSliding(windowSize)
在 Stream
中存在两种操作:中间操作和终止操作,中间操作会返回另一个 Stream
,比如 map
、filter
,而终止操作可以返回最终结果(比如 count
、findFirst
)或其他的副作用(比如 forEach
)。
在 Stream
出现之初就提供了 <R, A> R collect(Collector<? super T, A, R> collector);
方法和 Collector
接口来自定义终止操作。但直到 jdk22 才提供了自定义中间操作的方法 <R> Stream<R> gather(Gatherer<? super T, ?, R> gatherer)
和接口 Gatherer
。
collect
的参数是 Collector
,返回值是 R
。Collectors
中定义了多个便捷的终止操作。
gather
的参数是 Gatherer
,返回值是 Stream<R>
。Gatherers
中定义了多个便捷的中间操作。
collect(Collector) 自定义终止操作
泛型
<T>
:元素的类型
<A>
:中间状态的类型,通常作为实现细节隐藏,使用 ?
替代
<R>
:最终结果的类型
public interface Collector<T, A, R>
方法
Supplier<A> supplier()
:提供一个中间结果,类型为 A
BiConsumer<A, T> accumulator()
:将元素累积到中间结中
BinaryOperator<A> combiner()
:合并两个中间结果,并返回合并后的中间结果(只有并行流会用到,如果实现的 Collector
只想在串行流中昂使用,可以直接抛出 UnsupportedOperationException
)
Function<A, R> finisher()
:将中间结果 A
转换为最终结果 R
Set<Characteristics> characteristics()
:提供此 Collector
的一些特性,它们提供了一些 hint,用于提升流执行时的性能。Characteristics
是个枚举,有以下 3 种取值:
CONCURRENT
:表明此Collector
是并发的,多个线程可以对同一个中间结果调用accumulator
方法(线程安全的)。如果Collector
是CONCURRENT
的,但不是UNORDERED
的,则只有在Stream
本身是StreamOpFlag.NOT_ORDERED
才并发执行UNORDERED
:表明此Collector
可能不会保留元素的输入顺序。(如果中间结果或和最终结果没有内部顺序,例如Set
,则可能是这样)IDENTITY_FINISH
:表明finisher
函数是恒等函数,可以省略,中间结果可以强制转换为最终结果
示例
实现 toList
内部类实现
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
import java.util.Set;
import java.util.function.BiConsumer;
import java.util.function.BinaryOperator;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collector;
import java.util.stream.Stream;public class C1 {public static void main(String[] args) {List<Integer> list = Stream.of(1, 2, 3).collect(toList());System.out.println(list);// 输出// [1, 2, 3]}// 元素类型 T// 中间结果类型 ? 由于中间结果属于内部实现细节,可以不对外暴露,所以使用 ?// 最终结果类型 List<T>static <T> Collector<T, ?, List<T>> toList() {return new ToList<>();}static class ToList<T>// 元素类型 T// 中间结果类型 List<T>// 最终结果类型 List<T>implements Collector<T, List<T>, List<T>> {// ArrayList 作为中间结果@Overridepublic Supplier<List<T>> supplier() {return ArrayList::new;}// 将元素 T 累积到中间结果 List<T> 中@Overridepublic BiConsumer<List<T>, T> accumulator() {return List::add;}// 合并两个中间结果@Overridepublic BinaryOperator<List<T>> combiner() {return (left, right) -> {left.addAll(right);return left;};}// 将中间结果转换为最终结果,因为中间结果与最终结果一致,所以直接使用 Function.identity()@Overridepublic Function<List<T>, List<T>> finisher() {return Function.identity();}// 表明 finisher 是恒等函数,中间结果可强转为最终结果@Overridepublic Set<Characteristics> characteristics() {return EnumSet.of(Characteristics.IDENTITY_FINISH);}}
}
Collectors.CollectorImpl 与工厂方法 Collector.of()
Collector
的实现类为 Collectors.CollectorImpl
,有两个构造方法
// 第一个:5 个参数
record CollectorImpl<T, A, R>(Supplier<A> supplier,BiConsumer<A, T> accumulator,BinaryOperator<A> combiner,Function<A, R> finisher,Set<Characteristics> characteristics) implements Collector<T, A, R> {// 第二个:4 个参数,finisher 默认为 castingIdentity()CollectorImpl(Supplier<A> supplier,BiConsumer<A, T> accumulator,BinaryOperator<A> combiner,Set<Characteristics> characteristics) {this(supplier, accumulator, combiner, castingIdentity(), characteristics);}
}// 默认的 finisher,是个恒等函数
@SuppressWarnings("unchecked")
private static <I, R> Function<I, R> castingIdentity() {return i -> (R) i;
}
Collector
提供了两个 static
的 of
方法用于创建 Collector
,分别调用了上述两个构造方法:
static final Set<Collector.Characteristics> CH_ID= Collections.unmodifiableSet(EnumSet.of(Collector.Characteristics.IDENTITY_FINISH));public static<T, R> Collector<T, R, R> of(Supplier<R> supplier,BiConsumer<R, T> accumulator,BinaryOperator<R> combiner,Characteristics... characteristics) {Objects.requireNonNull(supplier);Objects.requireNonNull(accumulator);Objects.requireNonNull(combiner);Objects.requireNonNull(characteristics);// 默认会添加 Collector.Characteristics.IDENTITY_FINISHSet<Characteristics> cs = (characteristics.length == 0)? Collectors.CH_ID: Collections.unmodifiableSet(EnumSet.of(Collector.Characteristics.IDENTITY_FINISH,characteristics));return new Collectors.CollectorImpl<>(supplier, accumulator, combiner, cs);
}public static<T, A, R> Collector<T, A, R> of(Supplier<A> supplier,BiConsumer<A, T> accumulator,BinaryOperator<A> combiner,Function<A, R> finisher,Characteristics... characteristics) {Objects.requireNonNull(supplier);Objects.requireNonNull(accumulator);Objects.requireNonNull(combiner);Objects.requireNonNull(finisher);Objects.requireNonNull(characteristics);Set<Characteristics> cs = Collectors.CH_NOID;if (characteristics.length > 0) {cs = EnumSet.noneOf(Characteristics.class);Collections.addAll(cs, characteristics);cs = Collections.unmodifiableSet(cs);}return new Collectors.CollectorImpl<>(supplier, accumulator, combiner, finisher, cs);
}
实现 toList
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collector;
import java.util.stream.Stream;public class C2 {public static void main(String[] args) {List<Integer> list = Stream.of(1, 2, 3).collect(toList());System.out.println(list);}static <T> Collector<T, ?, List<T>> toList() {return Collector.of(ArrayList::new,List::add,(left, right) -> {left.addAll(right);return left;},Function.identity());}
}// 输出
// [1, 2, 3]
gather(Gatherer) 自定义中间操作
Gatherer
可实现有状态(比如 distinct、sorted
)或无状态(比如 map、filter
)的中间操作,操作可以按顺序执行,也可以并行执行,如果提供了 combiner 函数的话。Gatherer
可以以一对一(比如 map、peek
)、一对多(比如 flatMap
)、多对一(比如 distinct
)或多对多(比如 sorted
)的方式转换元素。它们可以跟踪先前看到的元素以影响后续元素的转换(有状态的),它们可以短路以将无限流转换为有限流(比如 limit
)。例如,一个 Gatherer
可以将一个输入元素转换为一个输出元素,直到某个条件变为真,此时它开始将一个输入元素转换为两个输出元素。
Stream
接口中声明的每个现有中间操作都可以通过 gather(Gatherer)
来实现。
泛型
<T>
:输入元素的类型
<A>
:中间状态的类型,通常作为实现细节隐藏,使用 ?
替代
<R>
:输出元素的类型,即发送到 Stream
的下一个阶段的元素类型、Downstream
的泛型类型
public interface Gatherer<T, A, R>
Downstream
Downstream
可以将元素发送到 Stream
的下一个阶段,可在 Integrator
和 finisher 中使用。
这里的泛型 T
指的是 Gatherer
的泛型 R
,不要混淆。
interface Downstream<T> {// 将元素发送到 Stream 的下一个阶段boolean push(T element);// 检查是否下一个阶段不希望向其发送更多元素default boolean isRejecting() { return false; }
}
Integrator
Integrator
接收元素 T 并对其进行处理,可以有选择的使用中间状态 A,并可以有选择的使用 Downstream
向下一个阶段发送增量结果 R,可通过返回 false
来短路此 Gatherer
。
interface Integrator<A, T, R> {// 1 执行给定的操作:当前状态、下一个元素和下游对象;可能会检查和/或更新 State,可以选择向下游发送任意数量的元素 —— 然后返回是否要使用更多元素。// 2 处理元素 T element// 3 可以检查、更新中间状态 A state// 4 可以选择向 Downstream 发送任意数量(可不发送、可发送 1 个、可发送多个)的元素// 5 然后返回是否要处理更多元素 T element,返回 false 会短路此 Gatherer,丢弃未处理的元素// 比如实现 limit(n) 时,前几次返回 true,当到达 n 个时返回 false 来短路以丢弃未处理的元素boolean integrate(A state, T element, Downstream<? super R> downstream);// Integrator 的子接口,Greedy 会处理所有的元素,换句话说,Greedy 不会短路,该信息可用于优化 Stream 的执行// 需要处理所有元素的中间操作可以interface Greedy<A, T, R> extends Integrator<A, T, R> { }// 工厂方法,将 lambda 转换成 Integratorstatic <A, T, R> Integrator<A, T, R> of(Integrator<A, T, R> integrator) {return integrator;}// 工厂方法,将 lambda 转换成 Greedystatic <A, T, R> Greedy<A, T, R> ofGreedy(Greedy<A, T, R> greedy) {return greedy;}
}
Gatherer 的实例方法(4 + 1 个)
// 用于提供中间状态,比如实现滑动窗口时存储当前窗口的元素;实现 limit 时存储已经处理过几个元素了
default Supplier<A> initializer() {return defaultInitializer();
};// 用于提供 Integrator
Integrator<A, T, R> integrator();// 将两个中间状态合并成一个并返回
default BinaryOperator<A> combiner() {return defaultCombiner();
}// 1 处理完所有的元素之后调用,参数为中间状态 A 和 Downstream。
// 2 可以检查、更新中间状态 A state
// 3 可以选择向 Downstream 发送任意数量(可不发送、可发送 1 个、可发送多个)的元素
// 例如在实现 windowFixed(windowSize) 时(将元素分组到大小固定的 List 中),最后一个窗口的元素可能小于 windowSize 个,所以不会在 Integrator#integrate 方法中发送到 Stream 下一个阶段,此时需要在 finisher 中判断当前窗口是否有剩余的元素,如果有则发送到下一个阶段
default BiConsumer<A, Downstream<? super R>> finisher() {return defaultFinisher();
}// 组合两个 Gatherer,将 this 的输出作为 that 输入
default <RR> Gatherer<T, ?, RR> andThen(Gatherer<? super R, ?, ? extends RR> that) {Objects.requireNonNull(that);return Gatherers.Composite.of(this, that);
}
Gatherers.GathererImpl 与工厂方法 of、ofSequential
Gatherer
的默认实现为 Gatherers.GathererImpl
,有两个构造方法
record GathererImpl<T, A, R>(@Override Supplier<A> initializer,@Override Integrator<A, T, R> integrator,@Override BinaryOperator<A> combiner,@Override BiConsumer<A, Downstream<? super R>> finisher) implements Gatherer<T, A, R> {static <T, A, R> GathererImpl<T, A, R> of(Supplier<A> initializer,Integrator<A, T, R> integrator,BinaryOperator<A> combiner,BiConsumer<A, Downstream<? super R>> finisher) {return new GathererImpl<>(Objects.requireNonNull(initializer,"initializer"),Objects.requireNonNull(integrator, "integrator"),Objects.requireNonNull(combiner, "combiner"),Objects.requireNonNull(finisher, "finisher"));}
}
3 个特殊用途的 initializer、combiner、finisher 方法:
// 返回默认的 initializer,使用此 initializer 的 Gatherer 被认为是无状态的
static <A> Supplier<A> defaultInitializer() {return Gatherers.Value.DEFAULT.initializer();
}// 返回默认的 combiner,使用此 combiner 的 Gatherer 只能串行执行,不能并行执行
static <A> BinaryOperator<A> defaultCombiner() {return Gatherers.Value.DEFAULT.combiner();
}// 返回默认的 finisher,此 finisher 此空的,不执行任何操作
static <A, R> BiConsumer<A, Downstream<? super R>> defaultFinisher() {return Gatherers.Value.DEFAULT.finisher();
}
工厂方法 of、ofSequential
:
// 创建串行的、无状态的,且 中间状态为 Void
static <T, R> Gatherer<T, Void, R> ofSequential(Integrator<Void, T, R> integrator);// 创建串行的、无状态的,且 中间状态为 Void
static <T, R> Gatherer<T, Void, R> ofSequential(Integrator<Void, T, R> integrator,BiConsumer<Void, Downstream<? super R>> finisher);// 创建串行的
static <T, A, R> Gatherer<T, A, R> ofSequential(Supplier<A> initializer,Integrator<A, T, R> integrator);// 创建串行的
static <T, A, R> Gatherer<T, A, R> ofSequential(Supplier<A> initializer,Integrator<A, T, R> integrator,BiConsumer<A, Downstream<? super R>> finisher);// 4 个参数
static <T, A, R> Gatherer<T, A, R> of(Supplier<A> initializer,Integrator<A, T, R> integrator,BinaryOperator<A> combiner,BiConsumer<A, Downstream<? super R>> finisher) {return new Gatherers.GathererImpl<>(Objects.requireNonNull(initializer),Objects.requireNonNull(integrator),Objects.requireNonNull(combiner),Objects.requireNonNull(finisher));
}// 创建无状态的,且 中间状态为 Void
static <T, R> Gatherer<T, Void, R> of(Integrator<Void, T, R> integrator);// 创建无状态的,且 中间状态为 Void
static <T, R> Gatherer<T, Void, R> of(Integrator<Void, T, R> integrator,BiConsumer<Void, Downstream<? super R>> finisher);
示例
实现 map(mapper) (无状态)
import java.util.function.Function;
import java.util.stream.Gatherer;
import java.util.stream.Stream;@SuppressWarnings("preview")
public class G1 {public static void main(String[] args) {Stream.of(1, 2, 3).gather(map(x -> x * x)).forEach(System.out::println);}// <R> Stream<R> map(Function<? super T, ? extends R> mapper);static <T, R> Gatherer<T, Void, R> map(Function<? super T, ? extends R> mapper) {// 单参数的 ofSequential 创建串行的、无状态的return Gatherer.ofSequential(// map 会处理所有元素,不会短路,所以用 ofGreedy 包装成 Greedy 以优化 Stream 执行Gatherer.Integrator.ofGreedy(// map 是无状态的,state 用 _ 代替(_, element, downstream) -> {R r = mapper.apply(element);return downstream.push(r);}));}
}// 输出
1
4
9
实现 limit(maxSize) (有状态)
import java.util.stream.Gatherer;
import java.util.stream.Stream;@SuppressWarnings("preview")
public class G2 {public static void main(String[] args) {Stream.of(1, 2, 3).gather(limit(1)).forEach(System.out::println);System.out.println();Stream.of(1, 2, 3).gather(limit(2)).forEach(System.out::println);System.out.println();Stream.of(1, 2, 3).gather(limit(5)).forEach(System.out::println);System.out.println();}// Stream<T> limit(long maxSize);static <T> Gatherer<T, ?, T> limit(long maxSize) {// State 存储已发送到下一个阶段的元素数量class State {long count;}// 串行的、有状态的return Gatherer.ofSequential(State::new,(state, element, downstream) -> {// 数量未达到 maxSize,便发送元素到下一个阶段if (state.count < maxSize && downstream.push(element)) {state.count++;return true;}return false;});}
}// 输出
11
21
2
3
实现滑动窗口 windowSliding(windowSize)
输入:1,2,3,4,5,6,7,8
windowSize = 2,结果 [[1, 2], [2, 3], [3, 4], [4, 5], [5, 6], [6, 7], [7, 8]]
windowSize = 6,结果 [[1, 2, 3, 4, 5, 6], [2, 3, 4, 5, 6, 7], [3, 4, 5, 6, 7, 8]]
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Gatherer;
import java.util.stream.Gatherers;
import java.util.stream.Stream;@SuppressWarnings("preview")
public class G3 {public static void main(String[] args) {List<List<Integer>> windows2 = Stream.of(1, 2, 3, 4, 5, 6, 7, 8).gather(windowSliding(2)).toList();System.out.println(windows2);List<List<Integer>> windows6 =Stream.of(1, 2, 3, 4, 5, 6, 7, 8).gather(Gatherers.windowSliding(6)).toList();System.out.println(windows6);}static <T> Gatherer<T, ?, List<T>> windowSliding(int windowSize) {if (windowSize <= 0) {throw new IllegalArgumentException("'windowSize' must be greater than zero");}// state 维护当前窗口和是否是第一个窗口class State {final List<T> window = new ArrayList<>(windowSize);// 当 Stream 的元素数量小于 windowSize 时,// finisher 中判断是否是第一个窗口,如果是,则将元素发送到下一个阶段,否则不发送boolean firstWindow = true;}// 串行的、有状态的return Gatherer.ofSequential(State::new,// 不短路Gatherer.Integrator.ofGreedy((state, element, downstream) -> {// 添加元素到当前窗口state.window.add(element);// 若当前窗口满足大小就发送到下一个阶段if (state.window.size() == windowSize) {boolean result = downstream.push(new ArrayList<>(state.window));// 删除窗口最左边的元素state.window.removeFirst();state.firstWindow = false;return result;}return true;}),(state, downstream) -> {// firstWindow 为 true 说明 Stream 中的元素数量小于 windowSize 且从未向下一个阶段发送过元素// 需要将当前窗口的元素发送到下一个阶段if (state.firstWindow && !state.window.isEmpty() && !downstream.isRejecting()) {downstream.push(new ArrayList<>(state.window));state.window.clear();}});}
}// 输出
[[1, 2], [2, 3], [3, 4], [4, 5], [5, 6], [6, 7], [7, 8]]
[[1, 2, 3, 4, 5, 6], [2, 3, 4, 5, 6, 7], [3, 4, 5, 6, 7, 8]]