Java concurrency part 1 + part 2

Java platform cung cấp khả năng xử lý multi-threading ở mức low-level, cho phép các developer viết các chương trình chạy song song xử dụng các keywords: synchronized, volatile, wait(), notify() và notifyAll(). Tuy nhiên việc xử lý concurrency ở mức low-level này khó sử dụng khi, hoặc sử dụng không đúng cách khi gặp phải những vấn đề khó nhần trong xử lý multi-threading như: deadlock, thread stavation, race condition. Chính vì vậy mà bắt đầu từ Java5 trở đi, framework Concurrency(java.util.concurrent) được thiết kế để giải quyết những vấn đề trền, ở mức high-level hơn giúp các developer dễ dàng quản lý resource, thread và đồng bộ các tiến trình hơn.

Bài viết này sẽ đề cập đến một vài utility của framework để hy vọng có thể giúp chúng ta dễ tiếp cận và xử lý các ứng dụng chạy concurrency.

  • The Executor framework

Trong xử lý multi-thread, task là một đơn vị công việc. Một vấn đề khi sử dụng thư viện primative là sự phụ thuộc quá chặt chẽ giữa task và thread thực thi task đó. Lấy ví dụ:

import java.io.IOException;

import java.net.ServerSocket;
import java.net.Socket;

class Server
{
   public static void main(String[] args) throws IOException
   {
      ServerSocket socket = new ServerSocket(9000);
      while (true)
      {
         final Socket s = socket.accept();
         Runnable r = new Runnable()
                      {
                         @@Override
                         public void run()
                         {
                            doWork(s);
                         }
                      };
         new Thread(r).start();
      }
   }

   static void doWork(Socket s)
   {
   }
}

Ở đoạn code trên mỗi khi nhận một kết nối từ client đến server sẽ tạo một thread mới để xử lý tác vụ. Rõ ràng các xử lý này sẽ có vẫn đề khi có nhiều kết nối đến server, số lượng thread phải tạo mới sẽ tăng lên dẫn đến tràn nhớ hoặc số lượng thread được tạo ra vượt quá số lượng MAX_THREAD mà hệ điều hành cho phép. Do đó chương trình có thể bị chết ngay lập tức.

Để giải quyết vấn đề này thay vì mỗi task tạo một thread, chúng ta sẽ sử dụng một thread pool chứa một số lượng các thread nhất định, các thread này sẽ có thực thi các task nếu chúng ở trạng thái active. Đoạn code bên trên có thể được viết lại như sau:

import java.io.IOException;

import java.net.ServerSocket;
import java.net.Socket;

import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

class Server
{
   static Executor pool = Executors.newFixedThreadPool(5);

   public static void main(String[] args) throws IOException
   {
      ServerSocket socket = new ServerSocket(9000);
      while (true)
      {
         final Socket s = socket.accept();
         Runnable r = new Runnable()
                      {
                         @@Override
                         public void run()
                         {
                            doWork(s);
                         }
                      };
         pool.execute(r);
      }
   }

   static void doWork(Socket s)
   {
   }
}

Một ví dụ khác về việc sử dụng Executor để đọc nội dụng một webpage

public class ReadWebPage
{
   public static void main(final String[] args)
   {
      if (args.length != 1)
      {
         System.err.println("usage: java ReadWebPage url");
         return;
      }
      ExecutorService executor = Executors.newSingleThreadExecutor();
      Callable<List<String>> callable;
      callable = new Callable<List<String>>()
                 {
                    @@Override
                    public List<String> call()
                       throws IOException, MalformedURLException
                    {
                       List<String> lines = new ArrayList<>();
                       URL url = new URL(args[0]);
                       HttpURLConnection con;
                       con = (HttpURLConnection) url.openConnection();
                       InputStreamReader isr;
                       isr = new InputStreamReader(con.getInputStream());
                       BufferedReader br;
                       br = new BufferedReader(isr);
                       String line;
                       while ((line = br.readLine()) != null)
                          lines.add(line);
                       return lines;
                    }
                 };
      Future<List<String>> future = executor.submit(callable);
      try
      {
         List<String> lines = future.get(5, TimeUnit.SECONDS);
         for (String line: lines)
            System.out.println(line);
      }
      catch (ExecutionException ee)
      {
         System.err.println("Callable through exception: "+ee.getMessage());
      }
      catch (InterruptedException | TimeoutException eite)
      {
         System.err.println("URL not responding");
      }
      executor.shutdown();
    }
}

Ở đoạn code trên một single thread được taọ ra để thực hiện đọc webpage được define trong phương thức call() của interface Callable. Sau khi submit task, thử lấy kết quả sau 5s. Nếu webpage được đọc xong thì kết quả đươc trả về còn nếu sau 5s thì TimeoutException sẽ được ném ra.

  • Semaphore

Là một biến dùng để hạn chế truy nhập tới tài nguyên dùng chung trong môi trường multi-threading. Giá trị của nó được tăng lên khi tài nguyên bị chiếm và giảm đi khi tài nguyên được giải phóng bởi các Thread.

Có 2 loại biến semaphore:

  • Counting Semaphore: Nếu giá trị của chúng có thể tăng giảm 1 sau mỗi lần tài nguyên bị chiếm
  • Mutex: Nếu giá trị chỉ có thể là 0 và 1

Xem xét chương trình dưới đây:

import java.util.concurrent.Executors;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Semaphore;

public class SemaphoreDemo
{
   public static void main(String[] args)
   {
      final Pool pool = new Pool();
      Runnable r = new Runnable()
                   {
                      @Override
                      public void run()
                      {
                         String name = Thread.currentThread().getName();
                         try
                         {
                            while (true)
                            {
                               String item;
                               System.out.printf("%s acquiring %s%n", name,
                                                 item = pool.getItem());
                               Thread.sleep(200+(int)(Math.random()*100));
                               System.out.printf("%s putting back %s%n",
name,
                                                 item);
                               pool.putItem(item);
                            }
                         }
                         catch (InterruptedException ie)
                         {
                            System.out.printf("%s interrupted%n", name);
                         }
                      }
                   };
      ExecutorService[] executors = new
ExecutorService[Pool.MAX_AVAILABLE+1];
      for (int i = 0; i < executors.length; i++)
      {
         executors[i] = Executors.newSingleThreadExecutor();
         executors[i].execute(r);
      }
   }
}

final class Pool
{
   public static final int MAX_AVAILABLE = 10;

   private Semaphore available = new Semaphore(MAX_AVAILABLE, true);
   private String[] items;
   private boolean[] used = new boolean[MAX_AVAILABLE];

   Pool()
   {
      items = new String[MAX_AVAILABLE];
      for (int i = 0; i < items.length; i++)
         items[i] = "ITEM"+i;
   }

   String getItem() throws InterruptedException
   {
      available.acquire();
      return getNextAvailableItem();
   }

   void putItem(String item)
   {
      if (markAsUnused(item))
         available.release();
   }

   private synchronized String getNextAvailableItem()
   {
      for (int i = 0; i < MAX_AVAILABLE; ++i)
      {
         if (!used[i])
         {
            used[i] = true;
            return items[i];
         }
      }
      return null; // not reached
   }

   private synchronized boolean markAsUnused(String item)
   {
      for (int i = 0; i < MAX_AVAILABLE; ++i)
      {
         if (item == items[i])
         {
            if (used[i])
            {
               used[i] = false;
               return true;
            }
            else
               return false;
         }
      }
      return false;
   }
}

Chương trình trên gồm 2 class là SemaphoreDemoPool. Class SemaphoreDemo tạo ra các Executor để truy nhập đến tài nguyên String items từ Pool thông qua các method getItem()putItem(). Để hạn chế số lượng max Thread MAX_AVAILABLE:= 10có thể đồng thời truy nhập đến các items trong class pool có khai báo một semaphore

private Semaphore available = new Semaphore(MAX_AVAILABLE, true);

Mỗi khi chiếm tài nguyên một item thì semaphore này sẽ tăng lên một bằng cách gọi method available.acquire() và giảm đi một khi giải phóng item này available.release(). Nếu số lượng Thread đang truy nhập tới tài nguyên items mà vượt quá 10 thì các thread khác phải chờ cho đến khi có một thread bất kỳ giải phóng 1 tài nguyên nào đó. Đây là kết quả sau khi chạy chương trình:

pool-1-thread-1 acquiring ITEM0
pool-9-thread-1 acquiring ITEM9
pool-7-thread-1 acquiring ITEM8
pool-5-thread-1 acquiring ITEM7
pool-3-thread-1 acquiring ITEM6
pool-10-thread-1 acquiring ITEM5
pool-8-thread-1 acquiring ITEM4
pool-6-thread-1 acquiring ITEM3
pool-4-thread-1 acquiring ITEM2
pool-2-thread-1 acquiring ITEM1
pool-5-thread-1 putting back ITEM7
pool-11-thread-1 acquiring ITEM7
pool-9-thread-1 putting back ITEM9
pool-5-thread-1 acquiring ITEM9
pool-7-thread-1 putting back ITEM8
pool-9-thread-1 acquiring ITEM8
pool-3-thread-1 putting back ITEM6
pool-7-thread-1 acquiring ITEM6

Như kết quả ở trên tổng cộng có 11 thread đang tranh giành 10 tài nguyên. Thread pool-11-thread-1 bị buộc phải chờ cho đến khi thread pool-5-thread-1 giải phóng ITEM7 thì nó mới có thể truyê nhập được vào các tài nguyên khác.

  • Barrier

Dùng để đồng bộ các thread chạy song song cùng chờ nhau tại một điểm nào đó trong chương trình. Nghe có vẻ hơi trừu tượng phải không? Bạn có thể hình dung nó như tấm Bare dùng để chặn các phương tiện giao thông, mỗi Thread giống như một phương tiện giao thông chạy song song trên đường. Các phương tiện này dù chạy nhanh hay chậm cũng sẽ phải dừng lại tại tấm Bare. Để sau đó bạn có thể thu vé hay kiểm tra ...

Java có implement Barrier bằng class java.lang.concurrent.CyclicBarrier. Barrier được gọi là Cyclic hay Tuần hoàn bởi chúng có thể tái sử dụng sau mỗi lần các waiting Thread được cho phép chạy qua. Dưới đây là Demo:

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.Executors;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

public class CyclicBarrierDemo
{
   public static void main(String[] args)
   {
      Runnable action = new Runnable()
                        {
                           @Override
                           public void run()
                           {
                              String name =
Thread.currentThread().getName();
                              System.out.printf("Thread %s "+
                                                "executing barrier
action.%n",
                                                name);
                           }
                        };
      final CyclicBarrier barrier = new CyclicBarrier(3, action);
      Runnable task = new Runnable()
                      {
                         @Override
                         public void run()
                         {
                            String name = Thread.currentThread().getName();
                            System.out.printf("%s about to join game...%n",
                                              name);
                            try
                            {
                               barrier.await();
                            }
                            catch (BrokenBarrierException bbe)
                            {
                               System.out.println("barrier is broken");
                               return;
                            }
                            catch (InterruptedException ie)
                            {
                               System.out.println("thread interrupted");
                               return;
                            }
                            System.out.printf("%s has joined game%n", name);
                         }
                      };
      ExecutorService[] executors = new ExecutorService[]
                                    {
                                       Executors.newSingleThreadExecutor(),
                                       Executors.newSingleThreadExecutor(),
                                       Executors.newSingleThreadExecutor()
                                    };
      for (ExecutorService executor: executors)
      {
         executor.execute(task);
         executor.shutdown();
      }
   }
}

Chương trình trên trong có vẻ phức tạp nhưng tư tưởng khá đơn giản. Có 3 thread cùng thực thi 1 task song song, tuy nhiên tại thời điểm trong khối try barrier.await() 3 thread này phải chờ nhau, không thread nào được tiếp tục thực thi cho đến khi task action kết thúc. Barrier sẽ gọi action khi có 3 thread chạy đến điểm barrier.await().

Sau khi biên dịch và chạy trương chình bạn sẽ nhận được output như dưới đây:

pool-1-thread-1 about to join game...
pool-3-thread-1 about to join game...
pool-2-thread-1 about to join game...
Thread pool-2-thread-1 executing barrier action.
pool-2-thread-1 has joined game
pool-3-thread-1 has joined game
pool-1-thread-1 has joined game

Một điều lưu ý là Semaphore và Barrier là những thuật ngữ chung sử dụng trong lập trình song song nói chung không chỉ riêng cho Java hay bất cứ ngôn ngữ lập trình nào khác.