+8

[MSDP] - Event Carried State Transfer

Trong hướng dẫn này, chúng ta cùng tìm hiểu về Microservice Desin Pattern - Event Carried State Transfer (chuyển trạng thái theo sự kiện) để đạt được sự nhất quán về dữ liệu giữa các microservice.

Mẫu thiết kế chuyển trạng thái theo sự kiện (Event Carried State Transfer)

Trong kiến trúc nguyên khối truyền thống tất cả các mô-đun sẽ cùng nằm trong một ứng dụng, một cơ sở dữ liệu chung chứa tất cả các bảng cho tất cả các mô-đun. Khi chúng ta chuyển từ ứng dụng nguyên khối sang kiến trúc microservice, chúng ta cũng chia DB lớn thành nhiều DB nhỏ. Mỗi dịch vụ quản lý dữ liệu của riêng nó. Cơ sở dữ liệu và mô hình dữ liệu khác nhau mang lại nhiều lợi thế cho kiến trúc hệ thống phân tán. Tuy nhiên, khi chúng ta có nhiều nguồn dữ liệu, thách thức rõ ràng sẽ là làm thế nào để duy trì tính nhất quán của dữ liệu giữa tất cả các microservice khi một trong các dữ liệu bị sửa đổi. Ý tưởng đằng sau design pattern chuyển trạng thái theo sự kiện là khi một microservice thêm mới, sửa đổi hoặc xóa dữ liệu, nó sẽ tạo ra một sự kiện cùng với thông tin dữ liệu vừa thêm mới, sửa đổi hoặc bị xóa. Khi đó các microservice quan tâm sẽ subscrible sự kiện này và cập nhật bản sao dữ liệu của chính nó cho phù hợp.

Chúng ta hãy cùng xem xét một ứng dụng đơn giản như dưới đây. Một ứng dụng nguyên khối có các module như module người dùng, module sản phẩm và mô-đun đặt hàng.

Và DB có 3 table:

CREATE TABLE users(
   id serial PRIMARY KEY,
   firstname VARCHAR (50),
   lastname VARCHAR (50),
   email varchar(50)
);

CREATE TABLE product(
   id serial PRIMARY KEY,
   description VARCHAR (500),
   price numeric (10,2) NOT NULL,
   qty_available integer NOT NULL
);

CREATE TABLE purchase_order(
    id serial PRIMARY KEY,
    user_id integer references users (id),
    product_id integer references product (id),
    price numeric (10,2) NOT NULL
);

Khi chúng ta cần tìm tất cả các đơn đặt hàng của người dùng, ta có thể viết một truy vấn join như bên dưới để tìm xem người dùng nào mua mặt hàng nào và giá bao nhiêu:

select 
    u.firstname,
    u.lastname,
    p.description,
    po.price
from
    users u,
    product p,
    purchase_order po
where 
    u.id = po.user_id
and p.id = po.product_id
order by u.id;

Bây giờ, hãy giả sử rằng chúng ta chuyển sang kiến trúc microservice. Chúng ta có dịch vụ quản lý người dùng (user-service), dịch vụ quản lý sản phẩm (product-service ) và dịch vụ quản lý đơn đặt hàng (order-service). Mỗi dịch vụ có cơ sở dữ liệu riêng.

user-service

  • Microservice chịu trách nhiệm quản lý các chức năng ứng dụng liên quan đến người dùng
  • Kết nối với Cơ sở dữ liệu PostgreSQL có chứa table users

product-service

  • Microservice chịu trách nhiệm quản lý các chức năng ứng dụng liên quan đến sản phẩm
  • Kết nối với Cơ sở dữ liệu PostgreSQL có chứa table products

order-service

  • Kết nối với MongoDB và chứa tất cả các đơn đặt hàng của người dùng cùng với thông tin sản phẩm, giá cả, v.v.
  • MongoDB chứa một collection được gọi là buy_order có thông tin như sau.
[
   {
      "userId":1,
      "productId":1,
      "price":300.00
   },
   {
      "userId":2,
      "productId":1,
      "price":250.00
   },
   {
      "userId":2,
      "productId":2,
      "price":650.00
   },
   {
      "userId":3,
      "productId":3,
      "price":320.00
   }
]

Bây giờ trong trường hợp trên, khi chúng ta tìm kiếm thông tin đơn hàng của người dùng, chúng ta không thể viết một truy vấn join trên tất cả các nguồn dữ liệu khác nhau như chúng ta đã làm trước đó. Trước tiên, chúng ta cần gửi yêu cầu đến order-service. Khi nhận được request đến, dựa trên userIdproductId mà nó có, order-service cần gửi yêu cầu đến user-serviceproduct-service để lấy thông tin chi tiết về người dùng và sản phẩm, xử lý dữ liệu và hiển thị trên giao diện người dùng. Các cuộc gọi HTTP, độ trễ mạng,... chúng đều sẽ ảnh hưởng rất xấu đến hiệu suất của ứng dụng. Nó cũng tạo ra sự liên kết chặt chẽ giữa các microservices, điều này rất là không tốt. Và điều gì sẽ xảy ra khi user-service không khả dụng? Nó có thể sẽ kéo theo order-service không khả dụng theo vì phải chờ đợi phản hồi lâu, đây là điều mà chúng ta không mong muốn. Một giải pháp khả thi cho chúng ta rằng lưu thông tin người dùng và sản phẩm trong chính collection buy_order trong MongoDB như bên dưới, nhưng đây có thể là một lời khuyên tệ .

{
   "user":{
      "id":1,
      "firstname":"vins",
      "lastname":"guru",
      "email: "[email protected]" 
   },
   "product":{
      "id":1,
      "description":"ipad"
   },
   "price":300.00
}

Trong cách tiếp cận này, bản thân order-service sẽ có tất cả thông tin để chúng ta hiển thị dữ liệu trên giao diện người dùng. Nó không phụ thuộc vào các dịch vụ khác như user-serviceproduct-service để cung cấp thông tin chúng ta cần. Nó được liên kết lỏng lẻo hơn. Tuy nhiên cách tổ chức dữ liệu này cũng có những ưu và nhược điểm.

Ưu điểm

  • Không cần join dữ liệu từ nhiều bảng, tăng tốc độ cải thiện truy vấn
  • Ít bị ảnh hưởng bởi mạng khi thực hiện cuộc gọi HTTP
  • Hiệu suất được cải thiện
  • Các microservice ít phụ thuộc nhau hơn

Tại sao nó có vẻ là lời khuyên tệ là bởi vì, dữ liệu được lưu trữ dư thừa và điều gì sẽ xảy ra nếu người dùng thay đổi tên, email của họ, hoặc nếu mô tả sản phẩm được cập nhật thì order-service sẽ không có thông tin cập nhật mới nhất. Nó sẽ chứa dữ liệu cũ nếu thông tin người dùng hoặc sản phẩm được cập nhật.

Nhược điểm

  • Dữ liệu cũ (user-service hoặc product-service cập nhật thông tin người dùng, dịch vụ đặt hàng sẽ chứa dữ liệu cũ)
  • Dữ liệu dự phòng tăng nhanh (dung lượng bộ nhớ lưu trữ tăng nhanh)

Ngày nay dung lượng bộ nhớ lưu trữ thực sự không phải là vấn đề vì việc lưu trữ dữ liệu không còn đắt đỏ. Nhưng vẫn đề ở đây là sẽ xảy ra việc dữ liệu không đồng bộ. Tính nhất quán cuối cùng là sự đánh đổi cho hiệu suất và thiết kế có khả năng phục hồi. Hãy xem cách chúng ta có thể duy trì dữ liệu cập nhật trên tất cả các microservice sử dụng Kafka để tránh sự cố đã đề cập ở trên.

Sử dụng Kafka

Chúng ta sẽ cập nhật thông tin chi tiết của người dùng trong order-service bất cứ khi nào cập nhật thông tin người dùng trong user-service một cách không đồng bộ. Để làm được điều đó, chúng ta sẽ tạo một chủ đề (topic) có tên là user-service-event trong cụm Kafka.


User service

Entity

@Entity
public class Users {

    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;
    private String firstname;
    private String lastname;
    private String email;
    
   // getters and setters

}

Repository (data access layer)

@Repository
public interface UsersRepository extends JpaRepository<Users, Long> {
}

DTO

public class UserDto {
    private Long id;
    private String firstname;
    private String lastname;
    private String email;

    // getters & setters

}

Service

  • UserServiceImpl chịu trách nhiệm cập nhật thông tin người dùng và gửi thông tin cập nhật đến Kafka topic.
  • Việc cập nhật cơ sở dữ liệu và xuất bản sự kiện phải diễn ra trong một transaction duy nhất.
public interface UserService {
    Long createUser(UserDto userDto);
    void updateUser(UserDto userDto);
}

@Service
public class UserServiceImpl implements UserService {

    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    @Autowired
    private UsersRepository usersRepository;

    @Autowired
    private KafkaTemplate<Long, String> kafkaTemplate;

    @Override
    public Long createUser(UserDto userDto) {
        Users user = new Users();
        user.setFirstname(userDto.getFirstname());
        user.setLastname(userDto.getLastname());
        user.setEmail(userDto.getEmail());
        return this.usersRepository.save(user).getId();
    }

    @Override
    @Transactional
    public void updateUser(UserDto userDto) {
        this.usersRepository.findById(userDto.getId())
                .ifPresent(user -> {
                    user.setFirstname(userDto.getFirstname());
                    user.setLastname(userDto.getLastname());
                    user.setEmail(userDto.getEmail());
                    this.raiseEvent(userDto);
                });
    }

    private void raiseEvent(UserDto dto){
        try{
            String value = OBJECT_MAPPER.writeValueAsString(dto);
            this.kafkaTemplate.sendDefault(dto.getId(), value);
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}

Controller

@RestController
@RequestMapping("/user-service")
public class UserController {

    @Autowired
    private UserService userService;

    @PostMapping("/create")
    public Long createUser(@RequestBody UserDto userDto){
        return this.userService.createUser(userDto);
    }

    @PutMapping("/update")
    public void updateUser(@RequestBody UserDto userDto){
        this.userService.updateUser(userDto);
    }

}

Bây giờ chúng ta sẽ có thể tạo mới, cập nhật nhật thông tin người dùng. Bất cứ khi nào thông tin người dùng được cập nhật sẽ gửi một sự kiện tới Kafka topic. Vì vậy các microservice khác quan tâm có thể đăng ký vào topic này để đồng bộ dữ liệu hoặc xử lý theo nghiệp vụ riêng.


Order service

Dịch vụ này sẽ đăng ký tới Kafka topic user-service-event và đóng vai trò là một Consumer.

Entity

@Document
public class PurchaseOrder {

    @Id
    private String id;
    private User user;
    private Product product;
    private double price;
   
    // Getters & Setters
}

public class Product {

    private long id;
    private String description;

    // Getters & Setters
}

public class User {

        private Long id;
        private String firstname;
        private String lastname;
        private String email;

        // Getters & Setters
}

Repository (data access layer)

@Repository
public interface PurchaseOrderRepository extends MongoRepository<PurchaseOrder, String> {

    @Query("{ 'user.id': ?0 }")
    List<PurchaseOrder> findByUserId(long userId);
    
}

Service

  • Lớp dịch vụ này chỉ đơn giản là lấy tất cả dữ liệu từ DB
  • Thực hiện tạo dữ liệu đơn hàng khi có đơn đặt hàng mới
  • Lớp dịch vụ này không chịu trách nhiệm cập nhật thông tin người dùng
public interface PurchaseOrderService {
    List<PurchaseOrder> getPurchaseOrders();
    void createPurchaseOrder(PurchaseOrder purchaseOrder);
}

@Service
public class PurchaseOrderServiceImpl implements PurchaseOrderService {

    @Autowired
    private PurchaseOrderRepository purchaseOrderRepository;

    @Override
    public List<PurchaseOrder> getPurchaseOrders() {
        return this.purchaseOrderRepository.findAll();
    }

    @Override
    public void createPurchaseOrder(PurchaseOrder purchaseOrder) {
        this.purchaseOrderRepository.save(purchaseOrder);
    }
    
}

UserSeviceHandler

  • Lớp này chịu trách nhiệm đăng ký một chủ đề Kafka
  • Bất cứ khi nào user-service gửi một sự kiện đến Kafka topic, class này sẽ handle sự kiện này và cập nhật thông tin người dùng.
public interface UserServiceEventHandler {
    void updateUser(User user);
}

@Service
public class UserServiceEventHandlerImpl implements UserServiceEventHandler {

    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    @Autowired
    private PurchaseOrderRepository purchaseOrderRepository;

    @KafkaListener(topics = "user-service-event")
    public void consume(String userStr) {
        try{
            User user = OBJECT_MAPPER.readValue(userStr, User.class);
            this.updateUser(user);
        }catch(Exception e){
            e.printStackTrace();
        }
    }

    @Override
    @Transactional
    public void updateUser(User user) {
        List<PurchaseOrder> userOrders = this.purchaseOrderRepository.findByUserId(user.getId());
        userOrders.forEach(p -> p.setUser(user));
        this.purchaseOrderRepository.saveAll(userOrders);
    }
}

Controller

@RestController
@RequestMapping("/order-service")
public class OrderController {

    @Autowired
    private PurchaseOrderService purchaseOrderService;

    @GetMapping("/all")
    public List<PurchaseOrder> getAllOrders(){
        return this.purchaseOrderService.getPurchaseOrders();
    }
    
    @PostMapping("/create")
    public void createOrder(@RequestBody PurchaseOrder purchaseOrder){
        this.purchaseOrderService.createPurchaseOrder(purchaseOrder);
    }

}

Thực hiện test

  • Giả sử call GET request để lấy thông tin đơn đặt hàng tới order-service được kết quả là.
[
    {
        "id": "5dcfb1056637311008e17f80",
        "user": {
            "id": 1,
            "firstname": "Obama",
            "lastname": "Barack",
            "email": "[email protected]"
        },
        "product": {
            "id": 1,
            "description": "ipad"
        },
        "price": 300
    }
]
  • Thực hiện call PUT request, cập nhật thông tin người dùng tới user-service:
{
    "id": 1,
    "firstname":"Trump",
    "lastname": "Donald",
    "email": "[email protected]"
}
  • Bây giờ call GET request để lấy thông tin đơn đặt hàng tới order-service được kết quả mới (thông tin người dùng đã được cập nhật) là:
[
    {
        "id": "5dcfb1056637311008e17f80",
        "user": {
            "id": 1,
            "firstname": "Trump",
            "lastname": "Donald",
            "email": "[email protected]"
        },
        "product": {
            "id": 1,
            "description": "ipad"
        },
        "price": 300
    }
]

Tổng kết

Theo cách tiếp cận cũ làm cho các microservice phải liên kết chặt chẽ (tất cả các microservice phải hoạt động cùng nhau). Theo cách tiếp cận mới, chúng ta có thể duy trì tính nhất quán của dữ liệu trên tất cả các microservice bằng Kafka. Cách tiếp cận này tránh được nhiều cuộc gọi HTTP và sự cố ngẽn mạng không cần thiết giữa các microservices, cải thiện hiệu suất của các microservice và làm cho các microservice được ghép nối với nhau ít phụ thuộc nhất. Ví dụ: order-service có thể không hoạt động khi thông tin người dùng được cập nhật trên user-service, thông tin này sẽ vẫn nằm trong topic không bị mất và order-service có thể lấy được thông tin này khi nó khả dụng.


All Rights Reserved

Viblo
Let's register a Viblo Account to get more interesting posts.