java) threadPool

  • 병렬처리가 많아짐, 스레드 개수 증가, cpu바빠짐, 메모리 사용량증가, 갑작스러운 병렬작업의 폭증으로 인한 스레드 폭증을 막으려면 스레드풀을 사용해야한다.
  • 작업처리에 사용되는 스레드를 제한된 개수만큼 정해놓고 작업 큐에 들어오는 작업들을 하나씩 스레드가 맡아 처리한다. 전체 스레드 갯수가 늘어나지 일정 이상 늘어나지않아 성능 저하를 막아줌.

  • Excutors 다양한 정적 메소드로 ExcutorService구현 객체를 만들 있다. 이것이 스레드 .
    • newCachedThreadPool()
      • 초기 스레드와 코어 스레드 개수가 0이고 스레드 개수 > 작업 개수 -> 새로운 스레드를 생성시켜 작업을 처리한다. 스레드 갯수는 운영체제의 성능과 상황에 따라 다름.
      • 최대 스레드 값은 integer값이 가질수 있는 최대의 값만큼 추가.
      • 1개의 스레드가 추가되고 60초동안 아무작업이 일어나지 않는다면 자동으로 추가된 스레드를 종료하고 스레드풀에서 제거되어짐.
      • ExcutorService excutorService = ExcutorService.newCachedThreadPool();
    • newFixedThreadPool(int nThreads)
      • 초기 스레드 갯수는0, 코어스레드수는 nThreads이다. 스레드 개수 > 작업 개수 -> 새로운 스레드를 생성시켜 작업을 처리한다. 최대 스레드 갯수는 매개값으로 nThreads이다.
      • 스레드가 놀고 있더라도 줄지 않는다.
      • ExcutorService excutorService = ExcutorService.newFixedThreadPool(
Runtime.getRuntime().availableProcessors()
);
  • CPU 코어 갯수만큼 최대 스레드를 사용하는 스레드풀 생성.
  • ThreadPoolExcutor(코어 스레드 , 최대스레드갯수, 노는 시간, 시간 단위, 작업큐)
    •  원하는대로 설정가능.
  • 남아있는 작업들을 마무리한 종료할 때에는 shutdown(), 마무리하지않고 곧바로 종료시 shutdownNow() 메소드를 호출, 스레드을 종료 시키지 않으면 main 스레드가 종료하더라도 계속 남아있기 때문에 어플리케이션 프로세스도 종료되지않음.
  • 작업 처리 요청을 위한 excute(), submit()
    • excute(Runnable runnable)
      • Runnable 작업 큐에 저장
      • 작업 처리 결과를 받지 못한다.
      • 작업 처리 도중 예외가 발생하면 해당 스레드가 종료되고 스레드풀에서 제거된다.
    • submit(Runnable runnable)
      • Runnable 또는 Callable 작업 큐에 저장
      • 리턴된 Future 통해 작업 처리 결과를 얻을 있음.
      • 작업 처리 도중에 예외가 발생해도 제거 되지않고 다음 작업에 재사용된다.
    • 생성 오버헤더를 줄일려면 가급적 submit() 사용.
  • 블로킹 방식의 작업 완료 통보
    • Future 객체는 작업 결과가 아니라 작업이 완료될 때까지 기다렸다가(지연, 블로킹) 최종 결과를 얻는데 사용된다. get() 호출하면 스레드가 작업을 완료할 때까지 블로킹, 완료 후에 처리결과를 리턴.
    • get()메소드를 호출하는 스레드는 블로킹 되기 때문에 해당 스레드에서 다른 일을 없다. 그래서 새로운 스레드나 다른스레드에서 get메소드를 호출하는 것이 좋다.
    • get() 이외에도 작업취소를 위한 cancel(), 작업취소확인 혹은 완료확인  isCancelled(), isDone() 메소드들이 존재

  • 리턴값이 없는 작업 완료 통보
    • 리턴값이 없는 경우 Runnable객체로 생성.
    • get메소드는 null 반환할 것이다.
  •  리턴값이 존재하는 작업 완료 통보
    • Callable 작업 생성, 제네릭 타입 파라미터 T call()메서드가 리턴하는 타입이 되도록 한다.
public class main {
    public static void main(String[] argsthrows InterruptedException {
        ExecutorService es = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
        System.out.println("[작업 처리 요청]");
       Callable<Integercallable = ()->{
        int sum = 0;
        for (int i = 1i <= 10i++) {
            sum += i;
        }
        return sum;
       };
        Future<Integerfuture = es.submit(callable);
        try {
            int sum = future.get();
            System.out.println("[작업 처리 결과] " + sum);
            System.out.println("[작업 처리 완료]");
        } catch (ExecutionException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        es.shutdown();
    }
}

  • 작업 처리 결과를 외부 객체에 저장
    • 대개 Result 객체는 공유객체가 되어, 이상의 스레드작업을 취합할 목적.
public class main {
    public static void main(String[] argsthrows InterruptedException {
        ExecutorService es = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
        System.out.println("[작업 처리 요청]");
        class Task implements Runnable{
            Result result;
            Task(Result result){
                this.result = result;
            }
            @Override
            public void run() {
                int sum = 0;
                for(int i = 0 ; i <= 10 ; i++){
                    sum += i;
                }
                result.addValue(sum);
            }
        }
        Result result = new Result();
        Runnable task1 = new Task(result);
        Runnable task2 = new Task(result);
        Future<Resultfuture1 = es.submit(task1result);
        Future<Resultfuture2 = es.submit(task2result);
        try {
            result = future1.get();
            result = future2.get();
            System.out.println("[작업 처리 결과]" + result.accumValue);
        } catch (ExecutionException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        es.shutdown();
    }
}
class Result{
    int accumValue;
    synchronized void addValue(int value){
        accumValue += value;
    }
}

  • 작업 완료 순으로 통보
    • 스레드풀에서 작업처리가 완료된 것만 통보받는 방법 -> completionService 이용
      • poll()
        • 완료된 작업의 future 가져옴
        • 완료된 작업이 없다면 null 리턴.
      • take()
        • 완료된 작업의 future 가져옴
        • 완료된 작업이 없다면 있을 때까지 블로킹됨.
public class main {
    public static void main(String[] argsthrows InterruptedException {
        ExecutorService es = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
        CompletionService<Integercs = new ExecutorCompletionService<Integer>(es);
        System.out.println("[작업 처리 요청]");
        for(int i=0i<3i++){
            cs.submit(new Callable<Integer>() {
                @Override
                public Integer call() throws Exception {
                    int sum = 0;
                    for(int i = 1i <= 10i++){
                        sum+= i;
                    }
                    return sum;
                }
            });
        }
        System.out.println("[처리 완료된 작업 확인]");
        es.submit(new Runnable(){
            @Override
            public void run() {
                while(true){
                    try{
                        Future<Integerfuture = cs.take();
                        int value = future.get();
                        System.out.println("[처리 결과]" + value);
                    }catch(Exception e){
                        break;
                    }
                }
            }
        });
        try{
            Thread.sleep(3000);
        }catch(InterruptedException e){}
        es.shutdownNow();
    }
}



  • 콜백 방식의 작업 완료 통보
    • 스레드가 작업을 완료하면 특정 메소드를 자동 실행하는 기법.
    • ExecutorService 콜백을 위한 별도의 기능을 제공하지 않는다. Runnable구현 클래스를 작성할 콜백기능을 구현.
    • 콜백 메소드를 가진 클래스는 직접 구현하거나 CompletionHandler 이용
      • CompletionHandler NIO 패키지에 포함되어 있으며 비동기 통신에서 콜백객체를 만들 사용된다.
      • completed failed 메소드를 구현
public class main {
    public class CallbackExample{
        private ExecutorService es;
        public CallbackExample(){
            es = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
        }
        private CompletionHandler<IntegerVoidcallback = 
        new CompletionHandler<IntegerVoid>(){
                    @Override
                    public void completed(Integer resultVoid attachment) {
                        System.out.println("completed() 실행: "result);
                    }
                    @Override
                    public void failed(Throwable excVoid attachment) {
                        System.out.println("failed() 실행: "exc.toString());
                    }
        };
        public void doWork(final String xfinal String y){
            Runnable task = new Runnable(){
                @Override
                public void run() {
                    try{
                        int intX = Integer.parseInt(x);
                        int intY = Integer.parseInt(y);
                        int result = intX + intY;
                        callback.completed(resultnull);
                    }catch(NumberFormatException e){
                        callback.failed(enull);
                    }
                }
            };
            es.submit(task);
        }
        public void finish(){
            es.shutdown();
        }
    }
    public static void main(String[] argsthrows InterruptedException {
       CallbackExample ce = new CallbackExample();
        ce.doWork("3""3");
        ce.doWork("3""");
        ce.finish();
    }
}

 

댓글

가장 많이 본 글