본문 바로가기

코틀린

Kotlin Flow #2 - 연산자들

반응형

Flow에서 사용할 수 있는 다양한 연산자들을 정리한다.

 

아래 나오는 연산자들은 Flow 나 데이터의 조작을 위한 것으로

따로 실행을 시키지 않는다면 아무런 동작도 시작하지 않는다.

( collect, launchIn 등등 )

개수 제한 연산자

(1..3).asFlow().take(2).collect{
    println("$it")
}

take 연산자는 특정 개수까지만 데이터를 받고 싶을 때 사용하는 연산자이다.

지정한 개수까지만 방출한 후에 남은 데이터에 상관없이 Flow는 자동으로 취소된다.

 

숫자를 마이너스로 넣을 경우 에러가 발생한다.

버퍼 연산자

private fun getNumbers(): Flow<Int> = flow {
    for (i in 1..5) {
        delay(200)
        emit(i)
    }
}.flowOn(Dispatchers.IO)

getNumbers().collect {
    delay(300)
}

result: 2580 ms

getNumbers().buffer().collect {
    delay(300)
}

result: 1896 ms

Flow 가 같은 코루틴안에서 실행되면 방출과 사용이 순차적으로 일어나고

이러한 동작은 전체 실행시간을 느리게 만든다.

 

buffer 를 이용하면 방출과 사용이 별개의 코루틴에서 동작하도록 만들 수 있다.

데이터를 미리 생산해서 버퍼에 담아두고 바로 사용하기에 실행시간에 불필요한 딜레이가 사라진다.

 

굳이, buffer 를 사용하지 않아도 launchIn 으로 다른 Scope를 지정하여 실행시키거나,

flowOn 을 통해서 방출과 사용하는 부분의 Context를 다르게 한다면 동일한 효과가 나타난다.

 

첫 번째 collect 만 사용하였을 때 작업이 완료되는 시점은 2580ms 가 지난 시점으로

( 200 ( 방출 ) + 300 ( 사용 ) ) * 5개의 데이터만큼의 시간이 필요한 것을 확인했다.

 

두 번째 코드는 buffer 를 통해서 생산을 다른 코루틴에서 동작하도록 하였다.

완료 시점은 1896ms로 200 ( 첫 번째 아이템 방출 ) + 300 ( 사용 ) * 5 만큼의 시간만 필요한 것을 확인할 수 있다.

private fun getNumbers(): Flow<Int> = flow {
    for (i in 1..5) {
        delay(200)
        emit(i)
        println("Emit $i")
    }
}

getNumbers().conflate().collect {
    println("collect $it")
    delay(500)
}

result:
Emit 1
collect 1
Emit 2
Emit 3
collect 3
Emit 4
Emit 5
collect 5

conflatebuffer 마찬가지로 방출과 사용을 다른 코루틴에서 동작하게 한다.

( 내부 코드에서 conflatebuffer(CONFLATED) 호출하므로 둘은 동일하다. )

 

buffer 와 다른 점은 모든 데이터를 버퍼에 넣어두고 하나씩 전달하지 않고,

최신 데이터만 버퍼에 넣어두고 전달한다.

 

위의 예제의 결과를 보면 소비 시점에서 가장 최신 데이터를 가져와 출력하는 것을 볼 수 있다.

변환 연산자

(1..3).asFlow().transform {
    emit("Hello")
    if( it != 2 ) emit(it)
}.collect {
    println(it.toString())
}

result :
Hello
1
Hello
Hello
3

값을 원하는 형태로 변환하기 위해선 transform 를 사용하면 된다.

위의 예제처럼 여러 값을 생산하거나 값을 수정하거나 필터링할 수 도 있다.

(1..3).asFlow().scan(listOf<Int>()) { acc, value ->
    acc + value
}.collect {
    println(it.toString())
}

result :
[]
[1]
[1, 2]
[1, 2, 3]

(1..3).asFlow().scanReduce{ acc, value ->
    acc + value
}.collect {
    println(it.toString())
}

result :
1
3
6

실험 기능이지만 scanscanReduce 를 사용할 수 도 있다.

 

각각 foldreduce 와 동일하게 동작한다.

대신, 결과 데이터를 방출해주는 것이 다르다.

(1..3).asFlow().mapLatest {
        delay(100)
        it * 5
    }.collect {
        delay(300)
        println(it)
    }

result : 
15

mapLatestmap 과 동일한 기능을 하지만 collectLatest 처럼 가장 최근의 값만을 방출한다.

이전에 방출된 값들은 다 무시하고, 가장 최근의 값만 방출한다.

 

위의 예제에선 1, 2의 값을 무시가 되고 가장 마지막인 3 만이 "it * 3" 로직을 통해 "15"로 방출되었다.

 

이 외에도
변환을 위한 map, mapNotNull 

거르기 위한 fliter, filterNot , filterIsInstance, filterNotNull

인덱스를 얻기위한 withIndex
등의 기본적인 연산자도 지원하고 있다.

Flatten 연산자

Flow 내에서 어떤 처리의 결과가 Flow로 올 수가 있다.

이럴 때 응답 값으로 온 Flow 를 처리할 수 있는 함수를 제공한다.

fun getResult(index: Int): Flow<String> = flow {
    emit("start $index")
    delay(100)
    emit("end $index")
}

(1..3).asFlow()
    .flatMapConcat { getResult(it) }
    .collect {
        delay(300)
        println(it)
    }

result :
start 1
end 1
start 2
end 2
start 3
end 3

flatMapConcat 은 응답인 Flow 가 완료되어야 다음 단계를 순차적으로 진행한다.

(1..3).asFlow()
    .flatMapMerge { getResult(it) }
    .collect {
        delay(300)
        println(it)
    }

result :
start 1
start 2
start 3
end 1
end 2
end 3

(1..3).asFlow()
    .flatMapMerge(2) { getResult(it) } 
    ....

result :
start 1
start 2
end 1
end 2
start 3
end 3 

flatMapMerge 는 응답인 Flow의 완료 여부와 상관없이 다음 단계를 동시에 진행한다.

생성자에서 몇개까지 동시에 진행할지에 대한 개수를 지정할 수 있다.

 

첫 번째 예제를 보면 start 가 다 들어온 후 end 가 들어오는 것을 확인할 수 있다.

 

두 번째 예제는 2개 까지만 동시에 진행하도록 지정하였고

결과를 보면 1번 2번 만 동시에 진행되고 3번은 나중에 진행된 것을 확인할 수 있다.

(1..3).asFlow()
    .flatMapLatest { getResult(it) }
    .collect {
        delay(300)
        println(it)
    }

result : 
start 1
start 2
start 3
end 3

flatMapLatestcollectLatest 와 비슷한 성격을 가지고 있다.

새로운 응답이 들어오면 이전의 응답 데이터로 돌리던 작업을 취소하고 새로운 응답으로 작업을 진행한다.

 

결과를 보면 1,2,3의 start 가 들어오고,

1,2 end 는 딜레이 동안에 출력이 취소되고, 남은 3 end 만 출력된 것을 확인할 수 있다.

(1..3).asFlow()
    .collect {
        getResult(it).collect{
            println(it)
        }
    }

굳이 위의 함수들을 사용하지 않아도 결과 Flow 를 사용하는데 문제는 없다.

대신, collect 내부에서 collect 를 호출해야 하는 지저분한 상황이 만들어진다...

반응형