在 Stream
的 reduce
與 collect
中提到 ... Collector
的accumulator()
之作用,在使用具有平行處理能力的Stream
時...嗯?這表示Stream有辦法進行平行處理?是的,這也是JDK8引入Lambda新特性主要目的之一,想要獲得平行處理能力在JDK8中可以說很簡單,例如這段程式碼:
List<Person> males = persons.stream()
.filter(person -> person.getGender() == Person.Gender.MALE)
.collect(ArrayList::new, ArrayList::add, ArrayList::addAll);
只要改成以下,就可能擁有平行處理之能力:
List<Person> males = persons.parallelStream()
.filter(person -> person.getGender() == Person.Gender.MALE)
.collect(ArrayList::new, ArrayList::add, ArrayList::addAll);
Collection
的parallelStream()
方法,傳回的Stream
實例在實作時,會在可能的情況下進行平行處理,JDK8希望你想要進行平行處理時,必須有明確的語義,這也是為什麼會有stream()
與parallelStream()
兩個方法,前者代表循序(Serial)處理,後者代表平行處理,想要知道Stream
是否為平行處理,可以呼叫isParallel()
來得知。
天下沒有白吃的午餐 - 留意順序
使用了parallelStream()
,不代表一定會平行處理而使得執行必然變快,要呼叫哪個方法,必須思考你的處理過程是否能夠分而治之(Divide and conquer)而後合併結果,在這個例子中,filter()
與collect()
方法基本上都有可能。
類似地,Collectors
有groupingBy()
與groupingByConcurrent()
兩個方法,前者代表循序處理,後者代表平行處理,是否呼叫後者,同樣你得思考處理過程是否能夠分而治之而後合併結果,如果可能,方能從中獲益。例如原先有段程式:
Map<Person.Gender, List<Person>> males = persons.stream()
.collect(
groupingBy(Person::getGender));
想要在可能的情況下進行平行處理,可以改為:
Map<Person.Gender, List<Person>> males = persons.parallelStream()
.collect(
groupingByConcurrent(Person::getGender));
Stream
實例若具有平行處理能力,處理過程會分而治之,也就是將任務切割為小任務,這表示每個小任務都是一個管線化操作,因此像以下的程式片段:
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9);
numbers.parallelStream()
.forEach(out::println);
你得到的顯示順序不會是1、2、3、4、5、6、7、8、9,而可能是任意的順序,就forEach()
這個終結操作來說,如果於平行處理時,希望最後順序是照著原來Stream
來源的順序,那可以呼叫forEachOrdered()
。例如:
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9);
numbers.parallelStream()
.forEachOrdered(out::println);
在管線化操作時,如果forEachOrdered()
中間有其他如filter()
的中介操作,會試著平行化處理,然後最終forEachOrdered()
會以來源順序處理,因此,使用forEachOrdered()
這類的有序的處理時,可能會(或完全失去)失去平行化的一些優勢,實際上中介操作亦有可能如此,例如sorted()
方法。
使用Stream
的reduce()
與collect()
時,平行處理時也得留意一下順序,API文件上基本上會記載終結操作時是否依來源順序,reduce()
基本上是按照來源順序,而collect()
得視給予的Collector
而定,在以下兩個例子,collect()
都是依照來源順序處理:
List<Person> males = persons.parallelStream()
.filter(person -> person.getGender() == Person.Gender.MALE)
.collect(ArrayList::new, ArrayList::add, ArrayList::addAll);
List<Person> males = persons.parallelStream()
.filter(person -> person.getGender() == Person.Gender.MALE)
.collect(toList());
在collect()
操作時若想要有平行效果,必須符合以下三個條件:
Stream
必須有平行處理能力- 傳入的
Collector
必須有Collector.Characteristics.CONCURRENT
特性。 Stream
是無序的或者是Collector
具有Collector.Characteristics.UNORDERED
特性。
想要知道Collector
具有Collector.Characteristics.UNORDERED
或Collector.Characteristics.UNORDERED
,可以呼叫Collector
的characteristics()
方法,平行處理的Stream
基本上是無序的,如果不放心,可以呼叫Stream的unordered()
方法。
Colllector
具有CONCURRENT
與UNORDERED
特性的例子之一是Collectors
的groupingByConcurrent()
方法傳回的實例,因此在最後順序不重要時,使用groupingByConcurrent()
來取代groupingBy()
方法,對效能上會有所幫助。
天下沒有白吃的午餐 - 不要干擾來源
想要善用JDK8提供的平行處理能力,你的資料處理過程必須能夠分而治之,而後將每個小任務的結果加以合併,這表示當API在處理小任務時,你不應該進行干預,例如:
numbers.parallelStream()
.filter(number -> {
numbers.add(7);
return number > 5;
})
.forEachOrdered(out::println);
無論是基於哪種理由,像這類對來源資料的干擾都令人困惑,實際上無論是否進行平行處理,這樣的程式都會引發ConcurrentModifiedException
。
天下沒有白吃的午餐 - 一次做一件事
JDK8提供高階語義的管線化API、在可能的情況下實現惰性、平行處理能力,目的之一是希望你思考處理的過程中,實際上是由哪些小任務組成,在過去,你可能基於(自我想像的)效能增進考量,在迴圈中做了很多件事,因而讓程式變得複雜,現在使用了高階API,就要避免走回頭路。例如,過去你在寫for
迴圈時,可能會順便做些動作,像是過濾元素做顯示的同時,將元素作個運算並收集在另一個清單中:
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9);
List<Integer> alsoLt = new ArrayList<>();
for(Integer number : numbers) {
if(number > 5) {
alsoLt.add(number + 10);
out.println(number);
}
}
在JDK8中使用高階API時,記得一次只做一件事:
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9);
List<Integer> biggerThan5 = numbers.stream()
.filter(number -> number > 5)
.collect(toList());
biggerThan5.forEach(out::println);
List<Integer> alsoLt = biggerThan5.stream()
.map(number -> number + 10)
.collect(toList());
避免寫出以下的程式:
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9);
List<Integer> alsoLt = new ArrayList<>();
numbers.stream()
.filter(number -> {
boolean isBiggerThan5 = number > 5;
if(isBiggerThan5) {
alsoLt.add(number + 10);
}
return isBiggerThan5;
})
.forEach(out::println);
這樣的程式不僅不易理解,如果你試圖進行平行化處理時:
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9);
List<Integer> alsoLt = new ArrayList<>();
numbers.parallelStream()
.filter(number -> {
boolean isBiggerThan5 = number > 5;
if(isBiggerThan5) {
alsoLt.add(number + 10);
}
return isBiggerThan5;
})
.forEachOrdered(out::println);
就會發現,alsoLt
的順序並不照著numbers
的順序,然而上頭一次處理一個任務的版本,可以簡單地改為平行化版本:
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9);
List<Integer> biggerThan5 = numbers.parallelStream()
.filter(number -> number > 5)
.collect(toList());
biggerThan5.forEach(out::println);
List<Integer> alsoLt = biggerThan5.parallelStream()
.map(number -> number + 10)
.collect(toList());