1. 코루틴 개념

  • 모든 코루틴은 CouroutineScope 안에서 돌아감
  • 코루틴의 핵심은 suspend
    • 함수의 동작을 일시중단이 가능
  • 코루틴 내에서 thread는 놀지않게함. 처리되는 쓰레드가 일시중단 될경우 다른 것들을 처리함

1.1 Builder

  • 코루틴 함수들을 시작하는 지점이라고 보는 것이 옳음.
  • 즉, 코루틴 함수 스택이 시작되는 지점.

1.1.1 launch

//구현부분
public fun CoroutineScope.launch(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> Unit
): Job {
    val newContext = newCoroutineContext(context)
    val coroutine = if (start.isLazy)
        LazyStandaloneCoroutine(newContext, block) else
        StandaloneCoroutine(newContext, active = true)
    coroutine.start(start, coroutine, block)
    return coroutine
}
  • 독립적으로 실행되는 새로운 coroutineScope를 생성
  • Thread를 생성하는 것과 비슷함.
    • 즉, 코루틴을 실행한 thread를 멈추지 않음.
fun main()
{
  GlobalScope.launch {
      delay(1000L)
      println("aa")
  }
}

//위와 같이 할 경우 "aa"는 실행되지 않음. 왜냐면 프로그램이 끝나버리니까.
  • CoroutineScope의 확장함수
CoroutineScope.launch()

1.1.2 runBlocking

  • 코루틴을 실행한 thread를 중단(blocking) 시킴
    • 새로운 코루틴을 시작후 완료될 때까지 현재 스레드를 중단 가능한 상태로 블로킹
    • dispatcher를 쓰면 다른 스레드에서 실행시킬수 있지만, 어쨌든 해당 스레드는 블로킹됨
  • 프로그램이 끝나지 않도록 하는 목적으로만 쓰임(현재는 suspend와 runTest를 사용함…)
    • Main함수 → suspend
    • Unit test 함수 → runTest

<aside> 💡 runBlocking은 다른 코루틴의 자식이 될 수 없음.

</aside>

1.1.3 async

  • launch와 동일하지만 값을 반환함.
  • CoroutineScope의 확장함수
CoroutineScope.async()

1.2 CouroutineScope

  • runBlocking과 같이 중단시킴
    • 아직은 잘 모르겠으나 runBlocking보다 좋다고 함
    • runBLocking은 실행한 현재 thead를 block 시키지만 얘는 어쨌든 코루틴의 블럭을 써서 메인 스레드를 중지시키지 않는 느낌
suspend fun main(): Unit = coroutineScope {
  launch {
    delpy(1000L)
    println("world")
  }
  println("hello")
}

1.3 CoroutineContext

  • 코루틴의 데이터를 저장하고 전달하는 객체
    • 컬렉션으로 처리됨
    • data object
  • 코루틴의 부모자식 관게를 Context를 상속받는것임
  • Job, CorutineName, CoroutineDispatcher와 같은 객체들의 모음임

1.3.1 컨텍스트 합치기

    • 연산자를 지원하며 같은 내용은 덮어쓰기, 다른 내용은 추가됨
data class CoroutineName(
    val name: String
) : AbstractCoroutineContextElement(CoroutineName)
{
    override fun toString(): String = "CoroutineName($name)"

    companion object Key : CoroutineContext.Key<CoroutineName>
}

fun main() {
    val ctx1: CoroutineContext = CoroutineName("Name1")
    println(ctx1[CoroutineName]?.name)

    val ctx2: CoroutineContext = CoroutineName("Name2")
    println(ctx2[CoroutineName]?.name)

    val ctx3 = ctx1 + ctx2
    println(ctx3[CoroutineName]?.name)
}

1.4 Job

  • CoroutineContext에 포함된 인터페이스로써 state를 가짐
  • 수명을 가지고 있으며 취소가능
  • 모든 코루틴은 자신만의 job을 가지고 있으며, CoroutineContext에서 상속되지 않는 유일한 값임
  • 기본적으로 active한 상태에서 시작하지만 옵셔널하게 lazy 시작을 위해 new 상태로 시작할 수 있음
  • active 상태에서 job이 실행되고 코루틴은 Job을 실행할 수 있음

구분 내용

new 시작 가능한 상태
active JOB을 실행할 수 있는 상태
completing 잡을 완료하고 자식들을 기다리는 상태
completed 자식들도 완료된 상태
cancelling 에러 후처리 상태
canceled 에러 후처리 완료

1.4.1 Job Factory 함수

  • 아래와 같이 Job을 코루틴 없이 생성후 할당할 수 있음
  • 팩토리함수로 만들경우 자식들이 완료되도 completed 상태로 가지않고 active상태로 존재
    • 계속 다른 코루틴에서 사용될 수 있기 때문에
  • 따라서 아래와 같이 해줘야 됨
suspend fun main(): Unit = coroutineScope {
    val job = Job()
    launch(job) {
        delay(1000)
        println("Text 1")
    }
    launch(job) {
        delay(1000)
        println("Text 2")
    }

    job.children.forEach { it.join() }
}

1.4.2 Job의 취소

  • 취소될경우 cancelling 상태로 변경되고 exception이 떨어짐
  • 그래서 try catch로 잡기보단 finally로 후처리 되어야 할것들을 처리해주는게 좋음
suspend fun main(): Unit = coroutineScope {
    val job = Job()
    launch(job) {
        try {
            delay(1000)
            println("Text 1")
        }
        finally {
            println("finally")

            launch {
                println("will not be print")
            }

            withContext(NonCancellable) {
                println("will be print")
            }
        }
    }

    job.cancelAndJoin()
    println("done")
}
  • 취소된 context 내에서는 코루틴을 추가로 실행될 수 없음… 단 withContext를 통해 처리가능

1.4.3 invokeOnCompletion

  • completed 또는 canceled 되기전에 실행되어야 할 내용을 정의
job.invokeOnCompletion { cause: Throwable? ->
        println("job completed with $cause")
    }

1.5 예외처리

  • 기본적으로 모든 예외는 부모-자식간으로 다 전파됨.
  • 중단점이 없을경우 모든 코루틴에 에러가 전파되어 모든 코루틴이 취소됨.

1.5.1 supervisorJob()

  • 기본적으로 자식에서 발생하는 예외를 무시시킴
suspend fun main(): Unit = runBlocking {

    launch(SupervisorJob()) {
        try {
            delay(1000)
            println("Text 1")

            throw Exception("Exception 1")
        }
        finally {
            println("finally")

            launch {
                println("will not be print")
            }
        }
    }

    launch(SupervisorJob()) {
        delay(1000)
        println("Text 2")
    }
    println("done")
}

//done
//Text 2
  • supervisorJob은 단 하나의 자식만 가짐
    • 따라서 아래와 같이 쓸경우 exception이 전파됨
suspend fun main(): Unit = runBlocking {

    launch(SupervisorJob()) {
        launch {
            delay(1000)
            throw Exception("exception")
        }

        launch() {
            delay(2000)
            println("출력 안됨")
        }
    }
    delay(3000)
    println("done")
}
// Exception.....
//done
  • 코루틴 내에서 try - catch는 Exception을 핸들링 할 수 없음

(2) Flow

  • stream 처리와 비슷
  • 생성자 → 중간 → 소비자(collect)
  • 중간
    • filter
    • map
fun main(): Unit = runBlocking(Dispatchers.Default) {
    flow { // producer
        repeat(times = 3) { data ->
            delay(300L)
            emit(data)
            println("produced $data")
        }
    }.collect { data -> // consumer
        delay(700L)
        println("consumed $data")
    }
}

(3) channel

val channel = lifecycleScope.produce<Int>{
	repeat(5){
		channel.send(20 + it)
	}
}

lifecycleScope.launch {
	for(temperature in channel) {
		Log.d("TEST", "$temperature")
	}
}

vs Flow

val flow: Flow<Int> = flow {
	repeat(5){
		emit(20 + it)
	}
}

lifecycleScope.launch {
	flow.collect { temperature ->
		Log.d("TEST", "$temperature")
	}
}

→ Chanel은 수신하기도 전에 보내기 시작. hot

→ Flow는 수신쪽에서 collect를 호출해야 보내기 시작. cold

(4) dispatcher

  • 실제 thread에 코루틴을 실행시키는 것
    • thread pool 같은 느낌이 없지는 않음.
  • 각 플랫폼에 따라 정해진 값이 있음
    • android에 경우 main, io, default 등
    • 커스텀한 이름을 지정할 수 있음

(5) 에러처리

  • 전파
    • 자동으로 에러를 전파함.
    • launch, actor
  • 노출
    • await()를 호출하기 전까지는 전파안됨
    • async, produce

→ 위 두개의 차이로 인해 잘 핸들링 해야함.

→ 코루틴의 에러는 try - catch로 처리되지 않음. → 이미 전파됨.

  • supervisorJon
    • 상위로의 전파를 막음
    suspend fun main() {
        // Child Job #2
        CoroutineScope(Dispatchers.Default + coroutineExceptionHandler).launch {
            // Child Job #5
            launch(SupervisorJob()) {
                delay(300)
                throw Exception("first coroutine Failed")
            }.join()
            // Child Job #6
            launch(SupervisorJob()) {
                delay(400)
                println("second coroutine succeed")
            }.join()
        }.join()
    }
    
    Caught: first coroutine Failed
    second coroutine succeed
    
    Process finished with exit code 0
    
  • supervisorScope
    • 범위를 지정해서 쓸수도 있음
suspend fun main() {
    // Child Job #2
    CoroutineScope(Dispatchers.Default + coroutineExceptionHandler).launch {
        supervisorScope {
            // Child Job #5
            launch {
                delay(300)
                throw Exception("first coroutine Failed")
            }.join()
            // Child Job #6
            launch {
                delay(400)
                println("second coroutine succeed")
            }.join()
        }
    }.join()
}

2. Coroutine in Vertx

(1) CoroutineVerticle()

  • Verticle의 start, stop을 suspend 함수로 시작하게 해줌
    • 또한 내부적으로 coroutine context를 vertx eventloop로 처리될 수 있도록 기본 dispatcher를 처리
    • verticle 이외의 코루틴에 영향이 없도록 supervisorJob으로 error handling 처리함
  • 아래는 코드 일부
abstract class CoroutineVerticle : Verticle, CoroutineScope {

  private lateinit var vertxInstance: Vertx
  private lateinit var context: Context

  override val coroutineContext: CoroutineContext by lazy 
      { context.dispatcher() + SupervisorJob() }

  override fun init(vertx: Vertx, context: Context) {
    this.vertxInstance = vertx
    this.context = context
  }

  override fun getVertx(): Vertx = vertxInstance

  override fun start(startFuture: Promise<Void>?) {
    launch {
      try {
        start()
        startFuture?.complete()
      } catch (t: Throwable) {
        startFuture?.fail(t)
      }
    }
  }

  override fun stop(stopFuture: Promise<Void>?) {
    val job = coroutineContext.job
    launch {
      try {
        stop()
        stopFuture?.complete()
      } catch (t: Throwable) {
        stopFuture?.fail(t)
      } finally {
        job.cancel()
      }
    }
  }
  • vertx에서는 runBlocking을 쓰면 안됨.

대안

  • 기본적으로 coroutineVerticle로 시작하면 모든 function을 suspend로 만들수 있음
  • 그 외에 새로운 블럭을 만들어야 한다면
val result: Future<List<CardInfo>> = vertxFuture {
      service.getCardRecommList(pageNo, pageLimit, cate_grp)
    }

+ Recent posts