Go - Pooling Base Pattern

The purpose of this post is to demonstrate a concurrency pattern that uses a buffered channel to implement a pool of shared resources, such as database connections, that can be used by any number of goroutines. This technique is useful when a goroutine needs a resource that it can acquire from the pool, use, and then return to the pool.

Defining a Structure

Let's start by declaring a Pool struct that contains four fields. The first is a channel of the interface type io.Closer; it will be created as a buffered channel to hold the shared resources. Next is a factory function that is used to allocate a new resource when the pool requires one. The third field is a flag that indicates whether the pool has been closed. The last one is of type sync.Mutex and is used to guard operations on the pool's state when multiple goroutines use it.

type Pool struct {
    resources chan io.Closer
    factory func() (io.Closer, error)
    closed bool
    locker sync.Mutex
}

Pool's Factory Function

The New function, which constructs a pool, takes two arguments. The first is the function that will be used as the factory to allocate shared resources. The second is the size of the pool, which must be greater than zero. It's important to note that the resource managed by the pool can be of any type as long as it implements the io.Closer interface; that is the beauty of coding to an interface.

func New(fn func() (io.Closer, error), size uint) (*Pool, error) {
    if size == 0 { // size is unsigned, so zero is the only invalid value
        return nil, errors.New("pool's size is too small")
    }
    
    return &Pool{
        factory:   fn,
        resources: make(chan io.Closer, size),
    }, nil
}
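
As a rough sketch of how New might be called, the example below wires it up with a made-up resource. The dbConnection type and the newConnectionFactory helper are hypothetical names introduced only for illustration; Pool and New are repeated from above so the example compiles on its own.

```go
package main

import (
	"errors"
	"fmt"
	"io"
	"sync"
)

// Pool and New are repeated from the article so this example is self-contained.
type Pool struct {
	resources chan io.Closer
	factory   func() (io.Closer, error)
	closed    bool
	locker    sync.Mutex
}

func New(fn func() (io.Closer, error), size uint) (*Pool, error) {
	if size == 0 {
		return nil, errors.New("pool's size is too small")
	}
	return &Pool{
		factory:   fn,
		resources: make(chan io.Closer, size),
	}, nil
}

// dbConnection is a hypothetical resource, not part of the article.
type dbConnection struct {
	ID int
}

// Close satisfies io.Closer, which is all the pool requires of a resource.
func (c *dbConnection) Close() error {
	fmt.Println("closing connection", c.ID)
	return nil
}

// newConnectionFactory is a hypothetical helper that returns a factory
// handing out connections with increasing IDs. The counter is not
// synchronized, so this sketch is only suitable for single-goroutine use.
func newConnectionFactory() func() (io.Closer, error) {
	next := 0
	return func() (io.Closer, error) {
		next++
		return &dbConnection{ID: next}, nil
	}
}

func main() {
	p, err := New(newConnectionFactory(), 2)
	if err != nil {
		fmt.Println("unexpected error:", err)
		return
	}
	fmt.Println("pool capacity:", cap(p.resources)) // pool capacity: 2

	if _, err := New(newConnectionFactory(), 0); err != nil {
		fmt.Println("size 0 rejected:", err)
	}
}
```

Because the pool only sees io.Closer, a caller that needs the concrete type back would use a type assertion such as r.(*dbConnection) on whatever it acquires.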

Acquiring a Resource

This method returns a resource from the pool if one is available, or creates a new one otherwise. The trick is to use a select statement with a default clause on the buffered channel to check whether a resource is waiting in it. If there is one, it is received from the channel and returned to the caller; otherwise the default clause runs and a new resource is allocated by the factory. Declaring an error variable such as ErrPoolClosed is a common practice in Go. It gives the caller the ability to check for a specific error.

var ErrPoolClosed = errors.New("pool has been closed")

func (p *Pool) Acquire() (io.Closer, error) {
    select {
    case r, ok := <-p.resources:
        if !ok { // in case the pool is closed
            return nil, ErrPoolClosed
        }
        return r, nil
    default:
        // factory already returns (io.Closer, error), so pass both through
        return p.factory()
    }
}
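
The select / default trick is easier to see in isolation. The following is a minimal sketch on a plain chan int rather than the pool itself; tryReceive and its status strings are illustrative names, not part of the pool's API.

```go
package main

import "fmt"

// tryReceive performs a non-blocking receive and reports what happened:
// "received" when a value was buffered, "closed" when the channel is closed
// and drained, and "empty" when nothing was ready.
func tryReceive(ch chan int) (int, string) {
	select {
	case v, ok := <-ch:
		if !ok {
			return 0, "closed"
		}
		return v, "received"
	default:
		return 0, "empty"
	}
}

func main() {
	ch := make(chan int, 1)
	ch <- 42

	fmt.Println(tryReceive(ch)) // 42 received
	fmt.Println(tryReceive(ch)) // 0 empty

	close(ch)
	fmt.Println(tryReceive(ch)) // 0 closed
}
```

A receive on a closed, empty channel never blocks, so the receive case wins over default once the channel is closed. That is why Acquire can detect a closed pool through the ok flag.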

Releasing a Resource

The Release method takes one argument: the resource that needs to be returned to the pool. First we check whether the pool has been closed; if it has, we close the resource and return from the method. Otherwise we attempt to put the resource back into the pool using the case clause. When the channel is full, the default clause is invoked and the resource is closed, as there is no space left in the pool. The whole operation is guarded by the Mutex to ensure the correctness of the pool's state.

func (p *Pool) Release(r io.Closer) {
    p.locker.Lock()
    defer p.locker.Unlock()
    
    if p.closed {
        r.Close()
        return
    }
    
    select {
    case p.resources <- r:
        log.Println("return resource back into a pool")
    default:
        r.Close()
    }
}
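
The same select / default shape works for sends. Here is a small sketch, again on a plain channel, of how a full buffer makes the default clause run; tryReturn is a hypothetical helper used only for this illustration.

```go
package main

import "fmt"

// tryReturn attempts a non-blocking send and reports whether the value
// fit into the channel's buffer.
func tryReturn(ch chan string, v string) bool {
	select {
	case ch <- v:
		return true
	default:
		return false
	}
}

func main() {
	slots := make(chan string, 1) // room for exactly one value

	fmt.Println(tryReturn(slots, "a")) // true
	fmt.Println(tryReturn(slots, "b")) // false
}
```

Sending on a closed channel panics in Go, which is one reason Release checks the closed flag under the lock before it attempts the send.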

Closing a Pool

The purpose of this method is to shut down the pool and release all resources. To do that, we first set the pool's closed field to true, then close the resources channel, and finally loop over the resources still in the channel and call Close on each of them. The if condition guards against the pool being closed twice, since closing an already closed channel panics. Again, the Mutex ensures the correctness of the pool's state.

func (p *Pool) Close() {
    p.locker.Lock()
    defer p.locker.Unlock()
    
    if p.closed {
        return
    }
    
    p.closed = true
    close(p.resources)
    
    for r := range p.resources {
        r.Close()
    }
}
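
Close relies on the fact that ranging over a closed buffered channel still yields the values left in the buffer before the loop ends. A minimal sketch of that behavior, using a hypothetical drain helper on a chan int:

```go
package main

import "fmt"

// drain closes ch and collects whatever was still buffered in it.
// The range loop ends once the buffer is empty because ch is closed.
func drain(ch chan int) []int {
	close(ch)

	var left []int
	for v := range ch {
		left = append(left, v)
	}
	return left
}

func main() {
	ch := make(chan int, 3)
	ch <- 1
	ch <- 2

	fmt.Println(drain(ch)) // [1 2]
}
```

Without the close call, the range loop would block forever once the buffer was empty, so the order used in Close (close the channel first, then range) matters.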

Wrapping Up

Pooling is a very useful concurrency pattern to have in our toolbelt. Not only can it be used to create a pool of shared resources, as in this post, but it can also be used to create a pool of workers to process background tasks, among many other things, depending on your situation.


All rights reserved
