Stream 與平行化


Streamreducecollect 中提到 ... Collectoraccumulator()之作用,在使用具有平行處理能力的Stream時...嗯?這表示Stream有辦法進行平行處理?是的,這也是JDK8引入Lambda新特性主要目的之一,想要獲得平行處理能力在JDK8中可以說很簡單,例如這段程式碼:

List<Person> males = persons.stream()
                            .filter(person -> person.getGender() == Person.Gender.MALE)
                            .collect(ArrayList::new, ArrayList::add, ArrayList::addAll);

只要改成以下,就可能擁有平行處理之能力:

List<Person> males = persons.parallelStream()
                            .filter(person -> person.getGender() == Person.Gender.MALE)
                            .collect(ArrayList::new, ArrayList::add, ArrayList::addAll);

CollectionparallelStream()方法,傳回的Stream實例在實作時,會在可能的情況下進行平行處理,JDK8希望你想要進行平行處理時,必須有明確的語義,這也是為什麼會有stream()parallelStream()兩個方法,前者代表循序(Serial)處理,後者代表平行處理,想要知道Stream是否為平行處理,可以呼叫isParallel()來得知。

天下沒有白吃的午餐 - 留意順序

使用了parallelStream(),不代表一定會平行處理而使得執行必然變快,要呼叫哪個方法,必須思考你的處理過程是否能夠分而治之(Divide and conquer)而後合併結果,在這個例子中,filter()collect()方法基本上都有可能。

類似地,CollectorsgroupingBy()groupingByConcurrent()兩個方法,前者代表循序處理,後者代表平行處理,是否呼叫後者,同樣你得思考處理過程是否能夠分而治之而後合併結果,如果可能,方能從中獲益。例如原先有段程式:

Map<Person.Gender, List<Person>> males = persons.stream()
                  .collect(
                      groupingBy(Person::getGender));

想要在可能的情況下進行平行處理,可以改為:

Map<Person.Gender, List<Person>> males = persons.parallelStream()
                  .collect(
                      groupingByConcurrent(Person::getGender));

Stream實例若具有平行處理能力,處理過程會分而治之,也就是將任務切割為小任務,這表示每個小任務都是一個管線化操作,因此像以下的程式片段:

List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9);
numbers.parallelStream()
       .forEach(out::println);

你得到的顯示順序不會是1、2、3、4、5、6、7、8、9,而可能是任意的順序,就forEach()這個終結操作來說,如果於平行處理時,希望最後順序是照著原來Stream來源的順序,那可以呼叫forEachOrdered()。例如:

List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9);
numbers.parallelStream()
       .forEachOrdered(out::println);

在管線化操作時,如果forEachOrdered()中間有其他如filter()的中介操作,會試著平行化處理,然後最終forEachOrdered()會以來源順序處理,因此,使用forEachOrdered()這類的有序的處理時,可能會(或完全失去)失去平行化的一些優勢,實際上中介操作亦有可能如此,例如sorted()方法。

使用Streamreduce()collect()時,平行處理時也得留意一下順序,API文件上基本上會記載終結操作時是否依來源順序,reduce()基本上是按照來源順序,而collect()得視給予的Collector而定,在以下兩個例子,collect()都是依照來源順序處理:

List<Person> males = persons.parallelStream()
                            .filter(person -> person.getGender() == Person.Gender.MALE)
                            .collect(ArrayList::new, ArrayList::add, ArrayList::addAll);

List<Person> males = persons.parallelStream()
                            .filter(person -> person.getGender() == Person.Gender.MALE)
                            .collect(toList());

collect()操作時若想要有平行效果,必須符合以下三個條件:

  • Stream必須有平行處理能力
  • 傳入的Collector必須有Collector.Characteristics.CONCURRENT特性。
  • Stream是無序的或者是Collector具有Collector.Characteristics.UNORDERED特性。

想要知道Collector具有Collector.Characteristics.UNORDEREDCollector.Characteristics.UNORDERED,可以呼叫Collectorcharacteristics()方法,平行處理的Stream基本上是無序的,如果不放心,可以呼叫Stream的unordered()方法。

Colllector具有CONCURRENTUNORDERED特性的例子之一是CollectorsgroupingByConcurrent()方法傳回的實例,因此在最後順序不重要時,使用groupingByConcurrent()來取代groupingBy()方法,對效能上會有所幫助。

天下沒有白吃的午餐 - 不要干擾來源

想要善用JDK8提供的平行處理能力,你的資料處理過程必須能夠分而治之,而後將每個小任務的結果加以合併,這表示當API在處理小任務時,你不應該進行干預,例如:

numbers.parallelStream()
       .filter(number -> {
            numbers.add(7);
            return number > 5;
       })
       .forEachOrdered(out::println);

無論是基於哪種理由,像這類對來源資料的干擾都令人困惑,實際上無論是否進行平行處理,這樣的程式都會引發ConcurrentModifiedException

天下沒有白吃的午餐 - 一次做一件事

JDK8提供高階語義的管線化API、在可能的情況下實現惰性、平行處理能力,目的之一是希望你思考處理的過程中,實際上是由哪些小任務組成,在過去,你可能基於(自我想像的)效能增進考量,在迴圈中做了很多件事,因而讓程式變得複雜,現在使用了高階API,就要避免走回頭路。例如,過去你在寫for迴圈時,可能會順便做些動作,像是過濾元素做顯示的同時,將元素作個運算並收集在另一個清單中:

List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9);
List<Integer> alsoLt = new ArrayList<>();
       
for(Integer number : numbers) {
    if(number > 5) {
        alsoLt.add(number + 10);
        out.println(number);
    }
}

在JDK8中使用高階API時,記得一次只做一件事:

List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9);

List<Integer> biggerThan5 = numbers.stream()
                 .filter(number -> number > 5)
                 .collect(toList());
       
biggerThan5.forEach(out::println);
       
List<Integer> alsoLt = biggerThan5.stream()
                .map(number -> number + 10)
                .collect(toList());

避免寫出以下的程式:

List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9);
List<Integer> alsoLt = new ArrayList<>();
       
numbers.stream()
       .filter(number -> {
           boolean isBiggerThan5 = number > 5;
           if(isBiggerThan5) {
               alsoLt.add(number + 10);
           }
           return isBiggerThan5;
        })
        .forEach(out::println);

這樣的程式不僅不易理解,如果你試圖進行平行化處理時:

List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9);
List<Integer> alsoLt = new ArrayList<>();

numbers.parallelStream()
        .filter(number -> {
            boolean isBiggerThan5 = number > 5;
            if(isBiggerThan5) {
                 alsoLt.add(number + 10);
            }
            return isBiggerThan5;
        })
        .forEachOrdered(out::println);

就會發現,alsoLt的順序並不照著numbers的順序,然而上頭一次處理一個任務的版本,可以簡單地改為平行化版本:

List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9);

List<Integer> biggerThan5 = numbers.parallelStream()
                 .filter(number -> number > 5)
                 .collect(toList());
       
biggerThan5.forEach(out::println);
       
List<Integer> alsoLt = biggerThan5.parallelStream()
                .map(number -> number + 10)
                .collect(toList());