Controlling Parallelism of Java 8 Collection Streams

Published Jan 23, 2017

When we use parallel collection streams in Java, there doesn't seem to be any parameter that accepts our own thread pool. Some may wonder how many threads a given operation will get, while others may believe we can leave it to the JVM because it knows what to do. Relying on the JVM works in certain simple use cases, but more often than not there is more to be done.

Using parallel streams incorrectly can be disastrous for an app and can seriously degrade its performance. So what's wrong with relying on the JVM's defaults? All parallel streams share the common ForkJoinPool, which makes them very restrictive, especially if you run blocking operations on it or run several parallel streams at the same time. The shared pool protects the system by limiting the resources that streams can use, but it also limits their performance.
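To see the shared pool in action, here is a minimal sketch that asks each element of a parallel stream which pool is processing it. The class and method names (`DefaultPoolCheck`, `usesOnlyCommonPool`) are made up for illustration. It assumes the stream is started from an ordinary thread such as `main`, in which case `ForkJoinTask.getPool()` returns `null` for elements handled by the calling thread and the common pool for the rest.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;

public class DefaultPoolCheck {

    // Hypothetical helper: true if every element was processed either by the
    // calling thread (getPool() == null) or by a common-pool worker thread.
    static boolean usesOnlyCommonPool(List<Integer> items) {
        return items.parallelStream()
                .map(i -> ForkJoinTask.getPool()) // pool hosting the current thread, or null
                .allMatch(p -> p == null || p == ForkJoinPool.commonPool());
    }

    public static void main(String[] args) {
        List<Integer> items = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8);
        System.out.println("Only common pool used: " + usesOnlyCommonPool(items));
    }
}
```

Every parallel stream in the JVM that is started this way competes for the same worker threads, which is why one slow or blocking stream can hold up the others.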

You have a couple of options:

1) You can control the size of the common ForkJoinPool by setting a JVM property:

     
     -Djava.util.concurrent.ForkJoinPool.common.parallelism=10

This works for the system as a whole, but it gives you no fine-grained control over the parallelism demands of different situations. For example, IO-bound tasks can generally be parallelized more than pure computation, because their threads spend most of their time waiting rather than using a CPU core.
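To check what the property actually did, you can ask the JVM for the common pool's parallelism. The sketch below uses a hypothetical class name (`CommonPoolInfo`). Without the property, the common pool normally defaults to one less than the number of available processors, with a minimum of one.

```java
import java.util.concurrent.ForkJoinPool;

public class CommonPoolInfo {

    // Target parallelism of the common pool as seen by this JVM.
    static int commonParallelism() {
        return ForkJoinPool.getCommonPoolParallelism();
    }

    public static void main(String[] args) {
        System.out.println("Available processors:    "
                + Runtime.getRuntime().availableProcessors());
        System.out.println("Common pool parallelism: " + commonParallelism());
    }
}
```

Running it as `java -Djava.util.concurrent.ForkJoinPool.common.parallelism=10 CommonPoolInfo` should report a parallelism of 10. The property has to be in place before the common pool is first used, so passing it on the command line is the safest way to set it.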

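2) You can pass your own pool with the following trick. A parallel stream that is started from inside a ForkJoinPool task runs its work in that pool instead of the common pool. Note that this is implementation behavior rather than a documented guarantee of the Stream API.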

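        // needs java.util.concurrent.ForkJoinPool, java.util.concurrent.ExecutionException
        // and java.util.stream.Collectors
        final int parallelism = 10;

        // dedicated pool for this stream, separate from the common pool
        ForkJoinPool forkJoinPool = new ForkJoinPool(parallelism);

        try {
            forkJoinPool.submit(() ->

                    // parallel stream invoked here, from inside the custom pool
                    feedUrls.parallelStream()
                            .map(this::printFeedInfoStart)
                            .map(new FeedParser()::parseFeed)
                            .map(firebasePersistor::persist)
                            .map(this::printFeedInfoEnd)
                            .collect(Collectors.toList())

            ).get(); // blocks the calling thread until the whole stream has finished

        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag for callers
        } catch (ExecutionException e) {
            e.printStackTrace(); // a stage of the pipeline threw; the cause is wrapped in e
        } finally {
            forkJoinPool.shutdown(); // always remember to shut down the pool
        }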

This way you can handle certain special situations. However, beware of overusing it: if you over-commit, your pools will end up fighting for resources that are not there!
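If you want to confirm that the trick really moves the work off the common pool, the following self-contained sketch may help. The names `CustomPoolDemo` and `allRunInPool` are made up for illustration, and the check relies on the same implementation behavior as the trick itself: each element records whether `ForkJoinTask.getPool()` is the custom pool while it is being processed.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.stream.Collectors;

public class CustomPoolDemo {

    // Hypothetical helper: runs a parallel stream inside the given pool and
    // reports whether every element was processed by one of that pool's threads.
    static boolean allRunInPool(ForkJoinPool pool, List<Integer> items) {
        try {
            List<Boolean> ranInPool = pool.submit(() ->
                    items.parallelStream()
                         .map(i -> ForkJoinTask.getPool() == pool)
                         .collect(Collectors.toList())
            ).get(); // wait for the stream to finish
            return ranInPool.stream().allMatch(b -> b);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
            throw new IllegalStateException("interrupted while waiting for the stream", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("stream pipeline failed", e.getCause());
        }
    }

    public static void main(String[] args) {
        ForkJoinPool pool = new ForkJoinPool(4);
        try {
            List<Integer> items = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8);
            System.out.println("All work ran in the custom pool: "
                    + allRunInPool(pool, items));
        } finally {
            pool.shutdown(); // release the pool's threads
        }
    }
}
```

A check like this is cheap to keep as a test, so that a JDK upgrade which changes the behavior does not go unnoticed.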

Discover and read more posts from Nitin Puri
Grzegorz Piwowarek
5 years ago

There’s also an interesting bug related to the second point :)
https://bugs.openjdk.java.net/browse/JDK-8190974

btw, two weeks ago I wrote a set of custom collectors that can run things in parallel on custom executors:
http://github.com/pivovarit/parallel-collectors

Vinay Shivshankar
6 years ago

Appreciate the work you have put in here. It seems like using parallel streams does not bring the benefit you would expect in terms of parallelism and might adversely affect performance in an SOA. So just to confirm, the common fork join pool used by parallel streams is shared across a single JVM, correct? And by default it would just use the number of cores as the threads available to it?
