Sử dụng Dispatch Semaphore Pause/Stop/Resume các task đang chạy

1. GCD là gì ?

Grand Central Dispatch(GCD) là 1 một low-level API được xây dựng bởi Apple, dùng để làm cái mà người ta thường gọi là đa nhiệm (multiasking), dùng để thực hiện các tác vụ ngoài main thread, giúp cải thiện khả năng response của app bằng cách chuyển các tác vụ nặng, tốn nhiều thời gian vào các threads, queues để main thread chỉ phục vụ việc hiển thị giao diện và tương tác của người dùng.

2. Bài toán đặt ra

Giả sử ta có n task cần xử lý, nhưng do việc xử lý từng task khá nặng tốn tài nguyên hệ thống, nên chúng ta sẽ chỉ giới hạn trong 1 thời điểm chỉ có 1 số task đc xử lý dồng thời. Nó giống như việc chúng ta đi mua vé xem phim, lượt người mua có thể lên đến cả trăm người nhưng chỉ duy nhất có 3 người đứng bán vé, người trước mua xong thì người sau mới đến lượt. Việc mua vé của từng người cũng có thể nhanh chậm tuỳ vào kiểu họ mua vé online hay trực tiếp, họ mua nhiều vé hay ít. Trong quá trình đang bán vé có sự cố nào đó xảy ra, có thể là mất điện, máy tính trục trặc hoặc tệ hơn là cháy chẳng hạn thì việc đầu tiên chúng ta phải làm có thể là bảo mọi người chờ đợi trong lúc xử lý sự cố máy tính, hoặc giải tán nếu có cháy xảy ra, hoặc tiếp tục bán vé sau khi khắc phục xong. Vậy làm thế nào chúng ta có thể thực hiện tương tự quá trình như vậy trong xử lý các task ?

3. Dispatch Semaphore

3.1 Semaphore hoạt động như thế nào ?

  • dispatch_semaphore_create : đầu tiên khi khởi tạo semaphore chúng ta sẽ phải khởi tạo số task mà có thể được xử lý cùng 1 lúc, giống như việc sắp xếp có bao nhiêu nhân viên bán vé.
  • dispatch_semaphore_signal : khi 1 task được xử lý xong nó sẽ báo lại cho semaphore biết để tiếp tục xử lý task sau, nó giống như khi người đã nhận được vé xem phim họ sẽ bước ra ngoài chỗ sếp hàng, người đằng sau thấy như vậy sẽ tự đi lên trên để đặt vé.
  • dispatch_semaphore_wait : việc có rất nhiều task mà chỉ 1 số task được xử lý cùng 1 lúc buộc các task sau phải đợi đến lượt, giống như 1 giải ruy băng chăng lên theo hàng ngang buộc người đặt vé phải đứng vào trong đấy đợi đến lượt mình.

3.2 Áp dụng semaphore vào giải quyết bài toán

Đầu tiên chúng ta sẽ tạo 1 class SemaphoreManager quản lý các task và việc luân chuyển việc xử lý các task.

class SemaphoreManager {
    let semaphore: dispatch_semaphore_t
    var isServicing = false
    var tasks = [Task]()
    var currentTaskIndex = 0
    var numberThread = 0
}

Trong đó:

  • isServicing : dùng để nhận biết khi nào SemaphoreManager đang thực thi các task hay là đã bị pause hoặc stop
  • tasks : số task cần thực thi
  • currentTaskIndex: số thứ tự task thực thi
  • numberThread: số thread xác định để thực thi task.

Ta tạo tiếp 1 struct là Task minh hoạ cho 1 tiến trình cần thực thi nào đó:

struct Task {
    var id = 0
    var time = 0
    
    func execute(serviceTask: CompletionHandler, completion: CompletionHandler) {
        let serialQueue = dispatch_queue_create("serial queue", DISPATCH_QUEUE_SERIAL)
        
        dispatch_async(serialQueue) { 
            sleep(UInt32(self.time))
            completion()
        }
        
        serviceTask()
    }
}

Task đơn giản chi có id để phân biệt nó với các task khác, time minh hoạ cho thời gian thực thi của task, và hàm execute thực thi của task, hàm execute sẽ có 2 closure:

  • serviceTask: dùng để chuyển đến task tiếp theo
  • completion: khi task hiện tại thực thi xong nó sẽ thông báo cho SemaphoreManager biết để chuyển sang task tiếp theo

SemaphoreManager sẽ dụa vào số luồng để thực thi các task nên chúng ta sẽ khởi tạo nó với số luồng thực thi task

init(numberThread: Int) {
    self.numberThread = numberThread
    self.semaphore = dispatch_semaphore_create(numberThread)
}

3.3 Các function trong SemaphoreManager

SemaphoreManager sẽ nhận vào các task để thực thi, chúng ta sẽ tạo hàm nhận vào các task:

func getTasks(tasks: [Task]) {
    self.isServicing = true
    self.tasks = tasks
}

Tiếp theo ta sẽ tạo 1 hàm thực thi các task trong list task đã nhận vào, để giới hạn số lượng task đc chạy ở cùng 1 thời điểm chúng ta sẽ bắt hàm startServicing phải đợi cho đến khi các task trong hàng đợi thực thi xong bằng lệnh dispatch_semaphore_wait

func startServicing() {
    dispatch_semaphore_wait(self.semaphore, DISPATCH_TIME_FOREVER)
        
     if currentTaskIndex > tasks.count - 1 {
         if let delegate = self.delegate {
            delegate.didFinishService()
        }
    } else {
         dispatch_async(serialQueue, {
            self.tasks[self.currentTaskIndex].execute({ 
                self.nextTask()
            }, completion: { 
                if self.isServicing {
                    dispatch_semaphore_signal(self.semaphore)
                 }
            })
        })
    }
}

Hàm startServicing sẽ wait cho đến khi 1 task cũ được thực thi xong và trạng thái của SemaphoreManager sẽ không phải là pause hoặc stop Hàm nextTask sẽ chuyển thực thi task tiếp theo nếu như trạng thái của SemaphoreManager ko phải pause hoặc stop

 func nextTask() {
    currentTaskIndex += 1
        
    if self.isServicing {
        self.startServicing()
    }
}

Hàm pause sẽ chuyển trạng thái isServicing từ true về false

func pause() {
    isServicing = false
}

Hàm stop sẽ remove hết các task chưa được thực thi, chuyển giá trị của isServicing sang false và giải phóng các thread đang bị lock

func stop() {
    isServicing = false
        
    tasks.removeAll()
    currentTaskIndex = 0

    for _ in 0..<numberThread {
        dispatch_semaphore_signal(semaphore)
    }
}

Hàm resume sẽ chuyển isServicing sang giá trị true và giải phóng các thread đang bị lock

func resume() {
    isServicing = true

    for _ in 0..<numberThread {
        dispatch_semaphore_signal(semaphore)
    }
}

4. Test

Ta sẽ tiến hành chạy thử xem kết quả có được như mong muốn hay không.

Khi ấn nút start ta sẽ tạo ra 200 task với time chạy ngẫu nhiên từ 0-10 sau đó + 3 s để thời gian nhỏ nhất các task là 3s. Ta sẽ thấy rằng ngay sau khi ấn start sẽ chỉ có 3 task đầu tiên được thực thi

task: 0 time: 3 is executing.
task: 1 time: 4 is executing.
task: 2 time: 11 is executing.

lúc này hàm startServicing sẽ bị lock lại cho đến khi nào 1 trong 3 task trên thực thi xong. Nếu 1 task nào đó đã thực thi xong nó sẽ trả về hàm completion, tại đây nó sẽ check nếu không có trạng thái pause hoặc stop ( isServicing = false ) thì nó sẽ nhảy đến task tiếp theo trong danh sách task. Nếu ta bấm nút pause thì trạng thái isServicing sẽ chuyển sang false, lúc này các task thực thi xong sẽ không được chuyển tiếp đên task tiếp theo. Nếu ta bấm nút stop thì sẽ xoá bỏ hết các task còn chưa được thực thi và quay lại trạng thái ban đầu. Nếu ta bấm nút resume isServicing sẽ chuyển về true, đông thời dispatch_semaphore_signal(semaphore) sẽ giải phóng các thread đang bị lock và việc thực thi task lại diễn ra tuần tự.

5. Demo

https://github.com/pqhuy87it/MonthlyReport/tree/master/UsingDispatchSemaphore