+1

Streams - Phía sau bức màn sân khấu

Hai bài viết đầu tiên trong loạt bài này khám phá cách sử dụng thư viện java.util.stream được thêm vào trong Java SE 8, thư viện này giúp dễ dàng diễn đạt truy vấn trên tập dữ liệu theo cách khai báo. Trong nhiều trường hợp, thư viện tìm ra cách thực hiện các truy vấn một cách hiệu quả mà không cần sự trợ giúp của người sử dụng. Nhưng khi performance là rất quan trọng, thì việc hiểu cách thức hoạt động của thư viện trong nội bộ là rất có giá trị, để chúng ta có thể loại bỏ các nguyên nhân có thể gây ra sự kém hiệu quả. Bài viết này khám phá cách triển khai Streams hoạt động và giải thích một số tối ưu hóa mà phương pháp khai báo có thể thực hiện được.

Stream pipelines

Một stream pipeline bao gồm một stream source, không hoặc nhiều intermediate operations và một terminal operation.

Stream sources có thể là collections, arrays, generator functions hoặc bất kỳ nguồn dữ liệu nào khác có thể cung cấp quyền truy cập phù hợp vào các phần tử của nó.
Intermediate operations chuyển đổi stream thành stream khác — bằng cách filtering các phần tử (filter()), biến đổi các phần tử (map()), sắp xếp các phần tử (sorted()), cắt bớt stream thành một kích thước nhất định (limit()) và nhiều thứ khác.
Terminal operations bao gồm tập hợp (reduce(),collect()), tìm kiếm (findFirst()) và lặp lại (forEach()).

Stream pipelines được xây dựng theo cách lazy. Xây dựng stream source không tính toán các phần tử của luồng mà thay vào đó nắm bắt cách tìm các phần tử khi cần thiết. Tương tự, gọi một intermediate operation không thực hiện bất kỳ tính toán nào trên các phần tử; nó chỉ thêm một operation khác vào cuối stream description. Chỉ khi terminal operation được gọi thì pipeline mới thực sự thực hiện công việc — tính toán các phần tử, áp dụng các intermediate operations và terminal operation. Cách tiếp cận này để thực hiện làm cho một số tối ưu hóa thú vị có thể thực hiện được.

Stream sources

Một stream source được mô tả bằng một trừu tượng có tên là Spliterator. Như tên gọi của nó, Spliterator kết hợp hai hành vi: truy cập các phần tử của source (lặp lại) và có thể phân tách source đầu vào để thực thi song song (tách).

Mặc dù Spliterator bao gồm các hành vi cơ bản giống như Iterator, nhưng nó không extend Iterator, thay vào đó sử dụng một cách tiếp cận khác để truy cập phần tử. Một Iterator có hai phương thức, hasNext() và next(); truy cập phần tử tiếp theo có thể liên quan (nhưng không yêu cầu) gọi cả hai phương thức này. Kết quả là, mã hóa một Iterator chính xác yêu cầu một lượng code defensive và duplicative nhất định. (Nếu chúng ta không gọi hasNext() trước next() thì sao? Nếu nó gọi hasNext() hai lần thì sao?). Ngoài ra, giao thức two-method thường yêu cầu một trạng thái hợp lý, chẳng hạn như xem trước một phần tử (và theo dõi xem chúng ta đã xem trước chưa). Cùng với nhau, các yêu cầu này bổ sung đáng kể chi phí để truy cập trên mỗi phần tử.

Có lambdas trong ngôn ngữ cho phép Spliterator thực hiện một cách tiếp cận để truy cập phần tử thường hiệu quả hơn — và dễ viết mã chính xác hơn. Spliterator có hai phương thức để truy cập các phần tử:

boolean tryAdvance(Consumer<? super T> action);
void forEachRemaining(Consumer<? super T> action);

Phương thức tryAdvance() cố gắng xử lý một phần tử. Nếu không còn phần tử nào, tryAdvance() chỉ trả về false; mặt khác, nó tiến con trỏ và chuyển phần tử hiện tại tới trình xử lý được cung cấp và trả về true. Phương thức forEachRemaining() xử lý tất cả các phần tử còn lại, chuyển từng phần tử một cho trình xử lý được cung cấp.

Ngay cả khi bỏ qua khả năng phân tách song song, sự trừu tượng hóa của Spliterator đã là một "iterator tốt hơn" — viết đơn giản hơn, sử dụng đơn giản hơn và thường có chi phí truy cập trên mỗi phần tử thấp hơn. Nhưng sự trừu tượng hóa của Spliterator cũng mở rộng để phân tách song song. Spliterator mô tả một chuỗi các phần tử còn lại; gọi phương thức truy cập phần tử tryAdvance() hoặc forEachRemaining() tiến qua trình tự đó. Để tách source, sao cho hai luồng có thể hoạt động riêng biệt trên các phần khác nhau của đầu vào, Spliterator cung cấp phương thức trySplit():

Spliterator<T> trySplit();

Hành vi của trySplit() là cố gắng chia các phần tử còn lại thành hai phần, lý tưởng là có kích thước tương tự nhau. Nếu Spliterator có thể được chia, trySplit() sẽ cắt một phần ban đầu của các phần tử được mô tả thành một Spliterator mới, phần này được trả về và điều chỉnh trạng thái của nó để mô tả các phần tử theo sau phần được chia. Nếu không thể tách nguồn, trySplit() trả về null, cho biết rằng việc tách không thể thực hiện được và người gọi sẽ tiến hành tuần tự. Đối với các nguồn có thứ tự là quan trọng (ví dụ: arrays, List hoặc SortedSet), trySplit() phải duy trì thứ tự này; nó phải tách phần ban đầu của các phần tử còn lại thành Spliterator mới và Spliterator hiện tại phải mô tả các phần tử còn lại theo thứ tự nhất quán với thứ tự ban đầu.

Tất cả các triển khai Collection trong JDK đều được trang bị các triển khai Spliterator chất lượng cao. Một số nguồn thừa nhận việc triển khai tốt hơn các nguồn khác: một ArrayList có nhiều hơn một phần tử luôn có thể được phân chia rõ ràng và đồng đều; một LinkedList luôn phân chia kém; và các Collection dựa trên hash và dựa trên tree nói chung có thể được phân chia hợp lý.

Về chi tiết và cụ thể hơn về interface Spliterator trong Java, các bạn có thể tham khảo ở đây

Xây dựng một stream pipeline

Một stream pipeline được xây dựng bằng cách xây dựng biểu diễn linked-list của stream source và các intermediate operations của nó. Trong xử lý bên trong, mỗi giai đoạn của pipeline được mô tả bằng một bitmap gồm các stream flags mô tả những gì đã biết về các phần tử ở giai đoạn này của stream pipeline. Streams sử dụng các cờ này để tối ưu hóa cả quá trình xây dựng và thực thi stream. Bảng 1 cho thấy các stream flags và diễn giải của chúng.

Stream flag Diễn giải
SIZED Kích thước của Stream là được biết.
DISTINCT Các phần tử của Stream là khác biệt, theo Object.equals() đối với object streams hoặc theo == đối với primitive streams.
SORTED Các phần tử của Stream được sắp xếp theo thứ tự tự nhiên.
ORDERED Stream có Encounter order(thứ tự cuộc gặp gỡ) có ý nghĩa (xem phần " Encounter order").

Các stream flags cho giai đoạn source được lấy từ characteristics bitmap của spliterator (spliterator hỗ trợ một tập hợp các cờ lớn hơn so với các streams). Việc triển khai spliterator chất lượng cao không chỉ cung cấp khả năng truy cập và tách phần tử hiệu quả mà còn mô tả các đặc điểm của phần tử. (Ví dụ: spliterator cho HashSet báo cáo đặc tính DISTINCT, vì các phần tử của Set được biết là khác biệt.)

Trong một số trường hợp, Streams có thể sử dụng kiến thức về source và các operations trước đó để loại bỏ hoàn toàn một operations.

Mỗi intermediate operation có tác dụng đã biết đối với các stream flags; một operation có thể đặt, xóa hoặc duy trì cài đặt cho từng cờ. Ví dụ: filter() operation giữ lại các cờ SORTEDDISTINCT nhưng xóa cờ SIZED; map() operation xóa các cờ SORTEDDISTINCT nhưng giữ nguyên cờ SIZED; và sorted() operation bảo toàn các cờ SIZEDDISTINCT đồng thời thêm cờ SORTED. Khi biểu diễn linked-list của các giai đoạn được xây dựng, các cờ cho giai đoạn trước được kết hợp với hành vi của giai đoạn hiện tại để tạo ra một bộ cờ mới cho giai đoạn hiện tại.

Trong một số trường hợp, các cờ cho phép hoàn toàn bỏ qua một thao tác, như trong đường dẫn luồng trong Ví dụ 1.

Ví dụ 1. Stream pipeline trong đó các operations có thể được tự động loại bỏ

TreeSet<String> ts = ...
String[] sortedAWords = ts.stream()
                          .filter(s ‑> s.startsWith("a"))
                          .sorted()
                          .toArray();

Các stream flags cho giai đoạn source bao gồm SORTED, vì source là một TreeSet. Phương thức filter() duy trì cờ SORTED, vì vậy stream flags cho giai đoạn filtering cũng bao gồm cờ SORTED. Thông thường, kết quả của phương thức sorted() sẽ là xây dựng một giai đoạn quy trình mới, thêm nó vào cuối quy trình và trả về giai đoạn mới. Thông thường, kết quả của phương thức sorted() sẽ là xây dựng một giai đoạn pipeline mới, thêm nó vào cuối pipeline và trả về giai đoạn mới. Tuy nhiên, vì biết rằng các phần tử đã được sắp xếp theo thứ tự tự nhiên, nên phương thức sorted() không hoạt động — nó chỉ trả về giai đoạn trước đó (giai đoạn filtering), vì việc sắp xếp sẽ là dư thừa. (Tương tự, nếu các phần tử được biết là DISTINCT, thì phép toán distinct() có thể bị loại bỏ hoàn toàn.)

Thực thi một stream pipeline

Khi terminal operation được bắt đầu, việc triển khai stream sẽ chọn một kế hoạch thực thi. Intermediate operations được chia thành các hoạt động stateless (filter(), map(), flatMap()) và stateful (sorted(), limit(), distinct()). stateless operation là hoạt động có thể được thực hiện trên một phần tử mà không cần biết về bất kỳ phần tử nào khác. Ví dụ: thao tác filtering chỉ cần kiểm tra phần tử hiện tại để xác định xem nên bao gồm hay loại bỏ nó, nhưng thao tác sorting phải xem tất cả các phần tử trước khi biết phần tử nào sẽ đứng trước.

Nếu pipeline đang thực thi tuần tự hoặc đang thực thi song song nhưng bao gồm tất cả các stateless operations, thì nó có thể được tính trong a single pass(một lần truyền dữ liệu). Mặt khác, pipeline được chia thành các phần (tại các ranh giới stateful operation) và được tính toán trong nhiều lần.

Terminal operations là short-circuiting (allMatch(), findFirst()) hoặc non–short-circuiting (reduce(), collect(), forEach()). Nếuterminal operation là non–short-circuiting, thì dữ liệu có thể được xử lý hàng loạt (sử dụng phương thức forEachRemaining() của spliterator, giúp giảm thêm chi phí truy cập từng phần tử); nếu nó là short-circuiting, nó phải được xử lý từng phần tử một (sử dụng tryAdvance()).

Để thực thi tuần tự, Streams xây dựng một "machine" — một chuỗi các đối tượng Consumer có cấu trúc khớp với cấu trúc của pipeline. Mỗi đối tượng Consumer này biết về giai đoạn tiếp theo; khi nó nhận được một phần tử (hoặc được thông báo rằng không còn phần tử nào nữa), nó sẽ gửi không hoặc nhiều phần tử đến giai đoạn tiếp theo trong chuỗi. Ví dụ: Consumer được liên kết với giai đoạn filter() áp dụng predicate filter cho phần tử đầu vào và gửi hoặc không gửi nó sang giai đoạn tiếp theo; Consumer được liên kết với giai đoạn map() áp dụng chức năng ánh xạ cho phần tử đầu vào và gửi kết quả đến giai đoạn tiếp theo. Consumer được liên kết với một stateful operation, chẳng hạn như các phần tử bộ đệm được sorted() cho đến khi nó nhìn thấy phần cuối của đầu vào, sau đó nó sẽ gửi dữ liệu đã sắp xếp đến giai đoạn tiếp theo. Giai đoạn cuối cùng trong machine thực hiện terminal operation. Nếu operation này tạo ra kết quả, chẳng hạn như reduce() hoặc toArray(), thì giai đoạn này đóng vai trò là accumulator(bộ tích lũy) cho kết quả.

Hình 1 hiển thị hình ảnh động của "stream machine" cho stream pipeline phía sau. (Trong Hình 1, các khối màu vàng, lục và lam đi vào giai đoạn đầu tiên của máy tính từ trên xuống theo trình tự. Trong giai đoạn đầu tiên, mỗi khối được nén thành một khối nhỏ hơn và sau đó rơi vào giai đoạn thứ hai. Ở đó, một nhân vật giống Pacman nuốt chửng từng khối màu vàng, chỉ để các khối màu xanh lục và xanh lam rơi vào màn thứ ba. Các khối màu xanh lam và xanh lục được nén lần lượt hiển thị trên màn hình máy tính.)

blocks.stream()
      .map(block ‑> block.squash())
      .filter(block ‑> block.getColor() != YELLOW)
      .forEach(block ‑> block.display());

image.png

Thực thi song song thực hiện một việc tương tự, ngoại trừ việc thay vì tạo một máy duy nhất, mỗi worker thread sẽ nhận được bản sao của machine riêng và cung cấp phần dữ liệu của nó cho nó, sau đó kết quả của từng máy trên mỗi thread được hợp nhất với kết quả của các máy khác để tạo ra kết quả cuối cùng.

Việc thực thi các stream pipelines cũng có thể được tối ưu hóa thông qua việc sử dụng các stream flags. Ví dụ: cờ SIZED chỉ ra rằng kích thước của kết quả cuối cùng đã được biết. Thao tác đầu cuối toArray() có thể sử dụng cờ này để phân bổ trước mảng có kích thước chính xác; nếu cờ SIZED không xuất hiện, nó sẽ phải đoán kích thước mảng và có thể sao chép dữ liệu nếu đoán sai.

Việc tối ưu hóa định cỡ trước thậm chí còn hiệu quả hơn trong việc thực thi parallel stream. Ngoài cờ SIZED, một đặc tính khác của spliterator, SUBSIZED, có nghĩa là không chỉ biết kích thước mà nếu spliterator được chia, thì split sizes(kích thước chia) cũng sẽ được biết. (Điều này đúng với mảng và ArrayList, nhưng không nhất thiết đúng với các nguồn có thể chia nhỏ khác, chẳng hạn như tree.) Nếu có đặc tính SUBSIZED, trong một lần thực thi song song, thao tác toArray() có thể phân bổ một mảng có kích thước chính xác duy nhất cho toàn bộ kết quả và các luồng riêng lẻ (mỗi luồng làm việc trên một phần riêng biệt của đầu vào) có thể ghi trực tiếp kết quả của chúng vào đúng phần của mảng — mà không cần đồng bộ hóa hoặc sao chép. (Trong trường hợp không có cờ SUBSIZED, mỗi phần được thu thập vào một mảng trung gian và sau đó được sao chép vào vị trí cuối cùng.)

Encounter order(Thứ tự gặp gỡ)

Một cân nhắc tinh tế khác ảnh hưởng đến khả năng tối ưu hóa của library's là encounter order(thứ tự gặp gỡ). Thứ tự gặp gỡ đề cập đến việc liệu thứ tự mà nguồn phân phối các phần tử có quan trọng đối với tính toán hay không. Một số sources (chẳng hạn như set và map dựa trên hashing) không có thứ tự gặp phải có ý nghĩa. Stream flag, ORDERED, mô tả liệu luồng có thứ tự gặp gỡ có ý nghĩa hay không. Spliterators của JDK collections đặt cờ này dựa trên đặc điểm kỹ thuật của collection; một số intermediate operations có thể thêm ORDERED (sorted()) hoặc xóa nó (unordered()).

Nếu stream có thứ tự gặp gỡ, thì hầu hết các hoạt động của stream phải tôn trọng thứ tự đó. Đối với các lần thực thi tuần tự, việc duy trì thứ tự gặp phải về cơ bản là miễn phí, vì các phần tử được xử lý tự nhiên theo thứ tự mà chúng gặp phải. Ngay cả khi thực hiện song song, đối với nhiều operations (stateless intermediate operations và một số terminal operations nhất định như reduce()), việc tuân thủ thứ tự gặp gỡ sẽ không áp đặt bất kỳ chi phí thực tế nào. Nhưng đối với những operations (stateful intermediate operations và có ngữ nghĩa được gắn với thứ tự gặp phải, chẳng hạn như findFirst() hoặc forEachOrdered()), nghĩa vụ tôn trọng thứ tự gặp phải trong thực thi song song có thể rất quan trọng. Nếu luồng có thứ tự gặp gỡ đã xác định, nhưng thứ tự đó không quan trọng đối với kết quả, thì có thể tăng tốc độ thực thi song song của các pipelines chứa các operations nhạy cảm với thứ tự bằng cách xóa cờ ORDERED bằng unordered() operation.

Ví dụ về operations nhạy cảm với thứ tự gặp phải, hãy xem xét limit(), operation này cắt bớt luồng ở kích thước đã chỉ định. Việc triển khai limit() trong một lần thực thi tuần tự là chuyện nhỏ: Giữ bộ đếm xem có bao nhiêu phần tử đã được nhìn thấy và loại bỏ bất kỳ phần tử nào sau đó. Nhưng trong một thực thi song song, việc triển khai limit() phức tạp hơn nhiều; chúng ta phải giữ N phần tử đầu tiên. Yêu cầu này hạn chế rất nhiều khả năng khai thác xử lý song song; nếu đầu vào được chia thành các phần, chúng ta sẽ không biết liệu kết quả của một phần có được đưa vào kết quả cuối cùng hay không cho đến khi tất cả các phần trước phần đó được hoàn thành. Do đó, việc triển khai thường có lựa chọn tồi là không sử dụng tất cả các core có sẵn hoặc sử dụng bộ đệm toàn bộ kết quả dự kiến cho đến khi chúng ta đạt được độ dài mục tiêu.

Nếu stream không có thứ tự gặp gỡ, limit() operation được tự do chọn bất kỳ phần tử N nào, điều này cho phép thực thi hiệu quả hơn nhiều. Các phần tử có thể được gửi xuôi dòng ngay khi chúng được biết đến mà không cần bất kỳ bộ đệm nào và sự phối hợp duy nhất cần thiết giữa các luồng là một semaphore(đèn hiệu) để đảm bảo rằng độ dài luồng mục tiêu không bị vượt quá.

Một ví dụ khác, tinh tế hơn về chi phí của thứ tự gặp gỡ là sorting. Nếu thứ tự gặp phải là quan trọng, thao tác sorted() thực thi một stable sort (các phần tử bằng nhau xuất hiện theo thứ tự giống nhau ở đầu ra giống như ở đầu vào), trong khi đối với luồng không có thứ tự, tính ổn định — có chi phí — là không yêu cầu. Một câu chuyện tương tự tồn tại đối với distinct(): Nếu stream có thứ tự gặp gỡ, thì đối với nhiều phần tử đầu vào bằng nhau, thì distinct() phải đưa ra phần tử đầu tiên trong số chúng, trong khi đối với stream không có thứ tự, nó có thể đưa ra bất kỳ phần tử nào trong số chúng — điều này một lần nữa thừa nhận một thực hiện song song hiệu quả hơn nhiều.

Một tình huống tương tự phát sinh khi chúng ta tổng hợp với collect(). Nếu chúng ta thực hiện thao tác collect(groupingBy()) trên stream đã sắp xếp, thì các phần tử tương ứng với bất kỳ khóa nào phải được trình bày cho downstream collector theo thứ tự mà chúng xuất hiện trong đầu vào. Thông thường, thứ tự này không quan trọng đối với ứng dụng và bất kỳ thứ tự nào cũng được. Trong những trường hợp này, có thể nên chọn một concurrent collector (chẳng hạn như groupingByConcurrent()), được phép bỏ qua thứ tự gặp phải và để tất cả các luồng thu thập trực tiếp vào cấu trúc dữ liệu đồng thời được chia sẻ (chẳng hạn như ConcurrentHashMap) thay vì có từng luồng thu thập vào map trung gian của riêng mình, sau đó hợp nhất các map trung gian (có thể tốn kém).

Creating streams

Thật dễ dàng để điều chỉnh các cấu trúc dữ liệu hiện có để phân phối các streams.

Mặc dù nhiều lớp trong JDK đã được trang bị thêm để phục vụ như stream sources, nhưng cũng dễ dàng điều chỉnh cấu trúc dữ liệu hiện có để phân phối streams. Để tạo luồng từ một nguồn dữ liệu tùy ý, chúng ta cần tạo Spliterator cho các phần tử của stream và chuyển spliterator cho StreamSupport.stream(), cùng với một cờ boolean cho biết stream kết quả là tuần tự hay song song.

Việc triển khai Spliterator có thể khác nhau đáng kể về chất lượng, tạo ra sự đánh đổi giữa nỗ lực triển khai và hiệu suất của các stream pipelines sử dụng spliterators làm nguồn. Spliterator interface có một số phương thức về cơ bản là tùy chọn, chẳng hạn như trySplit(). Nếu chúng ta không muốn thực hiện chia tách, chúng ta luôn có thể trả về null từ trySplit()— nhưng điều này có nghĩa là các luồng sử dụng Spliterator tách này làm nguồn sẽ không thể khai thác cơ chế song song để tăng tốc độ tính toán.

Các cân nhắc ảnh hưởng đến chất lượng của Spliterator bao gồm:

  • Spliterator có báo cáo kích thước chính xác không?
  • Spliterator có thể chia nhỏ đầu vào không?
  • Nó có thể chia đầu vào thành các phần gần bằng nhau không?
  • Kích thước của các phần tách có thể dự đoán được không (được phản ánh thông qua các đặc tính SUBSIZED)?
  • Spliterator có báo cáo tất cả các đặc tính liên quan không?

Cách dễ nhất để tạo một Spliterator, nhưng dẫn đến kết quả có chất lượng kém nhất, là pass một Iterator cho Spliterators.spliteratorUnknownSize(). chúng ta có thể có được bộ tách tốt hơn một chút bằng cách pass Iterator và size cho Spliterators.spliterator. Nhưng nếu hiệu suất luồng là quan trọng — đặc biệt là hiệu suất song song — hãy triển khai giao diện Spliterator đầy đủ, bao gồm tất cả các đặc điểm có thể áp dụng. Các nguồn JDK cho các lớp tập hợp chẳng hạn như ArrayList, TreeSetHashMap cung cấp các ví dụ về bộ tách chất lượng cao mà chúng ta có thể mô phỏng cho cấu trúc dữ liệu của riêng mình.

Kết luận cho Phần 2

Mặc dù hiệu suất của Streams bề ngoài nói chung là tốt (đôi khi còn tốt hơn cả mã mệnh lệnh tương ứng kiểu truyền thống), nhưng việc nắm vững cách thức hoạt động của Streams ẩn sâu bên trong sẽ cho phép chúng ta sử dụng thư viện với hiệu quả tối đa và tạo bộ điều hợp tùy chỉnh cho phép lấy một stream từ bất kỳ nguồn dữ liệu nào. Hai loạt phần tiếp theo khám phá sâu về parallelism.


All rights reserved

Viblo
Hãy đăng ký một tài khoản Viblo để nhận được nhiều bài viết thú vị hơn.
Đăng kí