Right now do !

[JAVA] concurrent programming - Thread Control

by 지금당장해

프롤로그

 가시성과 원자성을 이해 해야 문제없는 Thead 프로그램을 작성할 가능성이 높다. Thread 프로그램을 한다는 것은 나 혼자 목공소를 운영하다가 조수라도 한명 더 들어와 그의 작업까지 내가 정의 해야하고 서로 업무에 방해가 되지 않도록 동선을 고려하여 업무를 분장해야 하는 상황과 비유 할 수 있다. 그러기 위해서 적절한 시기와 방법으로 Thread는 생성되어야 하고 쉬어야(WAIT) 하며 때로는 멈춰야(BLOCK) 한다. 그리고 (제일 중요)완벽하게 Task를 완수 해야 한다. 필자는 이런 일련의 Thread 운영을 Thread Control이라 부르기로 했다. 

 

 그리고 이번 편에서는 소위 Thread Pool을 이용한 Thread Control 이 아닌 전통적인 방법으로 Thread를 생성하고 사용하는 과정을 먼저 다루려 한다. (Thread Pool을 이용한 방법은 다음 편에서 다루려 한다.) 

 

Thread의 상태(state)

 개발자 입장에서는 Thread로 처리할 Task를 정의하고 Thread를 생성하고 출발(start) 시키는 책임을 가지고 있다. 이후 JVM을 포함한 시스템의 Thrad Scheduler는 출발 요청이된 RUNNABLE Thread를 진짜 출발 시킨다. RUNNING 상태가 되는 것이다. 이 이후 별 문제없이 그리고 개발자 코드의 추가적인 제어 없이 Task를 완료하게 되면 Threads는 종료 상태(TERMINATED)가 된다. 이 흐름이 가장 엘레강스 한 Thread의 흐름이지만 현실은 그렇지 못하다. 좁디 좁은 목공소에서 나와 신입 조수의 동선이 충돌하지 않으리라 생각 하는것은 낙관적이다 못해 무책임 하다. 프로그램 세계에서도 좋던 싫던 Thread는 쉬거나 다른 Thread의 처리를 대기 해야하고 이로 인해 다양한 상태 전이가 일어난다. 아래 그림은 JAVA Thread Class내에 정의된 State열거형의 상태전이도 이다. 각 흐름 위에는 각 상태로의 전이에 필요한 함수가 기술되어 있으니 참고 바란다.

 

JAVA thread의 상태 전이

 

상기 상태도에서 RUNNING 상태는 JAVA의 Thread Class에서는 실존하지 않는 상태이다. 이 상태는 RUNNABLE와 통합되어 있다. 헌데 이를 굳이 구분해서 도식화 한 이유는 RUNNABLE로 통합되어 있지만 내부에서는 엄연히 실제 동작중인 상태와 리소스(CPU) 대기를 하는 상태가 나누어져 있기 때문에 이를 구분하여 설명하려는 목적이니 혼란 없기 바란다. 아래는 JAVA 소스상에 기술되어 있는 RUNNABLE에 대한 주석이다. (아래 주석에서 사용한 waiting이라는 용어는 Thread 상태인 WAIT와는 다른 OS자원 할당의 기다림이다 오해 없기 바란다.)

/**
* Thread state for a runnable thread. A thread in the runnable
* state is executing in the Java virtual machine but it may
* be waiting for other resources from the operating system
* such as processor.
*/

Thread task의 정의와 Thread 생성

Runnable interaface의 구현

 가장 순순하게 Thread Task를 정의하는 방법이다. Thread class의 인스턴싱을 위한 생성자의 인자가 Runnable이기 때문이다. 또한 Thread Class도 Runnable을 구현한 것이다. Runable interface는 무척이나 심플하다. 왜냐하면 시스템 입장에서 보면 Thread는 하나의 Task를 처리하는 단위이고 결국 함수 하나면 된다. 이 함수가 run() 이다.

// 중략 
@FunctionalInterface
public interface Runnable {
    public abstract void run();
}


// Runnable을 구현한 Class
class SimpleRunnalbe implements Runnable {

    @Override
    public void run() {
		// 여기에 적절한 Task를 정의 한다.
    }
}

 위 코드는 실제 Runnable이 정의된 코드의 주요 부분을 발췌했고 아래 Class를 이를 구현하는 경우를 설명하고 있다.  Runnable Class에는 @FunctionalInterface가 annotation되어 있다. 이는 JAVA 8부터 지원하는 람다식을 사용할 수 있는 Interface라는 것을 의미 한다. 이 용법에 대해서는 이후 설명 하도록 하겠다. 

Thread Class의 확장 또는 Runnable wrapping

class ExtendThread extends Thread {

    @Override
    public void run() {
        super.run();
        // Thread task 구현
    }
}

 Runable을 대신해서 이와 같이 구현해도 된다. 위에서 설명 했듯이 Thread Class 또한 Runnable을 구현 했기 때문에 run을 재정의 할 수 있다. 그런데 뭔가 어색하다. 그렇다고 잘못 된것은 없다. 필자가 이렇게 느끼는 이유는 책이나 자료에서 이렇게 사용하는 경우가 드물어서 그럴것이다. 다음 예제는 책이다 여타 자료에서 관용구 처럼 사용되는 Thread Class의 용법이다. 아마도 이런 용법이 많이 쓰이는 이유가 Runnable의 구현을 명시적으로 하기 보다는 익명 객체로 구현하기 때문이 아닐까 싶다.

// (1) 명시적으로 Runnable 을 구현한 경우
Thread t1 = new Thread(new SimpleRunnalbe());

// (2) 익명 객체로 Runnable 을 구현한 경우
Thread t2 = new Thread(new Runnable() {
    @Override
    public void run() {
        // thread task 구현
    }
});

// (2) 람다식으로 Runnable 을 구현한 경우
Thread t3 = new Thread(()-> {
    // thread task 구현
} );

  상속하여 확장 구현하는 방법이 아닌 Runnable 객체를 Thread Class로 wrapping하여 사용하는 방법 3가지다. 위에서도 설명 했지만 마지막 방법은 Runnable interface가 void 함수가 하나 뿐인 @FunctionalInterface이기 때문에 가능한 일이다. 더 자세한 사항은 람다식을 기술한 문서나 책을 참고하기 바란다. 위에서 기술한 4가지 방법중 어떤 방법을 사용하던 new Thread(....) 혹은 Thread에 확장된 Class의 생성이 이루어지면 Thread의 상태는 NEW가 된다. (이는 엄연히 JAVA Thread Class상에서 정의한 용어이며 시스템상에서는 숫자값으로 가지고 있다.) 

능동적인 thread control

Thread의 시작

 NEW까지 왔으니 이제 출발을 시켜야 한다. 엄밀히 말하면 시스템의 Thread Scheduler에게 Thread를 출발시켜달라고 요청을 해야한다. 이는 Thread 인스턴스 함수로 start()를 호출하여 이루어진다. start는 먼저 이 Thread를 RUNNABLE로 변경한다. 그야말로 실행할 수 있음이 된것이지 단 한줄도 시작하지 못했다. Thread Scheduler는 여유가 있는 CPU가 보이면 Thread를 출발 시킬것이다. 드디어 RUNNING이 된다. 이는 run함수의 수행을 의미한다. 이후 별일이 없다면 ( wait를 만나거나 lock을 만나지 않는다면 아니면 잘못된 프로그램 코드로 인해 무한 Loop에 빠지지 않는다면) Task가 완료되어 Thread의 상태는 TERMINATED가 될 것이다.  이제 부터는 그렇지 않은 경우를 논해보자.

 

Wait Thread

 "현재 확보한 Lock을 반환하고 이(현재) Thread는 잠시 휴게실에 들어가 쉬어라." 필자의 머릿속에 들어있는 wait() 함수의 정의이다. 그래서 wait() 함수를 사용할 때는 함수가 synchronized 블럭 내에 존재해야 한다. 그리고 쉴 시간을 줄수도 안줄수도 있다. 무한정 쉬게 한다면 누군가는 이를 깨워야 한다. 깨우는 방법은 다른 Thread가 notify()나 notifyAll()을 호출하는 것이다. 헌데 필자가 이 함수들를 처음 봤을때 어색했던 지점은 이런 함수들이 Thread Class가 아닌 JAVA의 super Class인 Object에 정의되어 있다는 점이였다. 지금은 이렇게 이해 하고 있다. 이 Object를 Object로 보지 않고 Lock객체로 보면 이해가 쉽다. 위에서 비유한 휴게실이라고 표현한 개념이 Thread 별로 있는 것이 아니고 Lock객체 별로 있는 것이다. 그래서 notify나 notifyAll이 성립하는 것이다. 즉 notify를 호출하면 Lock객체 내의 휴식을 하고 있는 Thread하나 notifyAll을 호출하면 thread 전부가 휴식 모드에서 일어나는 것이다. 만약 이 휴게실에 여러 thread가 쉬고 있는데 notify가 호출되면 누구를 깨울지는 명시 할 수 없다. ( 다만 FIFO라는 설이 있는데 실험 해보지 못해서 장담은 못하겠다.)

@Test
public void waitNotifyAllControlTest() {

    Object threadLockKey = new Object();

    final Thread[] testThreads = new Thread[2];
    testThreads[0] = new Thread(() -> {
        int i = 0;
        while (!testThreads[0].isInterrupted()) {
            Clock clock = Clock.systemUTC();
            System.out
                .println(String.format("Test Thread running >>>> %s%n", clock.instant()));
            ThreadUtil.sleep(500);
            i++;
            if (i > 10)
                break;

        }

        synchronized (threadLockKey) {
            threadLockKey.notifyAll();
        }

        System.out.println("CHECK POINT >>>> after working loop in new thread");
    });

    testThreads[1] = new Thread(() -> {
        synchronized (threadLockKey) {
            try {
                threadLockKey.wait();
            } catch (InterruptedException e) {
                // 적절한 예외 처리가 필요하다. 뒤에 설명한다.
            }
        }
        System.out.println("CHECK POINT >>>> after wait() in new thread 2");
    });

    testThreads[0].start();
    testThreads[1].start();
    System.out.println("CHECK POINT >>>> after start() in main thread.");

    synchronized(threadLockKey) {
        try {
            threadLockKey.wait();
        } catch (InterruptedException e) {
            // 적절한 예외 처리가 필요하다. 뒤에 설명한다.
        }
    }
    System.out.println("CHECK POINT >>>> after wait() in main thread.");
}

 위 예제는 2개의 사용자 정의 Thread와 Main Thread 총 3개의 Thread를 가지고 wait - notifyAll을 테스트 하는 예제 이다. Main thread와 thread[1] thread는 새로운 thread들이 출발하자 마자 wait에 들어간다. 모두 같은 휴게실(Lock객체)이다. 10회 현재 시간을 포함하는 메시지를 출력하고 Lock객체.notifyAll() 을 호출한다. 그러면 Main thread와 thread[1]이 휴게실에서 나온다. 그리고 다시 RUNNABLE 상태가 되어 Scheduler의 CPU 배당을 기다린다. (쉬기 직전 까지 일을 기억하고 있다. 고로 CPU를 배당 받으면 그 다음부터 한다.)

Sleep Thread

 이번에는 Thread를 재운다. 필자가 Java를 만들었다면 sleep과 wait를 용어를 바꾸었을것 같다. 개념상 wait가 오히려 더 푹 쉬기 때문이다. Lock까지 반납 하면서 쉰다. 그렇다 sleep과 가장 크게 다른 점이다. 해서 wait는 임계 영역안에서 호출되지 못하면 IllegalMonitorStateException 예외가 발생한다. sleep은 설령 Lock구간 내에 있어도 Lock을 반납하지 않기 때문에 이런 제약이 없다. sleep을 멈추는 방법은 시간이 다 되어 멈추는 것과 interrupt에 의해 이 기다림을 임의에 시점에 중단 시키는 방법 밖에 없다. 

Interrrupt Thread 그리고 InterrupedException

 애초 닷넷을 십 수년 해오던 필자가 Java의 sleep을 처음 봤을때 도무지가 이해가 않가는 대목이 바로 아래와 같이 sleep을 쓰려고 하면 try...catch 블럭으로 감싸야 한다는 점이였다. 그게 interrupt의 존재를 못랐기 때문이다. interrupt() 함수의 기능을 정의 하자면 wait()혹은 sleep()함수로 자고 있거나 쉬고 있는 thread를 깨움과 동시에 InterrupedException을 유발 시켜 개발자로 하여금 누군가에게 그만 하자는 신호가 왔음을 알리는 것이다. 여기 까지다. 오버 하면 안된다. thread의 isInterrupted()를 true로 만들어주지 않는다는 사실을 명심해야 한다. 그럼 누가 하나? 개발자가 판단해서 해야한다. 그리고 나중에 다른 글에서 다시 설명하겠지만 이 interrupt 함수는 명시적인 Lock의 lockInterruptibly() 혹은 tryLock(시간)에 의해 BLOCKED된 lock에 InterrupedException을 유발 시키는 방법이기도 하다. 이는 Dead Lock문제를 방지 할 수 있는 필자가 알고 있는 유일한 방법이기도 하다.

testThreads[1] = new Thread(() -> {
    int i = 0;
    try {
        Thread.sleep(10000);
    } catch (InterruptedException e) {
        System.out.println(String.format("Current thread is Interrupted? %b%n"
            , Thread.currentThread().isInterrupted()));
        // 개발자가 현재 thread의 isInterrupted()를 true로 만드는 방법
        // 예외가 발생 했다고 자동으로 되리라는 착각은 위험한 결과를 초래한다.
        Thread.currentThread().interrupt();
        System.out.println(String.format("Current thread is Interrupted? %b%n"
            , Thread.currentThread().isInterrupted()));
    }    
    // 중략
});

testThreads[1].start();        
testThreads[1].interrupt();

위 테스트 코드의 콘솔 출력 결과는 아래와 같다. testThreads[1].interrupt()의 호출이 isInterrupted() == true를 만들어 주지 않음을 증명하고 있다. isInterrupted() == true가 되는 시점은 catch 블럭 내에서 현재 thread의 interrupt()를 호출한 시점이다. 결국 두 번에 걸친 interrupt() 호출의 thread의 interrupt 상태를 변경시킨다. 그러나 wait(), sleep()등으로 thread가 쉬지 않고 계속 동작중에는 isInterrupted()가 바로 true가 된다.

 

Current thread is Interrupted? false 

Current thread is Interrupted? true

 Join Thread

 Thread를 출발시키면 출발 시킨 thread(그것이 Main thread일수도 아닐 수도 있다.) 그리고 새로이 출발한 thread(들)은 서로 갈길 찾아서 안드로메다로 떠나 버린다. 할일을 다하면 위에서 설명 했듯이 thread는 그 생명을 다한다. 그런데 다 각자 할일 다 하러 떠났는데 주책 바가지 Main thread가 자기 할일 없다고 메인 함수의 마감 블럭(})을 빠져나가는 순간작업 thread(들)은 처리의 결과를 보고할 Main thread를 잃어버리게 된다. 이런 상황에 사용하는 함수가 join이다. thread객체.join()을 하면 해당 thread객체의 처리가 완료 될때 까지 현재 thread는 그자리에서 대기를 하게된다.

@Test
public void threadStartAndJoinTest() {

    Thread t1 = new Thread(new Runnable() {
        @Override
        public void run() {
            Clock clock = Clock.systemUTC();
            System.out.println(String.format("Before sleep >>> %s", clock.instant()));
            ThreadUtil.sleep(3000);
            System.out.println(String.format("After sleep >>> %s", clock.instant()));
        }
    });
    
    t1.start();

    try {
        t1.join();
    } catch (InterruptedException e) {
        e.printStackTrace();
    }

}

 위 코드는 3초를 대기를 전후해서 메시지와 현재 시각을 찍는 Task를 정의했고 호출 thread에서는 이렇게 만들어진 Task로 새로운 thread을 출발(start)시킨다. 그리고 현재 thread인 이 테스트 thread는 t1에 join한다. 이를 여러개 할 수 도 있다. 그렇게 하면 모든 join된 thread를 모두 기다리는 것이다. 단지 작업 Thread의 결과를 기다려야 하는 이런 상황이라면 지저분하게 wait를 할 필요가 없다. 참 그리고 앞에서 놓쳤는데 join도 sleep과 같이 thread객체.interrupt() 메서드 호출에 의해 해당 예외를 유발 시킬수 있다. 

Yeild Thread

 방금전에 Yeild 라는 단어의 뜻을 사전에서 찾아보니 "생산하다/항복하다" 의미도 있던데 지금 여기서는 "양도하다./넘겨 주다." 로 쓰였다. Thread라는 것이 결국 제한된 CPU자원을 서로 잘 나눠쓰는 개념으로 구현되어 있는데 이런 측면에서 보면 시작된 Thread가 뭔가를 처리할 조건이 못 되어 대기 해야 하는 경우에 타 Thread에 기회를 양보하는 것도 효율을 위해 고려해봐야 한다. 개발자가 이런 조건에서 임의로 wait를 하고 타 Thread의 notify를 기다리는 것도 좋은 방법일수 있으나 잘못된 프로그램 코드로 인해 기아(starvation)상태가 되어 영원히 휴게실에 머무는 산송장이 될 수도 있다. yeild는 이럴때 쓰면 된다. "아 지금은 당장 할거 없고 RUNNABLE상태로 갈꺼니까 Scheduler 니가 알아서 다시 실행 시켜줘!" 해서 아래와 같이 코드를 해놓으면 해당 thread의 상태는 RUNNABLE와 RUNNING사이를 계속 왔다갔다 하면서 실행 할 수 있는 상태를 기다린다. 실제 실험을 해보면 그냥 Loop를 회전하는 듯 한데 정말 부하가 걸리는 시스템에서는 효과가 있을듯 하다. (실험을 통해 검증하지는 못했다.)

 

@Test
public void yieldTest () {

    Object threadLockKey = new Object();
    final Thread[] testThreads = new Thread[2];
    AtomicBoolean isDoneSecondThread = new AtomicBoolean(false);
    testThreads[0] = new Thread (() -> {
        while (!isDoneSecondThread.get()) {
            System.out.println("First thread is will yield ... ");
            Thread.yield();
        }
    } );

    testThreads[1] = new Thread (() -> {
        int i = 0;
        while (!testThreads[1].isInterrupted()) {
            Clock clock = Clock.systemUTC();
            System.out
                .println(String.format("Test Thread running >>>> %s%n", clock.instant()));
            i++;
            if (i > 10)
                break;

        }

        boolean success = isDoneSecondThread.compareAndSet(false, true);
        if ( success ) {
            System.out.println("Set success AtomicBoolean flags.");
        }

        synchronized(threadLockKey) {
            threadLockKey.notifyAll();
        }

    });
    testThreads[0].start();
    testThreads[1].start();

    synchronized(threadLockKey) {
        try {
            threadLockKey.wait(10000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    System.out.println("CHECK POINT >>>> after wait() in main thread.");
}

 

타의에 의한 Thread control

 지금까지 설명한 Thread control은 특정 함수를 통해 thread의 상태를 변경시켜 기다리게 하거나 양보하거나 시간 또는 타 Thread의 알림(notify) 또는 간섭(interrupt)를 통해 다시 실행가능 한 상태로 돌리는 흐름이였다. Lock에 의한 thread상태의 변화는 이에 비해 수동적이라 할 수 있다. 예를 들어 아래 코드와 같이 synchronized 블럭이 있고 t1 thread가 먼저 이 블럭이 진입하여 lock을 획득했다고 가정해보자. 이 상황에서 t2가 이 블럭에 가면 t2는 BLOCKED가 되는 것이다. 해서 필자는 BLOCKED thread state를 수동적이라 해석한다.

 

package com.platformfarm.cluster;

import com.platformfarm.cluster.util.ThreadUtil;
import org.junit.Test;

public class BlockedStateTest {

    @Test
    public void makeBlockedThreadTest () {
        Object lockObject = new Object();
        ForBlockingTest t1 = new ForBlockingTest(lockObject, "T1");
        ForBlockingTest t2 = new ForBlockingTest(lockObject, "T2");
        t1.setOtherThread(t2);
        t2.setOtherThread(t1);

        t1.start();
        ThreadUtil.sleep(300);
        t2.start();

        try {
            t1.join();
            t2.join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    static class ForBlockingTest extends Thread {
        Thread otherThread;
        Object lockObject = null;

        ForBlockingTest(Object lockObject, String n) {
            this.lockObject = lockObject;
            this.setName(n);
        }

        void setOtherThread (Thread t){
            otherThread = t;
        }

        @Override
        public void run() {
            synchronized (this.lockObject) {
                int cnt = 0;
                while (!Thread.currentThread().isInterrupted()) {
                    cnt++;
                    if (cnt > 3) {
                        break;
                    }
                    ThreadUtil.sleep(500);
                    System.out.println(String.format("Current thread name is %s, state is %s"
                        , Thread.currentThread().getName(), Thread.currentThread().getState()));
                    System.out.println(String.format("Other thread name is %s, state is %s"
                        , otherThread.getName(), otherThread.getState()));
                }
            }

        }
    }
    
}

위 코드는 먼저 lock을 획득한 thread는 자신의 thread 이름과 상태를 출력하고 다른 thread의 이름과 상태를 다음줄에 출력한다. 아래는 실행 결과이다. 

Current thread name is T1, state is RUNNABLE 
Other thread name is T2, state is BLOCKED 
Current thread name is T1, state is RUNNABLE 
Other thread name is T2, state is BLOCKED 
Current thread name is T1, state is RUNNABLE 
Other thread name is T2, state is BLOCKED 
Current thread name is T2, state is RUNNABLE 
Other thread name is T1, state is TERMINATED 
Current thread name is T2, state is RUNNABLE 
Other thread name is T1, state is TERMINATED 
Current thread name is T2, state is RUNNABLE 
Other thread name is T1, state is TERMINATED

 

위에서 설명 했듯이 JAVA의 thread상태는 RUNNING이 존재 하지 않는다. 대신 실행 대기와 실행중을 통합하는 RUNNABLE로 출력 될 것이다. 오해 없기 바란다. thread에 진입을 못한 thread는 BLOCKED이다. T1의 수행이 끈나고 T2가 실행될때 T1은 할일이 다 끝났기 때문에 TERMINATED이다. 

에필로그

 이번 편에서는 JAVA전통적으로 thread생성하여 운용하는 과정에서 있을 수 있는 thread를 다루는 방법을 여러 관점에서 살펴봤다. 각 주제 단위로는 쉽게 이해가 가더라도 여러가지 상황이 겹쳐진 경우 상당히 혼란스러울 때도 있을것이다. 어떤 주제던 한번쯤은 깔딱 고개를 넘어야 하는데 쉽지 않은 산이긴 하다. 

블로그의 정보

지금 당장 해!!!

지금당장해

활동하기