Channel 이란?
앞에서 본 Flow가 단일방향으로 데이터를 던지고 받는 형식이라면
Channel 은 여러 방향에서 데이터를 던지고 받는 형식으로 코루틴 끼리의 데이터를 전달하기 위한 위한 친구이다.
구조는 BlockingQueue 와 비슷하며, 동일하게 ThreadSafe 한 형태의 구조를 가지고 있다.
기본적인 사용법
기본적인 사용법은 간단하다.
val channel = Channel<Int>()
CoroutineScope(Dispatchers.Default).launch {
channel.send(it)
channel.receive()
}
channel.close()
Channel<...>()
함수를 통해 생성할 수 있으며,
데이터를 스트림에 밀어 넣을 땐 send
, 스트림에서 받을 땐 receive
를 사용하면 된다.
생성 위치는 상관없지만 send
와 receive
는 suspend 함수이기 때문에 코루틴 내부에서 호출되어야 한다.
채널에 더 이상 아무 데이터도 보내거나 받지 않는다면 채널을 종료시켜야 한다.
close
함수를 통해서 종료시킬 수 있으며,
종료 이후에 send
, receive
함수는 ClosedReceiveChannelException
을 발생시킨다.
파이프라인
파이프라인은 채널을 생성하는 패턴으로
하나의 코루틴이 초기 데이터를 생성하고, 소비하는 곳에서 받은 후 새로운 데이터를 생성하는 흐름을 말한다.
produce<...>{ ... }
함수를 통하여 만들 수 있다.
val numbers = produce<Int> { for (x in 0 .. Int.MAX_VALUE) send(x) }
val numXnum = produce { for (x in numbers) send(x * x) }
사용법은 위와 같으며 결과물로는 0 ~ MAX_VALUE 까지 데이터를 방출하고 그 곱을 방출하는 채널이 만들어진다.
응답값으로 ReceiveChannel
가 반환되며, 해당 채널은 데이터를 추가적으로 보내는 것 ( send ) 은 불가능하고
produce 내부에서 생성된 데이터를 받는 거 ( receive ) 만 가능하다.
데이터 가져오기
채널의 데이터를 가져오는 방법은 receive
말고도 여러 가지 방법을 지원한다.
val channel = Channel<Int>()
CoroutineScope(Dispatchers.Default).launch {
repeat(100) {
channel.send(it)
}
}
CoroutineScope(Dispatchers.Default).launch {
val num = channel.receive() // 1
for(x in channel) { // 2
println(x)
}
channel.consumeEach { // 3
println(it)
}
}
1번의 receive
를 통해서 하나의 값을 받거나
2번, 3번의 for-loop
, consumeEach
를 통해서 값 전체 하나씩 받아 올 수 있다.
( 채널이 종료되면 for문과 each문을 빠져나간다. )
channel.consumeAsFlow().collect{
println(it)
}
consumeAsFlow
을 사용하면 데이터를 Flow
형식으로 받는 것도 가능하다.
대신, 이름 그대로 소비를 cold stream ( Flow )으로 할 뿐,
데이터 배출 자체는 hot stream ( Channel ) 이기에 사용을 기다리지 않고 한 번에 배출된다.
쉽게 말해서 한 번밖에 배출이 안되니, 진짜 Flow
처럼 여러 곳에서 동일한 데이터를 받는 것은 불가능하다.
val channel = produce<Int> { for (x in 0..5) send(x) }
val flowFromChannel = channel.consumeAsFlow()
flowFromChannel.collect{
println("1 $it")
}
flowFromChannel.collect{
println("2 $it")
}
result :
1 0
1 1
~~
1 4
val flow = (0..5).asFlow()
flow.collect{
println("1 $it")
}
flow.collect{
println("2 $it")
}
result :
1 0
1 1
~~
1 4
2 0
2 1
~~
2 4
위의 예제를 보면 consumeAsFlow
을 통해 만들어진 Flow
는 첫 번째 collect
만 동작되었고,
Flow
는 모든 collect
가 동작하는 것을 확인할 수 있다.
그럼 굳이 consumeAsFlow
을 쓸 필요가 있나? 그냥 consumeEach
랑 다를게 뭐지?
consumeAsFlow 가 필요한 경우
그냥 주는 데이터를 그대로 쓸 거면 consumeAsFlow
는 코드만 길어질 뿐 차이가 없다.
대신 데이터 조작을 위해 find
, map
, filter
, first
같은 함수들을 이용할 거라면consumeAsFlow
을 통해서 Flow
로 전환해서 사용해야 한다.
현재는 channel
, flow
둘 다 map, filter 같은 함수를 제공하고 있기 때문에 전환을 안 해도 사용 가능 하지만
1.4.0 버전부터 channel
의 filter, map 같은 함수들이 제거된다. ( 현재도 Deprecated 상태이다. )
제거되는 이유는 코루틴 깃 이슈에서 확인할 수 있는데,
"사용의 입장에서 데이터 생성 ( filter, map ) 이 필요한 경우 hot stream ( Channel ) 은 적합하지 않으니 cold stream 한 객체를 따로 만들 필요가 있다."
라는 의견의 내용이다. ( 이때는 Flow 가 나오기 전이다. )
그렇다면 뭐가 적합하지 않다는 거지?
적합하지 않은 이유
적합하지 않다고 말한 이유를 filter
예제를 통해서 보도록 하자.
val source = produce<Int> { for (x in 0..1000) send(x) }
val filtered = channel.filter {
System.currentTimeMillis() > 특정 시간
}
delay( 10000 )
filtered.consumeEach { .. }
hot stream 인 channel
에서는 동작은
1.produce
에서 데이터 생성
2.filter
함수를 통해 걸러진 데이터 새로 생성
3. 딜레이 ( 필터조건이 바뀔만한 로직 예제에서는 시간 )
4. 데이터 사용
순으로 진행된다.
당연히 필터 된 결과 값은 "실행 시점" 이 아닌 "생성 시점" 을 기준으로 필터링된 결과를 가지게 된다.
그럼 만약 사용자가 필터링되는 타이밍이 "사용 시점" 이길 바란다면 어떻게 해야 할까?
이것이 적합하지 않다는 이유이다. hot stream 만으로는 해결할 수 없다.
그 결과로 나온 것이 바로 사용 시점에서 데이터를 방출하는 lazy cold stream Flow
이다.
그건 그렇고, filter
, map
등을 그냥 둬도 되지 않을까?
새로운 Flow
( cold ) 는 Sequential 하게 동일한 기능을 수행해주는데 Sequential 하지도 않은Channel
의 코드( filter, map ... ) 들을 살려둘 필요가 없었다.
그래서 consumeAsFlow
를 통해 Flow
로 전환 가능하게 만든 후 Deprecated 붙여버린 것이라 추측한다.
Fan-In, Fan-Out
Channel을 소개할 때 여러 방향에서 데이터를 던지고 받는다고 했는데,
Fan-In, Fan-Out 은 그것을 지칭하는 용어이다.
그냥 여러 곳에서 for-loop
, consumeEach
, receive
등을 하면 Fan-Out, send
를 하면 Fan-In이다.
중요한 것은 Fan-Out 상태일 때 for-loop
, consumeEach
의 동작의 차이이다.
launch {
source.consumeEach {
println("1 $it")
}
}
launch {
try {
source.consumeEach {
throw Exception()
}
}catch (e : Exception){
...
}
}
----------------------------
launch {
try {
for (x in source) {
throw Exception()
println("1 $x")
}
}catch (e : Exception){
...
}
}
launch {
for (x in source) {
println("2 $x")
}
}
둘 다 에러가 발생하지 않는 경우는 동일한 동작을 하지만, 에러가 발생했을 때 결과가 달라진다.
위의 예제는 받는 곳이 두 곳일 때 한쪽이 에러가 발생한 상황이다.
consumeEach
의 경우 에러가 발생하면 채널 자체가 close
된다.
따라서, 사용하는 모든 곳이 다 같이 종료된다.
for-loop
의 경우 에러가 발생한 for-loop
만 중단되고 channel
는 종료시키지 않는다.
따라서, 사용하고 있는 다른 곳들은 정상적인 동작을 이어간다.
perfectly safe to use from multiple coroutines
가이드 문서에서는 for-loop 를 완벽하게 안전한 것이라고 표현하고 있으니,
Fan-Out에서는 for-loop
를 사용하는 것이 좋겠다.
버퍼
Channel에서도 버퍼 사이즈나 형태를 지정할 수 있다.
Channel<Int>(4)
Channel<Int>(UNLIMITED)
Channel<Int>(RENDEZVOUS)
Channel<Int>(CONFLATED)
Channel<Int>(BUFFERED)
사이즈의 값을 직접 지정하거나 미리 정의된 UNLIMITED, RENDEZVOUS, CONFLATED, BUFFERED 등을 사용하면 된다.
UNLIMITED
는 버퍼에 제한이 Int.MAX_VALUE 인 상태RENDEZVOUS
버퍼가 없는 상태CONFLATED
는 항상 최신의 값 하나만 가지고 있는 상태
( 데이터가 있는 상태에서 새로운 데이터가 들어오면 이전 데이터는 소실된다. )BUFFERED
는 시스템이 정한 버퍼값 ( 64개 )을 가지고 있는 상태
기본값은 RENDEZVOUS
이다.
공평한 채널
Channel 은 FIFO ( first-in first-out )으로 동작하여 하나의 채널이 독점하지 않고 순차적으로 데이터를 가져간다.
val channel = Channel<Int>()
launch {
delay(100)
channel.send(0)
}
launch {
repeat(4){
val x = channel.receive()
println("1 $x")
channel.send(x+1)
}
}
launch {
repeat(4){
val x = channel.receive()
println("2 $x")
channel.send(x+1)
}
}
result :
1 0
2 1
1 2
2 3
1 4
2 5
1 6
2 7
간단한 예제를 통해 보면 자기가 던진 데이터를 자기가 안 가져가고
먼저 receive 한 곳에서 가져가는 아주 아주 공평한 모습을 확인할 수 있다.
마무리
Channel의 경우 대부분이 실험 API, 삭제될 API 또는 쓸모없는(obsolete) API라서 실제로 사용 가능한 기능은 많지 않다.
개발자가 로드맵을 통해 Channel 은 코 루틴 간의 데이터 통신을 위해서 남겨 둔다고 말했으니
그렇게 쓰면 된다.
'코틀린' 카테고리의 다른 글
KotlinDl - 코틀린 버전의 딥러닝 (1) | 2022.12.16 |
---|---|
이펙티브 코틀린 - 음..? (1) | 2022.03.29 |
Kotlin Flow #4 - onXXX 함수와 예외처리 (0) | 2020.07.11 |
Kotlin Flow #3 - Zip And Combine (0) | 2020.07.10 |
Kotlin Flow #2 - 연산자들 (0) | 2020.07.08 |