+3

[AWS] Simple Queue Service - Phần 1

Mục Đích

  • Call api tới AWS Simple Queue Service(SQS) thông qua sdk

Code Mẫu

Chuẩn bị

  • cài đặt sdk aws, run:
go get -u github.com/aws/aws-sdk-go/

Tạo một session

  • tạo một session để tái sử dụng trong những function khác
  • profile được dùng là default, để biết profile đang được cấu hình, run trên linux/mac
cat ~/.aws/credentials 
  • function để tạo 1 session
func GetSession() *session.Session {
	sess, err := session.NewSessionWithOptions(session.Options{
		Profile: "default",
		Config: aws.Config{
			Region: aws.String("us-west-1"),
		},
	})
	if err != nil {
		panic(err)
	}
	return sess
}

Cách tạo ra 1 queue

  • sử dụng function CreateQueue trong aws-sdk để tạo 1 queue cho việc test.
  • có một số parameter quan trọng cần lưu ý:
    • queueName: name của queue mà bạn muốn tạo
    • DelaySeconds: thời gian message bạn muốn giữ lại trước khi gửi đi,
    • VisibilityTimeout: thời gian một message trước khi expire.
  • function create 1 queue
func CreateQueue(sess *session.Session, queueName string) (*sqs.CreateQueueOutput, error) {
	sqsClient := sqs.New(sess)
	result, err := sqsClient.CreateQueue(&sqs.CreateQueueInput{
		QueueName: &queueName,
		Attributes: map[string]*string{
			"DelaySeconds":      aws.String("0"),
			"VisibilityTimeout": aws.String("60"),
		},
	})
	if err != nil {
		return nil, err
	}
	return result, nil
}

Get URL của 1 queue

  • Tất cả các apis điều yêu cầu url của queue, vì thế sử dụng GetQueueURL để truy vấn url của queue-name
  • function get url
func GetQueueURL(sess *session.Session, queue string) (*sqs.GetQueueUrlOutput, error) {
	sqsClient := sqs.New(sess)
	result, err := sqsClient.GetQueueUrl(&sqs.GetQueueUrlInput{
		QueueName: &queue,
	})
	if err != nil {
		return nil, err
	}
	return result, nil
}

Gửi message đến queue

  • chúng ta sẽ sử dụng hàm SendMessage từ aws-sdk để gửi message đến queue
  • một vài paramters quan trọng cần chú ý trong lúc gửi message:
    • QueueUrl: url của queue muốn gửi message.
    • MessageBody: body được gửi đến queue, có thể là json-string hoặc string
  • function gửi message
func SendMessage(sess *session.Session, queueUrl string, messageBody string) (*sqs.SendMessageOutput, error) {
	sqsClient := sqs.New(sess)
	sendOut, err := sqsClient.SendMessage(&sqs.SendMessageInput{
		QueueUrl:    &queueUrl,
		MessageBody: aws.String(messageBody),
	})
	if err != nil {
		return nil, err
	}
	return sendOut, nil
}

Nhận message từ queue

  • chúng ta sẽ sử dụng hàm ReceiveMessage từ aws-sdk để lấy message từ queue
  • một vài paramters quan trọng cần được chú ý trong lúc nhận message:
    • QueueUrl: url của queue muốn nhận message.
    • MaxNumberOfMessages: tổng số message có thể nhận được.
  • function receive messsage:
func GetMessages(sess *session.Session, queueUrl string, maxMessages int) (*sqs.ReceiveMessageOutput, error) {
	sqsClient := sqs.New(sess)
	msgResult, err := sqsClient.ReceiveMessage(&sqs.ReceiveMessageInput{
		QueueUrl:            &queueUrl,
		MaxNumberOfMessages: aws.Int64(int64(maxMessages)),
	})
	if err != nil {
		return nil, err
	}
	return msgResult, nil
}

Xoá message trong queue

  • khi nhận message từ queue, message sẽ không tự động xoá ra khỏi queue.
  • consumer khác có thể nhận message sau thời gian VisibilityTimeout hết hạn.
  • để đảm bảo rằng không bị trùng message thì cần phải xoá.
  • chúng ta sẽ sử dụng hàm DeleteMessage từ aws-sdk để xoá message từ queue
  • cần cung cấp ReceiptHandle trong thành phần của method
  • function xoá message:
func DeleteMessage(sess *session.Session, queueUrl string, messageHandle *string) error {
	sqsClient := sqs.New(sess)
	_, err := sqsClient.DeleteMessage(&sqs.DeleteMessageInput{
		QueueUrl:      &queueUrl,
		ReceiptHandle: messageHandle,
	})
	return err
}

Xoá tất cả message trong queue

  • chúng ta sẽ sử dụng hàm PurgeQueue từ aws-sdk để xoá tất cả message trong queue
  • một vài paramters quan trọng cần được chú ý trong lúc xoá tất cả message:
  • function xoá tất cả message:
func PurgeQueue(sess *session.Session, queueUrl string) error {
	sqsClient := sqs.New(sess)
	_, err := sqsClient.PurgeQueue(&sqs.PurgeQueueInput{
		QueueUrl: aws.String(queueUrl),
	})
	return err
}

Ref

Contact


All rights reserved

Viblo
Hãy đăng ký một tài khoản Viblo để nhận được nhiều bài viết thú vị hơn.
Đăng kí