사용 목적

  • 서로 공유되지 않는 개별적인 Sequence를 공유하여 한번만 실행하도록 한다.
  • 네트워크 요청이나 DB에 접근하는 작업에 유용하다.

multicast

  • multicasst(_:) 연산자는 Subject를 파라미터로 받는데, 원본 Observerble에서 방출되는 이벤트가 구독자에게 바로 전달되는 것이 아니라 파라미터로 전달된 Subject에게 전달된다.
  • 그리고 이 Subject가 등록된 다수의 구독자에게 이벤트를 전달한다.
  • 여기에서 ConnectableObservable이 등장하는데, 구독이 시작되면 이벤트를 바로 전달하는 일반 Observable과 달리 connect() 메소드가 실행되어야 이벤트를 방출한다.
let subject = PublishSubject<Int>()

let source = Observable<Int>.interval(.seconds(1), scheduler: MainScheduler.instance).take(5)

source
	.subscribe { print("🔵", $0) }
	.disposed(by: bag)

source
	.delaySubscription(.seconds(3), scheduler: MainScheduler.instance)
	.subscribe { print("🔴", $0) }
	.disposed(by: bag)
🔵 next(0)
🔵 next(1)
🔵 next(2)
🔵 next(3)
🔴 next(0)
🔵 next(4)
🔵 completed
🔴 next(1)
🔴 next(2)
🔴 next(3)
🔴 next(4)
🔴 completed
  • 여기에서 multicast(_:) 연산자를 추가하면, 아무일도 발생하지 않는다.
let subject = PublishSubject<Int>()

let source = Observable<Int>.interval(.seconds(1), scheduler: MainScheduler.instance).take(5).multicast(subject) // 추가

source
	.subscribe { print("🔵", $0) }
	.disposed(by: bag)

source
	.delaySubscription(.seconds(3), scheduler: MainScheduler.instance)
	.subscribe { print("🔴", $0) }
   .disposed(by: bag)
  • 왜냐하면 connect()가 호출되지 않았기 때문이다.
  • 아래처럼 connect()를 호출하게 되면 결과는 다음과 같다.
let subject = PublishSubject<Int>()

let source = Observable<Int>.interval(.seconds(1), scheduler: MainScheduler.instance).take(5).multicast(subject)

source
	.subscribe { print("🔵", $0) }
	.disposed(by: bag)

source
	.delaySubscription(.seconds(3), scheduler: MainScheduler.instance)
	.subscribe { print("🔴", $0) }
	.disposed(by: bag)

source.connect()
🔵 next(0)
🔵 next(1)
🔵 next(2)
🔴 next(2)
🔵 next(3)
🔴 next(3)
🔵 next(4)
🔴 next(4)
🔵 completed
🔴 completed
  • 결과를 보면 두번째 구독자는 0부터 받지 않고 2부터 받는다.
  • 이전에는 각 구독자마다 개별 Sequence이기 때문에 처음부터 방출되는 값을 받지만, 이제는 같은 Subject를 공유하기 때문에 두번째 구독자는 3초가 지난 시점부터 방출된 이벤트를 처리할 수 있는 것이다.

publish

  • multicast(_:)와 똑같은 기능을 하지만 연산자 내부에서 Subject를 생성하기 때문에 별도의 파라미터 없이 간단히 구현이 가능하다.
let bag = DisposeBag()
let source = Observable<Int>.interval(.seconds(1), scheduler: MainScheduler.instance).take(5).publish()

source
	.subscribe { print("🔵", $0) }
	.disposed(by: bag)

source
	.delaySubscription(.seconds(3), scheduler: MainScheduler.instance)
	.subscribe { print("🔴", $0) }
	.disposed(by: bag)

source.connect()
🔵 next(0)
🔵 next(1)
🔵 next(2)
🔴 next(2)
🔵 next(3)
🔴 next(3)
🔵 next(4)
🔴 next(4)
🔵 completed
🔴 completed

replay

  • 지금까지처럼 구독을 지연시킨 경우 Subject를 공유하기 때문에 두번째 구독은 일부 이벤트를 놓치게 된다.
  • 이런 경우에 만약 두번째 구독자도 모든 이벤트를 받고자 한다면 이 연산자를 사용한다.
  • 이 연산자는 내부적으로 Publish() 연산자와 동일하지만, 내부에서 생성되는 Subject가 PublishSubject가 아닌 ReplaySubject이기 때문에 buffer를 활용할 수 있다.
let bag = DisposeBag()
let source = Observable<Int>.interval(.seconds(1), scheduler: MainScheduler.instance).take(5).replay(5)

source
	.subscribe { print("🔵", $0) }
	.disposed(by: bag)

source
	.delaySubscription(.seconds(3), scheduler: MainScheduler.instance)
	.subscribe { print("🔴", $0) }
	.disposed(by: bag)

source.connect()
🔵 next(0)
🔵 next(1)
🔴 next(0)
🔴 next(1)
🔵 next(2)
🔴 next(2)
🔵 next(3)
🔴 next(3)
🔵 next(4)
🔴 next(4)
🔵 completed
🔴 completed
  • 이는 multicast(_:) 연산자에 직접 ReplaySubject를 전달한 것과 똑같다.

refCount

  • 지금까지의 Sharing Operator는 모두 ObservableType에 구현되어 있지만, refCount의 경우는 ConnectableObservableType에 구현되어 있기 때문에 ConnectableObservable에서만 사용할 수 있다.
  • 아래의 코드에서는 connect() 메서드가 실행되면서 끊임없이 이벤트를 방출하게 된다.
let bag = DisposeBag()
let source = Observable<Int>.interval(.seconds(1), scheduler: MainScheduler.instance).debug().publish()

let observer1 = source
    .subscribe { print("🔵", $0) }

source.connect()

DispatchQueue.main.asyncAfter(deadline: .now() + 3) {
    observer1.dispose()
}

DispatchQueue.main.asyncAfter(deadline: .now() + 7) {
    let observer2 = source.subscribe { print("🔴", $0) }
    
    DispatchQueue.main.asyncAfter(deadline: .now() + 3) {
        observer2.dispose()
    }
}
2019-12-17 14:28:57.836: refCount.playground:31 (__lldb_expr_31) -> subscribed
2019-12-17 14:28:58.851: refCount.playground:31 (__lldb_expr_31) -> Event next(0)
🔵 next(0)
2019-12-17 14:28:59.852: refCount.playground:31 (__lldb_expr_31) -> Event next(1)
🔵 next(1)
2019-12-17 14:29:00.851: refCount.playground:31 (__lldb_expr_31) -> Event next(2)
🔵 next(2)
2019-12-17 14:29:01.852: refCount.playground:31 (__lldb_expr_31) -> Event next(3)
2019-12-17 14:29:02.852: refCount.playground:31 (__lldb_expr_31) -> Event next(4)
2019-12-17 14:29:03.852: refCount.playground:31 (__lldb_expr_31) -> Event next(5)
2019-12-17 14:29:04.852: refCount.playground:31 (__lldb_expr_31) -> Event next(6)
2019-12-17 14:29:05.852: refCount.playground:31 (__lldb_expr_31) -> Event next(7)
🔴 next(7)
2019-12-17 14:29:06.852: refCount.playground:31 (__lldb_expr_31) -> Event next(8)
🔴 next(8)
2019-12-17 14:29:07.852: refCount.playground:31 (__lldb_expr_31) -> Event next(9)
🔴 next(9)
2019-12-17 14:29:08.852: refCount.playground:31 (__lldb_expr_31) -> Event next(10)
2019-12-17 14:29:09.851: refCount.playground:31 (__lldb_expr_31) -> Event next(11)
2019-12-17 14:29:10.852: refCount.playground:31 (__lldb_expr_31) -> Event next(12)
2019-12-17 14:29:11.851: refCount.playground:31 (__lldb_expr_31) -> Event next(13)
2019-12-17 14:29:12.851: refCount.playground:31 (__lldb_expr_31) -> Event next(14)
2019-12-17 14:29:13.851: refCount.playground:31 (__lldb_expr_31) -> Event next(15)
.
.
.
  • 반면 refCount() 메서드는 구독이 시작되면 내부적으로 connect() 메서드를 호출하기 때문에 명시적으로 connect() 메서드를 호출할 수 없다.
  • 또한 구독자가 없으면 자동으로 이벤트는 중지된다.
  • 새로운 구독자가 생기면 Sequence를 다시 시작한다.
  • 직접 dispose()를 호출하거나, take(_:) 연산자 등을 통해 직접 관리해야할 부분을 알아서 해준다.
let bag = DisposeBag()
let source = Observable<Int>.interval(.seconds(1), scheduler: MainScheduler.instance).debug().publish().refCount()

let observer1 = source
    .subscribe { print("🔵", $0) }

DispatchQueue.main.asyncAfter(deadline: .now() + 3) {
    observer1.dispose()
}

DispatchQueue.main.asyncAfter(deadline: .now() + 7) {
    let observer2 = source.subscribe { print("🔴", $0) }
    
    DispatchQueue.main.asyncAfter(deadline: .now() + 3) {
        observer2.dispose()
    }
}
2019-12-17 14:32:28.986: refCount.playground:31 (__lldb_expr_33) -> subscribed
2019-12-17 14:32:29.989: refCount.playground:31 (__lldb_expr_33) -> Event next(0)
🔵 next(0)
2019-12-17 14:32:30.989: refCount.playground:31 (__lldb_expr_33) -> Event next(1)
🔵 next(1)
2019-12-17 14:32:31.989: refCount.playground:31 (__lldb_expr_33) -> Event next(2)
🔵 next(2)
2019-12-17 14:32:32.282: refCount.playground:31 (__lldb_expr_33) -> isDisposed
2019-12-17 14:32:36.699: refCount.playground:31 (__lldb_expr_33) -> subscribed
2019-12-17 14:32:37.699: refCount.playground:31 (__lldb_expr_33) -> Event next(0)
🔴 next(0)
2019-12-17 14:32:38.700: refCount.playground:31 (__lldb_expr_33) -> Event next(1)
🔴 next(1)
2019-12-17 14:32:39.699: refCount.playground:31 (__lldb_expr_33) -> Event next(2)
🔴 next(2)
2019-12-17 14:32:39.993: refCount.playground:31 (__lldb_expr_33) -> isDisposed

share

  • 이 연산자는 multicast(_:) 연산자와 refCount()를 합친 종합 선물과도 같은 연산자이다.
  • 이 연산자는 2개의 파라미터를 받는데 첫째는 replay는 기본값이 0으로, 0보다 크다면 multicast(_:) 내부에서 ReplaySubject를 생성하므로 replay(_:) 연산자와 같고, 기본값이라면 내부에서 PublishSubject를 생성하므로 publish() 연산자와 같다.
  • 두번째 파라미터인 scope는 생명주기로 .whileConnected를 택하면 refCount() 처럼 구독이 시작되면 connect() 되었다가 구독이 끝나면 끝이나고, 반면 .forever를 택하면 multicast(_:) 연산자처럼 하나의 Subject를 공유한다. 기본값은 .whileConnected다.
  • 하나씩 살펴보자
let source = Observable<Int>.interval(.seconds(1), scheduler: MainScheduler.instance).debug().share()

let observer1 = source
    .subscribe { print("🔵", $0) }

let observer2 = source
    .delaySubscription(.seconds(3), scheduler: MainScheduler.instance)
    .subscribe { print("🔴", $0) }

DispatchQueue.main.asyncAfter(deadline: .now() + 5) {
    observer1.dispose()
    observer2.dispose()
}

DispatchQueue.main.asyncAfter(deadline: .now() + 7) {
    let observer3 = source.subscribe { print("⚫️", $0) }
    
    DispatchQueue.main.asyncAfter(deadline: .now() + 3) {
        observer3.dispose()
    }
}
  • 위처럼 파라미터없이 호출하게되는 경우에는 publish()refCount()의 조합이다.
  • 따라서 observer2는 3초 지연된 시점부터 이벤트를 처리할 수 있고, observer3은 connect()된 시점인 7초 후 부터 새로운 Sequence를 받는다.
2019-12-17 14:43:05.094: share.playground:31 (__lldb_expr_37) -> subscribed
2019-12-17 14:43:06.098: share.playground:31 (__lldb_expr_37) -> Event next(0)
🔵 next(0)
2019-12-17 14:43:07.098: share.playground:31 (__lldb_expr_37) -> Event next(1)
🔵 next(1)
2019-12-17 14:43:08.097: share.playground:31 (__lldb_expr_37) -> Event next(2)
🔵 next(2)
🔴 next(0)
🔴 next(1)
🔴 next(2)
2019-12-17 14:43:09.098: share.playground:31 (__lldb_expr_37) -> Event next(3)
🔵 next(3)
🔴 next(3)
2019-12-17 14:43:10.097: share.playground:31 (__lldb_expr_37) -> Event next(4)
🔵 next(4)
🔴 next(4)
2019-12-17 14:43:10.609: share.playground:31 (__lldb_expr_37) -> isDisposed
2019-12-17 14:43:12.809: share.playground:31 (__lldb_expr_37) -> subscribed
2019-12-17 14:43:13.809: share.playground:31 (__lldb_expr_37) -> Event next(0)
⚫️ next(0)
2019-12-17 14:43:14.809: share.playground:31 (__lldb_expr_37) -> Event next(1)
⚫️ next(1)
2019-12-17 14:43:15.809: share.playground:31 (__lldb_expr_37) -> Event next(2)
⚫️ next(2)
2019-12-17 14:43:16.103: share.playground:31 (__lldb_expr_37) -> isDisposed

  • 이제 replay 값을 추가해보자.
let source = Observable<Int>.interval(.seconds(1), scheduler: MainScheduler.instance).debug().share(replay: 5)

let observer1 = source
    .subscribe { print("🔵", $0) }

let observer2 = source
    .delaySubscription(.seconds(3), scheduler: MainScheduler.instance)
    .subscribe { print("🔴", $0) }

DispatchQueue.main.asyncAfter(deadline: .now() + 5) {
    observer1.dispose()
    observer2.dispose()
}

DispatchQueue.main.asyncAfter(deadline: .now() + 7) {
    let observer3 = source.subscribe { print("⚫️", $0) }
    
    DispatchQueue.main.asyncAfter(deadline: .now() + 3) {
        observer3.dispose()
    }
}
2019-12-17 15:33:14.376: share.playground:30 (__lldb_expr_39) -> subscribed
2019-12-17 15:33:15.392: share.playground:30 (__lldb_expr_39) -> Event next(0)
🔵 next(0)
2019-12-17 15:33:16.386: share.playground:30 (__lldb_expr_39) -> Event next(1)
🔵 next(1)
2019-12-17 15:33:17.387: share.playground:30 (__lldb_expr_39) -> Event next(2)
🔵 next(2)
🔴 next(0)
🔴 next(1)
🔴 next(2)
2019-12-17 15:33:18.386: share.playground:30 (__lldb_expr_39) -> Event next(3)
🔵 next(3)
🔴 next(3)
2019-12-17 15:33:19.387: share.playground:30 (__lldb_expr_39) -> Event next(4)
🔵 next(4)
🔴 next(4)
2019-12-17 15:33:19.878: share.playground:30 (__lldb_expr_39) -> isDisposed
2019-12-17 15:33:22.079: share.playground:30 (__lldb_expr_39) -> subscribed
2019-12-17 15:33:23.080: share.playground:30 (__lldb_expr_39) -> Event next(0)
⚫️ next(0)
2019-12-17 15:33:24.079: share.playground:30 (__lldb_expr_39) -> Event next(1)
⚫️ next(1)
2019-12-17 15:33:25.080: share.playground:30 (__lldb_expr_39) -> Event next(2)
⚫️ next(2)
2019-12-17 15:33:25.360: share.playground:30 (__lldb_expr_39) -> isDisposed
  • 결과를 보면, 이벤트가 buffer에 저장되기 때문에 observer2도 0부터 이벤트를 받을 수 있다.
  • 하지만 observer3은 새로운 Sequence이기 때문에 구독시점에 하나의 이벤트만 받을 수 있다.
  • 마지막으로 생명주기를 바꿔보자.
let source = Observable<Int>.interval(.seconds(1), scheduler: MainScheduler.instance).debug().share(replay: 5, scope: .forever)

let observer1 = source
    .subscribe { print("🔵", $0) }

let observer2 = source
    .delaySubscription(.seconds(3), scheduler: MainScheduler.instance)
    .subscribe { print("🔴", $0) }

DispatchQueue.main.asyncAfter(deadline: .now() + 5) {
    observer1.dispose()
    observer2.dispose()
}

DispatchQueue.main.asyncAfter(deadline: .now() + 7) {
    let observer3 = source.subscribe { print("⚫️", $0) }
    
    DispatchQueue.main.asyncAfter(deadline: .now() + 3) {
        observer3.dispose()
    }
}
2019-12-17 15:36:14.702: share.playground:30 (__lldb_expr_41) -> subscribed
2019-12-17 15:36:15.705: share.playground:30 (__lldb_expr_41) -> Event next(0)
🔵 next(0)
2019-12-17 15:36:16.704: share.playground:30 (__lldb_expr_41) -> Event next(1)
🔵 next(1)
2019-12-17 15:36:17.705: share.playground:30 (__lldb_expr_41) -> Event next(2)
🔵 next(2)
🔴 next(0)
🔴 next(1)
🔴 next(2)
2019-12-17 15:36:18.704: share.playground:30 (__lldb_expr_41) -> Event next(3)
🔵 next(3)
🔴 next(3)
2019-12-17 15:36:19.705: share.playground:30 (__lldb_expr_41) -> Event next(4)
🔵 next(4)
🔴 next(4)
2019-12-17 15:36:20.230: share.playground:30 (__lldb_expr_41) -> isDisposed
⚫️ next(0)
⚫️ next(1)
⚫️ next(2)
⚫️ next(3)
⚫️ next(4)
2019-12-17 15:36:22.433: share.playground:30 (__lldb_expr_41) -> subscribed
2019-12-17 15:36:23.434: share.playground:30 (__lldb_expr_41) -> Event next(0)
⚫️ next(0)
2019-12-17 15:36:24.434: share.playground:30 (__lldb_expr_41) -> Event next(1)
⚫️ next(1)
2019-12-17 15:36:25.433: share.playground:30 (__lldb_expr_41) -> Event next(2)
⚫️ next(2)
2019-12-17 15:36:25.733: share.playground:30 (__lldb_expr_41) -> isDisposed
  • observer3의 결과가 조금 이상하다.
  • buffer로 지정된 만큼 이벤트를 받지만, 이후부터는 다시 0부터 이벤트를 받는다.
  • 헷갈리면 안될 개념으로 Subject를 공유하는 것일뿐 Sequence를 공유하는 것이 아니다.
  • 다시 정리하자면 refCount()는 새로운 구독자가 생기면 connect()를 호출하여 Subject가 생성된다.
  • 그래서 connect() 마다 새로운 Subject가 생성된다.
  • 그런데 .forever의 경우 새로운 Subject가 생성되는 것은 아니고 기존 Subject를 공유하지만, Sequence가 유지되는 것은 아니기 때문에 이 같은 결과가 나온다.