[Golang] Channel trong golang và use case - part IIII (pubsub pattern)
Mở đầu
- Tiếp tục series, hôm nay là một buổi chia sẽ của tôi về cách implement lại pubsub pattern bằng golang channel. Let's go, guys!
Pubsub
-
Trước hết ta sẽ có một định nghĩa đầy đủ từ wiki :
In software architecture, publish–subscribe is a messaging pattern where senders of messages, called publishers, do not program the messages to be sent directly to specific receivers, called subscribers, but instead categorize published messages into classes without knowledge of which subscribers, if any, there may be. -
Pubsub là một message pattern mà ở trong đó publisher chỉ việc gửi message và không cần quan tâm đến có subscriber nào nhận hay không và các message sẽ được phân loại và gủi đi mà không cần quan tâm xem có subscribers nào hay không. Publishers và subscribers không biết sự tồn tại của nhau. Ở một số hệ thông pubsub, sẽ có thêm 1 thành phần là broker, nó đảm nhiệm phân loại và gửi message.
-
Pubsub hay message queue nói chung được sử dụng khá phổ biến trong micro-service architectures. Nó cung cấp một phương thức giúp các service giao tiếp với nhau một cách bất đồng bộ. Ngoài ra, chúng ta sẽ có một vài use cases for messaging queues in real-world scenarios như là: Sending emails, Data post-processing, Batch updates for databases ...
Use case 3: Build pubsub service with buffered channel
- Dựa vào đặc tính channel là một queue, tôi sẽ có 1 simple demo về pubsub như sau:
messageQueue: chứa danh sách các message sẽ xủ lí.type Message struct { topic string content interface{} } type MessageChannel chan Message func main() { maxMessage := 10000 topic := "update-user" messageQueue := make(chan Message, maxMessage) mapTopicMessage := make(map[string][]MessageChannel) // map[topic][]MessageChannel }
MessageChannel: kênh giao tiếp giữa các publisher và subcriber. MessageChannel lúc này sẽ đóng vai trò như một trái tim của hệ thống.
mapTopicMessage: chứa một bản map giữa topic và danh sách các message channel. Một topic sẽ được subcribe bởi nhiều subcriber nên ta sẽ quan hệ 1:N. Nó đóng vai trò như việc quản lý topic và các message channel.
- Nguyên lý vận hành:
messageQueue khi nhận được một message mới, service sẽ lọc ra các danh sách MessageChannel tương ứng với topic của message đó và gửi message mới đến. Mỗi subcriber sẽ communicate with MessageChannel để lấy message.... func main() { maxMessage := 10000 topic := "update-user" messageQueue := make(chan Message, maxMessage) mapTopicMessage := make(map[string][]MessageChannel) go run(messageQueue, mapTopicMessage) } func run(messageQueue chan Message, mapTopicMessage map[string][]MessageChannel) { for { message := <-messageQueue listMessageChannel, ok := mapTopicMessage[message.topic] if ok { for _, messageChannel := range listMessageChannel { messageChannel <- message } } } }
- Publish message
func main() { maxMessage := 10000 topic := "update-user" messageQueue := make(chan Message, maxMessage) mapTopicMessage := make(map[string][]MessageChannel) go run(messageQueue, mapTopicMessage) // publish publish(messageQueue, topic, "user-name is update to Hung") time.Sleep(time.Second * 10) } func publish(messageQueue chan Message, topic string, content string) { message := Message{ topic: topic, content: content, } messageQueue <- message fmt.Printf("%v: publish new message with topic: '%v' - content: '%v' \n", time.Now().Format("15:04:05"), message.topic, message.content) } result: 08:46:28: publish new message with topic: 'update-user' - content: 'user-name is update to Hung'
- Register Subscription
khi có một "register subcription" request, service sẽ trả về một MessageChannel. Subcriber sẽ giao tiếp với MessageChannel đó để nhận message.func main() { ... // subcribe sub1 := registerSubscription(mapTopicMessage, topic) ... } func registerSubscription(mapTopicMessage map[string][]MessageChannel, topic string) MessageChannel { newMessageChannel := make(MessageChannel) value, ok := mapTopicMessage[topic] if ok { value = append(value, newMessageChannel) mapTopicMessage[topic] = value } else { mapTopicMessage[topic] = []MessageChannel{newMessageChannel} } return newMessageChannel }
- Subcribe
func subcribe(messageChannel MessageChannel) { go func() { for { message := <-messageChannel fmt.Printf("%v: receive new message with topic: '%v' - content: '%v' \n", time.Now().Format("15:04:05"), message.topic, message.content) } }() }
- Running and see what happen!
func main() { maxMessage := 10000 topic := "update-user" messageQueue := make(chan Message, maxMessage) mapTopicMessage := make(map[string][]MessageChannel) // map[topic][]MessageChannel go run(messageQueue, mapTopicMessage) // register subcriptions sub1 := registerSubscription(mapTopicMessage, topic) // publish publish(messageQueue, topic, "user-name is update to Hung") subcribe(sub1) time.Sleep(time.Second * 10) } result: 09:16:02: publish new message with topic: 'update-user' - content: 'user-name is update to Hung' 09:16:02: receive new message with topic: 'update-user' - content: 'user-name is update to Hung'
- Add more subcriber
func main() { ... // register subcriptions sub1 := registerSubscription(mapTopicMessage, topic) sub2 := registerSubscription(mapTopicMessage, topic) sub3 := registerSubscription(mapTopicMessage, topic) // publish publish(messageQueue, topic, "user-name is update to Hung") subcribe(sub1) subcribe(sub2) subcribe(sub3) time.Sleep(time.Second * 10) } result: 09:20:15: publish new message with topic: 'update-user' - content: 'user-name is update to Hung' 09:20:15: receive new message with topic: 'update-user' - content: 'user-name is update to Hung' 09:20:15: receive new message with topic: 'update-user' - content: 'user-name is update to Hung' 09:20:15: receive new message with topic: 'update-user' - content: 'user-name is update to Hung'
Xây dựng một pubsub hoàn chỉnh
-
pubsub folder
pubsub - message.go - pubsub.go main.go
-
message.go
package pubsub type Message struct { topic string content interface{} } func NewMessage(topic string, content interface{}) *Message { return &Message{ topic: topic, content: content, } } func (m *Message) GetTopic() string { return m.topic } func (m *Message) GetContent() interface{} { return m.content } type MessageChannel chan Message
-
pubsub.go
Migrate from above demo to "myPubSub" structmain.go func main() { maxMessage := 10000 topic := "update-user" messageQueue := make(chan Message, maxMessage) mapTopicMessage := make(map[string][]MessageChannel) // map[topic][]MessageChannel } pubsub.go type myPubSub struct { messageQueue MessageChannel subscribeChannel map[string][]MessageChannel } func NewMyPubSub() *myPubSub { return &myPubSub{ messageQueue: make(MessageChannel, 10000), subscribeChannel: make(map[string][]MessageChannel), } }
Migrate run function
main.go func run(messageQueue chan Message, mapTopicMessage map[string][]MessageChannel) { for { message := <-messageQueue listMessageChannel, ok := mapTopicMessage[message.topic] if ok { for _, messageChannel := range listMessageChannel { messageChannel <- message } } } } pubsub.go func (pb *myPubSub) Run() { go func() { for { message := <-pb.messageQueue subscribeChannel, ok := pb.subscribeChannel[message.topic] if ok { for _, channel := range subscribeChannel { channel <- message } } } }() }
Migrate publish function
main.go func publish(messageQueue chan Message, topic string, content string) { go func() message := Message{ topic: topic, content: content, } messageQueue <- message fmt.Printf("%v: publish new message with topic: '%v' - content: '%v' \n", time.Now().Format("15:04:05"), message.topic, message.content) } pubsub.go func (pb *myPubSub) Publish(topic string, content interface{}) { pb.messageQueue <- *NewMessage(topic, content) }
Migrate registerSubscription function
main.go func registerSubscription(mapTopicMessage map[string][]MessageChannel, topic string) MessageChannel { newMessageChannel := make(MessageChannel) value, ok := mapTopicMessage[topic] if ok { value = append(value, newMessageChannel) mapTopicMessage[topic] = value } else { mapTopicMessage[topic] = []MessageChannel{newMessageChannel} } return newMessageChannel } pubsub.go func (pb *myPubSub) RegisterSubscription(topic string) MessageChannel { newChannelForSubscriber := make(MessageChannel) currentSubcriberChannel, ok := pb.subscribeChannel[topic] if ok { currentSubcriberChannel = append(currentSubcriberChannel, newChannelForSubscriber) pb.subscribeChannel[topic] = currentSubcriberChannel } else { pb.subscribeChannel[topic] = []MessageChannel{newChannelForSubscriber} } return newChannelForSubscriber }
Testing
main.go func main() { topic := "update-user" myPubSub := pubsub.NewMyPubSub() myPubSub.Run() // register sub1 := myPubSub.RegisterSubscription(topic) sub2 := myPubSub.RegisterSubscription(topic) sub3 := myPubSub.RegisterSubscription(topic) // publish myPubSub.Publish(topic, "user-name is update to Hung") subcribe(sub1) subcribe(sub2) subcribe(sub3) time.Sleep(time.Second * 10) } result: 16:00:58: receive new message with topic: 'update-user' - content: 'user-name is update to Hung' 16:00:58: receive new message with topic: 'update-user' - content: 'user-name is update to Hung' 16:00:58: receive new message with topic: 'update-user' - content: 'user-name is update to Hung'
-
Data racing and locking solution
Data racing luôn là 1 vấn đề cần được các gopher quan tâm khi làm việc với go routine. Và một trong những giải pháp khá phổ biến là mutex lock. Vậy trong trong pubsub service của chúng ta có chỗ nào là xảy ra data racing ? Yes, bản chất của data racing là writing data bất đồng bộ và chúng ta có thể thấy ngay rằng subscribeChannel đang được ghi đè trong registerSubscription function. 🤔🤔 hmm, tuy nhiên, nếu back lại caller hiện tại của registerSubscription function, thì nó đang được setup để chạy một cách tuần tự ?main.go func main() { ... sub1 := myPubSub.RegisterSubscription(topic) sub2 := myPubSub.RegisterSubscription(topic) sub3 := myPubSub.RegisterSubscription(topic) ... }
Chính xác, subscribeChannel sẽ không bị tranh chấp dữ liệu nếu các caller được goi 1 cách tuần tự. Tuy nhiên, trong trường hợp nếu 3 sub1, sub2, sub3 được register bởi 3 goroutine thì việc tranh chấp có thể xảy ra và MessageChannel được trả về sẽ là không chính xác.
-
Apply mutex lock when modifiy subscribeChannel
Add locker to myPubSubtype myPubSub struct { locker *sync.RWMutex messageQueue MessageChannel subscribeChannel map[string][]MessageChannel } func NewMyPubSub() *myPubSub { return &myPubSub{ locker: new(sync.RWMutex), messageQueue: make(MessageChannel, 10000), subscribeChannel: make(map[string][]MessageChannel), } }
Locking at registerSubscription function
func (pb *myPubSub) RegisterSubscription(topic string) MessageChannel { newChannelForSubscriber := make(MessageChannel) pb.locker.Lock() currentSubcriberChannel, ok := pb.subscribeChannel[topic] if ok { currentSubcriberChannel = append(currentSubcriberChannel, newChannelForSubscriber) pb.subscribeChannel[topic] = currentSubcriberChannel } else { pb.subscribeChannel[topic] = []MessageChannel{newChannelForSubscriber} } pb.locker.Unlock() return newChannelForSubscriber }
-
Về cơ bản, hệ thống pubsub của chúng ta đã hoàn thiên một cách cơ bản. Tuy nhiên, về phía consumer hay subcriber vẫn còn phải làm khá nhiều công việc như for loop cho đến khi có response. So, chúng ta sẽ tiếp tục chặng hành trình để xây dựng một subcriber engine để nhận request và giúp giảm tải công việc cho subcriber.
Xây dựng subcriber engine để nhận message từ pubsub
- subcriber folder
pubsub - message.go - pubsub.go subcriber - subcriber.go - updateUser.go main.go
- subcriber.go
define subscribeEngine struct to listening message from pubsubtype PubSubProvider interface { RegisterSubscription(topic string) pubsub.MessageChannel } type subscribeEngine struct { pb PubSubProvider } func NewSubscribeEngine(pb PubSubProvider) *subscribeEngine { return &subscribeEngine{ pb: pb, } }
- Add Run function to listen message
Trong trường hợp, khi có thêm một topic và chúng ta sẽ phải tiến hành 'RegisterSubscription(topic)' và 'for loop' again. Nên để giúp our subscribeEngine is more useful, tôi sẽ set up 2 function RegisterSubscription và 'hande message' một cách dynamic hơn.// migrate from "subcribe"(main.go) to "Run"(subcriber.go) main.go func main() { ... topic := "update-user" sub1 := myPubSub.RegisterSubscription(topic) subcribe(sub1) } func subcribe(messageChannel MessageChannel) { go func() { for { message := <-messageChannel fmt.Printf("%v: receive new message with topic: '%v' - content: '%v' \n", time.Now().Format("15:04:05"), message.topic, message.content) } }() } subcriber.go func (c *subscribeEngine) Run() { topic := "update-user" subscription := c.pb.RegisterSubscription(topic) go func(channel pubsub.MessageChannel) { for { message := <-channel fmt.Printf("%v: receive new message with topic: '%v' - content: '%v' \n", time.Now().Format("15:04:05"), message.GetTopic(), message.GetContent()) } }(subscription) }
subcriber.go func (c *subscribeEngine) Run() { c.subscribeTopic("update-user", updateUser) // c.subscribeTopic("update-user-2", updateUser2) } func (c *subscribeEngine) subscribeTopic(topic string, handler func(message pubsub.Message)) { subscription := c.pb.RegisterSubscription(topic) go func(channel pubsub.MessageChannel) { for { message := <-channel handler(message) } }(subscription) } func updateUser(message pubsub.Message) { fmt.Printf("%v: receive new message with topic: '%v' - content: '%v' \n", time.Now().Format("15:04:05"), message.GetTopic(), message.GetContent()) }
- Move 'updateUser' function to new file for maintaining easier
updateUser.go func updateUser(message pubsub.Message) { fmt.Printf("%v: receive new message with topic: '%v' - content: '%v' \n", time.Now().Format("15:04:05"), message.GetTopic(), message.GetContent()) } subcriber.go func (c *subscribeEngine) Run() { c.subscribeTopic("update-user", updateUser) }
- Testing
main.go func main() { topic := "update-user" ranDomtopic := "random-update-user" myPubSub := pubsub.NewMyPubSub() myPubSub.Run() subscribeEngine := subcriber.NewSubscribeEngine(myPubSub) subscribeEngine.Run() // publish myPubSub.Publish(topic, "user-name is update to Hung") myPubSub.Publish(topic, "user-name is update to Quang") myPubSub.Publish(ranDomtopic, "hmm ...") time.Sleep(time.Second * 20) } result: 11:11:03: receive new message with topic: 'update-user' - content: 'user-name is update to Hung' 11:11:03: receive new message with topic: 'update-user' - content: 'user-name is update to Quang'
Tạm kết
- Rất cám ơn vì bạn còn ở đây. Ở phần design trên, cá nhận tôi thấy vẫn còn thiếu phần context và unscribe function. 2 thành phần này cũng khá là quan trong nếu pubsub này được sử dụng trong real application. Tuy nhiên, tôi xin phép dừng lại phần trình bày ở đây và back lai khi có thời gian 🙃🙃 .
- Again, thanks for reading! See you on next part!
All Rights Reserved