본문 바로가기

코틀린

Kotlin Flow #1 - 기본 사용법

반응형

Flow 란?

코틀린 플로우는 suspend function을 보완하기 위해 나온 친구이다.


비동기 동작의 결과로 suspend function 이 하나의 결과물 던진다면,
플로우를 이용하여 여러 개의 결과를 원하는 형식으로 던질 수 있다.

특징

비동기이며 코루틴에서만 동작 가능한 것은 suspend function 과 동일하다.

다른 점은 함수 앞에 suspend 를 붙이지 않아도 된다.

 

cold stream ( kotlin의 sequence )으로 동작하며 hot stream 은 지원하지 않는다.

그렇기에 데이터는 요청할 때마다 처음부터 새로 발행되며,

요청 전에는 선언만 있을 뿐 아무런 동작도 하지 않는다.

 

직접 취소할 수 있는 함수는 따로 제공하지 않는다.

Flow 생성

우선 Flow를 생성하는 방법부터 알아보겠다.

private fun getNumbers(): Flow<Int> = flow {
    for (i in 1..100) {
        emit(i)
        println("Emit $i") // 1 ~ 100 까지 방출
    }
     emitAll((101 .. 200).asFlow()) // 101 ~ 200 까지 방출
}

가장 간단한 방법으로 flow { ... } 를 통해서 생성이 가능하다.

 

데이터 전달을 위해서 emit 함수를 사용할 수 있으며,

emitAll 을 통해서 다른 flow의 데이터를 방출하는 것도 가능하다.

 

위의 예제처럼 작성하면 1~200까지 데이터를 전달하는 Flow를 전달하는 함수를 만들 수 있다.

(1..3).asFlow()
mapOf(Pair(3,4)).asIterable().asFlow()

asFlow 를 이용하면 array, iterator, range 등을 flow로 변환할 수 도 있다.

fun getStr(): String = "String"
fun getFlowStr(): Flow<String> = ::getStr.asFlow()

suspend fun getSuspendStr(): String = "String"
fun getFlowSuspendStr(): Flow<String> = ::getSuspendStr.asFlow()

일반 함수와, suspend 함수도 asFlow 함수를 위와 같이 사용하면 Flow로 변환할 수 있다.

( 대신, 단일 데이터 ( String, List <Int> 등 )를 방출하는 Flow 만 만들 수 있다. )

리스트를 반환하는 함수라고 리스트 아이템들을 순차적으로 방출해주지는 않는다.

flowOf(1, "213", Sample(5), 3, 3.0f)

flowOf 를 통해서 Flow를 생성하는 방법도 존재한다.

Flow에게 데이터 요청하기

지금부터 나오는 모든 코드는 코루틴 내부에서 실행되어야 한다.
가독성을 위해 다 생략했다.

데이터 요청을 위한 함수로는

collect, collectIndexed, collectLatest ,launchIn , 터미널 연산자 등이 있다.

(1..3).asFlow().collect {
    delay(100)
    println("$it")
}

result:
1
2
3

collect 는 Flow의 데이터를 순차적으로 가져와 하나씩 처리한다.

(1..3).asFlow().collectIndexed {
    delay(100)
      println("$index $value")
}

result:
collect 0 1
collect 1 2
collect 2 3

collectIndexedcollect 와 동일하게 동작하며 index 값을 추가로 받아서 처리할 수 있다.

(1..3).asFlow().collectLatest {
    delay(100)
    println("$it")
}

result:
3

collectLatest 는 가장 최근의 데이터만 가져와서 처리한다.

 

기본적으로는 collect 와 동일하게 데이터들은 순차적으로 들어온다.

다른 점은 새로운 데이터가 들어왔을 경우에 현재 작업을 취소시키고
새로운 데이터와 함께 처음부터 다시 작업을 진행하는 점이다.

 

중간에 언제든지 작업이 취소될 수 있으므로 문제가 될 작업에는 사용하면 안 된다.

 CoroutineScope(Dispatchers.Default).launch {
    val io = CoroutineScope(Dispatchers.IO)
    (1..3).asFlow().onEach {
        println(it)
    }.launchIn(io)
}

launchIn 은 특정 scope에서 동작을 시작하도록 할 수 있다.

해당 함수 내부에선 scope.launch { collect() } 코드를 호출한다.

 

데이터를 따로 처리할 수는 없으니

다음에 나올 연산자( ex) 예제의 onEach )들을 지정해 놓은 뒤 실행만을 목적으로 한다.

 

코드 내부에서 scope.launch를 사용하기에 job을 반환하며,
이를 통해 cancel이나 join을 하는 것이 가능하다.

(1..3).asFlow().reduce { ... }
(1..3).asFlow().fold { ... }
(1..3).asFlow().single { ... }
(1..3).asFlow().singleOrNull { ... }
(1..3).asFlow().first { ... }
(1..3).asFlow().firstOrNull { ... }

터미널 연산자들은 위의 정도만 구현되어 있으며,

다른 곳에서도 많이 볼 수 있는 터미널 연산자들이기에 따로 설명은 안한다.

 

 

Flow 취소하기

위에 특징에서 말했다시피 Flow는 취소와 관련된 함수는 따로 제공하지 않는다.

val job = launch {
    (1..3).asFlow().collect {
        println("$it")
    }
}
...
job.cancel()

취소를 원한다면 예제와 같이 외부에 취소가 가능한 무언가로 감싸줘야만 한다.

launch , async 등이 적절한 예시이다.

 

위의 launchIn 함수가 job을 반환하니 이것을 활용해도 된다.

CoroutineContext 변경하기

(1..3).asFlow().flowOn(Dispatchers.IO)
    .collect {
        println("$it")
    }

기존 코루틴이 withContext(Dispatchers.IO) 와 같은 코드로 Context를 변경했다면

Flow는 flowOn() 를 통해서 변경할 것을 권장하고 있다.

만약, 여러번 중복해서 사용한다면 먼저 사용한 것으로 적용이 됩니다.

 

물론 기존의 방식대로 Context를 변경해도 동작은 하지만 emit 을 하면 에러가 발생한다.

그래도 만든 놈이 잘못된 방법이라고 하니 주의하자.

마무리

기본적인 사용법을 살펴봤고,

Flow에서 사용할 수 있는 연산자들은 #2 에서는 정리하려 한다.

반응형