하마
2k
2016-08-24 17:55:56.0 작성 2017-01-12 12:57:39.0 수정됨
11
4048

[Java] 쓰레드풀 과 ForkJoinPool


메모리풀/쓰레드풀/캐쉬풀/DB풀  Pool 4총사 중 먼저  쓰레드 풀에 대해서 쉽게 그림으로 배워보아요.
오류나 추가사항이 있으면 댓글로 남겨주시면 소정의  포옹으로 보답을.... 쿨럭 @@

자 시작합니다~!



쓰레드/풀 


똑똑똑!

누구니?

쓰레드 에요..

프로그램(프로세스) 안에서 실행 되는 하나의 흐름 단위에요.
내부에서 while 을 돌면 엄청 오랬동안 일을 할 수 도 있답니다.



쓰레드 끼리는 값 (메모리) 을 공유 할 수 있습니다.
가끔 서로 말도 없이 값을 바꾸어서 곤란에 빠지기도  합니다. 

내가 알람을 6시로 맞춰 놨는데 B가 8시로 맞춰놓는바람에 지각을 하고 말았어요. 

쓰레드는 필요 할때마다 OS 공장에서 만들어서 
사용하고 , 다 사용하고 나면 공장에서 수거해가요.

쓰레드는 동일한 메모리 영역에서 생성되고 관리되어서 상태 변이 속도가 '프로세스' 보다는
빠르지만 
그래도 생성/수거에 드는 비용이 나름 있다고 해요.

각각의 쓰레드는 동시에 자기가 맡은 일을 하기 때문에 빠르게 처리 된답니다.
멀티 쓰레드 라고 해요.


쓰레드 풀


생성 / 수거에 드는 비용을 줄이기고 재사용성을 높이기 위해 
풀장에다가 여러개의 쓰레드를 먼저 만들어두고 사용하기도 해요.

큐같은 자료구조를 가진 클래스에 쓰레드를 미리 몇개 만들어서 놓아두면
그게 쓰레드 풀이에요. 별거 없어요.

작업 할거 있으면  그 중에 나 지금 한가해요~~라고 표식이 된 놈을 찾아서 일 시키면 됩니다.
일하는 놈을 따로 모아두고, 일 안하는 놈을 따로 모아두면 더 일 시키기 쉽긴 하겠죠? 

실제로 일 하는 놈 일 안하는 놈을 등을 관리하는 기술이 아주 다양하답니다.
다만 쓰레드가 하나 뿐이어도 충분히 일 할 수 있는데 10개씩 미리 만들어 둘 필요는 없어요.

Java 5 에서는 저런것을 만들어주는 방법이 생겼다네요. 이 생각 저 생각해서 만들었다는데 

첫째.  1분에 한번씩 임무를 수행 시키기 위한 쓰레드풀 
1초에 한 개씩  DB 에 값이 들어 갈 때 , 1분 평균을 구해서 다른 테이블에 집어 넣을 때  필요.

newScheduledThreadPool(int corePoolSize)

둘째. 풀장에 쓰레드를 고정적으로 몇개를 놀게 해줄지 정하는 방식도 제공해요.
항상 비슷하게 일이 많다면 이렇게 만들어주면 좋을거 같아요.

newFixedThreadPool(int nThreads)

세째.  어쩔땐 일이 없지만 어쩔땐 일이 많아 질때는 어떻하죠?
그때는 유기적으로 쓰레드의 숫자가 증가하고 감소하는 쓰레드풀도 있답니다.

         newCachedThreadPool()            


이런 쓰레드풀에게 어떤 업무가 배달되면 서로 간에 자기가 하겠다고 다투기도 한답니다.
배달되는 편지함이 하나인데 서로 편지함을 열어 보겠다고 난장판을 벌이죠..
(라운드로빈 방식으로  하나씩 차례대로 가져가기도 합니다.)


그래서 어떤 경우는 쓰레드 자기 만의 고유 편지함을 따로 만들기도 합니다.


좀 더 자세히 살펴보도록 해요.

보통의 쓰레드 풀입니다.

폴더 안에서 파일을 읽어서 '사랑' 이라는 단어가 몇개 있는지 확인해 보기 위해
1. 폴더 3개를 각각 잡으로 만들어서 쓰레드 풀에 줍니다. 
2. 쓰레드 풀은 각 잡을 내부의 쓰레드에 배분 해줍니다.
3. 쓰레드들은 배분 받은 일을 합니다.
4. 끝  

이렇게 했을때 폴더 A 에는 내부에 폴더가 또 10개 정도 있고 파일도 많아서 
다른 쓰레드 들이 다 업무를 마쳤는데도 불구하고 혼자 일하고 있습니다.
나머지 쓰레드들은 멀뚱히 놀고만 있네요. 

자 이제 ForkJoin 방식을 보시죠.

폴더 안에서 파일을 읽어서 '사랑' 이라는 단어가 몇개 있는지 확인해 보기 위해
1. 폴더 3개를 감싸고 있는 부모 폴더를 잡으로 만들어 보냅니다.

2. 쓰레드 A 는 부모 잡을 받아서 자신의 로컬 큐에 1차 분활합니다.
3.쓰레드  B 는 놀고 있기때문에 A 로컬 큐에서 잡을 훔쳐다가 일을 합니다.
4. 쓰레드 B 가 훔쳐온 잡을 보니, 양이 많아서 분활합니다.
5. 이런식으로 세부적으로 분활해서 모든 쓰레드가 골고루 가져가서 일하게 됩니다.
6. 모든 쓰레드가 일을 종료하는 시간이 비슷해집니다. 


대략 감이 오시는 지요? 인생사 그렇듯이 그렇다고 항상 좋은건 아닙니다.

대략 말로 설명 드려 볼게요. 상상해 봅시다.

   (메모리풀,쓰레드풀 등 pool 은 자기의 솔루션에 맞춰서 직접 만드는게 가장 좋긴 합니다.
크리티컬하거나 여유가 있다면 말이죠 ^^) 
      

천만개의 랜덤한 숫자(1~100사이)가 있습니다. 
이 숫자들 중에서 10보다 작은 수가 몇개나 되는지 세는 코딩을 해봅시다.
CPU 는 4개라고 해요. 

방법 1.  그냥 쓰레드 하나로  천만번을 순회하면서 숫자를 센다.  (6초 걸림) 


방법2. ForkJoinPool 을 사용해서 리프가 100개 일 때까지 분활(fork)해서 각각의 수치를 위로 합쳐서(join) 계산한다. 쓰레드 4개를 골고루 사용하며 대신 태스크 객체는 분활한 만큼 만들어 지게 된다.(2.5초 걸림)  

방법3.  그냥 ThreadPoolExecutor 로 쓰레드 4개를 만든 후에 각각 천만개/4 로 나뉘어진 영역에 대해 순회하면서 숫자를 계산해서 합친다. ( 2초 걸림)  ForkJoinPool 보다 더 빠르네요? 네 그렇습니다. 쓸 때없는 객체 생성이 없어졌기때문이에요.

방법4. 저렇게 쓰레드 4개가 거의 동일한 일을 하게 된다면 ForkJoinPool 이 오히려 독이겠지만 하나의 쓰레드가 굉장히 오래 걸리고 나머지 3개의 쓰레드는 금방 끝이나는 경우는?? 네 이 경우는 ForkJoinPool 이 빛을 발하게 됩니다. (ThreadPoolExecutor  4초 , ForkJoinPool  3초) 


* 참고로  newFixedThreadPool 이런 팩토리 함수를 이용해서 만들어지는 것이 ThreadPoolExecutor 이다.즉 ThreadPoolExecutor 의 매개변수를 적절히 조절하면 newFixedThreadPool 나 newCachedThreadPool 에  해당하는 것들을 직접 만들 수 있다는 말. 


멀티쓰레드 처리율(throughput)  
 임백준님의 Akka 시작하기에서 발췌 

아카를 이용한 리팩토링을 끝마쳤을 때, 똑같은 컴퓨터 위에서 전과 동일한 몬테 카를로 시나리오를 수행하는데 걸리는 시간이 6시간에서 2시간으로 단축되었다. 66%의 시간이 절약된 것이다. 결과를 확인한 사람들은 깜짝 놀랐다. 단순히 자바 스레드에서 아카로 라이브러리를 바꾸었을 뿐인데 그렇게 엄청난 차이가 있을 수 있냐며 고개를 갸웃거렸다. 물론 이런 차이를 일반화할 수는 없다. 이런 결과 하나를 가지고 아카가 자바 스레 드보다 3배 빠르다고 말하는 어리석은 사람은 없을 것이다. 아카도 내부적으로도 자바 스레드를 사용하기 때문에 그런 비교 자체가 성립하지 않는다. 하지만 일반적 인 차원에서 짚고 넘어갈만한 부분도 있다. 이렇게 커다란 차이가 어디에서 비롯 되었는지 이해하려면 우선 암달의 법칙Amdahl’s law을 생각해볼 필요가 있다. 암달의 법칙은 이렇다. “멀티코어를 사용하는 프로그램의 속도는 프로그램 내부에 존재하는 순차적sequential 부분이 사용하는 시간에 의해서 제한된다.” Thread나 Task를 만들어서 ExecutorService에게 제출하는 식으로 동시성 코드를 작성하면 여러 개의 스레드가 동시에 작업을 수행한다. 하지만 프로그램 안에는 Thread나 Task가 포함하지 않는 코드가 존재한다. 여러 개의 스레드가 동시에 작업을 수행하더라도 synchronized 블록이나 데이터베이스, 네트워크 API 호출 등을 만날 때 다른 스레드와 나란히 줄을 서서 순차적으로 작업을 수행 해야 하는 경우도 있다. 암달의 법칙은 프로그램이 낼 수 있는 속도의 상한이 이런 순차적 코드가 사용하는 시간에 의해서 제한된다고 말하는 것이다. 이러한 순차적 코드의 또 다른 이름은 블로킹blocking 콜이다. 문제는 스레드 자체 가 아니라 스레드를 사용하면서 자기도 모르게 만들어내는 블로킹 콜이다. 조금 과장해서 말하자면 자바 개발자가 스레드를 이용해서 만들어내는 ‘동시성’ 코드는 일종의 신기루다. 사실은 코드 곳곳에 존재하는 블로킹 콜, 순차적 코드 때문에 전 체적인 프로그램의 처리율은 이미 상한이 정해져 있지만 여러 개의 스레드가 ‘동 시에’ 동작한다는 사실로부터 위안을 받을 뿐이다.

아래부터는  좀 더 딱딱한 글이니 시간이 없으면 필요할때 보시면 될거 같습니다.



Fork Join Pool 


Java 7  (Scala 언어도 사용) 에서 새로 지원하는 fork-join 풀은 기본적으로
큰 업무를 작은 업무로 나누어 배분해서 , 일을 한 후에 
일을 취합하는 형태입니다.
분할 정복 알고리즘과 비슷하다고 보면 되는데 그림을 볼까요?

이렇게 Fork 를 통해서 업무를 분담하고 Join 을 통해서 업무를 취합합니다.

자바에서 풀을 관리하는  ThreadPoolExecutor 와 마찬가지로 
ForkJoinPool 도 내부에 inbound queue 라는 편지함이 하나 있습니다. 
그걸 두고 싸우느라 시간을 낭비하는것을 방지하기 위해 
ForkJoinPool 은 쓰레드 개별 큐를 만들었어요.

위에 보다시피 왼쪽에서 업무를 보내면 (submit) 하나의  inbound queue 에 누적되고 그걸 A 와 B 쓰레드가 가져다가 일 처리를 합니다. A 와 B 는 각자 큐가 있으며 , 자신의 큐에 아무 업무가 없으면 상대방의 큐에서 업무를 훔쳐(?) 오기도 하네요. 최대한 노는 쓰레드가 없게 하기 위한 알고리즘입니다.
멍청하게 놀고 있는 쓰레드를 방지하기 위함이에요.

  • 쓰레드 자신의  task queue 로 덱(deque) 을 사용합니다. 덱은 양쪽 끝으로 넣다,뺏다 할수 있는 독특한 구조이며, C++ 에서 deque 자료구조는 vector,list 의 기능을 섞어 놓은듯한 모양새를 보입니다.이 덱은 ForKJoin 풀에서 중추를 담당하고 있습니다. 각 쓰레드는 덱의 한쪽 끝에서만 일합니다. 스택 처럼 말이죠. 나머지 한쪽 끝에는 잡을 훔치러온 다른 쓰레드가 접근합니다. 결과적으로 훔치러 온 녀석끼리 동일한 큐에서 경쟁을 벌일 수 는 있게 됩니다.

  • 잠재적인 문재는 쓰레드가 다른 쓰레드의 잡을 훔치러 갔다가 실패할 경우가 빈번할때 생깁니다.계속 빈 큐에다 대고 삽질을 하는거죠. 이것을 막기위해 "unemployed" 워커 쓰레드는 룰에 따라 휴식 상태로 바꿉니다. 


모든 기술은 항상 옳는 법이 없습니다.
단순 jsp 모델1 이 스프링 보다 좋을때도 많으며 
어떤 정렬 자료구조, 어떤 검색 알고리즘이 항상 좋은것은 아니듯이 

위의 ForkJoinPool 도 적절하게 사용하면 쓰레드들이 최적의 CPU 를 활용하는 효율성을 
보이겠지만 

그렇지 않다면 그냥 거추장스러운 개별 큐가 추가되서 낭비만 더 해 질 뿐입니다.


                      ForkJoinPool 인터페이스


ForkJoinPool 은 저런것들을 편하게 사용자가 수행하는데 도움이 되는 특별한 인터페이스를 제공합니다.쓰레드가 새로운 잡(업무) 을 분활해서 자신의 로컬 큐에 적재시키는 것 같은거 말이죠. 거기에는 Runnables 나 Callables 같은게 사용되지 않습니다.  대신 ForkJoinTask 이것이 사용됩니다. 그리고 ForkJoinTask 는 두가지로 방법을  제공하는데 하나는 리턴이 없는것( RecursiveAction ) 과 리턴이 있는 것 (.RecursiveTask입니다.


자 이제 코드로 말해볼까요? 개발자는 코드 아니겠습니까~


간단한 사용예 (RecursiveAction)

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.RecursiveAction;

public class MyRecursiveAction extends RecursiveAction {

    private long workLoad = 0;

    public MyRecursiveAction(long workLoad) {
        this.workLoad = workLoad;
    }

    @Override
    protected void compute() {

        //일이 많으면 분할, 여기서는 16개 이상되면 분할함.
        if(this.workLoad > 16) {
            System.out.println("Splitting workLoad : " + this.workLoad);

            List<MyRecursiveAction> subtasks = new ArrayList<MyRecursiveAction>();

            subtasks.addAll(createSubtasks());

            for(RecursiveAction subtask : subtasks){
                subtask.fork(); // 로컬 큐로 던지면 쓰레드들이 달려 들어서 일을 가져 갈 것이다.
            }

        } else {
            // 일이 얼마 안되면 내가 하지 뭐
            System.out.println("Doing workLoad myself: " + this.workLoad);
        }
    }

    private List<MyRecursiveAction> createSubtasks() {
        List<MyRecursiveAction> subtasks = new ArrayList<MyRecursiveAction>();

        MyRecursiveAction subtask1 = new MyRecursiveAction(this.workLoad / 2); // 반토막으로 분할 
        MyRecursiveAction subtask2 = new MyRecursiveAction(this.workLoad / 2);

        subtasks.add(subtask1);
        subtasks.add(subtask2);

        return subtasks;
    }

}
MyRecursiveAction myRecursiveAction = new MyRecursiveAction(24); // 24개의 일을 가진 잡을 만들어서
forkJoinPool.invoke(myRecursiveAction); 풀에 던진다.


복잡한 사용예 (RecursiveTask)

FolderProcessor.java

package forkJoinDemoAsyncExample;
 
import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.RecursiveTask;
 
public class FolderProcessor extends RecursiveTask<List<String>>
{
   private static final long serialVersionUID = 1L;
   //This attribute will store the full path of the folder this task is going to process.
   private final String      path;
   //This attribute will store the name of the extension of the files this task is going to look for.
   private final String      extension;
 
   //Implement the constructor of the class to initialize its attributes
   public FolderProcessor(String path, String extension)
   {
      this.path = path;
      this.extension = extension;
   }
 
   //Implement the compute() method. As you parameterized the RecursiveTask class with the List<String> type,
   //this method has to return an object of that type.
   @Override
   protected List<String> compute()
   {
      //List to store the names of the files stored in the folder.
      List<String> list = new ArrayList<String>();
      //FolderProcessor tasks to store the subtasks that are going to process the subfolders stored in the folder
      List<FolderProcessor> tasks = new ArrayList<FolderProcessor>();
      //Get the content of the folder.
      File file = new File(path);
      File content[] = file.listFiles();
      //For each element in the folder, if there is a subfolder, create a new FolderProcessor object
      //and execute it asynchronously using the fork() method.
      if (content != null)
      {
         for (int i = 0; i < content.length; i++)
         {
            if (content[i].isDirectory())
            {
               FolderProcessor task = new FolderProcessor(content[i].getAbsolutePath(), extension);
               task.fork();
               tasks.add(task);
            }
            //Otherwise, compare the extension of the file with the extension you are looking for using the checkFile() method
            //and, if they are equal, store the full path of the file in the list of strings declared earlier.
            else
            {
               if (checkFile(content[i].getName()))
               {
                  list.add(content[i].getAbsolutePath());
               }
            }
         }
      }
      //If the list of the FolderProcessor subtasks has more than 50 elements,
      //write a message to the console to indicate this circumstance.
      if (tasks.size() > 50)
      {
         System.out.printf("%s: %d tasks ran.\n", file.getAbsolutePath(), tasks.size());
      }
      //add to the list of files the results returned by the subtasks launched by this task.
      addResultsFromTasks(list, tasks);
      //Return the list of strings
      return list;
   }
 
   //For each task stored in the list of tasks, call the join() method that will wait for its finalization and then will return the result of the task.
   //Add that result to the list of strings using the addAll() method.
   private void addResultsFromTasks(List<String> list, List<FolderProcessor> tasks)
   {
      for (FolderProcessor item : tasks)
      {
         list.addAll(item.join());
      }
   }
 
   //This method compares if the name of a file passed as a parameter ends with the extension you are looking for.
   private boolean checkFile(String name)
   {
      return name.endsWith(extension);
   }
}


And to use above FolderProcessor, follow below code:

Main.java

package forkJoinDemoAsyncExample;
 
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
 
public class Main
{
   public static void main(String[] args)
   {
      //Create ForkJoinPool using the default constructor.
      ForkJoinPool pool = new ForkJoinPool();
      //Create three FolderProcessor tasks. Initialize each one with a different folder path.
      FolderProcessor system = new FolderProcessor("C:\\Windows", "log");
      FolderProcessor apps = new FolderProcessor("C:\\Program Files", "log");
      FolderProcessor documents = new FolderProcessor("C:\\Documents And Settings", "log");
      //Execute the three tasks in the pool using the execute() method.
      pool.execute(system);
      pool.execute(apps);
      pool.execute(documents);
      //Write to the console information about the status of the pool every second
      //until the three tasks have finished their execution.
      do
      {
         System.out.printf("******************************************\n");
         System.out.printf("Main: Parallelism: %d\n", pool.getParallelism());
         System.out.printf("Main: Active Threads: %d\n", pool.getActiveThreadCount());
         System.out.printf("Main: Task Count: %d\n", pool.getQueuedTaskCount());
         System.out.printf("Main: Steal Count: %d\n", pool.getStealCount());
         System.out.printf("******************************************\n");
         try
         {
            TimeUnit.SECONDS.sleep(1);
         } catch (InterruptedException e)
         {
            e.printStackTrace();
         }
      } while ((!system.isDone()) || (!apps.isDone()) || (!documents.isDone()));
      //Shut down ForkJoinPool using the shutdown() method.
      pool.shutdown();
      //Write the number of results generated by each task to the console.
      List<String> results;
      results = system.join();
      System.out.printf("System: %d files found.\n", results.size());
      results = apps.join();
      System.out.printf("Apps: %d files found.\n", results.size());
      results = documents.join();
      System.out.printf("Documents: %d files found.\n", results.size());
   }
}


Output of above program will look like this:

Main: Parallelism: 2
Main: Active Threads: 3
Main: Task Count: 1403
Main: Steal Count: 5551
******************************************
******************************************
Main: Parallelism: 2
Main: Active Threads: 3
Main: Task Count: 586
Main: Steal Count: 5551
******************************************
System: 337 files found.
Apps: 10 files found.
Documents: 0 files found.


How it works?

In the FolderProcessor class, Each task processes the content of a folder. As you know, this content has the following two kinds of elements:

  • Files
  • Other folders

If the task finds a folder, it creates another Task object to process that folder and sends it to the pool using the fork() method. This method sends the task to the pool that will execute it if it has a free worker-thread or it can create a new one. The method returns immediately, so the task can continue processing the content of the folder. For every file, a task compares its extension with the one it’s looking for and, if they are equal, adds the name of the file to the list of results.

Once the task has processed all the content of the assigned folder, it waits for the finalization of all the tasks it sent to the pool using the join() method. This method called in a task waits for the finalization of its execution and returns the value returned by the compute() method. The task groups the results of all the tasks it sent with its own results and returns that list as a return value of the compute() method.



참고 싸이트

http://www.h-online.com/developer/features/The-fork-join-framework-in-Java-7-1762357.html
http://howtodoinjava.com/java-7/forkjoin-framework-tutorial-forkjoinpool-example/
http://www.oracle.com/technetwork/articles/java/fork-join-422606.html
http://zeroturnaround.com/rebellabs/fixedthreadpool-cachedthreadpool-or-forkjoinpool-picking-correct-java-executors-for-background-tasks/
http://gee.cs.oswego.edu/dl/papers/fj.pdf
http://stackoverflow.com/questions/7926864/how-is-the-fork-join-framework-better-than-a-thread-pool


22
17
  • 댓글 11

  • kkey21a
    1k
    2016-08-24 20:11:34.0

    이 보다 쉬울수 없네요. 감사드립니다~ 

    0
  • 손이시렵다
    1k
    2016-08-24 20:32:29.0
    늘 많이 배웁니다 감사합니다~!
    0
  • libedi
    224
    2016-08-25 13:34:39.0

    친절하면서도 자세한 설명 감사합니다.

    0
  • dkb
    1k
    2016-08-25 15:10:36.0

    좋은 자료 감사합니다.

    0
  • nhj12311
    202
    2016-08-25 19:21:17.0

    정말 쉽게 설명되어잇네요~/ 굳

    0
  • 질문
    8
    2016-08-26 09:26:25.0
    감사합니다.
    0
  • 무명소졸
    4k
    2016-08-27 07:02:43.0 작성 2016-08-27 10:56:43.0 수정됨

    정말 감탄할 따름  입니다.

    Respect!!

    0
  • fender
    6k
    2016-08-29 07:18:07.0

    저도 멀티 스레딩 쪽 코딩할 일이 자주 없어서개념이 명확하지 않았는데 도움이 많이 되었네요. 좋은 글 감사드립니다.

    0
  • pi
    428
    2016-08-29 08:46:17.0
    쑤레드풀링도 내부적으로 구현 전략이 많이 있네요. 맨날 가져다 흐기만했는데 많이 알게되었습니다.
    0
  • 천재개발자
    87
    2016-08-29 10:07:59.0

    감사합니다.


    0
  • huzii
    10
    2016-08-30 16:31:54.0

    은혜롭습니다

    0
  • 로그인을 하시면 댓글을 등록할 수 있습니다.