Flow에서 사용할 수 있는 다양한 연산자들을 정리한다.
아래 나오는 연산자들은 Flow 나 데이터의 조작을 위한 것으로
따로 실행을 시키지 않는다면 아무런 동작도 시작하지 않는다.
( collect, launchIn 등등 )
개수 제한 연산자
(1..3).asFlow().take(2).collect{
println("$it")
}
take
연산자는 특정 개수까지만 데이터를 받고 싶을 때 사용하는 연산자이다.
지정한 개수까지만 방출한 후에 남은 데이터에 상관없이 Flow는 자동으로 취소된다.
숫자를 마이너스로 넣을 경우 에러가 발생한다.
버퍼 연산자
private fun getNumbers(): Flow<Int> = flow {
for (i in 1..5) {
delay(200)
emit(i)
}
}.flowOn(Dispatchers.IO)
getNumbers().collect {
delay(300)
}
result: 2580 ms
getNumbers().buffer().collect {
delay(300)
}
result: 1896 ms
Flow
가 같은 코루틴안에서 실행되면 방출과 사용이 순차적으로 일어나고
이러한 동작은 전체 실행시간을 느리게 만든다.
buffer
를 이용하면 방출과 사용이 별개의 코루틴에서 동작하도록 만들 수 있다.
데이터를 미리 생산해서 버퍼에 담아두고 바로 사용하기에 실행시간에 불필요한 딜레이가 사라진다.
굳이, buffer
를 사용하지 않아도 launchIn
으로 다른 Scope를 지정하여 실행시키거나,
flowOn
을 통해서 방출과 사용하는 부분의 Context를 다르게 한다면 동일한 효과가 나타난다.
첫 번째 collect
만 사용하였을 때 작업이 완료되는 시점은 2580ms 가 지난 시점으로
( 200 ( 방출 ) + 300 ( 사용 ) ) * 5개의 데이터만큼의 시간이 필요한 것을 확인했다.
두 번째 코드는 buffer
를 통해서 생산을 다른 코루틴에서 동작하도록 하였다.
완료 시점은 1896ms로 200 ( 첫 번째 아이템 방출 ) + 300 ( 사용 ) * 5 만큼의 시간만 필요한 것을 확인할 수 있다.
private fun getNumbers(): Flow<Int> = flow {
for (i in 1..5) {
delay(200)
emit(i)
println("Emit $i")
}
}
getNumbers().conflate().collect {
println("collect $it")
delay(500)
}
result:
Emit 1
collect 1
Emit 2
Emit 3
collect 3
Emit 4
Emit 5
collect 5
conflate
는 buffer
마찬가지로 방출과 사용을 다른 코루틴에서 동작하게 한다.
( 내부 코드에서 conflate
는 buffer(CONFLATED)
호출하므로 둘은 동일하다. )
buffer
와 다른 점은 모든 데이터를 버퍼에 넣어두고 하나씩 전달하지 않고,
최신 데이터만 버퍼에 넣어두고 전달한다.
위의 예제의 결과를 보면 소비 시점에서 가장 최신 데이터를 가져와 출력하는 것을 볼 수 있다.
변환 연산자
(1..3).asFlow().transform {
emit("Hello")
if( it != 2 ) emit(it)
}.collect {
println(it.toString())
}
result :
Hello
1
Hello
Hello
3
값을 원하는 형태로 변환하기 위해선 transform
를 사용하면 된다.
위의 예제처럼 여러 값을 생산하거나 값을 수정하거나 필터링할 수 도 있다.
(1..3).asFlow().scan(listOf<Int>()) { acc, value ->
acc + value
}.collect {
println(it.toString())
}
result :
[]
[1]
[1, 2]
[1, 2, 3]
(1..3).asFlow().scanReduce{ acc, value ->
acc + value
}.collect {
println(it.toString())
}
result :
1
3
6
실험 기능이지만 scan
과 scanReduce
를 사용할 수 도 있다.
각각 fold
와 reduce
와 동일하게 동작한다.
대신, 결과 데이터를 방출해주는 것이 다르다.
(1..3).asFlow().mapLatest {
delay(100)
it * 5
}.collect {
delay(300)
println(it)
}
result :
15
mapLatest
는 map
과 동일한 기능을 하지만 collectLatest
처럼 가장 최근의 값만을 방출한다.
이전에 방출된 값들은 다 무시하고, 가장 최근의 값만 방출한다.
위의 예제에선 1, 2의 값을 무시가 되고 가장 마지막인 3 만이 "it * 3" 로직을 통해 "15"로 방출되었다.
이 외에도
변환을 위한 map
, mapNotNull
거르기 위한 fliter
, filterNot
, filterIsInstance
, filterNotNull
인덱스를 얻기위한 withIndex
등의 기본적인 연산자도 지원하고 있다.
Flatten 연산자
Flow 내에서 어떤 처리의 결과가 Flow로 올 수가 있다.
이럴 때 응답 값으로 온 Flow 를 처리할 수 있는 함수를 제공한다.
fun getResult(index: Int): Flow<String> = flow {
emit("start $index")
delay(100)
emit("end $index")
}
(1..3).asFlow()
.flatMapConcat { getResult(it) }
.collect {
delay(300)
println(it)
}
result :
start 1
end 1
start 2
end 2
start 3
end 3
flatMapConcat
은 응답인 Flow 가 완료되어야 다음 단계를 순차적으로 진행한다.
(1..3).asFlow()
.flatMapMerge { getResult(it) }
.collect {
delay(300)
println(it)
}
result :
start 1
start 2
start 3
end 1
end 2
end 3
(1..3).asFlow()
.flatMapMerge(2) { getResult(it) }
....
result :
start 1
start 2
end 1
end 2
start 3
end 3
flatMapMerge
는 응답인 Flow의 완료 여부와 상관없이 다음 단계를 동시에 진행한다.
생성자에서 몇개까지 동시에 진행할지에 대한 개수를 지정할 수 있다.
첫 번째 예제를 보면 start 가 다 들어온 후 end 가 들어오는 것을 확인할 수 있다.
두 번째 예제는 2개 까지만 동시에 진행하도록 지정하였고
결과를 보면 1번 2번 만 동시에 진행되고 3번은 나중에 진행된 것을 확인할 수 있다.
(1..3).asFlow()
.flatMapLatest { getResult(it) }
.collect {
delay(300)
println(it)
}
result :
start 1
start 2
start 3
end 3
flatMapLatest
은 collectLatest
와 비슷한 성격을 가지고 있다.
새로운 응답이 들어오면 이전의 응답 데이터로 돌리던 작업을 취소하고 새로운 응답으로 작업을 진행한다.
결과를 보면 1,2,3의 start 가 들어오고,
1,2 end 는 딜레이 동안에 출력이 취소되고, 남은 3 end 만 출력된 것을 확인할 수 있다.
(1..3).asFlow()
.collect {
getResult(it).collect{
println(it)
}
}
굳이 위의 함수들을 사용하지 않아도 결과 Flow 를 사용하는데 문제는 없다.
대신, collect
내부에서 collect
를 호출해야 하는 지저분한 상황이 만들어진다...
'코틀린' 카테고리의 다른 글
이펙티브 코틀린 - 음..? (1) | 2022.03.29 |
---|---|
Kotlin Channel - 코루틴간 데이터 통신 (0) | 2020.07.30 |
Kotlin Flow #4 - onXXX 함수와 예외처리 (0) | 2020.07.11 |
Kotlin Flow #3 - Zip And Combine (0) | 2020.07.10 |
Kotlin Flow #1 - 기본 사용법 (1) | 2020.07.08 |