멀티스레딩
02 Jan 2020 | JavaJava Multi-Threading
아래 내용은 자바 멀티스레딩 강의를 보며 정리한 내용에 설명을 추가한 것입니다.
run(), start() 차이점
-
run()
- 메인 스레드에서 오버라이딩한 메서드 호출
-
start()
- 별도 스레드를 생성하여 run() 메서드 호출
멀티스레딩 환경에서 하나의 데이터를 공유하면 문제가 발생한다
- 자바 코드 최적화 과정에서 스레드를 상속받은 클래스 내부 변수(running)를 해당 스레드 내부에서 변경하지 않으면 변하지 않는다고 가정하고 cpu 캐싱 처리하여 최적화한다.
- cpu 캐싱이란 ?
- 변수 값을 메인메모리가 아닌 cpu 캐시에 저장하여 읽는 속도를 향상시킨다
- 외부 스레드에서 스레드 클래스 내부 변수를 수정하고 싶다면 volatile 키워드를 사용해라
- 스레드의 가시성 문제, volatile 키워드의 Happens-Before 보장
- volatile 변수의 새 값이 기존값을 근거로 할 경우(read-modify-write 패턴) volatile 선언만으로는 스레드를 블록할 수 없다. 이런 경우는 synchronized 키워드가 필요하다.
class Processor extends Thread{
private boolean running = false;
private volatile int counter = 0;
@override
public void run(){
while(running){
System.out.println("hello");
counter++;
}
}
}
public class App{
public static void main(String[] args){
Processor proc1 = new Processor();
Processor proc2 = new Processor();
proc1.start();
proc2.start();
// proc1 스레드가 counter++ 연산하는 도중에 proc2 스레드가 counter++ 연산을 실행한다면?
// proc1 스레드가 메인메모리에 증가된 값을 쓰기전에 proc2 스레드가 값을 읽어 증가시켜
// 결국 같은 값을 메인메모리에 쓰게 된다. 이런 경우 synchronized 키워드가 필요하다.
}
}
Synchronized 블럭과 Intrinsic Lock(고유락)
- 자바의 모든 객체는 intrinsic lock (고유락) 또는 monitor lock을 가지고 있다. (뮤텍스라 불리기도 함)
- Synchronized 블럭을 진입하기 전에 고유 락을 획득해야 한다 .
- 한 시점에 오직 하나의 스레드만 고유 락을 획득할 수 있으며, 다른 스레드는 고유 락을 획득한 스레드가 락을 해제하기를 기다려야만 한다.
Multiple Lock; Using Synchronized Code Blocks
- Synchronized Instance Methods
- worker.main(); 를 호출하면 두 개의 스레드가 동일한 Worker 인스턴스의 stageOne(), stageTwo()를 호출한다.
- stageOne(), stageTwo()는 synchronized 메서드이므로 하나의 스레드가 메서드를 호출하여 고유락을 획득하면 다른 스레드는 Worker 인스턴스 자체에 접근할 수 없다. stageOne(), stageTwo()는 접근하는 객체가 다른데 thread-1이 stageOne()를 처리하는 동안 thread-2가 놀고 있는 것보다 stageTwo()라도 처리하고 있으면 효율적일 것이다.
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
public class Worker {
private Random random = new Random();
private List<Integer> list1 = new ArrayList<Integer>();
private List<Integer> list2 = new ArrayList<Integer>();
// list1만 접근한다.
public synchronized void stageOne() {
try {
Thread.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
list1.add(random.nextInt(100));
}
// list2만 접근한다.
public synchronized void stageTwo() {
try {
Thread.sleep(1);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
list2.add(random.nextInt(100));
}
// 두 스레드가 같은 인스턴스를 공유하므로 한 스레드가 고유 락을 획득하면
// 다른 스레드는 락을 획득한 스레드가 락을 반납할 때까지 기다려야 한다.
// thread-1.stageOne(), thread-2-stageTwo()가 불가능하다.
// stageOne(), stageTwo()는 접근하는 객체가 다른데
// thread-1이 stageOne()을 처리하는 동안 thread-2가 stageTwo()를 처리하면
// 처리속도가 빨라질 수 있다.
public void process() {
for (int i = 0; i < 1000; i++) {
stageOne();
stageTwo();
}
}
public void main() {
System.out.println("Starting ...");
long start = System.currentTimeMillis();
Thread t1 = new Thread(new Runnable() {
public void run() {
process();
}
});
Thread t2 = new Thread(new Runnable() {
public void run() {
process();
}
});
t1.start();
t2.start();
try {
t1.join();
t2.join();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
long end = System.currentTimeMillis();
// Time taken: 5203 (약 5초 소요됨)
System.out.println("Time taken: " + (end - start));
System.out.println("List1: " + list1.size() + "; List2: "
+ list2.size());
}
}
public class App {
public static void main(String[] args) {
Worker worker = new Worker();
worker.main();
}
}
-
Synchronized Blocks in Instance Methods
- 별도의 락을 위한 객체를 생성하여 synchronized 블럭을 사용한다
- thread-1이 stageOne()을 처리하는 동안 thread-2는 stageTwo()를 처리할 수 있으므로 전체 처리시간이 빨라진다.
import java.util.ArrayList; import java.util.List; import java.util.Random; public class Worker { private Random random = new Random(); private Object lock1 = new Object(); // 락을 위한 객체 private Object lock2 = new Object(); // 락을 위한 객체 private List<Integer> list1 = new ArrayList<Integer>(); private List<Integer> list2 = new ArrayList<Integer>(); public void stageOne() { synchronized (lock1) { try { Thread.sleep(1); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } list1.add(random.nextInt(100)); } } public void stageTwo() { synchronized (lock2) { try { Thread.sleep(1); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } list2.add(random.nextInt(100)); } } public void process() { for (int i = 0; i < 1000; i++) { stageOne(); stageTwo(); } } public void main() { System.out.println("Starting ..."); long start = System.currentTimeMillis(); Thread t1 = new Thread(new Runnable() { public void run() { process(); } }); Thread t2 = new Thread(new Runnable() { public void run() { process(); } }); t1.start(); t2.start(); try { t1.join(); t2.join(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } long end = System.currentTimeMillis(); // Time taken: 2567 System.out.println("Time taken: " + (end - start)); System.out.println("List1: " + list1.size() + "; List2: " + list2.size()); } }
스레드 풀
- 다수의 스레드를 동시에 관리하는 효율방법
- 매번 스레드를 생성하는 것에 비해 오버헤드가 적게 발생한다
-
작업이 끝난 스레드는 TaskQueue에 있는 다음 작업을 처리한다.
-
execute()와 submit() 차이점
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
class Processor implements Runnable {
private int id;
public Processor(int id){
this.id = id;
}
@Override
public void run(){
System.out.println(Thread.currentThread().getName() + " : Started : " + id);
// do Something
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " : Completed : " + id);
}
}
public class App{
public static void main(String[] args) {
ExecutorService executor = Executors.newFixedThreadPool(2);
for(int i=1; i<=10; i++){
executor.execute(new Processor(i));
// executor.submit(new Processor(i));
}
// 더 이상 새로운 task를 받지 않음
executor.shutdown();
try {
// shutdown() 호출한 후 남은 task들이 timeout 시간내로 끝나길 기다린다.
executor.awaitTermination(1, TimeUnit.MINUTES);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("All tasks completed");
}
}
CountDownLatch
- 스레드 N개를 실행하고, 일정 개수의 스레드가 모두 끝날 때까지 기다려야 하는 경우에 사용한다
- latch.await()
- 모든 스레드가 끝날 때 까지 기다린다
- latch.await(timeout, timeUnit)
- 설정한 시간만큼 기다린 후 다음 로직을 실행한다
- latch.getCount()
- 현재 카운트를 확인하여 모든 스레드가 작업을 완료하기 전에 다음 로직을 실행할 수 있다
- latch.await()
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
public class CountDownLatchTest {
public static void main(String[] args) throws InterruptedException {
CountDownLatch latch = new CountDownLatch(5); // count 초기값 설정
for(int i=0; i<5; i++){
new Thread(new Processors(latch)).start();
}
latch.await(); // count == 0 까지 기다린다
latch.await(1, TimeUnit.SECONDS); // timeout 설정가능하다
while(true){
if(latch.getCount() == 3){ // 현재 카운트를 확인하여 모든 작업이 끝나기전에 동작을 실행할 수 있다
// do something...
break;
}
}
System.out.println("All Completed");
}
}
class Processors implements Runnable{
CountDownLatch latch;
public Processors(CountDownLatch latch){
this.latch = latch;
}
@Override
public void run() {
System.out.println("Starting Someting");
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Finished Something");
latch.countDown(); // 스레드가 작업이 끝났음을 알린다
}
}
Wait(), Notify()
- Wait()
- synchronized 블럭안에서 호출한다 = 호출하는 스레드가 고유 락을 갖고 있는 상태
- 갖고 있던 고유락을 다른 스레드에게 넘기고 잠든다
- wait()가 호출된 후 2가지 조건이 충족되야 다시 락을 얻을 수 있다
- 락 통제권을 다시 얻을 수 있는 조건이어야 한다
- 같은 객체의 락을 가진 다른 스레드가 notify()를 호출해야 한다
- notify()
- synchronized 블럭 안에서 호출한다 = 호출하는 스레드가 고유 락을 갖고 있는 상태
- 잠들어 있던 스레드 중 하나를 깨운다
- notify()는 호출 즉시 락 통제권을 다른 스레드에게 넘기는 wait()와는 달리 synchronized 블럭이 끝날때 까지 락 통제권을 넘기지 않는다
- notifyAll()은 모든 스레드를 깨운다
ReentrantLock