You can execute streams in serial or in parallel. When a stream executes in parallel, the Java runtime partitions the stream into multiple sub-streams. Aggregate operations iterate over and process these sub-streams in parallel and then combine the results. This post shows how to obtain and use a parallel stream in Java.
Because each sub-stream can be processed by a separate thread, potentially on a different core of a multi-core processor, and the partial results are then combined into the final result, bulk operations on large data sets can complete in less time. This speed-up is not guaranteed, though; the points discussed later in this post decide whether parallel execution actually pays off.
How to get a parallel stream in Java
Collection has the methods Collection.stream() and Collection.parallelStream(), which produce sequential and parallel streams respectively.
You can also call the parallel() method on a sequential stream to get a parallel stream. The parallel() method is defined in the BaseStream interface.
For example, if you have an Employee class with a constructor that takes an int (like the one shown later in this post) and, for testing, you want to create 1000 Employee objects, you can use the parallel() method with a range:
List<Employee> empList = IntStream.rangeClosed(1, 1000)
        .parallel()
        .mapToObj(Employee::new)
        .collect(Collectors.toList());
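The three ways of switching execution mode described above can be checked with BaseStream.isParallel(). The following is a minimal sketch; the class name ParallelStreamSources and its helper methods are made up for illustration and are not part of the original example.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.IntStream;

// Hypothetical demo class: shows how a stream's execution mode can be queried.
public class ParallelStreamSources {

    // Collection.parallelStream() asks the collection for a parallel stream.
    static boolean fromCollection(List<Integer> data) {
        return data.parallelStream().isParallel();
    }

    // BaseStream.parallel() switches an existing sequential stream to parallel mode.
    static boolean fromSequential(List<Integer> data) {
        return data.stream().parallel().isParallel();
    }

    // BaseStream.sequential() switches back; the mode set last applies to the whole pipeline.
    static boolean backToSequential() {
        return IntStream.rangeClosed(1, 10).parallel().sequential().isParallel();
    }

    public static void main(String[] args) {
        List<Integer> data = Arrays.asList(1, 2, 3, 4);
        System.out.println("parallelStream(): " + fromCollection(data));
        System.out.println("stream().parallel(): " + fromSequential(data));
        System.out.println("parallel().sequential(): " + backToSequential());
    }
}
```

Note that parallel() and sequential() do not apply to individual stages. The whole pipeline runs in whichever mode was requested last before the terminal operation.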
Things to consider when using Java parallel streams
Because partial results are computed by separate threads and later combined, it is important to think about the following points:
- Is a separate combiner needed to combine the partial results, or can the aggregate function itself act as the combiner?
- Since multi-threading is involved, no operation in the parallel stream should update a shared variable.
- Most of the time a collection is the data source of the stream, and most collections are not thread-safe. Multiple threads therefore cannot modify such a collection without introducing thread interference errors.
- Parallel streams use the common fork-join thread pool. A long-running or blocking task in a parallel stream may therefore occupy all threads in that pool and stall every other task that relies on it, including other parallel streams.
- Note that parallelism is not automatically faster than performing operations serially, although it can be if you have enough data and processor cores. While aggregate operations enable you to more easily implement parallelism, it is still your responsibility to determine if your application is suitable for parallelism.
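To illustrate the first point in the list above, here is a minimal sketch of a reduction where the accumulator and the combiner are different functions. The class CombinerSketch and the stripped-down Emp type are hypothetical stand-ins introduced only for this example.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Hypothetical demo class: three-argument reduce with a separate combiner.
public class CombinerSketch {

    // Minimal stand-in for an employee; only the salary matters here.
    static final class Emp {
        final int salary;
        Emp(int salary) { this.salary = salary; }
    }

    static int totalSalary(List<Emp> emps) {
        return emps.parallelStream()
                .reduce(0,
                        (partial, e) -> partial + e.salary, // accumulator: folds an Emp into a partial sum
                        Integer::sum);                      // combiner: merges two partial sums
    }

    // Builds 500 employees with salaries 1..500 and totals them.
    static int demoTotal() {
        List<Emp> emps = IntStream.rangeClosed(1, 500)
                .mapToObj(Emp::new)
                .collect(Collectors.toList());
        return totalSalary(emps);
    }

    public static void main(String[] args) {
        System.out.println("Total - " + demoTotal()); // 1 + 2 + ... + 500 = 125250
    }
}
```

The accumulator takes an Integer and an Emp, so it cannot merge two Integer partial results. That is why this form of reduce needs a separate combiner. When the element type and the result type are the same, as with mapToInt(...).sum(), the aggregate function can do both jobs.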
Java parallel streams examples
Let’s first see a simple example using a parallel stream in Java where you need to find the employee with the maximum salary.
Employee class
public class Employee {
    private String lastName;
    private String firstName;
    private String empId;
    private int age;
    private int salary;

    public String getLastName() { return lastName; }
    public void setLastName(String lastName) { this.lastName = lastName; }
    public String getFirstName() { return firstName; }
    public void setFirstName(String firstName) { this.firstName = firstName; }
    public String getEmpId() { return empId; }
    public void setEmpId(String empId) { this.empId = empId; }
    public int getAge() { return age; }
    public void setAge(int age) { this.age = age; }
    public String getFullName() { return this.firstName + " " + this.lastName; }
    public int getSalary() { return salary; }
    public void setSalary(int salary) { this.salary = salary; }
}
Using parallel stream
import java.util.ArrayList;
import java.util.List;
import java.util.OptionalInt;

public class ParallelDemo1 {
    public static void main(String[] args) {
        // getting list of employee
        List<Employee> empList = createList();
        // map each employee to a salary and let the stream find the maximum
        OptionalInt maxSalary = empList.parallelStream().mapToInt(e -> e.getSalary()).max();
        if (maxSalary.isPresent()) {
            System.out.println("Max Salary " + maxSalary.getAsInt());
        }
    }

    // Stub method to create list of employee objects
    private static List<Employee> createList() {
        List<Employee> empList = new ArrayList<Employee>();
        Employee emp = new Employee();
        emp.setEmpId("E001");
        emp.setAge(40);
        emp.setFirstName("Ram");
        emp.setLastName("Chandra");
        emp.setSalary(5000);
        empList.add(emp);
        emp = new Employee();
        emp.setEmpId("E002");
        emp.setAge(35);
        emp.setFirstName("Sheila");
        emp.setLastName("Baijal");
        emp.setSalary(7000);
        empList.add(emp);
        emp = new Employee();
        emp.setEmpId("E003");
        emp.setAge(24);
        emp.setFirstName("Mukesh");
        emp.setLastName("Rishi");
        emp.setSalary(9000);
        empList.add(emp);
        emp = new Employee();
        emp.setEmpId("E004");
        emp.setAge(37);
        emp.setFirstName("Rani");
        emp.setLastName("Mukherjee");
        emp.setSalary(10000);
        empList.add(emp);
        return empList;
    }
}
Output
Max Salary 10000
Java Parallel Stream Mistake - updating a shared variable
As already stated above, updating shared state from a parallel stream may cause problems because several threads are involved. Let’s see it with an example.
There is an Employee class, and 500 objects of the Employee class are stored in a list. A parallel stream is then used to try to compute the total salary paid to all employees.
Employee class
The Employee class is the same as the one used above with one difference: it now has a constructor with an int argument that sets the salary property (and no setSalary method). Using the rangeClosed method, 500 objects are created with salaries 1..500.
public class Employee {
    private String lastName;
    private String firstName;
    private String empId;
    private int age;
    private int salary;

    Employee(int salary) {
        this.salary = salary;
    }

    public String getLastName() { return lastName; }
    public void setLastName(String lastName) { this.lastName = lastName; }
    public String getFirstName() { return firstName; }
    public void setFirstName(String firstName) { this.firstName = firstName; }
    public String getEmpId() { return empId; }
    public void setEmpId(String empId) { this.empId = empId; }
    public int getAge() { return age; }
    public void setAge(int age) { this.age = age; }
    public String getFullName() { return this.firstName + " " + this.lastName; }
    public int getSalary() { return salary; }
}
Total Salary Calculation
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class ParallelDemo {
    public static void main(String[] args) {
        Salary sal = new Salary();
        List<Employee> empList = createList();
        // WRONG: every thread of the parallel stream updates the same Salary object
        empList.parallelStream().forEach(sal::doProcess);
        System.out.println("Total - " + sal.getTotalSalary());
    }

    // Stub method to create list of employee objects
    private static List<Employee> createList() {
        List<Employee> empList = IntStream.rangeClosed(1, 500).mapToObj(Employee::new)
                .collect(Collectors.toList());
        return empList;
    }
}

class Salary {
    // shared variable
    private int total = 0;

    public void doProcess(Employee emp) {
        addSalary(emp.getSalary());
    }

    public void addSalary(int salary) {
        // read-modify-write without synchronization: not atomic
        total = total + salary;
    }

    public int getTotalSalary() {
        return total;
    }
}
Output
Total - 113359
Total - 125250
Total - 120901
Total - 123835
Total - 125250
I got these 5 outputs on executing it 5 times. You can see that the output differs between runs (the correct total is 125250). This happens because total is a shared variable that several threads of the parallel stream update at once. The statement total = total + salary is a read-modify-write sequence that is not atomic, so an update made by one thread can be overwritten by another.
The first parallel stream example in this post already hints at a better way to do it: let the stream itself compute the sum.
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// The Salary class is the same as in the previous listing
public class ParallelDemo {
    public static void main(String[] args) {
        Salary sal = new Salary();
        List<Employee> empList = createList();
        //empList.parallelStream().forEach(sal::doProcess);
        // let the stream do the summing; no shared state is touched inside the pipeline
        int totalSalary = empList.parallelStream().mapToInt(e -> e.getSalary()).sum();
        sal.addSalary(totalSalary);
        System.out.println("Total - " + sal.getTotalSalary());
    }

    // Stub method to create list of employee objects
    private static List<Employee> createList() {
        List<Employee> empList = IntStream.rangeClosed(1, 500).mapToObj(Employee::new)
                .collect(Collectors.toList());
        return empList;
    }
}
Note that here you are not changing the shared variable from within the stream. You are getting the salaries of the employees and summing them. That way there is no problem even with a parallel stream: different threads get different data to add, and those partial results are then combined to get the total salary.
- Refer to Reduction Operations in Java Stream API for an example of how a missing or incorrect combiner function can produce wrong results with a parallel stream.
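The same reasoning applies to the earlier point about collections not being thread-safe: adding elements to a plain ArrayList from forEach on a parallel stream is the collection version of the shared-variable mistake. Below is a minimal sketch of the safer alternative, which lets collect() build the list. The class name SharedListSketch and the doubling step are made up for illustration.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Hypothetical demo class: collecting results from a parallel stream without shared mutable state.
public class SharedListSketch {

    // Risky pattern, shown only as a comment because ArrayList is not thread-safe:
    //   List<Integer> out = new ArrayList<>();
    //   IntStream.rangeClosed(1, 500).parallel().boxed().forEach(out::add);
    // Elements may be lost or an exception may be thrown while the list resizes.

    // Safer pattern: each thread fills its own container and collect() merges them.
    static List<Integer> doubledSalaries(int n) {
        return IntStream.rangeClosed(1, n)
                .parallel()
                .map(salary -> salary * 2)
                .boxed()
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Integer> result = doubledSalaries(500);
        System.out.println("Size - " + result.size());
        System.out.println("First - " + result.get(0) + ", Last - " + result.get(result.size() - 1));
    }
}
```

Because the source is ordered and Collectors.toList() respects encounter order, the resulting list has the elements in source order even though they were processed in parallel.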
Using Collectors.groupingByConcurrent with Parallel Streams
With a parallel stream the groupingBy operation can perform poorly, because it works by merging two maps by key, which is computationally expensive. With a parallel stream in Java you should use the groupingByConcurrent operation instead, which returns an instance of ConcurrentMap instead of Map. Since groupingByConcurrent is a concurrent and unordered collector, the order of elements within each group may differ from their order in the source.
For example, if you have an Employee class and you want to group employees by gender, it can be done as follows:
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;

public class ParallelDemo1 {
    public static void main(String[] args) {
        ParallelDemo1 pd = new ParallelDemo1();
        // getting list of employee
        List<Employee> empList = pd.createList();
        // group employees by gender using the concurrent collector
        ConcurrentMap<Character, List<Employee>> byGender = empList.parallelStream()
                .collect(Collectors.groupingByConcurrent(e -> e.gender));
        byGender.forEach((key, value) -> {
            System.out.println("Key- " + key + " Value ");
            value.forEach(v -> System.out.println(v.name));
        });
    }

    // Stub method to create list of employee objects
    private List<Employee> createList() {
        List<Employee> empList = Arrays.asList(
                new Employee("E001", 40, "Ram", 'M', 5000),
                new Employee("E002", 35, "Sheila", 'F', 7000),
                new Employee("E003", 24, "Mukesh", 'M', 9000),
                new Employee("E004", 37, "Rani", 'F', 10000));
        return empList;
    }

    class Employee {
        private String empId;
        private int age;
        private String name;
        private char gender;
        private int salary;

        Employee(String empId, int age, String name, char gender, int salary) {
            this.empId = empId;
            this.age = age;
            this.name = name;
            this.gender = gender;
            this.salary = salary;
        }
    }
}
Output
Key- F Value
Sheila
Rani
Key- M Value
Mukesh
Ram
Refer to Java Stream - Collectors.groupingBy() With Examples to know more about groupingBy operations in the Java Stream API.
Using forEachOrdered with Parallel Streams
The order in which a pipeline processes the elements of a stream depends on whether the stream is executed in serial or in parallel.
For example:
Integer[] intArray = {1, 2, 3, 4, 5, 6, 7, 8};
List<Integer> listOfIntegers = new ArrayList<>(Arrays.asList(intArray));
System.out.println("listOfIntegers:");
listOfIntegers.stream().forEach(e -> System.out.print(e + " "));
Output
listOfIntegers: 1 2 3 4 5 6 7 8
Here you can see that the pipeline prints the elements of the list listOfIntegers in the order that they were added to the list.
With a parallel stream:
System.out.println("Parallel stream");
listOfIntegers.parallelStream().forEach(e -> System.out.print(e + " "));
Output
Parallel stream
3 4 1 6 2 5 7 8
Here you can see that the pipeline prints the elements of the list in an apparently random order. When you execute a stream in parallel, the Java compiler and runtime determine the order in which to process the stream's elements to maximize the benefits of parallel computing unless otherwise specified by the stream operation.
Using forEachOrdered:
System.out.println("With forEachOrdered:");
listOfIntegers.parallelStream().forEachOrdered(e -> System.out.print(e + " "));
Output
With forEachOrdered:
1 2 3 4 5 6 7 8
Note that the method forEachOrdered() is used here, which processes the elements of the stream in the order specified by its source, regardless of whether you executed the stream in serial or parallel. Note that you may lose the benefits of parallelism if you use operations like forEachOrdered with parallel streams.
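The snippets in this section are fragments, so here is a self-contained sketch that records the order in which elements are visited instead of printing them directly. The class name ForEachOrderedSketch and its helper methods are hypothetical. A synchronized list is used deliberately so that recording from several threads is safe.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical demo class: compares forEach and forEachOrdered on a parallel stream.
public class ForEachOrderedSketch {

    static final List<Integer> SOURCE = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8);

    // forEach on a parallel stream: visiting order is not guaranteed.
    static List<Integer> visitWithForEach() {
        List<Integer> seen = Collections.synchronizedList(new ArrayList<>());
        SOURCE.parallelStream().forEach(seen::add);
        return seen;
    }

    // forEachOrdered: elements are visited in the encounter order of the source.
    static List<Integer> visitWithForEachOrdered() {
        List<Integer> seen = Collections.synchronizedList(new ArrayList<>());
        SOURCE.parallelStream().forEachOrdered(seen::add);
        return seen;
    }

    public static void main(String[] args) {
        System.out.println("forEach:        " + visitWithForEach());        // order may vary between runs
        System.out.println("forEachOrdered: " + visitWithForEachOrdered()); // always source order
    }
}
```

Running it several times should show the forEach line changing from run to run on a multi-core machine, while the forEachOrdered line stays in source order.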
Points to note
- Parallelism is not automatically faster than performing operations serially, although it can be if you have enough data and processor cores.
- A parallel stream in Java carries extra overhead: splitting the data, managing multiple threads and combining the partial results.
- Make sure there is enough data to process, so that the time saved by processing it in parallel outweighs this overhead.
- Performance of parallel execution also depends upon the number of processors available.
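The points above can be explored with a rough timing sketch like the one below. It is not a rigorous benchmark: there is no JIT warm-up and only a single measurement per case, so the numbers will vary from machine to machine and run to run. The class name ParallelOverheadSketch, the sum-of-squares workload and the chosen sizes are all assumptions made for illustration.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.stream.LongStream;

// Hypothetical demo class: naive comparison of sequential and parallel execution.
public class ParallelOverheadSketch {

    // Sums the squares of 1..n, either sequentially or in parallel.
    static long sumOfSquares(long n, boolean parallel) {
        LongStream stream = LongStream.rangeClosed(1, n);
        if (parallel) {
            stream = stream.parallel();
        }
        return stream.map(x -> x * x).sum();
    }

    // Single-shot wall-clock timing; good enough for a first impression only.
    static long timeNanos(long n, boolean parallel) {
        long start = System.nanoTime();
        sumOfSquares(n, parallel);
        return System.nanoTime() - start;
    }

    public static void main(String[] args) {
        System.out.println("Available processors: " + Runtime.getRuntime().availableProcessors());
        System.out.println("Common pool parallelism: " + ForkJoinPool.commonPool().getParallelism());
        for (long n : new long[] {1_000L, 1_000_000L}) {
            System.out.println("n=" + n
                    + " sequential ns=" + timeNanos(n, false)
                    + " parallel ns=" + timeNanos(n, true));
        }
    }
}
```

For anything beyond a first impression, a benchmarking harness such as JMH is a better choice than hand-written timing code.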
That's all for this topic Parallel Stream in Java Stream API. If you have any doubt or any suggestions to make please drop a comment. Thanks!