RxSwift Beginning
Not long ago I joined a project built around FRP (functional reactive programming), specifically ReactiveX and its Swift implementation, RxSwift.
The hardest part for me was that the code is structured in a completely different way. Coming from imperative programming, I found it difficult to start thinking in this new way.
But I had to learn it, and I don't regret the nights I spent on it.
So in this post, we're going to start with simple examples to understand FRP.
Observables and Observers
The two core concepts in RxSwift are Observables and Observers.
Observable
An Observable is something which emits notifications of change.
Below is an example of how to create an Observable. On the success path this Observable sends onNext() once and then immediately sends onCompleted(); otherwise it sends onError(). An Observable that sends onNext() once and then completes is called Just.
Seriously, it's called Just!
// AError is not defined in the post; this one-case enum is an assumed stand-in.
enum AError: Error { case General(String) }
let justObservable = Observable<String> // 1
    .create { observer in // 2
        let random = Int(arc4random_uniform(2)) // 3
        if random == 0 {
            observer.onNext("next element") // 4
            observer.onCompleted() // 5
        } else {
            let error = AError.General("error")
            observer.onError(error) // 6
        }
        return Disposables.create() // 7
    }
- First I create a variable that holds an Observable, and set the Observable's element type to String.
- When we create an Observable, we must implement the code it will execute. We also get an Observer object, which represents the future subscriber.
- Pick a random number, 0 or 1.
- Send an element to the Observer.
- Send .onCompleted() to signal that the Observable has finished.
- Send an error to the Observer.
- When an Observable is created, it must return a Disposable that is used to release it. I will explain this later in the Disposing section.
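RxSwift also ships a built-in just operator, so the success path above does not have to be written by hand. Here is a minimal sketch, assuming RxSwift is available as a dependency; the events array is only a helper introduced here to record what arrives:

```swift
import RxSwift

// Helper introduced for this sketch: records every event as text.
var events: [String] = []

// just emits its single element and then completes, synchronously on subscribe.
_ = Observable.just("next element")
    .subscribe { event in
        switch event {
        case .next(let element):
            events.append("next(\(element))")
        case .error(let error):
            events.append("error(\(error))")
        case .completed:
            events.append("completed")
        }
    }

print(events)
```

The recorded events are one next followed by completed, which is exactly the "once, then done" shape described above.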
Observer
An Observer is something which subscribes to an Observable in order to be notified when it has changed.
This is an example of how to implement an Observer. An Observable does not start executing its code until an Observer subscribes to it.
justObservable.subscribe { event in // 1
    switch event {
    case .next(let element):
        print(element)
    case .error(let error):
        print(error.localizedDescription)
    case .completed: // no associated value, so no parentheses
        break
    }
}.addDisposableTo(self.disposeBag) // 2
- Subscribe to the Observable.
- This call hands the subscription over for disposal. I will explain this later in the Disposing section.
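The fact that nothing runs before subscription can be checked with a small sketch. It assumes a recent RxSwift version, where disposed(by:) replaces addDisposableTo; the log array and the name lazyObservable are made up for this illustration:

```swift
import RxSwift

// Helper introduced for this sketch: records the order in which things happen.
var log: [String] = []

let lazyObservable = Observable<Int>.create { observer in
    // This closure runs once per subscription, not when the Observable is created.
    log.append("subscribe closure ran")
    observer.onNext(1)
    observer.onCompleted()
    return Disposables.create()
}

log.append("created")

let disposeBag = DisposeBag()
lazyObservable
    .subscribe(onNext: { value in log.append("next \(value)") })
    .disposed(by: disposeBag)

print(log)
```

"created" is logged before "subscribe closure ran", which shows that creating the Observable alone does not execute its body.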
An Observable sequence can play out in one of three ways:
Terminates normally: --1--2--3--4--5--6--|
Terminates with an error: --1--2--3--4--5--6--X
Never terminates (button taps, text field input, etc.): ---tap-tap-------tap--->
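The three cases can be reproduced with built-in operators. This is only a sketch, assuming RxSwift is available; DemoError and the record helper are hypothetical names introduced here, and the helper disposes manually purely so the never-ending case does not stay subscribed:

```swift
import RxSwift

// Hypothetical error type, used only in this sketch.
struct DemoError: Error {}

// Hypothetical helper: subscribes, records events in marble-like notation,
// then disposes so that a never-ending source does not stay subscribed.
func record<T>(_ source: Observable<T>) -> [String] {
    var events: [String] = []
    let subscription = source.subscribe { event in
        switch event {
        case .next(let value):
            events.append("\(value)")
        case .error:
            events.append("X")
        case .completed:
            events.append("|")
        }
    }
    subscription.dispose()
    return events
}

let normal = record(Observable.of(1, 2, 3))
let failing = record(Observable.of(1, 2).concat(Observable<Int>.error(DemoError())))
let endless = record(Observable<Int>.never())

print(normal, failing, endless)
```

Here normal ends with "|", failing ends with "X", and endless records nothing at all because never() sends neither elements nor a terminating event.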
Variable
A Variable is something which stores a value and emits a notification whenever that value changes. The asObservable() method creates an Observable from a Variable. To change the value of a Variable, assign to its .value property.
An example of how Variable works:
let demoVariable = Variable<Int>(0) // 1
demoVariable.asObservable().subscribe(onNext: { value in
    print(value) // 2
}).addDisposableTo(self.disposeBag)
for i in 0...4 {
    demoVariable.value = i // 3
}
- Create a Variable with an initial value of 0; the element type is Int.
- Print each new value.
- Change the value.
This will print the initial value once on subscription, followed by each assigned value:
0
0
1
2
3
4
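In recent RxSwift versions Variable has been deprecated and later removed, and BehaviorRelay is the usual replacement. A hedged sketch of the same example, assuming the RxRelay module (shipped alongside RxSwift 5 and later) is available; the printed array is a helper introduced here instead of calling print on each value:

```swift
import RxSwift
import RxRelay

// BehaviorRelay stores a current value, like Variable did.
let relay = BehaviorRelay<Int>(value: 0)
let disposeBag = DisposeBag()

// Helper introduced for this sketch: collects everything the subscriber receives.
var printed: [Int] = []

relay.asObservable()
    .subscribe(onNext: { value in printed.append(value) })
    .disposed(by: disposeBag)

for i in 0...4 {
    relay.accept(i) // accept(_:) replaces assigning to .value
}

print(printed)
```

The subscriber first receives the stored initial value 0 and then every accepted value, including the repeated 0 from the first loop iteration, since nothing here filters duplicates.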
Observable guarantees
An Observable can't send the next element until the observer.on method has finished executing for the previous one.
observable.subscribe { event in
    print("Started")
    // any processing
    print("Ended")
}.addDisposableTo(self.disposeBag)
This will always print:
Started
Ended
Started
Ended
It can never print:
Started
Started
Ended
Ended
Disposing
As I said before, when an Observable is created, it must return a Disposable that is used to release it. Subscribing also returns a Disposable, which represents the subscription:
let subscription = Observable<Int>.interval(0.3, scheduler: scheduler)
    .subscribe { event in
        print(event)
    }
Thread.sleep(forTimeInterval: 2.0)
subscription.dispose()
When we call the dispose() method, the subscription is disposed and the Observable stops sending events to it. Note that you usually do NOT want to call dispose() manually; this is only an educational example. Calling dispose() manually is usually a code smell. Use a DisposeBag instead.
When deinit is called on an object that has a DisposeBag as a property, the bag is “emptied” and each disposable Observer is automatically unsubscribed from what it was observing. This allows ARC to take back memory as it normally would.
Without a DisposeBag, you’d get one of two results: either the Observer would create a retain cycle, hanging on to what it’s observing indefinitely, or it could get deallocated out from under your object, causing a crash.
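The deinit behaviour can be observed in a small sketch. It assumes a recent RxSwift version (disposed(by:) instead of addDisposableTo); the Owner class and the log array are hypothetical names used only for this illustration:

```swift
import RxSwift

// Helper introduced for this sketch: records the order of events.
var log: [String] = []

// Hypothetical owner object that keeps its subscriptions in a DisposeBag.
final class Owner {
    private let disposeBag = DisposeBag()

    init() {
        // A source that never emits; its Disposable logs when it is released.
        let source = Observable<Int>.create { _ in
            return Disposables.create { log.append("disposed") }
        }
        source
            .subscribe()
            .disposed(by: disposeBag)
    }
}

var owner: Owner? = Owner()
log.append("owner alive")
owner = nil // releasing the owner deallocates the bag, which disposes the subscription
log.append("owner released")

print(log)
```

No explicit dispose() call appears anywhere, yet "disposed" is logged as soon as the owner goes away.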
let disposeBag = DisposeBag()
...
observable.subscribe { event in
    print("Started")
    // any processing
    print("Ended")
}.addDisposableTo(self.disposeBag)
If immediate cleanup is required, you can just assign a new bag (the property must then be declared with var rather than let).
self.disposeBag = DisposeBag()
You can also create your own Disposable, whose closure runs when the subscription is disposed.
let observable = Observable<String>.create { observer in
    print("Started")
    for i in 0...4 {
        observer.onNext("element \(i)")
    }
    observer.onCompleted()
    print("Ended")
    return Disposables.create {
        print("Dispose")
    }
}
observable.subscribe { event in
    print(event)
}.addDisposableTo(self.disposeBag)
This will print:
Started
next(element 0)
next(element 1)
next(element 2)
next(element 3)
next(element 4)
completed
Ended
Dispose
Sharing
Sometimes we need multiple Observers to share events from a single Observable.
To handle elements that were emitted before a new subscriber started observing, RxSwift provides replay operators such as replay(n) and replayAll() (and, for sequences that complete, takeLast(n)). First, look at what happens without any sharing:
let observable = Observable<Int>.interval(0.3, scheduler: scheduler)
let subscription1 = observable
    .subscribe(onNext: { n in
        print("[First] -> \(n)")
    })
Thread.sleep(forTimeInterval: 2.0)
let subscription2 = observable
    .subscribe(onNext: { n in
        print("[Second] -> \(n)")
    })
Thread.sleep(forTimeInterval: 1.0)
subscription1.dispose()
Thread.sleep(forTimeInterval: 1.0)
subscription2.dispose()
This will print:
[First] -> 0
[First] -> 1
[First] -> 2
[First] -> 3
[First] -> 4
[First] -> 5
[First] -> 6
[Second] -> 0
[First] -> 7
[Second] -> 1
[First] -> 8
[Second] -> 2
[First] -> 9
[Second] -> 3
[Second] -> 4
[Second] -> 5
As you can see, subscription1 starts and prints its first numbers, and only after that does subscription2 subscribe and start printing from 0. The two subscriptions run on two different underlying sequences. To share a single sequence and replay its latest element, the best solution is shareReplay(1), which is equivalent to replay(1).refCount().
Let's change the first line of the code. Just add shareReplay(1) to your Observable, and that's all.
let observable = Observable<Int>.interval(0.3, scheduler: scheduler).shareReplay(1)
This will print:
[First] -> 0
[First] -> 1
[First] -> 2
[First] -> 3
[First] -> 4
[First] -> 5
[Second] -> 5
[First] -> 6
[Second] -> 6
[First] -> 7
[Second] -> 7
[First] -> 8
[Second] -> 8
[First] -> 9
[Second] -> 9
[Second] -> 10
[Second] -> 11
[Second] -> 12
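In recent RxSwift versions shareReplay(1) is spelled share(replay: 1). The sharing itself can be verified without timers in a small sketch; it assumes a recent RxSwift version, and subscriptionCount, first and second are helper names introduced here:

```swift
import RxSwift

// Helper introduced for this sketch: counts how often the source body runs.
var subscriptionCount = 0

let source = Observable<Int>.create { observer in
    subscriptionCount += 1
    observer.onNext(42) // emits once and stays open (never completes)
    return Disposables.create()
}

// One underlying subscription, with the latest element replayed to late subscribers.
let shared = source.share(replay: 1)

let disposeBag = DisposeBag()
var first: [Int] = []
var second: [Int] = []

shared.subscribe(onNext: { first.append($0) }).disposed(by: disposeBag)
shared.subscribe(onNext: { second.append($0) }).disposed(by: disposeBag)

print(subscriptionCount, first, second)
```

The source body runs only once, and the second subscriber still receives 42 because the last element is replayed to it.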
As you can see, the two subscriptions now share one Observable. That's all for now! You can download this project here. Good luck! Be reactive!