여러 작업의 동시 실행은 모든 프로그래밍 언어에서 매우 중요한 측면 중 하나다. 여러 실행 경로를 조율한다는 것은 본질적으로 복잡한 일이며 이 복잡성을 관리하기 위한 다양한 접근 방법이 존재한다. 코틀린은 함수형 패러다임과 객체 지향 패러다임의 깔끔한 통합에 힘입어 인기를 얻고 있는 JVM 언어다. 이번 기사에서는 코루틴을 사용해 동시성 프로그램을 작성하는 과정을 더 자세히 살펴본다.
ⓒ Trismegist san / Shutterstock |
<이미지를 클릭하시면 크게 보실 수 있습니다> |
코틀린 코루틴 이해
일반적인 동시성과 마찬가지로 코틀린 코루틴도 기본적인 내용은 이해하기 쉽지만 금방 복잡해질 수 있으므로 신중을 기해 점진적으로 움직여야 한다. 코틀린의 코루틴은 간단한 블로킹 서브루틴부터 복잡한 리액티브 파이프라인에 이르기까지 폭넓은 영역을 다루는 kotlinx.coroutines 패키지에서 찾을 수 있다.코루틴은 쓰레드에 대한 추상화다. 자바의 가상 쓰레드와 비슷하다. 코루틴에서는 쓰레드를 제어하는 대신 플랫폼 수준 객체를 관리하는데, 이 객체는 플랫폼이 알아서 관리해준다. 결과적으로 기존 쓰레드를 사용할 때에 비해 더 나은(경우에 따라 훨씬 더 높은) 성능을 얻을 수 있다. 본질적으로 프로그래머가 “중단 가능한” 코드의 경로를 선언하고 엔진이 내부적으로 최선의 조율 방법을 판단하는 방식이다.
구문 측면에서 코루틴의 개념은 동기적으로 보이는 코드 블록을 통해 동시성을 관리하는 방법을 제공하는 것이다. 이 작업은 코루틴 범위(scope)를 사용해 수행된다. 코루틴 생성은 범위 내에서만 허용된다. 범위는 해당 범위 내에서 루틴의 동작을 정의하며, 범위 내에서 실행된 모든 코루틴은 결과가 어떻게 되든(예를 들어 오류가 발생해도) 이 범위로 돌아간다.
블로킹 범위 선언
가장 기본적인 종류의 범위는 runBlocking 함수를 사용해 얻는 블로킹 범위다. 블로킹 범위는 범위 내의 모든 코루틴이 종료될 때까지 플랫폼에 현재 쓰레드를 차단하도록 지시하는 범위다. 서브루틴이 작업을 마치기 전에 프로그램이 완료되지 않도록 보장하기 위해 일반적으로 애플리케이션의 최상위 수준, main 함수에서 사용된다. (참고로 안드로이드의 경우 이벤트 루프를 사용하므로 runBlocking을 사용할 필요가 없다.)코드에서 블로킹 범위를 선언하는 일반적인 방법은 다음과 같다.
import kotlinx.coroutines.*
fun main() = runBlocking {
// do concurrent work
}
이 구문은 코틀린에서 흥미로운 부분으로, main을 즉시 실행할 것을 지시하며 runBlocking 함수(kotlinx.coroutines 라이브러리에 있음)를 구현으로 제공하고 인수로 정의된 코드 블록을 전달한다(이를 후행 람다라고 함). 중괄호 본문에 정의된 것이 무엇이든 runBlocking에 의해 생성된 블로킹 범위에서 실행된다.
launch 함수를 사용한 작업 실행
블로킹 main 범위 내에서 작업을 실행하고자 한다면 일반적인 방법은 다음과 같이 kotlinx.coroutines에 있는 launch 함수를 사용하는 것이다.import kotlinx.coroutines.*
println("start main")
launch {
println("start launch 1")
delay(1000)
println("end launch 1")
}
//println("between launches (main)")
println("end main")
}
fun main() = runBlocking {
println("start main")
launch {
println("start launch 1")
delay(1000)
println("end launch 1")
}
//println("between launches (main)")
println("end main")
}
코드를 실행하면 다음과 같은 출력이 생성된다.
start main
end main
start launch 1
end launch 1
end main
start launch 1
end launch 1
이 출력을 통해 main 함수가 끝까지 실행된 다음 launch 블록이 동시에 실행되는 동안 기다린다는 것을 알 수 있다. 1초 동안 기다린 다음 완료된다. 이는 지정된 밀리초만큼 기다릴 수 있게 해주는 지연 함수의 예다.
이제 지연 대신 장기 실행 네트워크 요청, 디스크 작업 또는 계산을 실행한다고 가정해 보자. 코루틴 디스패처를 사용할 수 있다. 디스패처는 동시 작업이 처리되는 방식을 세밀하게 조정하기 위해 사용된다. 기본 디스패처부터 시작해 보자.
디스패처와 지연
위 예제를 확장해서 다음과 같이 두 가지 작업을 생성해 보자.fun main() = runBlocking {
println("start main")
launch {
println("start launch 1")
delay(1000)
println("end launch 1")
}
launch {
println("start launch 2")
delay(500)
println("end launch 2")
}
println("end main")
}
println("start main")
launch {
println("start launch 1")
delay(1000)
println("end launch 1")
}
launch {
println("start launch 2")
delay(500)
println("end launch 2")
}
println("end main")
}
이제 두 개의 작업이 시작됐으며, 위 코드를 실행하면 다음과 같은 출력이 생성된다.
start main
end main
start launch 1
start launch 2
end launch 2
end launch 1
예상할 수 있겠지만 더 빠른 작업(500밀리초 지연)이 더 느린 작업(1,000밀리초 지연)보다 먼저 완료된다. 두 작업 모두 main 함수가 끝난 후 완료된다.
이제 기능을 함수로 추출해 보자. 밀리초 단위의 대기 시간을 허용하는 함수가 있고(실제 환경이라면 검색할 API 엔드포인트일 수 있음), 두 가지 다른 매개변수(500, 1000)를 사용해 두 번 호출해서 이전 예제를 더 유연한 방식으로 재현한다. 코루틴을 사용한 가장 간단한 방법은 다음과 같다.
import kotlinx.coroutines.*
suspend fun launchTask(delayMillis: Long) {
println("START task $delayMillis")
delay(delayMillis)
println("END launchTask $delayMillis")
}
fun main() = runBlocking {
println("start main")
launch {
launchTask(1000)
}
launch {
launchTask(500)
}
println("end main")
}
위 코드를 실행할 경우 생성되는 출력은 마지막 예제와 동일하지만 이번에는 재사용 가능한 launchTask 함수를 얻게 된다. launchTask 앞에는 suspend 키워드가 있다. 이 키워드가 없으면 엔진은 함수가 “중단”을 지원한다는 점, 즉 다른 작업이 수행되는 사이 잠시 멈출 수 있음을 이해하지 못하고, 컴파일러는 지연을 거부한다.
또한 여기서는 launch 블록을 사용해야 했다. 두 개의 launchTask 호출이 있는 블록을 사용했다면 호출이 순차적으로 이뤄졌을 것이다.
컨텍스트와 취소
이번에는 동시 논리의 정교함을 점진적으로 높여보자. 두 작업을 취소하는 기능을 지원하려면 다음과 같이 둘을 하나의 컨텍스트로 묶고 cancel() 메서드를 사용하면 된다.import kotlinx.coroutines.*
suspend fun launchTask(delayMillis: Long) {
println("START task $delayMillis")
delay(delayMillis)
println("END launchTask $delayMillis")
}
fun main() = runBlocking {
println("start main")
val scope = CoroutineScope(Dispatchers.Default)
scope.launch {
launchTask(10000)
}
scope.launch {
launchTask(500)
}
// Cancel all coroutines in the scope after 2 seconds
delay(2000)
scope.cancel()
println("end main")
}
여기서 명시적으로 CoroutineScope를 생성하고 이를 사용해 중단된 두 개의 함수 호출을 시작한다. 여기서도 기본 디스패처를 사용한다. 범위를 통제하에 두고 작업을 시작한 다음 scope.cancel()로 취소할 수 있다. 두 개의 작업이 있고 그 중 하나는 지연이 10,000밀리초다. 2,000밀리초 이후에 작업을 취소하므로 다음과 같은 출력을 얻게 된다.
start main
START task 500
START task 10000
END launchTask 500
end main
따라서 10,000밀리초 작업은 시작은 되었지만 완료되지 않았으며 둘러싼 범위와 함께 취소됐다.
정교함을 한층 더하기 위해 withTimeout 블록을 추가할 수 있다.
fun main() = runBlocking {
println("start main")
withTimeout(5000) {
launch {
launchTask(10000)
}
launch {
launchTask(500)
}
}
println("end main")
}
이 블록은 10,000밀리초 작업을 끊는다는 점에서 이전 예제와 비슷하게 동작하지만 여기서는 예외가 발생한다. 예외를 매끄럽게 처리하는 방법은 다음과 같다.
try {
withTimeout(5000) {
launch {
launchTask(10000)
}
launch {
launchTask(500)
}
}
} catch (e: TimeoutCancellationException) {
println("Timeout occurred: ${e.message}")
}
다음과 같은 깔끔한 출력을 얻게 된다.
start main
START task 10000
START task 500
END launchTask 500
Timeout occurred: Timed out waiting for 5000 ms
이제 두 개의 작업이 있고 그 중에서 하나가 네트워크 호출을 수행한다고 생각해 보자. 네트워크 호출은 IO 바운드 작업으로 간주된다. 다음과 같이 사용할 특정 디스패처를 전달할 수 있다.
launch(Dispatchers.IO) {
launchTask(10000)
}
개별적으로 작업을 취소할 수도 있다.
val scope = CoroutineScope(Dispatchers.Default)
scope.launch {
launchTask(10000)
}
val job = scope.launch {
launchTask(500)
}
job.cancel() // cancel the specific job
delay(2000)
scope.cancel()
출력은 다음과 같다.
start main
START task 10000
end main
이 코드는 delay() 함수를 취소할 수 있기 때문에 동작한다. 더 복잡한 시나리오에서는 취소 지원을 직접 구현해야 한다. 코틀린 문서에도 나와 있듯이 취소는 협조적(cooperative) 작업이므로 코드가 취소 가능한지 확인해야 한다. 다음과 같이 isActive 속성을 사용하면 된다.
val job = launch(Dispatchers.Default) {
var nextPrintTime = startTime
var i = 0
while (isActive) { // cancellable computation loop
// print a message twice a second
if (System.currentTimeMillis() >= nextPrintTime) {
println("job: I'm sleeping ${i++} ...")
nextPrintTime += 500L
}
}
}
cancel 호출이 수신되면 isActive 속성은 false가 되고 while 루프는 작업이 종료될 수 있도록 한다.
채널을 사용한 통신
코루틴은 실행 중인 코루틴 간에 데이터를 전달하기 위한 깔끔한 방법으로 채널을 지원한다. 예를 들면 다음과 같다.이렇게 하면 producer 작업은 채널 객체를 통해 메시지를 보내고 consumer는 이 메시지를 수신해 출력할 수 있다. 동시 컨텍스트 간에 데이터를 공유하기 위한 매우 간단한 메커니즘이다.
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
fun main() = runBlocking {
val channel = Channel() // A communication channel for Ints
val producer = launch {
repeat(5) { i ->
delay(1000)
channel.send(i)
}
}
val consumer = launch {
repeat(5) {
val message = channel.receive()
println("Received: $message")
}
}
}
흐름을 사용한 리액티브 프로그래밍
이제 흐름을 사용해서 정교함을 한 단계 더 높여보자. 흐름은 일종의 리액티브 프로그래밍 프레임워크를 제공하는 함수형 이벤트 스트림이다. 간단한 예를 들면 다음과 같다.import kotlinx.coroutines.*
import kotlin.random.*
import kotlinx.coroutines.flow.*
fun randomNumbers(count: Int): Flow<Int> = flow {
for (i in 1..count) {
emit(Random.nextInt()) // Emit a random integer
delay(500) // Simulate some work
}
}
fun main() = runBlocking {
randomNumbers(5)
.collect { value -> println("Received: $value") }
}
이 코드는 Int의 흐름을 반환하는 randomNumbers라는 함수를 생성한다. 호출하면 본문은 emit() 함수를 사용해서 값을 반환한다. 자바의 스트림과 비슷하며 .collect() 호출은 종료다. 이렇게 하면 흐름에서 구성 가능한 리액티브 파이프라인을 만들 수 있다. 이 프로그래밍 모델은 매우 유연하면서 강력하다.
예를 들어 수를 두 배로 늘려 스트림에 다른 단계를 추가하려면 다른 함수 연산자를 만들 수 있다.
fun doubledNumbers(numbers: Flow<Int>): Flow<Int> = numbers.map { it * 2 }
다음과 같이 파이프라인에 추가한다.
fun main() = runBlocking {
val randomFlow = randomNumbers(5)
val doubledFlow = doubledNumbers(randomFlow)
evenOddFlow.collect { value -> println(value)}
}
val randomFlow = randomNumbers(5)
val doubledFlow = doubledNumbers(randomFlow)
evenOddFlow.collect { value -> println(value)}
}
결론
지금까지 코루틴을 사용해서 코틀린 동시성 모델의 가장 흥미로운 부분을 간략히 살펴봤다. 기초는 이해하기 쉬우며, 그 위에 많은 복잡한 고수준 기능을 쌓아 올릴 수 있다. 코틀린은 언어 자체에는 간단한 동시 프리미티브만 포함하고 나머지 기능을 제공하는 데는 주로 kotlinx.coroutines 및 그 하위 패키지의 함수를 사용한다. 이를 통해 유연성이 높아지고 애플리케이션과 라이브러리 코드에서 자세히 설명하기가 쉬워진다.전체적으로 코틀린의 동시성 지원은 인상적이며 세심하게 설계됐다. 이 언어가 자바의 인기 있는 대안인 이유를 어렵지 않게 이해할 수 있다.
editor@itworld.co.kr
Matthew Tyson editor@itworld.co.kr
저작권자 한국IDG & ITWorld, 무단 전재 및 재배포 금지
이 기사의 카테고리는 언론사의 분류를 따릅니다.
기사가 속한 카테고리는 언론사가 분류합니다.
언론사는 한 기사를 두 개 이상의 카테고리로 분류할 수 있습니다.
언론사는 한 기사를 두 개 이상의 카테고리로 분류할 수 있습니다.