1. 코루틴 개념
- 모든 코루틴은 CouroutineScope 안에서 돌아감
- 코루틴의 핵심은 suspend
- 함수의 동작을 일시중단이 가능
- 코루틴 내에서 thread는 놀지않게함. 처리되는 쓰레드가 일시중단 될경우 다른 것들을 처리함
1.1 Builder
- 코루틴 함수들을 시작하는 지점이라고 보는 것이 옳음.
- 즉, 코루틴 함수 스택이 시작되는 지점.
1.1.1 launch
//구현부분
public fun CoroutineScope.launch(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> Unit
): Job {
val newContext = newCoroutineContext(context)
val coroutine = if (start.isLazy)
LazyStandaloneCoroutine(newContext, block) else
StandaloneCoroutine(newContext, active = true)
coroutine.start(start, coroutine, block)
return coroutine
}
- 독립적으로 실행되는 새로운 coroutineScope를 생성
- Thread를 생성하는 것과 비슷함.
- 즉, 코루틴을 실행한 thread를 멈추지 않음.
fun main()
{
GlobalScope.launch {
delay(1000L)
println("aa")
}
}
//위와 같이 할 경우 "aa"는 실행되지 않음. 왜냐면 프로그램이 끝나버리니까.
- CoroutineScope의 확장함수
CoroutineScope.launch()
1.1.2 runBlocking
- 코루틴을 실행한 thread를 중단(blocking) 시킴
- 새로운 코루틴을 시작후 완료될 때까지 현재 스레드를 중단 가능한 상태로 블로킹
- dispatcher를 쓰면 다른 스레드에서 실행시킬수 있지만, 어쨌든 해당 스레드는 블로킹됨
- 프로그램이 끝나지 않도록 하는 목적으로만 쓰임(현재는 suspend와 runTest를 사용함…)
- Main함수 → suspend
- Unit test 함수 → runTest
<aside> 💡 runBlocking은 다른 코루틴의 자식이 될 수 없음.
</aside>
1.1.3 async
- launch와 동일하지만 값을 반환함.
- CoroutineScope의 확장함수
CoroutineScope.async()
1.2 CouroutineScope
- runBlocking과 같이 중단시킴
- 아직은 잘 모르겠으나 runBlocking보다 좋다고 함
- runBLocking은 실행한 현재 thead를 block 시키지만 얘는 어쨌든 코루틴의 블럭을 써서 메인 스레드를 중지시키지 않는 느낌
suspend fun main(): Unit = coroutineScope {
launch {
delpy(1000L)
println("world")
}
println("hello")
}
1.3 CoroutineContext
- 코루틴의 데이터를 저장하고 전달하는 객체
- 컬렉션으로 처리됨
- data object
- 코루틴의 부모자식 관게를 Context를 상속받는것임
- Job, CorutineName, CoroutineDispatcher와 같은 객체들의 모음임
1.3.1 컨텍스트 합치기
-
- 연산자를 지원하며 같은 내용은 덮어쓰기, 다른 내용은 추가됨
data class CoroutineName(
val name: String
) : AbstractCoroutineContextElement(CoroutineName)
{
override fun toString(): String = "CoroutineName($name)"
companion object Key : CoroutineContext.Key<CoroutineName>
}
fun main() {
val ctx1: CoroutineContext = CoroutineName("Name1")
println(ctx1[CoroutineName]?.name)
val ctx2: CoroutineContext = CoroutineName("Name2")
println(ctx2[CoroutineName]?.name)
val ctx3 = ctx1 + ctx2
println(ctx3[CoroutineName]?.name)
}
1.4 Job
- CoroutineContext에 포함된 인터페이스로써 state를 가짐
- 수명을 가지고 있으며 취소가능
- 모든 코루틴은 자신만의 job을 가지고 있으며, CoroutineContext에서 상속되지 않는 유일한 값임
- 기본적으로 active한 상태에서 시작하지만 옵셔널하게 lazy 시작을 위해 new 상태로 시작할 수 있음
- active 상태에서 job이 실행되고 코루틴은 Job을 실행할 수 있음
구분 내용
new | 시작 가능한 상태 |
active | JOB을 실행할 수 있는 상태 |
completing | 잡을 완료하고 자식들을 기다리는 상태 |
completed | 자식들도 완료된 상태 |
cancelling | 에러 후처리 상태 |
canceled | 에러 후처리 완료 |
1.4.1 Job Factory 함수
- 아래와 같이 Job을 코루틴 없이 생성후 할당할 수 있음
- 팩토리함수로 만들경우 자식들이 완료되도 completed 상태로 가지않고 active상태로 존재
- 계속 다른 코루틴에서 사용될 수 있기 때문에
- 따라서 아래와 같이 해줘야 됨
suspend fun main(): Unit = coroutineScope {
val job = Job()
launch(job) {
delay(1000)
println("Text 1")
}
launch(job) {
delay(1000)
println("Text 2")
}
job.children.forEach { it.join() }
}
1.4.2 Job의 취소
- 취소될경우 cancelling 상태로 변경되고 exception이 떨어짐
- 그래서 try catch로 잡기보단 finally로 후처리 되어야 할것들을 처리해주는게 좋음
suspend fun main(): Unit = coroutineScope {
val job = Job()
launch(job) {
try {
delay(1000)
println("Text 1")
}
finally {
println("finally")
launch {
println("will not be print")
}
withContext(NonCancellable) {
println("will be print")
}
}
}
job.cancelAndJoin()
println("done")
}
- 취소된 context 내에서는 코루틴을 추가로 실행될 수 없음… 단 withContext를 통해 처리가능
1.4.3 invokeOnCompletion
- completed 또는 canceled 되기전에 실행되어야 할 내용을 정의
job.invokeOnCompletion { cause: Throwable? ->
println("job completed with $cause")
}
1.5 예외처리
- 기본적으로 모든 예외는 부모-자식간으로 다 전파됨.
- 중단점이 없을경우 모든 코루틴에 에러가 전파되어 모든 코루틴이 취소됨.
1.5.1 supervisorJob()
- 기본적으로 자식에서 발생하는 예외를 무시시킴
suspend fun main(): Unit = runBlocking {
launch(SupervisorJob()) {
try {
delay(1000)
println("Text 1")
throw Exception("Exception 1")
}
finally {
println("finally")
launch {
println("will not be print")
}
}
}
launch(SupervisorJob()) {
delay(1000)
println("Text 2")
}
println("done")
}
//done
//Text 2
- supervisorJob은 단 하나의 자식만 가짐
- 따라서 아래와 같이 쓸경우 exception이 전파됨
suspend fun main(): Unit = runBlocking {
launch(SupervisorJob()) {
launch {
delay(1000)
throw Exception("exception")
}
launch() {
delay(2000)
println("출력 안됨")
}
}
delay(3000)
println("done")
}
// Exception.....
//done
- 코루틴 내에서 try - catch는 Exception을 핸들링 할 수 없음
(2) Flow
- stream 처리와 비슷
- 생성자 → 중간 → 소비자(collect)
- 중간
- filter
- map
fun main(): Unit = runBlocking(Dispatchers.Default) {
flow { // producer
repeat(times = 3) { data ->
delay(300L)
emit(data)
println("produced $data")
}
}.collect { data -> // consumer
delay(700L)
println("consumed $data")
}
}
(3) channel
val channel = lifecycleScope.produce<Int>{
repeat(5){
channel.send(20 + it)
}
}
lifecycleScope.launch {
for(temperature in channel) {
Log.d("TEST", "$temperature")
}
}
vs Flow
val flow: Flow<Int> = flow {
repeat(5){
emit(20 + it)
}
}
lifecycleScope.launch {
flow.collect { temperature ->
Log.d("TEST", "$temperature")
}
}
→ Chanel은 수신하기도 전에 보내기 시작. hot
→ Flow는 수신쪽에서 collect를 호출해야 보내기 시작. cold
(4) dispatcher
- 실제 thread에 코루틴을 실행시키는 것
- thread pool 같은 느낌이 없지는 않음.
- 각 플랫폼에 따라 정해진 값이 있음
- android에 경우 main, io, default 등
- 커스텀한 이름을 지정할 수 있음
(5) 에러처리
- 전파
- 자동으로 에러를 전파함.
- launch, actor
- 노출
- await()를 호출하기 전까지는 전파안됨
- async, produce
→ 위 두개의 차이로 인해 잘 핸들링 해야함.
→ 코루틴의 에러는 try - catch로 처리되지 않음. → 이미 전파됨.
- supervisorJon
- 상위로의 전파를 막음
suspend fun main() { // Child Job #2 CoroutineScope(Dispatchers.Default + coroutineExceptionHandler).launch { // Child Job #5 launch(SupervisorJob()) { delay(300) throw Exception("first coroutine Failed") }.join() // Child Job #6 launch(SupervisorJob()) { delay(400) println("second coroutine succeed") }.join() }.join() } Caught: first coroutine Failed second coroutine succeed Process finished with exit code 0
- supervisorScope
- 범위를 지정해서 쓸수도 있음
suspend fun main() {
// Child Job #2
CoroutineScope(Dispatchers.Default + coroutineExceptionHandler).launch {
supervisorScope {
// Child Job #5
launch {
delay(300)
throw Exception("first coroutine Failed")
}.join()
// Child Job #6
launch {
delay(400)
println("second coroutine succeed")
}.join()
}
}.join()
}
2. Coroutine in Vertx
(1) CoroutineVerticle()
- Verticle의 start, stop을 suspend 함수로 시작하게 해줌
- 또한 내부적으로 coroutine context를 vertx eventloop로 처리될 수 있도록 기본 dispatcher를 처리
- verticle 이외의 코루틴에 영향이 없도록 supervisorJob으로 error handling 처리함
- 아래는 코드 일부
abstract class CoroutineVerticle : Verticle, CoroutineScope {
private lateinit var vertxInstance: Vertx
private lateinit var context: Context
override val coroutineContext: CoroutineContext by lazy
{ context.dispatcher() + SupervisorJob() }
override fun init(vertx: Vertx, context: Context) {
this.vertxInstance = vertx
this.context = context
}
override fun getVertx(): Vertx = vertxInstance
override fun start(startFuture: Promise<Void>?) {
launch {
try {
start()
startFuture?.complete()
} catch (t: Throwable) {
startFuture?.fail(t)
}
}
}
override fun stop(stopFuture: Promise<Void>?) {
val job = coroutineContext.job
launch {
try {
stop()
stopFuture?.complete()
} catch (t: Throwable) {
stopFuture?.fail(t)
} finally {
job.cancel()
}
}
}
- vertx에서는 runBlocking을 쓰면 안됨.
대안
- 기본적으로 coroutineVerticle로 시작하면 모든 function을 suspend로 만들수 있음
- 그 외에 새로운 블럭을 만들어야 한다면
val result: Future<List<CardInfo>> = vertxFuture {
service.getCardRecommList(pageNo, pageLimit, cate_grp)
}