Chuyển tới nội dung chính

Stream Operations chi tiết

Mục tiêu bài học

Sau bài này, bạn sẽ:

  • Sử dụng được các intermediate operations: filter, map, flatMap, sorted, distinct, limit, skip
  • Nắm được các terminal operations: collect, forEach, reduce, count, min, max, matching, finding
  • Hiểu được Collectors class và groupingBy, partitioningBy
  • Biết cách dùng Parallel Streams và khi nào nên dùng
  • Thành thạo xử lý data với Stream pipelines

Bài trước: Stream API cơ bản — Đã học cách tạo Stream và Stream pipeline. Bài này sẽ tìm hiểu chi tiết các operations để xử lý dữ liệu hiệu quả.

Intermediate Operations (Phép toán trung gian)

Intermediate operations transform stream thành stream khác và có tính lazy (không execute ngay).

1. filter() - Lọc phần tử

Giữ lại các phần tử thỏa điều kiện (Predicate trả về true).

import java.util.*;
import java.util.stream.*;

public class FilterExample {
public static void main(String[] args) {
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);

// Lọc số chẵn
List<Integer> evens = numbers.stream()
.filter(n -> n % 2 == 0)
.collect(Collectors.toList());
System.out.println("Even numbers: " + evens); // [2, 4, 6, 8, 10]

// Lọc số > 5
List<Integer> greaterThan5 = numbers.stream()
.filter(n -> n > 5)
.collect(Collectors.toList());
System.out.println("Numbers > 5: " + greaterThan5); // [6, 7, 8, 9, 10]

// Chain nhiều filters
List<Integer> complex = numbers.stream()
.filter(n -> n % 2 == 0) // Số chẵn
.filter(n -> n > 3) // > 3
.filter(n -> n < 9) // < 9
.collect(Collectors.toList());
System.out.println("Complex filter: " + complex); // [4, 6, 8]
}
}

2. map() - Chuyển đổi phần tử

Chuyển đổi mỗi phần tử thành phần tử khác (có thể khác kiểu).

import java.util.*;
import java.util.stream.*;

public class MapExample {
public static void main(String[] args) {
// String → Integer (length)
List<String> words = Arrays.asList("Java", "Python", "C++");
List<Integer> lengths = words.stream()
.map(String::length)
.collect(Collectors.toList());
System.out.println(lengths); // [4, 6, 3]

// Integer → String
List<Integer> numbers = Arrays.asList(1, 2, 3);
List<String> strings = numbers.stream()
.map(n -> "Number: " + n)
.collect(Collectors.toList());
System.out.println(strings); // [Number: 1, Number: 2, Number: 3]

// Object → Object (transformation)
List<Employee> employees = Arrays.asList(
new Employee("An", 5000),
new Employee("Bình", 7000)
);

List<Employee> withRaise = employees.stream()
.map(e -> new Employee(e.getName(), e.getSalary() * 1.1)) // Tăng lương 10%
.collect(Collectors.toList());
}
}

Variants:

  • mapToInt(), mapToLong(), mapToDouble(): Map sang primitive streams
List<String> words = Arrays.asList("Java", "Python", "C++");

// mapToInt: String → int
IntStream lengths = words.stream()
.mapToInt(String::length); // IntStream

int totalLength = lengths.sum(); // IntStream có method sum()
Stream.of(intArray) Trap!

Stream.of() với primitive array tạo Stream<int[]> KHÔNG PHẢI Stream<Integer>!

int[] numbers = {1, 2, 3, 4, 5};

// ❌ SAI: Tạo Stream<int[]> (1 element duy nhất là array)
Stream<int[]> wrong = Stream.of(numbers);
System.out.println(wrong.count()); // Output: 1 (không phải 5!)

// ✅ ĐÚNG: Sử dụng Arrays.stream()
IntStream correct1 = Arrays.stream(numbers);
System.out.println(correct1.count()); // Output: 5

// ✅ ĐÚNG: Sử dụng IntStream.of()
IntStream correct2 = IntStream.of(1, 2, 3, 4, 5);
System.out.println(correct2.count()); // Output: 5

// ✅ Hoặc boxed() để chuyển sang Stream<Integer>
Stream<Integer> boxed = Arrays.stream(numbers).boxed();

Lý do: Java generics không hỗ trợ primitive types — Stream.of(int[]) coi array là 1 object duy nhất, không phải stream of integers.

3. flatMap() - Làm phẳng cấu trúc lồng nhau

Làm phẳng các cấu trúc lồng nhau (Stream of Streams → Single Stream).

import java.util.*;
import java.util.stream.*;

public class FlatMapExample {
public static void main(String[] args) {
// List of Lists → Single List
List<List<Integer>> nestedList = Arrays.asList(
Arrays.asList(1, 2, 3),
Arrays.asList(4, 5),
Arrays.asList(6, 7, 8, 9)
);

// ❌ map: Stream<List<Integer>> → Stream<Stream<Integer>>
// ✅ flatMap: Stream<List<Integer>> → Stream<Integer>
List<Integer> flattened = nestedList.stream()
.flatMap(list -> list.stream()) // Flatten
.collect(Collectors.toList());
System.out.println(flattened); // [1, 2, 3, 4, 5, 6, 7, 8, 9]

// String → Characters
List<String> words = Arrays.asList("Hello", "World");
List<String> characters = words.stream()
.flatMap(word -> Arrays.stream(word.split("")))
.collect(Collectors.toList());
System.out.println(characters); // [H, e, l, l, o, W, o, r, l, d]

// Object với collection property
List<Department> departments = Arrays.asList(
new Department("IT", Arrays.asList("An", "Bình", "Cường")),
new Department("HR", Arrays.asList("Dung", "Em"))
);

List<String> allEmployees = departments.stream()
.flatMap(dept -> dept.getEmployees().stream())
.collect(Collectors.toList());
System.out.println(allEmployees); // [An, Bình, Cường, Dung, Em]
}
}

So sánh map vs flatMap:

OperationInputOutputUse Case
mapStream\<T\>Stream\<R\>Transform 1-1 (mỗi phần tử → 1 phần tử mới)
flatMapStream\<T\>Stream\<R\>Transform 1-many (mỗi phần tử → nhiều phần tử)

flatMap — Khi nào dùng?

Quy tắc đơn giản: Nếu map() cho bạn Stream<Stream<T>> hoặc Stream<List<T>>, hãy chuyển sang flatMap().

// Ví dụ thực tế: Lấy tất cả đơn hàng từ danh sách khách hàng
class Customer {
private List<Order> orders;
public List<Order> getOrders() { return orders; }
}

List<Customer> customers = getCustomers();

// ❌ map: Stream<List<Order>> — không phải điều ta muốn
Stream<List<Order>> orderLists = customers.stream()
.map(Customer::getOrders);

// ✅ flatMap: Stream<Order> — gộp tất cả orders lại
List<Order> allOrders = customers.stream()
.flatMap(c -> c.getOrders().stream())
.collect(Collectors.toList());

Optional.flatMap vs Stream.flatMap

Cả OptionalStream đều có flatMap, nhưng mục đích hơi khác:

// Stream.flatMap: làm phẳng Stream<Stream<T>> → Stream<T>
// Optional.flatMap: tránh Optional<Optional<T>> → Optional<T>

Optional<String> city = Optional.ofNullable(user)
.flatMap(User::getAddress) // getAddress() trả về Optional<Address>
.flatMap(Address::getCity); // getCity() trả về Optional<String>

4. sorted() - Sắp xếp

import java.util.*;
import java.util.stream.*;

public class SortedExample {
public static void main(String[] args) {
List<Integer> numbers = Arrays.asList(5, 2, 8, 1, 9, 3);

// Natural ordering (Comparable)
List<Integer> sorted = numbers.stream()
.sorted()
.collect(Collectors.toList());
System.out.println(sorted); // [1, 2, 3, 5, 8, 9]

// Descending order
List<Integer> descending = numbers.stream()
.sorted(Comparator.reverseOrder())
.collect(Collectors.toList());
System.out.println(descending); // [9, 8, 5, 3, 2, 1]

// Custom comparator
List<String> words = Arrays.asList("Java", "C++", "Python", "Go");

// Sort by length
List<String> byLength = words.stream()
.sorted(Comparator.comparingInt(String::length))
.collect(Collectors.toList());
System.out.println(byLength); // [Go, C++, Java, Python]

// Multiple sorting criteria
List<Employee> employees = Arrays.asList(
new Employee("An", 30, 5000),
new Employee("Bình", 25, 6000),
new Employee("Cường", 30, 4500)
);

List<Employee> sorted = employees.stream()
.sorted(Comparator
.comparingInt(Employee::getAge) // First: age
.thenComparingDouble(Employee::getSalary)) // Then: salary
.collect(Collectors.toList());
}
}

5. distinct() - Loại bỏ duplicate

List<Integer> numbers = Arrays.asList(1, 2, 2, 3, 3, 3, 4, 4, 5);
List<Integer> unique = numbers.stream()
.distinct()
.collect(Collectors.toList());
System.out.println(unique); // [1, 2, 3, 4, 5]

// Với objects: dựa vào equals() và hashCode()
List<String> words = Arrays.asList("Java", "java", "JAVA", "Python");
List<String> distinctWords = words.stream()
.map(String::toLowerCase)
.distinct()
.collect(Collectors.toList());
System.out.println(distinctWords); // [java, python]

6. limit()skip()

List<Integer> numbers = IntStream.rangeClosed(1, 10)
.boxed()
.collect(Collectors.toList());

// limit: lấy n elements đầu
List<Integer> first5 = numbers.stream()
.limit(5)
.collect(Collectors.toList());
System.out.println(first5); // [1, 2, 3, 4, 5]

// skip: bỏ qua n elements đầu
List<Integer> after5 = numbers.stream()
.skip(5)
.collect(Collectors.toList());
System.out.println(after5); // [6, 7, 8, 9, 10]

// Pagination: skip + limit
List<Integer> page2 = numbers.stream()
.skip(3) // Bỏ qua 3 đầu
.limit(3) // Lấy 3 tiếp theo
.collect(Collectors.toList());
System.out.println(page2); // [4, 5, 6]

7. peek() - Debug/Side effects

peek() thực hiện một action cho mỗi element mà không thay đổi stream.

List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);

List<Integer> result = numbers.stream()
.peek(n -> System.out.println("Original: " + n))
.map(n -> n * 2)
.peek(n -> System.out.println("After map: " + n))
.filter(n -> n > 5)
.peek(n -> System.out.println("After filter: " + n))
.collect(Collectors.toList());

// Output:
// Original: 1
// After map: 2
// Original: 2
// After map: 4
// Original: 3
// After map: 6
// After filter: 6
// ...
Lưu ý về peek()

peek() chỉ nên dùng cho debugging. Không nên dùng để thay đổi state hoặc side-effect (tác dụng phụ) production code (dùng forEach() hoặc map() cho mục đích đó).

Stateful vs Stateless Operations

Stream operations được chia thành stateless (không trạng thái) và stateful (có trạng thái).

Stateless Operations

Xử lý mỗi phần tử độc lập, không cần nhớ các phần tử trước đó:

  • filter(), map(), flatMap(), peek(), mapToInt(), mapToLong(), mapToDouble()
// Stateless: mỗi element được xử lý độc lập
List<Integer> numbers = List.of(1, 2, 3, 4, 5);
numbers.stream()
.filter(n -> n % 2 == 0) // Stateless: chỉ nhìn element hiện tại
.map(n -> n * 2) // Stateless: transform độc lập
.forEach(System.out::println);

Stateful Operations

Phải nhớ hoặc xử lý các phần tử trước đó — tạo ra bottleneck trong parallel streams:

  • sorted() — phải thấy TẤT CẢ elements trước khi output (buffer toàn bộ stream)
  • distinct() — phải nhớ mọi element đã thấy (memory tỷ lệ với stream size)
  • limit(), skip() — phải đếm số elements đã qua
import java.util.*;
import java.util.stream.*;

public class StatefulOperationsExample {
public static void main(String[] args) {
List<Integer> numbers = IntStream.rangeClosed(1, 1_000_000)
.boxed()
.collect(Collectors.toList());
Collections.shuffle(numbers);

// sorted(): phải buffer TẤT CẢ elements
long start1 = System.currentTimeMillis();
long count1 = numbers.stream()
.sorted() // Stateful: phải thấy hết stream mới sort được
.filter(n -> n < 100)
.count();
long time1 = System.currentTimeMillis() - start1;
System.out.println("With sorted(): " + time1 + "ms");

// Không sorted: nhanh hơn nhiều
long start2 = System.currentTimeMillis();
long count2 = numbers.stream()
.filter(n -> n < 100)
.count();
long time2 = System.currentTimeMillis() - start2;
System.out.println("Without sorted(): " + time2 + "ms");
}
}

Impact trên Parallel Streams

Stateful operations giảm hiệu quả parallel processing vì cần synchronization:

List<Integer> data = IntStream.rangeClosed(1, 10_000_000)
.boxed()
.collect(Collectors.toList());

// ❌ Stateful operation trong parallel stream
long start1 = System.currentTimeMillis();
long count1 = data.parallelStream()
.distinct() // Stateful: phải sync giữa các threads
.count();
long time1 = System.currentTimeMillis() - start1;

// ✅ Stateless operation trong parallel stream
long start2 = System.currentTimeMillis();
long count2 = data.parallelStream()
.filter(n -> n > 0) // Stateless: mỗi thread xử lý độc lập
.count();
long time2 = System.currentTimeMillis() - start2;

System.out.println("Stateful (distinct): " + time1 + "ms");
System.out.println("Stateless (filter): " + time2 + "ms");
// Stateless thường nhanh hơn NHIỀU lần!
Lưu ý Performance
  • sorted() cần buffer toàn bộ stream vào memory → có thể gây OutOfMemoryError với large datasets
  • distinct() cần HashSet internal → memory tỷ lệ với số lượng unique elements
  • Trong parallel streams, stateful operations cần synchronization → mất lợi thế parallel

Terminal Operations (Phép toán kết thúc)

Terminal operations kích hoạt stream pipeline và tạo kết quả. Stream bị consumed sau terminal operation.

1. collect() - Thu thập kết quả

Thu thập elements vào một collection hoặc data structure.

import java.util.*;
import java.util.stream.*;

public class CollectExample {
public static void main(String[] args) {
List<String> words = Arrays.asList("Java", "Python", "C++", "Go");

// Collect to List
List<String> list = words.stream()
.collect(Collectors.toList());

// Collect to Set
Set<String> set = words.stream()
.collect(Collectors.toSet());

// Collect to specific collection
LinkedList<String> linkedList = words.stream()
.collect(Collectors.toCollection(LinkedList::new));

// Collect to Map
Map<String, Integer> wordLengths = words.stream()
.collect(Collectors.toMap(
word -> word, // Key
word -> word.length() // Value
));

// Collect to Map with duplicate key handling
List<String> duplicates = Arrays.asList("Java", "Python", "Java");
Map<String, Integer> counts = duplicates.stream()
.collect(Collectors.toMap(
word -> word,
word -> 1,
(existing, replacement) -> existing + replacement // Merge function
));
}
}
toMap() Duplicate Key Trap

Collectors.toMap() ném IllegalStateException khi có duplicate keys:

class Person {
String name;
int age;
public Person(String name, int age) {
this.name = name;
this.age = age;
}
public String getName() { return name; }
public int getAge() { return age; }
}

List<Person> people = List.of(
new Person("An", 25),
new Person("Bình", 30),
new Person("An", 28) // Duplicate key "An"
);

// ❌ CRASH: IllegalStateException: Duplicate key An
Map<String, Integer> ages1 = people.stream()
.collect(Collectors.toMap(Person::getName, Person::getAge));

// ✅ FIX 1: Merge function — giữ giá trị đầu tiên
Map<String, Integer> ages2 = people.stream()
.collect(Collectors.toMap(
Person::getName,
Person::getAge,
(existing, replacement) -> existing // Giữ existing value
));
// {An=25, Bình=30}

// ✅ FIX 2: Merge function — giữ giá trị mới nhất
Map<String, Integer> ages3 = people.stream()
.collect(Collectors.toMap(
Person::getName,
Person::getAge,
(existing, replacement) -> replacement // Lấy replacement value
));
// {An=28, Bình=30}

// ✅ FIX 3: Merge function — cộng dồn
List<String> words = List.of("Java", "Python", "Java", "Go", "Java");
Map<String, Integer> counts = words.stream()
.collect(Collectors.toMap(
w -> w,
w -> 1,
(count1, count2) -> count1 + count2 // Sum counts
));
// {Java=3, Python=1, Go=1}

### 2. `forEach()` - Thực hiện action

```java
List<String> words = Arrays.asList("Java", "Python", "C++");

// forEach: Consumer<T>
words.stream()
.forEach(word -> System.out.println(word));

// Method reference
words.stream()
.forEach(System.out::println);

// forEachOrdered: đảm bảo order (với parallel stream)
words.parallelStream()
.forEachOrdered(System.out::println);
forEach vs forEachOrdered
  • forEach(): Không đảm bảo order trong parallel stream
  • forEachOrdered(): Đảm bảo order, ngay cả trong parallel stream

3. reduce() - Tích lũy giá trị

Kết hợp tất cả elements thành một giá trị duy nhất.

import java.util.*;
import java.util.stream.*;

public class ReduceExample {
public static void main(String[] args) {
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);

// 1. reduce(BinaryOperator): Optional<T>
Optional<Integer> sum1 = numbers.stream()
.reduce((a, b) -> a + b);
System.out.println(sum1.orElse(0)); // 15

// 2. reduce(identity, BinaryOperator): T
int sum2 = numbers.stream()
.reduce(0, (a, b) -> a + b); // identity = 0
System.out.println(sum2); // 15

// Tìm max
int max = numbers.stream()
.reduce(Integer.MIN_VALUE, Integer::max);
System.out.println(max); // 5

// Concatenate strings
List<String> words = Arrays.asList("Java", "is", "awesome");
String sentence = words.stream()
.reduce("", (s1, s2) -> s1 + " " + s2);
System.out.println(sentence.trim()); // "Java is awesome"

// 3. reduce(identity, accumulator, combiner): U
// Dùng cho parallel streams
int parallelSum = numbers.parallelStream()
.reduce(
0, // Identity
(a, b) -> a + b, // Accumulator
(a, b) -> a + b // Combiner (cho parallel)
);
}
}

3 forms của reduce:

FormSignatureReturnUse Case
1-argreduce(BinaryOperator)Optional\<T\>Có thể empty stream
2-argreduce(identity, BinaryOperator)TCó identity value
3-argreduce(identity, BiFunction, BinaryOperator)UParallel stream, khác type

Identity Value Rules

Identity value phải là true identity — nghĩa là identity op x = x cho mọi x:

// ✅ ĐÚNG: 0 là identity của phép cộng (0 + x = x)
int sum = numbers.stream().reduce(0, (a, b) -> a + b);

// ✅ ĐÚNG: 1 là identity của phép nhân (1 * x = x)
int product = numbers.stream().reduce(1, (a, b) -> a * b);

// ✅ ĐÚNG: "" là identity của concatenation ("" + x = x)
String concat = words.stream().reduce("", (a, b) -> a + b);

// ❌ SAI: 1 KHÔNG phải identity của phép cộng (1 + x ≠ x)
int wrong = numbers.stream().reduce(1, (a, b) -> a + b);
// Sequential: 16 (= 1 + 1 + 2 + 3 + 4 + 5) — SAI!

// ❌ NGUY HIỂM: Với parallel stream, identity sai cho kết quả HOÀN TOÀN SAI
int wrongParallel = numbers.parallelStream().reduce(1, (a, b) -> a + b);
// Parallel: có thể là 16, 17, 18... — KHÔNG XÁC ĐỊNH!

Tại sao parallel stream nguy hiểm với identity sai?

Parallel stream chia (splitting - chia nhỏ) stream thành nhiều chunks, mỗi chunk bắt đầu với identity value:

List<Integer> numbers = List.of(1, 2, 3, 4);

// Sequential: 1 + 1 + 2 + 3 + 4 = 11
int seq = numbers.stream().reduce(1, Integer::sum);

// Parallel: giả sử chia thành 2 chunks
// Chunk 1: 1 + 1 + 2 = 4
// Chunk 2: 1 + 3 + 4 = 8
// Merge: 4 + 8 = 12 (SAI!)
int par = numbers.parallelStream().reduce(1, Integer::sum);
Identity Trap trong OCP

Đề thi hay cho identity SAI để kiểm tra:

// Câu hỏi: Output của đoạn code này?
List<Integer> nums = List.of(2, 3, 4);
int result = nums.parallelStream().reduce(1, (a, b) -> a + b);
System.out.println(result);

// Đáp án: KHÔNG XÁC ĐỊNH (10, 11, hoặc 12 đều có thể xảy ra)
// Sequential sẽ cho: 1 + 2 + 3 + 4 = 10
// Parallel phụ thuộc cách chia chunks

Deep dive: reduce() 3 tham số

Dạng 3 tham số phức tạp nhất và thường xuất hiện trong đề OCP:

// Signature: <U> U reduce(U identity, BiFunction<U,T,U> accumulator, BinaryOperator<U> combiner)

List<String> words = List.of("Hello", "World", "Java");

// Tính tổng độ dài tất cả các chuỗi
int totalLength = words.stream()
.reduce(
0, // Identity: giá trị khởi tạo (int)
(sum, word) -> sum + word.length(), // Accumulator: int + String → int
(sum1, sum2) -> sum1 + sum2 // Combiner: int + int → int (cho parallel)
);
System.out.println(totalLength); // 14

Khi nào cần dạng 3 tham số? Khi kiểu kết quả (U) khác kiểu phần tử stream (T). Ở ví dụ trên: stream chứa String nhưng kết quả là int.

Combiner chỉ chạy với parallel stream. Với sequential stream, combiner không bao giờ được gọi — nhưng bạn vẫn phải cung cấp nó.

OCP Trap
// ⚠️ Combiner PHẢI tương thích với accumulator
// Đề thi hay cho combiner sai để hỏi output:
int wrong = List.of("a", "bb", "ccc").parallelStream()
.reduce(0,
(sum, s) -> sum + s.length(),
(a, b) -> a * b // ❌ Combiner sai (nhân thay vì cộng)
);
// Output với parallel: KHÔNG XÁC ĐỊNH (unpredictable)
// Output với sequential: 6 (combiner không được gọi)

4. count(), min(), max()

List<Integer> numbers = Arrays.asList(5, 2, 8, 1, 9);

// count: đếm số elements
long count = numbers.stream()
.filter(n -> n > 3)
.count();
System.out.println(count); // 3

// min: tìm giá trị nhỏ nhất
Optional<Integer> min = numbers.stream()
.min(Integer::compareTo);
System.out.println(min.orElse(-1)); // 1

// max: tìm giá trị lớn nhất
Optional<Integer> max = numbers.stream()
.max(Integer::compareTo);
System.out.println(max.orElse(-1)); // 9

// Với objects
List<Employee> employees = Arrays.asList(
new Employee("An", 5000),
new Employee("Bình", 7000)
);

Optional<Employee> highestPaid = employees.stream()
.max(Comparator.comparingDouble(Employee::getSalary));

5. Matching operations: anyMatch(), allMatch(), noneMatch()

List<Integer> numbers = Arrays.asList(2, 4, 6, 8, 10);

// anyMatch: có ít nhất 1 element thỏa điều kiện?
boolean hasEven = numbers.stream()
.anyMatch(n -> n % 2 == 0);
System.out.println(hasEven); // true

// allMatch: tất cả elements đều thỏa điều kiện?
boolean allEven = numbers.stream()
.allMatch(n -> n % 2 == 0);
System.out.println(allEven); // true

// noneMatch: không có element nào thỏa điều kiện?
boolean noOdd = numbers.stream()
.noneMatch(n -> n % 2 != 0);
System.out.println(noOdd); // true

// Short-circuit: dừng ngay khi tìm được counterexample
List<Integer> mixed = Arrays.asList(1, 2, 3, 4, 5);
boolean allEven2 = mixed.stream()
.peek(n -> System.out.println("Checking: " + n))
.allMatch(n -> n % 2 == 0);
// Output: Checking: 1 (dừng ngay vì 1 là odd)
System.out.println(allEven2); // false

6. Finding operations: findFirst(), findAny()

List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);

// findFirst: tìm element đầu tiên
Optional<Integer> first = numbers.stream()
.filter(n -> n > 3)
.findFirst();
System.out.println(first.orElse(-1)); // 4

// findAny: tìm bất kỳ element nào (hiệu quả với parallel stream)
Optional<Integer> any = numbers.parallelStream()
.filter(n -> n > 3)
.findAny();
System.out.println(any.orElse(-1)); // Có thể là 4 hoặc 5 (không deterministic)
findFirst vs findAny
  • Sequential stream: findFirst()findAny() giống nhau
  • Parallel stream: findAny() nhanh hơn (không cần maintain order)

Collectors Class

Collectors cung cấp các implementations phổ biến cho collect().

1. Basic Collectors

import java.util.*;
import java.util.stream.*;

public class BasicCollectorsExample {
public static void main(String[] args) {
List<String> words = Arrays.asList("Java", "Python", "C++", "Go", "Rust");

// toList
List<String> list = words.stream().collect(Collectors.toList());

// toSet
Set<String> set = words.stream().collect(Collectors.toSet());

// toCollection
TreeSet<String> treeSet = words.stream()
.collect(Collectors.toCollection(TreeSet::new));

// toMap
Map<String, Integer> map = words.stream()
.collect(Collectors.toMap(
w -> w, // Key
String::length // Value
));

// joining: concatenate strings
String joined = words.stream()
.collect(Collectors.joining());
System.out.println(joined); // "JavaPythonC++GoRust"

String joinedWithDelimiter = words.stream()
.collect(Collectors.joining(", "));
System.out.println(joinedWithDelimiter); // "Java, Python, C++, Go, Rust"

String joinedWithPrefixSuffix = words.stream()
.collect(Collectors.joining(", ", "[", "]"));
System.out.println(joinedWithPrefixSuffix); // "[Java, Python, C++, Go, Rust]"
}
}

2. groupingBy() - Nhóm elements

import java.util.*;
import java.util.stream.*;

class Employee {
private String name;
private String department;
private int age;
private double salary;

// Constructor, getters...
}

public class GroupingByExample {
public static void main(String[] args) {
List<Employee> employees = Arrays.asList(
new Employee("An", "IT", 25, 5000),
new Employee("Bình", "HR", 30, 6000),
new Employee("Cường", "IT", 28, 7000),
new Employee("Dung", "HR", 26, 5500)
);

// Group by department
Map<String, List<Employee>> byDepartment = employees.stream()
.collect(Collectors.groupingBy(Employee::getDepartment));
// {IT=[An, Cường], HR=[Bình, Dung]}

// Group by department, count employees
Map<String, Long> countByDepartment = employees.stream()
.collect(Collectors.groupingBy(
Employee::getDepartment,
Collectors.counting()
));
// {IT=2, HR=2}

// Group by department, get average salary
Map<String, Double> avgSalaryByDept = employees.stream()
.collect(Collectors.groupingBy(
Employee::getDepartment,
Collectors.averagingDouble(Employee::getSalary)
));
// {IT=6000.0, HR=5750.0}

// Group by department, get names
Map<String, List<String>> namesByDept = employees.stream()
.collect(Collectors.groupingBy(
Employee::getDepartment,
Collectors.mapping(Employee::getName, Collectors.toList())
));
// {IT=[An, Cường], HR=[Bình, Dung]}

// Multi-level grouping
Map<String, Map<Integer, List<Employee>>> byDeptAndAge = employees.stream()
.collect(Collectors.groupingBy(
Employee::getDepartment,
Collectors.groupingBy(Employee::getAge)
));
}
}

3. partitioningBy() - Chia thành 2 nhóm

List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);

// Partition by even/odd
Map<Boolean, List<Integer>> partitioned = numbers.stream()
.collect(Collectors.partitioningBy(n -> n % 2 == 0));

List<Integer> evens = partitioned.get(true); // [2, 4, 6, 8, 10]
List<Integer> odds = partitioned.get(false); // [1, 3, 5, 7, 9]

// Partition employees by high salary (> 6000)
Map<Boolean, List<Employee>> byHighSalary = employees.stream()
.collect(Collectors.partitioningBy(e -> e.getSalary() > 6000));

List<Employee> highEarners = byHighSalary.get(true);
List<Employee> lowEarners = byHighSalary.get(false);

4. Summarizing Collectors

List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);

// summingInt
int sum = numbers.stream()
.collect(Collectors.summingInt(Integer::intValue));

// averagingInt
double average = numbers.stream()
.collect(Collectors.averagingInt(Integer::intValue));

// summarizingInt: tổng hợp tất cả statistics
IntSummaryStatistics stats = numbers.stream()
.collect(Collectors.summarizingInt(Integer::intValue));

System.out.println("Count: " + stats.getCount()); // 5
System.out.println("Sum: " + stats.getSum()); // 15
System.out.println("Min: " + stats.getMin()); // 1
System.out.println("Max: " + stats.getMax()); // 5
System.out.println("Average: " + stats.getAverage()); // 3.0

5. Collectors nâng cao

toUnmodifiableList()toUnmodifiableSet() (Java 10+)

Tạo collection không thể thay đổi (immutable):

List<String> immutableList = words.stream()
.collect(Collectors.toUnmodifiableList());

// immutableList.add("new"); // ❌ UnsupportedOperationException

// Java 16+: Cách ngắn gọn hơn
List<String> simpleWay = words.stream().toList(); // Cũng immutable!
Lưu ý

stream().toList() (Java 16+) trả về unmodifiable list, trong khi stream().collect(Collectors.toList()) trả về mutable ArrayList. Đề OCP hay hỏi sự khác biệt này!

collectingAndThen() — Xử lý thêm sau khi collect

// Collect rồi wrap thành unmodifiable
List<String> result = words.stream()
.collect(Collectors.collectingAndThen(
Collectors.toList(),
Collections::unmodifiableList
));

// Collect rồi lấy size
int count = words.stream()
.collect(Collectors.collectingAndThen(
Collectors.toSet(),
Set::size
));

teeing() — Kết hợp 2 Collectors (Java 12+)

Cho phép chạy 2 collector đồng thời trên cùng stream rồi merge kết quả:

// Tính cả min và max cùng lúc
var result = numbers.stream()
.collect(Collectors.teeing(
Collectors.minBy(Integer::compareTo), // Collector 1
Collectors.maxBy(Integer::compareTo), // Collector 2
(min, max) -> "Min: " + min.orElse(0) + ", Max: " + max.orElse(0) // Merger
));
System.out.println(result); // "Min: 1, Max: 10"

// Ví dụ thực tế: Đếm pass/fail cùng lúc
record ExamResult(long passed, long failed) {}

ExamResult stats = students.stream()
.collect(Collectors.teeing(
Collectors.filtering(s -> s.getScore() >= 5.0, Collectors.counting()),
Collectors.filtering(s -> s.getScore() < 5.0, Collectors.counting()),
ExamResult::new
));

Custom Collector

Khi các collector có sẵn không đủ, bạn có thể tạo collector riêng:

// Custom collector: nối chuỗi với dấu phẩy, bọc trong ngoặc vuông
Collector<String, StringJoiner, String> customJoiner = Collector.of(
() -> new StringJoiner(", ", "[", "]"), // Supplier: tạo container
StringJoiner::add, // Accumulator: thêm phần tử
StringJoiner::merge, // Combiner: gộp (parallel)
StringJoiner::toString // Finisher: kết quả cuối
);

String result = Stream.of("Java", "Python", "Go")
.collect(customJoiner); // "[Java, Python, Go]"

Parallel Streams

Parallel streams xử lý dữ liệu song song trên nhiều threads.

Tạo Parallel Stream

List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);

// Cách 1: parallelStream()
Stream<Integer> parallelStream1 = numbers.parallelStream();

// Cách 2: parallel() trên existing stream
Stream<Integer> parallelStream2 = numbers.stream().parallel();

// Chuyển về sequential
Stream<Integer> sequential = parallelStream1.sequential();

Ví dụ Performance

import java.util.*;
import java.util.stream.*;

public class ParallelStreamPerformance {
public static void main(String[] args) {
List<Integer> numbers = IntStream.rangeClosed(1, 10_000_000)
.boxed()
.collect(Collectors.toList());

// Sequential stream
long start1 = System.currentTimeMillis();
long sum1 = numbers.stream()
.mapToLong(Integer::longValue)
.sum();
long end1 = System.currentTimeMillis();
System.out.println("Sequential: " + (end1 - start1) + "ms");

// Parallel stream
long start2 = System.currentTimeMillis();
long sum2 = numbers.parallelStream()
.mapToLong(Integer::longValue)
.sum();
long end2 = System.currentTimeMillis();
System.out.println("Parallel: " + (end2 - start2) + "ms");
}
}

N×Q Model — Khi nào Parallel đáng giá?

Parallel stream chỉ nhanh hơn khi N × Q đủ lớn:

  • N = số lượng elements
  • Q = chi phí xử lý mỗi element (cost per element)
// ❌ Small N, small Q → parallel CHẬM hơn
List<Integer> smallData = List.of(1, 2, 3, 4, 5);
int sum1 = smallData.parallelStream()
.mapToInt(n -> n * 2) // Q nhỏ (phép nhân đơn giản)
.sum();
// Overhead (thread scheduling, splitting, merging) > benefit

// ✅ Large N, large Q → parallel NHANH hơn
List<Integer> largeData = IntStream.rangeClosed(1, 10_000_000)
.boxed()
.collect(Collectors.toList());

long sum2 = largeData.parallelStream()
.filter(n -> isPrime(n)) // Q lớn (tính toán phức tạp)
.mapToLong(Integer::longValue)
.sum();

Benchmark: Parallel không phải lúc nào cũng nhanh hơn

import java.util.*;
import java.util.stream.*;

public class ParallelBenchmark {
public static void main(String[] args) {
// Small dataset + simple operation
List<Integer> smallList = IntStream.rangeClosed(1, 100)
.boxed()
.collect(Collectors.toList());

long start1 = System.nanoTime();
long sum1 = smallList.stream()
.mapToLong(Integer::longValue)
.sum();
long time1 = System.nanoTime() - start1;

long start2 = System.nanoTime();
long sum2 = smallList.parallelStream()
.mapToLong(Integer::longValue)
.sum();
long time2 = System.nanoTime() - start2;

System.out.println("Sequential: " + time1 / 1000 + " μs");
System.out.println("Parallel: " + time2 / 1000 + " μs");
// Kết quả thường thấy: Sequential ~50μs, Parallel ~500μs
// Parallel CHẬM HƠN 10 lần với small data!
}
}

Splitting Efficiency (Hiệu quả chia nhỏ)

Không phải tất cả data structures đều chia nhỏ (split) tốt cho parallel:

Data StructureSplitting PerformanceLý do
ArrayListExcellentRandom access O(1), chia đều dễ dàng
IntStream.rangeExcellentBiết trước size, chia toán học
HashSet, TreeSetGoodCó thể chia nhưng phức tạp hơn
LinkedListPoorSequential access O(n), không biết middle
Stream.iterate()PoorLazy, không biết size trước
// ✅ GOOD: ArrayList splits hiệu quả
List<Integer> arrayList = new ArrayList<>(IntStream.rangeClosed(1, 1_000_000)
.boxed()
.toList());
long sum1 = arrayList.parallelStream().mapToLong(Integer::longValue).sum();
// Chia đều thành các chunks, rất hiệu quả

// ❌ BAD: LinkedList splits kém
List<Integer> linkedList = new LinkedList<>(arrayList);
long sum2 = linkedList.parallelStream().mapToLong(Integer::longValue).sum();
// Phải traverse từ đầu để tìm middle → overhead cao

// ❌ BAD: Stream.iterate() không biết khi nào kết thúc
long sum3 = Stream.iterate(0, n -> n + 1)
.limit(1_000_000)
.parallel() // Không thể chia hiệu quả
.mapToLong(Integer::longValue)
.sum();

Encounter Order (thứ tự gặp phải) và Performance

Một số data structures có encounter order (LinkedHashSet, LinkedHashMap) — parallel phải maintain order này → chậm hơn:

// ✅ HashSet: không có order → parallel nhanh
Set<Integer> hashSet = new HashSet<>(IntStream.rangeClosed(1, 1_000_000)
.boxed()
.toList());
long count1 = hashSet.parallelStream().count(); // Nhanh

// ❌ LinkedHashSet: có order → parallel phải maintain order
Set<Integer> linkedHashSet = new LinkedHashSet<>(hashSet);
long count2 = linkedHashSet.parallelStream().count(); // Chậm hơn

// ✅ unordered(): bỏ order requirement → nhanh hơn
long count3 = linkedHashSet.parallelStream()
.unordered() // Bỏ order constraint
.distinct()
.count();

Khi nào dùng Parallel Streams?

✅ NÊN dùng khi:

  1. Large datasets (N > 10,000 elements)
  2. CPU-intensive operations (Q lớn — tính toán phức tạp)
  3. Independent operations (không có shared state)
  4. Stateless operations
  5. Splitting-friendly data structures (ArrayList, array, IntStream.range)
// ✅ Good use case
List<Integer> largeList = IntStream.rangeClosed(1, 1_000_000)
.boxed()
.collect(Collectors.toList());

long sum = largeList.parallelStream()
.filter(n -> isPrime(n)) // CPU-intensive
.mapToLong(Integer::longValue)
.sum();

❌ KHÔNG nên dùng khi:

  1. Small datasets (N < 10,000 elements)
  2. Simple operations (Q nhỏ — phép toán đơn giản)
  3. I/O operations (file, network, database)
  4. Stateful operations (shared mutable state)
  5. Order-dependent operations (cần maintain order)
  6. Poor splitting data structures (LinkedList, Stream.iterate)
// ❌ Bad use case
List<String> smallList = Arrays.asList("A", "B", "C");
smallList.parallelStream() // Overhead > benefit
.forEach(System.out::println);

// ❌ Shared mutable state
List<Integer> result = new ArrayList<>();
numbers.parallelStream()
.forEach(n -> result.add(n)); // ❌ Thread-unsafe!
Performance Considerations
  • Overhead: Parallel stream có overhead (chia task, merging - gộp lại results)
  • CPU cores: Hiệu quả phụ thuộc số CPU cores
  • Not always faster: Với small datasets (N nhỏ) hoặc simple operations (Q nhỏ), sequential thường nhanh hơn
  • Thread pool: Parallel stream dùng common ForkJoinPool (shared)
  • Data structure matters: ArrayList/array > LinkedList/Iterator streams

ForkJoinPool và parallel stream

Parallel stream mặc định sử dụng common ForkJoinPool — nghĩa là TẤT CẢ parallel stream trong ứng dụng chia sẻ cùng thread pool.

// Kiểm tra thread pool
List.of(1, 2, 3, 4, 5).parallelStream()
.forEach(n -> System.out.println(
Thread.currentThread().getName() + ": " + n
));
// Output: ForkJoinPool.commonPool-worker-1: 3
// main: 4
// ForkJoinPool.commonPool-worker-2: 1
// ...

Vấn đề thực tế: Nếu một parallel stream chạy tác vụ I/O chậm (database query, HTTP call), nó sẽ block các thread trong common pool — ảnh hưởng đến TẤT CẢ parallel stream khác trong ứng dụng.

// ✅ Giải pháp: Dùng ForkJoinPool riêng
ForkJoinPool customPool = new ForkJoinPool(4);
try {
List<Integer> result = customPool.submit(() ->
numbers.parallelStream()
.filter(n -> isPrime(n))
.collect(Collectors.toList())
).get();
} finally {
customPool.shutdown();
}

Vấn đề về thứ tự (ordering)

List<Integer> numbers = List.of(1, 2, 3, 4, 5);

// Sequential: luôn in 1, 2, 3, 4, 5
numbers.stream().forEach(System.out::println);

// Parallel: thứ tự KHÔNG đảm bảo
numbers.parallelStream().forEach(System.out::println);
// Có thể in: 3, 1, 5, 2, 4

// ✅ forEachOrdered: đảm bảo thứ tự nhưng mất lợi thế parallel
numbers.parallelStream().forEachOrdered(System.out::println);
// Luôn in: 1, 2, 3, 4, 5

Encounter Order Impact on Performance

Một số operations chạy nhanh hơn khi không cần maintain order:

List<Integer> data = IntStream.rangeClosed(1, 10_000_000)
.boxed()
.collect(Collectors.toList());

// ❌ Chậm: limit() phải maintain order
long start1 = System.currentTimeMillis();
List<Integer> result1 = data.parallelStream()
.limit(1000) // Phải giữ 1000 elements ĐẦU TIÊN
.collect(Collectors.toList());
long time1 = System.currentTimeMillis() - start1;

// ✅ Nhanh hơn: unordered() + limit()
long start2 = System.currentTimeMillis();
List<Integer> result2 = data.parallelStream()
.unordered() // Bỏ order requirement
.limit(1000) // Lấy BẤT KỲ 1000 elements nào
.collect(Collectors.toList());
long time2 = System.currentTimeMillis() - start2;

System.out.println("Ordered limit: " + time1 + "ms");
System.out.println("Unordered limit: " + time2 + "ms");
// Unordered thường nhanh hơn 2-3 lần!

Ví dụ tổng hợp: Data Processing Pipeline

import java.util.*;
import java.util.stream.*;

class Transaction {
private String id;
private double amount;
private String category;
private String date;

public Transaction(String id, double amount, String category, String date) {
this.id = id;
this.amount = amount;
this.category = category;
this.date = date;
}

// Getters...
public String getId() { return id; }
public double getAmount() { return amount; }
public String getCategory() { return category; }
public String getDate() { return date; }
}

public class DataProcessingPipeline {
public static void main(String[] args) {
List<Transaction> transactions = Arrays.asList(
new Transaction("T1", 1500, "Food", "2024-01"),
new Transaction("T2", 5000, "Electronics", "2024-01"),
new Transaction("T3", 800, "Food", "2024-01"),
new Transaction("T4", 3000, "Clothing", "2024-02"),
new Transaction("T5", 12000, "Electronics", "2024-02"),
new Transaction("T6", 600, "Food", "2024-02")
);

// 1. Tổng tiền theo category
Map<String, Double> totalByCategory = transactions.stream()
.collect(Collectors.groupingBy(
Transaction::getCategory,
Collectors.summingDouble(Transaction::getAmount)
));
System.out.println("Total by category: " + totalByCategory);
// {Food=2900.0, Electronics=17000.0, Clothing=3000.0}

// 2. Top 3 transactions có amount lớn nhất
List<Transaction> top3 = transactions.stream()
.sorted(Comparator.comparingDouble(Transaction::getAmount).reversed())
.limit(3)
.collect(Collectors.toList());
System.out.println("Top 3 transactions:");
top3.forEach(t -> System.out.println(" " + t.getId() + ": " + t.getAmount()));

// 3. Transactions theo tháng và category
Map<String, Map<String, List<Transaction>>> byMonthAndCategory = transactions.stream()
.collect(Collectors.groupingBy(
Transaction::getDate,
Collectors.groupingBy(Transaction::getCategory)
));

// 4. Statistics
DoubleSummaryStatistics stats = transactions.stream()
.collect(Collectors.summarizingDouble(Transaction::getAmount));
System.out.println("\nTransaction Statistics:");
System.out.println(" Count: " + stats.getCount());
System.out.println(" Sum: " + stats.getSum());
System.out.println(" Average: " + stats.getAverage());
System.out.println(" Min: " + stats.getMin());
System.out.println(" Max: " + stats.getMax());

// 5. Partition by high value (> 2000)
Map<Boolean, List<Transaction>> partitioned = transactions.stream()
.collect(Collectors.partitioningBy(t -> t.getAmount() > 2000));
System.out.println("\nHigh value transactions: " + partitioned.get(true).size());
System.out.println("Low value transactions: " + partitioned.get(false).size());
}
}
OCP Exam Tips
  • Stream reuse: Stream chỉ được consume một lần — gọi terminal operation lần 2 ném IllegalStateException

    Stream<String> stream = List.of("A", "B").stream();
    stream.forEach(System.out::println); // OK
    stream.count(); // ❌ IllegalStateException: stream has already been operated upon
  • findFirst() vs findAny(): Với parallel stream, findAny() hiệu quả hơn vì không cần maintain order

    // findFirst(): phải giữ order → chậm hơn trong parallel
    Optional<Integer> first = data.parallelStream().filter(n -> n > 100).findFirst();

    // findAny(): không cần order → nhanh hơn trong parallel
    Optional<Integer> any = data.parallelStream().filter(n -> n > 100).findAny();
  • reduce() vs collect():

    • reduce() cho immutable reduction (tạo giá trị mới mỗi bước)
    • collect() cho mutable reduction (modify container có sẵn) → hiệu quả hơn
    // reduce: tạo String mới mỗi bước → chậm, nhiều garbage
    String result1 = words.stream().reduce("", (a, b) -> a + b);

    // collect: modify StringBuilder → nhanh hơn
    String result2 = words.stream().collect(Collectors.joining());
  • peek() side-effects: peek() dùng cho debugging, side-effects không đảm bảo thực thi trong parallel hoặc khi có short-circuiting (ngắn mạch)

    // ❌ Không đảm bảo in 10 dòng
    Stream.of(1,2,3,4,5,6,7,8,9,10)
    .peek(System.out::println) // Có thể không chạy hết!
    .findFirst(); // Short-circuit sau element đầu
  • flatMap() vs map():

    • flatMap() trả về Stream<T> (many elements) → flatten
    • map() trả về single value T (one element) → transform
    // map: List<String> → Stream<Stream<String>> ❌
    Stream<Stream<String>> wrong = customers.stream()
    .map(c -> c.getOrders().stream());

    // flatMap: List<String> → Stream<String> ✅
    Stream<String> correct = customers.stream()
    .flatMap(c -> c.getOrders().stream());
  • Lazy evaluation: Intermediate operations không execute cho đến khi có terminal operation

    List<Integer> numbers = List.of(1, 2, 3);
    Stream<Integer> stream = numbers.stream()
    .filter(n -> {
    System.out.println("Filtering: " + n);
    return n > 1;
    });
    // Không in gì cả! Chưa có terminal operation.

    stream.count(); // BÂY GIỜ mới in: Filtering: 1, Filtering: 2, Filtering: 3
  • Stateful operations bottleneck: sorted(), distinct() là bottleneck trong parallel — phải xử lý toàn bộ stream

Tóm tắt

  • Intermediate operations: filter, map, flatMap, sorted, distinct, limit, skip, peek
    • Stateless: filter, map, flatMap, peek — xử lý mỗi element độc lập
    • Stateful: sorted, distinct, limit, skip — cần nhớ elements trước đó
  • Terminal operations: collect, forEach, reduce, count, min, max, matching, finding
  • Collectors: toList, toSet, toMap, groupingBy, partitioningBy, joining, summarizing
    • toMap() cần merge function để xử lý duplicate keys
  • reduce(): 3 forms - 1-arg (Optional), 2-arg (with identity), 3-arg (parallel)
    • Identity value phải thỏa: identity op x = x cho mọi x
  • flatMap(): Flatten nested structures (Stream<Stream<T>> → Stream<T>)
  • Parallel streams: .parallelStream() hoặc .parallel()
    • Dùng khi N × Q lớn (large datasets + CPU-intensive)
    • ArrayList/array split tốt, LinkedList/iterate() split kém
    • Encounter order impact performance

Thử thách: Output là gì?

Câu 1: reduce với identity

List<Integer> numbers = List.of();  // Empty list
int result = numbers.stream().reduce(42, Integer::sum);
System.out.println(result);
Đáp án

Output: 42

Khi stream rỗng, reduce() với identity trả về chính giá trị identity. Đây là lý do identity phải là phần tử trung hòa (identity element) của phép toán.

Câu 2: groupingBy kết hợp

List<String> words = List.of("Java", "Go", "Rust", "C++", "Kotlin");
Map<Integer, Long> result = words.stream()
.collect(Collectors.groupingBy(String::length, Collectors.counting()));
System.out.println(result);
Đáp án

Output: {2=1, 3=1, 4=2, 6=1} (thứ tự key có thể khác)

Giải thích: Nhóm theo độ dài chuỗi rồi đếm — "Go"=2, "C++"=3, "Rust"=4, "Java"=4, "Kotlin"=6.

Câu 3: Parallel stream ordering

List<Integer> nums = List.of(1, 2, 3, 4, 5);
List<Integer> result = nums.parallelStream()
.map(n -> n * 2)
.collect(Collectors.toList());
System.out.println(result);
Đáp án

Output: [2, 4, 6, 8, 10]

collect(Collectors.toList()) duy trì thứ tự (encounter order) ngay cả với parallel stream. Chỉ forEach() mới không đảm bảo thứ tự trong parallel — collect()forEachOrdered() luôn giữ thứ tự.

OCP Exam Tips
  • stream().toList() (Java 16+) trả về unmodifiable list, nhưng collect(Collectors.toList()) trả về mutable ArrayList
  • peek() có thể không chạy nếu pipeline không có terminal operation — hoặc nếu short-circuit terminal operation dừng sớm
  • reduce() 3 tham số: combiner chỉ dùng với parallel stream, nhưng vẫn bắt buộc phải cung cấp
  • findAny() với parallel stream có thể trả về bất kỳ phần tử nào, không nhất thiết là phần tử đầu
  • Stream chỉ dùng một lần — gọi terminal operation lần 2 sẽ ném IllegalStateException

Bài tập

Bài 1: Stream Operations

List<String> words = Arrays.asList("Java", "Python", "JavaScript", "C++", "Go", "Rust", "Kotlin");

// TODO:
// 1. Lọc words có length > 4
// 2. Chuyển thành uppercase
// 3. Sắp xếp alphabetically
// 4. Lấy 3 đầu tiên
// 5. Join với ", "
String result = // Complete this pipeline

Bài 2: Collectors - groupingBy

class Student {
String name;
int age;
double score;
}

List<Student> students = Arrays.asList(
new Student("An", 20, 8.5),
new Student("Bình", 21, 7.0),
new Student("Cường", 20, 9.0),
new Student("Dung", 21, 6.5)
);

// TODO: Group students by age, get average score per age group
Map<Integer, Double> avgScoreByAge = // Complete

Bài 3: Data Processing

Cho list transactions, viết stream pipeline để:

  1. Lọc transactions có amount > 1000
  2. Group by category
  3. Tính tổng amount per category
  4. Tìm category có tổng amount cao nhất

Bài tiếp theo: Optional Class →

Đọc thêm

Official Documentation

Books

  • Modern Java in Action (Chapter 4-7) — Stream operations, collectors, parallel streams
  • Effective Java, 3rd Edition (Items 45-48) — Stream best practices
    • Item 45: Use streams judiciously
    • Item 46: Prefer side-effect-free functions in streams
    • Item 47: Prefer Collection to Stream as a return type
    • Item 48: Use caution when making streams parallel