Sharing Subscription Overview
구독공유를 통해서 불필요한 작업을 방지하는 방법.
let source = Observable<String>.create { observer in
let url = URL(string: "https://kxcoding-study.azurewebsites.net/api/string")!
let task = URLSession.shared.dataTask(with: url) { (data, response, error) in
if let data = data, let html = String(data: data, encoding: .utf8) {
observer.onNext(html)
}
observer.onCompleted()
}
task.resume()
return Disposables.create {
task.cancel()
}
}
.debug()
source.subscribe().disposed(by: bag)
source.subscribe().disposed(by: bag)
source.subscribe().disposed(by: bag)
API서버에 접속한 다음 전달된 문자열을 방출하는 간단한 옵저버블과 3개의 구독자
옵저버블에 구독자가 추가되면 시퀀스가 시작된다. 시퀀스가 시작되면 클로저에서 구현한 코드가 실행되고 서버에서 전달된 문자열이 방출된다음 종료된다.
한개의 구독자까지만 실행을 해보면 결과는 아래와 같다.
let source = Observable<String>.create { observer in
let url = URL(string: "https://kxcoding-study.azurewebsites.net/api/string")!
let task = URLSession.shared.dataTask(with: url) { (data, response, error) in
if let data = data, let html = String(data: data, encoding: .utf8) {
observer.onNext(html)
}
observer.onCompleted()
}
task.resume()
return Disposables.create {
task.cancel()
}
}
.debug()
source.subscribe().disposed(by: bag)
//2023-02-20 17:41:46.227: overveiw.playground:48 (__lldb_expr_508) -> subscribed
//2023-02-20 17:41:49.392: overveiw.playground:48 (__lldb_expr_508) -> Event next(Hello)
//2023-02-20 17:41:49.393: overveiw.playground:48 (__lldb_expr_508) -> Event completed
//2023-02-20 17:41:49.393: overveiw.playground:48 (__lldb_expr_508) -> isDisposed
네트워크 작업이 실행되었다.
세 번쨰 구독자까지 실행시)
source.subscribe().disposed(by: bag)
source.subscribe().disposed(by: bag)
source.subscribe().disposed(by: bag)
/*
2023-02-20 17:41:46.227: overveiw.playground:48 (__lldb_expr_508) -> subscribed
2023-02-20 17:41:49.392: overveiw.playground:48 (__lldb_expr_508) -> Event next(Hello)
2023-02-20 17:41:49.393: overveiw.playground:48 (__lldb_expr_508) -> Event completed
2023-02-20 17:41:49.393: overveiw.playground:48 (__lldb_expr_508) -> isDisposed
2023-02-20 17:42:52.074: overveiw.playground:48 (__lldb_expr_508) -> subscribed
2023-02-20 17:42:52.088: overveiw.playground:48 (__lldb_expr_508) -> Event next(Hello)
2023-02-20 17:42:52.089: overveiw.playground:48 (__lldb_expr_508) -> Event completed
2023-02-20 17:42:52.089: overveiw.playground:48 (__lldb_expr_508) -> isDisposed
2023-02-20 17:42:52.074: overveiw.playground:48 (__lldb_expr_508) -> subscribed
2023-02-20 17:42:52.088: overveiw.playground:48 (__lldb_expr_508) -> Event next(Hello)
2023-02-20 17:42:52.089: overveiw.playground:48 (__lldb_expr_508) -> Event completed
2023-02-20 17:42:52.089: overveiw.playground:48 (__lldb_expr_508) -> isDisposed
*/
첫 번째 구독자가 전달받은 결과를 나머지 구독자가 전달받으면 좋겠지만 기본적으로 공유되지 않는다
구독자가 추가되면 항상 새로운 시퀀스가 추가된다.
3개의 구독자가 추가되면 옵저버블에 3개의 구독자가 추가되었고 3번의 네트워크 요청이 실행되었다.
이렇게 구현하면 클라이언트에서 불필요한 리소스를 낭비한다. 서버도 마찬가지다.
이런 문제를 해결하기 위해서는 모든 구독자가 하나의 구독을 공유하도록 구현해야한다.
RxSwift는 여기에 필요한 다양한 연산자를 제공한다.
share이용)
let source = Observable<String>.create { observer in
let url = URL(string: "https://kxcoding-study.azurewebsites.net/api/string")!
let task = URLSession.shared.dataTask(with: url) { (data, response, error) in
if let data = data, let html = String(data: data, encoding: .utf8) {
observer.onNext(html)
}
observer.onCompleted()
}
task.resume()
return Disposables.create {
task.cancel()
}
}
.debug()
.share()
source.subscribe().disposed(by: bag)
source.subscribe().disposed(by: bag)
source.subscribe().disposed(by: bag)
//2023-02-20 17:48:37.561: overveiw.playground:48 (__lldb_expr_512) -> subscribed
//2023-02-20 17:48:37.635: overveiw.playground:48 (__lldb_expr_512) -> Event next(Hello)
//2023-02-20 17:48:37.636: overveiw.playground:48 (__lldb_expr_512) -> Event completed
//2023-02-20 17:48:37.636: overveiw.playground:48 (__lldb_expr_512) -> isDisposed
이제 모든 구독자가 구독을 공유하기 때문에 옵저버블에서 구현한 코드는 한번만 실행된다.
첫 번째 구독자를 추가하면 공유한 구독이 없기 때문에 새로 시퀀스를 시작한다.
관련된 로그가 출력된다.
이후에 새로운 구독자를 추가하면 공유할 구독이 있기 때문에 새로운 시퀀스를 시작하지 않는다.
그래서 관련된 로그는 더이상 출력되지 않는다.
let subject = PublishSubject<Int>()
let source = Observable<Int>.interval(.seconds(1), scheduler: MainScheduler.instance).take(5)
source
.subscribe { print("🔵", $0) }
.disposed(by: bag)
source
.delaySubscription(.seconds(3), scheduler: MainScheduler.instance)
.subscribe { print("🔴", $0) }
.disposed(by: bag)
/*
🔵 next(0)
🔵 next(1)
🔵 next(2)
🔵 next(3)
🔴 next(0)
🔵 next(4)
🔵 completed
🔴 next(1)
🔴 next(2)
🔴 next(3)
🔴 next(4)
🔴 completed
*/
multicast
let subject = PublishSubject<Int>()
let source = Observable<Int>.interval(.seconds(1), scheduler: MainScheduler.instance).take(5)
source
.subscribe { print("🔵", $0) }
.disposed(by: bag)
source
.delaySubscription(.seconds(3), scheduler: MainScheduler.instance)
.subscribe { print("🔴", $0) }
.disposed(by: bag)
/*
🔵 next(0)
🔵 next(1)
🔵 next(2)
🔵 next(3)
🔴 next(0)
🔵 next(4)
🔵 completed
🔴 next(1)
🔴 next(2)
🔴 next(3)
🔴 next(4)
🔴 completed
*/
1초마다 정수를 5개 방출하는 옵저버블에 2개의 구독자가 있다. 각 구독자는 파란공과 빨간공으로 나뉘며 빨간공의 구독자는 구독 시점을 3초 연기시키고있다.
두개의 시퀀스가 개별적으로 시작되었고 서로 공유되지 않는다. RxSwift에서 가장 기초적인 규칙이다.
네트워크에서 사용하는 개념으로 설명하면 Unicast와 같다. 여러 구독자가 하나의 옵저버블을 공유하도록 구현하는 방법은 다양하다
public func multicast<Subject: SubjectType>(_ subject: Subject)
-> ConnectableObservable<Subject.Element> where Subject.Observer.Element == Element {
ConnectableObservableAdapter(source: self.asObservable(), makeSubject: { subject })
}
멀티캐스트 연산자는 서브젝트를 파라미터로 받는다.
원본옵저버블이 방출하는 이벤트는 구독자로 방출되는것이 아니라 이 서브젝트로 전달된다.
서브젝트는 전달받은 이벤트를 등록된 다수의 구독자에 전달한다.
기본적으로 유니캐스트 방식으로 작동하는 멀티캐스트 방식으로 바꿔준다.
이것을 위해 특별한 형식의 옵저버블을 리턴한다.
ConnectableObservable
ConnectableObservable은 일반 옵저버블과 구분되는 특징을 갖고있다. 일반 옵저버블은 구독자가 추가되면 새로운 시퀀스를 시작한다.
다시말해 이벤트 방출을 시작한다. 하지만 ConnectableObservable은 시퀀스가 시작되는 시점이 다르다. 구독자가 추가되어도 시퀀스는 시작되지 않는다. Connect메서드를 호출하는 시점에 시퀀스가 시작된다.
원본옵저버블이 전달하는 이벤트는 구독자에게 바로 전달되는것이 아니라 첫번째 파라미터로 전달한 서브젝트로 전달한다.
그리고 이 서브젝트가 등록된 모든 구독자에게 이벤트를 전달한다. 이렇게 동작하기 때문에 모든 구독자가 등록된 이후에 하나의 시퀀스를 시작하는 패턴을 구현할 수 있다.
ConnectableObservableAdapter는 원본옵저버블과 서브젝트를 연결해주는 특별한 클래스이다.
let source = Observable<Int>.interval(.seconds(1), scheduler: MainScheduler.instance)
.take(5)
.multicast(subject)
이렇게되면 source에는 일반옵저버블이 아닌 ConnectableObservable이 저장된다.
let subject = PublishSubject<Int>()
let source = Observable<Int>.interval(.seconds(1), scheduler: MainScheduler.instance)
.take(5)
.multicast(subject)
source
.subscribe { print("🔵", $0) }
.disposed(by: bag)
source
.delaySubscription(.seconds(3), scheduler: MainScheduler.instance)
.subscribe { print("🔴", $0) }
.disposed(by: bag)
이렇게되면 코드는 실행되었지만 아무것도 출력되지 않는다. 왜그럴까
ConnectableObservable은 구독자가 추가되는 시점에 시퀀스를 시작하지 않는다.
source.connect()
connect메서드를 명시적으로 호출해야 시퀀스가 시작된다.
원본 옵저버블에서 시퀀스가 시작되고 모든 이벤트는 파라미터로 전달한 서브젝트로 전달된다.
그리고 이 서브젝트는 등록된 모든 구독자에게 이벤트를 전달한다.
이 모든 과정은 connect메서드가 호출되는 시점에 시작된다.
실행결과
//원본옵저버블
let source = Observable<Int>.interval(.seconds(1), scheduler: MainScheduler.instance).take(5)
.multicast(subject)
source
.subscribe { print("🔵", $0) }
.disposed(by: bag)
source
.delaySubscription(.seconds(3), scheduler: MainScheduler.instance)
.subscribe { print("🔴", $0) }
.disposed(by: bag)
source.connect()
/*
🔵 next(0)
🔵 next(1)
🔵 next(2)
🔴 next(2)
🔵 next(3)
🔴 next(3)
🔵 next(4)
🔴 next(4)
🔵 completed
🔴 completed
*/
이전에는 구독시점이 지연되더라도 원본옵저버블이 방출하는 모든 이벤트를 전달받았다. 구독자마다 개별 시퀀스가 시작되었기 때문이다.
지금은 multicast연산자로 원본옵저버블을 ConnectableObservable로 바꿨다. 그러면 모든 구독자가 원본 옵저버블을 공유한다.
구독이 지연된 3초동안 원본옵저버블이 전달한 2개의 event는(0, 1) 2번째 구독자에게 전달되지 않는다.
두번째 구독자가 처음으로 받게되는 이벤트는 2가 저장되어있는 NE이다.
connect메서드의 리턴형은 Disposable이다. 그래서 원하는 시점에 dispose메서드를 호출해서 공유시퀀스를 중지할 수 있다.
그리고 다른 옵저버블처럼 disposebag에 넣어서 리소스를 정리할 수 있다.
Multicast연산자는 하나의 옵저버블을 공유할 때 사용하는 가장 기본적인 연산자이다.
원하는 기능을 자유롭게 구현할 수 있지만 서브젝트를 직접 만들고 connect메서드를 직접 호출해야한다는 점에서 조금 번거롭다.
그래서 multicast연산자를 직접 활용하기보다 이 연산자를 활용하는 다른 연산자를 주로 사용한다
publish
public func publish() -> ConnectableObservable<Element> {
self.multicast { PublishSubject() }
}
multicast 연산자를 호출하고 새로운 PublishSubject를 파라미터로 전달한다.
그다음 multicast가 리턴하는 새로운 ConnectableObservable을 그대로 리턴한다.
let source = Observable<Int>.interval(.seconds(1), scheduler: MainScheduler.instance)
.take(5)
.publish()
source
.subscribe { print("🔵", $0) }
.disposed(by: bag)
source
.delaySubscription(.seconds(3), scheduler: MainScheduler.instance)
.subscribe { print("🔴", $0) }
.disposed(by: bag)
source.connect()
/*
🔵 next(0)
🔵 next(1)
🔵 next(2)
🔴 next(2)
🔵 next(3)
🔴 next(3)
🔵 next(4)
🔴 next(4)
🔵 completed
🔴 completed
*/
multicast 연산자는 옵저버블을 공유하기 위해서 내부적으로 서브젝트를 사용한다.
파라미터로 publishsubject를 전달한다면 직접 생성해서 전달하는거보다 publish연산자를 활용해서 사용하는것이 단순하고 좋다.
publishsubject를 자동으로 생성해준다는 것을 제외하면 나머지는 multicast와 동일하다.
그러므로 connect메서드를 호출하는것을 생략할 순 없다
replay
ConnectableObservable에 버퍼를 추가하고 새로운 구독자에게 최근 이벤트를 전달하는 방법, 다음 replay연산자를 활용해서 코드를 단순하게 바꾸는 방법에 대해 공부
let subject = PublishSubject<Int>()
let source = Observable<Int>.interval(.seconds(1), scheduler: MainScheduler.instance).take(5).multicast(subject)
source
.subscribe { print("🔵", $0) }
.disposed(by: bag)
source
.delaySubscription(.seconds(3), scheduler: MainScheduler.instance)
.subscribe { print("🔴", $0) }
.disposed(by: bag)
source.connect()
/*
🔵 next(0)
🔵 next(1)
🔵 next(2)
🔴 next(2)
🔵 next(3)
🔴 next(3)
🔵 next(4)
🔴 next(4)
🔵 completed
🔴 completed
*/
multicast연산자로 전달하는 subject에 집중해본다.
connect메서드가 호출되면 원본옵저버블에서 시퀀스가 시작되고 구독자에게 이벤트가 전달되기 시작한다. 첫 번째 구독자는 지연없이 구독을 시작하기 때문에 모든 이벤트를 전달받는다. 하지만 두 번쨰 구독자는 3초뒤에 구독을 시작한다. 그래서 구독전에 서브젝트가 내보낸 이벤트는 받지 못한다.
두번쨰 구독자에게 이전에 전달했던 이벤트도 함께 전달하고 싶다면?
PublishSubject는 별도의 버퍼를 가지고있지 않아서 이런것이 불가능하다.
PublishSubject를 ReplaySubject로 바꾸면 문제가 쉽게 해결된다.
let subject = ReplaySubject<Int>.create(bufferSize: 5)
let source = Observable<Int>.interval(.seconds(1), scheduler: MainScheduler.instance).take(5).multicast(subject)
source
.subscribe { print("🔵", $0) }
.disposed(by: bag)
source
.delaySubscription(.seconds(3), scheduler: MainScheduler.instance)
.subscribe { print("🔴", $0) }
.disposed(by: bag)
source.connect()
/*
🔵 next(0)
🔵 next(1)
🔴 next(0)
🔴 next(1)
🔵 next(2)
🔴 next(2)
🔵 next(3)
🔴 next(3)
🔵 next(4)
🔴 next(4)
🔵 completed
🔴 completed
*/
이전에는 두번쨰 구독자가 처음 받는 이벤트가 2가 저장되어있는 NE였다.
이번에는 0과 1이 저장된 NE도 함께 받고있다. 여기에서 최대 5개의 이벤트를 버퍼에 저장하고 있기 때문이다.
이와같은 작업을 replay연산자를 활용해서 코드를 단순하게 바꿀 수 있다.
public func replay(_ bufferSize: Int)
-> ConnectableObservable<Element> {
self.multicast { ReplaySubject.create(bufferSize: bufferSize) }
}
publish 연산자와 맢찬가지로 multicast연산자를 호출하고 있다. 그리고 리플레이 서브젝트를 만들어서 파라미터로 전달하고있다.
multicast연산자로 PublishSubject로 전달한다면 publish연산자를 사용하고 Replay Subject를 전달하면 replay연산자를 사용한다.
두 연산자 모두 multicast연산자를 쉽게 사용하게 도와주는 유틸리티 연산자이다.
보통은 파라미터를 통해서 버퍼의 크기를 지정하지만 버퍼의 크기에 제한이 없는 replayAll연산자도 있다.
public func replayAll()
-> ConnectableObservable<Element> {
self.multicast { ReplaySubject.createUnbounded() }
}
하지만 구현에 따라서 메모리 사용량이 급격하게 증가하는 문제가 있기 때문에 특별한 이유가 없으면 사용하지 않아야 한다.
let source = Observable<Int>.interval(.seconds(1), scheduler: MainScheduler.instance)
.take(5)
.replay(5)
source
.subscribe { print("🔵", $0) }
.disposed(by: bag)
source
.delaySubscription(.seconds(3), scheduler: MainScheduler.instance)
.subscribe { print("🔴", $0) }
.disposed(by: bag)
source.connect()
/*
🔵 next(0)
🔵 next(1)
🔴 next(0)
🔴 next(1)
🔵 next(2)
🔴 next(2)
🔵 next(3)
🔴 next(3)
🔵 next(4)
🔴 next(4)
🔵 completed
🔴 completed
*/
결과는 ReplaySubject를 전달한것과 동일하지만 코드가 짧고 간결해졌다.
replay연산자를 사용할땐 신중하게 버퍼의 크기를 지정해야한다.
필요이상으로 크게 지정하면 필연적으로 메모리 문제가 발생하기 때문에 필요한 선에서 가장 작은 크기로 지정해야한다.
그리고 버퍼크기에 제한이 없는 replayAll 연산자는 가능하다면 사용하지 말아야 한다.