Executor Framework
Sau bài này, bạn sẽ:
- Hiểu được vấn đề của việc tạo threads thủ công và lợi ích của Thread Pool
- Sử dụng được ExecutorService và các loại thread pools (Fixed, Cached, Single, Scheduled)
- Nắm được cách làm việc với Future để lấy kết quả async
- Sử dụng CompletableFuture cho non-blocking operations
- Biết cách shutdown executor service đúng cách
Bài trước: Synchronization — Đã học cách đồng bộ hóa threads với synchronized và locks. Bài này sẽ giới thiệu cách quản lý threads hiệu quả hơn với Executor Framework.
Vấn đề với việc tạo Thread thủ công
Tạo thread trực tiếp có nhiều hạn chế
// Cách cũ: Tạo thread thủ công
public class ManualThreadProblem {
public static void main(String[] args) {
// Tạo 100 tasks
for (int i = 0; i < 100; i++) {
Thread t = new Thread(() -> {
// Process task
System.out.println("Processing in: " +
Thread.currentThread().getName());
});
t.start(); // Tạo 100 threads!
}
}
}
Vấn đề:
- Overhead: Tạo/hủy thread tốn tài nguyên (CPU, memory)
- Không giới hạn: Có thể tạo vô số threads → crash JVM
- Không tái sử dụng: Thread dùng xong bị hủy
- Khó quản lý: Không có cách tập trung theo dõi, hủy tasks
- Không có queue: Task đến phải tạo thread ngay
Giải pháp: Thread Pool - Tập hợp các threads sẵn sàng, tái sử dụng được.
Executor Interface
Interface cơ bản nhất trong framework:
public interface Executor {
void execute(Runnable command);
}
Tách biệt:
- Task submission (submit task)
- Task execution (thực thi task)
// Cách cũ
Thread t = new Thread(task);
t.start();
// Cách mới với Executor
Executor executor = ...;
executor.execute(task); // Executor quyết định cách thực thi
ExecutorService Interface
Mở rộng Executor, thêm nhiều features:
public interface ExecutorService extends Executor {
// Submit tasks
<T> Future<T> submit(Callable<T> task);
Future<?> submit(Runnable task);
// Shutdown
void shutdown(); // Graceful shutdown
List<Runnable> shutdownNow(); // Immediate shutdown
boolean isShutdown();
boolean isTerminated();
boolean awaitTermination(long timeout, TimeUnit unit);
// Batch submissions
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks);
<T> T invokeAny(Collection<? extends Callable<T>> tasks);
}
Ví dụ cơ bản
import java.util.concurrent.*;
public class ExecutorServiceDemo {
public static void main(String[] args) {
// Tạo thread pool với 3 threads
ExecutorService executor = Executors.newFixedThreadPool(3);
// Submit 10 tasks
for (int i = 1; i <= 10; i++) {
final int taskId = i;
executor.submit(() -> {
System.out.println("Task " + taskId + " running in: " +
Thread.currentThread().getName());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
// Shutdown executor
executor.shutdown(); // Không nhận task mới, chờ tasks hiện tại hoàn thành
try {
// Chờ tối đa 1 phút
if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
executor.shutdownNow(); // Force shutdown
}
} catch (InterruptedException e) {
executor.shutdownNow();
}
System.out.println("All tasks completed");
}
}
Output:
Task 1 running in: pool-1-thread-1
Task 2 running in: pool-1-thread-2
Task 3 running in: pool-1-thread-3
Task 4 running in: pool-1-thread-1 ← Thread được tái sử dụng
Task 5 running in: pool-1-thread-2
...
All tasks completed
Executors Factory Methods
Class Executors cung cấp các factory methods để tạo thread pools:
1. newFixedThreadPool(n)
Thread pool với số lượng threads cố định:
ExecutorService executor = Executors.newFixedThreadPool(4);
Đặc điểm:
- Tạo pool với n threads
- Threads tồn tại cho đến khi shutdown
- Nếu tất cả threads bận, tasks chờ trong queue
- Use case: Biết trước số lượng tasks, cần giới hạn threads
Thread Pool (size = 4)
┌─────────┬─────────┬─────────┬─────────┐
│ Thread1 │ Thread2 │ Thread3 │ Thread4 │
└─────────┴─────────┴─────────┴─────────┘
↑
Task Queue
┌────┬────┬────┐
│ T5 │ T6 │ T7 │
└────┴────┴────┘
2. newCachedThreadPool()
Thread pool mở rộng động:
ExecutorService executor = Executors.newCachedThreadPool();
Đặc điểm:
- Tạo thread mới khi cần
- Threads idle > 60s sẽ bị terminate
- Không giới hạn số threads (nguy hiểm nếu lạm dụng!)
- Use case: Nhiều short-lived tasks, số lượng không xác định
Cẩn thận với newCachedThreadPool(): Có thể tạo quá nhiều threads → OutOfMemoryError!
3. newSingleThreadExecutor()
Thread pool với chỉ 1 thread:
ExecutorService executor = Executors.newSingleThreadExecutor();
Đặc điểm:
- Chỉ 1 thread xử lý tasks tuần tự
- Tasks thực thi theo thứ tự submit (FIFO)
- Use case: Đảm bảo tasks chạy tuần tự, không đồng thời
public class SingleThreadDemo {
public static void main(String[] args) {
ExecutorService executor = Executors.newSingleThreadExecutor();
for (int i = 1; i <= 5; i++) {
final int taskId = i;
executor.submit(() -> {
System.out.println("Task " + taskId + " - " +
Thread.currentThread().getName());
});
}
executor.shutdown();
}
}
Output (tuần tự):
Task 1 - pool-1-thread-1
Task 2 - pool-1-thread-1
Task 3 - pool-1-thread-1
Task 4 - pool-1-thread-1
Task 5 - pool-1-thread-1
4. newScheduledThreadPool(n)
Thread pool để schedule tasks (thực thi sau delay hoặc định kỳ):
ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);
// Chạy sau 5 giây
executor.schedule(() -> {
System.out.println("Task executed after 5 seconds");
}, 5, TimeUnit.SECONDS);
// Chạy mỗi 2 giây, bắt đầu sau 1 giây
executor.scheduleAtFixedRate(() -> {
System.out.println("Periodic task: " + System.currentTimeMillis());
}, 1, 2, TimeUnit.SECONDS);
// Chạy với delay 3 giây giữa các lần (sau khi task trước hoàn thành)
executor.scheduleWithFixedDelay(() -> {
System.out.println("Task with fixed delay");
}, 0, 3, TimeUnit.SECONDS);
So sánh:
| Method | Ý nghĩa |
|---|---|
schedule(task, delay, unit) | Chạy 1 lần sau delay |
scheduleAtFixedRate(task, initialDelay, period, unit) | Chạy định kỳ, bỏ qua execution time |
scheduleWithFixedDelay(task, initialDelay, delay, unit) | Chạy định kỳ, delay sau khi task hoàn thành |
So sánh các Executors
| Type | Threads | Queue | Use Case |
|---|---|---|---|
| FixedThreadPool | Cố định (n) | Unbounded | Tasks ổn định, cần giới hạn threads |
| CachedThreadPool | Động (0 - ∞) | SynchronousQueue | Short-lived tasks, số lượng không xác định |
| SingleThreadExecutor | 1 | Unbounded | Tasks tuần tự |
| ScheduledThreadPool | Cố định (n) | DelayedWorkQueue | Scheduled/periodic tasks |
ThreadPoolExecutor — Cấu hình chi tiết
Các factory methods của Executors thuận tiện nhưng che giấu nhiều chi tiết quan trọng. Trong production, nên tạo ThreadPoolExecutor trực tiếp để kiểm soát đầy đủ.
Constructor đầy đủ
public ThreadPoolExecutor(
int corePoolSize, // Số threads tối thiểu luôn tồn tại
int maximumPoolSize, // Số threads tối đa
long keepAliveTime, // Thời gian thread idle trước khi bị terminate
TimeUnit unit, // Đơn vị của keepAliveTime
BlockingQueue<Runnable> workQueue, // Queue chứa tasks đang chờ
ThreadFactory threadFactory, // Factory tạo threads mới
RejectedExecutionHandler handler // Xử lý khi pool đầy
)
Cách hoạt động:
- Nếu số threads <
corePoolSize: Tạo thread mới cho task - Nếu số threads ≥
corePoolSize: Đưa task vào queue - Nếu queue đầy và threads <
maximumPoolSize: Tạo thêm thread mới - Nếu đạt
maximumPoolSizevà queue đầy: GọiRejectedExecutionHandler
Vấn đề với Executors factory methods
newCachedThreadPool() — Nguy hiểm không giới hạn
// Executors.newCachedThreadPool() tạo:
new ThreadPoolExecutor(
0, // corePoolSize = 0
Integer.MAX_VALUE, // maximumPoolSize = ~2 tỷ threads!
60L, TimeUnit.SECONDS,
new SynchronousQueue<>() // Không lưu trữ tasks
);
Vấn đề: SynchronousQueue không có capacity — mỗi task phải có thread sẵn sàng, nếu không sẽ tạo thread mới → có thể tạo vô số threads → OutOfMemoryError.
newCachedThreadPool() rất nguy hiểm với spike traffic hoặc slow tasks. Có thể crash JVM nếu tạo hàng nghìn threads cùng lúc!
newFixedThreadPool(n) — Risk của unbounded queue
// Executors.newFixedThreadPool(n) tạo:
new ThreadPoolExecutor(
n, n, // corePoolSize = maximumPoolSize = n
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>() // Unbounded queue — không giới hạn!
);
Vấn đề: LinkedBlockingQueue không giới hạn capacity → nếu tasks đến nhanh hơn xử lý, queue phình to vô hạn → OutOfMemoryError.
Cấu hình ThreadPoolExecutor an toàn
import java.util.concurrent.*;
public class SafeThreadPoolExample {
public static void main(String[] args) {
// Bounded queue với capacity giới hạn
BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(100);
// Custom ThreadFactory để đặt tên threads
ThreadFactory threadFactory = new ThreadFactory() {
private int counter = 0;
public Thread newThread(Runnable r) {
Thread t = new Thread(r, "Worker-" + counter++);
t.setDaemon(false); // Non-daemon
return t;
}
};
// Rejection handler — log khi pool đầy
RejectedExecutionHandler handler = new ThreadPoolExecutor.CallerRunsPolicy();
// Tạo pool an toàn
ThreadPoolExecutor executor = new ThreadPoolExecutor(
5, // corePoolSize: 5 threads luôn tồn tại
10, // maximumPoolSize: tối đa 10 threads
60L, // keepAliveTime: 60 giây
TimeUnit.SECONDS,
workQueue, // Bounded queue, max 100 tasks
threadFactory,
handler
);
// Submit tasks
for (int i = 0; i < 200; i++) {
final int taskId = i;
try {
executor.submit(() -> {
System.out.println("Task " + taskId + " in " +
Thread.currentThread().getName());
try { Thread.sleep(100); } catch (InterruptedException e) {}
});
} catch (RejectedExecutionException e) {
System.err.println("Task " + taskId + " rejected!");
}
}
// Theo dõi pool
System.out.println("Active threads: " + executor.getActiveCount());
System.out.println("Queue size: " + executor.getQueue().size());
System.out.println("Completed tasks: " + executor.getCompletedTaskCount());
executor.shutdown();
}
}
RejectedExecutionHandler — Xử lý khi Pool đầy
Khi pool đạt maximumPoolSize và queue đầy, RejectedExecutionHandler quyết định xử lý task bị từ chối (rejection/từ chối).
4 built-in policies
| Policy | Hành vi | Khi nào dùng |
|---|---|---|
| AbortPolicy (default) | Throw RejectedExecutionException | Muốn biết ngay khi pool quá tải, fail-fast |
| CallerRunsPolicy | Thread gọi execute() tự chạy task | Tạo backpressure (áp lực ngược), làm chậm producer |
| DiscardPolicy | Im lặng bỏ qua task | Task không quan trọng, chấp nhận mất data |
| DiscardOldestPolicy | Bỏ task cũ nhất trong queue, thêm task mới | Ưu tiên tasks mới hơn |
Ví dụ các policies
// 1. AbortPolicy (default): Throw exception
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
// Task bị reject → RejectedExecutionException
// 2. CallerRunsPolicy: Caller chạy task (tạo backpressure)
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
// Main thread sẽ chạy task nếu pool đầy → làm chậm submission
// 3. DiscardPolicy: Im lặng drop task
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
// Task bị drop, không log, không exception
// 4. DiscardOldestPolicy: Drop task cũ nhất
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy());
// Bỏ task đầu queue, thêm task mới
Custom RejectedExecutionHandler với logging + metrics
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;
public class LoggingRejectionHandler implements RejectedExecutionHandler {
private final AtomicLong rejectedCount = new AtomicLong(0);
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
long count = rejectedCount.incrementAndGet();
// Log rejection
System.err.println(String.format(
"[REJECTED] Task: %s | Pool: [active=%d, pool=%d, queue=%d] | Total rejected: %d",
r.toString(),
executor.getActiveCount(),
executor.getPoolSize(),
executor.getQueue().size(),
count
));
// Gửi metrics đến monitoring system
// MetricsRegistry.counter("threadpool.rejected").increment();
// Fallback: Chạy task trong caller thread (như CallerRunsPolicy)
if (!executor.isShutdown()) {
r.run();
}
}
public long getRejectedCount() {
return rejectedCount.get();
}
}
// Sử dụng
LoggingRejectionHandler handler = new LoggingRejectionHandler();
ThreadPoolExecutor executor = new ThreadPoolExecutor(
2, 4, 60L, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(10),
handler
);
Trong production, luôn tạo ThreadPoolExecutor trực tiếp thay vì dùng Executors factory methods:
- Đặt
maximumPoolSizehợp lý (không phảiInteger.MAX_VALUE) - Dùng bounded queue (không phải
LinkedBlockingQueuemặc định) - Chọn
RejectedExecutionHandlerphù hợp với business logic - Đặt tên threads rõ ràng với custom
ThreadFactory - Monitor metrics: active threads, queue size, rejected count
Future<V> Interface
Future đại diện cho kết quả của async computation:
public interface Future<V> {
V get() throws InterruptedException, ExecutionException;
V get(long timeout, TimeUnit unit) throws TimeoutException;
boolean isDone();
boolean isCancelled();
boolean cancel(boolean mayInterruptIfRunning);
}
Ví dụ với Future
import java.util.concurrent.*;
public class FutureDemo {
public static void main(String[] args) throws Exception {
ExecutorService executor = Executors.newSingleThreadExecutor();
// Submit Callable task
Future<Integer> future = executor.submit(() -> {
System.out.println("Task started...");
Thread.sleep(3000); // Giả lập công việc lâu
return 42;
});
System.out.println("Task submitted. Doing other work...");
// Kiểm tra xem task đã xong chưa
while (!future.isDone()) {
System.out.println("Task still running...");
Thread.sleep(500);
}
// Lấy kết quả (blocking)
Integer result = future.get();
System.out.println("Result: " + result);
executor.shutdown();
}
}
Output:
Task submitted. Doing other work...
Task still running...
Task started...
Task still running...
Task still running...
Task still running...
Task still running...
Result: 42
Future.get() với timeout
try {
Integer result = future.get(2, TimeUnit.SECONDS); // Chờ tối đa 2 giây
System.out.println("Result: " + result);
} catch (TimeoutException e) {
System.out.println("Task timeout! Cancelling...");
future.cancel(true); // Interrupt task
}
Cancel task
Future<Integer> future = executor.submit(() -> {
for (int i = 0; i < 10; i++) {
if (Thread.interrupted()) {
System.out.println("Task cancelled");
return -1;
}
System.out.println("Working... " + i);
Thread.sleep(1000);
}
return 100;
});
Thread.sleep(3000);
future.cancel(true); // Cancel và interrupt thread
CompletableFuture (Java 8+)
CompletableFuture là Future nâng cao, hỗ trợ:
- Non-blocking operations
- Functional programming (chaining)
- Exception handling tốt hơn
- Kết hợp nhiều futures
Tạo CompletableFuture
import java.util.concurrent.CompletableFuture;
// 1. Completed future
CompletableFuture<String> future1 = CompletableFuture.completedFuture("Hello");
// 2. Async computation
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
// Chạy trong ForkJoinPool.commonPool()
try { Thread.sleep(1000); } catch (InterruptedException e) { }
return "Result from async task";
});
// 3. Async với custom executor
ExecutorService executor = Executors.newFixedThreadPool(2);
CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> {
return "Result from custom executor";
}, executor);
Chaining với thenApply, thenAccept, thenRun
CompletableFuture.supplyAsync(() -> {
System.out.println("Step 1: Fetch data");
return "data";
})
.thenApply(data -> {
System.out.println("Step 2: Process " + data);
return data.toUpperCase();
})
.thenAccept(result -> {
System.out.println("Step 3: Result = " + result);
})
.thenRun(() -> {
System.out.println("Step 4: Cleanup");
});
Output:
Step 1: Fetch data
Step 2: Process data
Step 3: Result = DATA
Step 4: Cleanup
| Method | Input | Output | Use Case |
|---|---|---|---|
thenApply(fn) | Kết quả trước | Future mới | Transform kết quả |
thenAccept(consumer) | Kết quả trước | Void | Xử lý kết quả, không return |
thenRun(runnable) | Không | Void | Chạy action, không quan tâm kết quả |
thenCompose() vs thenApply() — Khi nào dùng gì?
Hai methods này thường gây nhầm lẫn. Hiểu sự khác biệt rất quan trọng khi chain async operations.
Quy tắc đơn giản:
thenApply(): Transform kết quả (giốngmaptrong Stream)thenCompose(): Chain async operation phụ thuộc (giốngflatMaptrong Stream)
thenApply: Synchronous transformation
Dùng khi bạn transform kết quả thành giá trị khác ngay lập tức (không phải async):
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 5)
.thenApply(x -> x * 2) // Transform: 5 → 10
.thenApply(x -> x + 3); // Transform: 10 → 13
System.out.println(future.get()); // 13
Signature:
<U> CompletableFuture<U> thenApply(Function<T, U> fn)
// T → U: Transform trực tiếp
thenCompose: Async chaining
Dùng khi bạn chain một async operation khác (trả về CompletableFuture):
// Service methods trả về CompletableFuture
CompletableFuture<User> getUserById(int id) {
return CompletableFuture.supplyAsync(() -> {
// Fetch user from DB
return new User(id, "Alice");
});
}
CompletableFuture<List<Order>> getOrdersByUser(User user) {
return CompletableFuture.supplyAsync(() -> {
// Fetch orders from DB
return List.of(new Order(1), new Order(2));
});
}
// Sai: thenApply tạo nested Future
CompletableFuture<CompletableFuture<List<Order>>> wrong =
getUserById(1)
.thenApply(user -> getOrdersByUser(user)); // ← Trả về Future[Future[List]]
// Đúng: thenCompose flatten Future
CompletableFuture<List<Order>> correct =
getUserById(1)
.thenCompose(user -> getOrdersByUser(user)); // ← Trả về Future[List]
List<Order> orders = correct.get();
System.out.println("Orders: " + orders.size());
Signature:
<U> CompletableFuture<U> thenCompose(Function<T, CompletableFuture<U>> fn)
// T → CompletableFuture<U>: Flatten nested Future
So sánh trực quan
// Ví dụ: Fetch user → Fetch profile → Process
// 1. thenApply: Sync transformation
CompletableFuture.supplyAsync(() -> "userId-123")
.thenApply(id -> id.toUpperCase()) // String → String
.thenApply(id -> "User: " + id) // String → String
.thenAccept(System.out::println); // User: USERID-123
// 2. thenCompose: Async chaining
CompletableFuture.supplyAsync(() -> "userId-123")
.thenCompose(id -> fetchUser(id)) // String → Future<User>
.thenCompose(user -> fetchProfile(user)) // User → Future<Profile>
.thenAccept(profile -> System.out.println("Profile: " + profile));
Bảng so sánh
thenApply(fn) | thenCompose(fn) | |
|---|---|---|
| Input function | T → U | T → CompletableFuture<U> |
| Output | CompletableFuture<U> | CompletableFuture<U> |
| Use case | Sync transformation | Async chaining (dependent operations) |
| Stream analogy | map() | flatMap() |
| Nested Future? | Tạo Future<Future<U>> nếu fn trả về Future | Flatten thành Future<U> |
- thenApply: "Transform giá trị này"
- thenCompose: "Chạy async operation tiếp theo phụ thuộc vào giá trị này"
Kết hợp nhiều Futures
thenCombine: Kết hợp 2 futures
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> 10);
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> 20);
CompletableFuture<Integer> combined = future1.thenCombine(future2, (a, b) -> {
System.out.println("Combining " + a + " and " + b);
return a + b;
});
System.out.println("Result: " + combined.get()); // 30
allOf: Chờ tất cả futures
CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> {
sleep(1000);
return "Task 1";
});
CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> {
sleep(2000);
return "Task 2";
});
CompletableFuture<String> f3 = CompletableFuture.supplyAsync(() -> {
sleep(1500);
return "Task 3";
});
// Chờ tất cả hoàn thành
CompletableFuture<Void> allOf = CompletableFuture.allOf(f1, f2, f3);
allOf.join(); // Blocking wait
System.out.println("All completed:");
System.out.println(f1.get());
System.out.println(f2.get());
System.out.println(f3.get());
anyOf: Lấy kết quả đầu tiên
CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> {
sleep(2000);
return "Slow task";
});
CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> {
sleep(1000);
return "Fast task";
});
CompletableFuture<Object> anyOf = CompletableFuture.anyOf(f1, f2);
System.out.println("First result: " + anyOf.get()); // "Fast task"
Exception Handling
CompletableFuture.supplyAsync(() -> {
if (Math.random() > 0.5) {
throw new RuntimeException("Random error!");
}
return "Success";
})
.exceptionally(ex -> {
// Xử lý exception
System.out.println("Error: " + ex.getMessage());
return "Default value";
})
.thenAccept(result -> {
System.out.println("Result: " + result);
});
handle(): Xử lý cả success và failure
CompletableFuture.supplyAsync(() -> {
if (Math.random() > 0.5) {
throw new RuntimeException("Error!");
}
return 10;
})
.handle((result, ex) -> {
if (ex != null) {
System.out.println("Error: " + ex.getMessage());
return 0; // Default value
} else {
return result * 2;
}
})
.thenAccept(result -> System.out.println("Final: " + result));
Ví dụ thực tế: Parallel API Calls
import java.util.concurrent.*;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
public class ParallelAPICallsDemo {
// Giả lập API calls
static CompletableFuture<String> fetchUser(int id) {
return CompletableFuture.supplyAsync(() -> {
sleep(1000);
return "User-" + id;
});
}
static CompletableFuture<List<String>> fetchPosts(String user) {
return CompletableFuture.supplyAsync(() -> {
sleep(1500);
return Arrays.asList(user + "-Post1", user + "-Post2");
});
}
static CompletableFuture<String> fetchComments(String post) {
return CompletableFuture.supplyAsync(() -> {
sleep(800);
return post + "-Comments";
});
}
public static void main(String[] args) {
long start = System.currentTimeMillis();
// Fetch users song song
List<CompletableFuture<String>> userFutures = Arrays.asList(
fetchUser(1),
fetchUser(2),
fetchUser(3)
);
// Chờ tất cả users
CompletableFuture<Void> allUsers = CompletableFuture.allOf(
userFutures.toArray(new CompletableFuture[0])
);
CompletableFuture<List<String>> allData = allUsers.thenApply(v -> {
// Lấy kết quả tất cả users
return userFutures.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList());
});
List<String> users = allData.join();
System.out.println("Users: " + users);
long end = System.currentTimeMillis();
System.out.println("Time: " + (end - start) + "ms");
// Chỉ ~1000ms thay vì 3000ms (nếu tuần tự)
}
static void sleep(int ms) {
try { Thread.sleep(ms); } catch (InterruptedException e) { }
}
}
ForkJoinPool và Work-Stealing
ForkJoinPool là một ExecutorService đặc biệt được thiết kế cho recursive, divide-and-conquer (chia để trị) tasks. Java 7+ sử dụng nó làm engine cho parallel streams và CompletableFuture.supplyAsync().
Khái niệm Work-Stealing
Work-stealing (đánh cắp công việc) là thuật toán scheduling đặc biệt:
- Mỗi worker thread có một deque riêng (double-ended queue)
- Thread xử lý tasks từ đầu deque của mình (LIFO)
- Khi một thread rảnh, nó "đánh cắp" tasks từ cuối deque của thread khác (FIFO)
- Giảm contention (tranh chấp) — mỗi thread chủ yếu làm việc với queue riêng
Work-Stealing Visualization:
Thread 1 Deque Thread 2 Deque Thread 3 Deque (idle)
┌──────────────┐ ┌──────────────┐ ┌──────────────┐
│ Task A ←── │ own │ Task D ←── │ own │ (empty) │
│ Task B │work │ Task E │work │ │
│ Task C │ │ Task F │ │ │
└──────┬───────┘ └──────┬───────┘ └──────────────┘
↑ ↑ ↓
(LIFO) (LIFO) Steal from
Process Process others (FIFO)
own tasks own tasks ↓
┌──────────────┐
│ Steal Task C │
│ from T1 │
└──────────────┘
Lợi ích:
- Load balancing tự động — threads bận không bị block vì threads rảnh giúp
- Cache locality tốt — thread xử lý tasks liên quan (từ cùng một split)
- Ít contention — mỗi thread chủ yếu làm việc với queue riêng
RecursiveTask vs RecursiveAction
ForkJoinPool chạy các tasks extend từ ForkJoinTask, thường dùng 2 subclass:
| Class | Trả về giá trị? | Use case |
|---|---|---|
RecursiveTask<V> | Có (compute() trả về V) | Tính toán có kết quả (sum, max, search) |
RecursiveAction | Không (compute() void) | Side effects (logging, update array) |
Ví dụ: Parallel Array Sum với RecursiveTask
import java.util.concurrent.*;
public class ParallelSumExample {
// RecursiveTask: Tính tổng array song song
static class SumTask extends RecursiveTask<Long> {
private final int[] array;
private final int start;
private final int end;
private static final int THRESHOLD = 1000; // Ngưỡng để chia nhỏ
public SumTask(int[] array, int start, int end) {
this.array = array;
this.start = start;
this.end = end;
}
@Override
protected Long compute() {
int length = end - start;
// Base case: Array nhỏ → tính trực tiếp
if (length <= THRESHOLD) {
long sum = 0;
for (int i = start; i < end; i++) {
sum += array[i];
}
return sum;
}
// Recursive case: Chia array thành 2 nửa
int mid = start + length / 2;
// Fork: Tạo subtask cho nửa trái
SumTask leftTask = new SumTask(array, start, mid);
leftTask.fork(); // Async execution
// Compute: Xử lý nửa phải trong thread hiện tại
SumTask rightTask = new SumTask(array, mid, end);
long rightResult = rightTask.compute(); // Sync
// Join: Chờ kết quả nửa trái
long leftResult = leftTask.join(); // Blocking wait
// Combine results
return leftResult + rightResult;
}
}
public static void main(String[] args) {
int size = 10_000_000;
int[] array = new int[size];
for (int i = 0; i < size; i++) {
array[i] = i + 1;
}
// 1. Sequential sum
long start = System.currentTimeMillis();
long sum1 = 0;
for (int value : array) {
sum1 += value;
}
long time1 = System.currentTimeMillis() - start;
System.out.println("Sequential: " + sum1 + " in " + time1 + "ms");
// 2. Parallel sum với ForkJoinPool
start = System.currentTimeMillis();
ForkJoinPool pool = new ForkJoinPool(); // Default: Runtime.getRuntime().availableProcessors() threads
SumTask task = new SumTask(array, 0, array.length);
long sum2 = pool.invoke(task); // Submit và chờ kết quả
long time2 = System.currentTimeMillis() - start;
System.out.println("Parallel: " + sum2 + " in " + time2 + "ms");
// Speedup
System.out.printf("Speedup: %.2fx\n", (double) time1 / time2);
pool.shutdown();
}
}
Output (8 cores):
Sequential: 50000005000000 in 15ms
Parallel: 50000005000000 in 5ms
Speedup: 3.00x
Pattern:
- Base case: Nếu problem nhỏ → solve trực tiếp
- Divide: Chia problem thành subproblems
- Fork: Submit subtasks async (
fork()) - Compute: Xử lý một phần sync (
compute()) - Join: Chờ kết quả (
join()) - Combine: Merge kết quả
ForkJoinPool.commonPool()
Java 8+ cung cấp shared common pool cho toàn bộ JVM:
ForkJoinPool commonPool = ForkJoinPool.commonPool();
// CompletableFuture.supplyAsync() dùng common pool mặc định
CompletableFuture.supplyAsync(() -> "Task"); // ← Chạy trong common pool
// Parallel streams dùng common pool
List.of(1, 2, 3, 4, 5)
.parallelStream() // ← Chạy trong common pool
.map(x -> x * 2)
.forEach(System.out::println);
Số threads trong common pool:
int parallelism = ForkJoinPool.commonPool().getParallelism();
// Mặc định: Runtime.getRuntime().availableProcessors() - 1
System.out.println("Common pool parallelism: " + parallelism);
Tùy chỉnh:
# JVM flag để set common pool size
java -Djava.util.concurrent.ForkJoinPool.common.parallelism=4 MyApp
Khi nào dùng ForkJoinPool?
Phù hợp
- Recursive algorithms: Quick sort, merge sort, tree traversal
- Data parallelism: Process large array/collection song song
- Compute-intensive tasks: CPU-bound, không I/O
- Divide-and-conquer: Problems chia nhỏ được thành subproblems độc lập
// Ví dụ: Parallel merge sort, parallel file tree traversal
class MergeSortTask extends RecursiveAction {
private int[] array;
private int left, right;
// ...
}
Không phù hợp
KHÔNG dùng ForkJoinPool cho blocking I/O tasks (database, network, file I/O)!
- ForkJoinPool có ít threads (default = CPU cores)
- Blocking tasks làm threads bận rỗi → starve các tasks khác
- Work-stealing không hiệu quả khi threads bị block
Dùng standard ThreadPoolExecutor cho I/O tasks với nhiều threads hơn.
// ❌ SAI: Blocking I/O trong ForkJoinPool
ForkJoinPool.commonPool().submit(() -> {
// Đọc file → block thread!
Files.readAllLines(Paths.get("large-file.txt"));
});
// ✅ ĐÚNG: Dùng ThreadPoolExecutor cho I/O
ExecutorService ioPool = Executors.newFixedThreadPool(20);
ioPool.submit(() -> {
Files.readAllLines(Paths.get("large-file.txt"));
});
Parallel Streams và ForkJoinPool
Parallel streams sử dụng ForkJoinPool.commonPool() — điều này có implications:
// Parallel stream dùng common pool
List<Integer> list = IntStream.range(0, 1000).boxed().toList();
list.parallelStream()
.map(x -> {
System.out.println(Thread.currentThread().getName());
return x * 2;
})
.forEach(System.out::println);
// Output: Tất cả chạy trong ForkJoinPool.commonPool-worker-X
Vấn đề: Nếu code của bạn và library bên thứ 3 cùng dùng parallel streams → share chung common pool → có thể starve lẫn nhau.
Giải pháp: Dùng custom ForkJoinPool:
ForkJoinPool customPool = new ForkJoinPool(4);
customPool.submit(() -> {
// Parallel stream trong custom pool
list.parallelStream()
.map(x -> x * 2)
.forEach(System.out::println);
}).get(); // Chờ complete
customPool.shutdown();
Best Practices
1. Always shutdown executor correctly
Canonical Oracle shutdown pattern:
ExecutorService executor = Executors.newFixedThreadPool(10);
try {
// Submit tasks
executor.submit(() -> doWork());
} finally {
// Bước 1: Graceful shutdown — không nhận tasks mới
executor.shutdown();
try {
// Bước 2: Chờ tasks hiện tại hoàn thành (tối đa 60s)
if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
// Bước 3: Timeout → Force shutdown
executor.shutdownNow();
// Bước 4: Chờ thêm lần nữa sau shutdownNow
if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
System.err.println("Pool did not terminate");
}
}
} catch (InterruptedException ex) {
// Bước 5: Thread bị interrupt → Force shutdown ngay
executor.shutdownNow();
// CRITICAL: Preserve interrupt status
Thread.currentThread().interrupt();
}
}
Tại sao Thread.currentThread().interrupt() quan trọng?
Khi catch InterruptedException, interrupt flag bị clear. Nếu không restore, caller không biết thread đã bị interrupt:
// Nếu không restore interrupt
catch (InterruptedException ex) {
executor.shutdownNow();
// ❌ Caller không biết thread bị interrupt!
}
// Đúng: Restore interrupt status
catch (InterruptedException ex) {
executor.shutdownNow();
Thread.currentThread().interrupt(); // ✅ Preserve interrupt
// Caller có thể check: Thread.interrupted()
}
2. Calculate pool size correctly
Goetz formula (từ "Java Concurrency in Practice"):
N = Ncpu × Ucpu × (1 + W/C)
Trong đó:
Ncpu= Số CPU cores (Runtime.getRuntime().availableProcessors())Ucpu= CPU utilization target (0.0 - 1.0, ví dụ 0.8 = 80%)W= Wait time per task (thời gian chờ I/O, network, etc.)C= Compute time per task (thời gian CPU xử lý)
Ví dụ tính toán:
// Hệ thống: 8 cores, target 80% CPU utilization
int Ncpu = Runtime.getRuntime().availableProcessors(); // 8
double Ucpu = 0.8;
// Case 1: CPU-bound tasks (W/C ≈ 0, hầu như không I/O)
// N = 8 × 0.8 × (1 + 0) = 6.4 ≈ 6-8 threads
int cpuBoundPoolSize = Ncpu; // hoặc Ncpu + 1
// Case 2: I/O-bound tasks (W/C = 9, 90% thời gian chờ I/O, 10% CPU)
// N = 8 × 0.8 × (1 + 9) = 64 threads
double W_C_ratio = 9.0; // 9:1 wait-to-compute
int ioBoundPoolSize = (int) (Ncpu * Ucpu * (1 + W_C_ratio)); // 64
// Case 3: Mixed workload (W/C = 1, 50% I/O, 50% CPU)
// N = 8 × 0.8 × (1 + 1) = 12.8 ≈ 12-16 threads
int mixedPoolSize = (int) (Ncpu * Ucpu * (1 + 1.0)); // ~13
Rule of thumb:
- CPU-bound:
poolSize = NcpuhoặcNcpu + 1 - I/O-bound:
poolSize = Ncpu * 2(hoặc nhiều hơn tùy I/O intensity) - Database calls:
poolSize = 20-50(hoặc theo DB connection pool size) - Pure blocking I/O:
poolSize = 50-200+
Formula chỉ là heuristic — luôn cần đo đạc và tune dựa trên monitoring production!
3. Handle exceptions properly
Đặc biệt trong CompletableFuture:
CompletableFuture.supplyAsync(() -> riskyOperation())
.exceptionally(ex -> {
log.error("Operation failed", ex);
return defaultValue;
})
.thenAccept(result -> process(result));
// Hoặc dùng handle() để xử lý cả success và failure
future.handle((result, ex) -> {
if (ex != null) {
return handleError(ex);
}
return processResult(result);
});
4. Avoid blocking in CompletableFuture
Dùng async methods để tránh block threads:
// ❌ BAD: Block thread pool
CompletableFuture.supplyAsync(() -> fetchData())
.thenApply(data -> {
blockingDatabaseCall(data); // ← Block worker thread!
return result;
});
// ✅ GOOD: Async all the way
CompletableFuture.supplyAsync(() -> fetchData())
.thenApplyAsync(data -> blockingDatabaseCall(data), ioExecutor);
// ↑ Chạy trong dedicated I/O pool
5. Use custom executor for critical tasks
Tránh dùng ForkJoinPool.commonPool() cho business-critical tasks:
// ❌ BAD: Dùng shared common pool
CompletableFuture.supplyAsync(() -> criticalTask());
// ✅ GOOD: Custom executor riêng
ExecutorService criticalExecutor = Executors.newFixedThreadPool(10);
CompletableFuture.supplyAsync(() -> criticalTask(), criticalExecutor);
6. Always set timeouts
Tránh chờ vô hạn:
// Future với timeout
try {
result = future.get(5, TimeUnit.SECONDS);
} catch (TimeoutException e) {
future.cancel(true);
// Handle timeout
}
// CompletableFuture với timeout (Java 9+)
future.orTimeout(5, TimeUnit.SECONDS)
.exceptionally(ex -> {
if (ex instanceof TimeoutException) {
// Handle timeout
}
return defaultValue;
});
7. Name your threads
Giúp debugging dễ dàng hơn:
ThreadFactory namedFactory = new ThreadFactory() {
private AtomicInteger counter = new AtomicInteger(0);
public Thread newThread(Runnable r) {
return new Thread(r, "MyApp-Worker-" + counter.incrementAndGet());
}
};
ExecutorService executor = new ThreadPoolExecutor(
10, 20, 60L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(100),
namedFactory // ← Custom thread names
);
8. Monitor your thread pools
Track metrics trong production:
ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(10);
// Periodic monitoring
ScheduledExecutorService monitor = Executors.newScheduledThreadPool(1);
monitor.scheduleAtFixedRate(() -> {
System.out.println("=== Thread Pool Stats ===");
System.out.println("Active threads: " + executor.getActiveCount());
System.out.println("Pool size: " + executor.getPoolSize());
System.out.println("Queue size: " + executor.getQueue().size());
System.out.println("Completed tasks: " + executor.getCompletedTaskCount());
System.out.println("Total tasks: " + executor.getTaskCount());
}, 0, 10, TimeUnit.SECONDS);
:::
OCP Exam Tips
Những điểm thường xuất hiện trong bài thi Oracle Certified Professional Java:
1. submit() vs execute()
ExecutorService executor = Executors.newFixedThreadPool(2);
// execute(): Chỉ nhận Runnable, không trả về gì
executor.execute(() -> System.out.println("Task"));
// Không thể get result, không thể cancel
// submit(): Nhận Runnable hoặc Callable, trả về Future
Future<String> future = executor.submit(() -> "Result");
String result = future.get(); // Có thể get result
future.cancel(true); // Có thể cancel
execute(Runnable) | submit(Callable<T>) | submit(Runnable) | |
|---|---|---|---|
| Return type | void | Future<T> | Future<?> |
| Get result | ❌ | ✅ | ✅ (null) |
| Cancel task | ❌ | ✅ | ✅ |
| Exception handling | In ra console | Wrapped trong ExecutionException | Wrapped trong ExecutionException |
2. Future.get() throws ExecutionException
Future.get() wrap exception gốc trong ExecutionException:
Future<Integer> future = executor.submit(() -> {
throw new IllegalArgumentException("Invalid input"); // ← Exception gốc
});
try {
future.get();
} catch (ExecutionException e) {
// ← Bắt ExecutionException, không phải IllegalArgumentException!
System.out.println("Caught: " + e.getClass().getName());
// ExecutionException
Throwable cause = e.getCause(); // ← Lấy exception gốc
System.out.println("Cause: " + cause.getClass().getName());
// IllegalArgumentException
}
Quy tắc:
get()throwExecutionException(checked) nếu task throw exceptionget()throwInterruptedException(checked) nếu thread bị interruptget()throwTimeoutException(checked) nếu dùng timeout- Exception gốc nằm trong
ExecutionException.getCause()
3. shutdown() vs shutdownNow()
ExecutorService executor = Executors.newFixedThreadPool(5);
// shutdown(): Graceful shutdown
executor.shutdown();
// - Không nhận tasks mới (reject với RejectedExecutionException)
// - Tasks đang chạy: Tiếp tục chạy đến hết
// - Tasks trong queue: Vẫn được chạy
// - Trả về: void
// shutdownNow(): Force shutdown
List<Runnable> notStarted = executor.shutdownNow();
// - Không nhận tasks mới
// - Tasks đang chạy: Interrupt (nếu task hỗ trợ interruption)
// - Tasks trong queue: KHÔNG chạy, trả về list
// - Trả về: List<Runnable> (tasks chưa chạy)
shutdown() | shutdownNow() | |
|---|---|---|
| Nhận tasks mới | ❌ | ❌ |
| Tasks đang chạy | Hoàn thành | Interrupt |
| Tasks trong queue | Chạy hết | Bỏ qua, return list |
| Return type | void | List<Runnable> |
| Blocking | ❌ (non-blocking) | ❌ (non-blocking) |
Lưu ý: shutdown() và shutdownNow() không block — dùng awaitTermination() để chờ:
executor.shutdown();
executor.awaitTermination(60, TimeUnit.SECONDS); // ← Blocking wait
4. CompletableFuture.supplyAsync() default pool
// Không truyền executor → dùng ForkJoinPool.commonPool()
CompletableFuture.supplyAsync(() -> "Task");
// ↑ Chạy trong ForkJoinPool.commonPool()
// Parallelism của common pool
int parallelism = ForkJoinPool.commonPool().getParallelism();
// = Runtime.getRuntime().availableProcessors() - 1
Exam gotcha:
// Q: Đoạn code này chạy trong bao nhiêu threads?
CompletableFuture.supplyAsync(() -> task1());
CompletableFuture.supplyAsync(() -> task2());
CompletableFuture.supplyAsync(() -> task3());
// A: ForkJoinPool.commonPool() threads (shared)
// Không phải 3 threads mới!
5. ScheduledExecutorService: scheduleAtFixedRate vs scheduleWithFixedDelay
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
// scheduleAtFixedRate: Period cố định, BỎ QUA execution time
scheduler.scheduleAtFixedRate(
() -> { sleep(1500); }, // Task chạy 1.5s
0, // Initial delay
1, // Period = 1s
TimeUnit.SECONDS
);
// Timeline: 0s → 1s → 2s → 3s (start times, ignore 1.5s execution)
// Nếu task chạy lâu hơn period → task chạy liền nhau (không delay)
// scheduleWithFixedDelay: Delay SAU KHI task hoàn thành
scheduler.scheduleWithFixedDelay(
() -> { sleep(1500); }, // Task chạy 1.5s
0, // Initial delay
1, // Delay after completion = 1s
TimeUnit.SECONDS
);
// Timeline: 0s → 2.5s (1.5s run + 1s delay) → 5s → 7.5s
So sánh:
scheduleAtFixedRate | scheduleWithFixedDelay | |
|---|---|---|
| Period/Delay tính từ | Start time của task | End time của task |
| Execution time < period | Chờ đủ period rồi chạy | Chờ delay sau khi done |
| Execution time > period | Chạy liền (no delay) | Vẫn chờ delay |
| Use case | Periodic polls (fixed rate) | Tasks phụ thuộc nhau (với delay) |
6. invokeAll() vs invokeAny()
List<Callable<String>> tasks = List.of(
() -> { sleep(1000); return "Task 1"; },
() -> { sleep(2000); return "Task 2"; },
() -> { sleep(3000); return "Task 3"; }
);
// invokeAll(): Chờ TẤT CẢ hoàn thành
List<Future<String>> results = executor.invokeAll(tasks);
// Chờ 3000ms (task chậm nhất)
// Return: List<Future> cho tất cả tasks
// invokeAny(): Trả về kết quả ĐẦU TIÊN hoàn thành
String result = executor.invokeAny(tasks);
// Chờ ~1000ms (task nhanh nhất)
// Return: String (kết quả trực tiếp, không phải Future)
// Các tasks khác bị cancel
7. Callable vs Runnable
// Runnable: Không return, không throw checked exception
Runnable r = () -> System.out.println("No return");
executor.execute(r);
// Callable: Return value, throw checked exception
Callable<Integer> c = () -> {
if (error) throw new Exception("Checked!"); // ✅ OK
return 42;
};
Future<Integer> future = executor.submit(c);
Runnable | Callable<V> | |
|---|---|---|
| Method | void run() | V call() throws Exception |
| Return value | ❌ | ✅ (type V) |
| Throw checked exception | ❌ | ✅ |
| Submit với execute() | ✅ | ❌ (compile error) |
| Submit với submit() | ✅ (return Future<?>) | ✅ (return Future<V>) |
Common Exam Traps
// Trap 1: shutdown() không block
executor.shutdown();
System.out.println("Done"); // ← In ngay, không chờ tasks
// Fix: dùng awaitTermination()
// Trap 2: Future.get() wrap exception
try {
future.get();
} catch (IllegalStateException e) { // ← SAI! Không bắt được
// ExecutionException bao ngoài!
}
// Trap 3: submit(Runnable) return Future<?>, get() return null
Future<?> f = executor.submit(() -> System.out.println("Task"));
Object result = f.get(); // ← null, không phải "Task"!
// Trap 4: CachedThreadPool có thể tạo vô số threads
ExecutorService e = Executors.newCachedThreadPool();
// Không giới hạn threads → nguy hiểm!
Bài tập
Bài 1: Thread Pool Comparison
So sánh hiệu suất:
- Tạo 1000 threads thủ công vs FixedThreadPool(10)
- Measure time và memory usage
Bài 2: CompletableFuture Chaining
Tạo pipeline:
- Fetch user data (1s)
- Fetch user posts (2s)
- Fetch comments for each post (1s)
- Aggregate results
Bài 3: Parallel File Processing
Xử lý 10 files song song:
- Read file
- Count words
- Aggregate total word count Dùng CompletableFuture.allOf()
Tóm tắt
- Executor Framework: Quản lý thread pools, tách task submission khỏi execution
- ExecutorService: Interface chính, cung cấp submit, shutdown
- 4 loại pools: FixedThreadPool, CachedThreadPool, SingleThreadExecutor, ScheduledThreadPool
- Future: Đại diện async result, blocking get()
- CompletableFuture: Non-blocking, chaining, exception handling tốt
Bài tiếp theo: Concurrent Collections - Thread-safe data structures!
Đọc thêm
Tài liệu chính thức
- Oracle: Executor Interfaces
- ThreadPoolExecutor Javadoc
- ForkJoinPool Javadoc
- CompletableFuture Javadoc
Sách chuyên sâu
- Java Concurrency in Practice (Brian Goetz)
- Chapter 6: Task Execution
- Chapter 8: Applying Thread Pools
- Section 8.1: Implicit Couplings Between Tasks and Execution Policies
- Section 8.2: Sizing Thread Pools (Goetz formula)
- Effective Java (Joshua Bloch)
- Item 80: Prefer executors, tasks, and streams to threads
- Item 81: Prefer concurrency utilities to wait and notify
Bài học liên quan
- Thread Creation — Nền tảng về threads
- Synchronization — Đồng bộ hóa với locks
- Concurrent Collections — Thread-safe data structures
Chủ đề nâng cao
- Parallel Streams internals và ForkJoinPool
- Virtual Threads (Project Loom, Java 19+)
- Reactive Programming với Project Reactor/RxJava
- Distributed task execution với frameworks như Apache Spark