In the realm of software development, efficiently handling large datasets is crucial, especially with the proliferation of multicore processors. The Java Stream interface revolutionized the way collections are managed by supporting both sequential and parallel operations. However, harnessing the full potential of modern processors while maintaining the simplicity of the Stream API has been a challenge.
To address this, I developed an open-source library that introduces a novel approach to parallelizing stream operations. This library departs from traditional batching methods by processing each stream element in its own virtual thread, enabling a finer level of parallelism.
In this article, I will discuss the library and its design in detail. While the information provided exceeds what is needed to simply use the library, it offers an in-depth understanding of its construction.
The library is available on GitHub and can be added as a dependency from Maven Central:
<dependency>
<groupId>com.github.verhas</groupId>
<artifactId>vtstream</artifactId>
<version>1.0.1</version>
</dependency>
Please verify the current version number on the Maven Central site or GitHub. This article is based on version 1.0.1 of the library.
Parallel Computing
Parallel computing is not a new concept. It has existed for decades. Early computers executed tasks in batches, in a serial manner, but the concept of time-sharing soon emerged.
The first time-sharing computer system was installed in 1961 at the Massachusetts Institute of Technology (MIT). Known as the Compatible Time-Sharing System (CTSS), it allowed multiple users to simultaneously log into a mainframe computer, creating the illusion of private sessions. CTSS was a groundbreaking development in computer science, paving the way for modern operating systems that support multitasking and multi-user operations.
CTSS was not a parallel computing system per se: it ran on a single IBM 7094 mainframe with one CPU, so it executed code serially. Today, we have multicore processors and multiple processors in a single machine. For instance, the computer I'm using to edit this article has 10 processor cores.
To execute tasks concurrently, there are several approaches:
- Define the algorithm in a concurrent way, such as through reactive programming.
- Define the algorithm sequentially and allow some program to manage concurrency.
- Combine both methods.
When programming reactive algorithms or using Java 8 streams, we facilitate concurrent task execution. We define small parts and their interdependence so the environment can decide which parts to execute concurrently. The actual execution is managed by the framework, utilizing either virtual threads or traditional threads/processes.
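The JDK's own parallel streams already follow this model. In the minimal sketch below (ParallelStreamDemo and squares are illustrative names), the pipeline only states what to compute for each element; calling parallel() lets the runtime split the range into chunks and process them on the platform threads of the common ForkJoinPool. This chunk-based splitting is the kind of batching that the library described here replaces with one virtual thread per element.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class ParallelStreamDemo {

    // The pipeline only describes the work per element; parallel() lets the runtime
    // split the range into chunks and process them on the common ForkJoinPool.
    static List<Integer> squares(int n) {
        return IntStream.rangeClosed(1, n)
                .parallel()
                .map(x -> x * x)
                .boxed()
                .collect(Collectors.toList()); // keeps encounter order despite parallel execution
    }

    public static void main(String[] args) {
        System.out.println(squares(5)); // [1, 4, 9, 16, 25]
    }
}
```

Even though the chunks may finish in any order, collecting an ordered stream with Collectors.toList() preserves the encounter order of the range.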
The difference lies in the scheduler, which decides which processor executes the next task. For traditional threads and processes, the operating system acts as the scheduler. Threads within the same process share the same memory space, whereas processes have their own memory spaces. Virtual threads, on the other hand, are scheduled by the JVM, which mounts them in turn on a small pool of OS threads (carrier threads); virtual threads running on the same carrier thread take turns using its stack.
Virtual threads need far fewer operating system resources and have much lower creation and switching overhead, which makes them much cheaper than traditional threads. While a machine might support thousands of threads and processes, it can accommodate millions of virtual threads.
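As a rough illustration of that claim, the sketch below (Java 21; ManyVirtualThreads and runTasks are illustrative names) starts one virtual thread per task with Executors.newVirtualThreadPerTaskExecutor(). Each task sleeps briefly; while it sleeps, the JVM can unmount the virtual thread from the OS thread carrying it, so a handful of OS threads serve all the tasks.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative only: one virtual thread per task (requires Java 21).
class ManyVirtualThreads {

    // Submits taskCount tasks and returns how many of them completed.
    static int runTasks(int taskCount) {
        AtomicInteger completed = new AtomicInteger();
        // newVirtualThreadPerTaskExecutor() creates a new virtual thread for every task;
        // close() at the end of the try block waits for all submitted tasks to finish.
        try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
            for (int i = 0; i < taskCount; i++) {
                executor.submit(() -> {
                    try {
                        // Sleeping lets the JVM unmount the virtual thread from its carrier.
                        Thread.sleep(10);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                    completed.incrementAndGet();
                });
            }
        }
        return completed.get();
    }

    public static void main(String[] args) {
        System.out.println(runTasks(100_000)); // 100000
    }
}
```

Creating 100,000 platform threads instead would reserve a native stack for each of them, which is usually far more demanding on the operating system.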
Implementing Streams with Threads
The library features two primary classes in the main directory: ThreadedStream and Command. ThreadedStream implements the Stream interface:
public class ThreadedStream<T> implements Stream<T> {
    // ... fields and implementations of the Stream methods
}
The Command class contains nested classes for the stream operations:
public static class Filter<T> extends Command<T, T> {
    // ...
}
public static class AnyMatch<T> extends Command<T, T> {
    // ...
}
// ... other commands
These classes implement the intermediate operations. Terminal operations are implemented in the ThreadedStream class: they convert the threaded stream into a regular stream and then invoke the corresponding terminal operation on it. For example, the collect method:
@Override
public <R> R collect(Supplier<R> supplier, BiConsumer<R, ? super T> accumulator, BiConsumer<R, R> combiner) {
    return toStream().collect(supplier, accumulator, combiner);
}
The source of the elements is also a stream, so the threading functionality is layered on top of the existing stream implementation. Ordinary streams serve as both the data source and the destination, while threading enables the parallel execution of the intermediate operations.
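To make the layering idea concrete, here is a much-reduced, hypothetical wrapper; MiniThreadedStream and its threaded, map, toStream and collect methods are illustrative and not the library's code. Intermediate operations only record work; the terminal operation calls toStream(), which runs the recorded work for each element in its own virtual thread and hands the results, in source order, to an ordinary Stream that performs the actual terminal operation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;
import java.util.stream.Collector;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical sketch, NOT the library's ThreadedStream: threads layered over an
// ordinary source stream, with the terminal operation delegated to a regular stream.
final class MiniThreadedStream<S, T> {
    private final Stream<S> source;        // the ordinary stream supplying elements
    private final Function<S, T> pipeline; // intermediate operations composed so far

    private MiniThreadedStream(Stream<S> source, Function<S, T> pipeline) {
        this.source = source;
        this.pipeline = pipeline;
    }

    static <S> MiniThreadedStream<S, S> threaded(Stream<S> source) {
        return new MiniThreadedStream<>(source, Function.identity());
    }

    // Intermediate operation: only records the work, nothing runs yet.
    <R> MiniThreadedStream<S, R> map(Function<? super T, ? extends R> mapper) {
        return new MiniThreadedStream<>(source, pipeline.andThen(mapper));
    }

    // Runs the pipeline for each element in its own virtual thread and returns
    // the results, in source order, as an ordinary stream.
    Stream<T> toStream() {
        List<Future<T>> futures = new ArrayList<>();
        try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
            // forEachOrdered keeps source order and avoids concurrent adds to the list
            source.forEachOrdered(element -> {
                Callable<T> work = () -> pipeline.apply(element);
                futures.add(executor.submit(work));
            });
        } // close() waits until every submitted task has finished
        List<T> results = new ArrayList<>();
        for (Future<T> future : futures) {
            try {
                results.add(future.get());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            } catch (ExecutionException e) {
                throw new IllegalStateException(e.getCause());
            }
        }
        return results.stream();
    }

    // Terminal operation: delegate to the regular stream returned by toStream().
    <R, A> R collect(Collector<? super T, A, R> collector) {
        return toStream().collect(collector);
    }

    public static void main(String[] args) {
        List<Integer> doubled = threaded(Stream.of(1, 2, 3)).map(x -> x * 2).collect(Collectors.toList());
        System.out.println(doubled); // [2, 4, 6]
    }
}
```

To stay short, this sketch composes the mapping functions into a single Function, whereas the library keeps a chain of ThreadedStream objects, one per operation.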
Stream Data Structure
The ThreadedStream class manages its data with several member variables:
- command represents the command to be executed; it may be a no-op, or null if none is specified.
- downstream points to the preceding ThreadedStream in the chain, so data is fetched either from the downstream stream or, if there is none, directly from the source.
- source is the initial data stream.
- limit specifies the maximum number of elements to process.
- chained indicates whether the stream is part of a processing chain.
Stream Build
The stream data structure is built dynamically as operations are chained. The process starts with the static method threaded on the ThreadedStream class:
final var k = ThreadedStream.threaded(Stream.of(1, 2, 3));
This creates a ThreadedStream instance k with a source stream of the elements 1, 2, and 3. Further operations, such as map, create new ThreadedStream instances, each designating the preceding one as its downstream:
final var t = k.map(x -> x * 2);
The map method is implemented as follows:
public <R> ThreadedStream<R> map(Function<? super T, ? extends R> mapper) {
    return new ThreadedStream<>(new Command.Map<>(mapper), this);
}
This builds a linked list of ThreadedStream objects, which is traversed when a terminal operation is called, enabling concurrent processing with virtual threads.
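The following sketch models that linked list with a hypothetical ChainStage class, which is not the library's code. Each map call creates a new stage that points to the preceding one, and calculate (named after the calculate(t) call used in the listings below) recurses back along the chain to the first stage for a single element, then applies the operations in the order they were chained. Element types are erased to Object to keep the example short.

```java
import java.util.function.Function;

// Hypothetical model of the chain built by map(): each stage stores one operation
// and a link to the stage it was created from ("downstream" in the article).
final class ChainStage {
    private final Function<Object, Object> operation; // null for the first stage
    private final ChainStage downstream;               // null for the first stage

    private ChainStage(Function<Object, Object> operation, ChainStage downstream) {
        this.operation = operation;
        this.downstream = downstream;
    }

    // The first stage of a chain passes source elements through unchanged.
    static ChainStage first() {
        return new ChainStage(null, null);
    }

    // Like map(): the new stage designates this one as its downstream.
    ChainStage map(Function<Object, Object> operation) {
        return new ChainStage(operation, this);
    }

    // Computes the value for one source element: the downstream stages process it
    // first (recursively, back to the first stage), then this stage's operation runs.
    Object calculate(Object element) {
        Object input = downstream == null ? element : downstream.calculate(element);
        return operation == null ? input : operation.apply(input);
    }

    // Number of links between this stage and the first one.
    int depth() {
        return downstream == null ? 0 : 1 + downstream.depth();
    }

    public static void main(String[] args) {
        ChainStage chain = first().map(x -> (Integer) x * 2).map(x -> "v" + x);
        System.out.println(chain.calculate(3)); // v6
        System.out.println(chain.depth());      // 2
    }
}
```

Because each stage only holds a reference to its predecessor, building the chain is cheap; all the work happens when calculate is invoked for an element, which is what makes it possible to run that call in a separate virtual thread per element.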
Stream Execution
Execution begins when a terminal operation is called. As the collect method shown earlier illustrates, the terminal operation first converts the threaded stream back into a conventional stream by calling toStream() and then performs the operation on that stream.
The toStream() method starts a new virtual thread for each element of the source stream. Execution differs for ordered and unordered streams; which one is used is determined by the source stream's isParallel() status.
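Since the mode follows the source stream's isParallel() flag, the caller effectively chooses it when creating the source. The sketch below assumes that a parallel source selects unordered execution and a sequential one ordered execution; executionMode is a hypothetical helper, not part of the library. It also shows that parallel() and sequential() set a single flag for the whole pipeline, and the most recent call wins.

```java
import java.util.stream.Stream;

class ExecutionModeSketch {

    // Hypothetical helper, assuming a parallel source means unordered execution.
    static String executionMode(Stream<?> source) {
        return source.isParallel() ? "unordered" : "ordered";
    }

    public static void main(String[] args) {
        System.out.println(executionMode(Stream.of(1, 2, 3)));                         // ordered
        System.out.println(executionMode(Stream.of(1, 2, 3).parallel()));              // unordered
        System.out.println(executionMode(Stream.of(1, 2, 3).parallel().sequential())); // ordered
    }
}
```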
Unordered Stream Execution
In unordered execution, results are forwarded as soon as they are ready, using a synchronized list as a buffer. The toUnorderedStream method starts a new virtual thread for each element; each thread appends its result to the list, and the returned stream takes results from the head of the list in completion order:
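private Stream<T> toUnorderedStream() {
    // Worker threads append their results here in completion order.
    final var result = Collections.synchronizedList(new LinkedList<Command.Result<T>>());
    // Counts the source elements, i.e. the number of results to expect.
    final AtomicInteger n = new AtomicInteger(0);
    final Stream<?> limitedSource = limit >= 0 ? source.limit(limit) : source;
    // Start one virtual thread per source element.
    limitedSource.forEach(
            t -> {
                Thread.startVirtualThread(() -> result.add(calculate(t)));
                n.incrementAndGet();
            });
    // Emit exactly n results, taking each one as soon as it is available.
    return IntStream.range(0, n.get())
            .mapToObj(i -> {
                while (result.isEmpty()) {
                    Thread.yield(); // busy-wait until a worker adds a result
                }
                return result.removeFirst();
            })
            .filter(f -> !f.isDeleted()) // skip results marked as deleted
            .peek(r -> {
                if (r.exception() != null) {
                    throw new ThreadExecutionException(r.exception());
                }
            })
            .map(Command.Result::result);
}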
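The polling loop with Thread.yield() keeps the consuming thread spinning while the workers are still busy. A common alternative, swapped in here and not taken from the library, is a BlockingQueue whose take() method parks the consumer until a result arrives. The sketch is self-contained: UnorderedSketch, mapUnordered and Outcome are illustrative names, and the mapper stands in for the library's calculate.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Function;
import java.util.stream.IntStream;
import java.util.stream.Stream;

// Illustrative alternative to the polling loop above; not the library's code.
class UnorderedSketch {

    // Either a value or the failure that prevented computing it
    // (LinkedBlockingQueue does not accept null, so values are wrapped).
    private record Outcome<R>(R value, Throwable error) {}

    // Applies mapper to every element in its own virtual thread and returns the
    // results in completion order, not in source order.
    static <T, R> Stream<R> mapUnordered(Stream<T> source, Function<? super T, ? extends R> mapper) {
        BlockingQueue<Outcome<R>> results = new LinkedBlockingQueue<>();
        List<T> elements = source.toList(); // consume the source eagerly, like forEach above
        for (T element : elements) {
            Thread.startVirtualThread(() -> {
                Outcome<R> outcome;
                try {
                    outcome = new Outcome<>(mapper.apply(element), null);
                } catch (Throwable t) {
                    outcome = new Outcome<>(null, t);
                }
                results.add(outcome); // always add, so the consumer never waits forever
            });
        }
        // Exactly one result per element; take() parks instead of spinning.
        return IntStream.range(0, elements.size()).mapToObj(i -> next(results));
    }

    private static <R> R next(BlockingQueue<Outcome<R>> results) {
        try {
            Outcome<R> outcome = results.take();
            if (outcome.error() != null) {
                throw new IllegalStateException("mapper failed", outcome.error());
            }
            return outcome.value();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for a result", e);
        }
    }

    public static void main(String[] args) {
        // Completion order varies between runs, so the results are sorted before printing.
        System.out.println(mapUnordered(Stream.of(1, 2, 3, 4), x -> x * 10).sorted().toList()); // [10, 20, 30, 40]
    }
}
```

Wrapping failures in the queued value plays the same role as the exception check on Command.Result in the listing above: every worker delivers exactly one item, so the consumer can rely on receiving one result per element.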
Ordered Stream Execution
Ordered execution still computes the elements in parallel, but it delivers the results in the order of the source elements. A local Task class pairs each element's worker thread with its result:
private Stream<T> toOrderedStream() {
    // Pairs a source element's worker thread with the result it computes.
    class Task {
        Thread workerThread;
        volatile Command.Result<T> result;

        // Blocks until the task's worker thread has finished.
        static void waitForResult(Task task) {
            try {
                task.workerThread.join();
            } catch (InterruptedException e) {
                task.result = deleted();
            }
        }
    }
    // Tasks are kept in source order, independent of completion order.
    final var tasks = Collections.synchronizedList(new LinkedList<Task>());
    final Stream<?> limitedSource = limit >= 0 ? source.limit(limit) : source;
    limitedSource.forEach(
            sourceItem -> {
                Task task = new Task();
                tasks.add(task);
                task.workerThread = Thread.startVirtualThread(() -> task.result = calculate(sourceItem));
            }
    );
    // Wait for each task in turn, so the results come out in source order.
    return tasks.stream()
            .peek(Task::waitForResult)
            .map(f -> f.result)
            .peek(r -> {
                if (r.exception() != null) {
                    throw new ThreadExecutionException(r.exception());
                }
            })
            .filter(r -> !r.isDeleted())
            .map(Command.Result::result);
}
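The essential trick is that results are read in the order in which the tasks were created, and join() waits for each task in turn. The self-contained sketch below re-creates that technique outside the library (OrderedSketch and mapOrdered are illustrative names, and the plain mapper stands in for calculate). In the demo, the first element sleeps longest and finishes last, yet the results still come out in source order.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Illustrative re-creation of the ordered technique; not the library's code.
class OrderedSketch {

    // One element's worker thread and, once that thread has finished, its result.
    private static final class Task<R> {
        Thread worker;
        R result; // written by the worker; join() makes the write visible to the reader
    }

    static <T, R> List<R> mapOrdered(List<T> source, Function<? super T, ? extends R> mapper) {
        List<Task<R>> tasks = new ArrayList<>();
        for (T element : source) {
            Task<R> task = new Task<>();
            tasks.add(task); // tasks stay in source order
            task.worker = Thread.startVirtualThread(() -> task.result = mapper.apply(element));
        }
        List<R> results = new ArrayList<>();
        try {
            for (Task<R> task : tasks) {
                task.worker.join(); // wait for this element's thread, in source order
                results.add(task.result);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for a result", e);
        }
        // Note: if mapper throws, that worker dies and its result stays null;
        // the listing above instead checks r.exception() and rethrows.
        return results;
    }

    public static void main(String[] args) {
        // The first element sleeps longest and finishes last, yet the order is kept.
        List<Integer> out = mapOrdered(List.of(3, 2, 1), x -> {
            try {
                Thread.sleep(x * 50L);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return x * 10;
        });
        System.out.println(out); // [30, 20, 10]
    }
}
```

The cost of preserving order is head-of-line blocking: a slow early element delays delivery of later results that are already computed, which is exactly what the unordered mode avoids.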
The Conclusion
This library for the parallel execution of stream operations is open source, so you are free to use and adapt it. Although it was reviewed by Istvan Kovacs, an expert in concurrent programming, it has not undergone extensive testing. Use it with caution and test it thoroughly to make sure it meets your needs. The library is provided "as is", so due diligence is essential.
This is a companion discussion topic for the original entry at https://coderoasis.com/java-threaded-streams