본문 바로가기
Kotlin/Coroutine

(Coroutine) 7. Flow Basics

by jaesungLeee 2022. 11. 2.

1. Data Stream

코루틴에서 Flow는 하나의 단일 값만 반환할 수 있는 suspend function과는 달리 여러 값을 순차적으로 내보낼 수 있는 즉, 데이터 스트림을 구현하기 위한 라이브러리이다.

예를 들어, 데이터베이스의 업데이트 내용들을 Flow를 통해 라이브하게 얻을 수 있다.

 

여기서 설명하는 데이터 스트림은 크게 3가지 항목으로 구성된다.

 

 

Producer (생산자)

Producer는 데이터 스트림에 추가되는 데이터를 생산하는 역할을 한다.

Flow를 이용해 비동기적으로 데이터를 생성할 수 있다.

 

Intermediary (중개자)

데이터 스트림을 통해 내보내는 각각의 값이나 스트림 자체를 수정하는 역할을 한다.

Optional하기 때문에 반드시 필요한 것은 아니다.

 

Consumer (소비자)

데이터 스트림을 사용하는 주체로 볼 수 있다.

 

2. Flow basic

앞서 설명한 것 처럼 Flow는 코루틴을 기반으로 비동기 데이터 스트림을 제어하는데 사용할 수 있다.

 

아래 예시를 살펴보자.

 

fun doSomething(): Flow<Int> = flow {
    repeat(5) {
        emit(Random.nextInt(0, 500))
        delay(10L)
    }
}

fun main() = runBlocking {
    doSomething().collect { value ->
        println(value)
    }
}

>> 104
>> 336
>> 210
>> 289
>> 79

 

해당 예시에서 doSomething은 0부터 500까지의 난수를 10ms 동안 지연하면서 10번동안 emit하고 있다.

 

위 예시에서 사용하는 runBlocking 처럼 flow를 통해 Flow Builder를 만들 수 있다.

또한, doSomething은 Int 타입의 Flow인 스트림을 반환하고 있다.

즉, Flow가 반환하는 타입은 Flow<T> 형태의 객체임을 알 수 있다.

이 Int 타입의 스트림은 emit을 통해 방출된다.

이 emit된 값들을 받기 위해서는 collect를 사용해야 한다.

 

중요한 점은 Flow는 Cold Stream 이기 때문에 요청이 있을 경우에만 값을 전달한다.

collect와 같은 함수를 통해 스트림을 소비한다는 요청을 해야지만 Producer에서 값을 하나씩 흘려보낸다.

 

3. withTimeOutOrNull을 이용한 Flow 취소

앞선 포스팅에서 코루틴 취소를 위해 withTimeOutOrNull을 사용한 예시를 보았었다.

Flow도 마찬가지로 withTimeOut 또는 withTimeOutOrNull을 통해 취소를 할 수 있다.

 

아래 예시를 살펴보자.

 

private fun doSomething(): Flow<Int> = flow {
    repeat(10) {
        emit(Random.nextInt(0, 500))
        delay(1000L)
    }
}

fun main() = runBlocking {
    val result = withTimeoutOrNull(5000L) { // this : CoroutineScope
        doSomething().collect { value ->
            println("$value with $this")
        }
        true
    } ?: false

    if (!result) {
        println("Cancelled!")
    }
}

>> 238 with TimeoutCoroutine(timeMillis=5000){Active}@41a4555e
>> 379 with TimeoutCoroutine(timeMillis=5000){Active}@41a4555e
>> 269 with TimeoutCoroutine(timeMillis=5000){Active}@41a4555e
>> 281 with TimeoutCoroutine(timeMillis=5000){Active}@41a4555e
>> 331 with TimeoutCoroutine(timeMillis=5000){Active}@41a4555e
>> Cancelled!

 

withTimeOutOrNull은 일정 시간이 지난 뒤 cancel하면서 null을 반환한다는 특징이 있다.

위 예시처럼 Flow에서도 동일하게 cancel이 이루어진 것을 확인할 수 있다.

 

private fun doSomething(): Flow<Int> = flow {
    repeat(10) {
        emit(Random.nextInt(0, 500))
        delay(1000L)
    }
}

fun main() = runBlocking {
    withTimeout(5000L) {
        try {
            doSomething().collect { value ->
                println("$value with $this")
            }
        } catch (e: TimeoutCancellationException) {
            println(e.message)
        } finally {
            println("Cancelled")
        }
    }
}

>> 388 with TimeoutCoroutine(timeMillis=5000){Active}@7637f22
>> 344 with TimeoutCoroutine(timeMillis=5000){Active}@7637f22
>> 220 with TimeoutCoroutine(timeMillis=5000){Active}@7637f22
>> 234 with TimeoutCoroutine(timeMillis=5000){Active}@7637f22
>> 62 with TimeoutCoroutine(timeMillis=5000){Active}@7637f22
>> Timed out waiting for 5000 ms
>> Cancelled
>> Exception in thread "main" kotlinx.coroutines.TimeoutCancellationException: Timed out waiting for 5000 ms

 

withTimeOut은 withTimeOutOrNull과는 달리 따로 TimeoutCancellationException에 대해 try..catch를 통한 예외 처리를 해주어야 한다는 특징이 있다.

마찬가지로 Flow에 대한 취소도 가능한 것을 확인할 수 있다.

4. Flow Builder - flow, flowOf, asFlow

코루틴에서도 여러가지 빌더들이 있었다.

runBlocking, launch, async와 같은 코루틴 빌더로 코루틴을 사용할 수 있었는데 Flow에서도 유사하게 Flow 빌더가 있다.

 

앞서 살펴본 예시에서는 flow를 사용해 스트림을 만들었다.

flow 이외에도 flowOf를 사용할 수 있다.

 

flowOf는 flow와는 달리 여러 값을 인자로 전달해 스트림을 만든다.

즉, flow와 flowOf는 본질적으로는 동일한 형태의 스트림을 만드는 Flow Builder이지만 약간의 방식의 차이점은 있다.

 

// Flow Builder - flowOf
fun main() = runBlocking<Unit> {
    flowOf(1, 2, 3, 4, 5).collect() { value ->
        println(value)
    }
}

>> 1
>> 2
>> 3
>> 4
>> 5

// Flow Builder - flow
fun main() = runBlocking {
    flow {
        emit(1)
        emit(2)
        emit(3)
        emit(4)
        emit(5)
    }.collect { value -> println(value) }
}

>> 1
>> 2
>> 3
>> 4
>> 5

 

asFlow는 컬렉션이나 시퀀스를 전달하여, 해당 객체를 구성하는 값들을 스트림형태로 반환한다.

 

fun main() = runBlocking {
    // Collection
    listOf(1, 2, 3, 4, 5).asFlow().collect {
        println(it)
    }

    // Sequence
    (6..10).asFlow().collect() {
        println(it)
    }
}

>> 1
>> 2
>> 3
>> 4
>> 5
>> 6
>> 7
>> 8
>> 9
>> 10

 

Reference

https://kotlinlang.org/docs/flow.html

 

 

 

 

 

'Kotlin > Coroutine' 카테고리의 다른 글

(Coroutine) 6. CoroutineExceptionHandler, SupervisorJob  (0) 2022.11.02
(Coroutine) 5. Scope  (0) 2022.11.01
(Coroutine) 4. CoroutineContext, Dispatcher  (0) 2022.09.14
(Coroutine) 3. async  (0) 2022.09.14
(Coroutine) 2. Light weight Thread, Job  (0) 2022.09.12