+9

Sử dụng ThreadPoolExecutor trong android

Đã làm việc nhiều với Thread và cả sử dụng Thread Pool nhưng chưa thực sự tạo được một reusable code. Tình cờ đọc được một cách hiện thực ThreadPoolExecutor khá hay, mong muốn được chia sẻ cùng mọi người 😃

1. Thread Pools

  • Một thread pool đơn giản là một pool (bể chứa) bao gồm nhiều worker threads (Số lượng worker thread sẽ phụ thuộc vào cách mà thread pool được hiện thực).
  • Một Task Queue (hàng đợi nhiệm vụ) sẽ có nhiệm vụ chứa các tasks. Một khi có 1 Idle Thread (thread rãnh rỗi), nó sẽ vào trong Queue nhận task và thực thi chúng. Lúc này các Idle Thread khác sẽ phải chờ thread trước đó nhận xong task.

2. ThreadPoolExecutor

ThreadPoolExecutor thực thi nhiệm vụ được giao bởi một trong những thread từ thread pool.

2.1 Lợi ích của việc sử dụng ThreadPoolExecutor

  • Là một powerful task execution framework. ThreadPoolExecutor hỗ trợ việc: thêm task vào Queue (task addition), hủy task (task cancellation), và thực hiện task có độ ưu tiên (task prioritization).
  • Giảm thiểu chi phí trong việc tạo quá nhiều threadThreadPoolExecutor có cơ chế quản lý số lượng thread được tạo ra trong thread pool.

2.2 ThreadPoolExecutor trong android

ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue
);
  • corePoolSize: Số lượng thread ít nhất chứa được giữ trong pool, kể cả khi các thread đều ở trạng thái Idle. Khi khởi tạo, số lượng thread có thể là 0. Khi task được thêm vào thì thread mới được tạo ra. Kể từ đây, nếu số lượng thread ít hơn corePoolSize thì những thread mới sẽ được tạo ra đến khi số thread bằng giá trị của corePoolSize.
  • maximumPoolSize: Số lượng thread nhiều nhất có thể chứa trong pool.
  • keepAliveTime: Khi số thread lớn hơn core, keepAliveTime là thời gian tối đa mà 1 Idle thread chờ task. Khi hết thời gian chờ mà chưa có task, thread này sẽ bị hủy.
  • unit: Đơn vị thời gian của keepAliveTime
  • workQueue: là hàng đợi dùng để chứa/giữ các Task trước khi chúng được thực thi. Hàng đợi này chỉ chứa Runnable task được đăng kí bởi phương thức (hàm) execute(). Trong android, hàng đợi này là một BlockingQueue.

3. Sử dụng ThreadPoolExecutor

Phần này sẽ trình bày cách hiện thực và sử dụng ThreadPoolExcutor một cách hiệu quả từ việc tạo, thực thi, hủy và thực thi có độ ưu tiên task.

3.1 Khởi tạo và thực thi

MainThreadExecutor

Class này đảm nhiệm việc tạo task trong Main Thread.

public class MainThreadExecutor implements Executor {
    private final Handler handler = new Handler(Looper.getMainLooper());
    @Override
    public void execute(Runnable runnable) {
        handler.post(runnable);
    }
}
PriorityThreadFactory

Class này sẽ đóng vai trò phân biệt Executor cho Main Thread tasks hay Background Thread task bằng phương thức Process.setThreadPriority(int priority). VD: Process.setThreadPriority(Process.THREAD_PRIORITY_BACKGROUND)

public class PriorityThreadFactory implements ThreadFactory {
    // priority of thread based on  Linux priorities
    // A Linux priority level, from -20 for highest scheduling priority to 19 for lowest scheduling priority.
    private final int mThreadPriority;
    public PriorityThreadFactory(int threadPriority) {
        mThreadPriority = threadPriority;
    }
    @Override
    public Thread newThread(final Runnable runnable) {
        Runnable wrapperRunnable = new Runnable() {
            @Override
            public void run() {
                try {
                    Process.setThreadPriority(mThreadPriority);
                } catch (Throwable t) {
                }
                runnable.run();
            }
        };
        return new Thread(wrapperRunnable);
    }
}
DefaultExecutorSupplier

Singleton class giúp khởi tạo các ThreadPoolExecutor khác nhau cho những task khác nhau.

public class DefaultExecutorSupplier {
    // Number of cores to decide the number of threads
    public static final int NUMBER_OF_CORES = Runtime.getRuntime().availableProcessors();
    // thread pool executor for background tasks
    private final ThreadPoolExecutor mForBackgroundTasks;
    // thread pool executor for light weight background tasks
    private final ThreadPoolExecutor mForLightWeightBackgroundTasks;
    // thread pool executor for main thread tasks
    private final Executor mMainThreadExecutor;
    // an instance of DefaultExecutorSupplier
    private static DefaultExecutorSupplier sInstance;
    // returns the instance of DefaultExecutorSupplier
    public static DefaultExecutorSupplier getInstance() {
       if (sInstance == null) {
         synchronized(DefaultExecutorSupplier.class){
             sInstance = new DefaultExecutorSupplier();
        }
        return sInstance;
    }
    // constructor for  DefaultExecutorSupplier
    private DefaultExecutorSupplier() {
        // setting the thread factory
        ThreadFactory backgroundPriorityThreadFactory = new 
                PriorityThreadFactory(Process.THREAD_PRIORITY_BACKGROUND);
        // setting the thread pool executor for mForBackgroundTasks;
        mForBackgroundTasks = new ThreadPoolExecutor(
                NUMBER_OF_CORES * 2,
                NUMBER_OF_CORES * 2,
                60L,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>(),
                backgroundPriorityThreadFactory
        );
        // setting the thread pool executor for mForLightWeightBackgroundTasks;
        mForLightWeightBackgroundTasks = new ThreadPoolExecutor(
                NUMBER_OF_CORES * 2,
                NUMBER_OF_CORES * 2,
                60L,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>(),
                backgroundPriorityThreadFactory
        );
        // setting the thread pool executor for mMainThreadExecutor;
        mMainThreadExecutor = new MainThreadExecutor();
    }
    // returns the thread pool executor for background task
    public ThreadPoolExecutor forBackgroundTasks() {
        return mForBackgroundTasks;
    }

    // returns the thread pool executor for light weight background task
    public ThreadPoolExecutor forLightWeightBackgroundTasks() {
        return mForLightWeightBackgroundTasks;
    }
    // returns the thread pool executor for main thread task
    public Executor forMainThreadTasks() {
        return mMainThreadExecutor;
    }
}
Execute task

Sử dụng những ThreadPoolExecutor khác nhau để thực thi những loại task khác nhau:

// Using it for Background Tasks
public void doSomeBackgroundWork(){
  DefaultExecutorSupplier.getInstance().forBackgroundTasks()
    .execute(new Runnable() { 
    // Executes the given task sometime in the future.
    @Override
    public void run() {
       // do some background work here.
    }
  });
}

// Using it for Light-Weight Background Tasks
public void doSomeLightWeightBackgroundWork(){
  DefaultExecutorSupplier.getInstance().forLightWeightBackgroundTasks()
    .execute(new Runnable() {
    @Override
    public void run() {
       // do some light-weight background work here.
    }
  });
}

// Using it for MainThread Tasks
public void doSomeMainThreadWork(){
  DefaultExecutorSupplier.getInstance().forMainThreadTasks()
    .execute(new Runnable() {
    @Override
    public void run() {
       // do some Main Thread work here.
    }
  });
}

Với cách này chúng ta có sử dụng những Thread pool khác nhau cho những mục đích khác nhau: Network tasks, I/O tasks, heavy background tasks...

3.2 Cancel task

Thay vì dùng hàm execute, ta dùng hàm submit và sử dụng nó để hủy task. VD:

// Get the future of the task by submitting it to the pool
Future future = DefaultExecutorSupplier.getInstance().forBackgroundTasks()
    .submit(new Runnable() {
    @Override
    public void run() {
      // do some background work here.
    }
});
...
// Cancelling the task
future.cancel(true); 

3.3 Set độ ưu tiên cho task

Giả sử chúng ta có nhiều task, nhưng thread pool chỉ chứa hữu hạn số lượng các thread, vì vậy việc thực thi task theo độ ưu tiên có ý nghĩa rất lớn. VD: Bạn muốn query data ngay lập tức, muốn download dữ liệu nào đó trước khi save data...

Trước hết tạo một enum các độ ưu tiên mong muốn:
public enum Priority {
    /**
     * NOTE: DO NOT CHANGE ORDERING OF THOSE CONSTANTS UNDER ANY CIRCUMSTANCES.
     * Doing so will make ordering incorrect.
     */
    // Lowest priority level. Used for prefetches of data.
    LOW,
    // Medium priority level. Used for warming of data that might soon get visible.
    MEDIUM,
    // Highest priority level. Used for data that are currently visible on screen.
    HIGH,
    // Highest priority level. Used for data that are required instantly(mainly for emergency).
    IMMEDIATE;
}
Tiếp theo tạo một Runnable chứa độ ưu tiên
public class PriorityRunnable implements Runnable {
    private final Priority priority;
    public PriorityRunnable(Priority priority) {
        this.priority = priority;
    }
    @Override
    public void run() {
      // nothing to do here.
    }
    public Priority getPriority() {
        return priority;
    }
}
Tạo ThreadPoolExecutor có xử lý độ ưu tiên:

Lưu ý hàm compareTo(PriorityFutureTask other) dùng để so sánh độ ưu tiên của 2 task khác nhau.

public class PriorityThreadPoolExecutor extends ThreadPoolExecutor {
   public PriorityThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
         TimeUnit unit, ThreadFactory threadFactory) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit,new PriorityBlockingQueue<Runnable>(), threadFactory);
    }
    @Override
    public Future<?> submit(Runnable task) {
        PriorityFutureTask futureTask = new PriorityFutureTask((PriorityRunnable) task);
        execute(futureTask);
        return futureTask;
    }
    private static final class PriorityFutureTask extends FutureTask<PriorityRunnable>
            implements Comparable<PriorityFutureTask> {
        private final PriorityRunnable priorityRunnable;
        public PriorityFutureTask(PriorityRunnable priorityRunnable) {
            super(priorityRunnable, null);
            this.priorityRunnable = priorityRunnable;
        }
        /*
         * compareTo() method is defined to compare 2 tasks with their priority.
         */
        @Override
        public int compareTo(PriorityFutureTask other) {
            Priority p1 = priorityRunnable.getPriority();
            Priority p2 = other.priorityRunnable.getPriority();
            return p2.ordinal() - p1.ordinal();
        }
    }
}
Thay đổi 1 chút trong DefaultExecutorSupplier (ở trên)

Thay vì dùng ThreadPoolExecutor, ta sẽ dùng PriorityThreadPoolExecutor

public class DefaultExecutorSupplier{
private final PriorityThreadPoolExecutor mForBackgroundTasks;
private DefaultExecutorSupplier() { 
        mForBackgroundTasks = new PriorityThreadPoolExecutor(
                NUMBER_OF_CORES * 2,
                NUMBER_OF_CORES * 2,
                60L,
                TimeUnit.SECONDS,
                backgroundPriorityThreadFactory
        );
     ...
    }
}
Cuối cùng thực thi một task với độ ưu tiên mà bạn mong muốn 😃
//do some task at high priority
public void doSomeTaskAtHighPriority(){
  DefaultExecutorSupplier.getInstance().forBackgroundTasks()
    .submit(new PriorityRunnable(Priority.HIGH) {
    @Override
    public void run() {
      // do some background work here at high priority.
    }
});
}

Reference:

https://blog.mindorks.com/threadpoolexecutor-in-android-8e9d22330ee3#.pjemmk4t7 https://developer.android.com/reference/java/util/concurrent/ThreadPoolExecutor.html https://github.com/amitshekhariitbhu/Fast-Android-Networking/tree/master/android-networking/src/main/java/com/androidnetworking/core


All Rights Reserved

Viblo
Let's register a Viblo Account to get more interesting posts.