RxSwift Beginning

Not long ago I joined a project built around FRP with ReactiveX and its Swift implementation, RxSwift. The hardest part for me was the completely different way of structuring code. After years of imperative programming, it was difficult to start thinking in a new way. But I had to learn it, and I don't regret the nights spent. In this post, we're going to start with simple examples to understand FRP.

Observables and Observers

The main building blocks of RxSwift are Observables and Observers.

Observable

An Observable is something which emits notifications of change.

Below is an example of how to create an Observable. Depending on a random number, this Observable either sends onNext() once and then immediately sends onCompleted(), or sends an error. An Observable that sends onNext() once and then completes is called Just. Seriously, it's called Just!

let justObservable = Observable<String> // 1
    .create { observer in // 2
        let random = Int(arc4random_uniform(2)) // 3
        if random == 0 {
            observer.onNext("next element") // 4
            observer.onCompleted() // 5
        } else {
            // AError is an Error type defined elsewhere in the project.
            let error = AError.General("error")
            observer.onError(error) // 6
        }
        return Disposables.create() // 7
    }
  1. First I create a variable that holds an Observable whose elements are of type String.
  2. When we create an Observable, we provide the code that runs on subscription. The closure receives an Observer, which represents the future subscriber.
  3. A random number, 0 or 1.
  4. Send an element to the Observer.
  5. Send .onCompleted() to signal that the Observable has finished.
  6. Send an error to the Observer.
  7. The creation closure must return a Disposable, which is used to release this Observable. I will explain this later, in the part about Disposing.
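
RxSwift also provides a ready-made just operator, so the non-failing half of the example above does not need create at all. A minimal sketch; the received array is an illustrative helper, there only to make the order of events visible:

```swift
import RxSwift

// Collect everything the observer receives so the order is easy to inspect.
var received: [String] = []

// `just` wraps a single value: one .next, then .completed.
let just = Observable.just("next element")

_ = just.subscribe { event in
    switch event {
    case .next(let element):
        received.append("next(\(element))")
    case .error(let error):
        received.append("error(\(error))")
    case .completed:
        received.append("completed")
    }
}

print(received) // ["next(next element)", "completed"]
```

The result of subscribe is discarded here only because the sequence completes synchronously; longer-lived subscriptions should be disposed as described in the part about Disposing.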

Observer

An Observer is something which subscribes to an Observable in order to be notified when it has changed.

This is an example of how to implement an Observer. An Observable won't start executing its code until an Observer subscribes to it.

justObservable.subscribe { event in // 1
    switch event {
    case .next(let element):
        print(element)
    case .error(let error):
        print(error.localizedDescription)
    case .completed:
        break
    }
}.addDisposableTo(self.disposeBag) // 2
  1. Subscribe to the Observable.
  2. This call registers the subscription for disposal. I will explain this later, in the part about Disposing.

An Observable sequence can end in one of three ways:

Terminates normally: --1--2--3--4--5--6--|
Terminates with an error: --1--2--3--4--5--6--X
Never terminates (button taps, text field input, etc.): ---tap-tap-------tap--->
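
Termination is final: once a sequence has completed or failed, nothing more reaches the observer. A small sketch of this, using create as in the earlier example; the names terminating, elements and didComplete are illustrative only:

```swift
import RxSwift

var elements: [Int] = []
var didComplete = false

let terminating = Observable<Int>.create { observer in
    observer.onNext(1)
    observer.onCompleted()   // the "|" in the diagram above
    observer.onNext(2)       // sent after termination, so it is dropped
    return Disposables.create()
}

_ = terminating.subscribe(
    onNext: { elements.append($0) },
    onCompleted: { didComplete = true }
)

print(elements, didComplete) // [1] true
```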

Variable

A Variable stores a value and emits a notification each time that value changes. The asObservable() method creates an Observable from a Variable. To change the value of a Variable, assign to its .value property. Here is an example of how a Variable works:

let demoVariable = Variable<Int>(0) // 1
        
demoVariable.asObservable().subscribe(onNext: { value in
    print(value) // 2
}).addDisposableTo(self.disposeBag)
        
for i in 0...4 {
    demoVariable.value = i // 3
}
  1. Create a Variable with the initial value 0; its elements are of type Int.
  2. Print each value the subscriber receives.
  3. Change the value.

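This will print the following. The first 0 is the initial value, delivered as soon as we subscribe; the second 0 comes from the first iteration of the loop:

0
0
1
2
3
4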

Observable guarantees

An Observable can't send the next element until the observer's on method has finished handling the previous one.

observable.subscribe { event in
    print("Started")
    // any processing
    print("Ended")
}.addDisposableTo(self.disposeBag)

This will always print:

Started
Ended
Started
Ended

It can never print:

Started
Started
Ended
Ended

Disposing

As I said before, when an Observable is created, it must return a Disposable that is used to release it. Subscribing also returns a Disposable, which you can use to cancel the subscription.

let subscription = Observable<Int>.interval(0.3, scheduler: scheduler)
    .subscribe { event in
        print(event)
    }

Thread.sleep(forTimeInterval: 2.0)

subscription.dispose()

When we call the dispose() method, the subscription is disposed and the Observable stops sending elements to it. Note that you usually do NOT want to call dispose() manually; this is only an educational example. Calling dispose() manually is usually a code smell. It is better to use a DisposeBag.

When deinit is called on an object that has a DisposeBag as a property, the bag is “emptied” and each disposable Observer is automatically unsubscribed from what it was observing. This allows ARC to take back memory as it normally would.

Without a DisposeBag, you’d get one of two results: either the Observer would create a retain cycle, hanging on to what it’s observing indefinitely, or it could get deallocated out from under your object, causing a crash.


let disposeBag = DisposeBag()
...
observable.subscribe { event in
    print("Started")
    // any processing
    print("Ended")
}.addDisposableTo(self.disposeBag)
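
The deinit behaviour described above can be observed in a small self-contained sketch. The Screen and Log types are hypothetical, standing in for a view controller and a console, and the sketch uses disposed(by:), the name newer RxSwift releases give to addDisposableTo:

```swift
import RxSwift

// Small reference type so the closures and the class below share one log.
final class Log {
    var lines: [String] = []
}

// Hypothetical owner, standing in for a view controller or view model.
final class Screen {
    private let disposeBag = DisposeBag()

    init(log: Log) {
        // A sequence that emits once and never completes, so only
        // disposal can end the subscription.
        let ticks = Observable<String>.create { observer in
            observer.onNext("tick")
            return Disposables.create { log.lines.append("disposed") }
        }

        ticks
            .subscribe(onNext: { log.lines.append($0) })
            .disposed(by: disposeBag)
    }
}

let log = Log()
var screen: Screen? = Screen(log: log)
print(log.lines) // ["tick"]

screen = nil     // Screen deinit -> DisposeBag deinit -> subscription disposed
print(log.lines) // ["tick", "disposed"]
```

Nothing in the sketch calls dispose() directly; releasing the owner is enough.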

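If immediate cleanup is required, you can simply assign a new bag (the property must be declared with var for this). The old bag is deallocated and disposes everything it held.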

self.disposeBag = DisposeBag()

You can also create your own Disposable, whose closure runs when the subscription is disposed.

let observable = Observable<String>.create { observer in
    print("Started")
    for i in 0...4 {
        observer.onNext("element \(i)")
    }
            
    observer.onCompleted()
    print("Ended")
    return Disposables.create {
        print("Dispose")
    }
}

observable.subscribe { event in
    print(event)
}.addDisposableTo(self.disposeBag)

This will print:

Started
next(element 0)
next(element 1)
next(element 2)
next(element 3)
next(element 4)
completed
Ended
Dispose

Sharing

Sometimes we need multiple Observers to share events from a single Observable.

To handle elements that were emitted before a new subscriber started observing, we can use the operators takeLast(n), replayAll() and replay(n).
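
As a rough illustration of replay(n), the sketch below buffers the last two elements for a late subscriber. A PublishSubject is used only as a hand-fed source so the timing is deterministic; it and the names input, replayed and late are assumptions of this sketch, not part of the example project:

```swift
import RxSwift

// Hand-fed source; elements are pushed in manually with onNext.
let input = PublishSubject<Int>()

// replay(2) keeps the last two elements for future subscribers.
let replayed = input.replay(2)
let connection = replayed.connect() // start forwarding input into the buffer

input.onNext(1)
input.onNext(2)
input.onNext(3)

// This subscriber arrives after three elements have already been sent.
var late: [Int] = []
let subscription = replayed.subscribe(onNext: { late.append($0) })
print(late) // [2, 3]

// Elements sent after subscribing arrive as usual.
input.onNext(4)
print(late) // [2, 3, 4]

subscription.dispose()
connection.dispose()
```

replay returns a connectable Observable, so nothing is buffered until connect() is called.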

let observable = Observable<Int>.interval(0.3, scheduler: scheduler)
        
let subscription1 = observable
    .subscribe(onNext: { n in
        print("[First]  -> \(n)")
    })
        
Thread.sleep(forTimeInterval: 2.0)
        
let subscription2 = observable
    .subscribe(onNext: { n in
        print("[Second] -> \(n)")
    })
        
Thread.sleep(forTimeInterval: 1.0)
        
subscription1.dispose()
        
Thread.sleep(forTimeInterval: 1.0)
        
subscription2.dispose()

This will print:

[First]  -> 0
[First]  -> 1
[First]  -> 2
[First]  -> 3
[First]  -> 4
[First]  -> 5
[First]  -> 6
[Second] -> 0
[First]  -> 7
[Second] -> 1
[First]  -> 8
[Second] -> 2
[First]  -> 9
[Second] -> 3
[Second] -> 4
[Second] -> 5

As you can see, subscription1 starts and prints 6 numbers; then subscription2 subscribes and starts printing from 0. The two subscriptions run two different Observables. To share a single sequence and replay its elements, the best solution is shareReplay(); shareReplay(1) is like the combination replay(1).refCount().

Let's change the first line of the code above. Just add shareReplay(1) to your Observable, and that's all.

let observable = Observable<Int>.interval(0.3, scheduler: scheduler).shareReplay(1)

This will print:

[First]  -> 0
[First]  -> 1
[First]  -> 2
[First]  -> 3
[First]  -> 4
[First]  -> 5
[Second] -> 5
[First]  -> 6
[Second] -> 6
[First]  -> 7
[Second] -> 7
[First]  -> 8
[Second] -> 8
[First]  -> 9
[Second] -> 9
[Second] -> 10
[Second] -> 11
[Second] -> 12
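
The same effect can be checked without timers by counting how often the creation closure runs. This is only a sketch: it uses the replay(1).refCount() spelling mentioned earlier rather than shareReplay(1), and runs, source and the other names are illustrative:

```swift
import RxSwift

var runs = 0 // how many times the creation closure has executed

// Emits one element and never completes.
let source = Observable<Int>.create { observer in
    runs += 1
    observer.onNext(42)
    return Disposables.create()
}

// Unshared: every subscriber triggers its own run of the closure.
let first = source.subscribe(onNext: { _ in })
let second = source.subscribe(onNext: { _ in })
let unsharedRuns = runs
print(unsharedRuns) // 2

// Shared: one run, and the late subscriber gets the replayed element.
runs = 0
let shared = source.replay(1).refCount()
var lateElements: [Int] = []
let third = shared.subscribe(onNext: { _ in })
let fourth = shared.subscribe(onNext: { lateElements.append($0) })
let sharedRuns = runs
print(sharedRuns, lateElements) // 1 [42]

// Manual disposal, acceptable here because this is a short script.
[first, second, third, fourth].forEach { $0.dispose() }
```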

As you can see, the two subscriptions share one Observable.

That's all for now! You can download this project here. Good luck! Be reactive!