Controlling Parallelism of Java 8 Collection Streams

Published Jan 23, 2017

When we use parallel streams over Java collections, there doesn't seem to be any parameter that accepts our own thread pool. Some may wonder how many threads a given operation will get, while others believe we can leave it to the JVM because it will know what to do. We can rely on the JVM in simple use cases, but more often than not, there is more to be done.

Using parallel streams wrongly can be disastrous for an app and can seriously degrade its performance. So what's wrong with relying on the JVM's defaults? All parallel streams share the common ForkJoinPool, which by default has one thread fewer than the number of available processors. That makes them very restrictive, for example when you run blocking operations on them or when you run multiple parallel streams at the same time: a blocked worker or a competing stream leaves fewer threads for everyone else. So even though the common pool protects the system by limiting the resources it can use, it also limits its performance.
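To see the default behavior for yourself, here is a minimal sketch that runs an ordinary parallel stream and records which threads processed its elements. The class and method names are hypothetical; the only APIs used are `ForkJoinPool.commonPool()`, `ForkJoinPool.getCommonPoolParallelism()` and `ForkJoinWorkerThread.getPool()`.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinWorkerThread;
import java.util.stream.IntStream;

class CommonPoolDemo {
    // Classifies the thread that is executing the current stream element.
    static String currentExecutor() {
        Thread t = Thread.currentThread();
        if (t instanceof ForkJoinWorkerThread) {
            ForkJoinPool pool = ((ForkJoinWorkerThread) t).getPool();
            return pool == ForkJoinPool.commonPool() ? "common pool" : "other pool";
        }
        // The thread that starts a parallel stream may also process elements itself.
        return "caller thread";
    }

    // Runs a plain parallel stream and records which executors processed its elements.
    static Set<String> executorsUsed() {
        Set<String> seen = ConcurrentHashMap.newKeySet();
        IntStream.range(0, 10_000).parallel().forEach(i -> seen.add(currentExecutor()));
        return seen;
    }

    public static void main(String[] args) {
        System.out.println("Available processors: " + Runtime.getRuntime().availableProcessors());
        System.out.println("Common pool parallelism: " + ForkJoinPool.getCommonPoolParallelism());
        System.out.println("Executors used: " + executorsUsed());
    }
}
```

Whatever the stream does, its work ends up on the common pool (plus, typically, the calling thread), so every parallel stream in the application competes for the same small set of workers.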

You have a couple of options:

1) You can control the size of the common ForkJoinPool by setting a JVM system property at startup:

     
     -Djava.util.concurrent.ForkJoinPool.common.parallelism=10

This works for the whole system, but it gives you no fine-grained control over the parallelism that different situations demand. For example, IO-bound tasks can generally be parallelized further than purely computational ones, because their threads spend much of their time waiting.

2) You can run the stream inside your own ForkJoinPool with the following trick:

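        import java.util.concurrent.ExecutionException;
        import java.util.concurrent.ForkJoinPool;
        import java.util.stream.Collectors;

        final int parallelism = 10;

        ForkJoinPool forkJoinPool = null;

        try {
            forkJoinPool = new ForkJoinPool(parallelism);

            // A parallel stream started inside a ForkJoinPool task forks its
            // subtasks into that pool instead of the common pool.
            forkJoinPool.submit(() ->
                    feedUrls.parallelStream()
                            .map(this::printFeedInfoStart)
                            // new FeedParser() is evaluated once, so a single parser
                            // is shared by all worker threads and must be thread-safe
                            .map(new FeedParser()::parseFeed)
                            .map(firebasePersistor::persist)
                            .map(this::printFeedInfoEnd)
                            .collect(Collectors.toList())
            ).get(); // get() blocks the caller until the whole stream has finished

        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
        } catch (ExecutionException e) {
            e.getCause().printStackTrace(); // the exception thrown inside the stream
        } finally {
            if (forkJoinPool != null) {
                forkJoinPool.shutdown(); // always remember to shut down the pool
            }
        }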
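The snippet above depends on application classes (`feedUrls`, `FeedParser`, `firebasePersistor`) that aren't shown. The following self-contained sketch swaps them for a trivial computation so you can check where the work actually runs: each element records whether its thread belongs to the custom pool or to the common pool. `CustomPoolDemo`, `poolLabel` and `poolsUsed` are hypothetical names chosen for illustration.

```java
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinWorkerThread;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class CustomPoolDemo {
    // Labels the pool (if any) that owns the current thread.
    static String poolLabel(ForkJoinPool custom) {
        Thread t = Thread.currentThread();
        if (!(t instanceof ForkJoinWorkerThread)) {
            return "caller thread";
        }
        ForkJoinPool owner = ((ForkJoinWorkerThread) t).getPool();
        if (owner == custom) return "custom pool";
        if (owner == ForkJoinPool.commonPool()) return "common pool";
        return "other pool";
    }

    // Runs a parallel stream inside a dedicated pool and reports which pools did the work.
    static Set<String> poolsUsed(int parallelism) {
        ForkJoinPool pool = new ForkJoinPool(parallelism);
        Set<String> seen = ConcurrentHashMap.newKeySet();
        try {
            List<Integer> squares = pool.submit(() ->
                    IntStream.range(0, 1_000).boxed()
                            .parallel()
                            .map(i -> {
                                seen.add(poolLabel(pool)); // stand-in for real per-element work
                                return i * i;
                            })
                            .collect(Collectors.toList())
            ).get();
            if (squares.size() != 1_000) {
                throw new IllegalStateException("unexpected result size " + squares.size());
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("stream failed", e.getCause());
        } finally {
            pool.shutdown();
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(poolsUsed(4));
    }
}
```

The caller only blocks in `get()`, so all elements should be processed by workers of the custom pool rather than by the common pool.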

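This way you can handle special situations individually. Keep in mind that the trick relies on an implementation detail rather than a documented guarantee: a parallel stream started from inside a ForkJoinPool task forks its subtasks into that pool. Also beware of overusing it. Every extra pool brings its own threads, and if you over-commit, those threads will compete for CPU and memory that simply aren't there!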
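One way to keep over-commitment in check is to size a dedicated pool deliberately, create it once, and reuse it, instead of constructing a new ForkJoinPool (and new threads) on every call. The sketch below sizes an IO pool with the thread-pool sizing rule of thumb from Brian Goetz et al., *Java Concurrency in Practice*, which gives work that mostly waits more threads than there are cores, matching the IO-versus-compute point in option 1. `SharedPools`, `suggestedThreads`, `runIn` and the wait/compute ratio of 4.0 are hypothetical choices for illustration; measure your own workload before picking numbers.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.function.Supplier;
import java.util.stream.IntStream;

final class SharedPools {
    // Rule of thumb: threads = cpus * targetUtilization * (1 + waitTime / computeTime)
    static int suggestedThreads(int cpus, double targetUtilization, double waitToComputeRatio) {
        return Math.max(1, (int) Math.round(cpus * targetUtilization * (1 + waitToComputeRatio)));
    }

    private static final int CPUS = Runtime.getRuntime().availableProcessors();

    // Hypothetical long-lived pool for IO-heavy streams, created once and shared by all callers.
    // The ratio 4.0 assumes tasks wait about four times longer than they compute.
    // CPU-bound streams can simply stay on the common pool.
    static final ForkJoinPool IO_POOL = new ForkJoinPool(suggestedThreads(CPUS, 1.0, 4.0));

    private SharedPools() {}

    // Runs the given work (typically a parallel stream) inside the given pool and waits for it.
    static <T> T runIn(ForkJoinPool pool, Supplier<T> work) {
        Callable<T> task = work::get;
        try {
            return pool.submit(task).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("task failed", e.getCause());
        }
    }

    public static void main(String[] args) {
        System.out.println("IO pool parallelism: " + IO_POOL.getParallelism());
        int sum = runIn(IO_POOL, () -> IntStream.rangeClosed(1, 100).parallel().sum());
        System.out.println("Sum computed in IO_POOL: " + sum);
    }
}
```

Because the pool outlives individual calls, the total number of extra threads stays fixed no matter how many requests use it concurrently.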

Discover and read more posts from Nitin Puri