리액티브 연산자의 활용
interval()
- 일정 시간 간격으로 값을 발행한다.
- interval로 주기를 설정할 수 있으며, 시간 스케줄러를 정해야 한다.
- RxJava와 달리 RxSwift에서 최초 시간 지연을 위한 메소드는 timer()에 있다.
예제 코드
let timeLabel = UILabel()
Observable<Int>.interval(.seconds(1), scheduler: MainScheduler.instance)
.subscribe(onNext: { time in
let remainedTime = 300 - time
if remainedTime > 0 {
timeLabel.text = "\(remainedTime) 초 남았습니다."
} else {
timeLabel.text = "시간 초과"
}
})
timer()
- 인자로 받은 시간 지연 이후 0을 발행한다.
- period에 값을 넘겨주면 interval() 과 같이 작동한다.
예제 코드
let reminder = PublishSubject<Bool>()
// 30분후 발행
Observable<Int>.timer(.seconds(1800), scheduler: MainScheduler.instance)
.subscribe(onNext: { time in
reminder.onNext(true)
})
// 5초 후부터 1초 간격으로 발행
Observable<Int>.timer(.seconds(5), period: .seconds(1), scheduler: MainScheduler.instance)
.subscribe(onNext: { time in
print(time) // 0, 1, 2 ...
})
range()
- 0부터 count까지의 정수 값을 발행한다.
- 반복문을 대체할 수 있다.
예제 코드
let integerList = [1, 2, 3, 4, 5]
Observable<Int>.range(start: 0, count: integerList.count)
.subscribe(onNext: { index in
print(integerList[index]) // 1, 2, 3, 4, 5
}
})
intervalRange()
- interval과 range의 결합으로 range만큼의 값을 interval마다 발행한다.
- Rxswift에는 해당 기능이 없는데, Int형 Observable에서 take()를 이용하여 유사하게 구현할 수 있다.
예제 코드
let integerList = [1, 2, 3, 4, 5]
Observable<Int>.interval(.seconds(1), scheduler: MainScheduler.instance)
.take(integerList.count)
.subscribe(onNext: { index in
print(integerList[index]) // 1...2...3...4...5
})
defer()
- 옵저버블을 반환하는 연산자로, 옵저버블의 생성을 구독 시점까지 유예할 수 있다. 이때 새로운 옵저버블을 만들어 반환해 준다.
- 네트워킹, 디스크 접근 등 당장 초기화 시에 부담이 되는 작업이나 항상 최신 데이터가 필요할때 사용할 수 있다.
예제 코드
// 데이터를 받기까지 시간이 걸리는 옵저버블
func heavyProcess(from: String) -> Observable<Data> {
print("\(from): !!무거운 작업")
Thread.sleep(forTimeInterval: 1)
return Observable<Data>.just(Data())
}
let deferred: Observable<Data> = Observable.deferred {
return self.heavyProcess(from: "deferred")
}
print("deferred: 초기화 완료")
let normal = heavyProcess(from: "normal")
print("normal: 초기화 완료")
print("deferred: 구독 시작")
deferred.subscribe(onNext: { _ in
print("deferred: 데이터 수신")
})
print("normal: 구독 시작")
normal.subscribe(onNext: { _ in
print("normal: 데이터 수신")
})
print("deferred: 두번째 구독 시작")
deferred.subscribe(onNext: { _ in
print("deferred:두번째 데이터 수신")
})
print("normal: 두번째 구독 시작")
normal.subscribe (onNext:{ _ in
print("normal: 두번째 데이터 수신")
})
// defer()
// deferred: 초기화 완료
// normal: !!무거운 작업
// normal: 초기화 완료
// deferred: 구독 시작
// deferred: !!무거운 작업
// deferred: 데이터 수신
// normal: 구독 시작
// normal: 데이터 수신
// deferred: 두번째 구독 시작
// deferred: !!무거운 작업
// deferred:두번째 데이터 수신
// normal: 두번째 구독 시작
// normal: 두번째 데이터 수신
repeat()
- 하나의 이벤트를 지속적으로 발행해 준다.
- RxSwift에서는 옵저버블에 대한 reapeat()은 없다.
- 대신 repeatElement()가 있는데 배열 순회가 아닌 하나의 이벤트에 대한 발행이기 때문에 직접 구현해야 한다.
예제 코드
Observable.repeatElement(["1","2","3"])
.flatMap { arr in // concatMap
return Observable.from(arr)
}.subscribe(onNext: { value in
print(value)
})
concatMap(), switchMap()
concatMap()
- concatMap()은 flatMap과 유사하지만(이벤트 -> 옵저버블) 원래의 옵저버블에서 이벤트가 발행되었을때 이전의 데이터 흐름이 끝나지 않았으면 종료를 기다린다.
- 따라서 발행되는 옵저버블의 순서를 보장해 준다.
switchMap()
- switchMap()도 flatMap과 유사하지만 원래의 옵저버블에서 이벤트가 발행되었을때 이전의 데이터 흐름이 끝나지 않았다면 종료시키고 새 옵저버블을 발행한다.
- RxSwift에서는 flatMapLatest()라는 이름으로 구현되었다.
비교 예시
private func bgSensor() -> Observable<Int> {
let randomBG = Int.random(in: 100...200)
let randomTime = Int.random(in: 1...10)
print("발행: \(randomBG) mg/dL")
return Observable.just(randomBG).delay(.seconds(randomTime), scheduler: MainScheduler.instance)
}
let load = PublishSubject<Bool>()
let flatMapContainer = load.flatMap { _ in self.bgSensor() }
let concatMapContainer = load.concatMap { _ in return self.bgSensor() }
let switchMapContainer = load.flatMapLatest { _ in return self.bgSensor() }
flatMapContainer.subscribe(onNext:{ bg in
print("수신: \(bg) mg/dL")
})
concatMapContainer.subscribe(onNext:{ bg in
print("수신: \(bg) mg/dL")
})
switchMapContainer.subscribe(onNext:{ bg in
print("수신: \(bg) mg/dL")
})
load.onNext(true)
load.onNext(true)
load.onNext(true)
load.onNext(true)
load.onNext(true)
// flatMap()
// 발행: 176 mg/dL
// 발행: 113 mg/dL
// 발행: 185 mg/dL
// 발행: 189 mg/dL
// 발행: 194 mg/dL
// 수신: 189 mg/dL
// 수신: 113 mg/dL
// 수신: 185 mg/dL
// 수신: 194 mg/dL
// 수신: 176 mg/dL
// concatMap()
// 발행: 102 mg/dL
// 발행: 103 mg/dL
// 발행: 162 mg/dL
// 발행: 191 mg/dL
// 발행: 183 mg/dL
// 수신: 102 mg/dL
// 수신: 103 mg/dL
// 수신: 162 mg/dL
// 수신: 191 mg/dL
// 수신: 183 mg/dL
// flatmapLatest() - switchMap
// 발행: 140 mg/dL
// 발행: 156 mg/dL
// 발행: 133 mg/dL
// 발행: 126 mg/dL
// 발행: 145 mg/dL
// 수신: 145 mg/dL
groupBy()
- groupBy()는 하나의 옵저버블을 조건에 따라 여러 개의 옵저버블로 바꿔준다.
- 생성 함수에서 key가 될 수 있는 타입을 반환하고, 구독할 때 .key 프로퍼티를 이용해 해당 타입을 구별한다.
- 각각의 필터 없이 한번에 여러 번 구독이 가능하다.
예제 코드
enum BGType {
case bg
case cg
}
struct BGItem {
let type: BGType
let amount: Int
}
let bgList = [
BGItem(type: .bg, amount: 81),
BGItem(type: .cg, amount: 121),
BGItem(type: .cg, amount: 132),
BGItem(type: .bg, amount: 95),
BGItem(type: .bg, amount: 73),
BGItem(type: .cg, amount: 115)
]
// Observable<GroupedObservable<BGType, BGItem>>
let groupedBGItem = Observable<BGItem>.from(bgList).groupBy { bgItem -> BGType in
return bgItem.type
}
groupedBGItem.subscribe(onNext: { bgItemOb in
switch bgItemOb.key {
case .bg:
bgItemOb.subscribe(onNext: { bg in
print("bg amount: \(bg.amount)")
})
case .cg:
bgItemOb.subscribe(onNext: { cg in
print("cg amount: \(cg.amount)")
})
}
})
scan()
- reduce()와 비슷하지만 계산 중간 과정을 지속적으로 발행해 준다.
- 초깃값과 적용할 계산을 인자로 받는다.
예제 코드
Observable<Int>.from([1,2,3,4,5,6,7,8,9,10]).scan(0) { sum, value in
return sum + value
}.subscribe(onNext: { value in
print(value) // 1, 3, 6, 10, 15 ... 55
})
zip
- 각각 옵저버블로부터의 값을 결합해 발행한다.
- 한 쪽에서 발행하지 않으면 발행되지 않는다.
예제 코드 - 유닛 테스트(zip)
// 유닛 테스트
func zipExample() {
let response = PublishSubject<Bool>()
let expect = [true, true, false, true, false]
Observable.zip(response, Observable.from(expect)).bind { res, exp in
assert(res == exp)
}.disposed(by: self.disposeBag)
response.onNext(true)
response.onNext(true)
response.onNext(false)
response.onNext(true)
response.onNext(false)
response.onCompleted()
}
combineLatest
- 결합된 옵저버블 중 하나의 옵저버블에서 값이 발행되면 그 값과 다른 옵저버블의 최신값을 묶어서 발행한다.
- 각각의 옵저버블에서 값을 발행할 때마다 combineLatest에서 발행된다.
예제 코드 - 버튼 숨기기
func combineLatestExample() {
let isSGLabelHidden = BehaviorSubject<Bool>(value: true)
let sgLabel = UILabel()
let isSearchingBtnHidden = BehaviorSubject(value: false)
let isCgmUnusedBtnHidden = BehaviorSubject(value: false)
let isTransmitterBtnHidden = BehaviorSubject(value: false)
let isAllButtonHidden =
Observable.combineLatest(isSearchingBtnHidden,
isCgmUnusedBtnHidden,
isTransmitterBtnHidden)
.map { sb,cb, tb in return sb && cb && tb }
let isLabelHidden = Observable.combineLatest(isAllButtonHidden, isSGLabelHidden)
.map { buttonHidden, labelHidden -> Bool in
if buttonHidden {
return labelHidden
} else {
return true
}
}
isLabelHidden
.observe(on: MainScheduler.instance)
.bind { isHidden in
sgLabel.isHidden = isHidden
}.disposed(by: self.disposeBag)
// withLatestFrom
let a = PublishSubject<Bool>()
let b = PublishSubject<Bool>()
a.withLatestFrom(b) { ra, rb in
return ra == rb
}
}
merge, concat
- 두 연산자 모두 같은 종류의 옵저버블을 하나의 옵저버블로 결합해 준다.
- merge()는 각각의 옵저버블에서 발행한 값을 그대로 내려보내준다.
- concat()은 결합한 순서대로 앞의 옵저버블이 completion이 나면 그 뒤의 옵저버블이 값을 발행할 수 있다. 이전 값은 무시된다.
예제 코드 - ReactorKit(merge, concat)
struct BG {
enum BGType {
case bg, sg
}
let type: BGType
let amount: Int
}
func mergeConcatExample() {
struct State {
var isLabelHidden: Bool
var bgAmount: Int
}
enum Mutate {
case setData(data: Int)
case setLabel(isHidden: Bool)
}
func reduce(mutate: Mutate) {
var newState = originalState
switch mutate {
case .setLabel(let isHidden):
newState.isLabelHidden = isHidden
case .setData(let data):
newState.bgAmount = data
}
originalState = newState
}
var originalState = State(isLabelHidden: true, bgAmount: 0)
Observable<Mutate>.concat(Observable.just(.setData(data: 80)),
Observable.just(.setLabel(isHidden: false)))
.bind { mut in
reduce(mutate: mut)
}.disposed(by: self.disposeBag)
}
amb
- 두 옵저버블 중 값을 먼저 발행하는 옵저버블을 선택한다.
- 늦게 발행되는 옵저버블은 모두 무시한다.
- 게임(먼저 들어오는 사람만 허용), 여러 서버 접속(빠른 응답이 오는 서버를 사용) 등에 사용한다.
takeUntil
- 뒤에 물린 옵저버블이 값을 발행할 때까지만 앞의 옵저버블이 값을 발행한다.
- 뒤의 옵저버블이 발행되면 앞의 옵저버블은 completion된다.
예제 코드 - 서버 네트워킹(takeUntil)
func takeUntilExample() {
let confirmButton = UIButton()
confirmButton.isHidden = true
let cancelAction = PublishSubject<Void>()
let startAction = PublishSubject<Void>()
let response = Observable.just(1).delay(.seconds(3), scheduler: MainScheduler.instance)
let completed = PublishSubject<Void>()
startAction
.flatMap { _ in response }
.take(until: cancelAction)
.bind { data in
completed.onNext(())
}.disposed(by: self.disposeBag)
completed.bind {
print("success")
}.disposed(by: self.disposeBag)
cancelAction.bind {
print("canceled")
}.disposed(by: self.disposeBag)
startAction.onNext(()) // success
startAction.onNext(())
cancelAction.onNext(()) // canceled
}
skipUntil
- 뒤의 옵저버블이 발행되는 이후로부터 앞의 옵저버블이 값을 발행한다.
- 각각의 옵저버블의 completion은 연관이 없다.
예제 코드 - 메모 저장(skipUntil)
func skipUntilExample() {
let button = UIButton()
let goNext = PublishSubject<Void>()
let textView = PublishSubject<String>()
button.rx.tap
.bind(to: goNext)
.disposed(by: self.disposeBag)
goNext
.bind {
print("go next")
}.disposed(by: self.disposeBag)
goNext.skip(until: textView)
.bind {
print("save new memo")
}.disposed(by: self.disposeBag)
goNext.onNext(()) // go Next
// textView 입력 후
goNext.onNext(()) // save new memo, go Next
// skip 없이
let isDirty = BehaviorSubject<Bool>(value: false)
textView
.map { _ in return true }
.bind(to: isDirty)
.disposed(by: self.disposeBag)
button.rx.tap
.bind(to: goNext)
.disposed(by: self.disposeBag)
goNext.bind {
if try! isDirty.value() == true {
print("save new memo")
}
print("go next")
}.disposed(by: self.disposeBag)
goNext.onNext(()) // go Next
// textView 입력 후
goNext.onNext(()) // save new memo, go Next
}
All, 수학 함수
예제 코드 - All() 구현
func allExample() {
let pub = PublishSubject<Int>()
pub.reduce(true) { acc, value in
acc && (value > 0) // 조건식
}.bind { result in
print(result)
}.disposed(by: self.disposeBag)
}
예제 코드 - 수학 함수 구현
func mathExample() {
let pub = PublishSubject<Int>()
pub.scan((count: 0, sum: 0)) { (acc, next) -> (count: Int, sum: Int) in
return (count: acc.count + 1, sum: acc.sum + next)
}.map { count, sum -> Int in
return sum / count
}.bind { avg in
print(avg)
}.disposed(by: self.disposeBag)
pub.scan(0) { sum, value in
let result = sum + value
return result
}.bind { sum in
print(sum)
}.disposed(by: self.disposeBag)
}
delay, timeInterval
- delay: 옵저버블의 값이 발행되기까지의 시간을 늦춘다.
- timeInterval: 옵저버블이 값을 발행할 때마다 이전에 발행한 시간 차이를 같이 발행한다. RxSwift에서는 지원하지 않는다.
예제 코드 - 시계(delay, timeInterval)
func delayExample() {
let time = 300
let timer = Observable<Void>.just(()).delay(.seconds(time), scheduler: MainScheduler.instance)
timer.bind { _ in
print("time over")
}.disposed(by: self.disposeBag)
}