Skip to content

[아이템 80] 스레드보다는 실행자, 태스크, 스트림을 애용하라 #81

@kihyun-yang

Description

@kihyun-yang

서론

  • 동시성 작업을 할 때는 작업 큐를 직접 생성할 수도 있겠지만 복잡한 작업들(안전실패, 응답불가 예방)이 필요하다.
  • java.util.concurrent 패키지는 실행자 프레임워크 (Executor Framework)라고 하는 인터페이스 기반의 유연한 태스크 실행 기능을 담고 있다.
  • java.util.concurrent 패키지의 Executors가 제공하는 정적 팩터리 메서드를 사용하면 다양한 작업 큐를 얻을 수 있다.
    • 그러니까 java.util.concurrent 패키지의 실행자, 태스크, 스트림을 이용하는 편이 낫다.
    • 직접 작업 큐를 구현하려면 복잡한 작업들 (안전실패, 응답불가 예방, 우하한 종료 등...)을 처리하는데 큰 노력이 필요하다.

실행자 서비스 (스레드 풀)의 주요 기능들

특정 태스크가 완료되기를 기다린다.

@DisplayName("하나의 worker를 가지는 쓰레드풀")
@Test
 void single() throws ExecutionException, InterruptedException {
     ExecutorService exec = Executors.newSingleThreadExecutor();

     exec.submit(() -> {
         System.out.println("First");
         try {
             Thread.sleep(1000);
         } catch (InterruptedException e) {
             e.printStackTrace();
         }
     }).get();
     exec.execute(() -> System.out.println("Second"));
}
  • submit 메서드를 사용하면 Future 인터페이스를 반환받을 수 있고 get() 메서드를 통해 해당 Runnable (Callable도 가능) 를 실행할 때까지 기다리는 작업이 가능하다.
  • 따라서, 첫번째 Runnable 의 실행이 끝난 후 두번째 Runnable 도 실행이되고, 아래와 같은 출력을한다.
    First
    Second
    

태스크 모음 중 아무것 하나 혹은 모든 태스크가 완료되기를 기다린다.

@DisplayName("태스크 모음 중 아무것 하나 혹은 모든 태스크가 완료되기를 기다린다.")
@Test
void any() throws InterruptedException, ExecutionException {
  ExecutorService exec = Executors.newFixedThreadPool(3);

  List<Future<String>> returnStr = exec.invokeAll(tasks());
  System.out.println(returnStr.get(0).get()); // FIRST
  System.out.println(returnStr.get(1).get()); // SECOND
  System.out.println(returnStr.get(2).get()); // THRID
}

List<Callable<String>> tasks() {
  return Arrays.asList(() -> {
      try {
  	Thread.sleep(3000);
      } catch (InterruptedException e) {
          e.printStackTrace();
      }
      return "FIRST";
  },
  () -> {
      try {
          Thread.sleep(1000);
      } catch (InterruptedException e) {
  	e.printStackTrace();
      }
  	return "SECOND";
  },
  () -> "THIRD");
}
@DisplayName("태스크 모음 중 아무것 하나 혹은 모든 태스크가 완료되기를 기다린다.")
@Test
void any() throws InterruptedException, ExecutionException {
    ExecutorService exec = Executors.newFixedThreadPool(3);

    System.out.println(exec.invokeAny(tasks())); // THRID 출력 후 
}

List<Callable<String>> tasks() {
    return Arrays.asList(() -> {
        try {
            Thread.sleep(3000);
         } catch (InterruptedException e) {
            e.printStackTrace(); // Interrupt
         }
         return "FIRST";
     },
     () -> {
         try {
             Thread.sleep(1000);
         } catch (InterruptedException e) {
              e.printStackTrace();
         }
         return "SECOND";
      },
      () -> "THIRD");
}

실행자 서비스가 종료하기를 기다린다.

@DisplayName("실행자 서비스가 종료하기를 기다린다")
@Test
void waitForTerminate() throws InterruptedException {
    ExecutorService exec = Executors.newSingleThreadExecutor();

    exec.execute(() -> {
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("TEST"); // 출력안된다
    });

    exec.awaitTermination(2000, TimeUnit.MILLISECONDS);
}
@DisplayName("실행자 서비스가 종료하기를 기다린다")
@Test
void waitForTerminate() throws InterruptedException {
    ExecutorService exec = Executors.newSingleThreadExecutor();

    exec.execute(() -> {
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
          System.out.println("TEST"); // 출력된다
    });

    exec.awaitTermination(4000, TimeUnit.MILLISECONDS);
}

완료된 태스크들의 결과를 차례로 받는다.

@DisplayName("완료된 태스크들의 결과를 차례로 받는다")
@Test
void sequence() throws ExecutionException, InterruptedException {
    CompletionService<String> cs = new ExecutorCompletionService<>(Executors.newFixedThreadPool(3));
    List<Callable<String>> tasks = tasks();
    tasks.forEach(cs::submit);
    for (int i = tasks.size(); i > 0; i--) {
        String r = cs.take().get();
        if (r != null)
            System.out.println(r); // THRID, SECOND, FISRT 순서로 출력
    }
}
  
List<Callable<String>> tasks() {
    return Arrays.asList(() -> {
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "FIRST";
    },
    () -> {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "SECOND";
    },
    () -> "THIRD");
}

태스크를 특정 시간에 혹은 주기적으로 실행하게 한다.

@DisplayName("테스크를 특정 시간에 혹은 주기적으로 실행하게 한다.")
@Test
void schedule() throws InterruptedException {
    System.out.println("싱글쓰레드 쓰케쥴executors");
    ScheduledExecutorService exc = Executors.newSingleThreadScheduledExecutor();

    //3초 대기 후 출력 (Runnable command, long delay, TimeUnit unit) <- 매개변수
    exc.schedule(() -> System.out.println("Hello"), 3000, TimeUnit.MILLISECONDS);

    //특정 시간마다 반복 - 3초기다렸다가 3초마다 반복 (command, initial delay, period, TimeUnit unit)
    //시작딜레이(initialDelay) 이후 첫번째 실행을 시작으로 지정한 시간(period)만큼 차이로 정확하게 반복 실행.
    exc.scheduleAtFixedRate(() -> System.out.println("Hello!"), 3000, 3000, TimeUnit.MILLISECONDS); /// 3번출력
    Thread.sleep(10000);
}

스레드풀의 설정을 조금 더 견고하게 하고 싶다면 ThreadPoolExecutor 클래스를 직접 사용할 수 있다.

실행자 서비스 (스레드 풀) 생성 팁

  • Executors.newCachedThreadPool는 태스크를 요청받으면 바로 가용가능한 스레드에 할당하고 없으면 그 즉시 만든다. 태스크가 자주 쌓이는 시스템이라면 적합하지 않다.
    • CPU 이용률이 100%로 치닫을 우려.
  • 따라서 무거운 프로덕션 서버에서는 스레드 개수를 고정한 Executors.newFixedThreadPool을 선택하거나 완전히 통제할 수 있는 ThreadPoolExecutor를 직접 사용하는 편이 훨씬 낫다.
  • 작업 큐를 손수 만드는 일은 삼가야 하고, 스레드를 직접 다루는 것도 일반적으로 삼가야 한다.

포크-조인 태스크

  • 자바 7이 되면서 실행자 프레임워크는 포크-조인 태스크를 지원하도록 확장됐다.
  • 포크-조인 태스크는 포크-조인 풀이라는 특별한 실행자 서비스가 실행해준다.
  • 포크조인 태스크에서는 먼저 일을 끝낸 스레드가 다른 스레드의 남은 태스크를 수행하는 것이 가능해진다.
    • 이렇게 하여 모든 스레드가 바쁘게 움직여 CPU를 최대한 활용하면서 높은 처리량과 낮은 지연시간을 보장한다.
  • 자바 8부터 나온 병렬 스트림은 내부적으로 포크조인 프레임워크를 사용한다.
    • 포크-조인 태스크를 직접 작성하고 튜닝하기란 어려운 일이지만, 병렬 스트림을 사용하면 적은 노력으로 그 이점을 얻을 수 있다.

참고

Metadata

Metadata

Assignees

Labels

Type

No type

Projects

No projects

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions