본문 바로가기

코틀린

Kotlin Channel - 코루틴간 데이터 통신

반응형

Channel 이란?

앞에서 본 Flow가 단일방향으로 데이터를 던지고 받는 형식이라면

Channel 은 여러 방향에서 데이터를 던지고 받는 형식으로 코루틴 끼리의 데이터를 전달하기 위한 위한 친구이다.

 

구조는 BlockingQueue 와 비슷하며, 동일하게 ThreadSafe 한 형태의 구조를 가지고 있다.

기본적인 사용법

기본적인 사용법은 간단하다.

val channel = Channel<Int>()

CoroutineScope(Dispatchers.Default).launch {
    channel.send(it)
    channel.receive()
}

channel.close()

Channel<...>() 함수를 통해 생성할 수 있으며,

데이터를 스트림에 밀어 넣을 땐 send, 스트림에서 받을 땐 receive 를 사용하면 된다.

 

생성 위치는 상관없지만 sendreceive 는 suspend 함수이기 때문에 코루틴 내부에서 호출되어야 한다.

 

채널에 더 이상 아무 데이터도 보내거나 받지 않는다면 채널을 종료시켜야 한다.

 

close 함수를 통해서 종료시킬 수 있으며,

종료 이후에 send, receive 함수는 ClosedReceiveChannelException 을 발생시킨다.

파이프라인

파이프라인은 채널을 생성하는 패턴으로

하나의 코루틴이 초기 데이터를 생성하고, 소비하는 곳에서 받은 후 새로운 데이터를 생성하는 흐름을 말한다.

 

produce<...>{ ... } 함수를 통하여 만들 수 있다.

val numbers = produce<Int> { for (x in 0 .. Int.MAX_VALUE) send(x) }
val numXnum = produce { for (x in numbers) send(x * x) }

사용법은 위와 같으며 결과물로는 0 ~ MAX_VALUE 까지 데이터를 방출하고 그 곱을 방출하는 채널이 만들어진다.

 

응답값으로 ReceiveChannel 가 반환되며, 해당 채널은 데이터를 추가적으로 보내는 것 ( send ) 은 불가능하고
produce 내부에서 생성된 데이터를 받는 거 ( receive ) 만 가능하다.

데이터 가져오기

채널의 데이터를 가져오는 방법은 receive 말고도 여러 가지 방법을 지원한다.

val channel = Channel<Int>()

CoroutineScope(Dispatchers.Default).launch {
    repeat(100) {
        channel.send(it)
    }
}

CoroutineScope(Dispatchers.Default).launch {

    val num = channel.receive() // 1

    for(x in channel) { // 2
        println(x)
    }

    channel.consumeEach { // 3
        println(it)
    }
}

1번의 receive 를 통해서 하나의 값을 받거나

2번, 3번의 for-loop, consumeEach 를 통해서 값 전체 하나씩 받아 올 수 있다.

( 채널이 종료되면 for문과 each문을 빠져나간다. )

channel.consumeAsFlow().collect{
    println(it)
}

consumeAsFlow 을 사용하면 데이터를 Flow 형식으로 받는 것도 가능하다.

 

대신, 이름 그대로 소비를 cold stream ( Flow )으로 할 뿐,

데이터 배출 자체는 hot stream ( Channel ) 이기에 사용을 기다리지 않고 한 번에 배출된다.

 

쉽게 말해서 한 번밖에 배출이 안되니, 진짜 Flow 처럼 여러 곳에서 동일한 데이터를 받는 것은 불가능하다.

val channel = produce<Int> { for (x in 0..5) send(x) }
val flowFromChannel = channel.consumeAsFlow()

flowFromChannel.collect{
    println("1 $it")
}
flowFromChannel.collect{
    println("2 $it")
}

result :
1 0
1 1
~~
1 4

val flow = (0..5).asFlow()

flow.collect{
    println("1 $it")
}
flow.collect{
    println("2 $it")
}

result :
1 0
1 1
~~
1 4
2 0
2 1
~~
2 4

위의 예제를 보면 consumeAsFlow 을 통해 만들어진 Flow 는 첫 번째 collect 만 동작되었고,

Flow 는 모든 collect 가 동작하는 것을 확인할 수 있다.

 

그럼 굳이 consumeAsFlow 을 쓸 필요가 있나? 그냥 consumeEach 랑 다를게 뭐지?

consumeAsFlow 가 필요한 경우

그냥 주는 데이터를 그대로 쓸 거면 consumeAsFlow 는 코드만 길어질 뿐 차이가 없다.

 

대신 데이터 조작을 위해 find, map, filter, first 같은 함수들을 이용할 거라면
consumeAsFlow 을 통해서 Flow 로 전환해서 사용해야 한다.

 

현재는 channel, flow 둘 다 map, filter 같은 함수를 제공하고 있기 때문에 전환을 안 해도 사용 가능 하지만

1.4.0 버전부터 channel 의 filter, map 같은 함수들이 제거된다. ( 현재도 Deprecated 상태이다. )

 

제거되는 이유는 코루틴 깃 이슈에서 확인할 수 있는데,

"사용의 입장에서 데이터 생성 ( filter, map ) 이 필요한 경우 hot stream ( Channel ) 은 적합하지 않으니 cold stream 한 객체를 따로 만들 필요가 있다."

라는 의견의 내용이다. ( 이때는 Flow 가 나오기 전이다. )

그렇다면 뭐가 적합하지 않다는 거지?

적합하지 않은 이유

적합하지 않다고 말한 이유를 filter 예제를 통해서 보도록 하자.

val source = produce<Int> { for (x in 0..1000) send(x) }

val filtered = channel.filter {
    System.currentTimeMillis() > 특정 시간
}
delay( 10000 )
filtered.consumeEach { .. }

hot stream 인 channel 에서는 동작은
1.produce 에서 데이터 생성 

2.filter 함수를 통해 걸러진 데이터 새로 생성 
3. 딜레이 ( 필터조건이 바뀔만한 로직 예제에서는 시간 )
4. 데이터 사용
순으로 진행된다.

 

당연히 필터 된 결과 값은 "실행 시점" 이 아닌 "생성 시점" 을 기준으로 필터링된 결과를 가지게 된다.

 

그럼 만약 사용자가 필터링되는 타이밍이 "사용 시점" 이길 바란다면 어떻게 해야 할까?

이것이 적합하지 않다는 이유이다.  hot stream 만으로는 해결할 수 없다. 

 

그 결과로 나온 것이 바로 사용 시점에서 데이터를 방출하는 lazy cold stream Flow 이다.

 

그건 그렇고, filter, map 등을 그냥 둬도 되지 않을까?

 

새로운 Flow ( cold ) 는 Sequential 하게 동일한 기능을 수행해주는데 Sequential 하지도 않은
Channel 의 코드( filter, map ... ) 들을 살려둘 필요가 없었다.

 

그래서 consumeAsFlow 를 통해 Flow 로 전환 가능하게 만든 후 Deprecated 붙여버린 것이라 추측한다.

Fan-In, Fan-Out

Channel을 소개할 때 여러 방향에서 데이터를 던지고 받는다고 했는데,

Fan-In, Fan-Out 은 그것을 지칭하는 용어이다.

 

그냥 여러 곳에서 for-loop , consumeEach , receive 등을 하면 Fan-Out, send 를 하면 Fan-In이다.

 

중요한 것은 Fan-Out 상태일 때 for-loop , consumeEach 의 동작의 차이이다.

launch {
    source.consumeEach {
        println("1 $it")
    }
}
launch {
    try {
        source.consumeEach {
            throw Exception()
        }
    }catch (e : Exception){
        ...
    }
}

----------------------------

launch {
    try {
        for (x in source) {
            throw Exception()
            println("1 $x")
        }
    }catch (e : Exception){
        ...
    }
}
launch {
    for (x in source) {
        println("2 $x")
    }
}

둘 다 에러가 발생하지 않는 경우는 동일한 동작을 하지만, 에러가 발생했을 때 결과가 달라진다.

 

위의 예제는 받는 곳이 두 곳일 때 한쪽이 에러가 발생한 상황이다.

 

consumeEach 의 경우 에러가 발생하면 채널 자체가 close된다. 

따라서, 사용하는 모든 곳이 다 같이 종료된다.

 

for-loop 의 경우 에러가 발생한 for-loop 만 중단되고 channel는 종료시키지 않는다.

따라서, 사용하고 있는 다른 곳들은 정상적인 동작을 이어간다.

perfectly safe to use from multiple coroutines

가이드 문서에서는 for-loop 를 완벽하게 안전한 것이라고 표현하고 있으니,

Fan-Out에서는 for-loop 를 사용하는 것이 좋겠다.

버퍼

Channel에서도 버퍼 사이즈나 형태를 지정할 수 있다.

Channel<Int>(4)
Channel<Int>(UNLIMITED)
Channel<Int>(RENDEZVOUS)
Channel<Int>(CONFLATED)
Channel<Int>(BUFFERED)

사이즈의 값을 직접 지정하거나 미리 정의된 UNLIMITED, RENDEZVOUS, CONFLATED, BUFFERED 등을 사용하면 된다.

 

UNLIMITED 는 버퍼에 제한이 Int.MAX_VALUE 인 상태
RENDEZVOUS 버퍼가 없는 상태
CONFLATED 는 항상 최신의 값 하나만 가지고 있는 상태
( 데이터가 있는 상태에서 새로운 데이터가 들어오면 이전 데이터는 소실된다. )
BUFFERED 는 시스템이 정한 버퍼값 ( 64개 )을 가지고 있는 상태

 

기본값은 RENDEZVOUS 이다.

공평한 채널

Channel 은 FIFO ( first-in first-out )으로 동작하여 하나의 채널이 독점하지 않고 순차적으로 데이터를 가져간다.

val channel = Channel<Int>()

launch {
    delay(100)
    channel.send(0)
}

launch {
    repeat(4){
        val x = channel.receive()
        println("1 $x")
        channel.send(x+1)
    }
}
launch {
    repeat(4){
        val x = channel.receive()
        println("2 $x")
        channel.send(x+1)
    }
}

result :
1 0
2 1
1 2
2 3
1 4
2 5
1 6
2 7

간단한 예제를 통해 보면 자기가 던진 데이터를 자기가 안 가져가고
먼저 receive 한 곳에서 가져가는 아주 아주 공평한 모습을 확인할 수 있다.

마무리

Channel의 경우 대부분이 실험 API, 삭제될 API 또는 쓸모없는(obsolete) API라서 실제로 사용 가능한 기능은 많지 않다.

 

개발자가 로드맵을 통해 Channel 은 코 루틴 간의 데이터 통신을 위해서 남겨 둔다고 말했으니
그렇게 쓰면 된다.

 
반응형

'코틀린' 카테고리의 다른 글

KotlinDl - 코틀린 버전의 딥러닝  (1) 2022.12.16
이펙티브 코틀린 - 음..?  (1) 2022.03.29
Kotlin Flow #4 - onXXX 함수와 예외처리  (0) 2020.07.11
Kotlin Flow #3 - Zip And Combine  (0) 2020.07.10
Kotlin Flow #2 - 연산자들  (0) 2020.07.08