+5

[Golang] Channel trong golang và use case - part IIII (pubsub pattern)

Mở đầu

  • Tiếp tục series, hôm nay là một buổi chia sẽ của tôi về cách implement lại pubsub pattern bằng golang channel. Let's go, guys!

Pubsub

pub-sub-messaging.png

  • Trước hết ta sẽ có một định nghĩa đầy đủ từ wiki :
    In software architecture, publish–subscribe is a messaging pattern where senders of messages, called publishers, do not program the messages to be sent directly to specific receivers, called subscribers, but instead categorize published messages into classes without knowledge of which subscribers, if any, there may be.

  • Pubsub là một message pattern mà ở trong đó publisher chỉ việc gửi message và không cần quan tâm đến có subscriber nào nhận hay không và các message sẽ được phân loại và gủi đi mà không cần quan tâm xem có subscribers nào hay không. Publishers và subscribers không biết sự tồn tại của nhau. Ở một số hệ thông pubsub, sẽ có thêm 1 thành phần là broker, nó đảm nhiệm phân loại và gửi message.

  • Pubsub hay message queue nói chung được sử dụng khá phổ biến trong micro-service architectures. Nó cung cấp một phương thức giúp các service giao tiếp với nhau một cách bất đồng bộ. Ngoài ra, chúng ta sẽ có một vài use cases for messaging queues in real-world scenarios như là: Sending emails, Data post-processing, Batch updates for databases ...

Use case 3: Build pubsub service with buffered channel

  • Dựa vào đặc tính channel là một queue, tôi sẽ có 1 simple demo về pubsub như sau:
    type Message struct {
        topic   string
        content interface{}
    }
    
    type MessageChannel chan Message
    
    func main() {
        maxMessage := 10000
        topic := "update-user"
        
        messageQueue := make(chan Message, maxMessage)
        mapTopicMessage := make(map[string][]MessageChannel) // map[topic][]MessageChannel
    }
    
    messageQueue: chứa danh sách các message sẽ xủ lí.

    MessageChannel: kênh giao tiếp giữa các publisher và subcriber. MessageChannel lúc này sẽ đóng vai trò như một trái tim của hệ thống.

    mapTopicMessage: chứa một bản map giữa topic và danh sách các message channel. Một topic sẽ được subcribe bởi nhiều subcriber nên ta sẽ quan hệ 1:N. Nó đóng vai trò như việc quản lý topic và các message channel.

  • Nguyên lý vận hành:
    messageQueue khi nhận được một message mới, service sẽ lọc ra các danh sách MessageChannel tương ứng với topic của message đó và gửi message mới đến. Mỗi subcriber sẽ communicate with MessageChannel để lấy message.
    ...
    func main() {
        maxMessage := 10000
        topic := "update-user"
    
        messageQueue := make(chan Message, maxMessage)
        mapTopicMessage := make(map[string][]MessageChannel)
        go run(messageQueue, mapTopicMessage)
        
    }
    
    func run(messageQueue chan Message, mapTopicMessage map[string][]MessageChannel) {
        for {
            message := <-messageQueue
    
            listMessageChannel, ok := mapTopicMessage[message.topic]
            if ok {
                for _, messageChannel := range listMessageChannel {
                    messageChannel <- message
                }
            }
    
        }
    }
    
  • Publish message
    func main() {
        maxMessage := 10000
        topic := "update-user"
    
        messageQueue := make(chan Message, maxMessage)
        mapTopicMessage := make(map[string][]MessageChannel)
        go run(messageQueue, mapTopicMessage)
        
        // publish
        publish(messageQueue, topic, "user-name is update to Hung")
        
        time.Sleep(time.Second * 10)
    }
    
    func publish(messageQueue chan Message, topic string, content string) {
        message := Message{
            topic:   topic,
            content: content,
        }
        messageQueue <- message
        fmt.Printf("%v: publish new message with topic: '%v' - content: '%v' \n", time.Now().Format("15:04:05"), message.topic, message.content)
    }
    
    result: 
        08:46:28: publish new message with topic: 'update-user' - content: 'user-name is update to Hung'
    
  • Register Subscription
    func main() {
        ...
        // subcribe
        sub1 := registerSubscription(mapTopicMessage, topic)
        ...
    }
    
    func registerSubscription(mapTopicMessage map[string][]MessageChannel, topic string) MessageChannel {
        newMessageChannel := make(MessageChannel)
    
        value, ok := mapTopicMessage[topic]
        if ok {
            value = append(value, newMessageChannel)
            mapTopicMessage[topic] = value
        } else {
            mapTopicMessage[topic] = []MessageChannel{newMessageChannel}
        }
    
        return newMessageChannel
    }
    
    khi có một "register subcription" request, service sẽ trả về một MessageChannel. Subcriber sẽ giao tiếp với MessageChannel đó để nhận message.
  • Subcribe
    func subcribe(messageChannel MessageChannel) {
         go func() {
             for {
                 message := <-messageChannel
                 fmt.Printf("%v: receive new message with topic: '%v' - content: '%v' \n", time.Now().Format("15:04:05"), message.topic, message.content)
             }
         }()
     }
    
  • Running and see what happen!
    func main() {
        maxMessage := 10000
        topic := "update-user"
    
        messageQueue := make(chan Message, maxMessage)
        mapTopicMessage := make(map[string][]MessageChannel) // map[topic][]MessageChannel
        go run(messageQueue, mapTopicMessage)
    
        // register subcriptions
        sub1 := registerSubscription(mapTopicMessage, topic)
    
        // publish
        publish(messageQueue, topic, "user-name is update to Hung")
    
        subcribe(sub1)
    
        time.Sleep(time.Second * 10)
    }
    result:
        09:16:02: publish new message with topic: 'update-user' - content: 'user-name is update to Hung'
        09:16:02: receive new message with topic: 'update-user' - content: 'user-name is update to Hung'
    
  • Add more subcriber
    func main() {
         ...
         // register subcriptions
         sub1 := registerSubscription(mapTopicMessage, topic)
         sub2 := registerSubscription(mapTopicMessage, topic)
         sub3 := registerSubscription(mapTopicMessage, topic)
         
         // publish
         publish(messageQueue, topic, "user-name is update to Hung")
    
         subcribe(sub1)
         subcribe(sub2)
         subcribe(sub3)
         
         time.Sleep(time.Second * 10)
    }
    result:
         09:20:15: publish new message with topic: 'update-user' - content: 'user-name is update to Hung'
         09:20:15: receive new message with topic: 'update-user' - content: 'user-name is update to Hung'
         09:20:15: receive new message with topic: 'update-user' - content: 'user-name is update to Hung'
         09:20:15: receive new message with topic: 'update-user' - content: 'user-name is update to Hung'
    

Xây dựng một pubsub hoàn chỉnh

  • pubsub folder

    pubsub
        - message.go
        - pubsub.go
    main.go
    
  • message.go

     package pubsub
    
     type Message struct {
         topic   string
         content interface{}
     }
    
     func NewMessage(topic string, content interface{}) *Message {
         return &Message{
             topic:   topic,
             content: content,
         }
     }
    
     func (m *Message) GetTopic() string {
         return m.topic
     }
    
     func (m *Message) GetContent() interface{} {
         return m.content
     }
    
     type MessageChannel chan Message
    
  • pubsub.go
    Migrate from above demo to "myPubSub" struct

     main.go
     func main() {
         maxMessage := 10000
         topic := "update-user"
    
         messageQueue := make(chan Message, maxMessage)
         mapTopicMessage := make(map[string][]MessageChannel) // map[topic][]MessageChannel
     }
     
     pubsub.go
     type myPubSub struct {
         messageQueue     MessageChannel
         subscribeChannel map[string][]MessageChannel
     }
    
     func NewMyPubSub() *myPubSub {
         return &myPubSub{
             messageQueue:     make(MessageChannel, 10000),
             subscribeChannel: make(map[string][]MessageChannel),
         }
     }
    

    Migrate run function

    main.go
    func run(messageQueue chan Message, mapTopicMessage map[string][]MessageChannel) {
         for {
             message := <-messageQueue
    
             listMessageChannel, ok := mapTopicMessage[message.topic]
             if ok {
                 for _, messageChannel := range listMessageChannel {
                     messageChannel <- message
                 }
             }
         }
    }
    
    pubsub.go
    func (pb *myPubSub) Run() {
         go func() {
             for {
                 message := <-pb.messageQueue
    
                 subscribeChannel, ok := pb.subscribeChannel[message.topic]
                 if ok {
                     for _, channel := range subscribeChannel {
                         channel <- message
                     }
                 }
             }
         }()
     }
    

    Migrate publish function

    main.go
    func publish(messageQueue chan Message, topic string, content string) {
         go func() 
         message := Message{
             topic:   topic,
             content: content,
         }
         messageQueue <- message
         fmt.Printf("%v: publish new message with topic: '%v' - content: '%v' \n", time.Now().Format("15:04:05"), message.topic, message.content)
    }
    
    pubsub.go
    func (pb *myPubSub) Publish(topic string, content interface{}) {
         pb.messageQueue <- *NewMessage(topic, content)
    }
    

    Migrate registerSubscription function

    main.go
    func registerSubscription(mapTopicMessage map[string][]MessageChannel, topic string) MessageChannel {
         newMessageChannel := make(MessageChannel)
    
         value, ok := mapTopicMessage[topic]
         if ok {
             value = append(value, newMessageChannel)
             mapTopicMessage[topic] = value
         } else {
             mapTopicMessage[topic] = []MessageChannel{newMessageChannel}
         }
    
         return newMessageChannel
    }
    
    pubsub.go
    func (pb *myPubSub) RegisterSubscription(topic string) MessageChannel {
         newChannelForSubscriber := make(MessageChannel)
    
         currentSubcriberChannel, ok := pb.subscribeChannel[topic]
         if ok {
             currentSubcriberChannel = append(currentSubcriberChannel, newChannelForSubscriber)
             pb.subscribeChannel[topic] = currentSubcriberChannel
         } else {
             pb.subscribeChannel[topic] = []MessageChannel{newChannelForSubscriber}
         }
         return newChannelForSubscriber
    }
    

    Testing

    main.go
    func main() {
         topic := "update-user"
    
         myPubSub := pubsub.NewMyPubSub()
         myPubSub.Run()
    
         // register
         sub1 := myPubSub.RegisterSubscription(topic)
         sub2 := myPubSub.RegisterSubscription(topic)
         sub3 := myPubSub.RegisterSubscription(topic)
    
         // publish
         myPubSub.Publish(topic, "user-name is update to Hung")
    
         subcribe(sub1)
         subcribe(sub2)
         subcribe(sub3)
    
         time.Sleep(time.Second * 10)
    }
    result: 
        16:00:58: receive new message with topic: 'update-user' - content: 'user-name is update to Hung'
        16:00:58: receive new message with topic: 'update-user' - content: 'user-name is update to Hung'
        16:00:58: receive new message with topic: 'update-user' - content: 'user-name is update to Hung'
    
  • Data racing and locking solution
    Data racing luôn là 1 vấn đề cần được các gopher quan tâm khi làm việc với go routine. Và một trong những giải pháp khá phổ biến là mutex lock. Vậy trong trong pubsub service của chúng ta có chỗ nào là xảy ra data racing ? Yes, bản chất của data racing là writing data bất đồng bộ và chúng ta có thể thấy ngay rằng subscribeChannel đang được ghi đè trong registerSubscription function. 🤔🤔 hmm, tuy nhiên, nếu back lại caller hiện tại của registerSubscription function, thì nó đang được setup để chạy một cách tuần tự ?

    main.go
    func main() {
        ...
         sub1 := myPubSub.RegisterSubscription(topic)
         sub2 := myPubSub.RegisterSubscription(topic)
         sub3 := myPubSub.RegisterSubscription(topic)   
        ...
    }
    

    Chính xác, subscribeChannel sẽ không bị tranh chấp dữ liệu nếu các caller được goi 1 cách tuần tự. Tuy nhiên, trong trường hợp nếu 3 sub1, sub2, sub3 được register bởi 3 goroutine thì việc tranh chấp có thể xảy ra và MessageChannel được trả về sẽ là không chính xác.

  • Apply mutex lock when modifiy subscribeChannel

    Add locker to myPubSub

     type myPubSub struct {
         locker           *sync.RWMutex
         messageQueue     MessageChannel
         subscribeChannel map[string][]MessageChannel
     }
    
     func NewMyPubSub() *myPubSub {
         return &myPubSub{
             locker:           new(sync.RWMutex),
             messageQueue:     make(MessageChannel, 10000),
             subscribeChannel: make(map[string][]MessageChannel),
         }
     }
    

    Locking at registerSubscription function

     func (pb *myPubSub) RegisterSubscription(topic string) MessageChannel {
         newChannelForSubscriber := make(MessageChannel)
    
         pb.locker.Lock()
         currentSubcriberChannel, ok := pb.subscribeChannel[topic]
         if ok {
             currentSubcriberChannel = append(currentSubcriberChannel, newChannelForSubscriber)
             pb.subscribeChannel[topic] = currentSubcriberChannel
         } else {
             pb.subscribeChannel[topic] = []MessageChannel{newChannelForSubscriber}
         }
         pb.locker.Unlock()
    
         return newChannelForSubscriber
     }
    
  • Về cơ bản, hệ thống pubsub của chúng ta đã hoàn thiên một cách cơ bản. Tuy nhiên, về phía consumer hay subcriber vẫn còn phải làm khá nhiều công việc như for loop cho đến khi có response. So, chúng ta sẽ tiếp tục chặng hành trình để xây dựng một subcriber engine để nhận request và giúp giảm tải công việc cho subcriber.

Xây dựng subcriber engine để nhận message từ pubsub

  • subcriber folder
    pubsub
       - message.go
       - pubsub.go
    subcriber
      - subcriber.go
      - updateUser.go
    main.go
    
  • subcriber.go
    define subscribeEngine struct to listening message from pubsub
    type PubSubProvider interface {
         RegisterSubscription(topic string) pubsub.MessageChannel
    }
    
    type subscribeEngine struct {
         pb PubSubProvider
    }
    
    func NewSubscribeEngine(pb PubSubProvider) *subscribeEngine {
         return &subscribeEngine{
             pb: pb,
         }
    }
    
  • Add Run function to listen message
    // migrate from "subcribe"(main.go) to "Run"(subcriber.go)
    main.go
    func main() {
      ...
      topic := "update-user"
      sub1 := myPubSub.RegisterSubscription(topic)
      subcribe(sub1)
    }
    
    func subcribe(messageChannel MessageChannel) {
      go func() {
          for {
              message := <-messageChannel
              fmt.Printf("%v: receive new message with topic: '%v' - content: '%v' \n", time.Now().Format("15:04:05"), message.topic, message.content)
          }
      }()
    }
    
    subcriber.go
    func (c *subscribeEngine) Run() {
         topic := "update-user"
         subscription := c.pb.RegisterSubscription(topic)
         go func(channel pubsub.MessageChannel) {
             for {
                 message := <-channel
                 fmt.Printf("%v: receive new message with topic: '%v' - content: '%v' \n", time.Now().Format("15:04:05"), message.GetTopic(), message.GetContent())
             }
         }(subscription)
    }
    
    Trong trường hợp, khi có thêm một topic và chúng ta sẽ phải tiến hành 'RegisterSubscription(topic)' và 'for loop' again. Nên để giúp our subscribeEngine is more useful, tôi sẽ set up 2 function RegisterSubscription và 'hande message' một cách dynamic hơn.
    subcriber.go
    func (c *subscribeEngine) Run() {
          c.subscribeTopic("update-user", updateUser)
         // c.subscribeTopic("update-user-2", updateUser2)
    }
    
    func (c *subscribeEngine) subscribeTopic(topic string, handler func(message pubsub.Message)) {
         subscription := c.pb.RegisterSubscription(topic)
         go func(channel pubsub.MessageChannel) {
             for {
                 message := <-channel
                 handler(message)
             }
         }(subscription)
    }
    
    func updateUser(message pubsub.Message) {
         fmt.Printf("%v: receive new message with topic: '%v' - content: '%v' \n", time.Now().Format("15:04:05"), message.GetTopic(), message.GetContent())
    }
    
  • Move 'updateUser' function to new file for maintaining easier
    updateUser.go
    func updateUser(message pubsub.Message) {
         fmt.Printf("%v: receive new message with topic: '%v' - content: '%v' \n", time.Now().Format("15:04:05"), message.GetTopic(), message.GetContent())
    }
    
    subcriber.go
    func (c *subscribeEngine) Run() {
         c.subscribeTopic("update-user", updateUser)
    }
    
  • Testing
    main.go
    func main() {
         topic := "update-user"
         ranDomtopic := "random-update-user"
    
         myPubSub := pubsub.NewMyPubSub()
         myPubSub.Run()
    
         subscribeEngine := subcriber.NewSubscribeEngine(myPubSub)
         subscribeEngine.Run()
    
         // publish
         myPubSub.Publish(topic, "user-name is update to Hung")
         myPubSub.Publish(topic, "user-name is update to Quang")
         myPubSub.Publish(ranDomtopic, "hmm ...")
    
         time.Sleep(time.Second * 20)
    }
    result:
        11:11:03: receive new message with topic: 'update-user' - content: 'user-name is update to Hung'
        11:11:03: receive new message with topic: 'update-user' - content: 'user-name is update to Quang'    
    

Tạm kết

  • Rất cám ơn vì bạn còn ở đây. Ở phần design trên, cá nhận tôi thấy vẫn còn thiếu phần context và unscribe function. 2 thành phần này cũng khá là quan trong nếu pubsub này được sử dụng trong real application. Tuy nhiên, tôi xin phép dừng lại phần trình bày ở đây và back lai khi có thời gian 🙃🙃 .
  • Again, thanks for reading! See you on next part!

All Rights Reserved

Viblo
Let's register a Viblo Account to get more interesting posts.