This blog post covers Java Stream Gatherers.
Introduction
The Streams API, introduced in Java 8, revolutionized how developers handle collections by enabling functional-style operations that are both expressive and efficient. This initial release laid a strong foundation, simplifying tasks like filtering, mapping, and reducing data in a clean and declarative way. Building on this foundation, Java 9 introduced key improvements that made streams even more adaptable. These enhancements focused on giving developers greater control over how streams are consumed, ensuring more flexibility and efficiency in handling evolving data scenarios. Since then, further improvements have been added, the latest being Java Stream Gatherers.
Note: Java Stream Gatherers are still in preview!
What are Java Stream Gatherers?
Introduced in Java 22, Stream Gatherers add a new level of flexibility to stream processing by enhancing control over how elements flow through a pipeline. Unlike traditional collectors, which only operate at the end of a stream, gatherers allow custom logic to influence data processing at each step. By defining an initializer, an integrator, a combiner, and a finisher, developers can perform complex transformations that go beyond what existing functions like map, filter, or reduce can handle.
Stateful Control: Gatherers maintain internal state across stream elements, making them ideal for scenarios that require decisions based on previously processed data, such as limiting results or applying complex filters.
This new functionality broadens the possibilities for stream manipulation, making it easier to build sophisticated data workflows.
How do Java Stream Gatherers work?
Unlike collectors, which gather elements only at the terminal stage, gatherers operate throughout the stream pipeline. They use four key components: the initializer, integrator, combiner, and finisher.
Initializer:
- Creates the initial state for the gatherer. This state is used and modified during stream processing.
- Example: Initializing a counter or list.
Integrator:
- The main logic of the gatherer. It processes each element, updates the state, and decides whether to pass the element downstream. It may also decide to finish the stream early by returning false (see the sketch after this list).
Combiner:
- Used in parallel streams to merge states from different threads. This allows gatherers to support parallel processing.
- Without a combiner, gatherers are limited to sequential execution.
Finisher:
- Runs after all elements have been processed. It can emit any remaining elements or perform final cleanup.
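As referenced above, an integrator can cut a stream short by returning false. The following is a minimal sketch of this, assuming a hypothetical takeUntilNegative gatherer (the name and the stateless form are illustrative choices, not part of the Gatherers API):

public static Gatherer<Integer, Void, Integer> takeUntilNegative() {
    return Gatherer.ofSequential(
        (state, element, downstream) -> {        // <- integrator, no state needed
            if (element < 0) {
                return false;                    // finish the stream early
            }
            return downstream.push(element);     // push returns false once the downstream wants no more elements
        }
    );
}

Applied as Stream.of(1, 2, -1, 3).gather(takeUntilNegative()), this would emit only 1 and 2.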
In Java, the Gatherer interface is declared as Gatherer<T, A, R>, where T is the type of elements the gatherer consumes, A is the type of the internal state used by the gatherer, and R is the type of elements the gatherer emits.
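To make all four components concrete, here is a minimal sketch of a parallel-capable gatherer created with Gatherer.of (the countAll name and the long[] state are assumptions for illustration): it counts all elements and emits the total exactly once.

public static <T> Gatherer<T, long[], Long> countAll() {
    return Gatherer.of(
        () -> new long[1],                                 // <- initializer: mutable counter state
        Gatherer.Integrator.ofGreedy((state, element, downstream) -> {
            state[0]++;                                    // <- integrator: update state per element
            return true;                                   //    keep consuming
        }),
        (left, right) -> {                                 // <- combiner: merge per-thread states
            left[0] += right[0];
            return left;
        },
        (state, downstream) -> downstream.push(state[0])   // <- finisher: emit the total
    );
}

Because a combiner is supplied, this gatherer can also run on parallel streams; without one, Gatherer.ofSequential would be the right factory.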
Creating a new Java Stream Gatherer
In the following example, a gatherer is created that counts occurrences of each object in a stream by filling a hash map and incrementing the associated counter. Only when the incoming stream finishes does the gatherer emit a single element: a map with the count for each (equal) element it has seen in the incoming stream.
public static <T> Gatherer<T, Map<T, Long>, Map<T, Long>> countOccurrences() {
    return Gatherer.ofSequential(
        HashMap::new,                                // <- initializer, empty map
        (map, element, downstream) -> {              // <- integrator
            map.merge(element, 1L, Long::sum);
            return true;
        },
        (map, downstream) -> downstream.push(map)    // <- finisher
    );
}
Here, the gatherer is created by providing three components as parameters to Gatherer.ofSequential(). The first one is the initializer, which simply creates an empty map; this map serves as the gatherer's state. Next, the integrator merges each element into the map by incrementing its counter. It also returns true to signal that processing should continue after this element. The downstream is not used in the integrator, because the consumed elements themselves should not be emitted, only the final map. When the incoming stream finishes, the finisher pushes the map downstream.
The return value is a Gatherer of type <T, Map<T, Long>, Map<T, Long>>. The first type parameter T declares the type of the elements of the stream on which the gatherer will be used, the second the type of the gatherer's state, and the third the type of the object that will be pushed downstream.
In the following, the newly created gatherer is used to count the occurrences of each event in a stream:
private static class Event {
    private int priority;
    private String message;

    public Event(int priority, String message) {
        this.priority = priority;
        this.message = message;
    }

    public int getPriority() {
        return priority;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        Event event = (Event) o;
        return Objects.equals(message, event.message) && Objects.equals(priority, event.priority);
    }

    @Override
    public int hashCode() {
        return Objects.hash(message, priority);
    }

    @Override
    public String toString() {
        return "Event: " + message + ", priority: " + priority;
    }
}
Note: It is important to override the hashCode() and equals() methods depending on how the objects should be compared as hash map keys.
public static void main(String[] args) {
    List<Event> events = List.of(
        new Event(1, "Warning occurred"),
        new Event(2, "Error occurred"),
        new Event(3, "Critical Error occurred"),
        new Event(4, "Fatal Error occurred"),
        new Event(2, "Error occurred"));

    Map<Event, Long> eventCounts = events.stream()
        .gather(countOccurrences())
        .findFirst()
        .orElse(Collections.emptyMap());

    // Print the results
    eventCounts.forEach((a, b) -> System.out.println(a + " found " + b + " times"));
}
In this main method, a list of events is initialized. This list is streamed, and the gatherer from above is applied to count the occurrences of each event. In this scenario, an event is equal to another if both the message and the priority are equal. Finally, the result is printed to the console.
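A possible output looks as follows (the order may vary, since HashMap does not guarantee an iteration order):

Event: Error occurred, priority: 2 found 2 times
Event: Warning occurred, priority: 1 found 1 times
Event: Critical Error occurred, priority: 3 found 1 times
Event: Fatal Error occurred, priority: 4 found 1 times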
As a second example, a gatherer that actually uses the downstream in the integrator to emit elements will be demonstrated. Suppose there is again a stream of events and, for whatever reason, as soon as an event with a higher priority has been seen, all following events with a lower priority should be filtered out. Furthermore, after five events have been excluded, the priority threshold is reset. For this behaviour, a new state object is needed for the gatherer.
private static class State {
    private int filteredObjectsCount;
    private int maxPrio = Integer.MIN_VALUE;
}
With this, the following gatherer can be created:
public static Gatherer<Event, State, Event> filterByHighestPrio() {
    return Gatherer.ofSequential(
        State::new,                                    // <- initializer
        ((state, element, downstream) -> {             // <- integrator
            if (element.getPriority() >= state.maxPrio) {
                downstream.push(element);
                state.maxPrio = element.getPriority();
                state.filteredObjectsCount = 0;
            } else {
                if (++state.filteredObjectsCount > 4) {
                    state.filteredObjectsCount = 0;
                    state.maxPrio = Integer.MIN_VALUE;
                }
            }
            return true;
        })
    );
}
Again, the initializer creates a new State object, which holds the priority threshold and a counter for the excluded events. The integrator checks the current element's priority, and only if it is equal to or higher than the current threshold is the element passed to the downstream; the counter is then reset to 0. If the priority is lower, the state's counter is incremented by one. When the counter exceeds 4, meaning five events were excluded, it is reset back to 0 and the threshold is lowered back to the minimum.
public static void main(String[] args) {
    List<Event> events = List.of(
        new Event(1, "Warning occurred"),
        new Event(2, "Error occurred"),
        new Event(3, "Critical Error occurred"),
        new Event(4, "Fatal Error occurred"),
        new Event(2, "I am excluded"),
        new Event(2, "I am excluded"),
        new Event(2, "I am excluded"),
        new Event(2, "I am excluded"),
        new Event(2, "I am excluded"),
        new Event(2, "Error occurred"),
        new Event(2, "Error occurred"),
        new Event(2, "Error occurred"));

    events.stream()
        .gather(filterByHighestPrio())
        .forEach(System.out::println);
}
Running this main method will print the following result to the console.
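Event: Warning occurred, priority: 1
Event: Error occurred, priority: 2
Event: Critical Error occurred, priority: 3
Event: Fatal Error occurred, priority: 4
Event: Error occurred, priority: 2
Event: Error occurred, priority: 2
Event: Error occurred, priority: 2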
As expected, all five events with the message "I am excluded" are not passed downstream because their priority of 2 is lower than the current highest priority of 4 previously encountered by the gatherer. Only after the fifth exclusion is the threshold reset, which is why the subsequent "Error occurred" events pass through again.
Built-in Java Stream Gatherer Operations
Instead of building gatherers from scratch every time, it is possible to use the built-in operations provided by the Gatherers class.
1. Fold: Similar to reduce, it incrementally combines elements using an initial value and a folding function, returning a single aggregated result without wrapping it in an Optional. This allows stateful, ordered reductions where a combiner is not feasible, by applying each element to the state.
Revisiting the first gatherer from above, which counted occurrences, it can be rewritten using the built-in fold operation:
final HashMap<Event, Long> eventCounterHashMap = events.stream()
    .gather(Gatherers.fold(
        () -> new HashMap<Event, Long>(),
        (map, element) -> {
            map.merge(element, 1L, Long::sum);
            return map;
        }
    ))
    .findFirst()
    .orElse(new HashMap<>());

eventCounterHashMap.forEach((a, b) -> System.out.println(a + " found " + b + " times"));
2. Scan: Produces a running total or cumulative result by applying an accumulator function to each element.
For example, if the outcome should be the running sum of all values of an integer stream, it can be used as follows:
private static Integer customBiFunction(Integer a, Integer b) {
    return a + b;
}
Stream.of(1, 2, 3, 4)
    .gather(Gatherers.scan(() -> 0, (state, element) -> customBiFunction(state, element)))
    .forEach(System.out::println);
// Output: 1, 3, 6, 10
The custom summation function is used as the accumulator function, and the state is initialized with the value 0.
3. Map Concurrent: Applies a function concurrently to each element in the stream while preserving order. It leverages virtual threads for efficient parallel execution.
Stream.of(1, 2, 3, 4, 5, 6, 7, 8)
    .gather(Gatherers.mapConcurrent(4, element -> {  // 4 is the concurrency level
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        System.out.println(element);
        return element * 2;
    }))
    .collect(Collectors.toSet());
This example shows how to use mapConcurrent by providing the concurrency level (4) and a mapper. In order to make the concurrency visible, a Thread.sleep(5000) is used. As a result, the first four elements are processed together after a five-second pause, and then processing continues with the rest of the stream in the same manner.
4. Fixed Window: Groups elements into fixed-size batches, emitting each batch as a list.
Stream.of(1, 2, 3, 4, 5)
    .gather(Gatherers.windowFixed(2))
    .forEach(System.out::println);
// Output: [1, 2], [3, 4], [5]
Only the batch size has to be provided for fixed windows. If the stream length is not a multiple of the batch size, the last window has a reduced size.
5. Sliding Window: Forms overlapping windows of a specified size, shifting by one element with each step.
Stream.of(1, 2, 3, 4, 5)
    .gather(Gatherers.windowSliding(3))
    .forEach(System.out::println);
// Output: [1, 2, 3], [2, 3, 4], [3, 4, 5]
In this case, too, only the window size is needed. As opposed to fixed windows, this will always yield lists of the same length, as long as the stream contains at least as many elements as the window size.
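As a small illustration of that edge case (assuming the documented behaviour of Gatherers.windowSliding, where a stream shorter than the window size yields a single, smaller window):

Stream.of(1, 2)
    .gather(Gatherers.windowSliding(3))
    .forEach(System.out::println);
// Output: [1, 2]  (a single window, since the stream has fewer elements than the window size)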