0

rewrite a rxjs class using FP (Subject classes: Behavior, Replay, Async) 2nd approach

This is my experiment rewriting a part of rxjs lib with pure js, that provide same effect as the original one

At previous post, I'm implemented BehaviorSubject, ReplaySubject, AsyncSubject using functions only with object spreading. You can have a look at https://viblo.asia/p/rewrite-a-rxjs-class-using-fp-subject-classes-behavior-replay-async-1st-approach-4P856n135Y3

Today I will make another version without any this inside, to get rid of potential trouble

Instead of using this to access to state, I will pass the state as an argument, so there won't be any problem of accessing wrong state when execute or compose functions.

Full implementation is at https://codesandbox.io/s/rxjs-fp-rewrite-subjects-2nd-approach-yd82n

  1. base functions

    We can use an object to wrap all these base functions, like previous post, and then when in need, we will copy them using object spreading. But this time, I just create base functions only, and every time we need them, we compose them directly

    const subscribe = (state) => (obj) => {
      state.subscribers.push(obj);
      return () => state.subscribers.pop();
    };
    
    const getValue = (state) => () => {
      return state.value[state.value.length - 1];
    };
    
    const next = (state) => (nextValue) => {
      if (!state.isStopped) {
        state.value.push({ nextValue: nextValue, timestamp: Date.now() });
        // this.subscribers.forEach((obj) => obj.next(nextValue));
      }
    };
    
    const error = (state) => (e) => {
      if (!state.isStopped) {
        state.isStopped = true;
        state.subscribers.forEach((obj) => obj.error(e));
      }
    };
    
    const complete = (state) => () => {
      if (!state.isStopped) {
        state.isStopped = true;
        state.subscribers.forEach((obj) => obj.complete());
      }
    };
    

    2.myAsyncSubject

    We take a state as an argument, and then pass it down to every other functions

    function myAsyncSubject(
      state = { isStopped: false, subscribers: [], value: [] }
    ) {
      return {
        subscribe: subscribe(state),
        getValue: getValue(state),
        next: next(state),
        error: error(state),
        complete: () => {
          complete(state)();
          state.subscribers.forEach((obj) =>
            obj.next(state.value[state.value.length - 1].nextValue)
          );
        }
      };
    }
    
    1. mySyncSubject
    const mySyncSubject = (state) => {
      return {
        subscribe: subscribe(state),
        getValue: getValue(state),
        next: (nextValue) => {
          next(state)(nextValue);
          if (!state.isStopped) {
            state.subscribers.forEach((obj) => obj.next(nextValue));
          }
        },
        error: error(state),
        complete: complete(state)
      };
    };
    
    1. myReplaySubject
    const myReplaySubject = (
      bufferSize,
      timeLimit = -1,
      state = {
        isStopped: false,
        subscribers: [],
        value: [],
        bufferSize,
        timeLimit
      }
    ) => {
      return {
        subscribe: (fn) => {
          state.value
            .slice(Math.max(state.value.length - state.bufferSize, 0))
            .filter((i) =>
              timeLimit > 0 ? Date.now() - i.timestamp < timeLimit : true
            )
            .map((i) => fn.next(i.nextValue));
          subscribe(state)(fn);
        },
        getValue: getValue(state),
        next: mySyncSubject(state).next,
        error: error(state),
        complete: complete(state)
      };
    };
    
    1. myBehaviorSubject
    const myBehaviorSubject = (defaultValue) => {
      return myReplaySubject(1, -1, {
        isStopped: false,
        subscribers: [],
        value: [{ nextValue: defaultValue, timestamp: Date.now() }],
        bufferSize: 1,
        timeLimit: -1
      });
    };
    

    That's it. They should have same result as the previous approach. Simple test cases are at https://rxjs.dev/guide/subject Just replace their original classes with new functions above accordingly.

    And in addition, a ReplayAsyncSubject should be rewrite like this

    function myReplayAsyncSubject(
      bufferSize,
      timeLimit = -1,
      state = {
        isStopped: false,
        subscribers: [],
        value: [],
        bufferSize,
        timeLimit
      }
    ) {
      return {
        subscribe: myReplaySubject(bufferSize, timeLimit, state).subscribe,
        getValue: getValue(state),
        error: error(state),
        next: next(state),
        complete: myAsyncSubject(state).complete
      };
    }
    

    We should see the same result as we had like 1st approach at previous post https://viblo.asia/p/rewrite-a-rxjs-class-using-fp-subject-classes-behavior-replay-async-1st-approach-4P856n135Y3


All rights reserved

Viblo
Hãy đăng ký một tài khoản Viblo để nhận được nhiều bài viết thú vị hơn.
Đăng kí