startWith

  • 말 그대로 시작값을 설정하는 방식이다.
  • Observable 시퀀스 앞에 새로운 요소를 추가할 수 있으며, 가변 파라미터이기 때문에 여러 값을 넣을 수 있다.
let number = [1, 2, 3, 4, 5]
Observable
    .from(numbers)
    .startWith(0, -1, -2)
    .startWith(-3)
    .subscribe { print($0) } // next(-3) -> next(0) -> next(-1) -> next(-2) -> next(1) -> next(2) -> next(3) -> next(4) -> next(5) -> completed

concat

  • 2개의 Observable을 연결할때 사용한다.
let numbers = Observable.from(["1", "2", "3"])
let characters = Observable.from(["ㄱ", "ㄴ", "ㄷ"])

Observable.concat([fruits, animals])
    .subscribe { print($0) } // next(1) -> next(2) -> next(3) -> next(ㄱ) -> next(ㄴ) -> next(ㄷ) -> completed

numbers.concat(characters)
    .subscribe { print($0) } // next(1) -> next(2) -> next(3) -> next(ㄱ) -> next(ㄴ) -> next(ㄷ) -> completed
  • 위처럼 두 가지 방법으로 사용할 수 있다.

merge

  • 여러 Observable에서 방출하는 요소를 하나의 Observable에서 방출하도록 병합한다.
  • concat(_:) 연산자와 비슷하지만 concat(_:) 연산자의 경우 첫 번째 Observable이 completed가 되면 두번째 Observable의 요소를 방출해서 이어붙이는데 반해, merge(_:) 연산자는 두개의 Observable을 하나로 합쳐 순서대로 방출한다.
let oddNumbers = BehaviorSubject(value: 1)
let evenNumbers = BehaviorSubject(value: 2)

let source = Observable.of(oddNumbers, evenNumbers)
source
    .merge()
    .subscribe { print($0) } // next(1) -> next(2)

oddNumbers.onNext(3) // next(3)
evenNumbers.onNext(4) // next(4)
evenNumbers.onNext(6) // next(5)
oddNumbers.onNext(5) // next(6)
oddNumbers.onCompleted()
oddNumbers.onNext(7) // 무시
evenNumbers.onNext(8) // next(8)
evenNumbers.onCompleted() // completed
  • 에러가 발생하면
enum MyError: Error {
   case error
}

let oddNumbers = BehaviorSubject(value: 1)
let evenNumbers = BehaviorSubject(value: 2)

let source = Observable.of(oddNumbers, evenNumbers)
source
    .merge()
    .subscribe { print($0) } // next(1) -> next(2)

oddNumbers.onNext(3) // next(3)
evenNumbers.onNext(4) // next(4)
evenNumbers.onNext(6) // next(5)
oddNumbers.onNext(5) // next(6)
oddNumbers.onError(MyError.error) // error(error)
oddNumbers.onNext(7) // 무시
evenNumbers.onNext(8) // 무시
evenNumbers.onCompleted() // 무시
  • 그리고 merge(_:) 연산자에는 병합가능한 최대 숫자에 제한이 없다.
  • 만약 제한을 두고 싶다면 merge(maxConcurrent:)를 사용하면 된다.
let oddNumbers = BehaviorSubject(value: 1)
let evenNumbers = BehaviorSubject(value: 2)
let negativeNumbers = BehaviorSubject(value: -1)

let source = Observable.of(oddNumbers, evenNumbers)
source
    .merge(maxConcurrent: 1)
    .subscribe { print($0) } // next(1)

oddNumbers.onNext(3) // next(3)
evenNumbers.onNext(4) // 무시
negativeNumbers.onNext(-2) // 무시

combineLatest

  • 소스 Observable에서 모두 이벤트가 발생한 시점부터, 그 이후의 각각의 소스 Observable이 방출하는 각각의 최신 이벤트를 결합한다.
let greetings = PublishSubject<String>()
let languages = PublishSubject<String>()

Observable
    .combineLatest(greetings, languages) { lhs, rhs -> String in
        return "\(lhs), \(rhs)"
    }
    .subscribe { print($0) }

greetings.onNext("Hi") // 무시
greetings.onNext("Hello") // 무시
languages.onNext("Swift") // next(Hello, Swift)
greetings.onNext("Hi~") // next(Hi~, Swift)
languages.onNext("RxSwift") // next(Hi~, RxSwift)

greetings.onCompleted() // 무시
greetings.onNext("Bye") // 무시
languages.onNext("Java") // next(Hi~, Java)
languages.onCompleted() // completed
  • 에러가 발생하면
enum MyError: Error {
   case error
}

greetings.onNext("Hi") // 무시
greetings.onNext("Hello") // 무시
languages.onNext("Swift") // next(Hello, Swift)
greetings.onNext("Hi~") // next(Hi~, Swift)
languages.onNext("RxSwift") // next(Hi~, RxSwift)

//greetings.onCompleted() // 무시
greetings.onError(MyError.error) // error(error)
greetings.onNext("Bye") // 무시
languages.onNext("Java") // 무시
languages.onCompleted() // 무시

zip

  • zip(_:) 연산자는 가장 최신 이벤트를 반드시 짝을 맞춰 방출합니다.
let numbers = PublishSubject<Int>()
let strings = PublishSubject<String>()

Observable
    .zip(numbers, strings, resultSelector: { "\($0) & \($1)" })
    .subscribe({ print($0) })
    .disposed(by: bag)

numbers.onNext(1) // 무시
numbers.onNext(2) // 무시
numbers.onNext(3) // 무시

strings.onNext("A") // next(1 & A)
numbers.onNext(4) // 무시
strings.onNext("B") // next(2 & B)

numbers.onCompleted()
strings.onNext("C") // next(3 & C)
strings.onNext("D") // next(4 & D)
strings.onNext("E") // 무시
strings.onCompleted() // completed
  • 에러가 발생하면
enum MyError: Error {
   case error
}

numbers.onNext(1) // 무시
numbers.onNext(2) // 무시
numbers.onNext(3) // 무시

strings.onNext("A") // next(1 & A)
numbers.onNext(4) // 무시
strings.onNext("B") // next(2 & B)

//numbers.onCompleted()
numbers.onError(MyError.error) // error(error)
strings.onNext("C") // 무시
strings.onNext("D") // 무시
strings.onNext("E") // 무시
strings.onCompleted() // 무시

withLatestFrom

  • triggerObservable.withLatestFrom(dataObservable) 형태로 사용된다.
  • trigger가 되는 Observable이 이벤트를 방출하면 가장 data Observable의 가장 최신 이벤트를 전달한다.
let trigger = PublishSubject<Void>()
let data = PublishSubject<String>()

trigger.withLatestFrom(data)
    .subscribe { print($0) }

data.onNext("Hello") // 무시
trigger.onNext(()) // next(Hello)
trigger.onNext(()) // next(Hello)
data.onNext("Hi") // 무시
trigger.onNext(()) // next(Hi)

data.onCompleted()
trigger.onNext(()) // next(Hi)
  • 만약 trigger가 completed 되면
data.onNext("Hello") // 무시
trigger.onNext(()) // next(Hello)
trigger.onNext(()) // next(Hello)
data.onNext("Hi") // 무시
trigger.onNext(()) // next(Hi)

//data.onCompleted()
trigger.onCompleted() // completed
trigger.onNext(()) // 무시
  • 에러가 발생하면
data.onNext("Hello") // 무시
trigger.onNext(()) // next(Hello)
trigger.onNext(()) // next(Hello)
data.onNext("Hi") // 무시
trigger.onNext(()) // next(Hi)

data.onError(MyError.error) // error(error)
trigger.onNext(()) // 무시

sample

  • withLatestFrom(_:) 연산자와 반대 형태인 dataObservable.sample(triggerObservable) 형태로 사용한다.
  • 하지만 withLatestFrom(_:) 연산자와 같이 trigger Observable에서 next 이벤트가 전달되면 data Observable의 최신 이벤트가 전달된다.
  • 그리고 sample(_:) 연산자의 경우 동일한 이벤트를 반복적으로 전달하지 않는 차이가 있다.
let trigger = PublishSubject<Void>()
let data = PublishSubject<String>()

data.sample(trigger)
    .subscribe { print($0) }

data.onNext("A") // 무시
data.onNext("B") // 무시
trigger.onNext(()) // next(B)
trigger.onNext(()) // 무시
data.onNext("C") // 무시
trigger.onNext(()) // next(C)
trigger.onNext(()) // 무시

data.onCompleted() // 무시
trigger.onNext(()) // completed
  • 에러인 경우
data.onNext("A") // 무시
data.onNext("B") // 무시
trigger.onNext(()) // next(B)
trigger.onNext(()) // 무시
data.onNext("C") // 무시
trigger.onNext(()) // next(C)
trigger.onNext(()) // 무시

//data.onCompleted()
data.onError(MyError.error) // error(error)
trigger.onNext(()) // 무시

switchLatest

  • 최신 Observable이 전달하는 이벤트를 전달한다.
let a = PublishSubject<String>()
let b = PublishSubject<String>()

let source = PublishSubject<Observable<String>>()

source.switchLatest()
    .subscribe { print($0) }
    .disposed(by: bag)

a.onNext("1") // 무시
b.onNext("b") // 무시

source.onNext(a)

a.onNext("2") // next(2)
b.onNext("b") // 무시

source.onNext(b)
a.onNext("3") // 무시
b.onNext("c") // next(c)

a.onCompleted() // 무시
b.onCompleted() // 무시
source.onCompleted() // completed
  • 에러인 경우
a.onNext("1") // 무시
b.onNext("b") // 무시

source.onNext(a)

a.onNext("2") // next(2)
b.onNext("b") // 무시

source.onNext(b)
a.onNext("3") // 무시
b.onNext("c") // next(c)

//a.onCompleted()
//b.onCompleted()
a.onError(MyError.error) // 무시
b.onError(MyError.error) // error(error)
source.onCompleted() // 무시

reduce

  • Swift의 고차함수를 떠올리면 이해하기 쉽다.
  • scan(_:, accumulator:)와 달리 중간 결과를 생략하고 최종 결과만 전달한다.
let o = Observable.range(start: 1, count: 5)

print("== scan")

o.scan(0, accumulator: +)
   .subscribe { print($0) } // next(1) -> next(3) -> next(6) -> next(10) -> next(15) -> completed

print("== reduce")

o.reduce(0, accumulator: +)
    .subscribe { print($0) } // next(15) -> completed