[RFP] Học Rx bằng cách tự xây dựng Observable

Đối với lập trình viên chúng ta, để học một công nghệ mới sẽ có những cách sau

  • Đọc về nó
  • Làm thử với nó
  • Hack, viết lại một thứ có tính năng tương tự

Đối với 2 mục đầu tiên đã có rất nhiều bài viết đề cập tuy nhiên cách thức Rx hoạt động như thế nào thì có vẻ như rất ít người nói tới. Bài viết này sẽ cố gắng xây dựng, mô phỏng lại RxSwift, từ đó giúp các bạn hiểu hơn bí mật đằng sau RxSwift.

Disposable

Disposable chỉ có duy nhất một method là dispose() với nhiệm vụ xoá bỏ toàn bộ side-effect xuất hiện trong quá trình subcription chẳng hạn khi kết nối với network hoặc database.

public protocol Disposable {
    func dispose()
}

AnonymousDisposable — cung cấp một closure sẽ được gọi khi hàm dispose được gọi.

CompositeDisposable — chứa một mảng các Disposable và sẽ đảm nhiệm việc dispose cho từng phần tử trong mảng khi nó được dispose.

public final class AnonimousDisposable: Disposable {
    private let _disposeHandler: () -> Void
    
    public init(_ disposeClosure: @escaping () -> Void) {
        _disposeHandler = disposeClosure
    }
    
    public func dispose() {
        _disposeHandler()
    }
}

public final class CompositeDisposable: Disposable {
    private var isDisposed: Bool = false
    private var disposables: [Disposable] = []
    
    public init() {}
    
    public func add(disposable: Disposable) {
        if isDisposed {
            disposable.dispose()
            return
        }
        disposables.append(disposable)
    }
    
    public func dispose() {
        if isDisposed { return }
        disposables.forEach {
            $0.dispose()
        }
        isDisposed = true
    }
}

Observer

Observer được sử dụng để quản lý các event. Observer sẽ khởi tạo với một event handler closure, closure này sẽ được gọi mỗi khi có một Event mới.

public protocol ObserverType {
    associatedtype E
    
    func on(event: Event<E>)
}

public enum Event<T> {
    case next(T)
    case error(Error)
    case completed
}
public final class Observer<E>: ObserverType {
    private let _handler: (Event<E>) -> Void
    
    public init(handler: @escaping (Event<E>) -> Void) {
        _handler = handler
    }
    
    public func on(event: Event<E>) {
        _handler(event)
    }
}

Observable

Observable được RxSWift định nghĩa như sau. Observable sẽ là nơi bạn truyền vào Observer (bản chất là một callback) bằng việc gọi hàm subscribe()

public protocol ObservableType {
    associatedtype E
    
    func subscribe<O: ObserverType>(observer: O) -> Disposable where O.E == E
}

Phương thức "subscribe" có parameter là một Observer và trả về Disposable.

Trước hết ta sẽ định nghĩa một helper "Sink" như sau. Lớp Sink này có nhiệm vụ quản lý Disposable, chuyển event đến Observer, kiểm tra subscription đã được dispose hay chưa trước khi chuyển tiếp event.

final class Sink<O: ObserverType>: Disposable {
    private var _disposed: Bool = false
    private let _forward: O
    private let _subscriptionHandler: (Observer<O.E>) -> Disposable
    private let _composite = CompositeDisposable()
    
    init(forvard: O, subscriptionHandler: @escaping (Observer<O.E>) -> Disposable) {
        _forward = forvard
        _subscriptionHandler = subscriptionHandler
    }
    
    func run() {
        let observer = Observer<O.E>(handler: forward)
        _composite.add(disposable: _subscriptionHandler(observer))
    }
    
    private func forward(event: Event<O.E>) {
        if _disposed {
            return
        }
        _forward.on(event: event)
        switch event {
        case .completed, .error:
            self.dispose()
        default:
            break
        }
    }
    
    func dispose() {
        _disposed = true
        _composite.dispose()
    }
}

Observable sử dụng Sink

public class Observable<Element>: ObservableType {
    public typealias E = Element
    private let _subscribeHandler: (Observer<Element>) -> Disposable
    
    public init(_ subscribtionClosure: @escaping (Observer<Element>) -> Disposable) {
        _subscribeHandler = subscribtionClosure
    }
    
    public func subscribe<O : ObserverType>(observer: O) -> Disposable where O.E == E {
        let sink = Sink(forvard: observer, subscribtionHandler: _subscribeHandler)
        sink.run()
        return sink
    }
}

Operator

Operator map sẽ được thực hiện như sau

extension ObservableType {    
    public func map<U>(_ transform: @escaping (E) throws -> U) -> Observable<U> {
        return Observable<U> { observer in
            return self.subscribe(observer: Observer { (event) in
                switch event {
                case .next(let element):
                    do {
                        try observer.on(event: .next(transform(element)))
                    } catch {
                        observer.on(event: .error(error))
                    }
                case .error(let e):
                    observer.on(event: .error(e))
                case .completed:
                    observer.on(event: .completed)
                }
            })
        }
    }
}

Kết quả

Và đây là thành quả chúng ta đạt được :

Hy vọng bài viết này đã giúp các bạn hiểu thêm về những thành phần cơ bản của Rx, cách thức chúng hoạt động và hiểu rằng không có gì là ma thuật đằng sau Rx!

Reference

Bài viết này được dịch từ : https://medium.com/@SergDort/learn-rx-by-implementing-observable-e5cb08c9c35