자바 8 인 액션

7. 병렬 데이터 처리와 성능

Featured image

병렬 스트림

컬렉션에 parallelStream을 호출하면 병렬 스트림이 생성된다. 병렬 스트림이란, 각각의 스레드에서 처리할 수 있도록 스트림 요소를 여러 청크로 분할한 스트림이다.
따라서 병렬 스트림을 이용하면 모든 멀티코어 프로세서가 각각의 청크를 처리하도록 할당할 수 있다.

public static long sequentialSum(long n) {
    return Stream.iterate(1L, i -> i + 1)   //  무한 자연수 스트림 생성
                 .limit(n)                  //  n개 이하로 제한
                 .reduce(0L, Long::sum);    //  모든 숫자를 더하는 스트림 리듀싱 연산
}
public static long iterativeSum(long n) {
    long result = 0;
    for (long i = 1L; i<= n; i++) {
        result += i;
    }
    return result;
}

순차 스트림을 병렬 스트림으로 변환하기

public static long parallelSum(long n) {
    return Stream.iterate(1L, i -> i + 1)
                 .limit(n)
                 .parallel()                //  스트림을 병렬 스트림으로 변환
                 .reduce(0L, Long::sum);
}

parallel 메서드를 통해 스트림이 여러 청크로 분할되게 되고, 마지막 리듀싱 연산을 통해 생성된 부분 결과를 다시 리듀싱 연산으로 합쳐 전체 스트림의 리듀싱 결과를 도출한다.

parallel과 sequential 메서드를 통해 어떤 연산을 병렬로 실행할지, 순차로 실행할지 제어할 수 있다.

stream.parallel()
      .filter(...)
      .sequential()
      .map(...)
      .parallel()
      .reduce();  

parallel과 sequential 두 메서드 중 최종적으로 호출된 메서드가 전체 파이프라인에 영향을 미친다. 위 코드는 parallel이 마지막 호출되었으므로 위 파이프라인은 병렬로 실행된다. 병렬리듀싱연산

병렬 스트림은 내부적으로 ForkJoinPool을 사용한다. 기본적으로 ForkJoinPool은 프로세서 수, 즉 Runtime.getRuntime().availableProcessors()가 반환하는 값에 상응하는 thread를 갖는다.

스트림 성능 측정

자신의 기기에서 지원하는 코어 수 등에 따라서 성능 속도는 달라질 수 있다.
고전적인 for 루프는 저수준으로 동작하며 기본 값을 박싱하거나 언박싱할 필요가 없으므로 수행속도가 빠를 수 있다. 따라서 병렬 버전이 순차 버전보다 느리게 동작할 수 있다. 이유가 뭘까?

LongStream.rangeClosed를 활용하면 실질적으로 리듀싱 연산이 병렬로 수행된다. 올바른 자료구조를 선택해야 병렬 실행도 최적의 성능을 발휘할 수 있다.

병렬 스트림의 올바른 사용법

공유된 상태를 바꾸는 알고리즘을 사용할 때 병렬 스트림을 사용하면 문제가 발생한다.

public static long sideEffectSum(long n) {
    Accumulator accumulator = new Accumulator();
    LongStream.rangeClosed(1, n).forEach(accumulator::add);
    return accumulator.total;
}

public class Accumulator {
    public long total = 0;
    public void add(long value) { total += value; }
}

위 같은 코드는 본질적으로 순차 실행할 수 있도록 구현되어 있으므로 병렬로 실행하게 되면 올바른 값을 얻을 수 없게 된다. 여러 스레드에서 동시에 total += value 를 실행하면서 문제가 발생되기 때문이다.
따라서 병렬 스트림과 병렬 계산에는 공유된 가변 상태를 피해야 한다.

병렬 스트림 효과적으로 사용하기

소스 분해성
ArrayList 훌륭함
LinkedList 나쁨
IntStream.range 훌륭함
Stream.iterate 나쁨
HashSet 좋음
TreeSet 좋음

포크/조인 프레임워크

포크/조인 프레임워크는 병렬화할 수 있는 작업을 재귀적으로 작은 작업으로 분할한 다음에 서브태스크 각각의 결과를 합쳐서 전체 결과를 만들도록 설계되었다.
포크/조인 프레임워크에서는 서브태스크를 스레드 풀(ForkJoinPool)의 작업자 스레드에 분산 할당하는 ExecutorService 인터페이스를 구현한다.

RecursiveTask 활용

스레드 풀을 이용하려면 RecursiveTask의 서브클래스를 만들어야 한다. 여기서 R은 병렬화된 태스크가 생성하는 결과 형식 또는 결과가 없을 때(결과가 없더라도 다른 비지역 구조를 바꿀수 있다)는 RecursiveAction 형식이다.
RecursiveTask를 정의하려면 추상 메서드 compute를 구현해야 한다.

protected abstract R compute();

compute 메서드는 태스크를 서브태스크로 분할하는 로직과 더 이상 분할할 수 없을 때 개별 서브태스크의 결과를 생산할 알고리즘을 정의한다.

if (태스크가 충분히 작거나  이상 분할할  없으면) {
    순차적으로 태스크 계산
} else {
    태스크를  서브태스크로 분할
    태스크가 다시 서브태스크로 분할되도록  메서드를 재귀적으로 호출함
    모든 서브태스크의 연산이 완료될 때까지 기다림
     서브태스크의 결과를 합침
}

이 알고리즘은 divide-and-conquer 알고리즘의 병렬화 버전이다. 포크/조인과정

포크/조인 프레임워크를 제대로 사용하는 방법

public interface Spliterator<T> {
    boolean tryAdvance(Consumer<? super T> action); //  Spliterator 의 요소를 하나씩 순차적으로 소비하면서 탐색해야 할 요소가 남아있으면 true를 반환(iterator 동작과 같다)
    Spliterator<T> trySplit();  //  Spliterator 의 일부 요소(자신이 반환한 요소)를 분할해서 두 번째 Spliterator를 생성하는 메서드
    long estimateSize();    //  탐색해야 할 요소 수 정보 제공 메서드
    int characteristics();
}

분할 과정

재귀분할과정 1단계 첫 번째 Spliterator에 trySplit을 호출하면 두 번째 Spliterator가 생성되고, trySplit의 결과가 null이 될 때 까지 이 과정을 반복한다. 4단계처럼 모든 trySplit의 결과가 null이면 재귀 분할 과정이 종료된다.

Spliterator의 특성

Spliterator의 characteristics 메서드는 Spliterator 자체의 특성 집합을 int 타입으로 반환한다.

특성 의미
ORDERED 리스트처럼 요소에 정해진 순서가 있으므로 Spliterator는 요소를 탐색하고 분할할 때 이 순서에 유의해야 한다.
DISTINCT x, y 두 요소를 방문했을 때 x.equals(y)는 항상 false를 반환한다.
SORTED 탐색된 요소는 미리 정의된 정렬 순서를 따른다.
SIZED 크기가 알려진 소스(ex. Set)로 Spliterator를 생성했으므로 estimatedSize()는 정확한 값을 반환한다.
NONNULL 탐색하는 모든 요소는 null이 아니다
IMMUTABLE 이 Spliterator의 소스는 불변이다. 즉, 요소를 탐색하는 동안 요소를 추가, 삭제할 수 없다.
CONCURRENT 동기화 없이 Spliterator의 소스를 여러 스레드에서 동시에 고칠 수 있다.
SUBSIZED 이 Spliterator 그리고 분할되는 모든 Spliterator는 SIZED 특성을 갖는다.

커스텀 Spliterator 구현하기

생략

정리