Practical Operator

리액티브 연산자의 활용

interval()

  • 일정 시간 간격으로 값을 발행한다.
  • interval로 주기를 설정할 수 있으며, 시간 스케줄러를 정해야 한다.
  • RxJava와 달리 RxSwift에서 최초 시간 지연을 위한 메소드는 timer()에 있다.

예제 코드


let timeLabel = UILabel()
Observable<Int>.interval(.seconds(1), scheduler: MainScheduler.instance)
    .subscribe(onNext: { time in
        let remainedTime = 300 - time
        if remainedTime > 0 {
            timeLabel.text = "\(remainedTime) 초 남았습니다."
        } else {
            timeLabel.text = "시간 초과"
        }
    })

timer()

  • 인자로 받은 시간 지연 이후 0을 발행한다.
  • period에 값을 넘겨주면 interval() 과 같이 작동한다.

예제 코드

let reminder = PublishSubject<Bool>()

// 30분후 발행
Observable<Int>.timer(.seconds(1800), scheduler: MainScheduler.instance)
    .subscribe(onNext: { time in
        reminder.onNext(true)
    })

// 5초 후부터 1초 간격으로 발행
Observable<Int>.timer(.seconds(5), period: .seconds(1), scheduler: MainScheduler.instance)
    .subscribe(onNext: { time in
        print(time) // 0, 1, 2 ...
    })

range()

  • 0부터 count까지의 정수 값을 발행한다.
  • 반복문을 대체할 수 있다.

예제 코드


let integerList = [1, 2, 3, 4, 5]

Observable<Int>.range(start: 0, count: integerList.count)
    .subscribe(onNext: { index in
            print(integerList[index]) // 1, 2, 3, 4, 5
        }
    })

intervalRange()

  • interval과 range의 결합으로 range만큼의 값을 interval마다 발행한다.
  • Rxswift에는 해당 기능이 없는데, Int형 Observable에서 take()를 이용하여 유사하게 구현할 수 있다.

예제 코드


let integerList = [1, 2, 3, 4, 5]
Observable<Int>.interval(.seconds(1), scheduler: MainScheduler.instance)
    .take(integerList.count)
    .subscribe(onNext: { index in
        print(integerList[index]) // 1...2...3...4...5
    })

defer()

  • 옵저버블을 반환하는 연산자로, 옵저버블의 생성을 구독 시점까지 유예할 수 있다. 이때 새로운 옵저버블을 만들어 반환해 준다.
  • 네트워킹, 디스크 접근 등 당장 초기화 시에 부담이 되는 작업이나 항상 최신 데이터가 필요할때 사용할 수 있다.

예제 코드


// 데이터를 받기까지 시간이 걸리는 옵저버블
func heavyProcess(from: String) -> Observable<Data> {
    print("\(from): !!무거운 작업")
    Thread.sleep(forTimeInterval: 1)
    return Observable<Data>.just(Data())
}
    
let deferred: Observable<Data> = Observable.deferred {
    return self.heavyProcess(from: "deferred")
}
print("deferred: 초기화 완료")

let normal = heavyProcess(from: "normal")
print("normal: 초기화 완료")

print("deferred: 구독 시작")
deferred.subscribe(onNext: { _ in
    print("deferred: 데이터 수신")
})

print("normal: 구독 시작")
normal.subscribe(onNext: { _ in
    print("normal: 데이터 수신")
})

print("deferred: 두번째 구독 시작")
deferred.subscribe(onNext: { _ in
    print("deferred:두번째 데이터 수신")
})

print("normal: 두번째 구독 시작")
normal.subscribe (onNext:{ _ in
    print("normal: 두번째 데이터 수신")
})

//    defer()
//    deferred: 초기화 완료
//    normal: !!무거운 작업
//    normal: 초기화 완료
//    deferred: 구독 시작
//    deferred: !!무거운 작업
//    deferred: 데이터 수신
//    normal: 구독 시작
//    normal: 데이터 수신
//    deferred: 두번째 구독 시작
//    deferred: !!무거운 작업
//    deferred:두번째 데이터 수신
//    normal: 두번째 구독 시작
//    normal: 두번째 데이터 수신

repeat()

  • 하나의 이벤트를 지속적으로 발행해 준다.
  • RxSwift에서는 옵저버블에 대한 reapeat()은 없다.
  • 대신 repeatElement()가 있는데 배열 순회가 아닌 하나의 이벤트에 대한 발행이기 때문에 직접 구현해야 한다.

예제 코드


Observable.repeatElement(["1","2","3"])
    .flatMap { arr in // concatMap
        return Observable.from(arr)
    }.subscribe(onNext: { value in
        print(value)
    })

concatMap(), switchMap()

concatMap()

  • concatMap()은 flatMap과 유사하지만(이벤트 -> 옵저버블) 원래의 옵저버블에서 이벤트가 발행되었을때 이전의 데이터 흐름이 끝나지 않았으면 종료를 기다린다.
  • 따라서 발행되는 옵저버블의 순서를 보장해 준다.

switchMap()

  • switchMap()도 flatMap과 유사하지만 원래의 옵저버블에서 이벤트가 발행되었을때 이전의 데이터 흐름이 끝나지 않았다면 종료시키고 새 옵저버블을 발행한다.
  • RxSwift에서는 flatMapLatest()라는 이름으로 구현되었다.

비교 예시


private func bgSensor() -> Observable<Int> {
    let randomBG = Int.random(in: 100...200)
    let randomTime = Int.random(in: 1...10)
    print("발행: \(randomBG) mg/dL")
    return Observable.just(randomBG).delay(.seconds(randomTime), scheduler: MainScheduler.instance)
}

let load = PublishSubject<Bool>()
        let flatMapContainer = load.flatMap { _ in self.bgSensor() }
        let concatMapContainer = load.concatMap { _ in return self.bgSensor() }
        let switchMapContainer = load.flatMapLatest { _ in return self.bgSensor() }

        flatMapContainer.subscribe(onNext:{ bg in
            print("수신: \(bg) mg/dL")
        })

        concatMapContainer.subscribe(onNext:{ bg in
            print("수신: \(bg) mg/dL")
        })
        
        switchMapContainer.subscribe(onNext:{ bg in
            print("수신: \(bg) mg/dL")
        })
        
        load.onNext(true)
        load.onNext(true)
        load.onNext(true)
        load.onNext(true)
        load.onNext(true)
        
//        flatMap()
//        발행: 176 mg/dL
//        발행: 113 mg/dL
//        발행: 185 mg/dL
//        발행: 189 mg/dL
//        발행: 194 mg/dL
                 
//        수신: 189 mg/dL
//        수신: 113 mg/dL
//        수신: 185 mg/dL
//        수신: 194 mg/dL
//        수신: 176 mg/dL
         
//        concatMap()
//        발행: 102 mg/dL
//        발행: 103 mg/dL
//        발행: 162 mg/dL
//        발행: 191 mg/dL
//        발행: 183 mg/dL
 
//        수신: 102 mg/dL
//        수신: 103 mg/dL
//        수신: 162 mg/dL
//        수신: 191 mg/dL
//        수신: 183 mg/dL
        
//        flatmapLatest() - switchMap
//        발행: 140 mg/dL
//        발행: 156 mg/dL
//        발행: 133 mg/dL
//        발행: 126 mg/dL
//        발행: 145 mg/dL
        
//        수신: 145 mg/dL

groupBy()

  • groupBy()는 하나의 옵저버블을 조건에 따라 여러 개의 옵저버블로 바꿔준다.
  • 생성 함수에서 key가 될 수 있는 타입을 반환하고, 구독할 때 .key 프로퍼티를 이용해 해당 타입을 구별한다.
  • 각각의 필터 없이 한번에 여러 번 구독이 가능하다.

예제 코드


enum BGType {
    case bg
    case cg
}

struct BGItem {
    let type: BGType
    let amount: Int
}

let bgList = [
    BGItem(type: .bg, amount: 81),
    BGItem(type: .cg, amount: 121),
    BGItem(type: .cg, amount: 132),
    BGItem(type: .bg, amount: 95),
    BGItem(type: .bg, amount: 73),
    BGItem(type: .cg, amount: 115)
]

// Observable<GroupedObservable<BGType, BGItem>>
let groupedBGItem = Observable<BGItem>.from(bgList).groupBy { bgItem -> BGType in
    return bgItem.type
}

groupedBGItem.subscribe(onNext: { bgItemOb in
    switch bgItemOb.key {
    case .bg:
        bgItemOb.subscribe(onNext: { bg in
            print("bg amount: \(bg.amount)")
        })
    case .cg:
        bgItemOb.subscribe(onNext: { cg in
            print("cg amount: \(cg.amount)")
        })
    }
})

scan()

  • reduce()와 비슷하지만 계산 중간 과정을 지속적으로 발행해 준다.
  • 초깃값과 적용할 계산을 인자로 받는다.

예제 코드


Observable<Int>.from([1,2,3,4,5,6,7,8,9,10]).scan(0) { sum, value in
    return sum + value
}.subscribe(onNext: { value in
    print(value) // 1, 3, 6, 10, 15 ... 55
})

zip

  • 각각 옵저버블로부터의 값을 결합해 발행한다.
  • 한 쪽에서 발행하지 않으면 발행되지 않는다.

예제 코드 - 유닛 테스트(zip)


// 유닛 테스트
func zipExample() {
    
    let response = PublishSubject<Bool>()
    let expect = [true, true, false, true, false]
    
    Observable.zip(response, Observable.from(expect)).bind { res, exp in
        assert(res == exp)
    }.disposed(by: self.disposeBag)
    
    response.onNext(true)
    response.onNext(true)
    response.onNext(false)
    response.onNext(true)
    response.onNext(false)
    response.onCompleted()
}

combineLatest

  • 결합된 옵저버블 중 하나의 옵저버블에서 값이 발행되면 그 값과 다른 옵저버블의 최신값을 묶어서 발행한다.
  • 각각의 옵저버블에서 값을 발행할 때마다 combineLatest에서 발행된다.

예제 코드 - 버튼 숨기기


func combineLatestExample() {
    let isSGLabelHidden = BehaviorSubject<Bool>(value: true)
    let sgLabel = UILabel()
    
    let isSearchingBtnHidden = BehaviorSubject(value: false)
    let isCgmUnusedBtnHidden = BehaviorSubject(value: false)
    let isTransmitterBtnHidden = BehaviorSubject(value: false)
    
    let isAllButtonHidden =
        Observable.combineLatest(isSearchingBtnHidden,
                                    isCgmUnusedBtnHidden,
                                    isTransmitterBtnHidden)
        .map { sb,cb, tb in return sb && cb && tb }
    
    let isLabelHidden = Observable.combineLatest(isAllButtonHidden, isSGLabelHidden)
        .map { buttonHidden, labelHidden -> Bool in
            if buttonHidden {
                return labelHidden
            } else {
                return true
            }
        }
    
    isLabelHidden
        .observe(on: MainScheduler.instance)
        .bind { isHidden in
        sgLabel.isHidden = isHidden
    }.disposed(by: self.disposeBag)
    
    // withLatestFrom
    let a = PublishSubject<Bool>()
    let b = PublishSubject<Bool>()
    
    a.withLatestFrom(b) { ra, rb in
        return ra == rb
    }
}

merge, concat

  • 두 연산자 모두 같은 종류의 옵저버블을 하나의 옵저버블로 결합해 준다.
  • merge()는 각각의 옵저버블에서 발행한 값을 그대로 내려보내준다.
  • concat()은 결합한 순서대로 앞의 옵저버블이 completion이 나면 그 뒤의 옵저버블이 값을 발행할 수 있다. 이전 값은 무시된다.

예제 코드 - ReactorKit(merge, concat)

struct BG {
    enum BGType {
        case bg, sg
    }
    
    let type: BGType
    let amount: Int
}

func mergeConcatExample() {
    struct State {
        var isLabelHidden: Bool
        var bgAmount: Int
    }
    
    enum Mutate {
        case setData(data: Int)
        case setLabel(isHidden: Bool)
    }
    
    func reduce(mutate: Mutate) {
        var newState = originalState
        switch mutate {
        case .setLabel(let isHidden):
            newState.isLabelHidden = isHidden
        case .setData(let data):
            newState.bgAmount = data
        }
        originalState = newState
    }
    
    var originalState = State(isLabelHidden: true, bgAmount: 0)
    
    Observable<Mutate>.concat(Observable.just(.setData(data: 80)),
                                Observable.just(.setLabel(isHidden: false)))
        .bind { mut in
            reduce(mutate: mut)
        }.disposed(by: self.disposeBag)
}

amb

  • 두 옵저버블 중 값을 먼저 발행하는 옵저버블을 선택한다.
  • 늦게 발행되는 옵저버블은 모두 무시한다.
  • 게임(먼저 들어오는 사람만 허용), 여러 서버 접속(빠른 응답이 오는 서버를 사용) 등에 사용한다.

takeUntil

  • 뒤에 물린 옵저버블이 값을 발행할 때까지만 앞의 옵저버블이 값을 발행한다.
  • 뒤의 옵저버블이 발행되면 앞의 옵저버블은 completion된다.

예제 코드 - 서버 네트워킹(takeUntil)


func takeUntilExample() {
    let confirmButton = UIButton()
    confirmButton.isHidden = true
    
    let cancelAction = PublishSubject<Void>()
    let startAction = PublishSubject<Void>()
    let response = Observable.just(1).delay(.seconds(3), scheduler: MainScheduler.instance)
    let completed = PublishSubject<Void>()
    
    startAction
        .flatMap { _ in response }
        .take(until: cancelAction)
    .bind { data in
        completed.onNext(())
    }.disposed(by: self.disposeBag)
    
    completed.bind {
        print("success")
    }.disposed(by: self.disposeBag)
    
    cancelAction.bind {
        print("canceled")
    }.disposed(by: self.disposeBag)
    
    startAction.onNext(()) // success
    
    startAction.onNext(())
    cancelAction.onNext(()) // canceled
}

skipUntil

  • 뒤의 옵저버블이 발행되는 이후로부터 앞의 옵저버블이 값을 발행한다.
  • 각각의 옵저버블의 completion은 연관이 없다.

예제 코드 - 메모 저장(skipUntil)


func skipUntilExample() {
    let button = UIButton()
    let goNext = PublishSubject<Void>()
    let textView = PublishSubject<String>()
    
    button.rx.tap
        .bind(to: goNext)
        .disposed(by: self.disposeBag)
    
    goNext
        .bind {
            print("go next")
        }.disposed(by: self.disposeBag)
    
    goNext.skip(until: textView)
        .bind {
            print("save new memo")
        }.disposed(by: self.disposeBag)
    
    goNext.onNext(()) // go Next
    
    // textView 입력 후
    goNext.onNext(()) // save new memo, go Next
    
    // skip 없이
    let isDirty = BehaviorSubject<Bool>(value: false)
    
    textView
        .map { _ in return true }
        .bind(to: isDirty)
        .disposed(by: self.disposeBag)
    
    button.rx.tap
        .bind(to: goNext)
        .disposed(by: self.disposeBag)
    
    goNext.bind {
        if try! isDirty.value() == true {
            print("save new memo")
        }
        print("go next")
    }.disposed(by: self.disposeBag)
    
    goNext.onNext(()) // go Next
    
    // textView 입력 후
    goNext.onNext(()) // save new memo, go Next
}

All, 수학 함수

예제 코드 - All() 구현

func allExample() {
    let pub = PublishSubject<Int>()
    
    pub.reduce(true) { acc, value in
        acc && (value > 0) // 조건식
    }.bind { result in
        print(result)
    }.disposed(by: self.disposeBag)
}

예제 코드 - 수학 함수 구현


func mathExample() {
    let pub = PublishSubject<Int>()
    
    pub.scan((count: 0, sum: 0)) { (acc, next) -> (count: Int, sum: Int) in
        return (count: acc.count + 1, sum: acc.sum + next)
    }.map { count, sum -> Int in
        return sum / count
    }.bind { avg in
        print(avg)
    }.disposed(by: self.disposeBag)
    
    pub.scan(0) { sum, value in
        let result = sum + value
        return result
    }.bind { sum in
        print(sum)
    }.disposed(by: self.disposeBag)
}

delay, timeInterval

  • delay: 옵저버블의 값이 발행되기까지의 시간을 늦춘다.
  • timeInterval: 옵저버블이 값을 발행할 때마다 이전에 발행한 시간 차이를 같이 발행한다. RxSwift에서는 지원하지 않는다.

예제 코드 - 시계(delay, timeInterval)


func delayExample() {
        let time = 300
        let timer = Observable<Void>.just(()).delay(.seconds(time), scheduler: MainScheduler.instance)
        
        timer.bind { _ in
            print("time over")
        }.disposed(by: self.disposeBag)
    }