Implementing Virtual Threads in Java Streams

Efficiently handling large datasets is crucial in software development, especially now that multicore processors are everywhere. The Java Stream interface changed the way collections are processed by supporting both sequential and parallel operations. However, harnessing the full potential of modern processors while keeping the simplicity of the Stream API has remained a challenge.

To address this, I developed an open-source library that introduces a novel approach to parallelizing stream operations. This library departs from traditional batching methods by processing each stream element in its own virtual thread, enabling a finer level of parallelism.

In this article, I will discuss the library and its design in detail. While the information provided exceeds what is needed to simply use the library, it offers an in-depth understanding of its construction.

The library is available on GitHub and is published to Maven Central, so it can be added as a dependency:

<dependency>
    <groupId>com.github.verhas</groupId>
    <artifactId>vtstream</artifactId>
    <version>1.0.1</version>
</dependency>

Please verify the current version number on the Maven Central site or GitHub. This article is based on version 1.0.1 of the library.

Parallel Computing

Parallel computing is not a new concept. It has existed for decades. Early computers executed tasks in batches, in a serial manner, but the concept of time-sharing soon emerged.

The first time-sharing computer system was installed in 1961 at the Massachusetts Institute of Technology (MIT). Known as the Compatible Time-Sharing System (CTSS), it allowed multiple users to simultaneously log into a mainframe computer, creating the illusion of private sessions. CTSS was a groundbreaking development in computer science, paving the way for modern operating systems that support multitasking and multi-user operations.

CTSS was not a parallel computing system per se: it ran on a single IBM 7094 mainframe with one CPU, so it executed code serially. Today, we have multicore processors and multiple processors in a single machine. For instance, the computer I'm using to edit this article has 10 processor cores.

To execute tasks concurrently, there are several approaches:

  1. Define the algorithm in a concurrent way, such as through reactive programming.
  2. Define the algorithm sequentially and allow some program to manage concurrency.
  3. Combine both methods.

When programming reactive algorithms or using Java 8 streams, we facilitate concurrent task execution. We define small parts and their interdependence so the environment can decide which parts to execute concurrently. The actual execution is managed by the framework, utilizing either virtual threads or traditional threads/processes.
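As a small illustration of the second approach, the pipeline below is written as a plain sequence of steps; calling parallel() hands the decision about which elements run on which threads to the Stream framework. The class and method names are invented for this example.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class DeclarativeParallelism {

    // The pipeline only declares the steps (square, keep even values).
    // parallel() lets the Stream framework decide how to split the work across
    // worker threads (by default, those of the common ForkJoinPool).
    static List<Integer> evenSquares(int upTo) {
        return IntStream.rangeClosed(1, upTo)
                .parallel()
                .map(x -> x * x)
                .filter(x -> x % 2 == 0)
                .boxed()
                .collect(Collectors.toList()); // encounter order is still preserved
    }

    public static void main(String[] args) {
        System.out.println(evenSquares(10)); // [4, 16, 36, 64, 100]
    }
}
```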

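The difference between these options lies in the scheduler, which decides which processor executes the next task. For traditional threads and processes, the operating system acts as the scheduler. Threads within the same process share the same memory space, whereas each process has its own memory space. Virtual threads, in contrast, are scheduled by the JVM, which mounts them onto a small pool of OS-level carrier threads. Each virtual thread still has its own stack, but the JVM keeps it on the heap, so a blocked virtual thread does not, in general, hold on to an OS thread.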

Because the JVM multiplexes virtual threads onto a small number of OS threads, they carry far less overhead than traditional threads and are much cheaper to create and keep alive. While a machine might support thousands of traditional threads and processes, it can accommodate millions of virtual threads.
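A rough way to see how cheap virtual threads are is to start one per task for a large number of short, blocking tasks. This is a minimal sketch, not a benchmark; it assumes Java 21 or later (where Executors.newVirtualThreadPerTaskExecutor() and ExecutorService.close() are available), and the class and method names are made up for illustration.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

class ManyVirtualThreads {

    // Starts one virtual thread per task and waits for all of them.
    // Every task sleeps briefly, so many threads are alive at the same time;
    // with one platform thread per task, a count this high could exhaust OS resources.
    static int runTasks(int count) {
        AtomicInteger completed = new AtomicInteger();
        try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
            for (int i = 0; i < count; i++) {
                executor.submit(() -> {
                    try {
                        Thread.sleep(10); // sleeping parks the virtual thread and frees its carrier
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                    completed.incrementAndGet();
                });
            }
        } // close() waits until every submitted task has finished
        return completed.get();
    }

    public static void main(String[] args) {
        System.out.println(runTasks(100_000)); // 100000
    }
}
```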

Implementing Streams with Threads

The library features two primary classes in the main directory: ThreadedStream and Command. ThreadedStream implements the Stream interface:

public class ThreadedStream<T> implements Stream<T> {
    // ... fields, intermediate operations, and terminal operations
}

The Command class contains nested classes for stream operations:

public static class Filter<T> extends Command<T, T> { /* ... */ }
public static class AnyMatch<T> extends Command<T, T> { /* ... */ }
// ... other commands

These commands implement the intermediate operations. Terminal operations are implemented within the ThreadedStream class: they convert the threaded stream into a regular stream and then invoke the terminal operation on it. For example, the collect method:

@Override
public <R> R collect(Supplier<R> supplier, BiConsumer<R, ? super T> accumulator, BiConsumer<R, R> combiner) {
    return toStream().collect(supplier, accumulator, combiner);
}

The source of the elements is also a stream, so the threading functionality is layered on top of the existing Stream implementation. Regular streams serve as both the data source and the destination, and threading is used only to execute the intermediate commands in parallel.

Stream Data Structure

The ThreadedStream class manages its data with several member variables:

  • command represents the command to be executed, which could be a no-op or null if unspecified.
  • downstream points to the preceding ThreadedStream in the chain. Data is fetched from this downstream stream or, when there is none, directly from the source.
  • source is the initial data stream.
  • limit specifies the maximum number of elements to process.
  • chained indicates if the stream is part of a processing chain.
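To make the roles of these fields more concrete, here is a heavily simplified model of such a node. It is not the library's code: the names Node and Result, the plain Function used as the command, the decision to let later nodes inherit source and limit, and the recursive calculate method are all assumptions made for illustration. A filter is modelled as a command that returns a "deleted" result, mirroring the isDeleted() check that appears in the library's execution code.

```java
import java.util.function.Function;
import java.util.stream.Stream;

class NodeModel {

    // Hypothetical per-element result: a value, or a "deleted" marker set by a filter.
    record Result(Object value, boolean deleted) {
        static Result of(Object value) { return new Result(value, false); }
        static Result deletedResult() { return new Result(null, true); }
    }

    // Hypothetical chain node, loosely mirroring the fields listed above
    // (the "chained" flag is left out).
    static final class Node {
        final Function<Object, Result> command; // null acts as a no-op
        final Node downstream;                  // preceding node, null for the first one
        final Stream<?> source;                 // the initial data stream
        final long limit;                       // negative means "no limit"

        Node(Function<Object, Result> command, Node downstream, Stream<?> source, long limit) {
            this.command = command;
            this.downstream = downstream;
            this.source = source;
            this.limit = limit;
        }

        // Assumption: later nodes inherit the source and the limit from the preceding node.
        Node(Function<Object, Result> command, Node downstream) {
            this(command, downstream, downstream.source, downstream.limit);
        }

        // Same shape as "limit >= 0 ? source.limit(limit) : source" in the library code.
        Stream<?> limitedSource() {
            return limit >= 0 ? source.limit(limit) : source;
        }

        // Recurses back to the first node, then applies each command on the way out.
        Result calculate(Object sourceItem) {
            Result input = downstream == null ? Result.of(sourceItem) : downstream.calculate(sourceItem);
            if (input.deleted() || command == null) {
                return input; // deleted elements skip every later command
            }
            return command.apply(input.value());
        }
    }

    public static void main(String[] args) {
        Node first = new Node(null, null, Stream.of(1, 2, 3, 4), 3);
        Node doubled = new Node(x -> Result.of((Integer) x * 2), first);
        Node keepBig = new Node(x -> (Integer) x > 2 ? Result.of(x) : Result.deletedResult(), doubled);
        keepBig.limitedSource().forEach(item -> System.out.println(item + " -> " + keepBig.calculate(item)));
    }
}
```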

Stream Build

The stream data structure is dynamically built as operations are chained. The process starts with the static method threaded on the ThreadedStream class:

final var k = ThreadedStream.threaded(Stream.of(1, 2, 3));

This creates a ThreadedStream instance k with the source stream of elements 1, 2, and 3. Further operations, such as map, create new ThreadedStream instances, each designating the preceding one as downstream:

final var t = k.map(x -> x * 2);

The map method:

public <R> ThreadedStream<R> map(Function<? super T, ? extends R> mapper) {
    return new ThreadedStream<>(new Command.Map<>(mapper), this);
}

Chaining operations this way builds a linked list of ThreadedStream objects. Building the list executes nothing; the list is traversed only when a terminal operation is called, and the elements are then processed concurrently using virtual threads.
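The hypothetical LazyChain class below illustrates the same construction on a small scale: map only links a new node to the previous one, and the chain is walked only when a terminal operation runs, with one virtual thread per element. The class name, its toList() terminal operation, and the generic Function used as the command are assumptions for this sketch, not the library's API. The counter shows that building the chain calls no mapper at all.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Stream;

final class LazyChain<T> {
    private final Stream<?> source;            // shared by every node of the chain
    private final LazyChain<?> downstream;     // preceding node, null for the first one
    private final Function<Object, T> command; // turns this node's input into a T

    private LazyChain(Stream<?> source, LazyChain<?> downstream, Function<Object, T> command) {
        this.source = source;
        this.downstream = downstream;
        this.command = command;
    }

    // Entry point, in the spirit of ThreadedStream.threaded(...).
    @SuppressWarnings("unchecked")
    static <T> LazyChain<T> threaded(Stream<T> source) {
        return new LazyChain<>(source, null, x -> (T) x);
    }

    // Like map() in the article: it only links a new node to this one.
    @SuppressWarnings("unchecked")
    <R> LazyChain<R> map(Function<? super T, ? extends R> mapper) {
        return new LazyChain<>(source, this, x -> mapper.apply((T) x));
    }

    // Walks back to the first node, then applies each command on the way out.
    private T calculate(Object item) {
        Object input = downstream == null ? item : downstream.calculate(item);
        return command.apply(input);
    }

    // Terminal operation: one virtual thread per element, results kept in source order.
    @SuppressWarnings("unchecked")
    List<T> toList() {
        List<?> items = source.toList();
        Object[] results = new Object[items.size()];
        List<Thread> workers = new ArrayList<>();
        for (int i = 0; i < items.size(); i++) {
            final int index = i;
            workers.add(Thread.startVirtualThread(() -> results[index] = calculate(items.get(index))));
        }
        for (Thread worker : workers) {
            try {
                worker.join(); // join() also makes the worker's write to results visible here
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for a result", e);
            }
        }
        List<T> out = new ArrayList<>(results.length);
        for (Object result : results) {
            out.add((T) result);
        }
        return out;
    }

    public static void main(String[] args) {
        AtomicInteger calls = new AtomicInteger();
        LazyChain<Integer> chain = LazyChain.threaded(Stream.of(1, 2, 3))
                .map(x -> {
                    calls.incrementAndGet();
                    return x * 2;
                });
        System.out.println("after building: " + calls.get()); // after building: 0
        System.out.println(chain.toList());                   // [2, 4, 6]
        System.out.println("after toList: " + calls.get());   // after toList: 3
    }
}
```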

Stream Execution

Execution begins when a terminal operation, such as collect, is called. The terminal operation converts the threaded stream back into a conventional stream and then performs the operation on it:

@Override
public <R> R collect(Supplier<R> supplier, BiConsumer<R, ? super T> accumulator, BiConsumer<R, R> combiner) {
    return toStream().collect(supplier, accumulator, combiner);
}

The toStream() method initiates a new virtual thread for each source stream element. Execution differs for ordered and unordered streams, determined by the source stream’s isParallel() status.
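The dispatch can be pictured as a simple check on the source. This hypothetical sketch assumes that a parallel source selects the unordered path and a sequential source the ordered one; the enum and method names are invented for illustration.

```java
import java.util.stream.Stream;

class ExecutionModeChoice {

    enum Mode { ORDERED, UNORDERED }

    // Hypothetical mirror of the decision described for toStream(), assuming a
    // parallel source selects unordered execution and a sequential one ordered execution.
    static Mode modeFor(Stream<?> source) {
        return source.isParallel() ? Mode.UNORDERED : Mode.ORDERED;
    }

    public static void main(String[] args) {
        System.out.println(modeFor(Stream.of(1, 2, 3)));            // ORDERED
        System.out.println(modeFor(Stream.of(1, 2, 3).parallel())); // UNORDERED
    }
}
```

Under that assumption, a caller opts into completion-order results by calling parallel() on the source stream before wrapping it. Note that isParallel() only inspects the stream's execution mode; it does not consume any elements.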

Unordered Stream Execution

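In unordered execution, each result is forwarded as soon as it is ready. A new virtual thread is started for each element, and the results are stored in a synchronized list. The consumer side of toUnorderedStream waits, yielding in a loop, until the next result is available, skips deleted (filtered-out) results, and rethrows any exception raised in a worker as a ThreadExecutionException: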

private Stream<T> toUnorderedStream() {
    final var result = Collections.synchronizedList(new LinkedList<Command.Result<T>>());
    final AtomicInteger n = new AtomicInteger(0);
    final Stream<?> limitedSource = limit >= 0 ? source.limit(limit) : source;
    limitedSource.forEach(
            t -> {
                Thread.startVirtualThread(() -> result.add(calculate(t)));
                n.incrementAndGet();
            });
    return IntStream.range(0, n.get())
            .mapToObj(i -> {
                while (result.isEmpty()) {
                    Thread.yield();
                }
                return result.removeFirst();
            })
            .filter(f -> !f.isDeleted())
            .peek(r -> {
                if (r.exception() != null) {
                    throw new ThreadExecutionException(r.exception());
                }
            })
            .map(Command.Result::result);
}
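The same unordered technique can also be written without the Thread.yield() busy-wait by handing results over through a blocking queue. The sketch below swaps java.util.concurrent.LinkedBlockingQueue in for the synchronized list, so the consumer sleeps in take() until a worker delivers. The names UnorderedSketch and Outcome, the use of an empty Optional as the "deleted" marker, and the function signature are assumptions for illustration, not part of the library.

```java
import java.util.List;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.IntStream;
import java.util.stream.Stream;

class UnorderedSketch {

    // Outcome of one element: an optional value (empty = filtered out) or a failure.
    record Outcome<T>(Optional<T> value, Throwable failure) {}

    // One virtual thread per source element; results are returned in completion order.
    static <S, T> Stream<T> toUnorderedStream(Stream<S> source, Function<S, Optional<T>> calculate) {
        BlockingQueue<Outcome<T>> results = new LinkedBlockingQueue<>();
        AtomicInteger started = new AtomicInteger();
        source.forEach(item -> {
            Thread.startVirtualThread(() -> {
                Outcome<T> outcome;
                try {
                    outcome = new Outcome<>(calculate.apply(item), null);
                } catch (Throwable t) {
                    outcome = new Outcome<>(Optional.empty(), t); // always report, so the consumer never hangs
                }
                results.add(outcome);
            });
            started.incrementAndGet();
        });
        return IntStream.range(0, started.get())
                .mapToObj(i -> take(results)) // blocks instead of spinning with Thread.yield()
                .map(outcome -> {
                    if (outcome.failure() != null) {
                        throw new IllegalStateException("worker failed", outcome.failure());
                    }
                    return outcome.value();
                })
                .flatMap(Optional::stream);   // drops filtered-out elements
    }

    private static <E> E take(BlockingQueue<E> queue) {
        try {
            return queue.take();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for a result", e);
        }
    }

    public static void main(String[] args) {
        Function<Integer, Optional<Integer>> squareOdds =
                x -> x % 2 == 1 ? Optional.of(x * x) : Optional.empty();
        List<Integer> squares = toUnorderedStream(Stream.of(1, 2, 3, 4, 5).parallel(), squareOdds)
                .sorted() // completion order varies between runs, so sort before printing
                .toList();
        System.out.println(squares); // [1, 9, 25]
    }
}
```

Catching every Throwable in the worker matters here: if a worker died without adding an outcome, the consumer would wait in take() forever, because it expects exactly one result per started thread.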

Ordered Stream Execution

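Ordered execution still processes the elements in parallel, but it delivers the results in the order of the source elements. A local Task class holds each element's worker thread and result; the consumer joins the worker threads one after the other, so it waits for each result in source order: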

private Stream<T> toOrderedStream() {
    class Task {
        Thread workerThread;
        volatile Command.Result<T> result;
        static void waitForResult(Task task) {
            try {
                task.workerThread.join();
            } catch (InterruptedException e) {
                task.result = deleted();
            }
        }
    }
    final var tasks = Collections.synchronizedList(new LinkedList<Task>());
    final Stream<?> limitedSource = limit >= 0 ? source.limit(limit) : source;
    limitedSource.forEach(
            sourceItem -> {
                Task task = new Task();
                tasks.add(task);
                task.workerThread = Thread.startVirtualThread(() -> task.result = calculate(sourceItem));
            }
    );
    return tasks.stream()
            .peek(Task::waitForResult)
            .map(f -> f.result)
            .peek(r -> {
                        if (r.exception() != null) {
                            throw new ThreadExecutionException(r.exception());
                        }
                    }
            )
            .filter(r -> !r.isDeleted()).map(Command.Result::result);
}
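For comparison, the ordered behaviour can also be sketched with futures instead of joined threads. Each element is submitted to Executors.newVirtualThreadPerTaskExecutor() (Java 21 or later), the futures are kept in source order, and the consumer waits on them one after another, so a slow early element delays later results but never reorders them. This swaps in futures for the library's Task class and join(); the class and method names are assumptions.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;
import java.util.stream.Stream;

class OrderedSketch {

    // One virtual thread per element; results come back in source order.
    static <S, T> List<T> toOrderedList(Stream<S> source, Function<S, T> calculate) {
        List<Future<T>> futures = new ArrayList<>();
        try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
            // forEachOrdered keeps the futures list in source order, even for a parallel source
            source.forEachOrdered(item -> futures.add(executor.submit(() -> calculate.apply(item))));
            List<T> results = new ArrayList<>(futures.size());
            for (Future<T> future : futures) {
                results.add(await(future)); // waits for this element even if later ones are already done
            }
            return results;
        }
    }

    private static <T> T await(Future<T> future) {
        try {
            return future.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for a result", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("calculation failed", e.getCause());
        }
    }

    private static void sleepMillis(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        // Earlier elements sleep longer and finish last, yet the output keeps source order.
        List<String> labels = toOrderedList(Stream.of(3, 2, 1), n -> {
            sleepMillis(n * 50L);
            return "item-" + n;
        });
        System.out.println(labels); // [item-3, item-2, item-1]
    }
}
```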

The Conclusion

This library, designed for parallel execution of stream operations, is open-source, so you are free to use and adapt it. Although it has been reviewed by Istvan Kovacs, an expert in concurrent programming, it hasn't undergone extensive testing. Users should proceed with caution and test the library thoroughly to ensure it meets their needs. The library is provided "as is", so due diligence is essential when using it.



This is a companion discussion topic for the original entry at https://coderoasis.com/java-threaded-streams