concatMap
concatMap은 flatMap과 비교하면 쉽게 이해된다.
let redCircle = "🔴"
let greenCircle = "🟢"
let blueCircle = "🔵"
let redHeart = "❤️"
let greenHeart = "💚"
let blueHeart = "💙"
Observable.from([redCircle, greenCircle, blueCircle])
.flatMap { circle -> Observable<String> in
switch circle {
case redCircle:
return Observable.repeatElement(redHeart)
.take(5)
case greenCircle:
return Observable.repeatElement(greenHeart)
.take(5)
case blueCircle:
return Observable.repeatElement(blueHeart)
.take(5)
default:
return Observable.just("")
}
}
.subscribe { print($0) }
.disposed(by: disposeBag)
//next(❤️)
//next(❤️)
//next(💚)
//next(❤️)
//next(💚)
//next(💙)
//next(❤️)
//next(💚)
//next(💙)
//next(❤️)
//next(💚)
//next(💙)
//next(💚)
//next(💙)
//next(💙)
flatMap과 다르게 Interleaving 없이 항상 방출 순서를 보장한다.
위의 코드를 concatMap으로 바꿔본다면 출력은 아래처럼 된다.
next(❤️)
next(❤️)
next(❤️)
next(❤️)
next(❤️)
next(💚)
next(💚)
next(💚)
next(💚)
next(💚)
next(💙)
next(💙)
next(💙)
next(💙)
next(💙)
completed
Interleaving이 허용된 flatMap에선 파란, 초록하트가 이전에 나온 하트에 끼어들기가 가능했는데 concatMap에선 이게 불가능해진다.
concatMap은 InnerObservable을 생성된 순서대로 연결한다. 그리고 앞에있던 InnerObservable이 이벤트 방출을 끝내면 이어지는 InnerObservable에서 이벤트 방출을 시작한다.
scan
이 연산자는 기본값으로 연산을 시작한다.
원본옵저버블이 방출하는 항목을 대상으로 변환을 실행한 다음 결과를 방출하는 하나의 옵저버블을 리턴한다.
그래서 원본이 방출하는 항목의 수와 구독자로 전달되는 항목의 수가 동일한다.
첫 번째 파라미터로 기본값을 전달하는데 여기에서는 0을 전달한다.
두 번쨰 파라미터에는 클로저를 전달한다.
두 번째 파라미터로 전달할 수 있는 형식은 두가지다. 첫 번쨰는 클로저, 두번째 파라미터는 옵저버블이 방출하는 항목의 형식과 같다. 그리고 클로저에 리턴형은 첫번째 파라미터와 같다. scan연산자로 전달하는 클로저는 Accumulator Function 또는 Accumulator Closure라고 한다. 기본값이나 옵저버블이 방출하는 항목을 대상으로 Accumulator Closure를 실행한 다음 결과를 옵저버블로 리턴한다.
클로저가 리턴한 값은 이어서 실행되는 클로저의 첫 번째 파라미터로 전달된다.
Observable.range(start: 1, count: 10)
.scan(0, accumulator: +)
를 실행하면 클로저로 기본값 0과 1이 전달되고 acc로 전달된 + 연산을 수행한다. 다음 0과 1의 +인 1이 구독자에게 전달된다.
다음으로 옵저버블이 2를 방출하게 되면 이전결과인 1과 새로 방출된 2가 클로저로 전달된다. 이번에는 구독자에게 3이 전달된다.
이는 계산 누적합의 중간값과 결과값 모두 필요할 때 사용하는데, 만약 결과값만 필요하다면 reduce연산자를 사용한다.
buffer
이 연산자는 특정 주기동안 옵저버블이 방출하는 항목을 수집하고 하나의 배열로 리턴한다.
RxSwift에선 이런 동작을 Controlled Buffering이라고 한다.
1초마다 정수를 방출하는 옵저버블을 만든다.
buffer(timeSpan: RxTimeInterval, count: Int, scheduler: SchedulerType)
이 연산자는 3개의 파라미터를 받는다.
첫 번째 파라미터는 항목을 수집할 시간이다. 여기에서 지정한 시간마다 수집되어 있는 항목을 배출하는데 시간이 경과되지 않아도 항목을 배출할 수 있다.
두번째 파라미터는 수집할 항목의 숫자이다. 정확한 숫자가 아니라 최대숫자이다. 최대숫자가 수집되지 않아도 시간이 경과되면 결과를 배출한다.
세 번째 파라미터는 스케쥴러이다.
연산자의 리턴형을보면 타입파라미터가 배열형으로 선언되어있다. 지정된 시간동안 수집된 항목들을 배열에 담아서 리턴한다.
RxTimeInterval이라고 적혀있지만 더이상 쓰이지 않는 형식이다. 그래서 보통 Dispatch time interval 형식으로 타임을 지정한다.
Observable<Int>.interval(.seconds(1), scheduler: MainScheduler.instance)
.buffer(timeSpan: .seconds(2), count: 3, scheduler: MainScheduler.instance)
.take(5)
.subscribe { print($0) }
//next([0])
//next([1, 2, 3])
//next([4, 5])
//next([6, 7])
//next([8, 9])
//completed
버퍼연산자는 첫 번쨰 파라미터로 전달한 timeSpan이 경과하면 수집된 항목들을 즉시 방출한다. 두번째 파라미터로 지정한 숫자가 수집되지 않았더라도 즉시 방출한다.
지금은 2초마다 수집하고 있으니까 방출되는 배열에는 보통 2개의 요소가 포함되어 있다. 하지만 시간상의 오차로 인해 첫 번째 배열처럼 하나가 포함되어있는 경우도 있고 두번째 배열처럼 3개가 포함되어있는경우도 있다.
Window
Window연산자는 버퍼 연산자처럼 timeSpan과 maxcount를 지정해서 원본 옵저버블이 방출하는 항목들을 작은단위의 옵저버블로 분해한다. 버퍼연산자는 수집된 항목들을 배열형태로 리턴하지만 윈도우 연산자는 수집된 항목들을 방출하는 옵저버블을 리턴한다. 그래서 리턴된 옵저버블이 무엇을 방출하고 언제 완료되는지 이해하는 것이 중요하다.
public func window(timeSpan: RxTimeInterval, count: Int, scheduler: SchedulerType)
-> Observable<Observable<Element>> {
return WindowTimeCount(source: self.asObservable(), timeSpan: timeSpan, count: count, scheduler: scheduler)
}
파라미터로는 버퍼연산자와 동일하다.
첫 번째 파라미턴느 시간을 분해할 시간단위를 전달한다.
두 번째 파라미터는 분해할 최대 항목수를 전달하고
마지막 파라미터는 연산자를 실행할 스케쥴러를 전달한다.
버퍼연산자와의 차이는 리턴타입에있다.
버퍼연산자는 수집된 배열을 방출하는 옵저버블을 리턴하는데
윈도우연산자는 옵저버블을 방출하는 옵저버블을 리턴한다.
이런 옵저버블을 방출하는 옵저버블을 InnerObservable이라고 부른다. InnerObservable은 지정된 최대 항목 수 만큼 방출하거나 지정된 시간이 경과하면 CompletedEvent를 방출하고 종료된다.
2초마다 최대 3개의 항목을 수집하도록 파라미터를 전달한다. 또한 take연산자로 전달되는 항목의 개수를 5개로 제한한다.
Observable<Int>.interval(.seconds(1), scheduler: MainScheduler.instance)
.window(timeSpan: .seconds(2), count: 3, scheduler: MainScheduler.instance)
.take(5)
.subscribe{ print($0) }
.disposed(by: disposeBag)
//next(RxSwift.AddRef<Swift.Int>)
//next(RxSwift.AddRef<Swift.Int>)
//next(RxSwift.AddRef<Swift.Int>)
//next(RxSwift.AddRef<Swift.Int>)
//next(RxSwift.AddRef<Swift.Int>)
//completed
결과를 보면 2초마다 항목이 방출되고 있다.
여기에 있는 addRef는 특별한 형태의 옵저버블이다. 앞에서 설명한 InnerObservable다.
addRef는 옵저버블이고 구독할 수 있다.
Observable<Int>.interval(.seconds(1), scheduler: MainScheduler.instance)
.window(timeSpan: .seconds(2), count: 3, scheduler: MainScheduler.instance)
.take(5)
.subscribe{ print($0)
//이처럼 inner onservable에 접근할 수 있다
if let observable = $0.element{
observable.subscribe{ print(" inner: ", $0)}
}
}
.disposed(by: disposeBag)
/*
next(RxSwift.AddRef<Swift.Int>)
inner: next(0)
inner: completed
next(RxSwift.AddRef<Swift.Int>)
inner: next(1)
inner: next(2)
inner: next(3)
inner: completed
next(RxSwift.AddRef<Swift.Int>)
inner: next(4)
inner: next(5)
inner: completed
next(RxSwift.AddRef<Swift.Int>)
inner: next(6)
inner: next(7)
inner: completed
next(RxSwift.AddRef<Swift.Int>)
completed
inner: next(8)
inner: next(9)
inner: completed
*/
원본 옵저버블은1초마다 하나씩 방출하고있고, 윈도우 연산자는 2초마다 3개씩 수집하고 있다.
결과적으로 count는 채우지 못한다.
결과를 보면 count가 채워질 때 까지 기다리지 않고 시간만큼 기다린다음 방출한다.
버퍼연산자와 마찬가지로 시간의 오차로인해 3개, 1개가 방출되는 경우도 있다.
GroupBy
이름에서 유추할 수 있는 것처럼 옵저버블에서 방출하는 요소를 원하는 기준으로 그룹화한다.
public func groupBy<Key: Hashable>(keySelector: @escaping (Element) throws -> Key)
-> Observable<GroupedObservable<Key, Element>> {
GroupBy(source: self.asObservable(), selector: keySelector)
}
파라미터로 클로저를 받고 클로저는 요소를 파라미터로 받아서 키를 리턴한다. 키의 형식은 Hashable을 채용한 형식으로 한정되어있다.
연산자를 실행하면 요소에서 클로저에서 동일한 값을 리턴하는 요소끼리 그룹으로 묶이고 그룹에 속한 요소들은 개별 옵저버블을 통해 방출된다.
연산자가 리턴하는 옵저버블을 보면 타입파라미터가 GroupedObservable로 선언되어 있다.
여기에는 방출하는 요소와 함께 키가 저장되어있다.
문자열 길이를 기준으로 그룹화 해본다.
let words = ["Apple", "Banana", "Orange", "Book", "City", "Axe"]
Observable.from(words)
.groupBy{ $0.count }
.subscribe{ print( $0 )
.disposed(by: disposeBag)
//이렇게 클로저에서 문자열의 길이를 리턴하면 키 형식이 Int가 된다.
//그리고 문자열 길이에 따라서 그룹화된다.
/*
next(GroupedObservable<Int, String>(key: 5, source: RxSwift.(unknown context at $106b159f8).GroupedObservableImpl<Swift.String>))
next(GroupedObservable<Int, String>(key: 6, source: RxSwift.(unknown context at $106b159f8).GroupedObservableImpl<Swift.String>))
next(GroupedObservable<Int, String>(key: 4, source: RxSwift.(unknown context at $106b159f8).GroupedObservableImpl<Swift.String>))
next(GroupedObservable<Int, String>(key: 3, source: RxSwift.(unknown context at $106b159f8).GroupedObservableImpl<Swift.String>))
completed
*/
문자열이 방출되는것이 아니다.
그룹으로 묶인 문자열을 방출하는 옵저버블이 방출된다. 그리고 옵저버블은 GroupedObservable이고 키가 함께 저장되어있다.
키와 그룹을 함께 출력해본다
let disposeBag = DisposeBag()
let words = ["Apple", "Banana", "Orange", "Book", "City", "Axe"]
Observable.from(words)
.groupBy{ $0.count }
.subscribe(onNext: { groupedObservable in
print("==\(groupedObservable.key)")
groupedObservable.subscribe{ print( $0 )}
})
.disposed(by: disposeBag)
/*
==5
next(Apple)
==6
next(Banana)
next(Orange)
==4
next(Book)
next(City)
==3
next(Axe)
completed
completed
completed
completed
*/
groupBy연산자를 활용할 때에는 보통 flatMap와 toArray연산자를 활용해서 그룹핑된 최종 결과를 하나의 배열로 방출하도록 구현한다.
let disposeBag = DisposeBag()
let words = ["Apple", "Banana", "Orange", "Book", "City", "Axe"]
Observable.from(words)
.groupBy{ $0.count }
.flatMap{ $0.toArray()}
.subscribe{ print($0) }
.disposed(by: disposeBag)
/*
next(["Book", "City"])
next(["Banana", "Orange"])
next(["Apple"])
next(["Axe"])
completed
*/
이렇게 되면 문자열 길이로 그룹핑된 4개의 배열이 방출된다.
이번엔 첫 번째 문자를 기준르로 그룹핑한다.
let words = ["Apple", "Banana", "Orange", "Book", "City", "Axe"]
Observable.from(words)
// .groupBy{ $0.count }
.groupBy{ $0.first ?? Character(" ")}
.flatMap{ $0.toArray()}
.subscribe{ print($0) }
.disposed(by: disposeBag)
/*
next(["Banana", "Book"])
next(["City"])
next(["Apple", "Axe"])
next(["Orange"])
completed
*/
도전과제)
groupBy를 활용해서 홀수와 짝수로 나눠보기.
Observable.range(start: 1, count: 10)
.groupBy{ $0.isMultiple(of: 2) }
.flatMap{ $0.toArray() }
.subscribe{ print($0) }
.disposed(by: disposeBag)
/*
next([1, 3, 5, 7, 9])
next([2, 4, 6, 8, 10])
completed
*/