멀티스레딩

|

Java Multi-Threading

아래 내용은 자바 멀티스레딩 강의를 보며 정리한 내용에 설명을 추가한 것입니다.

run(), start() 차이점

  • run()

    • 메인 스레드에서 오버라이딩한 메서드 호출
  • start()

    • 별도 스레드를 생성하여 run() 메서드 호출

멀티스레딩 환경에서 하나의 데이터를 공유하면 문제가 발생한다

  • 자바 코드 최적화 과정에서 스레드를 상속받은 클래스 내부 변수(running)를 해당 스레드 내부에서 변경하지 않으면 변하지 않는다고 가정하고 cpu 캐싱 처리하여 최적화한다.
  • cpu 캐싱이란 ?
    • 변수 값을 메인메모리가 아닌 cpu 캐시에 저장하여 읽는 속도를 향상시킨다
  • 외부 스레드에서 스레드 클래스 내부 변수를 수정하고 싶다면 volatile 키워드를 사용해라
class Processor extends Thread{
  
  private boolean running = false;
  private volatile int counter = 0;
	
  @override
  public void run(){
    
    while(running){
      System.out.println("hello");
      counter++; 
    }
  }
}

public class App{
  
  public static void main(String[] args){
    Processor proc1 = new Processor();
    Processor proc2 = new Processor();
    
    proc1.start();
    proc2.start();
    // proc1 스레드가 counter++ 연산하는 도중에 proc2 스레드가 counter++ 연산을 실행한다면?
    // proc1 스레드가 메인메모리에 증가된 값을 쓰기전에 proc2 스레드가 값을 읽어 증가시켜 
    // 결국 같은 값을 메인메모리에 쓰게 된다. 이런 경우 synchronized 키워드가 필요하다. 
  }
}

Synchronized 블럭과 Intrinsic Lock(고유락)

  • 자바의 모든 객체는 intrinsic lock (고유락) 또는 monitor lock을 가지고 있다. (뮤텍스라 불리기도 함)
  • Synchronized 블럭을 진입하기 전에 고유 락을 획득해야 한다 .
  • 한 시점에 오직 하나의 스레드만 고유 락을 획득할 수 있으며, 다른 스레드는 고유 락을 획득한 스레드가 락을 해제하기를 기다려야만 한다.

Multiple Lock; Using Synchronized Code Blocks

  • Synchronized Instance Methods
    • worker.main(); 를 호출하면 두 개의 스레드가 동일한 Worker 인스턴스의 stageOne(), stageTwo()를 호출한다.
    • stageOne(), stageTwo()는 synchronized 메서드이므로 하나의 스레드가 메서드를 호출하여 고유락을 획득하면 다른 스레드는 Worker 인스턴스 자체에 접근할 수 없다. stageOne(), stageTwo()는 접근하는 객체가 다른데 thread-1이 stageOne()를 처리하는 동안 thread-2가 놀고 있는 것보다 stageTwo()라도 처리하고 있으면 효율적일 것이다.
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

public class Worker {

	private Random random = new Random();

	private List<Integer> list1 = new ArrayList<Integer>();
	private List<Integer> list2 = new ArrayList<Integer>();

  	// list1만 접근한다.
	public synchronized void stageOne() {
			try {
				Thread.sleep(1);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
			list1.add(random.nextInt(100));
	}

  	// list2만 접근한다.
	public synchronized void stageTwo() {
			try {
				Thread.sleep(1);
			} catch (InterruptedException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}
			list2.add(random.nextInt(100));
	}

  // 두 스레드가 같은 인스턴스를 공유하므로 한 스레드가 고유 락을 획득하면
  // 다른 스레드는 락을 획득한 스레드가 락을 반납할 때까지 기다려야 한다.
  // thread-1.stageOne(), thread-2-stageTwo()가 불가능하다.
  // stageOne(), stageTwo()는 접근하는 객체가 다른데
  // thread-1이 stageOne()을 처리하는 동안 thread-2가 stageTwo()를 처리하면 
  // 처리속도가 빨라질 수 있다.
	public void process() {
		for (int i = 0; i < 1000; i++) {
			stageOne();
			stageTwo();
		}
	}

	public void main() {
		System.out.println("Starting ...");

		long start = System.currentTimeMillis();

		Thread t1 = new Thread(new Runnable() {
			public void run() {
				process();
			}
		});

		Thread t2 = new Thread(new Runnable() {
			public void run() {
				process();
			}
		});

		t1.start();
		t2.start();

		try {
			t1.join();
			t2.join();
		} catch (InterruptedException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}

		long end = System.currentTimeMillis();

    // Time taken: 5203 (약 5초 소요됨)
		System.out.println("Time taken: " + (end - start)); 
		System.out.println("List1: " + list1.size() + "; List2: "
				+ list2.size());
	}
}

public class App {

	public static void main(String[] args) {

		Worker worker = new Worker();
		worker.main();
	}

}
  • Synchronized Blocks in Instance Methods

    • 별도의 락을 위한 객체를 생성하여 synchronized 블럭을 사용한다
    • thread-1이 stageOne()을 처리하는 동안 thread-2는 stageTwo()를 처리할 수 있으므로 전체 처리시간이 빨라진다.
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Random;
      
    public class Worker {
      
    	private Random random = new Random();
      
    	private Object lock1 = new Object(); // 락을 위한 객체
    	private Object lock2 = new Object(); // 락을 위한 객체
      
    	private List<Integer> list1 = new ArrayList<Integer>();
    	private List<Integer> list2 = new ArrayList<Integer>();
      
    	public void stageOne() {
    		synchronized (lock1) {
    			try {
    				Thread.sleep(1);
    			} catch (InterruptedException e) {
    				// TODO Auto-generated catch block
    				e.printStackTrace();
    			}
      
    			list1.add(random.nextInt(100));
    		}
      
    	}
      
    	public void stageTwo() {
      
    		synchronized (lock2) {
    			try {
    				Thread.sleep(1);
    			} catch (InterruptedException e) {
    				// TODO Auto-generated catch block
    				e.printStackTrace();
    			}
      
    			list2.add(random.nextInt(100));
    		}
      
    	}
      
    	public void process() {
    		for (int i = 0; i < 1000; i++) {
    			stageOne();
    			stageTwo();
    		}
    	}
      
    	public void main() {
    		System.out.println("Starting ...");
      
    		long start = System.currentTimeMillis();
      
    		Thread t1 = new Thread(new Runnable() {
    			public void run() {
    				process();
    			}
    		});
      
    		Thread t2 = new Thread(new Runnable() {
    			public void run() {
    				process();
    			}
    		});
      
    		t1.start();
    		t2.start();
      
    		try {
    			t1.join();
    			t2.join();
    		} catch (InterruptedException e) {
    			// TODO Auto-generated catch block
    			e.printStackTrace();
    		}
      
    		long end = System.currentTimeMillis();
      
        // Time taken: 2567
    		System.out.println("Time taken: " + (end - start)); 
    		System.out.println("List1: " + list1.size() + "; List2: "
    				+ list2.size());
    	}
    }
      
    

스레드 풀

img

  • 다수의 스레드를 동시에 관리하는 효율방법
  • 매번 스레드를 생성하는 것에 비해 오버헤드가 적게 발생한다
  • 작업이 끝난 스레드는 TaskQueue에 있는 다음 작업을 처리한다.

  • Executor와 ExecutorService, Callable과 Future

  • execute()와 submit() 차이점

    • execute()
      • 리턴 타입 void
      • exception 발생 시 프레임워크 단에서 에러 핸들링한다
      • exception 발생 시 해당 스레드를 종료하고 새로운 스레드를 생성한다
    • submit()
      • 리턴 타입 Future
        • 리턴된 Future로 추가작업 할 수 있음 ( cancel(), get()… )
      • exception 발생 하더라도 try/catch 또는 afterExecute() 오버라이딩 하지 않으면 에러 발생여부를 알 수 없다
      • exception 발생 하더라도 기존 스레드를 유지한다
    • 참고1 참고2 참고 3
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class Processor implements Runnable {

    private int id;

    public Processor(int id){
        this.id = id;
    }
    @Override
    public void run(){
        System.out.println(Thread.currentThread().getName() + " : Started : " + id);

        // do Something
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(Thread.currentThread().getName() + " : Completed : " + id);
    }

}

public class App{

    public static void main(String[] args) {

        ExecutorService executor = Executors.newFixedThreadPool(2);

        for(int i=1; i<=10; i++){
            executor.execute(new Processor(i));
//            executor.submit(new Processor(i));
        }

        // 더 이상 새로운 task를 받지 않음
        executor.shutdown();

        try {
            // shutdown() 호출한 후 남은 task들이 timeout 시간내로 끝나길 기다린다.
            executor.awaitTermination(1, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        System.out.println("All tasks completed");

    }
}



CountDownLatch

  • 스레드 N개를 실행하고, 일정 개수의 스레드가 모두 끝날 때까지 기다려야 하는 경우에 사용한다
    • latch.await()
      • 모든 스레드가 끝날 때 까지 기다린다
    • latch.await(timeout, timeUnit)
      • 설정한 시간만큼 기다린 후 다음 로직을 실행한다
    • latch.getCount()
      • 현재 카운트를 확인하여 모든 스레드가 작업을 완료하기 전에 다음 로직을 실행할 수 있다
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class CountDownLatchTest {

    public static void main(String[] args) throws InterruptedException {

        CountDownLatch latch = new CountDownLatch(5); // count 초기값 설정

        for(int i=0; i<5; i++){
            new Thread(new Processors(latch)).start();
        }
        
        latch.await(); // count == 0 까지 기다린다
        latch.await(1, TimeUnit.SECONDS); // timeout 설정가능하다
  			while(true){
           if(latch.getCount() == 3){ // 현재 카운트를 확인하여 모든 작업이 끝나기전에 동작을 실행할 수 있다
                // do something...
                break;
           }
        }
        System.out.println("All Completed");
    }
}

class Processors implements Runnable{

    CountDownLatch latch;

    public Processors(CountDownLatch latch){
        this.latch = latch;
    }

    @Override
    public void run() {
        System.out.println("Starting Someting");
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("Finished Something");

        latch.countDown(); // 스레드가 작업이 끝났음을 알린다
    }
}

Wait(), Notify()

  • Wait()
    • synchronized 블럭안에서 호출한다 = 호출하는 스레드가 고유 락을 갖고 있는 상태
    • 갖고 있던 고유락을 다른 스레드에게 넘기고 잠든다
    • wait()가 호출된 후 2가지 조건이 충족되야 다시 락을 얻을 수 있다
      • 락 통제권을 다시 얻을 수 있는 조건이어야 한다
      • 같은 객체의 락을 가진 다른 스레드가 notify()를 호출해야 한다
  • notify()
    • synchronized 블럭 안에서 호출한다 = 호출하는 스레드가 고유 락을 갖고 있는 상태
    • 잠들어 있던 스레드 중 하나를 깨운다
    • notify()는 호출 즉시 락 통제권을 다른 스레드에게 넘기는 wait()와는 달리 synchronized 블럭이 끝날때 까지 락 통제권을 넘기지 않는다
    • notifyAll()은 모든 스레드를 깨운다



ReentrantLock