asynchronous trên Java 8 với CompletableFuture

I. Thế nào là Asynchronous Giả sử chúng ta có 3 task cần phải thực hiện. ví dụ: T1; T2 và T3 Cách đơn giản nhất chúng ta thực hiện chúng đó là T1 >> T2 >> T3 (Thực hiện tuần tự, làm T1 xong rồi làm đến T2, hết T2 rồi làm tiếp T3). Cách này gọi là synchronous execution (thực hiện đồng bộ)

Cách thứ hai đó là chúng ta thực hiện chúng đồng thời với nhau T1 >> T2 >> T3 >> multithreaded execution (thực hiện đa luồng)

Giờ đây trên java 8, chúng ta được cung cấp thêm một cách thức thứ 3 nữa để thực hiện việc này đó là asynchronous

T1(part 1) >> T2(part 1) >> T3(part 1) >> T1(part 2) >> T2(part 2) >> T3(part 2)

II. Thực hiện Asynchronous trên Java Ví dụ:

queryEngine.select("select user from User")
.forEach(user -> System.out.prinln(user)) ;

Chúng ta có thể sử dụng ExecutorService

Callable<String> task = () -> "select user from User" ;
Future<String> future = executorService.submit(task) ;
List<User> users = future.get() ; // blocking
users.forEach(System.out::println) ;

Nhược điểm của cách này là khi chúng ta lấy kết quả trả về, vẫn có đoạn code bị block lại trên main thread.

Giờ đây trên java 8, chúng ta có thể sử dụng CompletionStage / CompletableFuture (CompletionStage là interface, CompletableFuture là class implement java 8 cung cấp) để implement đoạn code trên như sau:

CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(() -> {
    queryEngine.select("select user from User")
}, executor);

get data

completableFuture.thenRun(users -> {
    users.forEach(System.out::println) 
});

CompletionStage là một mô hình cho một nhiệm vụ:

  • thực hiện một hành động có thể trở lại một giá trị khi một hành động khác hoàn thành
  • có thể kích hoạt các nhiệm vụ khác Vì vậy CompletionStage là một phần của Chain (chuỗi)

Ưu điểm của CompletionStage là giúp ta dễ dàng kết hợp giữa các task vs nhau, kết hợp nhiều kiểu 1 - 1 (sau khi thực hiện T1, thực hiện T2)

public <U> CompletionStage<U> thenApply(Function<? super T,? extends U> fn);

public CompletionStage<Void> thenRunAsync(Runnable action, Executor executor);

public CompletionStage<Void> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn);

2 - 1 (sau khi thực hiện T1, T2 thực hiện T3)

public <U, V> CompletionStage<V> thenCombineAsync(CompletionStage<U> other, BiFunction<T, U, V> function) ;

public <U> CompletionStage<Void> thenAcceptBoth(CompletionStage<U> other, BiConsumer<T, U> action) ;

public CompletionStage<Void> runAfterBothAsync(CompletionStage<?> other, Runnable action, Executor executor) ;

Một ví dụ khác cho việc phân tích một trang HTML và hiển thị link page có trên trang đó

CompletableFuture.supplyAsync(
    () -> readPage("http://whatever.com/")  // đọc source code từ trang html
)
.thenApply(Parser::getLinks) // sau đó lấy những link xuất hiện trên đó
.thenAcceptAsync(
    DisplayPanel::display, // thành công thì hiển thị trang
    SwingUtilities::invokeLater // chạy trên thread do SwingUtilities invoke vào main thread.
) ;

III. Unit Test

public class HomeController extends Controller {

public CompletionStage<Result> edit(Long id) {

   CompletionStage<Map<String, String>> companiesFuture = companyRepository.options();

   return computerRepository.lookup(id).thenCombineAsync(companiesFuture, (computerOptional, companies) -> {
       Computer c = computerOptional.get();
       Form<Computer> computerForm = formFactory.form(Computer.class).fill(c);
       return ok(views.html.computer.editForm.render(id, computerForm, companies));
   }, httpExecutionContext.current());
}

Method Test

ComputerRepository computerRepository = mock(ComputerRepository.class);

   @Override
   protected Application provideApplication() {
       return new GuiceApplicationBuilder().overrides(bind(ComputerRepository.class).toInstance(computerRepository)).build();
   }

   @Test
   public void edit() {
       final ComputerRepository computerRepository = app.injector().instanceOf(ComputerRepository.class);
       CompletionStage/<Optional<Computer>> computer = CompletableFuture.supplyAsync(()->{
           return Optional.of(new Computer());
       });
       when(computerRepository.lookup(21L)).thenReturn(computer);
       final CompletionStage<Optional<Computer>> stage = computerRepository.lookup(21L);

       Result result = route(app, controllers.routes.HomeController.edit(21L));
       assertThat(result.status()).isEqualTo(OK);
   }

}

IV. Tổng kết CompletionStage/CompletableFuture giúp chúng ta thực hiện việc lập trình bất đồng bộ một cách dễ dàng hơn, giờ đây chúng ta có thể coi các Task như một thành phần của chương trình. Có thể sử lý thứ tự thự hiện, điều kiện để thực hiện Task một cách linh hoạt, mềm dẻo hơn.