0

Cách xử lý nhiều yêu cầu tranh chấp một tài nguyên ở cùng một thời điểm khi scaling Kubernetes Pod

Tôi có một business service (tạm gọi là Service W được viết bằng Nodejs) được subscribe vào keyspace của Redis theo cơ chế pub-sub (đang được serve trên Kubernetes), Service W này hoạt động như một worker có nhiệm vụ lắng nghe thông báo khi một redis key hết hạn, tiếp theo đó nó sẽ xử lý một số logic cập nhật xuống database (tôi dùng Prisma Orm để mapping tới Postgres) và publish một message vào Kafka cho một tài nguyên duy nhất

1. Vấn đề gặp phải

Lấy ví dụ dưới đây là một đoạn logic của Service W xử lý khi lắng nghe event từ Redis

const movieName = 'Bleach thousand year blood war'

const availableSeat = await prisma.seat.findFirst({
  where: {
    movie: {
      name: movieName,
    },
    claimedBy: null,
  },
})

if (!availableSeat) {
  throw new Error(`Oh no! ${movieName} is all booked.`)
}

await prisma.seat.update({
  data: {
    claimedBy: userId,
  },
  where: {
    id: availableSeat.id,
  },
})

// Send kafka an event to do something
await producer.send({ topic: 'topic-name', messages: [] })

Đối với đoạn logic phía trên, khi scale Service W lên N-pods (workers) thì sẽ có tương đương được subscribes vào Redis, khi một redis key hết hạn thì N-pods (workers) sẽ nhận được same event ở cùng một thời điểm (cơ chế pub-sub) dẫn tới sẽ gửi cho Kafka N-messages giống nhau (duplicated messages)

2. Cách giải quyết

Theo bài toán ở trên sẽ có nhiều cách để xử lý, nhưng trong bài này tôi sẽ đề cập tới Optimistic concurrency control

Optimistic concurrency control là một mô hình xử lý các hoạt động đồng thời trên một thực thể duy nhất mà không dựa vào locking (bạn có thể tìm hiểu thêm về cơ chế này), hiểu đơn giản OCC sẽ sử dụng mã thông báo (trường version hoặc timestamp trong table) để phát hiện những thay đổi với bản ghi

Chúng ta cần thay đổi logic một chút để thoả điều kiện chỉ một worker được gửi message tới Kafka, tránh duplicated messages

const userEmail = '98savage@gmail.io'
const movieName = 'Bleach thousand year blood war'

const availableSeat = await client.seat.findFirst({
  where: {
    Movie: {
      name: movieName,
    },
    claimedBy: null,
  },
})

if (!availableSeat) {
  throw new Error(`Oh no! ${movieName} is all booked.`)
}

const seats = await client.seat.updateMany({
  data: {
    claimedBy: userEmail,
    version: {
      increment: 1,
    },
  },
  where: {
    id: availableSeat.id,
    version: 0,
  },
})

if (seats.count === 0) {
  throw new Error(`That seat is already booked! Please try again.`)
}

// Send kafka an event to do something
await producer.send({ topic: 'topic-name', messages: [] })

Đối với cách thay đổi này, khi N-pods (workers) nhận same event ở cùng một thời điểm, thì chỉ có một worker xử lý và gửi message vào kafka, còn các workers còn lại sẽ throw error bởi vì worker đảm nhiệm nó đã tăng version record từ 0 lên 1, các worker còn lại query version record = 0 không còn match đối với record đó nữa

Related links:

https://www.prisma.io/docs/orm/prisma-client/queries/transactions#optimistic-concurrency-control

https://redis.io/docs/latest/develop/use/keyspace/


All rights reserved

Viblo
Hãy đăng ký một tài khoản Viblo để nhận được nhiều bài viết thú vị hơn.
Đăng kí