Queue with Redis lists and Golang
A queue is a data structure used to pass messages between system components. It follows the FIFO (first in, first out) principle: the message that is pushed to the queue first is the first one the consumer receives and processes.
Components in Queue:
- Producer: where the message is produced and pushed to the queue.
- Consumer: where the message is received and processed. It will get the message from the queue.
- Message queue: where the message is stored.
- Message: the unit of data that the producer creates and the consumer processes.
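The four components above can be sketched in plain Go before Redis comes into play. This is only an in-process illustration: a buffered channel stands in for the message queue, and the names Message, produce and consume are made up for this example.

```go
package main

import "fmt"

// Message is the unit of data passed from the producer to the consumer.
type Message struct {
	ID   int
	Body string
}

// produce pushes n messages into the queue and closes it when done.
func produce(queue chan<- Message, n int) {
	for i := 0; i < n; i++ {
		queue <- Message{ID: i, Body: fmt.Sprintf("hello %d", i)}
	}
	close(queue)
}

// consume reads messages until the queue is closed and returns their
// bodies in the order they were received.
func consume(queue <-chan Message) []string {
	var bodies []string
	for msg := range queue {
		bodies = append(bodies, msg.Body)
	}
	return bodies
}

func main() {
	queue := make(chan Message, 8) // in-process stand-in for the message queue
	go produce(queue, 3)
	for _, body := range consume(queue) {
		fmt.Println(body)
	}
}
```

The consumer sees the bodies in the same order the producer sent them, which is the FIFO behavior a queue is expected to give.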
Message queues usually increase the durability of the system and support asynchronous processing. For example, when a user imports a list of products, the import is pushed to the queue and the system processes it in the background. The user receives a notification once the import has finished processing, and does not have to wait for it to complete before continuing to use the service.
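The product import example can be sketched roughly as follows. It is a simplified, in-process illustration: importJob, startWorker and submit are hypothetical names, and Go channels stand in for both the queue and the notification channel.

```go
package main

import "fmt"

// importJob is a hypothetical message describing one product import.
type importJob struct {
	UserID   string
	Products []string
}

// startWorker processes jobs in the background and sends one
// notification for each finished job.
func startWorker(jobs <-chan importJob, notify chan<- string) {
	go func() {
		for job := range jobs {
			// stand-in for the real import work
			notify <- fmt.Sprintf("user %s: imported %d products", job.UserID, len(job.Products))
		}
	}()
}

// submit enqueues the job and returns without waiting for the import
// to run (as long as the queue has free capacity).
func submit(jobs chan<- importJob, job importJob) string {
	jobs <- job
	return "accepted"
}

func main() {
	jobs := make(chan importJob, 16)
	notify := make(chan string, 16)
	startWorker(jobs, notify)

	// The request is answered right away; the import runs in the background.
	fmt.Println(submit(jobs, importJob{UserID: "u1", Products: []string{"a", "b"}}))

	// Later, the user receives the notification.
	fmt.Println(<-notify)
}
```

The caller of submit gets its answer before the worker has done anything, which is the point of asynchronous processing.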
Below is a sample and a guide implemented with Redis lists.
Redis lists are linked lists of string values. Redis lists are frequently used to:
- Implement stacks and queues.
- Build queue management for Go systems.
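Since list elements are strings, a structured message has to be serialized before it is pushed and parsed again after it is popped. One common choice is JSON. The sketch below assumes a hypothetical ProductImport payload; the field names are not taken from any real system.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// ProductImport is a hypothetical message payload.
type ProductImport struct {
	UserID string   `json:"user_id"`
	SKUs   []string `json:"skus"`
}

// encode turns the payload into the string that would be pushed to the list.
func encode(p ProductImport) (string, error) {
	b, err := json.Marshal(p)
	if err != nil {
		return "", err
	}
	return string(b), nil
}

// decode parses a string popped from the list back into the payload.
func decode(s string) (ProductImport, error) {
	var p ProductImport
	err := json.Unmarshal([]byte(s), &p)
	return p, err
}

func main() {
	s, err := encode(ProductImport{UserID: "u1", SKUs: []string{"a", "b"}})
	if err != nil {
		panic(err)
	}
	fmt.Println(s) // {"user_id":"u1","skus":["a","b"]}

	p, err := decode(s)
	if err != nil {
		panic(err)
	}
	fmt.Println(p.UserID, len(p.SKUs)) // u1 2
}
```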
Basic commands
- LPUSH adds a new element to the head of a list; RPUSH adds to the tail.
- LPOP removes and returns an element from the head of a list; RPOP does the same but from the tail of a list.
- LLEN returns the length of a list.
- LMOVE atomically moves elements from one list to another.
- LRANGE extracts a range of elements from a list.
- LTRIM reduces a list to the specified range of elements.
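To see how the head and tail commands combine, here is a toy in-memory model of a list. It is not how Redis is implemented and it only covers LPUSH, RPUSH, LPOP, RPOP and LLEN; the list type and its methods are written just for this illustration.

```go
package main

import "fmt"

// list is a toy in-memory model of a Redis list; index 0 is the head.
type list struct{ items []string }

// LPush adds a value at the head and returns the new length.
func (l *list) LPush(v string) int {
	l.items = append([]string{v}, l.items...)
	return len(l.items)
}

// RPush adds a value at the tail and returns the new length.
func (l *list) RPush(v string) int {
	l.items = append(l.items, v)
	return len(l.items)
}

// LPop removes and returns the head; ok is false when the list is empty.
func (l *list) LPop() (v string, ok bool) {
	if len(l.items) == 0 {
		return "", false
	}
	v, l.items = l.items[0], l.items[1:]
	return v, true
}

// RPop removes and returns the tail; ok is false when the list is empty.
func (l *list) RPop() (v string, ok bool) {
	n := len(l.items)
	if n == 0 {
		return "", false
	}
	v, l.items = l.items[n-1], l.items[:n-1]
	return v, true
}

// LLen returns the number of elements.
func (l *list) LLen() int { return len(l.items) }

func main() {
	var q list
	for _, m := range []string{"first", "second", "third"} {
		q.LPush(m)
	}
	fmt.Println(q.LLen()) // 3

	// Pushing at the head and popping at the tail gives FIFO order.
	v, _ := q.RPop()
	fmt.Println(v) // first

	// Pushing and popping at the same end gives LIFO order (a stack).
	v, _ = q.LPop()
	fmt.Println(v) // third
}
```

Pushing on one end and popping from the other gives a queue, while pushing and popping on the same end gives a stack.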
Blocking commands
Lists support several blocking commands. For example:
- BLPOP removes and returns an element from the head of a list. If the list is empty, the command blocks until an element becomes available or until the specified timeout is reached.
- BRPOP is a blocking list pop primitive. It is the blocking version of RPOP because it blocks the connection when there are no elements to pop from any of the given lists. An element is popped from the tail of the first list that is non-empty, with the given keys being checked in the order that they are given.
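The blocking behavior can be imitated in Go with a channel and a timer. This is an in-process analogy only, not a Redis call: blockingPop is a hypothetical helper that follows the same convention as the commands above, where a timeout of 0 means "wait until an element arrives".

```go
package main

import (
	"fmt"
	"time"
)

// blockingPop waits for a value on the queue. A timeout of 0 blocks
// until a value arrives; otherwise the call gives up after the timeout
// and reports ok=false.
func blockingPop(queue <-chan string, timeout time.Duration) (value string, ok bool) {
	if timeout == 0 {
		return <-queue, true
	}
	timer := time.NewTimer(timeout)
	defer timer.Stop()
	select {
	case v := <-queue:
		return v, true
	case <-timer.C:
		return "", false
	}
}

func main() {
	queue := make(chan string, 1)

	// Empty queue: the call returns once the timeout is reached.
	_, ok := blockingPop(queue, 50*time.Millisecond)
	fmt.Println(ok) // false

	// A producer pushes after a short delay and the blocked call wakes up.
	go func() {
		time.Sleep(20 * time.Millisecond)
		queue <- "hello-world"
	}()
	v, ok := blockingPop(queue, 0)
	fmt.Println(v, ok) // hello-world true
}
```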
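Because a queue follows the FIFO (first in, first out) principle, I will use LPush to add messages at the head of the list and BRPop to take them from the tail, so the oldest message is always consumed first.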
127.0.0.1:6379> brpop tuan 0 -> consume the queue "tuan" with a timeout of 0. BRPOP blocks while the queue is empty, for at most the given number of seconds; a timeout of 0 blocks indefinitely.
127.0.0.1:6379> lpush tuan "hello-world" -> push a message with the value "hello-world" to the queue "tuan"
As a result, the consumer receives this message from the queue "tuan". Redis replies with two elements, the key of the list and the value of the message, which the Go client returns as a []string.
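Because the reply carries both the key and the value, the consumer usually has to pick the value out of it. A minimal sketch, assuming the reply arrives as a two-element []string (parseReply is a hypothetical helper, not part of any client library):

```go
package main

import (
	"errors"
	"fmt"
)

// parseReply splits a BRPOP-style reply into the list key and the
// message value. The reply is expected to hold exactly two elements.
func parseReply(reply []string) (key, value string, err error) {
	if len(reply) != 2 {
		return "", "", errors.New("unexpected reply length")
	}
	return reply[0], reply[1], nil
}

func main() {
	key, value, err := parseReply([]string{"tuan", "hello-world"})
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Printf("queue=%s message=%s\n", key, value) // queue=tuan message=hello-world
}
```

The key matters when one call waits on several lists at once, because it tells the consumer which list the message came from.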
Now, let's implement the queue with Golang
Init the Redis connection
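// initRedis parses the connection URL, sets the pool size and timeouts,
// and checks the connection with PING.
// RedisClient is a thin wrapper around *redis.Client; its definition is not shown here.
func initRedis() (*RedisClient, error) {
	opts, err := redis.ParseURL("redis://default:@localhost:6379")
	if err != nil {
		// return the error to the caller; log.Fatal would exit the
		// process here and make the return statement unreachable
		return nil, fmt.Errorf("failed to init redis: %w", err)
	}
	opts.PoolSize = 30
	opts.DialTimeout = 10 * time.Second
	opts.ReadTimeout = 5 * time.Second
	opts.WriteTimeout = 5 * time.Second
	// IdleTimeout is the go-redis v8 field name; v9 renamed it to ConnMaxIdleTime
	opts.IdleTimeout = 5 * time.Second
	opts.Username = ""

	redisClient := redis.NewClient(opts)
	// PING verifies that the server is reachable
	if err := redisClient.Ping(context.Background()).Err(); err != nil {
		return nil, err
	}
	return &RedisClient{
		Redis: redisClient,
	}, nil
}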
The consumer worker runs in a goroutine. It takes each message from the queue and processes it.
// the worker consumes the queue
go func() {
	for {
		// BRPop blocks while the queue is empty; a timeout of 0 blocks indefinitely
		message, err := redisClient.BRPop(context.Background(), queue, 0)
		if err != nil {
			fmt.Println(err)
			// pause before retrying so a persistent error does not cause a busy loop
			time.Sleep(time.Second)
			continue
		}
		fmt.Printf("message: %v\n", message)
	}
}()
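A long-running consumer usually also needs a way to stop and a pause between failed attempts. The sketch below shows one possible shape. The popper interface is an assumption that mirrors the BRPop(ctx, queue, timeout) call used in this post, and fakeQueue is an in-memory stand-in so the example runs without a Redis server. A real client may report an expired timeout as an error, which this sketch treats like any other error.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// popper is a hypothetical interface matching the BRPop call shape used above.
type popper interface {
	BRPop(ctx context.Context, queue string, timeout time.Duration) ([]string, error)
}

// consume pops messages until ctx is cancelled and passes each value to
// handle. After a failed pop it waits for backoff before retrying.
func consume(ctx context.Context, p popper, queue string, backoff time.Duration, handle func(string)) {
	for ctx.Err() == nil {
		// a finite timeout lets the loop notice cancellation between pops
		reply, err := p.BRPop(ctx, queue, time.Second)
		if err != nil {
			select {
			case <-ctx.Done():
			case <-time.After(backoff):
			}
			continue
		}
		if len(reply) == 2 {
			handle(reply[1]) // reply[0] is the key, reply[1] is the value
		}
	}
}

// fakeQueue is an in-memory stand-in used only to exercise consume.
type fakeQueue struct{ items []string }

func (f *fakeQueue) BRPop(ctx context.Context, queue string, timeout time.Duration) ([]string, error) {
	if len(f.items) == 0 {
		return nil, errors.New("empty")
	}
	v := f.items[0]
	f.items = f.items[1:]
	return []string{queue, v}, nil
}

// demo consumes two messages from the fake queue and then cancels.
func demo() []string {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	q := &fakeQueue{items: []string{"hello 0", "hello 1"}}
	var seen []string
	consume(ctx, q, "tuan", 10*time.Millisecond, func(msg string) {
		seen = append(seen, msg)
		if len(seen) == 2 {
			cancel() // stop once both messages are handled
		}
	})
	return seen
}

func main() {
	for _, msg := range demo() {
		fmt.Println("message:", msg)
	}
}
```

Passing a cancellable context from main would let the worker stop cleanly when the process receives a shutdown signal.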
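The producer pushes messages to the queue.
// the producer pushes ten messages to the queue
go func() {
	for i := 0; i < 10; i++ {
		_, err := redisClient.LPush(ctx, queue, fmt.Sprintf("hello %v", i))
		if err != nil {
			// report the failure instead of returning silently
			fmt.Println(err)
			return
		}
	}
}()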
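The main function puts everything together.
package main

// imports for the whole program (initRedis and main)
import (
	"context"
	"fmt"
	"log"
	"os"
	"os/signal"
	"syscall"
	"time"

	// assumed: go-redis v8, which provides Options.IdleTimeout
	"github.com/go-redis/redis/v8"
	"github.com/sirupsen/logrus"
)

func main() {
	// init redis
	redisClient, err := initRedis()
	if err != nil {
		logrus.Warnf("failed to init redis client: %v", err)
		return
	}
	ctx, queue := context.Background(), "tuan"
	// signal.Notify does not block when sending, so the channel must be buffered
	quit := make(chan os.Signal, 1)
	signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
	// the worker consumes the queue
	go func() {
		for {
			// BRPop blocks while the queue is empty; a timeout of 0 blocks indefinitely
			message, err := redisClient.BRPop(ctx, queue, 0)
			if err != nil {
				fmt.Println(err)
				// pause before retrying so a persistent error does not cause a busy loop
				time.Sleep(time.Second)
				continue
			}
			fmt.Printf("message: %v\n", message)
		}
	}()
	// the producer pushes ten messages to the queue
	go func() {
		for i := 0; i < 10; i++ {
			_, err := redisClient.LPush(ctx, queue, fmt.Sprintf("hello %v", i))
			if err != nil {
				fmt.Println(err)
				return
			}
		}
	}()
	// block until a shutdown signal arrives
	<-quit
	log.Println("shutting down")
}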