java) threadPool
- 병렬처리가 많아짐, 스레드 개수 증가, cpu바빠짐, 메모리 사용량증가, 갑작스러운 병렬작업의 폭증으로 인한 스레드 폭증을 막으려면 스레드풀을 사용해야한다.
- 작업처리에 사용되는 스레드를 제한된 개수만큼 정해놓고 작업 큐에 들어오는 작업들을 하나씩 스레드가 맡아 처리한다. 전체 스레드 갯수가 늘어나지 일정 이상 늘어나지않아 성능 저하를 막아줌.
- Excutors의 다양한 정적 메소드로 ExcutorService구현 객체를 만들 수 있다. 이것이 스레드 풀.
- newCachedThreadPool()
- 초기 스레드와 코어 스레드 개수가 0이고 스레드 개수 > 작업 개수 -> 새로운 스레드를 생성시켜 작업을 처리한다. 스레드 갯수는 운영체제의 성능과 상황에 따라 다름.
- 최대 스레드 값은 integer값이 가질수 있는 최대의 값만큼 추가.
- 1개의 스레드가 추가되고 60초동안 아무작업이 일어나지 않는다면 자동으로 추가된 스레드를 종료하고 스레드풀에서 제거되어짐.
- ExcutorService excutorService = ExcutorService.newCachedThreadPool();
- newFixedThreadPool(int nThreads)
- 초기 스레드 갯수는0, 코어스레드수는 nThreads이다. 스레드 개수 > 작업 개수 -> 새로운 스레드를 생성시켜 작업을 처리한다. 최대 스레드 갯수는 매개값으로 준 nThreads이다.
- 스레드가 놀고 있더라도 줄지 않는다.
- ExcutorService excutorService = ExcutorService.newFixedThreadPool(
Runtime.getRuntime().availableProcessors()
);
- CPU의 코어 갯수만큼 최대 스레드를 사용하는 스레드풀 생성.
- ThreadPoolExcutor(코어 스레드 개수, 최대스레드갯수, 노는 시간, 시간 단위, 작업큐)
- 원하는대로 설정가능.
- 남아있는 작업들을 마무리한 후 종료할 때에는 shutdown(), 마무리하지않고 곧바로 종료시 shutdownNow() 메소드를 호출, 스레드을 종료 시키지 않으면 main 스레드가 종료하더라도 계속 남아있기 때문에 어플리케이션 프로세스도 종료되지않음.
- 작업 처리 요청을 위한 excute(), submit()
- excute(Runnable runnable)
- Runnable을 작업 큐에 저장
- 작업 처리 결과를 받지 못한다.
- 작업 처리 도중 예외가 발생하면 해당 스레드가 종료되고 스레드풀에서 제거된다.
- submit(Runnable runnable)
- Runnable 또는 Callable을 작업 큐에 저장
- 리턴된 Future를 통해 작업 처리 결과를 얻을 수 있음.
- 작업 처리 도중에 예외가 발생해도 제거 되지않고 다음 작업에 재사용된다.
- 생성 오버헤더를 줄일려면 가급적 submit()을 사용.
- 블로킹 방식의 작업 완료 통보
- Future 객체는 작업 결과가 아니라 작업이 완료될 때까지 기다렸다가(지연, 블로킹) 최종 결과를 얻는데 사용된다. get()을 호출하면 스레드가 작업을 완료할 때까지 블로킹, 완료 후에 처리결과를 리턴.
- get()메소드를 호출하는 스레드는 블로킹 되기 때문에 해당 스레드에서 다른 일을 할 수 없다. 그래서 새로운 스레드나 다른스레드에서 get메소드를 호출하는 것이 좋다.
- get() 이외에도 작업취소를 위한 cancel(), 작업취소확인 혹은 완료확인 isCancelled(), isDone() 메소드들이 존재
- 리턴값이 없는 작업 완료 통보
- 리턴값이 없는 경우 Runnable객체로 생성.
- get메소드는 null를 반환할 것이다.
- 리턴값이 존재하는 작업 완료 통보
- Callable로 작업 생성, 제네릭 타입 파라미터 T는 call()메서드가 리턴하는 타입이 되도록 한다.
public class main {
public static void main(String[] args) throws InterruptedException {
ExecutorService es = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
System.out.println("[작업 처리 요청]");
Callable<Integer> callable = ()->{
int sum = 0;
for (int i = 1; i <= 10; i++) {
sum += i;
}
return sum;
};
Future<Integer> future = es.submit(callable);
try {
int sum = future.get();
System.out.println("[작업 처리 결과] " + sum);
System.out.println("[작업 처리 완료]");
} catch (ExecutionException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
es.shutdown();
}
}
- 작업 처리 결과를 외부 객체에 저장
- 대개 Result 객체는 공유객체가 되어, 두 개 이상의 스레드작업을 취합할 목적.
public class main {
public static void main(String[] args) throws InterruptedException {
ExecutorService es = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
System.out.println("[작업 처리 요청]");
class Task implements Runnable{
Result result;
Task(Result result){
this.result = result;
}
@Override
public void run() {
int sum = 0;
for(int i = 0 ; i <= 10 ; i++){
sum += i;
}
result.addValue(sum);
}
}
Result result = new Result();
Runnable task1 = new Task(result);
Runnable task2 = new Task(result);
Future<Result> future1 = es.submit(task1, result);
Future<Result> future2 = es.submit(task2, result);
try {
result = future1.get();
result = future2.get();
System.out.println("[작업 처리 결과]" + result.accumValue);
} catch (ExecutionException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
es.shutdown();
}
}
class Result{
int accumValue;
synchronized void addValue(int value){
accumValue += value;
}
}
- 작업 완료 순으로 통보
- 스레드풀에서 작업처리가 완료된 것만 통보받는 방법 -> completionService를 이용
- poll()
- 완료된 작업의 future를 가져옴
- 완료된 작업이 없다면 null를 리턴.
- take()
- 완료된 작업의 future를 가져옴
- 완료된 작업이 없다면 있을 때까지 블로킹됨.
public class main {
public static void main(String[] args) throws InterruptedException {
ExecutorService es = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
CompletionService<Integer> cs = new ExecutorCompletionService<Integer>(es);
System.out.println("[작업 처리 요청]");
for(int i=0; i<3; i++){
cs.submit(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
int sum = 0;
for(int i = 1; i <= 10; i++){
sum+= i;
}
return sum;
}
});
}
System.out.println("[처리 완료된 작업 확인]");
es.submit(new Runnable(){
@Override
public void run() {
while(true){
try{
Future<Integer> future = cs.take();
int value = future.get();
System.out.println("[처리 결과]" + value);
}catch(Exception e){
break;
}
}
}
});
try{
Thread.sleep(3000);
}catch(InterruptedException e){}
es.shutdownNow();
}
}
- 콜백 방식의 작업 완료 통보
- 스레드가 작업을 완료하면 특정 메소드를 자동 실행하는 기법.
- ExecutorService는 콜백을 위한 별도의 기능을 제공하지 않는다. Runnable구현 클래스를 작성할 때 콜백기능을 구현.
- 콜백 메소드를 가진 클래스는 직접 구현하거나 CompletionHandler를 이용
- CompletionHandler는 NIO 패키지에 포함되어 있으며 비동기 통신에서 콜백객체를 만들 때 사용된다.
- completed와 failed 메소드를 구현
public class main {
public class CallbackExample{
private ExecutorService es;
public CallbackExample(){
es = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
}
private CompletionHandler<Integer, Void> callback =
new CompletionHandler<Integer, Void>(){
@Override
public void completed(Integer result, Void attachment) {
System.out.println("completed() 실행: "+ result);
}
@Override
public void failed(Throwable exc, Void attachment) {
System.out.println("failed() 실행: "+ exc.toString());
}
};
public void doWork(final String x, final String y){
Runnable task = new Runnable(){
@Override
public void run() {
try{
int intX = Integer.parseInt(x);
int intY = Integer.parseInt(y);
int result = intX + intY;
callback.completed(result, null);
}catch(NumberFormatException e){
callback.failed(e, null);
}
}
};
es.submit(task);
}
public void finish(){
es.shutdown();
}
}
public static void main(String[] args) throws InterruptedException {
CallbackExample ce = new CallbackExample();
ce.doWork("3", "3");
ce.doWork("3", "삼");
ce.finish();
}
}
댓글
댓글 쓰기