하마
5k
2018-03-07 17:28:50 작성 2018-03-09 17:34:45 수정됨
2
2662

Java 쓰레드풀 분석 - newFixedThreadPool 는 어떻게 동작하는가?


소프트웨어 엔지니어링에서 풀의 종류는 다양한데요.

쓰레드풀,메모리풀,캐쉬풀,커넥션풀,객체풀(자바에서 객체풀은 사용을 지양합니다.메모리를 할당하는 작업이 자바가 C/C++보다 빠름등등이 있습니다. "풀"어서 말하면 미리 만들어두고 돌려막기로 사용하자 라고 볼 수 있는데요. 미리 만들어 두는 방식 / 쓰레드가 태스크를 처리하는 방식에 따라서 다양한 풀의 구현체들이 있을 수 있습니다.  이 글에서는 openJDK8 기준의 자바에서 구현된 newFixedThreadPool 를 해부해보도록 하겠습니다. 

                                             

           (풀장에서 뛰어노는 쓰레드들의 모습들)         

                               

쓰레드풀은 동일하고 서로 독립적인 다수의 작업을 실행 할 때 가장 효과적이다.실행 시간이 오래 걸리는 작업과 금방 끝나는 작업을 섞어서 실행하도록 하면 풀의 크기가 굉장히 크지 않은 한 작업 실행을 방해하는 것과 비슷한 상황이 발생한다. 또한 크기게 제한되어 있는 쓰레드 풀에 다른 작업의 내용에 의존성을 갖고 있는 작업을 등록하면 데드락이 발생할 가능성이 높다. 다행스컯게도 일반적인 네트웍 기반의 서버 어플리케이션 (웹서버,메일서버,파일서버등)은 작업이 서로 동일하면서 독립적이어야 한다는 조건을 대부분 만족한다.  -  Java concurrency in practice 책 발췌 


전설의 개발자들이 참여한 저 엄청난 책은 자신들이 참여한 java.util.concurrent 패키지를 기준으로 한다. 근데 이 책이 2006년에 JDK1.5 기준으로 쓰여졌기 때문에 "동일하고 서로 독립적인 다수의 작업" 이라고 한정지었는데, 후에 JDK1.7에서 FORK-JOIN 풀이 나오면서 기존 풀들에서는 할 수 없었던 분할 수행작업을 효율적으로 수행 할 수 있는 보완재가 되었다.



1.newFixedThreadPool 사용법 (자바8기준)


execute 메소드


import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream; 
public class SimpleThreadPool {
  public static void main(String[] args) {   
  ExecutorService executor = Executors.newFixedThreadPool(10);   
  IntStream.range(0, 10).forEach( n -> executor.execute( () -> 
        {       
          try {         
                TimeUnit.MILLISECONDS.sleep(300);         
                String threadName = Thread.currentThread().getName();         
                System.out.println("Hello " + threadName);        
          } catch (InterruptedException e) {
             e.printStackTrace();        
          }     
      })   
  );  
}
}

- 쓰레드풀에 쓰레드를 10개를 뛰어놀게 한다. 
- 10번을 반복해서 쓰레드풀에 일을 시킨다. (쓰레드풀안의 쓰레드 하나가 선택되어 일처리를 할것이다)
- execute 메소드를 사용했다. 이 메소드는 void 를 리턴한다. 즉 일처리를 시키기만 하지 결과를 보고받지 않을것이다.
- 구현내용은 300밀리초를 기다렸다가 Hello 쓰레드이름을 출력하는 것이다. 

- 자바8부터는 for(int i = 0; i < 10; i++) 보다는 저렇게 사용하는게 좋다.


submit 메소드

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.*;

public class SimpleThreadPool {

  public static void main(String[] args) {

    ExecutorService executor = Executors.newFixedThreadPool(2);
    final List<Integer> integers = Arrays.asList(1,2,3,4,5);
    Future<Integer> future = executor.submit(() -> {
      TimeUnit.MILLISECONDS.sleep(5000);
      int result = integers.stream().mapToInt(i -> i.intValue()).sum();
      return result;
    });

    try {
      Integer result = future.get();
      System.out.print("result: " + result);
      executor.shutdownNow();
    } catch (InterruptedException e) {
      e.printStackTrace();
    } catch (ExecutionException e) {
      e.printStackTrace();
    }
  }
}

- 쓰레드풀에 쓰레드를 10개를 뛰어놀게 한다. 
- 쓰레드풀에 하나의 일을 시킨다. (쓰레드풀안의 쓰레드 하나가 선택되어 일처리를 할것이다)
- submit 메소드를 사용했다. 이 메소드는 future 를 리턴한다. 즉 일처리를 시키기고, 그에 따른 결과를 보고받을 것이다.
- 구현내용은 5000밀리초를 기다렸다가 리스트안의 숫자들의 합을 리턴한다.

- 리턴 받은 future 로 부터 값을 얻는다. 여기서 get()메소드는 블럭된다. (타임아웃을 매개변수로 넣을 수도 있다) 아마 일처리를 하는 쪽의 쓰레드에서 일을 다 끝내고 set() 같은 것을 해 줄 때가지 블럭될거 같다.


이제 사용법은 알았으니 과연 쓰레드풀을 자바에서 어떻게 구현하고 있는지 확인 해보자.

@어떻게 10개의 쓰레드를 가진 풀을 만드는지? 
(newFixedThreadPool(10))
@어떻게 풀에서 쓰레드 하나를 할당해 주는지? (execute)

@쓰레드풀에서 할당 될 워커(쓰레드)가 없을 때 태스크가 어떻게 되는지?

@어떻게 future 를 리턴해주는지. (submit



2.newFixedThreadPool 소스 분석(자바8기준)


* 참고로 지면이 한정된 관계로 예외처리 포함해서 여러 로직들이 생략되었다.


2-1. 어떻게 10개의 쓰레드를 가진 풀을 만드는지? (newFixedThreadPool(10))

public class Executors {

java/util/concurrent/Executors.java 라는 파일에는 Executors 라는 클래스가 있으며, 정적 메소드로써 newFiexedThreadPool 이라는 것을 제공한다. 매개변수는 쓰레드풀에 들어갈 쓰레드들의 개수를 넣어준다.


public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
            0L, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<Runnable>());
}

퍼사드 역할 (Gof의 패턴중 하나로 내부 시스템의 복잡함을 감추고 사용자가 간편히 사용하도록 하는 의도)을 하고있다. 그저 팩토리로써 내부의 ThreadPoolExecutor 객체를 대신 만들어주고 있는데, 매개변수로는 순서대로 살펴보자.


- corePoolSize : 풀 안에 유지되는 쓰레드 개수 (시작시) 

- maximumPoolSize : 풀에 유지되는 최대 쓰레드 개수

- keepAliveTime : corePoolSize 보다 쓰레드 개수가 많아 질 때, 새로운 테스크를 기다리기 위한 시간. 시간이 지나면 쓰레드를 없애서 corePoolSize 를 유지한다.

- unit :  keepAliveTime 시간단위

- workQueue : 실행 되기전에 홀드시켜 두는 태스크를 유지하는 큐. 쓰레드가 남지 않을 경우 여기 태스크를 넣는다.


시작개수가 1이고 최대개수가 1이면  newSingleThreadExecutor 이 되는것이고,

시작개수가 0이고 최대개수가 MAX_VALUE 가 되면, newCachedThreadPool 풀이라 한다.
우리는 최소,최대가 10개로 고정된 쓰레드풀을 만든 것이다.

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         Executors.defaultThreadFactory(), defaultHandler);
}

java/util/concurrent/ThreadPoolExecutor.java 라는 파일에는 ThreadPoolExecutor라는 클래스가 있으며, 들어온 매개변수에 추가로 Executors.defaultThreadFactory()와 defaultHandler 를 추가 매개변수로 받아서 새 객체를 만든다.

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}


 Executors.defaultThreadFactory()
이게 먼가 쓰레드를 풀에 생성하는 것과 관련이 있는거 같아서 따라가 본다.
public static ThreadFactory defaultThreadFactory() {
    return new DefaultThreadFactory();
}
static class DefaultThreadFactory implements ThreadFactory {
    private static final AtomicInteger poolNumber = new AtomicInteger(1);
    private final ThreadGroup group;
    private final AtomicInteger threadNumber = new AtomicInteger(1);
    private final String namePrefix;

    DefaultThreadFactory() {
        SecurityManager s = System.getSecurityManager();
        group = (s != null) ? s.getThreadGroup() :
                              Thread.currentThread().getThreadGroup();
        namePrefix = "pool-" +
                      poolNumber.getAndIncrement() +
                     "-thread-";
    }

    public Thread newThread(Runnable r) {
        Thread t = new Thread(group, r,
                              namePrefix + threadNumber.getAndIncrement(),
                              0);
        if (t.isDaemon())
            t.setDaemon(false);
       
        return t;
    }
}

- 쓰레드 그룹이라는 것을 만들어 두고 있다. 자바는 하나의 객체안에 여러개의 쓰레드들을 그루핑하기 위한 편리한 방식을 제공하는데, 이 방식을 통해서suspend, resume, interrupt 같은 콜을 한방에 적용되게 할 수 있다.
- pool 의 첫번째 쓰레드라는 이름을 지정해주고 있다. "pool-1-thread-"
- newThread 라는 메소드를 제공하는거 봐서 역시 이 객체는 새로운 쓰레드를 생성하는 팩토리였던 것이다.
- 비데몬 쓰레드로 만든다. (메인쓰레드가 종료되도 종료되지 않고 살아있는)

결론적으로 위의 코드는 각종 세팅만 했지 실질적으로 쓰레드를 만들지는 않았다. 그럼 언제 만드는 것일까?

2-2. 어떻게 풀에서 쓰레드 하나를 할당해 주는지? (execute)

이번에는 executor.execute( 태스크  ) 이 호출에 대해서 따라가 보자. 

public void execute(Runnable command) {

  int c = ctl.get(); 
  // 1 단계 
  if (workerCountOf(c) < corePoolSize) {
    if (addWorker(command, true))
      return;

  }

  // 2 단계
    .... 생략 ....
}

위 소스에서 1단계만 살펴보자. 만약 corePoolSize 보다 작은 개수의 쓰레드가 풀 안에 생성되있다면 새로운 쓰레드를 만들어서 새로 들어온 태스크(위에 소스에서 command) 를 처리하고 리턴해준다. 이제addWorker(command, true) 를 쫒아가보자.

private boolean addWorker(Runnable firstTask, boolean core) {
  .. 상태체크 부분 생략 .. 

  Worker w = null;

  w = new Worker(firstTask); // (1)
  final Thread t = w.thread;
  if (t != null) {

    workers.add(w); // (1)
    t.start();
    workerStarted = true;

  }

  return workerStarted;
}
쓰레드 생성 및 태스크 할당에 촛점을 맞추어 천천히 살펴보자.

1. Worker 객체를 만들고 태스크를 할당한다. 
 w = new Worker(firstTask); // (1)
Worker(Runnable firstTask) {
    setState(-1); // inhibit interrupts until runWorker
    this.firstTask = firstTask;
    this.thread = getThreadFactory().newThread(this);
}
Worker 생성자에서 반가운 손님을 보게되는데 newThread 를 통해서 태스크를 새로운 쓰레드에 할당하는 모습이다.여기서 눈여겨 봐야 할 것이, 워커객체는 쓰레드를 참조하고 있으며, 역으로 쓰레드에도 this (워커객체)가 매개변수로 들어가는 것인데, 쓰레드는 워커객체 this 를 통해서 할당된 태스크에 대한 처리를 하리라는 것을 알 수 있다. 즉 쓰레드에 태스크가 직접적으로 할당되는 것이 아니라, 워커객체라는 대리자를 통해서 관리되는 것. 

2.쓰레드풀의  workers 컬렉션에 Worker 객체를 보관한다. 
workers.add(w);

쓰레드풀은 쓰레드를 직접 관리하는게 아니라, 워커라는 대리자를 통해서 관리한다.

3. 쓰레드를 실행 시킨다. 
t.start();
워커를 실행시키는게 아니라, 쓰레드를 직접 실행시킨다. 이렇게 되면 쓰레드는 워커 참조를 내부적으로 가지고 있기 때문에, 워커 참조를 통해 워커가 가지고 있는 우리가 전달한 태스크를 실행 시킬 것이다. 참고로 좀 더 정밀하게 컨트롤하기 위한 상태체크 부분들은 모두 빠져있는데 나중에 여유가 생기면 각자 살펴보도록 하자.

2-3. 쓰레드풀에서 할당 될 워커(쓰레드)가 없을 때 태스크가 어떻게 되는지?

10개의 고정된 크기의 쓰레드풀을 만들었는데, 12개의 태스크를 넣어주면 어떻게 될까? 예상대로 10개의 워커(쓰레드) 만 생겨서 10개의 태스크만 처리하다가, 먼저 끝난 워커가 나머지 2개중 하나의 태스크를 처리하게 된다. 이 과정에 대해서 소스로 살펴보도록 하자.
public void execute(Runnable command) {
        // 1단계
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        // 2단계 
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }
1단계에서는 아직 워커(쓰레드)가 10개가 안되었을때, addWorker 를 통해 워커도 만들고 쓰레드도 만들어서 들어온 태스크를 처리하게 된다.
2단계에서는 이제 더 이상 처리할 워커(스레드)가 없을 경우에 대해서 처리하는데, workerQueue.offer(command) 즉  워커큐에 태스크를 넣어주고 리턴한다. 

여기서 예상 할 수 있는 것이 워커(쓰레드)가 자신이 담당한 태스크 처리를 끝내면 저 큐에서 기다리고 있는 태스크를 가져다가 실행 하리라는 것을 눈치 챌 수 있을 것이다. 맞는지 확인해보자.
 final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
  
        while (task != null || (task = getTask()) != null) {

            ... 생략 ...

            task.run();
                   
            ... 생략 ...
        }
           
    }
 예상대로 워커객체의  runWorker() 메소드를 보면 처음 워커가 시작하고 나서 while 문을 돌면서 들어오는 태스크를 꾸준히 처리해 주고 있다. 즉 쓰레드와 워커객체는 풀이 살아있는 동안 계속 살아 있는 것이다  (자동 소멸이 필요하면 그렇게 쓰레드풀을 구현 할 수도 있을 것이며, 이미 있다) . 가져온 태스크는 task.run()을 해주고 있다. 기억하라 우리의 task 는 Runnable 인터페이스를 상속받은 객체이고 run 을 구현했었다. getTask() 를 통해서 태스크를 가져오는 것이 보일 것이다. 그 안에서  workerQueue 에서 태스크를 가져올것이다. 확인해보자.

 private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?

        for (;;) {
                   .. 생략 ..

            try {
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
              
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }
 workerQueue 에서 (이름을 TaskQueue 라고 해야지 않나 싶다) 태스크를 가져오고 있다.


2-4. 어떻게 future 를 리턴해주는지. (submit
execute 메소드와 비슷할 거라 예상되는 submit 메소드를 살펴보자.
public <T> Future<T> submit(Callable<T> task) {
    RunnableFuture<T> ftask = newTaskFor(task);
    execute(ftask);
    return ftask;
}
java/util/concurrent/AbstractExecutorService.java 는 ThreadPoolExecutor 의 부모클래스인데 submit는 여기에 존재한다.

1. 건네어진 태스크를 RunnableFuture 인터페이스를 상속받는 FutureTask 로 만든 후에
public FutureTask(Callable<V> callable) {
  this.callable = callable;
  this.state = NEW; // ensure visibility of callable
}
2. Runnable 인터페이스가 아닌 RunnableFuture 를 매개변수로 받는 execute 메소드로 보내진다.
   참고로 RunnableFuture 는 Runnable 인터페이스를 상속받았기 때문에 문제없다.


public void execute(Runnable command) {

  int c = ctl.get(); //1 단계
  if (workerCountOf(c) < corePoolSize) {
    if (addWorker(command, true))
      return;
  c = ctl.get();
  }

  //2 단계
    .... 생략 ....
}


위의 addWorker 안에서는 쓰레드를 실행시키는데,(addWorker 코드는 위쪽에 이미소개했다.)
public void run() {
  runWorker(this);
}
Worker 객체의 run 메소드안에는 runWorker 메소드를 실행시킨다. 살펴보자.
final void runWorker(Worker w) {
  Thread wt = Thread.currentThread();
  Runnable task = w.firstTask;
  w.firstTask = null;

  try {

    task.run();

  } finally {
    processWorkerExit(w, completedAbruptly);
  }
}
이게 핵심 중 하나인데 Worker 객체에서 태스크를 실행 시키고 널로 세팅한다

새로운 태스크가 할당될때마다 쓰레드안에 넣어주면 되는 것이며, 쓰레드를 소멸 시킬 필요없이 새로운 태스크를 받아서 계속 활용 하는 것이 쓰레드풀의 아이디어 이며, 이 글에서 생략된 나머지 상태관리 구현은 매우 복잡하긴 하지만 핵심은 아니다. 

  task.run();
에 주목하자. runnable 이나 runnableFuture 는 각각 run을 구현한다.

public void run() {

  try {
    Callable<V> c = callable;
    if (c != null && state == NEW) {
    V result;
    boolean ran;
    try {
      result = c.call();
      ran = true;
    } catch (Throwable ex) {
      result = null;
      ran = false;
      setException(ex);
    }
    if (ran)
      set(result);
    }
  }

....
}
- run 내부에서 태스크를 실행시키고 있다. c.call() 리턴받은 result 를 set 메소드로 넘겨준다.

마지막으로 Future get() 과 set() 매칭에 대해서 생각해보자.
글 서두에 활용 예제 코드에서 Integer result = future.get();  기억 나는가?
future 객체의 get()메소드를 통해 무엇인가 해결이 됬으면 결과를 리턴받고 있는데, get()이 완료되려면 바로 위의 코드에서 set(result)가 호출되어야 한다.
if (ran)
    set(result);
이거 말이다. set 과 get 메소드의 코드는 아래에 있는데 이 부분은 쓰레드 풀링과 직접적인 관계가 없는 주제이므로 이 글을 읽는 분에게 맡겨 놓겠다.

set 메소드
protected void set(V v) {
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        outcome = v;
        UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
        finishCompletion();
    }
}

get 메소드


public V get() throws InterruptedException, ExecutionException {
  int s = state;
  if (s <= COMPLETING)
    s = awaitDone(false, 0L); // awaitDone 에서 블럭되고 있음을 알수 있다. 하염없이 기다림 
  return report(s);
}




부록 쓰레드풀의 상태관리)


쓰레드풀의 상태를 나타내는 멤버변수로 ctl 에 대해 살펴보자.
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));  // RUNNING 상태와 쓰레드 개수1로 초기화

private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;  // 00011111 11111111 11111111 11111111 

// runState is stored in the high-order bits
private static final int RUNNING    = -1 << COUNT_BITS; // 11100000 00000000 00000000 00000000
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;

private static int ctlOf(int rs, int wc) { return rs | wc; }  // 2가지 상태를 하나의 Int 로 합쳐주는 함수

이 ctl 는 하나의 변수 안에 2가지 내용을 패킹시켜 두었다.

첫째. workerCount   - 현재  워커(쓰레드) 의 개수
둘째. runState - 쓰레드풀의 현재 상태 (running, shutting down 등등)

위의 코드를 보면 ctlOf 메소드를 통해서 초기에는 (RUNNING,0) 으로 초기화 시켜 놓은 부분이 보일 것이다. 즉 초기 상태는 
workerCount 는 0 이고 runState 는 RUNNING 이라는 것이다. 앞쪽비트는 runState 로 사용하고, 뒤쪽 비트는 workerCount 로 사용 한다는 것인데, 맞는지 확인 해보자.

- CAPACITY ( -1 일 해주는 이유는 하위 모든 비트를 1로 만들기 위해서이다.)
(1<< 29) - 1 = 2^29 -1  =  536870911 = 0x1FFFFFFF = 00011111 11111111 11111111 11111111 

- RUNNING 은 다음과 같다.
-1 << 29  =  -536870912 =   0xE0000000 = 11100000 00000000 00000000 00000000

위 쪽의 3비트와 나머지 비트로 나누어 가진 것을 알 수 있다. 

따라서 ctl 의 초기 상태는 다음과 같다.
11100000 00000000 00000000 00000000

workerCount

하나의 Int 타입으로 2가지 내용을 패킹하기 위해서는 먼저 workerCount 를 대략 5억개로 제한시켜두었고 
private static final int COUNT_BITS = Integer.SIZE - 3; // 32-3 = 29

private static final int CAPACITY = (1 << COUNT_BITS) - 1; // 00011111 11111111 11111111 11111111

private static int workerCountOf(int c) { return c & CAPACITY; } // 현재 워커 갯수는?

비트마스킹이 조금이라도 더 빠르니깐 이렇게 한것이며,위에서 workerCount 가 쓰레드 개수라고 했듯이, 그 숫자가 더 넘어서게도면 AtomicLong 으로 교체 해야 할 수도 있겠다.

workerCount 는 워커의 숫자로써, 살아있는 쓰레드들의 실질적인 숫자로써 계속 달라 질 것이다.(예를들어 ThreadFactory 가 쓰레드 생성에 실패 했을 때, 종료되는 와중에도 여전히 태스크 처리를 수행 될때 등등), 내부적으로 사용되는 workerCount와는 다르게 사용자에게 알려지는 풀의 크기는 worker 를 모아둔 workers(HashSet) 의 크기가 된다.  

현재 ctl 의 초기 상태는 다음과 같다. 이것이 workerCountOf 의 매개변수인 c 로 들어가게 되는데 

11100000 00000000 00000000 00000000   (ctl)
 00011111 11111111 11111111 11111111 (capacity)

이 둘을 비트연산(&)하면 0 이다. 맞는지 확인 해보자.
int c = ctl.get();   
int ws = workerCountOf(c); // ws 는 0

runState
RUNNING: 새로운 태스크를 받고,  큐에 집어 넣는 일을 하라.
SHOUTDOWN: 새로운 태스크를 받지마라, 그러나 이미 큐에 있는 태스크는 처리하라
STOP: 새로운 태스크를 받지마라, 큐에 있는 태스크도 처리하지 않는다. 현재 진행중인 태스크에 인터럽트를 건다.
TIDYING: 모든 태스크는 소멸되었고, workerCount 는 0 이다. TIDYING 상태로 전이되는 쓰레드는 terminated() 훅 메소드를 실행 시킬 것이다. 
TERMINATED: terminated() 메소드가 완료 되었다. 

전이형태는 다음과 같다.
RUNNING -> SHUTDOWN    ( shoutdown() 메소드 호출시)
(RUNNING or SHUTDOWN) -> STOP ( shoutdownNow() 메소드 호출시)
SHUTDOWN -> TIDYING ( 풀과 큐가 비었을 때)
STOP -> TIDYING (풀이 비었을 때) 
TYDYING -> TERMINATED (terminated() 메소드가 완료 되었을 때)

참고:

http://tutorials.jenkov.com/java-util-concurrent/executorservice.html

http://hg.openjdk.java.net/jdk8/jdk8/jdk/file/00cd9dc3c2b5/src/share/classes/java/util/concurrent/Executors.java


17
15
  • 댓글 2

  • 하비
    278
    2018-03-12 13:13:39

    좋은 글 감사합니다.

    덕분에 까보면서 이해하는게 한층 더 수월해 졌네요.

    0
  • ggulmool
    60
    2018-05-30 09:46:30 작성 2018-05-30 10:11:32 수정됨


    정리 잘해주셔서 소스분석하는데 많은 도움이 되었습니다.

    소스 분석하면서 한가지 궁금한점이 있어서 질문 남겨요.


    ThreadPoolExecutor의 execute메소드를 수행하면 addWorker를 하면서 Worker(Runnbable task)를 만들고 그때 DefaultThreadFactory를 통해서 쓰레드를 생성하고

    인스턴스 변수인 HashSet<Worker> workers에 add를 하고 theard.start()를 해서 기동하는데요.


    workers셋에 저장된 쓰레드 갯수가 maxPoolSize와 동일한 경우 그 이후에 들어온 execute(Runnable)요청은 BlockingQueue<Runnable> workQueue에 저장하고 처리를 마친 쓰레드의 실행 로직의 while문(getTask() != null)을 통해 큐에 저장된 Runable요청을 가져와 수행하는 구조인데요.


    while문(getTask() != null) 조건이 충족되지 않으면 finally 부분의 processWorkerExit()을 통해 workers.remove를 시키고 있는데요.


    여기서 궁금한 점이 workers셋이 여유가 있는 경우에 새로운 요청이 들어오면 매번 쓰레드를 새로 생성하고 다시 기동하는 구조인건가요?

    쓰레드풀 개념이 사전에 지정한 갯수만큼 쓰레드를 생성하고 기동시켜서 쓰레드 생성및기동시키는 비용을 줄이려고 사용하는데요.

    ThreadPoolExecutor는 workers셋에 저장된 쓰레드가 maxPoolSize인 경우에만 미리 기동된 쓰레드를 재사용하는 방식인가요?


    0
  • 로그인을 하시면 댓글을 등록할 수 있습니다.