Stream API cơ bản
Sau bài này, bạn sẽ:
- Hiểu được Stream là gì và khác biệt với Collection
- Nắm được các cách tạo Stream (từ Collection, Array, range, generate, iterate)
- Hiểu được Stream Pipeline: Source, Intermediate, Terminal operations
- Nắm được Lazy Evaluation và lợi ích của nó
- Biết được Short-Circuit operations và cách hoạt động
Hãy tưởng tượng Stream như băng chuyền trong nhà máy: nguyên liệu (dữ liệu) đi qua từng trạm xử lý (filter, map, sort...), mỗi trạm làm một việc cụ thể. Nguyên liệu đi qua rồi thì không quay lại — giống như stream chỉ dùng được một lần. Và băng chuyền chỉ bắt đầu chạy khi có người đứng ở cuối nhận hàng (terminal operation).
Bài trước: Functional Interfaces — Đã học các functional interfaces như Predicate, Function, Consumer. Bài này sẽ giới thiệu Stream API để xử lý collections theo functional style.
Stream là gì?
Stream là một chuỗi (sequence) các phần tử hỗ trợ các operations tuần tự (sequential) và song song (parallel) để xử lý dữ liệu theo functional style.
Stream được giới thiệu từ Java 8 như một phần của functional programming support.
import java.util.*;
import java.util.stream.*;
public class StreamIntro {
public static void main(String[] args) {
List<String> names = Arrays.asList("An", "Bình", "Cường", "Dung");
// ❌ Traditional approach
List<String> result = new ArrayList<>();
for (String name : names) {
if (name.length() > 2) {
result.add(name.toUpperCase());
}
}
Collections.sort(result);
// ✅ Stream approach
List<String> streamResult = names.stream()
.filter(name -> name.length() > 2) // Lọc
.map(String::toUpperCase) // Transform
.sorted() // Sắp xếp
.collect(Collectors.toList()); // Thu thập kết quả
}
}
Stream KHÔNG lưu trữ dữ liệu. Nó chỉ là một view hoặc pipeline để xử lý dữ liệu từ một source (như Collection, array, I/O channel).
Stream vs Collection
So sánh chi tiết
| Khía cạnh | Collection | Stream |
|---|---|---|
| Lưu trữ dữ liệu | Lưu trữ tất cả phần tử | Không lưu trữ, chỉ process |
| Tính toán | Eager (tức thời) | Lazy (trì hoãn) |
| Iteration | External (for loop) | Internal (hidden) |
| Sử dụng lại | Iterate nhiều lần | One-time use (dùng 1 lần) |
| Modification | Có thể modify phần tử | Immutable (không modify source) |
| Parallelization | Phải tự implement | Dễ dàng (.parallel()) |
| Purpose | Store data | Process/transform data |
- Collection giống như kho hàng — hàng hóa được lưu trữ, bạn vào lấy bất cứ lúc nào, bao nhiêu lần cũng được
- Stream giống như ống nước — nước chảy qua một lần rồi hết, bạn không thể "quay lại" lấy nước đã chảy qua
Hoặc nghĩ theo DVD vs YouTube:
- Collection = DVD: bạn sở hữu toàn bộ phim, tua đi tua lại thoải mái
- Stream = Livestream: xem theo dòng chảy, không tua lại được
Ví dụ so sánh
import java.util.*;
import java.util.stream.*;
public class CollectionVsStream {
public static void main(String[] args) {
// === Collection ===
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);
// ✅ Có thể iterate nhiều lần
for (int n : numbers) {
System.out.println(n);
}
for (int n : numbers) { // OK - iterate lại
System.out.println(n);
}
// ✅ Có thể modify
numbers.add(6); // Nếu là mutable list
// === Stream ===
Stream<Integer> stream = numbers.stream();
// ✅ Iterate lần 1
stream.forEach(System.out::println);
// ❌ Iterate lần 2 → IllegalStateException
// stream.forEach(System.out::println); // Error: stream has already been operated upon
// ✅ Phải tạo stream mới
numbers.stream().forEach(System.out::println); // OK
// === Lazy Evaluation ===
System.out.println("=== Collection (Eager) ===");
List<Integer> collectionResult = new ArrayList<>();
for (int n : numbers) {
System.out.println("Processing: " + n);
if (n % 2 == 0) {
collectionResult.add(n * n);
}
}
System.out.println("\n=== Stream (Lazy) ===");
Stream<Integer> lazyStream = numbers.stream()
.peek(n -> System.out.println("Processing: " + n))
.filter(n -> n % 2 == 0)
.map(n -> n * n);
System.out.println("Stream created, but not executed yet!");
// Chưa in gì cả - vì chưa có terminal operation
System.out.println("\nNow executing terminal operation:");
lazyStream.forEach(System.out::println); // Bây giờ mới execute
}
}
Output:
=== Collection (Eager) ===
Processing: 1
Processing: 2
Processing: 3
Processing: 4
Processing: 5
=== Stream (Lazy) ===
Stream created, but not executed yet!
Now executing terminal operation:
Processing: 1
Processing: 2
4
Processing: 3
Processing: 4
16
Processing: 5
Tạo Stream
1. Từ Collection
import java.util.*;
import java.util.stream.*;
public class StreamFromCollection {
public static void main(String[] args) {
// List
List<String> list = Arrays.asList("A", "B", "C");
Stream<String> streamFromList = list.stream();
// Set
Set<Integer> set = new HashSet<>(Arrays.asList(1, 2, 3));
Stream<Integer> streamFromSet = set.stream();
// Map (stream entries)
Map<String, Integer> map = new HashMap<>();
map.put("A", 1);
map.put("B", 2);
Stream<Map.Entry<String, Integer>> entryStream = map.entrySet().stream();
Stream<String> keyStream = map.keySet().stream();
Stream<Integer> valueStream = map.values().stream();
}
}
2. Từ Array
import java.util.Arrays;
import java.util.stream.*;
public class StreamFromArray {
public static void main(String[] args) {
// String array
String[] strArray = {"Java", "Python", "C++"};
Stream<String> streamFromArray = Arrays.stream(strArray);
// Primitive array
int[] intArray = {1, 2, 3, 4, 5};
IntStream intStream = Arrays.stream(intArray);
// Range trong array
Stream<String> partialStream = Arrays.stream(strArray, 0, 2); // Java, Python
}
}
3. Dùng Stream.of()
import java.util.stream.*;
public class StreamOf {
public static void main(String[] args) {
// Varargs
Stream<String> stream1 = Stream.of("A", "B", "C");
// Single element
Stream<String> stream2 = Stream.of("Single");
// Empty stream
Stream<String> emptyStream = Stream.empty();
// Nullable stream (Java 9+)
String value = null;
Stream<String> nullableStream = Stream.ofNullable(value); // Empty stream
}
}
4. Dùng Stream.generate()
import java.util.stream.*;
public class StreamGenerate {
public static void main(String[] args) {
// Random numbers
Stream<Double> randomNumbers = Stream.generate(Math::random);
randomNumbers.limit(5).forEach(System.out::println);
// Constant value
Stream<String> constants = Stream.generate(() -> "Constant");
constants.limit(3).forEach(System.out::println); // Constant, Constant, Constant
// UUID generator
Stream<String> uuids = Stream.generate(() -> java.util.UUID.randomUUID().toString());
uuids.limit(3).forEach(System.out::println);
}
}
Stream.generate() tạo infinite stream (stream vô hạn). Phải dùng limit() để giới hạn, nếu không sẽ chạy mãi.
5. Dùng Stream.iterate()
import java.util.stream.*;
public class StreamIterate {
public static void main(String[] args) {
// Số chẵn: 0, 2, 4, 6, 8
Stream<Integer> evenNumbers = Stream.iterate(0, n -> n + 2);
evenNumbers.limit(5).forEach(System.out::println);
// Fibonacci: 0, 1, 1, 2, 3, 5, 8, ...
Stream.iterate(new int[]{0, 1}, f -> new int[]{f[1], f[0] + f[1]})
.limit(10)
.map(f -> f[0])
.forEach(System.out::println);
// Java 9+: iterate với predicate
Stream.iterate(1, n -> n <= 10, n -> n + 1) // Giống for (int n=1; n<=10; n++)
.forEach(System.out::println);
}
}
6. Primitive Streams
import java.util.stream.*;
public class PrimitiveStreams {
public static void main(String[] args) {
// IntStream
IntStream intStream1 = IntStream.range(1, 5); // 1, 2, 3, 4 (exclusive end)
IntStream intStream2 = IntStream.rangeClosed(1, 5); // 1, 2, 3, 4, 5 (inclusive)
// LongStream
LongStream longStream = LongStream.range(1L, 1_000_000L);
// DoubleStream
DoubleStream doubleStream = DoubleStream.of(1.1, 2.2, 3.3);
// IntStream methods
int sum = IntStream.rangeClosed(1, 100).sum();
double average = IntStream.rangeClosed(1, 100).average().orElse(0.0);
int max = IntStream.of(3, 1, 4, 1, 5, 9).max().orElse(0);
System.out.println("Sum 1-100: " + sum); // 5050
System.out.println("Average: " + average); // 50.5
System.out.println("Max: " + max); // 9
}
}
Bảng tổng hợp cách tạo Stream
| Cách tạo | Syntax | Use Case |
|---|---|---|
| Collection | collection.stream() | Từ List, Set, Map |
| Array | Arrays.stream(array) | Từ mảng |
| Stream.of() | Stream.of(phần tử) | Từ varargs/phần tử rời |
| Stream.generate() | Stream.generate(supplier) | Infinite stream với generator |
| Stream.iterate() | Stream.iterate(seed, unaryOp) | Infinite stream với iteration |
| IntStream.range() | IntStream.range(start, end) | Range số nguyên (exclusive) |
| IntStream.rangeClosed() | IntStream.rangeClosed(start, end) | Range số nguyên (inclusive) |
| Stream.empty() | Stream.empty() | Empty stream |
| Stream.ofNullable() | Stream.ofNullable(value) | Stream từ nullable value |
| Stream.concat() | Stream.concat(stream1, stream2) | Nối 2 streams thành 1 |
| Stream.builder() | Stream.builder().add(e).build() | Xây dựng stream bằng builder pattern |
// Stream.concat() — nối 2 streams
Stream<String> s1 = Stream.of("A", "B");
Stream<String> s2 = Stream.of("C", "D");
Stream<String> combined = Stream.concat(s1, s2); // A, B, C, D
// Stream.builder() — xây dựng stream linh hoạt
Stream<String> built = Stream.<String>builder()
.add("X")
.add("Y")
.add("Z")
.build(); // X, Y, Z
Stream Pipeline
Stream pipeline bao gồm 3 phần:
Source → Intermediate Operations → Terminal Operation
1. Source (Nguồn)
Nơi dữ liệu đến từ: Collection, array, generator, etc.
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5); // Source
2. Intermediate Operations (Phép toán trung gian)
Các operations transform stream thành stream khác. Chúng là lazy (không execute ngay).
Đặc điểm:
- Lazy evaluation: Chỉ execute khi có terminal operation
- Return Stream: Trả về Stream mới, cho phép chain
- Immutable: Không modify source
// Intermediate operations (chưa execute)
Stream<Integer> processed = numbers.stream()
.filter(n -> n > 2) // Intermediate
.map(n -> n * n); // Intermediate
// Vẫn chưa có gì xảy ra!
Các intermediate operations phổ biến:
filter(),map(),flatMap()distinct(),sorted(),limit(),skip()peek()(để debug)
3. Terminal Operation (Phép toán kết thúc)
Operation kích hoạt stream pipeline và tạo kết quả. Stream bị "consumed" sau terminal operation.
Đặc điểm:
- Eager evaluation: Execute ngay lập tức
- Close stream: Stream không dùng lại được sau đó
- Produce result: Trả về kết quả cuối cùng (value, collection, side-effect)
// Terminal operation → execute toàn bộ pipeline
List<Integer> result = numbers.stream()
.filter(n -> n > 2)
.map(n -> n * n)
.collect(Collectors.toList()); // Terminal operation
// Stream đã bị consumed, không dùng lại được
Các terminal operations phổ biến:
collect(),forEach(),reduce()count(),min(),max()anyMatch(),allMatch(),noneMatch()findFirst(),findAny()
Ví dụ đầy đủ Pipeline
import java.util.*;
import java.util.stream.*;
public class StreamPipelineExample {
public static void main(String[] args) {
List<String> words = Arrays.asList(
"Java", "Python", "JavaScript", "C++", "Go", "Rust"
);
List<String> result = words.stream() // Source
.filter(w -> w.length() > 3) // Intermediate: lọc
.map(String::toUpperCase) // Intermediate: transform
.sorted() // Intermediate: sắp xếp
.limit(3) // Intermediate: giới hạn
.collect(Collectors.toList()); // Terminal: thu thập
System.out.println(result); // [JAVA, JAVASCRIPT, PYTHON]
}
}
Lazy Evaluation (Đánh giá trì hoãn)
Lazy evaluation là một tính năng quan trọng của Stream: intermediate operations không execute cho đến khi có terminal operation.
Ví dụ minh họa
import java.util.*;
import java.util.stream.*;
public class LazyEvaluationDemo {
public static void main(String[] args) {
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8);
System.out.println("=== Creating Stream Pipeline ===");
Stream<Integer> stream = numbers.stream()
.filter(n -> {
System.out.println("filter: " + n);
return n % 2 == 0;
})
.map(n -> {
System.out.println("map: " + n);
return n * n;
});
System.out.println("Pipeline created, but nothing executed yet!");
System.out.println();
System.out.println("=== Executing Terminal Operation ===");
List<Integer> result = stream.collect(Collectors.toList());
System.out.println("\nResult: " + result);
}
}
Output:
=== Creating Stream Pipeline ===
Pipeline created, but nothing executed yet!
=== Executing Terminal Operation ===
filter: 1
filter: 2
map: 2
filter: 3
filter: 4
map: 4
filter: 5
filter: 6
map: 6
filter: 7
filter: 8
map: 8
Result: [4, 16, 36, 64]
- Performance: Không xử lý dữ liệu không cần thiết
- Short-circuiting: Có thể dừng sớm (ví dụ:
findFirst()) - Infinite streams: Có thể làm việc với infinite streams
- Optimization: Compiler có thể optimize pipeline
Vertical Processing (Xử lý dọc)
Stream xử lý từng phần tử qua toàn bộ pipeline (vertical), không phải xử lý hết tất cả phần tử qua 1 operation rồi mới sang operation tiếp theo (horizontal).
List<Integer> numbers = Arrays.asList(1, 2, 3, 4);
// Stream: Vertical processing
numbers.stream()
.filter(n -> { System.out.println("Filter: " + n); return n % 2 == 0; })
.map(n -> { System.out.println("Map: " + n); return n * n; })
.forEach(n -> System.out.println("Result: " + n));
// Output:
// Filter: 1
// Filter: 2
// Map: 2 ← Element 2 đi qua toàn bộ pipeline
// Result: 4
// Filter: 3
// Filter: 4
// Map: 4 ← Element 4 đi qua toàn bộ pipeline
// Result: 16
Minh họa vertical processing
Mỗi phần tử đi qua toàn bộ pipeline trước khi phần tử tiếp theo bắt đầu. Đây gọi là xử lý dọc (vertical processing), khác với xử lý ngang (horizontal) khi tất cả phần tử qua filter xong mới chuyển sang map.
Bên trong hoạt động ra sao: Spliterator
Stream không trực tiếp duyệt phần tử bằng Iterator truyền thống. Thay vào đó, nó sử dụng Spliterator — một iterator đặc biệt có khả năng chia tách (split) dữ liệu cho xử lý song song.
Spliterator vs Iterator
| Đặc điểm | Iterator | Spliterator |
|---|---|---|
| Duyệt | hasNext() + next() | tryAdvance() hoặc forEachRemaining() |
| Chia tách | Không hỗ trợ | trySplit() — chia thành 2 phần |
| Song song | Không | Được thiết kế cho parallel |
| Kích thước | Không biết | estimateSize() ước lượng số phần tử |
| Đặc tính | Không có | characteristics(): ORDERED, SORTED, SIZED, DISTINCT... |
// Mỗi Collection có Spliterator riêng
List<String> list = List.of("A", "B", "C", "D");
Spliterator<String> spliterator = list.spliterator();
System.out.println(spliterator.estimateSize()); // 4
System.out.println(spliterator.characteristics()); // ORDERED | SIZED | SUBSIZED
// trySplit chia thành 2 phần
Spliterator<String> firstHalf = spliterator.trySplit();
// firstHalf: A, B
// spliterator (remaining): C, D
Khi bạn gọi .parallelStream(), JVM dùng trySplit() để chia dữ liệu thành các phần nhỏ, giao cho các thread khác nhau xử lý song song. Đây là lý do ArrayList (hỗ trợ random access → dễ chia) có parallel performance tốt hơn LinkedList (phải duyệt tuần tự để chia).
Short-Circuit Operations
Short-circuit operations có thể dừng sớm mà không cần xử lý hết tất cả phần tử.
Intermediate Short-Circuit: limit(), skip()
import java.util.stream.*;
public class ShortCircuitIntermediate {
public static void main(String[] args) {
// limit: chỉ lấy n phần tử đầu
Stream.iterate(1, n -> n + 1) // Infinite stream: 1, 2, 3, ...
.limit(5) // Dừng sau 5 phần tử
.forEach(System.out::println); // 1, 2, 3, 4, 5
// skip: bỏ qua n phần tử đầu
IntStream.rangeClosed(1, 10)
.skip(5) // Bỏ qua 5 phần tử đầu
.forEach(System.out::println); // 6, 7, 8, 9, 10
}
}
Terminal Short-Circuit: findFirst(), findAny(), anyMatch(), allMatch(), noneMatch()
import java.util.*;
import java.util.stream.*;
public class ShortCircuitTerminal {
public static void main(String[] args) {
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8);
// findFirst: tìm element đầu tiên thỏa điều kiện
Optional<Integer> first = numbers.stream()
.filter(n -> {
System.out.println("Checking: " + n);
return n > 5;
})
.findFirst();
System.out.println("First > 5: " + first.orElse(-1));
// Output: Checking: 1, 2, 3, 4, 5, 6 → First > 5: 6 (dừng ngay khi tìm thấy)
System.out.println();
// anyMatch: có ít nhất 1 element thỏa điều kiện?
boolean hasEven = numbers.stream()
.peek(n -> System.out.println("Checking: " + n))
.anyMatch(n -> n % 2 == 0);
System.out.println("Has even? " + hasEven);
// Output: Checking: 1, 2 → Has even? true (dừng ngay khi tìm thấy)
}
}
Short-circuit operations giúp tăng performance đáng kể, đặc biệt với large datasets hoặc infinite streams.
Ví dụ tổng hợp
import java.util.*;
import java.util.stream.*;
class Product {
private String name;
private double price;
private String category;
public Product(String name, double price, String category) {
this.name = name;
this.price = price;
this.category = category;
}
public String getName() { return name; }
public double getPrice() { return price; }
public String getCategory() { return category; }
@Override
public String toString() {
return String.format("%s (%.0f VND, %s)", name, price, category);
}
}
public class StreamComprehensiveExample {
public static void main(String[] args) {
List<Product> products = Arrays.asList(
new Product("Laptop", 20000000, "Electronics"),
new Product("Phone", 15000000, "Electronics"),
new Product("Desk", 3000000, "Furniture"),
new Product("Chair", 1500000, "Furniture"),
new Product("Monitor", 5000000, "Electronics"),
new Product("Keyboard", 800000, "Electronics")
);
// Bài toán: Tìm 3 sản phẩm Electronics có giá cao nhất
System.out.println("=== Top 3 Most Expensive Electronics ===");
products.stream() // Source
.filter(p -> p.getCategory().equals("Electronics")) // Lọc category
.sorted((p1, p2) -> Double.compare(p2.getPrice(), p1.getPrice())) // Sắp xếp giảm dần
.limit(3) // Lấy 3 đầu
.forEach(System.out::println); // In ra
// Tính tổng giá trị của tất cả sản phẩm
double totalValue = products.stream()
.mapToDouble(Product::getPrice)
.sum();
System.out.println("\nTotal value: " + totalValue + " VND");
// Tìm sản phẩm rẻ nhất
Optional<Product> cheapest = products.stream()
.min(Comparator.comparingDouble(Product::getPrice));
cheapest.ifPresent(p -> System.out.println("Cheapest: " + p));
// Group by category
Map<String, List<Product>> byCategory = products.stream()
.collect(Collectors.groupingBy(Product::getCategory));
System.out.println("\nGrouped by category:");
byCategory.forEach((cat, prods) -> {
System.out.println(cat + ": " + prods.size() + " products");
});
}
}
Stream.of() với mảng nguyên thủyint[] intArray = {1, 2, 3, 4, 5};
// ❌ SAI — tạo Stream<int[]> chứa 1 phần tử (cả mảng)
Stream<int[]> wrong = Stream.of(intArray);
System.out.println(wrong.count()); // 1 (không phải 5!)
// ✅ ĐÚNG — dùng Arrays.stream() hoặc IntStream.of()
IntStream correct1 = Arrays.stream(intArray); // 5 phần tử
IntStream correct2 = IntStream.of(1, 2, 3, 4, 5); // 5 phần tử
Lý do: Stream.of(T... values) coi int[] là một object duy nhất (vì int[] không phải Integer[]). Chỉ String[] hoặc Integer[] mới hoạt động đúng với Stream.of().
- Stream chỉ dùng một lần: gọi terminal operation lần 2 →
IllegalStateException peek()có thể không chạy nếu pipeline không có terminal operation — vì lazy evaluationStream.of(array)với int array choStream<int[]>(1 phần tử), KHÔNG phảiStream<Integer>— phải dùngArrays.stream(intArray)hoặcIntStream.of()Stream.generate()vàStream.iterate()tạo infinite stream — quênlimit()sẽ chạy mãifindFirst()trên ordered parallel stream vẫn trả về phần tử đầu tiên — nhưng chậm hơnfindAny()
Tóm tắt
- Stream là chuỗi các phần tử + operations, không lưu trữ dữ liệu
- Stream vs Collection: Lazy vs Eager, One-time vs Reusable, Internal vs External iteration
- Tạo Stream: từ Collection, Array,
Stream.of(),generate(),iterate(), primitive ranges - Pipeline: Source → Intermediate Operations → Terminal Operation
- Lazy evaluation: Intermediate operations chỉ execute khi có terminal operation
- Short-circuit: Dừng sớm mà không cần xử lý hết phần tử
Bài tập
Bài 1: Tạo Stream
Tạo streams theo các cách khác nhau:
// 1. Stream các số lẻ từ 1 đến 20
Stream<Integer> oddNumbers = // TODO
// 2. Stream vô hạn các số chia hết cho 3: 0, 3, 6, 9, ...
Stream<Integer> multiplesOf3 = // TODO
// 3. Stream gồm 10 UUID ngẫu nhiên
Stream<String> randomUUIDs = // TODO
Bài 2: Stream Pipeline
Cho danh sách số nguyên. Viết stream pipeline để:
- Lọc các số chẵn
- Bình phương chúng
- Lấy 5 số đầu tiên
- Tính tổng
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12);
int result = // TODO
Bài 3: Lazy Evaluation
Thêm peek() để debug và quan sát lazy evaluation:
List<String> words = Arrays.asList("Java", "is", "awesome", "and", "powerful");
// TODO: Thêm peek() sau mỗi operation để quan sát execution order
words.stream()
.filter(w -> w.length() > 2)
.map(String::toUpperCase)
.limit(2)
.collect(Collectors.toList());
Bài tiếp theo: Stream Operations chi tiết →
Đọc thêm
- Oracle: The Stream API
- Java API: Spliterator
- Modern Java in Action — Raoul-Gabriel Urma et al. — Ch. 4-5 (Introducing Streams, Working with Streams)