Tổng quan về stream API trong java 8

Stream API là một trong những feature chính của java 8 khi nó được giới thiệu, toàn bộ nằm trong package java.util.stream , gồm những API xử lý tuần tự các element cho collection. Dưới đây mình sẽ trình bày một vài ví dụ để demo cho việc làm thế nào để làm việc với Java 8 streams và làm thế nào để sử dụng các loại operation khác nhau của nó.

Stream hoạt động như thế nào?

A stream đại diện cho một collection được xử lý tuần tự và hỗ trợ rất nhiều loại operation để tính toán dựa trên những element của collection đó, ví dụ đơn giản:

List<String> myList =
    Arrays.asList("a", "aa", "bb", "b", "c");

myList
    .stream()
    .filter(s -> s.startsWith("b"))
    .map(String::toUpperCase)
    .sorted()
    .forEach(System.out::println);

// BB
// B

Một stream operation có thể ở giữa hoặc ở cuối. Nếu nó là operation ở giữa, nó sẽ return một stream, nhờ vậy mà ta có thể thực hiện tính toán trên nhiều element ngay trong lúc này mà không cần bất cứ một dấu chấm phẩy nào. Một operation cuối nghĩa là nó ko return một stream nữa hoặc thậm chí là void. Ví dụ ở trên thì filter() map() sorted chính là operation giữa, còn forEach() là một operation cuối. Hầu hết các operation của stream đều có thể dùng lamba expression nên bạn có thể yên tâm sử dụng nhé.

Các loại stream

Stream có thể được tạo từ nhiều nguồn khác nhau, nhưng hầu hết là collection, List và Set collection hỗ trợ 2 method để khởi tạo stream tuần tự là stream() và stream song song là parallelStream(). Một vài ví dụ về stream tuần tự như sau:

Arrays.asList("a", "aa", "aaa")
    .stream()
    .findFirst()
    .ifPresent(System.out::println);  // a

Nhưng nếu bạn muốn làm việc mới stream mà không cần tạo ra collection, có thể chọn cách sau:

Stream.of("a", "aa", "aaa")
    .findFirst()
    .ifPresent(System.out::println);  // a

Lưu ý: chỉ dùng Stream.of khi muốn tạo stream từ nguồn dữ liệu tham chiếu. Vậy giờ nếu mình muốn dùng stream với kiểu primitive thì sẽ như thế nào? Bạn có thể yên tâm vì stream đã support nhé:

IntStream.range(3, 7)
    .forEach(System.out::println);

// 3
// 4
// 5
// 6

Tương tự với LongStream hay DoubleStream nhé 😃

Một ví dụ về tạo primitive stream từ bunch data:

Arrays.stream(new int[] {1, 2, 3})
    .map(n -> 2 * n + 1)
    .average()
    .ifPresent(System.out::println);  // 5.0

Object stream sang primitive stream

Stream.of("a1", "c5", "b4")
    .map(s -> s.substring(1))
    .mapToInt(Integer::parseInt)
    .max()
    .ifPresent(System.out::println);  // 5

Primitive stream sang object stream IntStream.range(3, 6) .mapToObj(i -> "C" + i) .forEach(System.out::println);

// C3 // C4 // C5

Quá trình execute bên trong stream pipeline

Stream thực chất là một pipeline, nghĩa là các operation bên trong lambda express sẽ thực hiện tuần tự nhau. Tuy nhiên có một điểm đáng chú ý là nó là một loại lazy execution, nghĩa là khi chưa có operation cuối thì không có cái nào được execute cả. Ví dụ sau thì console sẽ không in ra gì cả, vì chưa có operation cuối cùng.

Stream.of("d2", "a2", "b1", "b3", "c")
    .filter(s -> {
        System.out.println("filter: " + s);
        return true;
    });

Reuse với stream

Java 8 stream không thể sử dụng lại.
Stream<String> stream =
    Stream.of("d2", "a2", "b1", "b3", "c")
        .filter(s -> s.startsWith("a"));

stream.anyMatch(s -> true);    // ok
stream.noneMatch(s -> true);   // exception

Đoạn code trên chắc chắn sẽ văng exceptionexception

Exception in thread "main" java.lang.IllegalStateException: stream has already been operated upon or closed
	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:229)
	at java.util.stream.ReferencePipeline.anyMatch(ReferencePipeline.java:449)
	at com.mediado.bookstore.dto.DownloadHistorySearchDTO.main(DownloadHistorySearchDTO.java:17)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)

Nếu muốn reuse với stream thì phải dùng tới Supplier, ví dụ sau :

Supplier<Stream<String>> streamSupplier =
    () -> Stream.of("d2", "a2", "b1", "b3", "c")
            .filter(s -> s.startsWith("a"));

streamSupplier.get().anyMatch(s -> true);   // ok
streamSupplier.get().noneMatch(s -> true);  // ok

Ngoài ra còn có các advance operation như reduce, flatMap, collect sẽ được giới thiệu ở part sau nhé 😃 Tiếp theo mình sẽ giới thiệu về phần stream song song

Stream song song

Stream có thể được execute song song để tăng performance khi iterator qua 1 collection lớn. Stream song song sử dụng ForkJoinPool.commonPool() để làm việc này, thread pool có thể lên đến 5 thread, tất nhiên điều này còn phụ thuộc vào CPU, JVM setting hay thậm chí là framework config. Ví dụ sau sẽ giúp ta hiểu rõ hơn về stream song song.

Arrays.asList("a1", "a2", "b1", "c2", "c1")
    .parallelStream()
    .filter(s -> {
        System.out.format("filter: %s [%s]\n",
            s, Thread.currentThread().getName());
        return true;
    })
    .map(s -> {
        System.out.format("map: %s [%s]\n",
            s, Thread.currentThread().getName());
        return s.toUpperCase();
    })
    .forEach(s -> System.out.format("forEach: %s [%s]\n",
        s, Thread.currentThread().getName()));

Kết quả như sau :

filter: a2 [ForkJoinPool.commonPool-worker-1]
filter: c1 [ForkJoinPool.commonPool-worker-3]
map: c1 [ForkJoinPool.commonPool-worker-3]
forEach: C1 [ForkJoinPool.commonPool-worker-3]
filter: c2 [ForkJoinPool.commonPool-worker-3]
map: c2 [ForkJoinPool.commonPool-worker-3]
forEach: C2 [ForkJoinPool.commonPool-worker-3]
filter: b1 [main]
map: b1 [main]
filter: a1 [ForkJoinPool.commonPool-worker-2]
map: a1 [ForkJoinPool.commonPool-worker-2]
forEach: A1 [ForkJoinPool.commonPool-worker-2]
forEach: B1 [main]
map: a2 [ForkJoinPool.commonPool-worker-1]
forEach: A2 [ForkJoinPool.commonPool-worker-1]

Một điều nữa, stream song song có thể được convert ngược về lại stream tuần tự bằng cách dùng method sequential().

**Trên đây là tất cả những chia sẻ của mình về stream API của java 8, còn chần chừ gì nữa mà chưa thử. **