왜 coroutine 을 사용해야 할까?

Kotlin coroutine 은 효율적으로 thread 를 사용하고, 프로그래머가 편하고, 퍼포먼스가 좋다.

효율적으로 thread 를 사용한다

Thread 를 생성하는 데에는 큰 비용이 든다. 그런데 Coroutine 를 사용하기 위해 추가적인 thread 생성이 필요 없다.

또한 coroutine 은 non-blocking 이다. 즉, coroutine 이 완료될 때까지 thread 가 멈춰있지 않고 다른 작업을 처리할 수 있다. 아래 예시를 보자.

fun showOrderInfo(details: Details) = async {
    val orderId = orderProduct(details).await()
    val orderData = loadOrderData(orderId).await()
    showData(orderData)
}

위 예시에서 orderProduct()loadOrderData() 는 suspend fun 로서, non-blocking 이다. 덕분에 thread 는 showOrderInfo() 함수 말고도 다른 작업을 할 수 있다. 만약 orderProduct()loadOrderData() 가 blocking 이라면, thread 는 두 함수가 완료될 때까지 다른 작업을 할 수 없다.

프로그래머가 편하다

Coroutine 을 사용하는 함수를 정의할 때 suspend modifier 만 붙이면 된다. 그렇기 때문에 레거시 코드에 coroutine 을 도입할 때도 작업량이 적다.

callback 을 짤 필요가 없어서 코드의 가독성이 좋다. 후술하겠지만, 사실 Kotlin coroutine 은 callback 을 사용한다.

퍼포먼스가 좋다

Thread 를 생성하는 로직과 Thread 생성 없이 coroutine 을 실행하는 것의 성능 차이를 보자. 아래는 Marcin Moskała 의 Kotlin Coroutines Deep Dive 예시를 각색한 것이다.

fun repeatWithThread() {
    val threads = mutableListOf<Thread>()

    val timeSpent = measureTimeMillis {
        repeat(400_000) {
            val thread = thread {
                Thread.sleep(1500L)
            }
            threads.add(thread)
        }

        threads.forEach { it.join() }
    }

    println("Repeating with threads spent $timeSpent milliseconds.")
}
Repeating with threads spent 107475 milliseconds.
fun repeatWithCoroutine() = runBlocking {
    val jobs = mutableListOf<Job>()
    val timeSpent = measureTimeMillis {
        repeat(400_000) {
            val job = launch {
                delay(1500L)
            }
            jobs.add(job)
        }

        jobs.forEach { it.join() }
    }

    println("Repeating with coroutines spent $timeSpent milliseconds.")
}
Repeating with coroutines spent 1956 milliseconds.

보다시피 thread 를 만드는 방식은 coroutine 을 사용하는 것보다 시간이 훨씬 많이 걸린다. 또한 thread 를 만들면서 OOM 이 발생할 수도 있다. 그러면 이미 thread pool 이 만들어진 상황에서는 어떨까? 아래를 보자.

    fun repeatWithThreadPool() {
        val threadPool = List(400_000) {
            thread {
                Thread.sleep(1500L)
            }
        }

        val timeSpent = measureTimeMillis {
            threadPool.forEach { it.join() }
        }

        println("Repeating with threads spent $timeSpent milliseconds.")
    }
Repeating with threads spent 2557 milliseconds.

위에서 확인할 수 있듯이, thread pool 을 만드는 시간을 재지 않았음에도 작업마다 thread 를 할당하는 방식은 coroutine 을 사용하는 방식보다 느리다.

기본

개념

Marcin Moskała 의 Kotlin Coroutines Deep Dive 에서 아래와 같이 coroutines 를 정의한다.

By definition, coroutines are components that can be suspended and resumed.

Kotlin coroutine 은 suspend, resume 이 가능하며 이 때의 동작은 non-blocking 이다. 또한 프로그래머는 어떤 thread 에서 coroutine 을 구동할지 선택할 수 있다.

동작 예시: sequence

아래는 Marcin Moskała 의 Kotlin Coroutines Deep Dive 예시다.

class CoroutineInSequence {
    fun showHowCoroutineWorks() {
        val seq = sequence {
            println("Generating first")
            yield(1)
            println("Generating second")
            yield(2)
            println("Generating third")
            yield(3)
            println("Done")
        }

        for (num in seq) {
            println("the next number is $num")
        }
    }
}

함수 showHowCoroutineWorks() 를 실행하면 아래와 같은 동작을 확인할 수 있다.

Generating first
the next number is 1
Generating second
the next number is 2
Generating third
the next number is 3
Done

이를 통해 for-loop block 이 실행될 때마다 yield() 함수에서 코루틴이 멈추고 (suspend) 재개하는 (resume) 것을 확인할 수 있다.

참고로 suspend 대상은 함수가 아닌 coroutine 이라는 것을 잊으면 안 된다. 함수가 멈추는 것처럼 보여도, 이건 함수가 coroutine 을 멈춘 것이다.

동작 원리: Continuation interface

Callback 과 유사하다

Coroutine 은 어떻게 함수 중간에 실행을 멈추고, 나중에 멈춘 부분부터 실행할 수 있을까? Kotlin coroutines 의 동작 방식은 callback 과 유사하다. 하지만 Kotlin coroutine 은 thread 를 점유하지 않기 위해 call stack 을 비워버린다. 그렇기 때문에 call stack 의 대체제가 필요하다. 이는 continuations 에 의해 달성된다.

알다시피, suspend fun 은 suspension 과 resume 이 가능하다. 이를 위해서 기록돼야 하는 것들은 어느 함수 어느 위치에서 suspension 이 일어났는지, local variables, parameters 등이다. 당연한 얘기지만 이 정보를 갖고 resume 을 한다. 이 정보는 모두 Continuation object 에 저장된다.

Koltin compiler 는 suspend fun 에 Continuation interface 를 parameter 로 추가한다. Continuation 은 general callback interface 다. 이는 IntelliJ 에서 kotlin code 를 java 로 변환해서 쉽게 확인할 수 있다.
kotlin 코드

suspend fun showContinuationParameter() {
}

java 로 변환

public final class SuspendFunctionPracticeKt {
   @Nullable
   public static final Object showContinuationParameter(@NotNull Continuation $completion) {
      return Unit.INSTANCE;
   }
}

위 코드를 보면 suspend fun 에는 Continuation 이 있음을 알 수 있다.

continuation callback 절차

Continuation 의 callback 절차는 다음과 같다.

  1. continuation resume 시작
  2. Resumed continuation 이 관련 함수를 호출
  3. 위 continuation 이 다른 continuation 을 resume 시킴

위 절차가 연쇄적으로 일어난다. 아래 예시를 보자 (Marcin Moskała 의 Kotlin Coroutines Deep Dive 예시).

suspend fun a() {
    val user = readUser()
    b()
    b()
    b()
    println(user)
}

suspend fun b() {
    for (i in 1..10) {
        c(i)
    }
}

suspend fun c(i: Int) {
    delay(i * 100L)
    println("Tick")
}

위 코드에서 c 를 resume 하는 경우를 예로 들면 아래와 같은 절차가 연쇄적으로 일어난다.

  1. CContinuation.resume 호출
  2. CContinuation.resume 이 c() 호출
  3. c() 가 완료되면 CContinuation.resume 이 BContinuation.resume 호출

동일한 절차로 AContinuation 까지 진행된다.

Convention: suspend vs. extension

Svetlana Isakova 에 따르면, 언제 suspend 함수를 쓰고, 언제 CoroutineScope extension function 을 사용할지 convention 이 있다. 반환 값을 즉시 확인하고자 할 때는 suspend fun 을 정의하고, 반환 값에 관심이 없거나 즉시 확인할 필요가 없을 때는 CoroutineScope 의 extension function 을 정의한다. 아래와 같이 말이다.

suspend fun orderProduct(details: details): ProductId

fun CoroutineScope.loadProductInformationAsync(
    productName: String
): Deferred<Image>

Basic example

class CoroutineBasic {
    fun executeMultipleCoroutine() {
        GlobalScope.launch {
            launch {
                delay(300L)
                println("1st scope: 1st coroutine")
            }

            launch {
                delay(100L)
                println("1st scope: 2nd coroutine")
            }
        }

        GlobalScope.launch {
                println("2nd scope: 1st println")
                println("2nd scope: 2nd println")
        }
    }
}

executeMultipleCoroutine 를 실행하면 다음과 같은 결과를 얻는다.

2nd scope: 1st println
2nd scope: 2nd println
1st scope: 2nd coroutine
1st scope: 1st coroutine

즉, launch 함수 내에서는 synchronous, launch 함수끼리는 asynchronous 다.

launch vs. async

Coroutine builders 에는 launch, async 가 있다. launch 는 Job 을 반환하고, async 는 Deferred 를 반환한다. 어떤 경우에 launch, async 를 각각 사용해야 할까? 실행의 결과를 확인해야 하면 async 를, 실행만 하려면 launch 를 쓴다.

적용 사례

Philipp Lackner 가 youtube 에서 언급한 내용을 정리했다.

예외 전파: Exceptions in child coroutines propagate to the parent

코틀린 코루틴을 사용할 때 예외 처리가 헷갈릴 수 있다. 아래 두 함수를 보자.

class CoroutineError {
    fun throwInsideCurrentCoroutine() {
        GlobalScope.launch {
            try {
                throw Exception("exception.")
            } catch (e: Exception) {
                println("Exception has been caught: $e")
            }
        }
    }

    fun throwOutsideCurrentCoroutine() {
        GlobalScope.launch {
            try {
                launch {
                    throw Exception("child coroutine exception thrown.")
                }
            } catch (e: Exception) {
                println("Exception has been caught: $e")
            }
        }
    }
}

throwInsideCurrentCoroutinethrowOutsideCurrentCoroutine 의 차이는 예외가 어디서 발생했느냐다. 전자는 try-catch 문이 속한 context 에서 예외가 발생하지만, 후자는 try-catch 가 속하지 않은 context 에서 예외가 발생한다. 그렇기 때문에 각 함수를 실행해보면 각각 아래와 같은 메시지를 확인할 수 있다.

함수 throwInsideCurrentCoroutine 실행: catch 성공

Exception has been caught: java.lang.Exception: nested exception.

함수 throwOutsideCurrentCoroutine 실행: catch 실패

Exception in thread "DefaultDispatcher-worker-2" java.lang.Exception: child coroutine exception thrown.
	at CoroutineError$throwOutsideCurrentCoroutine$1$1.invokeSuspend(CoroutineError.kt:19)
	at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
	at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:108)
	at kotlinx.coroutines.scheduling.CoroutineScheduler.runSafely(CoroutineScheduler.kt:584)
	at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.executeTask(CoroutineScheduler.kt:793)
	at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.runWorker(CoroutineScheduler.kt:697)
	at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.run(CoroutineScheduler.kt:684)
	Suppressed: kotlinx.coroutines.internal.DiagnosticCoroutineContextException: [StandaloneCoroutine{Cancelling}@792a55e5, Dispatchers.Default]

함수 throwOutsideCurrentCoroutine 에서 왜 catch 가 실패할까? 이유는 서로 coroutine 이 다르기 때문이다. 코드에서 try 문 안에 launch 함수를 사용해 독립적인 coroutine 을 만들었고, 거기서 예외가 발생했으니 parent coroutine 로 propagation 이 일어나고, 이로 인해 catch 실패가 일어난다.

launch 말고 async 는 어떨까? asyncawait 를 호출했을 때 async 내에서 발생한 exception 을 던진다. 그렇다면 아래 throwAsyncExceptionInLaunch 함수는 exception 을 던지면 안 될 거 같다.

class AsyncExceptionPractice {
    fun throwAsyncExceptionInLaunch() {
        GlobalScope.launch {
            val networkResponse = async {
                delay(500L)
                throw Exception()
                "result"
            }

            // No matter whether you comment this out, an exception occurs
//            networkResponse.await()
        }
    }

    fun throwAsyncExceptionInAsync() {
        GlobalScope.async {
            val networkResponse = async {
                delay(500L)
                throw Exception()
                "result"
            }

            // No matter whether you comment this out, an exception does not occur
//            networkResponse.await()
        }
    }
}

하지만 throwAsyncExceptionInLaunch 함수를 호출하면 아래와 같이 exception 이 발생한다.

Exception in thread "DefaultDispatcher-worker-2" java.lang.Exception
	at AsyncExceptionPractice$throwAsyncExceptionInLaunch$1$networkResponse$1.invokeSuspend(AsyncExceptionPractice.kt:11)
	at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
	at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:108)
	at kotlinx.coroutines.scheduling.CoroutineScheduler.runSafely(CoroutineScheduler.kt:584)
	at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.executeTask(CoroutineScheduler.kt:793)
	at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.runWorker(CoroutineScheduler.kt:697)
	at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.run(CoroutineScheduler.kt:684)
	Suppressed: kotlinx.coroutines.internal.DiagnosticCoroutineContextException: [StandaloneCoroutine{Cancelling}@45e5831c, Dispatchers.Default]

왜 그럴까? 함수 throwAsyncExceptionInLaunch 에서 async coroutine 이 launch coroutine 의 child 이기 때문이다. 다시 말하면, launch coroutine 으로 async coroutine 의 예외가 전파되고, launchasync 와 달리 즉시 예외를 던지기 때문이다.

만약 parent coroutine 이 async 에 의해 시작되는 throwAsyncExceptionInAsync 를 호출하면 어떤 일이 일어날까? 예외는 발생하지 않는다. 이는 위에서 서술한 async 의 성격과 관련이 있다.

supervisorScope: No propagation

Coroutine scope 는 propagation 방식에 따라 두 가지로 나뉜다. 하나는 (normal) coroutine scope, 다른 하나는 supervisor coroutine scope 이다. 보통의 coroutine scope 에서 예외가 발생하면 child coroutines 를 모두 취소하고 해당 coroutine scope 역시 취소된다. supervisorScope 는 child coroutine 의 예외가 전파되지 않는다.

class ExceptionPropagationControl {
    fun doNotPropagate() {
        GlobalScope.launch {
            supervisorScope {
                launch {
                    delay(300L)
                    println("1st coroutine.")
                    throw Exception()
                }

                launch {
                    delay(500L)
                    println("2nd coroutine.")
                }
            }
        }
    }

    fun doPropagate() {
        GlobalScope.launch {
            coroutineScope {
                launch {
                    delay(300L)
                    println("1st coroutine.")
                    throw Exception()
                }

                launch {
                    delay(500L)
                    println("2nd coroutine.")
                }
            }
        }
    }
}

supervisorScope 를 사용하는 doNotPropagate() 함수를 호출하면 child coroutine 의 예외가 전파되지 않는 것을 아래에서 확인할 수 있다.

1st coroutine.
Exception in thread "DefaultDispatcher-worker-2" java.lang.Exception
	at ExceptionPropagationControl$doNotPropagate$1$1$1.invokeSuspend(ExceptionPropagationControl.kt:19)
	at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
	at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:108)
	at kotlinx.coroutines.scheduling.CoroutineScheduler.runSafely(CoroutineScheduler.kt:584)
	at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.executeTask(CoroutineScheduler.kt:793)
	at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.runWorker(CoroutineScheduler.kt:697)
	at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.run(CoroutineScheduler.kt:684)
	Suppressed: kotlinx.coroutines.internal.DiagnosticCoroutineContextException: [StandaloneCoroutine{Cancelling}@386429a2, Dispatchers.Default]
2nd coroutine.

반대로 coroutineScope 를 사용하는 doPropagate() 함수를 호출하면 아래와 같이 예외가 전파돼 다른 child coroutine 이 완료되지 않는다.

1st coroutine.
Exception in thread "DefaultDispatcher-worker-2" java.lang.Exception
	at ExceptionPropagationControl$doPropagate$1$1$1.invokeSuspend(ExceptionPropagationControl.kt:36)
	at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
	at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:108)
	at kotlinx.coroutines.scheduling.CoroutineScheduler.runSafely(CoroutineScheduler.kt:584)
	at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.executeTask(CoroutineScheduler.kt:793)
	at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.runWorker(CoroutineScheduler.kt:697)
	at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.run(CoroutineScheduler.kt:684)
	Suppressed: kotlinx.coroutines.internal.DiagnosticCoroutineContextException: [StandaloneCoroutine{Cancelling}@52d8bbd8, Dispatchers.Default]

cancellation bad practice

아래 코드를 보면 cancel() 을 통해 targetJob 이 취소됨을 알 수 있다.

class CoroutineCancellation {
    fun cancelJobDuringExecution() {
        GlobalScope.launch {
            val targetJob = launch {
                try {
                    delay(500L)
                } catch (e: Exception) {
                    e.printStackTrace()
                }
                println("job has been done.")
            }

            delay(300L)
            targetJob.cancel()
        }
    }
}
kotlinx.coroutines.JobCancellationException: StandaloneCoroutine was cancelled; job=StandaloneCoroutine{Cancelling}@25f78d6e
job has been done.

위에서 cancel() 함수는 targetJob 을 300ms 후에 취소한다. targetJobprintln("job has been done.") 을 실행하기 위해서는 500ms 가 필요하다. 그런데 이상하게도 console 은 “job has been done.” 를 출력한다. 왜 그럴까? cancel() 로 인해 CancellationException 이 발생하고, 이게 catch block 에서 잡히기 때문이다. cancel() 은 suspend 함수를 취소하는데, 위에서는 delay(500L) 이 취소된다. delay(500L) 이 취소되면서 CancellationException 이 발생하고, 이건 다시 catch block 에 의해 처리된다. 예외가 catch block 에서 잡혔기 때문에 parent coroutine 으로 예외는 전파되지 않는다. 그렇기 때문에 println("job has been done.") 이 실행되 자원이 낭비된다.

References