Viblo Learning
+1

Implement Multi-Connections Download in Go - Part II

Getting Start

In Part I we've looked at how to implement core download functionality as well as define a public API for client to use our package. Most of the functionality was done, but there are some part that were left out in the previous article like download worker pool and our final goal, which is to build a CLI tool to download file with multiple connection.

In this part we will look at how to implement worker pool and finally pieced together on what we've built so far and write a command line tool that will display download progress and provide some options for customization.

Worker Pool

As you saw in previous article, there is this piece of code like below

w := newWorkerPool(s.maxConn)

var wg sync.WaitGroup
wg.Add(len(ranges))

for i := 0; i < len(ranges); i++ {
    info := chunkInfo{
        totalSize: totalSize,
        ranges:    ranges[i],
        index:     i + 1,
        fileURL:   fileURL,
        filename:  filename,
    }
    go func() {
        w.RunFunc(func() {
            s.download(info, ch)
        })
        wg.Done()
    }()
}

To quickly recap, we basically create a worker pool with maxConn number of workers. We then loop through each block of chunk that needed to download for a whole file and assign it to each worker with w.RunFunc. Because RunFunc will block until each chunk is completed so we make sure to put it into a goroutine. The WaitGroup is needed to ensure that the whole file is downloaded before we proceed to combine each chunk into one file like show below.

wg.Wait()
w.Stop()

if err := s.combineFiles(filename, len(ranges)); err != nil {
    // code here
}

Pool Interface

The worker pool interface is simple, as it will need to assign task to a worker that is avaiable in the pool and the ability to stop, so something like below is enough

type pool struct {
    // private fields. more onto this soon
}

func (p *pool) Run(w worker) {
    // code here
}

func (p *pool) Stop() {
    // code here
}

Worker Interface

As you can see pool accept worker as an argument to Run function. To make this generic and easy to test we will define it as an interface like below

type worker interface {
    Task()
}

Implementation

To start of, lets look at the fields that are needed for pool struct. To be able to assign task to a worker we will need a channel that will stream and assign each task to a worker that is avaiable in the pool. We will also need a sync.WaitGroup to wait for all the worker to complete in the even of shuting down the pool. So pool struct will looks like below

type pool struct {
    workers chan worker
    wg      sync.WaitGroup
}

Now to create n number of workers in a pool, lets look at the pool factory function. The function is simple, it accepts a maximum number of worker to start in a pool. In the function we initialize a struct instance and each fields that needed then loop through it n times and create goroutines.

These goroutines that we've just created are the actual workers. It is waiting for tasks to come through p.works channel and perform them. The trick is with the fact that range will block until there is a task or until a task that was perform finished that's why we run it in goroutine. So by creating multiple goroutine we basically multiple worker as well. range will break when channel is close and when that happen we mark that worker as done and exit.

func newWorkerPool(n int) *pool {
    p := pool{
		works: make(chan worker),
	}

	p.wg.Add(n)
	for i := 0; i < n; i++ {
		go func() {
			for w := range p.works {
				w.Task()
			}
			p.wg.Done()
		}()
	}

	return &p
}

To assign task to a worker is simple all we need to do is plug it into works channel like below

func (p *pool) Run(w worker) {
    p.works <- w
}

And to shutdown the worker pool

func (p *pool) Stop() {
    close(p.works)
    p.wg.Wait()
}

p.wg.Wait is to ensure that all workers are shutdown properly

CLI Application

Now that we've done with worker pool implementation lets take a look at how to use the package to build a CLI tool. The first half is the prepare for command line options. Here there are two options: -n & -o, the former is to configure max connection while the latter is specify where the file should be saved. -n will default current number of CPU on your computer and -o will default to current directory. The second half is where we use our download package. It start by creating a new download instance and begin download a file. The final part is to use the channel returned from our download and print out download progress by using pb.ProgressBar package.

And that's it for our command line application to download file.

// first half
var (
	fs       = flag.NewFlagSet("goget", flag.ExitOnError)
	maxConn  = fs.Int("n", runtime.NumCPU(), "Max connections")
	fileName = fs.String("o", "", "Output file path")
)

func main() {
	if len(os.Args) < 2 {
		fmt.Println(usage)
		return
	}
	err := fs.Parse(os.Args[2:])
	handleError(err)

	fileURL := os.Args[1]
	if *fileName == "" {
		dir, err := os.Getwd()
		handleError(err)
		*fileName = filepath.Join(dir, filepath.Base(fileURL))
	}

    // second half
	svc := goget.New(*maxConn)
	ch, err := svc.Download(fileURL, *fileName)
	handleError(err)

    // last part
	var bar *pb.ProgressBar
	for p := range ch {
		if bar == nil {
			bar = pb.Start64(p.TotalSize)
			bar.Set(pb.Bytes, true)
		}
		bar.Add64(p.ChunkSize)
	}

	bar.Finish()
}

All Rights Reserved

Viblo
Let's register a Viblo Account to get more interesting posts.