Flow 란?
코틀린 플로우는 suspend function을 보완하기 위해 나온 친구이다.
비동기 동작의 결과로 suspend function 이 하나의 결과물 던진다면,
플로우를 이용하여 여러 개의 결과를 원하는 형식으로 던질 수 있다.
특징
비동기이며 코루틴에서만 동작 가능한 것은 suspend function
과 동일하다.
다른 점은 함수 앞에 suspend
를 붙이지 않아도 된다.
cold stream ( kotlin의 sequence )으로 동작하며 hot stream 은 지원하지 않는다.
그렇기에 데이터는 요청할 때마다 처음부터 새로 발행되며,
요청 전에는 선언만 있을 뿐 아무런 동작도 하지 않는다.
직접 취소할 수 있는 함수는 따로 제공하지 않는다.
Flow 생성
우선 Flow를 생성하는 방법부터 알아보겠다.
private fun getNumbers(): Flow<Int> = flow {
for (i in 1..100) {
emit(i)
println("Emit $i") // 1 ~ 100 까지 방출
}
emitAll((101 .. 200).asFlow()) // 101 ~ 200 까지 방출
}
가장 간단한 방법으로 flow { ... }
를 통해서 생성이 가능하다.
데이터 전달을 위해서 emit
함수를 사용할 수 있으며,
emitAll
을 통해서 다른 flow의 데이터를 방출하는 것도 가능하다.
위의 예제처럼 작성하면 1~200까지 데이터를 전달하는 Flow를 전달하는 함수를 만들 수 있다.
(1..3).asFlow()
mapOf(Pair(3,4)).asIterable().asFlow()
asFlow
를 이용하면 array, iterator, range 등을 flow로 변환할 수 도 있다.
fun getStr(): String = "String"
fun getFlowStr(): Flow<String> = ::getStr.asFlow()
suspend fun getSuspendStr(): String = "String"
fun getFlowSuspendStr(): Flow<String> = ::getSuspendStr.asFlow()
일반 함수와, suspend 함수도 asFlow 함수를 위와 같이 사용하면 Flow로 변환할 수 있다.
( 대신, 단일 데이터 ( String, List <Int> 등 )를 방출하는 Flow 만 만들 수 있다. )
리스트를 반환하는 함수라고 리스트 아이템들을 순차적으로 방출해주지는 않는다.
flowOf(1, "213", Sample(5), 3, 3.0f)
flowOf
를 통해서 Flow를 생성하는 방법도 존재한다.
Flow에게 데이터 요청하기
지금부터 나오는 모든 코드는 코루틴 내부에서 실행되어야 한다.
가독성을 위해 다 생략했다.
데이터 요청을 위한 함수로는
collect
, collectIndexed
, collectLatest
,launchIn
, 터미널 연산자 등이 있다.
(1..3).asFlow().collect {
delay(100)
println("$it")
}
result:
1
2
3
collect
는 Flow의 데이터를 순차적으로 가져와 하나씩 처리한다.
(1..3).asFlow().collectIndexed {
delay(100)
println("$index $value")
}
result:
collect 0 1
collect 1 2
collect 2 3
collectIndexed
는 collect
와 동일하게 동작하며 index 값을 추가로 받아서 처리할 수 있다.
(1..3).asFlow().collectLatest {
delay(100)
println("$it")
}
result:
3
collectLatest
는 가장 최근의 데이터만 가져와서 처리한다.
기본적으로는 collect
와 동일하게 데이터들은 순차적으로 들어온다.
다른 점은 새로운 데이터가 들어왔을 경우에 현재 작업을 취소시키고
새로운 데이터와 함께 처음부터 다시 작업을 진행하는 점이다.
중간에 언제든지 작업이 취소될 수 있으므로 문제가 될 작업에는 사용하면 안 된다.
CoroutineScope(Dispatchers.Default).launch {
val io = CoroutineScope(Dispatchers.IO)
(1..3).asFlow().onEach {
println(it)
}.launchIn(io)
}
launchIn
은 특정 scope에서 동작을 시작하도록 할 수 있다.
해당 함수 내부에선 scope.launch { collect() }
코드를 호출한다.
데이터를 따로 처리할 수는 없으니
다음에 나올 연산자( ex) 예제의 onEach )들을 지정해 놓은 뒤 실행만을 목적으로 한다.
코드 내부에서 scope.launch를 사용하기에 job을 반환하며,
이를 통해 cancel이나 join을 하는 것이 가능하다.
(1..3).asFlow().reduce { ... }
(1..3).asFlow().fold { ... }
(1..3).asFlow().single { ... }
(1..3).asFlow().singleOrNull { ... }
(1..3).asFlow().first { ... }
(1..3).asFlow().firstOrNull { ... }
터미널 연산자들은 위의 정도만 구현되어 있으며,
다른 곳에서도 많이 볼 수 있는 터미널 연산자들이기에 따로 설명은 안한다.
Flow 취소하기
위에 특징에서 말했다시피 Flow는 취소와 관련된 함수는 따로 제공하지 않는다.
val job = launch {
(1..3).asFlow().collect {
println("$it")
}
}
...
job.cancel()
취소를 원한다면 예제와 같이 외부에 취소가 가능한 무언가로 감싸줘야만 한다.
launch , async 등이 적절한 예시이다.
위의 launchIn
함수가 job을 반환하니 이것을 활용해도 된다.
CoroutineContext 변경하기
(1..3).asFlow().flowOn(Dispatchers.IO)
.collect {
println("$it")
}
기존 코루틴이 withContext(Dispatchers.IO)
와 같은 코드로 Context를 변경했다면
Flow는 flowOn()
를 통해서 변경할 것을 권장하고 있다.
만약, 여러번 중복해서 사용한다면 먼저 사용한 것으로 적용이 됩니다.
물론 기존의 방식대로 Context를 변경해도 동작은 하지만 emit 을 하면 에러가 발생한다.
그래도 만든 놈이 잘못된 방법이라고 하니 주의하자.
마무리
기본적인 사용법을 살펴봤고,
Flow에서 사용할 수 있는 연산자들은 #2 에서는 정리하려 한다.
'코틀린' 카테고리의 다른 글
이펙티브 코틀린 - 음..? (1) | 2022.03.29 |
---|---|
Kotlin Channel - 코루틴간 데이터 통신 (0) | 2020.07.30 |
Kotlin Flow #4 - onXXX 함수와 예외처리 (0) | 2020.07.11 |
Kotlin Flow #3 - Zip And Combine (0) | 2020.07.10 |
Kotlin Flow #2 - 연산자들 (0) | 2020.07.08 |