ignoreElements

  • ignoreElements() 연산자는 모든 이벤트를 무시하고 completed 혹은 error 이벤트만을 전달한다.
  • 주로 이벤트의 성공, 실패 여부에만 관심이 있는 경우 사용한다.
let observable = Observable.from([1, 2, 3, 4, 5, 6, 7])
observable
    .ignoreElements()
    .subscribe { print($0) } // completed

elementAt

  • elementAt(_:) 연산자는 특정 인덱스에 위치한 값을 전달한다.
Observable
    .from([1, 2, 3, 4, 5])
    .elementAt(1)
    .subscribe { print($0) } // next(2) -> completed

filter

  • filter(_:) 연산자는 Swift의 고차함수와 완전히 똑같다.
Observable
    .from([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
    .filter { $0.isMultiple(of: 3) }
    .subscribe { print($0) } // next(3) -> next(6) -> next(9) -> completed

skip

  • skip(_:) 연산자는 take(_:) 연산자와 반대로 파라미터로 전달된 정수만큼 skip 한다.
Observable
    .from([1, 2, 3, 4, 5, 6])
    .skip(5) // 총 5회까지 무시
    .subscribe { print($0) } // next(6) -> completed

skipWhile

  • skipWhile(_:) 연산자는 클로져에서 true를 리턴하는 동안 스킵한다.
  • 한 번 false 리턴하게 되면 더 이상 조건을 판단하지 않는다.
Observable
    .from([1, 2, 3, 4, 5, 6])
    .skipWhile({ $0 <= 4 })
    .subscribe { print($0) } // next(5) -> next(6) -> completed

skipUntil

  • skipUntil(_:) 연산자는 Observable을 파라미터로 받는데, 이 Observable이 next 이벤트를 전달하기 전까지 원본 Observable이 전달하는 이벤트를 무시한다.
  • 그래서 파라미터로 전달하는 Observable는 trigger
let subject = PublishSubject<Int>()
let trigger = PublishSubject<Int>()

subject
    .skipUntil(trigger)
    .subscribe { print($0) }

subject.onNext(1) // 무시
trigger.onNext(5) // 무시 (trigger가 next 이벤트를 전달했지만 publishPusbject는 이미 이벤트가 업)
subject.onNext(2) // next(2)

take

  • take(_:) 연산자는 skip(_:) 연산자와 반대로 파라미터로 전달된 정수만큼만 전달한다.
Observable
    .from([1, 2, 3, 4, 5])
    .take(2)
    .subscribe { print($0) } // next(1) -> next(2) -> completed

takeWhile

  • skipWhile(_:) 연산자와 반대로 조건이 false가 되기 전까지 모든 이벤트를 전달한다.
  • 한 번 false를 리턴하게 되면 더 이상 값을 전달하지 않는다.
Observable
    .from([1, 2, 3, 4, 5])
    .takeWhile({ $0 <= 3 })
    .subscribe { print($0) } // next(1) -> next(2) -> next(3) -> completed

takeUntil

  • skipUntil(_:) 연산자와 반대로 trigger가 next 이벤트를 전달하기 전까지 모든 이벤트를 전달한다.
let subject = PublishSubject<Int>()
let trigger = PublishSubject<Int>()

subject
    .takeUntil(trigger)
    .subscribe { print($0) }

subject.onNext(1) // next(1)
subject.onNext(2) // next(2)
trigger.onNext(5) // completed
subject.onNext(3) // 무시

takeLast

  • takeLast(_:) 연산자는 파라미터로 전달된 정수만큼 가지고 있다가 Observable이 completed가 되는 순간 전달한다.
  • 반대로 error 이벤트는 error만 전달한다.
let subject = PublishSubject<Int>()
subject
    .takeLast(3)
    .subscribe { print($0) }

subject.onNext(1)
subject.onNext(2)
subject.onNext(3)
subject.onNext(4)
subject.onCompleted() // next(2) -> next(3) -> next(4) -> completed

enum CustomError: Error {
    case error
}

let subject2 = PublishSubject<Int>()
subject2
    .takeLast(3)
    .subscribe { print($0) }

subject2.onNext(1)
subject2.onNext(2)
subject2.onNext(3)
subject2.onNext(4)
subject2.onError(CustomError.error) // error(error)

single

  • 이름처럼 하나의 요소만 방출하고, 만약 여러 요소가 방출되는 경우 에러가 발생한다.
  • 결국 하나의 요소만 방출하는 것을 보장한다.
Observable
    .just("s")
    .single()
    .subscribe { print($0) } // next(s) -> completed

Observable
    .of("s", "ss")
    .single()
    .subscribe { print($0) } // next(s) -> error(Sequence contains more than one element.)
  • 또 파라미터로 조건을 추가할 수 있다.
Observable
    .of("s", "ss")
    .single({ $0 == "ss" })
    .subscribe { print($0) } // next(ss) -> completed
  • single(_:) 연산자는 하나의 요소를 방출하고 바로 completed 되지 않고 기다린다.
  • 만약 하나의 요소를 방출하고 next 이벤트가 전달된다면 error 이벤트를 전달하고, completed 된 경우에만 completed를 전달한다.
let subject = PublishSubject<Int>()

subject
    .single()
    .subscribe { print($0) }

subject.onNext(1) // next(1)
subject.onNext(2) // error(Sequence contains more than one element.)

distinctUntilChanged

  • 동일한 항목이 연속적으로 방출되지 않도록 방지하는 연산자이다.
  • 바로 전 방출된 요소가 동일한지 여부만 신경쓴다.
Observable
    .from([1, 1, 1, 1, 1, 2, 1, 2, 2, 1, 1, 2])
    .distinctUntilChanged()
    .subscribe { print($0) } // next(1) -> next(2) -> next(1) -> next(2) -> next(1) -> next(2) -> completed

debounce

  • 짧은 시간 반복적으로 방출되는 요소를 제어한다.
  • 첫 번째 이벤트가 방출된 시점부터 파라미터로 받은 시간동안 방출되는 요소가 없으면 마지막으로 방출된 요소가 전달되고, 만약 지정된 시간보다 짧은 시간에 방출된 요소가 있다면 타이머를 초기화하고 다시 지정된 시간이 지날때까지 마지막 이벤트를 가지고 기다린다.
  • 마지막 이벤트를 방출하면, 마찬가지로 타이머를 다시 초기화한다.
  • 실 사용 사례로 검색 api 호출시 사용하면 너무 많은 텍스트 수정으로 인한 문제를 방지할 수 있다.
let buttonTap = Observable<String>.create { observer in
    DispatchQueue.global().async {
        for i in 1...10 {
            observer.onNext("Tap \(i)")
            Thread.sleep(forTimeInterval: 0.3) // 0.3초 주기마다 Tap \(i) 이벤트 전달, debounce의 파라미터가 1초이기 때문에 모두 무시
        }
        
        Thread.sleep(forTimeInterval: 1) // 여기에서 1초가 지났으므로 마지막 이벤트인 next(Tap 10)이 전달
        
        for i in 11...20 {
            observer.onNext("Tap \(i)")
            Thread.sleep(forTimeInterval: 0.5) // 0.5초 주기마다 Tap \(i) 이벤트 전달, debounce의 파라미터가 1초이기 때문에 모두 무시
        }
        // 이후 1초 동안 새로운 이벤트가 없기 때문에 마지막 이벤트 next(Tap 20)이 전달되고 completed 된다.
        observer.onCompleted()
    }
    
    return Disposables.create {
        
    }
}

buttonTap
    .debounce(.seconds(1), scheduler: MainScheduler.instance) // 마지막 이벤트가 방출된지 1초가 지나야만 이벤트를 전달한다.
    .subscribe { print($0) } // next(Tap 10) -> next(Tap 20) -> completed

throttle

  • next 이벤트를 지정된 주기마다 구독자에게 전달한다.
  • throttle(_:latest:scheduler:) 연산자의 두번째 파라미터에 따라 큰 차이가 있다.
  • latest가 true인 경우 지정된 주기를 반드시 지킨다. 반면 false인 경우에는 주기가 지난 후 새로운 이벤트가 전달되기까지 기다리기 때문에 1초 이상 기다릴 수 있다.
Observable<Int>.interval(.milliseconds(300), scheduler: MainScheduler.instance)
    .take(5)
    .throttle(.seconds(2), latest: true, scheduler: MainScheduler.instance)
    .subscribe { print($0) } // next(0) -> next(4) -> completed

Observable<Int>.interval(.milliseconds(300), scheduler: MainScheduler.instance)
    .take(5)
    .throttle(.seconds(2), latest: false, scheduler: MainScheduler.instance)
    .subscribe { print($0) } // next(0) -> completed