Understanding How to Use the New Stream Gatherers in Java

Back in late March, I wrote about Java 22 introducing Stream Gatherers for Java developers. Gatherers are a new mechanism for manipulating streams of data, delivered by JEP 461, that lets a developer create custom operators to simplify highly complex stream operations.

At first glance, gatherers can seem complex and obscure, and several of my fellow programmers have asked me to explain why we would need them in Java development at all. Invariably, they then hit a situation in their project code where a stream manipulation is needed that the built-in operations cannot express cleanly. That is where gatherers become a saving grace for their code base, and they welcome the addition to the Stream API with open arms.

The Stream API and Stream Gatherers

Java streams model dynamic collections of elements. As the spec states...

A stream is a lazily computed, potentially unbounded sequence of values.

This means you can consume and operate on a stream of data without it ever needing to stop. Think of it as sitting beside a river, watching the water flow past; you never expect the water to stop flowing. You can work on the river and everything it contains for as long as you like, and when you are done, you simply walk away.

The Stream API has several powerful built-in methods for working on the elements in a sequence of values. These are known as functional operators, such as filter and map.

In the Stream API, a stream begins with a source of elements, and operations like filter and map are known as “intermediate” operations. Each intermediate operation returns a stream, so you can compose them together. Java will not start applying any of these operations until the stream reaches a “terminal” operation. This laziness supports efficient processing even with many operators chained together.
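To make the laziness concrete, here is a small sketch that uses a counter to observe when elements are actually visited (the variable names are just for illustration):

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;

AtomicInteger visited = new AtomicInteger();

// Building the pipeline runs nothing; the peek callback has not fired yet.
Stream<Integer> pipeline = Stream.of(1, 2, 3, 4)
    .peek(n -> visited.incrementAndGet())
    .map(n -> n * 10);
// visited.get() is still 0 here -- intermediate operations are lazy

// The terminal operation pulls every element through the whole chain.
List<Integer> result = pipeline.toList();
// visited.get() is now 4; result is [10, 20, 30, 40]
```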

The built-in intermediate operations are powerful, but they can’t cover the whole realm of imaginable requirements. For situations that fall outside the box, we need a way to define custom operations. Gatherers give us that way.

What can you do with Stream Gatherers?

Say you are on the side of the river and leaves are floating past with numbers written on them. If you want to do something simple, like create an array of all the even numbers you see, you can use the built-in filter method:

List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6);
Object[] evens = numbers.stream()
    .filter(number -> number % 2 == 0)
    .toArray();
// result: [2, 4, 6]

In the above example, we start with a list of integers (the source) and turn it into a stream, applying a filter that keeps only the numbers whose division by two leaves no remainder. The toArray() call is the terminal operation. This is equivalent to checking each leaf for evenness and setting it aside if it passes.

Stream Gatherers' built-in methods

The java.util.stream.Gatherers class comes with a handful of built-in factory methods that produce useful intermediate operations. Let's take a look at what each one does.

What is the windowFixed method?

What if you wanted to take all the leaves floating by and collect them into buckets of two? This is surprisingly clunky to do with the built-in functional operators, because it requires transforming a stream of single elements into a stream of collections.

The windowFixed method is a simpler way to gather your leaves into buckets:

Stream.iterate(0, i -> i + 1)
    .gather(Gatherers.windowFixed(2))
    .limit(5)
    .toList();

This says: give me a stream that iterates the integers by one, turn every two elements into a new window, do that five times, and finally collect the stream into a List. The result is:

[[0, 1], [2, 3], [4, 5], [6, 7], [8, 9]]

Windowing is like moving a frame over the stream; it lets you take snapshots. 

What is the windowSliding method?

Another windowing function is windowSliding, which works like windowFixed() except each window starts on the next element in the source array, rather than at the end of the last window. Here's an example:

Stream.iterate(0, i -> i + 1)
    .gather(Gatherers.windowSliding(2))
    .limit(5)
    .toList();

The output of the code sample is as follows.

[[0, 1], [1, 2], [2, 3], [3, 4], [4, 5]]

Compare the windowSliding output with the output of windowFixed and you’ll see the difference: with windowSliding, each window begins with the last element of the previous window, unlike windowFixed, where the windows never overlap.
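As a sketch, running the same finite source through both gatherers with a window size of three makes the contrast even more visible:

```java
import java.util.List;
import java.util.stream.Gatherers;
import java.util.stream.Stream;

// Same six elements, two different windowing strategies.
List<List<Integer>> fixed = Stream.of(0, 1, 2, 3, 4, 5)
    .gather(Gatherers.windowFixed(3))
    .toList();
// fixed: [[0, 1, 2], [3, 4, 5]]

List<List<Integer>> sliding = Stream.of(0, 1, 2, 3, 4, 5)
    .gather(Gatherers.windowSliding(3))
    .toList();
// sliding: [[0, 1, 2], [1, 2, 3], [2, 3, 4], [3, 4, 5]]
```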

What is the Gatherers.fold method?

Gatherers.fold is like a refined version of the Stream.reduce method. It’s a bit nuanced to see where fold() comes in handy over reduce(). A good discussion is found in this article. Here's what the author, Viktor Klang, has to say about the differences between fold and reduce:

Folding is a generalization of reduction. With reduction, the result type is the same as the element type, the combiner is associative, and the initial value is an identity for the combiner. For a fold, these conditions are not required, though we give up parallelizability.

So reduce is a kind of fold. Reduction takes a stream and turns it into a single value. Folding does the same, but it loosens the requirements: 1) the result type no longer has to match the element type; 2) the combiner no longer has to be associative; and 3) the initializer is a generator function rather than a static value.

The second requirement is relevant to parallelization, which I'll discuss in more detail soon. Calling Stream.parallel on a stream means the engine can break out the work into multiple threads. This only works if the operator is associative; that is, it works if the ordering of operations does not affect the outcome.
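A minimal sketch of why associativity matters: addition is associative, so a parallel reduce with Integer::sum is safe no matter how the runtime splits the work.

```java
import java.util.stream.IntStream;

// Addition is associative: any grouping of partial sums gives the same
// answer, so the parallel result always matches the sequential one.
int sequential = IntStream.rangeClosed(1, 100).reduce(0, Integer::sum);
int parallel = IntStream.rangeClosed(1, 100).parallel().reduce(0, Integer::sum);
// both are 5050

// Subtraction is NOT associative; reducing with (a, b) -> a - b in
// parallel could split the work differently on each run and produce
// varying results, which is why fold gives up parallelizability.
```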

Here’s a simple use of fold:

String result = Stream.of("hello", "world", "how", "are", "you?")
    .gather(Gatherers.fold(() -> "",
        (acc, element) -> acc.isEmpty() ? element : acc + "," + element))
    .findFirst()
    .get();
// result: "hello,world,how,are,you?"

This example takes a collection of strings and combines them with commas. Here is the same work done with reduce:

String result = Stream.of("hello", "world", "how", "are", "you?")
  .reduce("", (acc, element) -> acc.isEmpty() ? element : acc + "," + element);

You can see that with fold, you define a generator function (() -> "") instead of an initial value (""). This means that if you require more complex handling of the initial value, you can put that logic in the function body.
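To illustrate (a hypothetical sketch), the generator can build a pre-seeded mutable accumulator rather than a constant:

```java
import java.util.stream.Gatherers;
import java.util.stream.Stream;

// The Supplier runs once per evaluation and can do real work,
// here seeding a StringBuilder with a prefix.
String labeled = Stream.of("hello", "world")
    .gather(Gatherers.fold(
        () -> new StringBuilder("greeting: "), // generator, not a static value
        StringBuilder::append))
    .findFirst()
    .map(StringBuilder::toString)
    .orElse("");
// labeled: "greeting: helloworld"
```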

Now, let’s look at the advantages of fold when working with a diversity of types. Say we have a stream of mixed object types and we want to count the elements. Here's an example:

var result = Stream.of(1, "hello", true)
    .gather(Gatherers.fold(() -> 0, (acc, el) -> acc + 1));

// result.findFirst().get() = 3

The result will be 3. Notice that the stream contains a number, a string, and a Boolean. Performing a similar feat with reduce is difficult because the accumulator argument (acc) is strongly typed, as in the code example below.

// bad, throws exception:

var result = Stream.of(1, "hello", true).reduce(0, (acc, el) -> acc + 1);

// Error: bad operand types for binary operator '+'

What if we used a collector to perform this work? Then we have the following.

var result2 = Stream.of("apple", "banana", "apple", "orange")
  .collect(Collectors.toMap(word -> word, word -> 1, Integer::sum, HashMap::new));

Done this way, though, we lose access to the initializer and the folding function's body if we need them for more involved logic.
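For comparison, here is a sketch of the same word count done with fold, where the accumulation logic stays in a plain lambda body that we could extend:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Gatherers;
import java.util.stream.Stream;

// Fold into a mutable map; the lambda body is open for extra logic.
Map<String, Integer> counts = Stream.of("apple", "banana", "apple", "orange")
    .gather(Gatherers.fold(
        () -> new HashMap<String, Integer>(),
        (map, word) -> {
            map.merge(word, 1, Integer::sum); // room for more involved handling
            return map;
        }))
    .findFirst()
    .orElseGet(HashMap::new);
// counts: {apple=2, banana=1, orange=1}
```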

What is the Gatherers.scan method?

The scan method moves through the stream and combines elements cumulatively, emitting each intermediate accumulation as a new stream element rather than collecting elements into windows. Again, an example gives more clarity (this one is from the Javadocs):

Stream.of(1, 2, 3, 4, 5, 6, 7, 8, 9)
    .gather(Gatherers.scan(() -> "", (string, number) -> string + number))
    .toList();

The output of this is as follows.

["1", "12", "123", "1234", "12345", "123456", "1234567", "12345678", "123456789"]

So, the scan method will let us move through the stream elements and combine them cumulatively.
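The accumulation need not be string concatenation; here is a sketch of scan as a running total over integers:

```java
import java.util.List;
import java.util.stream.Gatherers;
import java.util.stream.Stream;

// Each emitted element is the cumulative sum so far.
List<Integer> runningTotals = Stream.of(1, 2, 3, 4, 5)
    .gather(Gatherers.scan(() -> 0, Integer::sum))
    .toList();
// runningTotals: [1, 3, 6, 10, 15]
```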

What is the mapConcurrent method?

With the mapConcurrent method, you can specify a maximum number of threads to use for running the supplied map function concurrently; virtual threads are used under the hood. Here’s a very simple example that limits the concurrency to four threads while squaring numbers. (mapConcurrent is overkill for such a simple dataset, but it keeps the example clear.)

Stream.of(1, 2, 3, 4, 5)
    .gather(Gatherers.mapConcurrent(4, x -> x * x))
    .collect(Collectors.toList());

// Result: [1, 4, 9, 16, 25]

Aside from the concurrency limit, mapConcurrent works just like the standard map operation, and it preserves the encounter order of the stream.
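Here is a sketch of that behavior, using Thread.sleep as a stand-in for a slow network call (the identifiers are made up for illustration):

```java
import java.util.List;
import java.util.stream.Gatherers;
import java.util.stream.Stream;

// At most three mappings run at once, each on a virtual thread,
// yet the results come out in the original encounter order.
List<String> results = Stream.of("a", "b", "c", "d", "e", "f")
    .gather(Gatherers.mapConcurrent(3, id -> {
        try {
            Thread.sleep(100); // stand-in for a slow remote call
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        return id.toUpperCase();
    }))
    .toList();
// results: [A, B, C, D, E, F]
```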