Chuyển tới nội dung chính

Stream API cơ bản

Mục tiêu bài học

Sau bài này, bạn sẽ:

  • Hiểu được Stream là gì và khác biệt với Collection
  • Nắm được các cách tạo Stream (từ Collection, Array, range, generate, iterate)
  • Hiểu được Stream Pipeline: Source, Intermediate, Terminal operations
  • Nắm được Lazy Evaluation và lợi ích của nó
  • Biết được Short-Circuit operations và cách hoạt động
Hình dung Stream

Hãy tưởng tượng Stream như băng chuyền trong nhà máy: nguyên liệu (dữ liệu) đi qua từng trạm xử lý (filter, map, sort...), mỗi trạm làm một việc cụ thể. Nguyên liệu đi qua rồi thì không quay lại — giống như stream chỉ dùng được một lần. Và băng chuyền chỉ bắt đầu chạy khi có người đứng ở cuối nhận hàng (terminal operation).

Bài trước: Functional Interfaces — Đã học các functional interfaces như Predicate, Function, Consumer. Bài này sẽ giới thiệu Stream API để xử lý collections theo functional style.

Stream là gì?

Stream là một chuỗi (sequence) các phần tử hỗ trợ các operations tuần tự (sequential) và song song (parallel) để xử lý dữ liệu theo functional style.

Stream được giới thiệu từ Java 8 như một phần của functional programming support.

import java.util.*;
import java.util.stream.*;

public class StreamIntro {
public static void main(String[] args) {
List<String> names = Arrays.asList("An", "Bình", "Cường", "Dung");

// ❌ Traditional approach
List<String> result = new ArrayList<>();
for (String name : names) {
if (name.length() > 2) {
result.add(name.toUpperCase());
}
}
Collections.sort(result);

// ✅ Stream approach
List<String> streamResult = names.stream()
.filter(name -> name.length() > 2) // Lọc
.map(String::toUpperCase) // Transform
.sorted() // Sắp xếp
.collect(Collectors.toList()); // Thu thập kết quả
}
}
Khái niệm cốt lõi

Stream KHÔNG lưu trữ dữ liệu. Nó chỉ là một view hoặc pipeline để xử lý dữ liệu từ một source (như Collection, array, I/O channel).

Stream vs Collection

So sánh chi tiết

Khía cạnhCollectionStream
Lưu trữ dữ liệuLưu trữ tất cả phần tửKhông lưu trữ, chỉ process
Tính toánEager (tức thời)Lazy (trì hoãn)
IterationExternal (for loop)Internal (hidden)
Sử dụng lạiIterate nhiều lầnOne-time use (dùng 1 lần)
ModificationCó thể modify phần tửImmutable (không modify source)
ParallelizationPhải tự implementDễ dàng (.parallel())
PurposeStore dataProcess/transform data
Hình dung sự khác biệt
  • Collection giống như kho hàng — hàng hóa được lưu trữ, bạn vào lấy bất cứ lúc nào, bao nhiêu lần cũng được
  • Stream giống như ống nước — nước chảy qua một lần rồi hết, bạn không thể "quay lại" lấy nước đã chảy qua

Hoặc nghĩ theo DVD vs YouTube:

  • Collection = DVD: bạn sở hữu toàn bộ phim, tua đi tua lại thoải mái
  • Stream = Livestream: xem theo dòng chảy, không tua lại được

Ví dụ so sánh

import java.util.*;
import java.util.stream.*;

public class CollectionVsStream {
public static void main(String[] args) {
// === Collection ===
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);

// ✅ Có thể iterate nhiều lần
for (int n : numbers) {
System.out.println(n);
}
for (int n : numbers) { // OK - iterate lại
System.out.println(n);
}

// ✅ Có thể modify
numbers.add(6); // Nếu là mutable list

// === Stream ===
Stream<Integer> stream = numbers.stream();

// ✅ Iterate lần 1
stream.forEach(System.out::println);

// ❌ Iterate lần 2 → IllegalStateException
// stream.forEach(System.out::println); // Error: stream has already been operated upon

// ✅ Phải tạo stream mới
numbers.stream().forEach(System.out::println); // OK

// === Lazy Evaluation ===
System.out.println("=== Collection (Eager) ===");
List<Integer> collectionResult = new ArrayList<>();
for (int n : numbers) {
System.out.println("Processing: " + n);
if (n % 2 == 0) {
collectionResult.add(n * n);
}
}

System.out.println("\n=== Stream (Lazy) ===");
Stream<Integer> lazyStream = numbers.stream()
.peek(n -> System.out.println("Processing: " + n))
.filter(n -> n % 2 == 0)
.map(n -> n * n);

System.out.println("Stream created, but not executed yet!");
// Chưa in gì cả - vì chưa có terminal operation

System.out.println("\nNow executing terminal operation:");
lazyStream.forEach(System.out::println); // Bây giờ mới execute
}
}

Output:

=== Collection (Eager) ===
Processing: 1
Processing: 2
Processing: 3
Processing: 4
Processing: 5

=== Stream (Lazy) ===
Stream created, but not executed yet!

Now executing terminal operation:
Processing: 1
Processing: 2
4
Processing: 3
Processing: 4
16
Processing: 5

Tạo Stream

1. Từ Collection

import java.util.*;
import java.util.stream.*;

public class StreamFromCollection {
public static void main(String[] args) {
// List
List<String> list = Arrays.asList("A", "B", "C");
Stream<String> streamFromList = list.stream();

// Set
Set<Integer> set = new HashSet<>(Arrays.asList(1, 2, 3));
Stream<Integer> streamFromSet = set.stream();

// Map (stream entries)
Map<String, Integer> map = new HashMap<>();
map.put("A", 1);
map.put("B", 2);

Stream<Map.Entry<String, Integer>> entryStream = map.entrySet().stream();
Stream<String> keyStream = map.keySet().stream();
Stream<Integer> valueStream = map.values().stream();
}
}

2. Từ Array

import java.util.Arrays;
import java.util.stream.*;

public class StreamFromArray {
public static void main(String[] args) {
// String array
String[] strArray = {"Java", "Python", "C++"};
Stream<String> streamFromArray = Arrays.stream(strArray);

// Primitive array
int[] intArray = {1, 2, 3, 4, 5};
IntStream intStream = Arrays.stream(intArray);

// Range trong array
Stream<String> partialStream = Arrays.stream(strArray, 0, 2); // Java, Python
}
}

3. Dùng Stream.of()

import java.util.stream.*;

public class StreamOf {
public static void main(String[] args) {
// Varargs
Stream<String> stream1 = Stream.of("A", "B", "C");

// Single element
Stream<String> stream2 = Stream.of("Single");

// Empty stream
Stream<String> emptyStream = Stream.empty();

// Nullable stream (Java 9+)
String value = null;
Stream<String> nullableStream = Stream.ofNullable(value); // Empty stream
}
}

4. Dùng Stream.generate()

import java.util.stream.*;

public class StreamGenerate {
public static void main(String[] args) {
// Random numbers
Stream<Double> randomNumbers = Stream.generate(Math::random);
randomNumbers.limit(5).forEach(System.out::println);

// Constant value
Stream<String> constants = Stream.generate(() -> "Constant");
constants.limit(3).forEach(System.out::println); // Constant, Constant, Constant

// UUID generator
Stream<String> uuids = Stream.generate(() -> java.util.UUID.randomUUID().toString());
uuids.limit(3).forEach(System.out::println);
}
}
Infinite Stream

Stream.generate() tạo infinite stream (stream vô hạn). Phải dùng limit() để giới hạn, nếu không sẽ chạy mãi.

5. Dùng Stream.iterate()

import java.util.stream.*;

public class StreamIterate {
public static void main(String[] args) {
// Số chẵn: 0, 2, 4, 6, 8
Stream<Integer> evenNumbers = Stream.iterate(0, n -> n + 2);
evenNumbers.limit(5).forEach(System.out::println);

// Fibonacci: 0, 1, 1, 2, 3, 5, 8, ...
Stream.iterate(new int[]{0, 1}, f -> new int[]{f[1], f[0] + f[1]})
.limit(10)
.map(f -> f[0])
.forEach(System.out::println);

// Java 9+: iterate với predicate
Stream.iterate(1, n -> n <= 10, n -> n + 1) // Giống for (int n=1; n<=10; n++)
.forEach(System.out::println);
}
}

6. Primitive Streams

import java.util.stream.*;

public class PrimitiveStreams {
public static void main(String[] args) {
// IntStream
IntStream intStream1 = IntStream.range(1, 5); // 1, 2, 3, 4 (exclusive end)
IntStream intStream2 = IntStream.rangeClosed(1, 5); // 1, 2, 3, 4, 5 (inclusive)

// LongStream
LongStream longStream = LongStream.range(1L, 1_000_000L);

// DoubleStream
DoubleStream doubleStream = DoubleStream.of(1.1, 2.2, 3.3);

// IntStream methods
int sum = IntStream.rangeClosed(1, 100).sum();
double average = IntStream.rangeClosed(1, 100).average().orElse(0.0);
int max = IntStream.of(3, 1, 4, 1, 5, 9).max().orElse(0);

System.out.println("Sum 1-100: " + sum); // 5050
System.out.println("Average: " + average); // 50.5
System.out.println("Max: " + max); // 9
}
}

Bảng tổng hợp cách tạo Stream

Cách tạoSyntaxUse Case
Collectioncollection.stream()Từ List, Set, Map
ArrayArrays.stream(array)Từ mảng
Stream.of()Stream.of(phần tử)Từ varargs/phần tử rời
Stream.generate()Stream.generate(supplier)Infinite stream với generator
Stream.iterate()Stream.iterate(seed, unaryOp)Infinite stream với iteration
IntStream.range()IntStream.range(start, end)Range số nguyên (exclusive)
IntStream.rangeClosed()IntStream.rangeClosed(start, end)Range số nguyên (inclusive)
Stream.empty()Stream.empty()Empty stream
Stream.ofNullable()Stream.ofNullable(value)Stream từ nullable value
Stream.concat()Stream.concat(stream1, stream2)Nối 2 streams thành 1
Stream.builder()Stream.builder().add(e).build()Xây dựng stream bằng builder pattern
// Stream.concat() — nối 2 streams
Stream<String> s1 = Stream.of("A", "B");
Stream<String> s2 = Stream.of("C", "D");
Stream<String> combined = Stream.concat(s1, s2); // A, B, C, D

// Stream.builder() — xây dựng stream linh hoạt
Stream<String> built = Stream.<String>builder()
.add("X")
.add("Y")
.add("Z")
.build(); // X, Y, Z

Stream Pipeline

Stream pipeline bao gồm 3 phần:

Source → Intermediate Operations → Terminal Operation

1. Source (Nguồn)

Nơi dữ liệu đến từ: Collection, array, generator, etc.

List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);  // Source

2. Intermediate Operations (Phép toán trung gian)

Các operations transform stream thành stream khác. Chúng là lazy (không execute ngay).

Đặc điểm:

  • Lazy evaluation: Chỉ execute khi có terminal operation
  • Return Stream: Trả về Stream mới, cho phép chain
  • Immutable: Không modify source
// Intermediate operations (chưa execute)
Stream<Integer> processed = numbers.stream()
.filter(n -> n > 2) // Intermediate
.map(n -> n * n); // Intermediate

// Vẫn chưa có gì xảy ra!

Các intermediate operations phổ biến:

  • filter(), map(), flatMap()
  • distinct(), sorted(), limit(), skip()
  • peek() (để debug)

3. Terminal Operation (Phép toán kết thúc)

Operation kích hoạt stream pipeline và tạo kết quả. Stream bị "consumed" sau terminal operation.

Đặc điểm:

  • Eager evaluation: Execute ngay lập tức
  • Close stream: Stream không dùng lại được sau đó
  • Produce result: Trả về kết quả cuối cùng (value, collection, side-effect)
// Terminal operation → execute toàn bộ pipeline
List<Integer> result = numbers.stream()
.filter(n -> n > 2)
.map(n -> n * n)
.collect(Collectors.toList()); // Terminal operation

// Stream đã bị consumed, không dùng lại được

Các terminal operations phổ biến:

  • collect(), forEach(), reduce()
  • count(), min(), max()
  • anyMatch(), allMatch(), noneMatch()
  • findFirst(), findAny()

Ví dụ đầy đủ Pipeline

import java.util.*;
import java.util.stream.*;

public class StreamPipelineExample {
public static void main(String[] args) {
List<String> words = Arrays.asList(
"Java", "Python", "JavaScript", "C++", "Go", "Rust"
);

List<String> result = words.stream() // Source
.filter(w -> w.length() > 3) // Intermediate: lọc
.map(String::toUpperCase) // Intermediate: transform
.sorted() // Intermediate: sắp xếp
.limit(3) // Intermediate: giới hạn
.collect(Collectors.toList()); // Terminal: thu thập

System.out.println(result); // [JAVA, JAVASCRIPT, PYTHON]
}
}

Lazy Evaluation (Đánh giá trì hoãn)

Lazy evaluation là một tính năng quan trọng của Stream: intermediate operations không execute cho đến khi có terminal operation.

Ví dụ minh họa

import java.util.*;
import java.util.stream.*;

public class LazyEvaluationDemo {
public static void main(String[] args) {
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8);

System.out.println("=== Creating Stream Pipeline ===");
Stream<Integer> stream = numbers.stream()
.filter(n -> {
System.out.println("filter: " + n);
return n % 2 == 0;
})
.map(n -> {
System.out.println("map: " + n);
return n * n;
});

System.out.println("Pipeline created, but nothing executed yet!");
System.out.println();

System.out.println("=== Executing Terminal Operation ===");
List<Integer> result = stream.collect(Collectors.toList());

System.out.println("\nResult: " + result);
}
}

Output:

=== Creating Stream Pipeline ===
Pipeline created, but nothing executed yet!

=== Executing Terminal Operation ===
filter: 1
filter: 2
map: 2
filter: 3
filter: 4
map: 4
filter: 5
filter: 6
map: 6
filter: 7
filter: 8
map: 8

Result: [4, 16, 36, 64]
Lợi ích của Lazy Evaluation
  1. Performance: Không xử lý dữ liệu không cần thiết
  2. Short-circuiting: Có thể dừng sớm (ví dụ: findFirst())
  3. Infinite streams: Có thể làm việc với infinite streams
  4. Optimization: Compiler có thể optimize pipeline

Vertical Processing (Xử lý dọc)

Stream xử lý từng phần tử qua toàn bộ pipeline (vertical), không phải xử lý hết tất cả phần tử qua 1 operation rồi mới sang operation tiếp theo (horizontal).

List<Integer> numbers = Arrays.asList(1, 2, 3, 4);

// Stream: Vertical processing
numbers.stream()
.filter(n -> { System.out.println("Filter: " + n); return n % 2 == 0; })
.map(n -> { System.out.println("Map: " + n); return n * n; })
.forEach(n -> System.out.println("Result: " + n));

// Output:
// Filter: 1
// Filter: 2
// Map: 2 ← Element 2 đi qua toàn bộ pipeline
// Result: 4
// Filter: 3
// Filter: 4
// Map: 4 ← Element 4 đi qua toàn bộ pipeline
// Result: 16

Minh họa vertical processing

Mỗi phần tử đi qua toàn bộ pipeline trước khi phần tử tiếp theo bắt đầu. Đây gọi là xử lý dọc (vertical processing), khác với xử lý ngang (horizontal) khi tất cả phần tử qua filter xong mới chuyển sang map.

Bên trong hoạt động ra sao: Spliterator

Stream không trực tiếp duyệt phần tử bằng Iterator truyền thống. Thay vào đó, nó sử dụng Spliterator — một iterator đặc biệt có khả năng chia tách (split) dữ liệu cho xử lý song song.

Spliterator vs Iterator

Đặc điểmIteratorSpliterator
DuyệthasNext() + next()tryAdvance() hoặc forEachRemaining()
Chia táchKhông hỗ trợtrySplit() — chia thành 2 phần
Song songKhôngĐược thiết kế cho parallel
Kích thướcKhông biếtestimateSize() ước lượng số phần tử
Đặc tínhKhông cócharacteristics(): ORDERED, SORTED, SIZED, DISTINCT...
// Mỗi Collection có Spliterator riêng
List<String> list = List.of("A", "B", "C", "D");
Spliterator<String> spliterator = list.spliterator();

System.out.println(spliterator.estimateSize()); // 4
System.out.println(spliterator.characteristics()); // ORDERED | SIZED | SUBSIZED

// trySplit chia thành 2 phần
Spliterator<String> firstHalf = spliterator.trySplit();
// firstHalf: A, B
// spliterator (remaining): C, D

Khi bạn gọi .parallelStream(), JVM dùng trySplit() để chia dữ liệu thành các phần nhỏ, giao cho các thread khác nhau xử lý song song. Đây là lý do ArrayList (hỗ trợ random access → dễ chia) có parallel performance tốt hơn LinkedList (phải duyệt tuần tự để chia).

Short-Circuit Operations

Short-circuit operations có thể dừng sớm mà không cần xử lý hết tất cả phần tử.

Intermediate Short-Circuit: limit(), skip()

import java.util.stream.*;

public class ShortCircuitIntermediate {
public static void main(String[] args) {
// limit: chỉ lấy n phần tử đầu
Stream.iterate(1, n -> n + 1) // Infinite stream: 1, 2, 3, ...
.limit(5) // Dừng sau 5 phần tử
.forEach(System.out::println); // 1, 2, 3, 4, 5

// skip: bỏ qua n phần tử đầu
IntStream.rangeClosed(1, 10)
.skip(5) // Bỏ qua 5 phần tử đầu
.forEach(System.out::println); // 6, 7, 8, 9, 10
}
}

Terminal Short-Circuit: findFirst(), findAny(), anyMatch(), allMatch(), noneMatch()

import java.util.*;
import java.util.stream.*;

public class ShortCircuitTerminal {
public static void main(String[] args) {
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8);

// findFirst: tìm element đầu tiên thỏa điều kiện
Optional<Integer> first = numbers.stream()
.filter(n -> {
System.out.println("Checking: " + n);
return n > 5;
})
.findFirst();
System.out.println("First > 5: " + first.orElse(-1));
// Output: Checking: 1, 2, 3, 4, 5, 6 → First > 5: 6 (dừng ngay khi tìm thấy)

System.out.println();

// anyMatch: có ít nhất 1 element thỏa điều kiện?
boolean hasEven = numbers.stream()
.peek(n -> System.out.println("Checking: " + n))
.anyMatch(n -> n % 2 == 0);
System.out.println("Has even? " + hasEven);
// Output: Checking: 1, 2 → Has even? true (dừng ngay khi tìm thấy)
}
}
Short-Circuit = Performance

Short-circuit operations giúp tăng performance đáng kể, đặc biệt với large datasets hoặc infinite streams.

Ví dụ tổng hợp

import java.util.*;
import java.util.stream.*;

class Product {
private String name;
private double price;
private String category;

public Product(String name, double price, String category) {
this.name = name;
this.price = price;
this.category = category;
}

public String getName() { return name; }
public double getPrice() { return price; }
public String getCategory() { return category; }

@Override
public String toString() {
return String.format("%s (%.0f VND, %s)", name, price, category);
}
}

public class StreamComprehensiveExample {
public static void main(String[] args) {
List<Product> products = Arrays.asList(
new Product("Laptop", 20000000, "Electronics"),
new Product("Phone", 15000000, "Electronics"),
new Product("Desk", 3000000, "Furniture"),
new Product("Chair", 1500000, "Furniture"),
new Product("Monitor", 5000000, "Electronics"),
new Product("Keyboard", 800000, "Electronics")
);

// Bài toán: Tìm 3 sản phẩm Electronics có giá cao nhất
System.out.println("=== Top 3 Most Expensive Electronics ===");
products.stream() // Source
.filter(p -> p.getCategory().equals("Electronics")) // Lọc category
.sorted((p1, p2) -> Double.compare(p2.getPrice(), p1.getPrice())) // Sắp xếp giảm dần
.limit(3) // Lấy 3 đầu
.forEach(System.out::println); // In ra

// Tính tổng giá trị của tất cả sản phẩm
double totalValue = products.stream()
.mapToDouble(Product::getPrice)
.sum();
System.out.println("\nTotal value: " + totalValue + " VND");

// Tìm sản phẩm rẻ nhất
Optional<Product> cheapest = products.stream()
.min(Comparator.comparingDouble(Product::getPrice));
cheapest.ifPresent(p -> System.out.println("Cheapest: " + p));

// Group by category
Map<String, List<Product>> byCategory = products.stream()
.collect(Collectors.groupingBy(Product::getCategory));
System.out.println("\nGrouped by category:");
byCategory.forEach((cat, prods) -> {
System.out.println(cat + ": " + prods.size() + " products");
});
}
}
Bẫy phổ biến: Stream.of() với mảng nguyên thủy
int[] intArray = {1, 2, 3, 4, 5};

// ❌ SAI — tạo Stream<int[]> chứa 1 phần tử (cả mảng)
Stream<int[]> wrong = Stream.of(intArray);
System.out.println(wrong.count()); // 1 (không phải 5!)

// ✅ ĐÚNG — dùng Arrays.stream() hoặc IntStream.of()
IntStream correct1 = Arrays.stream(intArray); // 5 phần tử
IntStream correct2 = IntStream.of(1, 2, 3, 4, 5); // 5 phần tử

Lý do: Stream.of(T... values) coi int[]một object duy nhất (vì int[] không phải Integer[]). Chỉ String[] hoặc Integer[] mới hoạt động đúng với Stream.of().

OCP Exam Tips
  • Stream chỉ dùng một lần: gọi terminal operation lần 2 → IllegalStateException
  • peek() có thể không chạy nếu pipeline không có terminal operation — vì lazy evaluation
  • Stream.of(array) với int array cho Stream<int[]> (1 phần tử), KHÔNG phải Stream<Integer> — phải dùng Arrays.stream(intArray) hoặc IntStream.of()
  • Stream.generate()Stream.iterate() tạo infinite stream — quên limit() sẽ chạy mãi
  • findFirst() trên ordered parallel stream vẫn trả về phần tử đầu tiên — nhưng chậm hơn findAny()

Tóm tắt

  • Stream là chuỗi các phần tử + operations, không lưu trữ dữ liệu
  • Stream vs Collection: Lazy vs Eager, One-time vs Reusable, Internal vs External iteration
  • Tạo Stream: từ Collection, Array, Stream.of(), generate(), iterate(), primitive ranges
  • Pipeline: Source → Intermediate Operations → Terminal Operation
  • Lazy evaluation: Intermediate operations chỉ execute khi có terminal operation
  • Short-circuit: Dừng sớm mà không cần xử lý hết phần tử

Bài tập

Bài 1: Tạo Stream

Tạo streams theo các cách khác nhau:

// 1. Stream các số lẻ từ 1 đến 20
Stream<Integer> oddNumbers = // TODO

// 2. Stream vô hạn các số chia hết cho 3: 0, 3, 6, 9, ...
Stream<Integer> multiplesOf3 = // TODO

// 3. Stream gồm 10 UUID ngẫu nhiên
Stream<String> randomUUIDs = // TODO

Bài 2: Stream Pipeline

Cho danh sách số nguyên. Viết stream pipeline để:

  1. Lọc các số chẵn
  2. Bình phương chúng
  3. Lấy 5 số đầu tiên
  4. Tính tổng
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12);
int result = // TODO

Bài 3: Lazy Evaluation

Thêm peek() để debug và quan sát lazy evaluation:

List<String> words = Arrays.asList("Java", "is", "awesome", "and", "powerful");

// TODO: Thêm peek() sau mỗi operation để quan sát execution order
words.stream()
.filter(w -> w.length() > 2)
.map(String::toUpperCase)
.limit(2)
.collect(Collectors.toList());

Bài tiếp theo: Stream Operations chi tiết →

Đọc thêm