let observable = Observable.from([1,2,3]) observable.do { value in print(value) // 값 수신과 동시에 실행 } afterNext: { value in print(value) // 값 수신 직후 실행 } onError: { error in print(error) // 에러가 나타날 때 실행 } afterError: { error in print(error) // 에러로 인해 스트림이 종료된 후 실행 } onCompleted: { print("complete") // complete 종료와 함께 실행 } afterCompleted: { print("afterComplete") // complete 직후 실행 } onSubscribe: { print("subscribe") // subscribe 이전 실행 } onSubscribed: { print("subscribed") // subscribe 직후 실행 } onDispose: { print("disposed") // 어떠한 이유로든 옵저버블이 dispose되고 나서 실행 }
예외 처리
RxJava와의 차이점
RxSwift에서 Observable 자체에 구현된 예외처리 메소드는 없다.
예외 처리는 실패하지 않는 옵저버블인 Driver, Single등에 구현되어 있으며 asDriver()등의 메소드를 통해 예외 처리와 함께 변환할 수 있다.
Driver, Single은 Main Thread에서만 실행된다.
onErrorReturn()
// 1/4 확률로 에러를 던진다func troubleMaker(val: Int) throws -> Int { let toss = Bool.random() && Bool.random() if toss { throw NSError() } return val}
에러 처리 시 무조건 지정한 값을 발행하고 종료된다.
Observable.from([1,2,3,4,5]) .map { value in return try self.troubleMaker(val: value) }.do(onError: { _ in print("!!") }) .asDriver(onErrorJustReturn: -1) .drive { result in print(result) }.disposed(by: self.disposeBag)// 1 2 3 !! -1
onErrorDriveWith()
에러가 난 경우 다른 Driver로 대체하여 구독한다. Driver.empty()등을 사용해 안전하게 예외 처리가 가능하다.
let recovery = Observable.from([6,7,8,9,10]).asDriver(onErrorJustReturn: 0)Observable.from([1,2,3,4,5]) .map { value in return try self.troubleMaker(val: value) }.do(onError: { _ in print("!!") }) .asDriver(onErrorDriveWith: self.recovery) .drive { result in print(result) }.disposed(by: self.disposeBag)// 1 2 3 !! 6 7 8 9 10
retry()
- 에러 시 반복 회수를 지정할 수 있으며, 인자 없이 사용하면 무제한 반복한다.Observable.from([1,2,3,4,5,6,7,8,9,10]) .map { elem in return try self.troubleMaker(val: elem) }.do(onError: { _ in print("!!") }) .retry(3) // 3번까지 재시도 .bind { print($0) } .disposed(by: self.disposeBag)// 1 2 3 !! 1 2 !! 1 2 3 4 !! 1 !!
retry(when:)
retryHandler를 주면 retryHandler가 이벤트를 발행할 때마다 구독을 재시도한다.
let retryHandler = Observable<Int>.interval(.seconds(3), scheduler: MainScheduler.instance)Observable.from([1,2,3,4,5,6,7,8,9,10]) .map { elem in return try self.troubleMaker(val: elem) }.do(onError: { _ in print("!!") }) .retry(when: { _ in return retryHandler}) .bind { print ($0) } .disposed(by: self.disposeBag)// 1 !! 1 2 !! ...
흐름 제어
sample()
RxSwift에서는 시간과 관련 없이 두 번째 옵저버블이 이벤트를 발행할 때 첫 번째 옵저버블의 최근 데이터를 발행한다.
let tap = PublishSubject<Void>()let typingText = PublishSubject<String>()typingText.sample(tap) .bind { value in print(value) }.disposed(by: self.disposeBag)typingText.onNext("ASD") // Nothing printedtap.onNext(()) // ASD
buffer()와 유사하지만, 배열이 아닌 Cold Observable의 형태로 내려보내준다.
map()과 flatMap()의 관계
let observable = Observable.from([1,2,3,4,5,6,7,8,9,10])observable.window(timeSpan: .seconds(1), count: 3, scheduler: MainScheduler.instance) .bind { ob in ob.bind { value in print(value) }.disposed(by: self.disposeBag) }.disposed(by: self.disposeBag)// 1,2,3,4,5,6,7,8,9,10
let safeButton = UIButton()// throttleFirstsafeButton.rx.tap .throttle(.milliseconds(600), latest: false, scheduler: MainScheduler.instance) .bind { _ in self.moveNext() } // 처음 이벤트 전달 .disposed(by: disposeBag)// throttleLastsafeButton.rx.tap .throttle(.milliseconds(600), latest: true, scheduler: MainScheduler.instance) .bind { _ in self.moveNext() } // 나중 이벤트 전달 .disposed(by: disposeBag)
debounce()
throttle과 비슷하나 새 이벤트가 내려올 때마다 시간이 연장된다.
let searchText = PublishSubject<String>()searchText.debounce(.milliseconds(600), scheduler: MainScheduler.instance) .flatMap { self.networking() } .bind { text in print(text) } // 나중 이벤트 전달 .disposed(by: disposeBag)