Go sync Package: 6 khái niệm chính bạn cần biết
Ngoài sử dụng channel, chúng ta còn có một vài kỹ thuật để xử lý các vấn đề về concurrent, goroutines,... và Go cung cấp sẵn một số hàm và kiểu như WaitGroup để chờ goroutines xử lý, hay Mutex để khoá một "đoạn code" hay một biến được truy cập bởi nhiều goroutines khác nhau mà bạn khó kiểm soát.
Ở bài này, mình sẽ thảo luận 6 concepts tiêu biểu trong sync package theo thứ tự sử dụng phổ biến.
0. Tác giả
Mình thường xuyên update các bài viết insights trên LinkedIn và blog Devtrovert
1. sync.Mutex và sync.RWMutex
Khái niệm mutex lock hay mutual exclusion đã quá quen thuộc với lập trình viên, các bạn tới từ Java, C#,... đã sử dụng hoặc nghe tới khái niệm này.
Một trong những điều quan trọng nhất khi xử lý goroutines là đảm bảo rằng, chúng ta truy cập vào cùng 1 resources tại cùng một thời điểm và mutex giúp ta đảm bảo điều này.
sync.Mutex
Chúng ta tới với một ví dụ không sử dụng sync.Mutex khi nhiều goroutines cùng truy cập một biến a
:
var a = 0
func Add() {
a++
}
func main() {
for i := 0; i < 500; i++ {
go Add()
}
time.Sleep(5 * time.Second)
fmt.Println(a)
}
Ở đây, mình tạo ra 500 goroutines cùng tăng biến a lên 1, một điều chắc chắn ở đây nếu là bạn sẽ rất may mắn nếu fmt.Println(a)
in ra giá trị 500, hấu hết trường hợp, kết quả sẽ nhỏ hơn 500.
Nào, mình sẽ cải thiện ví dụ trên với sync.Mutex
:
var mtx = sync.Mutex{}
func Add() {
mtx.Lock()
defer mtx.Unlock()
a++
}
Bây giờ code chúng ta sẽ in ra đúng giá trị mà chúng ta mong muốn, tiếp theo mình sẽ đề cập tới sync.RWMutex
.
Chờ chút, mình để ý
sync.Mutex
có hàmTryLock
, mục đích hàm này là gì?
Khi các goroutines khác gọi mtx.Lock()
, chúng sẽ bị blocked nếu như có goroutine đã gọi mtx.Lock()
trước đó và đang truy cập a++
, tuy nhiên nếu bạn không muốn goroutine của mình đợi, bạn có thể sử dụng TryLock
và nó sẽ trả về cho bạn một biến boolean, true nếu bạn có thể acquired lock và false nếu bạn không thể vào.
sync.RWMutex là gì?
Giả sử bạn đang kiểm tra một biến a
, và các goroutines khác cũng đang truy cập? Rất có thể bạn sẽ sử dụng một biến a
với giá trị đã cũ, và chúng ta sẽ fix vấn đề này với sync.Mutex như cũ:
func Add() {
mtx.Lock()
defer mtx.Unlock()
a++
}
func Get() int {
mtx.Lock()
defer mtx.Unlock()
return a
}
Nhưng vấn đề ở đây, đó là nếu bạn gọi Get()
1 triệu và không gọi Add()
lần nào, thì 1 triệu lần đó các goroutines đều bị blocked và chờ nhau, trong khi biến a
không thay đổi và chúng ta hoàn toàn cả thể bỏ mtx.Lock()
ở hàm Get()
.
Để giải quyết vấn đề, chúng ta sử dụng sync.RWMutex
:
var mtx = sync.RWMutex{}
func Add() {
mtx.Lock()
defer mtx.Unlock()
a++
}
func Look() {
mtx.RLock()
defer mtx.RUnlock()
fmt.Println(a)
}
Thay vì gọi mtx.Lock()
, chúng ta gọi mtx.RLock()
, và 1 triệu goroutines khi cùng đọc biến a, chúng đều có thể đọc cùng lúc mà không bị blocked lẫn nhau.
Ủa vậy khi nào nó lock?
Khi chúng ta write a bằng hàm Add(), ở hàm này, chúng ta vẫn sử dụng mtx.Lock()
, khi có một goroutine đang write a++
, các goroutines khác sẽ không thể đọc được a bằng Get()
và chúng phải chờ.
Dưới đây là cách nó hoạt động:
- Khi write, các read và write khác sẽ bị blocked.
- Khi read, write sẽ bị blocked
- Các read không blocked lẫn nhau
sync.Locker
Ngoài ra, trong sync package, chúng ta còn thấy sự xuất hiện của interface sync.Locker
, nào cùng xem định nghĩa của nó:
// A Locker represents an object that can be locked and unlocked.
type Locker interface {
Lock()
Unlock()
}
Bạn hoàn toàn có thể implement một sync.Mutex
khác bằng cách implement 2 hàm này, chúng ta sẽ sửa hàm Add()
một chút:
func Add(mtx sync.Locker) {
mtx.Lock()
defer mtx.Unlock()
a++
}
2. sync.WaitGroup
Bạn có thể đã chú ý cách mình xài để chờ 500 goroutines tăng biến a
ở ví dụ đầu, và đây là một cách không chính thống vì nó không đảm bảo rằng toàn bộ goroutines thực hiện xong và cũng lãng phí thời gian chờ nếu goroutines thực hiện nhanh.
Và đây là cách sync.WaitGroup cứu rỗi chúng ta:
func main() {
wg := sync.WaitGroup{}
for i := 0; i < 500; i++ {
wg.Add(1)
go func() {
defer wg.Done()
Add()
}()
}
wg.Wait()
fmt.Println(a)
}
Kiểu sync.WaitGroup
có 3 hàm chính: Add
, Done
và Wait
.
Trong đó, hàm Add(delta int)
: dùng để tăng biến đếm lên delta đơn vị và chúng ta thường gọi nó trước khi tạo một goroutine và khi tăng như vậy, chúng ta đang nói với WaitGroup rằng có 1 goroutine đang thực hiện.
2 hàm còn lại thì khá đơn giản:
Done
: hàm này gọi khi goroutine đã xong nhiệm vụ của nó, biến đếm -1.Wait
: khi goroutine gọi hàm này, nó sẽ bị blocked cho tới khi biến đếm của WaitGroup trở về 0, đảm bảo rằng không còn goroutine nào đang thực chạy.
Điều gì sẽ xảy ra nếu mình đặt wg.Add(1)
trong goroutine?
go func() {
wg.Add(1)
defer wg.Done()
Add()
}()
Compiler của mình đang chửi vào mặt mình: “should call wg.Add(1) before starting the goroutine to avoid a race”, và nếu chúng ta mặc kệ và run, bạn có thể sẽ gặp phải panic: “panic: sync: WaitGroup is reused before previous Wait has returned”.
3. sync.Once
Giả sử bạn có một hàm gọi là CreateInstance()
và bạn muốn đảm bảo body của hàm này chỉ được thực thi một lần duy nhất:
var i = 0
var _isInitialized = false
func CreateInstance() {
if _isInitialized {
return
}
i = GetISomewhere()
_isInitialized = true
}
Tuy nhiên, nếu nhiều goroutines cùng gọi hàm này thì sao? lúc này _isInitialized đều trả về false và chúng sẽ chạy đoạn code phía sau.
Bạn có thể sử dụng mutex lock để đảm bảo điều này, tuy nhiên sync package hỗ trợ chúng ta một cách làm đơn giản hơn cho quá trình này:
var i = 0
var once = &sync.Once{}
func CreateInstance() {
once.Do(func() {
i = GetISomewhere()
})
}
Chúng ta bỏ _isInitialized, thay vào đó, once.Do()
sẽ đảm bảo hàm của chúng ta thực hiện một lần duy nhất dù có bao nhiêu goroutines gọi song song.
4. sync.Pool
Nếu bạn biết về Object Pooling (một kỹ thuật trong Unity) dùng để sử dụng lại các resources mà không cần tốn thời gian để cấp phát lại, thì hẳn bạn sẽ dễ làm quen với sync.Pool
và sử dụng sync.Pool sẽ giúp bạn giảm gánh nặng cho garbage collector cũng như cpu.
Nhưng Object trong Go không tốn kém bằng Unity?.
Tốn kém ở đây không kể tới thời gian để tạo, mà có thể là thời gian để setup, chẳng hạn bạn tạo một struct A
, và để hoàn thiện các fields của struct A, bạn cần sử dụng reflection, hay đi 3 - 4 roundtrips về database, services khác..., thì thay vì destroy, bạn sẽ đặt nó vào pool và sử dụng lại.
Đó cũng chính là cách mà Go Team optimize encoding/json
package, họ chỉ xử lý lâu một struct ở lần duy nhất, và các lần sau sẽ nhanh hơn bởi họ cache lại cách xử lý struct đó vào pool.
Ở đây mình chỉ đưa 1 ví dụ đơn giản, chúng ta cache các số int vào pool:
var pool = sync.Pool{
New: func() interface{} {
return 0
},
}
func main() {
pool.Put(1)
pool.Put(2)
pool.Put(3)
a := pool.Get().(int)
b := pool.Get().(int)
c := pool.Get().(int)
fmt.Println(a, b, c) // Output: 1, 3, 2 (order may vary)
}
Thứ tự khi lấy ra các giá trị này là random.
Một vài tips khi sử dụng sync.Pool
:
- Pool đặc biệt hữu ích đối với các objects được sử dụng nhiều và lâu dài, có nhiều instances chẳng hạn như database connections, worker goroutines, buffers,...
- Hãy reset lại state của objects trước khi trả chúng về lại pool để đảm bảo chúng ta luôn lấy được một instance có giá trị giống nhau.
5. sync.Map
Khi bạn xử lý map một cách song song, nhiều goroutines cùng truy cập, read và write đồng thời, chúng ta có thể sử dụng RWMutex.
Như vậy bạn có thể đọc một map song song nhiều goroutines, đảm bảo chỉ 1 write duy nhất tại 1 thời điểm như chúng ta đã thảo luận ở phần sync.RWMutex.
Vậy chúng ta cần gì ở sync.Map
?
Chẳng hạn bạn có một map { A: 1, B: 2, C: 3 }, tại sao chúng ta phải locked cả map chỉ để write key A? Chúng ta hoàn toàn có thể write key A và đọc key B một cách song song.
Và đó là lý do sync package hỗ trợ thêm sync.Map
.
Các hàm hỗ trợ của sync.Map
:
- CompareAndDelete(key, old any) (go 1.20): xoá key nếu value của map[key] == old, trả về true nếu xoá và ngược lại false.
- CompareAndSwap(key, old, new any) (go 1.20): hoán đổi giá trị cũ và giá trị mới của map[key] nếu map[key] == old, trả về true nếu hoán đổi và ngược lại false.
- Swap(key, value any) (previous any, loaded bool) (go 1.20): sửa giá trị của map[key] thành value và trả về giá trị trước đó của map[key], false nếu key không tồn tại trong map.
- LoadOrStore(key, value any) (actual any, loaded bool): lấy giá trị của map[key], nếu map không chứa key, hàm sẽ set key và value cho map, loaded trả về true nếu map chứa key và false nếu không có.
- Range (f func(key, value any) bool): duyệt qua toàn bộ map và thực hiện function với toàn bộ phần tử của map, nếu f trả về false, range sẽ ngừng loop.
- Store, Delete, Load, LoadAndDelete
Tuy nhiên, sync.Map
có cơ chế lock riêng cho việc write, khi write key A, chúng ta không hẳn có thể đọc key B song song, điều này tuỳ thuộc vào sync.Map
.
6. sync.Cond
Bạn có thể hiểu sync.Cond
như một biến có điều kiện, hỗ trợ cho nhiều goroutines chờ và tương tác với nhau, để hiễu rõ hơn, chúng ta sẽ tạo sync.Cond:
var mtx sync.Mutex
var cond = sync.NewCond(&mtx)
Để tạo sync.Cond
chúng ta cần một sync.Locker
, mình đã giải thích về interface này ở trên.
Chẳng hạn một goroutine gọi tới sync.Cond
để chờ một tín hiệu từ một goroutine khác:
func dummyGoroutine(id int) {
cond.L.Lock()
defer cond.L.Unlock()
fmt.Printf("Goroutine %d is waiting...\n", id)
cond.Wait()
fmt.Printf("Goroutine %d received the signal.\n", id)
}
Và một goroutine khác phát tín hiệu tới goroutine này thông qua sync.Cond (chúng ta sẽ phát tín hiệu từ main):
func main() {
go dummyGoroutine(1) // <---- goroutine này sẽ chờ tín hiệu
time.Sleep(1 * time.Second)
fmt.Println("Sending signal...")
cond.Signal() // <---- phát tín hiệu tới dummyGoroutine để tiếp tục thực thi
time.Sleep(1 * time.Second)
}
Và đây là log:
Goroutine 1 is waiting...
Sending signal...
Goroutine 1 received the signal.
Và điều gì sẽ xày ra nếu có nhiều goroutines cùng chờ 1 signal?
func main() {
go dummyGoroutine(1)
go dummyGoroutine(2)
time.Sleep(1 * time.Second)
cond.Broadcast() // broadcast to all goroutines
time.Sleep(1 * time.Second)
}
Và đây là kết quả:
Goroutine 1 is waiting...
Goroutine 2 is waiting...
Goroutine 2 received the signal.
Goroutine 1 received the signal.
"Tại sao goroutine 2 có thể in ra "Goroutine 2 is waiting"? nó phải bị blocked bởi
cond.L.Lock()
chứ?"
Đây là điều đặc biệt của sync.Cond
, khi gọi cond.Wait()
, nó sẽ tự động unlock mutex và cho phép các goroutines khác truy cập và cùng chờ (cond.Wait()
).
"Tại sao bạn sử dụng
cond.Broadcast()
mà không phảicond.Signal
?"
Trong trường hợp này, chỉ có một goroutine có thể nhận signal, trong khi goroutine còn lại vẫn sẽ bị blocked cho tới khi nhận được signal khác.
Claims
Bài viết được dịch từ 6 Key Concepts You Should Know for Concurrency in the Sync Package.
All Rights Reserved