on
Control Parallelism In Java Streams
Previously, I had a task to write a transformation pipeline of gigantic data sets. I will not speak about complexity due to the difference in the environments, and the size of data to handle is variable, as my first concern was the performance and how to speed up the execution. Besides the methods to play with data in Java Streams
, it provides parallelism to use concurrency
and multiple cores
efficiency. In this tutorial, we’ll see how to use this API effectively.
Sample as 1, 2, 3
A stream represents a sequence of elements and supports different kind of operations to perform computations upon those elements:
Arrays.asList("a1", "a2", "b1", "c2", "c1")
.stream()
.filter(s -> s.startsWith("c"))
.map(String::toUpperCase)
.sorted()
.forEach(System.out::println);
Stream methods are either intermediate or terminal. Intermediate methods return a stream. Terminal methods are either void or return a non-stream result.
Streams can be created from different data sources, especially Collections
. Collections support stream()
and parallelStream()
methods to either create a sequential or a parallel stream.
The same code above can use parallelStream()
. Let’s focus on parallel streams for now.
How it works?
The Stream API uses ForkJoinPool
to execute concurrent tasks.
Parallel streams work under the hood by employing the fork/join framework introduced in Java 7.
Also, a ForkJoinPool
may be constructed with a given target parallelism level; by default, equal to the number of available processors. So the idea was to configure the parallelism level based on the environment.
So now we have how to do it right.
Control the parallelism with custom fork/join pool
Solution #1
The intention is to construct a custom fork-join pool with the desired number of threads and execute the parallel stream within it:
ForkJoinPool forkjpCustom = new ForkJoinPool(4);
forkjpCustom.submit(
() -> IntStream.range(0, Integer.MAX_VALUE)
.map(i -> UUID.randomUUID().toString())
.parallelStream()
.forEach(System.out::println)
);
forkjpCustom.shutdownNow();
So the idea is to execute the parallelStream
statement inside of the submit
block which will hand the job to the framework.
For demonstration reasons, we created, started, and stopped the pool in the method. That’s not a good approach to take.
You have to be careful when using the ForkJoinPool
. This object must be created when the application started and shut it down when the application before the application is stopped.
Solution #2
It stills another way to customize the number of threads. You can set the system property java.util.concurrent.ForkJoinPool.common.parallelism
to the value you want:
System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "12");
or by setting a JVM argument as follow:
-Djava.util.concurrent.ForkJoinPool.common.parallelism=5
By using this solution, you limit the thread pool size of the entire application. The idea is to use parallelism in specific places of the application logic but not everywhere because it can affect the application performance and limits the hardware utilization extremely.
Conclusion
We have seen how Parallel Stream with the cooperation of ForkJoinPool
can help to enhance the performance of a Java application.