diff --git a/src/java.base/share/classes/java/util/StringJoiner.java b/src/java.base/share/classes/java/util/StringJoiner.java index f8127d9b70718..155a7f644f7fd 100644 --- a/src/java.base/share/classes/java/util/StringJoiner.java +++ b/src/java.base/share/classes/java/util/StringJoiner.java @@ -133,6 +133,36 @@ public StringJoiner(CharSequence delimiter, checkAddLength(0, 0); } + /** + * Constructs a {@code StringJoiner} with no characters in it using copies + * of the supplied {@code prefix}, {@code delimiter} and {@code suffix}. + * If no characters are added to the {@code StringJoiner} and methods + * accessing the string value of it are invoked, it will return the + * {@code prefix + suffix} (or properties thereof) in the result, unless + * {@code setEmptyValue} has first been called. + * + * @param delimiter the sequence of characters to be used between each + * element added to the {@code StringJoiner} + * @param prefix the sequence of characters to be used at the beginning + * @param suffix the sequence of characters to be used at the end + * @param initialCapacity the number of elements that can be added before + * the internal buffer needs to be resized + * @throws NullPointerException if {@code prefix}, {@code delimiter}, or + * {@code suffix} is {@code null} + */ + public StringJoiner(CharSequence delimiter, + CharSequence prefix, + CharSequence suffix, + int initialCapacity) { + this(delimiter, prefix, suffix); + if (initialCapacity < 0) { + throw new IllegalArgumentException("Capacity must be non-negative"); + } + // Capacity is doubled when growing the array, so ensure it is at least + // 1. + elts = new String[Math.max(initialCapacity, 1)]; + } + /** * Sets the sequence of characters to be used when determining the string * representation of this {@code StringJoiner} and no elements have been diff --git a/src/java.base/share/classes/java/util/stream/Collector.java b/src/java.base/share/classes/java/util/stream/Collector.java index 9ef6567908333..29b44afd5d8b7 100644 --- a/src/java.base/share/classes/java/util/stream/Collector.java +++ b/src/java.base/share/classes/java/util/stream/Collector.java @@ -31,6 +31,8 @@ import java.util.function.BiConsumer; import java.util.function.BinaryOperator; import java.util.function.Function; +import java.util.function.IntFunction; +import java.util.function.LongFunction; import java.util.function.Supplier; // A compilation test for the code snippets in this class-level javadoc can be found at: @@ -206,6 +208,19 @@ public interface Collector { */ Supplier supplier(); + /** + * A function that creates and returns a new mutable result container given + * the exact number of input elements or {@code -1} if not known. + * + * @implSpec The default implementation returns a function that returns the + * result of calling {@link #supplier()}. + * + * @return a function which returns a new, mutable result container + */ + default LongFunction sizedSupplier() { + return _ -> supplier().get(); + } + /** * A function that folds a value into a mutable result container. * @@ -273,7 +288,7 @@ public static Collector of(Supplier supplier, ? Collectors.CH_ID : Collections.unmodifiableSet(EnumSet.of(Collector.Characteristics.IDENTITY_FINISH, characteristics)); - return new Collectors.CollectorImpl<>(supplier, accumulator, combiner, cs); + return Collectors.CollectorImpl.ofUnsized(supplier, accumulator, combiner, cs); } /** @@ -308,7 +323,85 @@ public static Collector of(Supplier supplier, Collections.addAll(cs, characteristics); cs = Collections.unmodifiableSet(cs); } - return new Collectors.CollectorImpl<>(supplier, accumulator, combiner, finisher, cs); + return Collectors.CollectorImpl.ofUnsized(supplier, accumulator, combiner, finisher, cs); + } + + /** + * Returns a new {@code Collector} described by the given {@code supplier}, + * {@code sizedSupplier}, {@code accumulator}, and {@code combiner} functions. + * The resulting {@code Collector} has the + * {@code Collector.Characteristics.IDENTITY_FINISH} characteristic. + * + * @param supplier The supplier function for the new collector if the stream + * size is unknown + * @param sizedSupplier The function which returns a new result container + * of the appropriate size if the stream size is known + * @param accumulator The accumulator function for the new collector + * @param combiner The combiner function for the new collector + * @param characteristics The collector characteristics for the new + * collector + * @param The type of input elements for the new collector + * @param The type of intermediate accumulation result, and final result, + * for the new collector + * @throws NullPointerException if any argument is null + * @return the new {@code Collector} + */ + public static Collector ofSized(Supplier supplier, + IntFunction sizedSupplier, + BiConsumer accumulator, + BinaryOperator combiner, + Characteristics... characteristics) { + Objects.requireNonNull(supplier); + Objects.requireNonNull(sizedSupplier); + Objects.requireNonNull(accumulator); + Objects.requireNonNull(combiner); + Objects.requireNonNull(characteristics); + Set cs = (characteristics.length == 0) + ? Collectors.CH_ID + : Collections.unmodifiableSet(EnumSet.of(Collector.Characteristics.IDENTITY_FINISH, + characteristics)); + return Collectors.CollectorImpl.of(supplier, sizedSupplier, accumulator, combiner, cs); + } + + /** + * Returns a new {@code Collector} described by the given {@code supplier}, + * {@code sizedSupplier}, {@code accumulator}, {@code combiner}, and + * {@code finisher} functions. + * + * @param supplier The supplier function for the new collector if the stream + * size is unknown + * @param sizedSupplier The function which returns a new result container + * of the appropriate size if the stream size is known + * @param accumulator The accumulator function for the new collector + * @param combiner The combiner function for the new collector + * @param finisher The finisher function for the new collector + * @param characteristics The collector characteristics for the new + * collector + * @param The type of input elements for the new collector + * @param The intermediate accumulation type of the new collector + * @param The final result type of the new collector + * @throws NullPointerException if any argument is null + * @return the new {@code Collector} + */ + public static Collector ofSized(Supplier supplier, + IntFunction sizedSupplier, + BiConsumer accumulator, + BinaryOperator combiner, + Function finisher, + Characteristics... characteristics) { + Objects.requireNonNull(supplier); + Objects.requireNonNull(sizedSupplier); + Objects.requireNonNull(accumulator); + Objects.requireNonNull(combiner); + Objects.requireNonNull(finisher); + Objects.requireNonNull(characteristics); + Set cs = Collectors.CH_NOID; + if (characteristics.length > 0) { + cs = EnumSet.noneOf(Characteristics.class); + Collections.addAll(cs, characteristics); + cs = Collections.unmodifiableSet(cs); + } + return Collectors.CollectorImpl.of(supplier, sizedSupplier, accumulator, combiner, finisher, cs); } /** diff --git a/src/java.base/share/classes/java/util/stream/Collectors.java b/src/java.base/share/classes/java/util/stream/Collectors.java index fe20bc1709a38..531a980795299 100644 --- a/src/java.base/share/classes/java/util/stream/Collectors.java +++ b/src/java.base/share/classes/java/util/stream/Collectors.java @@ -50,6 +50,8 @@ import java.util.function.BinaryOperator; import java.util.function.Consumer; import java.util.function.Function; +import java.util.function.IntFunction; +import java.util.function.LongFunction; import java.util.function.Predicate; import java.util.function.Supplier; import java.util.function.ToDoubleFunction; @@ -195,17 +197,55 @@ private static Function castingIdentity() { * @param the type of the result */ record CollectorImpl(Supplier supplier, + LongFunction sizedSupplier, BiConsumer accumulator, BinaryOperator combiner, Function finisher, Set characteristics ) implements Collector { - CollectorImpl(Supplier supplier, - BiConsumer accumulator, - BinaryOperator combiner, - Set characteristics) { - this(supplier, accumulator, combiner, castingIdentity(), characteristics); + static CollectorImpl ofUnsized(Supplier supplier, + BiConsumer accumulator, + BinaryOperator combiner, + Function finisher, + Set characteristics) { + return new CollectorImpl<>(supplier, + _ -> supplier.get(), + accumulator, + combiner, + finisher, + characteristics); + } + + static CollectorImpl ofUnsized(Supplier supplier, + BiConsumer accumulator, + BinaryOperator combiner, + Set characteristics) { + return ofUnsized(supplier, accumulator, combiner, castingIdentity(), characteristics); + } + + static CollectorImpl of(Supplier supplier, + IntFunction sizedSupplier, + BiConsumer accumulator, + BinaryOperator combiner, + Function finisher, + Set characteristics) { + return new CollectorImpl<>(supplier, + exactSizeIfKnown -> exactSizeIfKnown != -1 + ? sizedSupplier.apply((int) Math.min(exactSizeIfKnown, Integer.MAX_VALUE)) + : supplier.get(), + accumulator, + combiner, + finisher, + characteristics); + } + + static CollectorImpl of(Supplier supplier, + IntFunction sizedSupplier, + BiConsumer accumulator, + BinaryOperator combiner, + Set characteristics) { + return of(supplier, sizedSupplier, accumulator, combiner, castingIdentity(), characteristics); } } @@ -223,9 +263,9 @@ record CollectorImpl(Supplier supplier, */ public static > Collector toCollection(Supplier collectionFactory) { - return new CollectorImpl<>(collectionFactory, Collection::add, - (r1, r2) -> { r1.addAll(r2); return r1; }, - CH_ID); + return CollectorImpl.ofUnsized(collectionFactory, Collection::add, + (r1, r2) -> { r1.addAll(r2); return r1; }, + CH_ID); } /** @@ -240,9 +280,9 @@ record CollectorImpl(Supplier supplier, */ public static Collector> toList() { - return new CollectorImpl<>(ArrayList::new, List::add, - (left, right) -> { left.addAll(right); return left; }, - CH_ID); + return CollectorImpl.of(ArrayList::new, ArrayList::new, List::add, + (left, right) -> { left.addAll(right); return left; }, + CH_ID); } /** @@ -258,17 +298,17 @@ record CollectorImpl(Supplier supplier, */ public static Collector> toUnmodifiableList() { - return new CollectorImpl<>(ArrayList::new, List::add, - (left, right) -> { left.addAll(right); return left; }, - list -> { - if (list.getClass() == ArrayList.class) { // ensure it's trusted - return SharedSecrets.getJavaUtilCollectionAccess() - .listFromTrustedArray(list.toArray()); - } else { - throw new IllegalArgumentException(); - } - }, - CH_NOID); + return CollectorImpl.of(ArrayList::new, ArrayList::new, List::add, + (left, right) -> { left.addAll(right); return left; }, + list -> { + if (list.getClass() == ArrayList.class) { // ensure it's trusted + return SharedSecrets.getJavaUtilCollectionAccess() + .listFromTrustedArray(list.toArray()); + } else { + throw new IllegalArgumentException(); + } + }, + CH_NOID); } /** @@ -287,15 +327,15 @@ record CollectorImpl(Supplier supplier, */ public static Collector> toSet() { - return new CollectorImpl<>(HashSet::new, Set::add, - (left, right) -> { - if (left.size() < right.size()) { - right.addAll(left); return right; - } else { - left.addAll(right); return left; - } - }, - CH_UNORDERED_ID); + return CollectorImpl.ofUnsized(HashSet::new, Set::add, + (left, right) -> { + if (left.size() < right.size()) { + right.addAll(left); return right; + } else { + left.addAll(right); return left; + } + }, + CH_UNORDERED_ID); } /** @@ -316,16 +356,16 @@ record CollectorImpl(Supplier supplier, @SuppressWarnings("unchecked") public static Collector> toUnmodifiableSet() { - return new CollectorImpl<>(HashSet::new, Set::add, - (left, right) -> { - if (left.size() < right.size()) { - right.addAll(left); return right; - } else { - left.addAll(right); return left; - } - }, - set -> (Set)Set.of(set.toArray()), - CH_UNORDERED_NOID); + return CollectorImpl.ofUnsized(HashSet::new, Set::add, + (left, right) -> { + if (left.size() < right.size()) { + right.addAll(left); return right; + } else { + left.addAll(right); return left; + } + }, + set -> (Set) Set.of(set.toArray()), + CH_UNORDERED_NOID); } /** @@ -336,7 +376,7 @@ record CollectorImpl(Supplier supplier, * {@code String}, in encounter order */ public static Collector joining() { - return new CollectorImpl<>( + return CollectorImpl.ofUnsized( StringBuilder::new, StringBuilder::append, (r1, r2) -> { r1.append(r2); @@ -373,8 +413,9 @@ record CollectorImpl(Supplier supplier, public static Collector joining(CharSequence delimiter, CharSequence prefix, CharSequence suffix) { - return new CollectorImpl<>( + return CollectorImpl.of( () -> new StringJoiner(delimiter, prefix, suffix), + exactSize -> new StringJoiner(delimiter, prefix, suffix, exactSize), StringJoiner::add, StringJoiner::merge, StringJoiner::toString, CH_NOID); } @@ -431,7 +472,7 @@ BinaryOperator mapMerger(BinaryOperator mergeFunction) { Collector mapping(Function mapper, Collector downstream) { BiConsumer downstreamAccumulator = downstream.accumulator(); - return new CollectorImpl<>(downstream.supplier(), + return new CollectorImpl<>(downstream.supplier(), downstream.sizedSupplier(), (r, t) -> downstreamAccumulator.accept(r, mapper.apply(t)), downstream.combiner(), downstream.finisher(), downstream.characteristics()); @@ -476,7 +517,7 @@ BinaryOperator mapMerger(BinaryOperator mergeFunction) { Collector flatMapping(Function> mapper, Collector downstream) { BiConsumer downstreamAccumulator = downstream.accumulator(); - return new CollectorImpl<>(downstream.supplier(), + return CollectorImpl.ofUnsized(downstream.supplier(), (r, t) -> { try (Stream result = mapper.apply(t)) { if (result != null) @@ -526,7 +567,7 @@ BinaryOperator mapMerger(BinaryOperator mergeFunction) { Collector filtering(Predicate predicate, Collector downstream) { BiConsumer downstreamAccumulator = downstream.accumulator(); - return new CollectorImpl<>(downstream.supplier(), + return CollectorImpl.ofUnsized(downstream.supplier(), (r, t) -> { if (predicate.test(t)) { downstreamAccumulator.accept(r, t); @@ -567,7 +608,7 @@ public static Collector collectingAndThen(Collector dow characteristics = Collections.unmodifiableSet(characteristics); } } - return new CollectorImpl<>(downstream.supplier(), + return new CollectorImpl<>(downstream.supplier(), downstream.sizedSupplier(), downstream.accumulator(), downstream.combiner(), downstream.finisher().andThen(finisher), @@ -642,7 +683,7 @@ public static Collector collectingAndThen(Collector dow */ public static Collector summingInt(ToIntFunction mapper) { - return new CollectorImpl<>( + return CollectorImpl.ofUnsized( () -> new int[1], (a, t) -> { a[0] += mapper.applyAsInt(t); }, (a, b) -> { a[0] += b[0]; return a; }, @@ -660,7 +701,7 @@ public static Collector collectingAndThen(Collector dow */ public static Collector summingLong(ToLongFunction mapper) { - return new CollectorImpl<>( + return CollectorImpl.ofUnsized( () -> new long[1], (a, t) -> { a[0] += mapper.applyAsLong(t); }, (a, b) -> { a[0] += b[0]; return a; }, @@ -693,7 +734,7 @@ public static Collector collectingAndThen(Collector dow * the proper result if the stream contains infinite values of * the same sign. */ - return new CollectorImpl<>( + return CollectorImpl.ofUnsized( () -> new double[3], (a, t) -> { double val = mapper.applyAsDouble(t); sumWithCompensation(a, val); @@ -702,7 +743,7 @@ public static Collector collectingAndThen(Collector dow a[2] += b[2]; // Subtract compensation bits return sumWithCompensation(a, -b[1]); }, - a -> computeFinalSum(a), + Collectors::computeFinalSum, CH_NOID); } @@ -753,7 +794,7 @@ static double computeFinalSum(double[] summands) { */ public static Collector averagingInt(ToIntFunction mapper) { - return new CollectorImpl<>( + return CollectorImpl.ofUnsized( () -> new long[2], (a, t) -> { a[0] += mapper.applyAsInt(t); a[1]++; }, (a, b) -> { a[0] += b[0]; a[1] += b[1]; return a; }, @@ -772,7 +813,7 @@ static double computeFinalSum(double[] summands) { */ public static Collector averagingLong(ToLongFunction mapper) { - return new CollectorImpl<>( + return CollectorImpl.ofUnsized( () -> new long[2], (a, t) -> { a[0] += mapper.applyAsLong(t); a[1]++; }, (a, b) -> { a[0] += b[0]; a[1] += b[1]; return a; }, @@ -810,7 +851,7 @@ static double computeFinalSum(double[] summands) { * the negated low-order bits of the sum computed via compensated * summation, and index 2 holds the number of values seen. */ - return new CollectorImpl<>( + return CollectorImpl.ofUnsized( () -> new double[4], (a, t) -> { double val = mapper.applyAsDouble(t); sumWithCompensation(a, val); a[2]++; a[3]+= val;}, (a, b) -> { @@ -846,7 +887,7 @@ static double computeFinalSum(double[] summands) { */ public static Collector reducing(T identity, BinaryOperator op) { - return new CollectorImpl<>( + return CollectorImpl.ofUnsized( boxSupplier(identity), (a, t) -> { a[0] = op.apply(a[0], t); }, (a, b) -> { a[0] = op.apply(a[0], b[0]); return a; }, @@ -905,7 +946,7 @@ public void accept(T t) { } } - return new CollectorImpl<>( + return CollectorImpl.ofUnsized( OptionalBox::new, OptionalBox::accept, (a, b) -> { if (b.present) a.accept(b.value); @@ -955,7 +996,7 @@ public void accept(T t) { Collector reducing(U identity, Function mapper, BinaryOperator op) { - return new CollectorImpl<>( + return CollectorImpl.ofUnsized( boxSupplier(identity), (a, t) -> { a[0] = op.apply(a[0], mapper.apply(t)); }, (a, b) -> { a[0] = op.apply(a[0], b[0]); return a; }, @@ -1117,7 +1158,7 @@ public void accept(T t) { Supplier> mangledFactory = (Supplier>) mapFactory; if (downstream.characteristics().contains(Collector.Characteristics.IDENTITY_FINISH)) { - return new CollectorImpl<>(mangledFactory, accumulator, merger, CH_ID); + return CollectorImpl.ofUnsized(mangledFactory, accumulator, merger, CH_ID); } else { @SuppressWarnings("unchecked") @@ -1128,7 +1169,7 @@ public void accept(T t) { M castResult = (M) intermediate; return castResult; }; - return new CollectorImpl<>(mangledFactory, accumulator, merger, finisher, CH_NOID); + return CollectorImpl.ofUnsized(mangledFactory, accumulator, merger, finisher, CH_NOID); } } @@ -1287,7 +1328,7 @@ public void accept(T t) { } if (downstream.characteristics().contains(Collector.Characteristics.IDENTITY_FINISH)) { - return new CollectorImpl<>(mangledFactory, accumulator, merger, CH_CONCURRENT_ID); + return CollectorImpl.ofUnsized(mangledFactory, accumulator, merger, CH_CONCURRENT_ID); } else { @SuppressWarnings("unchecked") @@ -1298,7 +1339,7 @@ public void accept(T t) { M castResult = (M) intermediate; return castResult; }; - return new CollectorImpl<>(mangledFactory, accumulator, merger, finisher, CH_CONCURRENT_NOID); + return CollectorImpl.ofUnsized(mangledFactory, accumulator, merger, finisher, CH_CONCURRENT_NOID); } } @@ -1371,13 +1412,13 @@ public void accept(T t) { new Partition<>(downstream.supplier().get(), downstream.supplier().get()); if (downstream.characteristics().contains(Collector.Characteristics.IDENTITY_FINISH)) { - return new CollectorImpl<>(supplier, accumulator, merger, CH_ID); + return CollectorImpl.ofUnsized(supplier, accumulator, merger, CH_ID); } else { Function, Map> finisher = par -> new Partition<>(downstream.finisher().apply(par.forTrue), downstream.finisher().apply(par.forFalse)); - return new CollectorImpl<>(supplier, accumulator, merger, finisher, CH_NOID); + return CollectorImpl.ofUnsized(supplier, accumulator, merger, finisher, CH_NOID); } } @@ -1440,10 +1481,13 @@ public void accept(T t) { public static Collector> toMap(Function keyMapper, Function valueMapper) { - return new CollectorImpl<>(HashMap::new, - uniqKeysMapAccumulator(keyMapper, valueMapper), - uniqKeysMapMerger(), - CH_ID); + // uniqKeysMapMerger() throws if duplicate keys are encountered, so + // presizing is generally beneficial. + return CollectorImpl.of(HashMap::new, + HashMap::newHashMap, + uniqKeysMapAccumulator(keyMapper, valueMapper), + uniqKeysMapMerger(), + CH_ID); } /** @@ -1641,7 +1685,7 @@ public void accept(T t) { BiConsumer accumulator = (map, element) -> map.merge(keyMapper.apply(element), valueMapper.apply(element), mergeFunction); - return new CollectorImpl<>(mapFactory, accumulator, mapMerger(mergeFunction), CH_ID); + return CollectorImpl.ofUnsized(mapFactory, accumulator, mapMerger(mergeFunction), CH_ID); } /** @@ -1699,10 +1743,13 @@ public void accept(T t) { public static Collector> toConcurrentMap(Function keyMapper, Function valueMapper) { - return new CollectorImpl<>(ConcurrentHashMap::new, - uniqKeysMapAccumulator(keyMapper, valueMapper), - uniqKeysMapMerger(), - CH_CONCURRENT_ID); + // uniqKeysMapMerger() throws if duplicate keys are encountered, so + // presizing is generally beneficial. + return CollectorImpl.of(ConcurrentHashMap::new, + ConcurrentHashMap::new, + uniqKeysMapAccumulator(keyMapper, valueMapper), + uniqKeysMapMerger(), + CH_CONCURRENT_ID); } /** @@ -1805,7 +1852,7 @@ public void accept(T t) { BiConsumer accumulator = (map, element) -> map.merge(keyMapper.apply(element), valueMapper.apply(element), mergeFunction); - return new CollectorImpl<>(mapFactory, accumulator, mapMerger(mergeFunction), CH_CONCURRENT_ID); + return CollectorImpl.ofUnsized(mapFactory, accumulator, mapMerger(mergeFunction), CH_CONCURRENT_ID); } /** @@ -1822,7 +1869,7 @@ public void accept(T t) { */ public static Collector summarizingInt(ToIntFunction mapper) { - return new CollectorImpl<>( + return CollectorImpl.ofUnsized( IntSummaryStatistics::new, (r, t) -> r.accept(mapper.applyAsInt(t)), (l, r) -> { @@ -1845,7 +1892,7 @@ public void accept(T t) { */ public static Collector summarizingLong(ToLongFunction mapper) { - return new CollectorImpl<>( + return CollectorImpl.ofUnsized( LongSummaryStatistics::new, (r, t) -> r.accept(mapper.applyAsLong(t)), (l, r) -> { @@ -1868,7 +1915,7 @@ public void accept(T t) { */ public static Collector summarizingDouble(ToDoubleFunction mapper) { - return new CollectorImpl<>( + return CollectorImpl.ofUnsized( DoubleSummaryStatistics::new, (r, t) -> r.accept(mapper.applyAsDouble(t)), (l, r) -> { @@ -1926,6 +1973,8 @@ public void accept(T t) { Supplier c1Supplier = Objects.requireNonNull(downstream1.supplier(), "downstream1 supplier"); Supplier c2Supplier = Objects.requireNonNull(downstream2.supplier(), "downstream2 supplier"); + LongFunction c1SizedSupplier = Objects.requireNonNull(downstream1.sizedSupplier(), "downstream1 sizedSupplier"); + LongFunction c2SizedSupplier = Objects.requireNonNull(downstream2.sizedSupplier(), "downstream2 sizedSupplier"); BiConsumer c1Accumulator = Objects.requireNonNull(downstream1.accumulator(), "downstream1 accumulator"); BiConsumer c2Accumulator = @@ -1949,8 +1998,18 @@ public void accept(T t) { } class PairBox { - A1 left = c1Supplier.get(); - A2 right = c2Supplier.get(); + A1 left; + A2 right; + + PairBox() { + left = c1Supplier.get(); + right = c2Supplier.get(); + } + + PairBox(long exactSizeIfKnown) { + left = c1SizedSupplier.apply(exactSizeIfKnown); + right = c2SizedSupplier.apply(exactSizeIfKnown); + } void add(T t) { c1Accumulator.accept(left, t); @@ -1970,7 +2029,7 @@ R get() { } } - return new CollectorImpl<>(PairBox::new, PairBox::add, PairBox::combine, PairBox::get, characteristics); + return new CollectorImpl<>(PairBox::new, PairBox::new, PairBox::add, PairBox::combine, PairBox::get, characteristics); } /** diff --git a/src/java.base/share/classes/java/util/stream/ForEachOps.java b/src/java.base/share/classes/java/util/stream/ForEachOps.java index 6163ed4061e91..69e4053db6ccd 100644 --- a/src/java.base/share/classes/java/util/stream/ForEachOps.java +++ b/src/java.base/share/classes/java/util/stream/ForEachOps.java @@ -29,11 +29,13 @@ import java.util.Objects; import java.util.Spliterator; import java.util.concurrent.CountedCompleter; +import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.function.DoubleConsumer; import java.util.function.IntConsumer; import java.util.function.IntFunction; import java.util.function.LongConsumer; +import java.util.function.LongFunction; import java.lang.invoke.VarHandle; import java.lang.invoke.MethodHandles; @@ -120,6 +122,22 @@ public static TerminalOp makeDouble(DoubleConsumer action, return new ForEachOp.OfDouble(action, ordered); } + /** + * Constructs a {@code TerminalOp} that accumulates elements into a + * container provided by a sized supplier, using the provided accumulator. + * + *

+ * See JDK-8072840 + */ +@BenchmarkMode(Mode.Throughput) +@OutputTimeUnit(TimeUnit.SECONDS) +@State(Scope.Thread) +@Warmup(iterations = 4, time = 2, timeUnit = TimeUnit.SECONDS) +@Measurement(iterations = 4, time = 2, timeUnit = TimeUnit.SECONDS) +@Fork(value = 3) +public class SizedCollectors { + + /** + * Implementation notes: + * - parallel version requires thread-safe sink, we use the same for sequential version for better comparison + * - Q is chosen to do some non-trivial work, but not too much so that allocation and copying overhead is still a + * relevant factor. This is meant to be representative of use cases of streams in which data is reshaped rather + * than processed. + */ + + @Param("1000") + private int N; + + @Param("10") + private int Q; + + private Collector> sizedCollector; + private Collector> unsizedCollector; + + private Collector sizedJoiner; + private Collector unsizedJoiner; + + @Setup + public void setup() { + sizedCollector = Collector.ofSized( + ConcurrentHashMap::new, + ConcurrentHashMap::new, + (map, input) -> map.put(input, doWork(input, Q)), + (_, _) -> { + throw new IllegalArgumentException(); + }, + Collector.Characteristics.CONCURRENT, + Collector.Characteristics.UNORDERED + ); + unsizedCollector = Collector.of( + ConcurrentHashMap::new, + (map, input) -> map.putIfAbsent(input, doWork(input, Q)), + (_, _) -> { + throw new IllegalArgumentException(); + }, + Collector.Characteristics.CONCURRENT, + Collector.Characteristics.UNORDERED + ); + + sizedJoiner = Collectors.joining(","); + unsizedJoiner = Collector.of(() -> new StringJoiner(",", "", ""), + StringJoiner::add, StringJoiner::merge, StringJoiner::toString); + } + + @Benchmark + public Map seq_sized() { + return LongStream.range(0, N).boxed().collect(sizedCollector); + } + + @Benchmark + public Map seq_unsized() { + return LongStream.range(0, N).boxed().collect(unsizedCollector); + } + + @Benchmark + public Map par_sized() { + return LongStream.range(0, N).parallel().boxed().collect(sizedCollector); + } + + @Benchmark + public Map par_unsized() { + return LongStream.range(0, N).parallel().boxed().collect(unsizedCollector); + } + + @Benchmark + public String join_sized() { + return LongStream.range(0, N).mapToObj(l -> "Hello " + doWork(l, Q)).collect(sizedJoiner); + } + + @Benchmark + public String join_unsized() { + return LongStream.range(0, N).mapToObj(l -> "Hello " + doWork(l, Q)).collect(unsizedJoiner); + } + + /** + * Make some work. + * This method have a couple of distinguishable properties: + * - the run time is linear with Q + * - the computation is dependent on input, preventing common reductions + * - the returned result is dependent on loop result, preventing dead code elimination + * - the returned result is almost always false + *

+ * This code uses inlined version of ThreadLocalRandom.next() to mitigate the edge effects + * of acquiring TLR every single call. + * + * @param input input + * @return result + */ + public static boolean doWork(long input, long count) { + long t = input; + for (int i = 0; i < count; i++) { + t += (t * 0x5DEECE66DL + 0xBL) & (0xFFFFFFFFFFFFL); + } + return (t == 0); + } +}

This OP only supports parallel evaluation. + * + * @return the {@code TerminalOp} instance + */ + public static TerminalOp makeAccumulateSized( + LongFunction sizedSupplier, + BiConsumer accumulator) { + Objects.requireNonNull(sizedSupplier); + Objects.requireNonNull(accumulator); + return new AccumulateSizedOp<>(sizedSupplier, accumulator); + } + /** * A {@code TerminalOp} that evaluates a stream pipeline and sends the * output to itself as a {@code TerminalSink}. Elements will be sent in @@ -251,6 +269,56 @@ public void accept(double t) { } } + /** + * Specialized implementation class for parallel streams that collect into a + * presized collection. + */ + static class AccumulateSizedOp implements TerminalOp, TerminalSink { + final LongFunction sizedSupplier; + final BiConsumer accumulator; + R container; + + AccumulateSizedOp(LongFunction sizedSupplier, + BiConsumer accumulator) { + this.sizedSupplier = sizedSupplier; + this.accumulator = accumulator; + } + + // TerminalOp + + @Override + public int getOpFlags() { + return StreamOpFlag.NOT_ORDERED; + } + + @Override + public R evaluateSequential(PipelineHelper helper, + Spliterator spliterator) { + throw new UnsupportedOperationException(); + } + + @Override + public R evaluateParallel(PipelineHelper helper, + Spliterator spliterator) { + container = sizedSupplier.apply(spliterator.getExactSizeIfKnown()); + new ForEachOps.ForEachTask<>(helper, spliterator, helper.wrapSink(this)).invoke(); + return container; + } + + // TerminalSink + + @Override + public void accept(T u) { + accumulator.accept(container, u); + } + + @Override + public R get() { + return container; + } + } + + /** A {@code ForkJoinTask} for performing a parallel for-each operation */ @SuppressWarnings("serial") static final class ForEachTask extends CountedCompleter { diff --git a/src/java.base/share/classes/java/util/stream/GathererOp.java b/src/java.base/share/classes/java/util/stream/GathererOp.java index 39758bd834ad2..ef6165090ddf7 100644 --- a/src/java.base/share/classes/java/util/stream/GathererOp.java +++ b/src/java.base/share/classes/java/util/stream/GathererOp.java @@ -37,6 +37,7 @@ import java.util.function.Consumer; import java.util.function.Function; import java.util.function.IntFunction; +import java.util.function.LongFunction; import java.util.function.Supplier; import java.util.stream.Gatherer.Integrator; @@ -77,13 +78,16 @@ static Stream of( static final class NodeBuilder implements Consumer { private static final int LINEAR_APPEND_MAX = 8; // TODO revisit static final class Builder extends SpinedBuffer implements Node { - Builder() { + Builder(int initialCapacity) { + super(initialCapacity); } } - NodeBuilder() { + NodeBuilder(long exactSizeIfKnown) { + initialCapacity = exactSizeIfKnown != -1 ? (int) Math.min(exactSizeIfKnown, Integer.MAX_VALUE) : 16; } + private final int initialCapacity; private Builder rightMost; private Node leftMost; @@ -94,7 +98,7 @@ private boolean isEmpty() { @Override public void accept(X x) { final var b = rightMost; - (b == null ? (rightMost = new NodeBuilder.Builder<>()) : b).accept(x); + (b == null ? (rightMost = new NodeBuilder.Builder<>(initialCapacity)) : b).accept(x); } public NodeBuilder join(NodeBuilder that) { @@ -311,7 +315,7 @@ public CR collect(Collector c) { u.wrapSpliterator(u.sourceSpliterator(0)), parallel, gatherer, - c.supplier(), + c.sizedSupplier(), c.accumulator(), parallel ? c.combiner() : null, c.characteristics().contains(Collector.Characteristics.IDENTITY_FINISH) @@ -331,7 +335,7 @@ public RR collect(Supplier supplier, u.wrapSpliterator(u.sourceSpliterator(0)), parallel, gatherer, - supplier, + _ -> supplier.get(), accumulator, parallel ? (l, r) -> { combiner.accept(l, r); @@ -349,7 +353,7 @@ public RR collect(Supplier supplier, private CR evaluate(final Spliterator spliterator, final boolean parallel, final Gatherer gatherer, - final Supplier collectorSupplier, + final LongFunction collectorSizedSupplier, final BiConsumer collectorAccumulator, final BinaryOperator collectorCombiner, final Function collectorFinisher) { @@ -371,10 +375,10 @@ final class Sequential implements Consumer, Gatherer.Downstream { CA collectorState; boolean proceed; - Sequential() { + Sequential(long exactSizeIfKnown) { if (initializer != Gatherer.defaultInitializer()) state = initializer.get(); - collectorState = collectorSupplier.get(); + collectorState = collectorSizedSupplier.apply(exactSizeIfKnown); proceed = true; } @@ -433,7 +437,7 @@ public CR get() { * strategy. */ if (!parallel) - return new Sequential().evaluateUsing(spliterator).get(); + return new Sequential(spliterator.getExactSizeIfKnown()).evaluateUsing(spliterator).get(); // Parallel section starts here: @@ -460,9 +464,12 @@ final class Hybrid extends CountedCompleter { protected Hybrid(Spliterator spliterator) { super(null); this.spliterator = spliterator; + long estimatedSize = spliterator.estimateSize(); this.targetSize = - AbstractTask.suggestTargetSize(spliterator.estimateSize()); - this.localResult = new Sequential(); + AbstractTask.suggestTargetSize(estimatedSize); + this.localResult = new Sequential(spliterator.hasCharacteristics(Spliterator.SIZED) + ? estimatedSize + : -1); this.cancelled = greedy ? null : new AtomicBoolean(false); this.leftPredecessor = null; } @@ -564,7 +571,7 @@ public void compute() { */ if (greedy && task.getPendingCount() > 0) { // Upstream elements are buffered - NodeBuilder nb = new NodeBuilder<>(); + NodeBuilder nb = new NodeBuilder<>(rightSplit.getExactSizeIfKnown()); rightSplit.forEachRemaining(nb); // Run the upstream task.spliterator = nb.build().spliterator(); } @@ -640,7 +647,7 @@ public void setRawResult(Sequential result) { } private void doProcess() { - if (!(localResult = new Sequential()).evaluateUsing(spliterator).proceed + if (!(localResult = new Sequential(spliterator.getExactSizeIfKnown())).evaluateUsing(spliterator).proceed && !greedy) cancelLaterTasks(); } @@ -737,4 +744,4 @@ private void cancelLaterTasks() { else return new Hybrid(spliterator).invoke().get(); } -} \ No newline at end of file +} diff --git a/src/java.base/share/classes/java/util/stream/ReduceOps.java b/src/java.base/share/classes/java/util/stream/ReduceOps.java index 64c94ef35fdd5..3cf762d600a84 100644 --- a/src/java.base/share/classes/java/util/stream/ReduceOps.java +++ b/src/java.base/share/classes/java/util/stream/ReduceOps.java @@ -37,6 +37,7 @@ import java.util.function.DoubleBinaryOperator; import java.util.function.IntBinaryOperator; import java.util.function.LongBinaryOperator; +import java.util.function.LongFunction; import java.util.function.ObjDoubleConsumer; import java.util.function.ObjIntConsumer; import java.util.function.ObjLongConsumer; @@ -154,14 +155,14 @@ public ReducingSink makeSink() { */ public static TerminalOp makeRef(Collector collector) { - Supplier supplier = Objects.requireNonNull(collector).supplier(); + LongFunction sizedSupplier = Objects.requireNonNull(collector).sizedSupplier(); BiConsumer accumulator = collector.accumulator(); BinaryOperator combiner = collector.combiner(); class ReducingSink extends Box implements AccumulatingSink { @Override public void begin(long size) { - state = supplier.get(); + state = sizedSupplier.apply(size); } @Override diff --git a/src/java.base/share/classes/java/util/stream/ReferencePipeline.java b/src/java.base/share/classes/java/util/stream/ReferencePipeline.java index 3133b20b03d4f..75c3e162c2f16 100644 --- a/src/java.base/share/classes/java/util/stream/ReferencePipeline.java +++ b/src/java.base/share/classes/java/util/stream/ReferencePipeline.java @@ -719,9 +719,7 @@ public R collect(Collector collector) { if (isParallel() && (collector.characteristics().contains(Collector.Characteristics.CONCURRENT)) && (!isOrdered() || collector.characteristics().contains(Collector.Characteristics.UNORDERED))) { - container = collector.supplier().get(); - BiConsumer accumulator = collector.accumulator(); - forEach(u -> accumulator.accept(container, u)); + container = evaluate(ForEachOps.makeAccumulateSized(collector.sizedSupplier(), collector.accumulator())); } else { container = evaluate(ReduceOps.makeRef(collector)); diff --git a/test/jdk/java/util/StringJoiner/StringJoinerTest.java b/test/jdk/java/util/StringJoiner/StringJoinerTest.java index 25948fb8e55db..d05b214b811a7 100644 --- a/test/jdk/java/util/StringJoiner/StringJoinerTest.java +++ b/test/jdk/java/util/StringJoiner/StringJoinerTest.java @@ -327,6 +327,24 @@ public void testDelimiterCombinations() { testCombos(",", "<", ">"); } + public void testNegativeCapacity() { + try { + new StringJoiner(DASH, "{", "}", -1); + fail("Should have thrown IllegalArgumentException"); + } catch (IllegalArgumentException ex) { + // okay + } + } + + public void testZeroCapacity() { + StringJoiner sj = new StringJoiner(DASH, "{", "}", 0); + assertEquals(sj.toString(), "{}"); + sj.add(ONE); + assertEquals(sj.toString(), "{" + ONE + "}"); + sj.add(TWO); + assertEquals(sj.toString(), "{" + ONE + DASH + TWO + "}"); + } + public void OOM1() { try { new StringJoiner(MAX_STRING, MAX_STRING, MAX_STRING).toString(); diff --git a/test/jdk/java/util/stream/test/org/openjdk/tests/java/util/stream/CollectorsTest.java b/test/jdk/java/util/stream/test/org/openjdk/tests/java/util/stream/CollectorsTest.java index 4ce3916bcbf33..82f94e909083e 100644 --- a/test/jdk/java/util/stream/test/org/openjdk/tests/java/util/stream/CollectorsTest.java +++ b/test/jdk/java/util/stream/test/org/openjdk/tests/java/util/stream/CollectorsTest.java @@ -47,6 +47,7 @@ import java.util.function.Supplier; import java.util.stream.Collector; import java.util.stream.Collectors; +import java.util.stream.IntStream; import java.util.stream.LambdaTestHelpers; import java.util.stream.OpTestCase; import java.util.stream.Stream; @@ -64,11 +65,13 @@ import static java.util.stream.Collectors.mapping; import static java.util.stream.Collectors.partitioningBy; import static java.util.stream.Collectors.reducing; +import static java.util.stream.Collectors.teeing; import static java.util.stream.Collectors.toCollection; import static java.util.stream.Collectors.toConcurrentMap; import static java.util.stream.Collectors.toList; import static java.util.stream.Collectors.toMap; import static java.util.stream.Collectors.toSet; +import static java.util.stream.Gatherers.scan; import static java.util.stream.LambdaTestHelpers.assertContents; import static java.util.stream.LambdaTestHelpers.assertContentsUnordered; import static java.util.stream.LambdaTestHelpers.mDoubler; @@ -807,4 +810,71 @@ public void testTeeing(String name, TestData.OfRef data) throws Reflect new GroupingByAssertion<>(classifier, Map.class, new TeeingAssertion<>(summing, counting, Map::entry))); } + + static class SizingValidator { + final int expectedSize; + int actualSize; + + SizingValidator() { + throw new UnsizedException(); + } + + SizingValidator(int expectedSize) { + this.expectedSize = expectedSize; + } + + synchronized void count(Object value) { + actualSize++; + } + + SizingValidator merge(SizingValidator other) { + actualSize += other.actualSize; + return this; + } + + Integer validateAndGetSize() { + if (actualSize != expectedSize) { + throw new IllegalStateException("Expected size: %s, actual size: %s".formatted(expectedSize, actualSize)); + } + return actualSize; + } + + static class UnsizedException extends RuntimeException {} + } + + @Test + public void testOfSized() { + Collector collector = Collector.ofSized( + SizingValidator::new, + SizingValidator::new, + SizingValidator::count, + SizingValidator::merge, + SizingValidator::validateAndGetSize, + Collector.Characteristics.UNORDERED, + Collector.Characteristics.CONCURRENT); + + assertEquals(collector.characteristics(), Set.of(Collector.Characteristics.UNORDERED, Collector.Characteristics.CONCURRENT)); + + // Serial collect, serial fused gather-collect and parallel concurrent unordered collect all presize the + // final container to the known size. + assertEquals((int) IntStream.range(0, 100).boxed().collect(collector), 100); + assertEquals((int) IntStream.range(0, 100).boxed().gather(scan(() -> 0, Integer::sum)).collect(collector), 100); + assertEquals((int) IntStream.range(0, 100).boxed().parallel().map(i -> 2 * i).collect(collector), 100); + + try { + IntStream.range(0, 100).boxed().filter(i -> i != 2).collect(collector); + fail("Expecting stream without known size to use unsized supplier"); + } catch (SizingValidator.UnsizedException _) {} + + // Collector combinators that preserve the stream size also presize. + assertEquals((int) IntStream.range(0, 100).boxed().collect(collectingAndThen(collector, i -> i + 1)), 101); + assertEquals((int) IntStream.range(0, 100).boxed().collect(mapping(i -> i + 1, collector)), 100); + assertEquals((int) IntStream.range(0, 100).boxed().collect(teeing(collector, collector, Integer::sum)), 200); + + // Collector combinators that don't preserve the stream size don't presize. + try { + IntStream.range(0, 100).boxed().collect(partitioningBy(i -> i % 2 == 0, collector)); + fail("Expecting partitioningBy combinator to drop sizedSupplier"); + } catch (SizingValidator.UnsizedException _) {} + } } diff --git a/test/micro/org/openjdk/bench/java/util/stream/SizedCollectors.java b/test/micro/org/openjdk/bench/java/util/stream/SizedCollectors.java new file mode 100644 index 0000000000000..d58b15cec589f --- /dev/null +++ b/test/micro/org/openjdk/bench/java/util/stream/SizedCollectors.java @@ -0,0 +1,157 @@ +/* + * Copyright (c) 2025, Oracle and/or its affiliates. All rights reserved. + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This code is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 only, as + * published by the Free Software Foundation. + * + * This code is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * version 2 for more details (a copy is included in the LICENSE file that + * accompanied this code). + * + * You should have received a copy of the GNU General Public License version + * 2 along with this work; if not, write to the Free Software Foundation, + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA + * or visit www.oracle.com if you need additional information or have any + * questions. + */ +package org.openjdk.bench.java.util.stream; + +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Warmup; + +import java.util.Map; +import java.util.StringJoiner; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collector; +import java.util.stream.Collectors; +import java.util.stream.LongStream; + +/** + * Benchmark for checking the effect of sized collectors. + *