Chuyển tới nội dung chính

Executor Framework

Mục tiêu bài học

Sau bài này, bạn sẽ:

  • Hiểu được vấn đề của việc tạo threads thủ công và lợi ích của Thread Pool
  • Sử dụng được ExecutorService và các loại thread pools (Fixed, Cached, Single, Scheduled)
  • Nắm được cách làm việc với Future để lấy kết quả async
  • Sử dụng CompletableFuture cho non-blocking operations
  • Biết cách shutdown executor service đúng cách

Bài trước: Synchronization — Đã học cách đồng bộ hóa threads với synchronized và locks. Bài này sẽ giới thiệu cách quản lý threads hiệu quả hơn với Executor Framework.

Vấn đề với việc tạo Thread thủ công

Tạo thread trực tiếp có nhiều hạn chế

// Cách cũ: Tạo thread thủ công
public class ManualThreadProblem {
public static void main(String[] args) {
// Tạo 100 tasks
for (int i = 0; i < 100; i++) {
Thread t = new Thread(() -> {
// Process task
System.out.println("Processing in: " +
Thread.currentThread().getName());
});
t.start(); // Tạo 100 threads!
}
}
}

Vấn đề:

  1. Overhead: Tạo/hủy thread tốn tài nguyên (CPU, memory)
  2. Không giới hạn: Có thể tạo vô số threads → crash JVM
  3. Không tái sử dụng: Thread dùng xong bị hủy
  4. Khó quản lý: Không có cách tập trung theo dõi, hủy tasks
  5. Không có queue: Task đến phải tạo thread ngay
mẹo

Giải pháp: Thread Pool - Tập hợp các threads sẵn sàng, tái sử dụng được.

Executor Interface

Interface cơ bản nhất trong framework:

public interface Executor {
void execute(Runnable command);
}

Tách biệt:

  • Task submission (submit task)
  • Task execution (thực thi task)
// Cách cũ
Thread t = new Thread(task);
t.start();

// Cách mới với Executor
Executor executor = ...;
executor.execute(task); // Executor quyết định cách thực thi

ExecutorService Interface

Mở rộng Executor, thêm nhiều features:

public interface ExecutorService extends Executor {
// Submit tasks
<T> Future<T> submit(Callable<T> task);
Future<?> submit(Runnable task);

// Shutdown
void shutdown(); // Graceful shutdown
List<Runnable> shutdownNow(); // Immediate shutdown
boolean isShutdown();
boolean isTerminated();
boolean awaitTermination(long timeout, TimeUnit unit);

// Batch submissions
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks);
<T> T invokeAny(Collection<? extends Callable<T>> tasks);
}

Ví dụ cơ bản

import java.util.concurrent.*;

public class ExecutorServiceDemo {
public static void main(String[] args) {
// Tạo thread pool với 3 threads
ExecutorService executor = Executors.newFixedThreadPool(3);

// Submit 10 tasks
for (int i = 1; i <= 10; i++) {
final int taskId = i;
executor.submit(() -> {
System.out.println("Task " + taskId + " running in: " +
Thread.currentThread().getName());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}

// Shutdown executor
executor.shutdown(); // Không nhận task mới, chờ tasks hiện tại hoàn thành

try {
// Chờ tối đa 1 phút
if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
executor.shutdownNow(); // Force shutdown
}
} catch (InterruptedException e) {
executor.shutdownNow();
}

System.out.println("All tasks completed");
}
}

Output:

Task 1 running in: pool-1-thread-1
Task 2 running in: pool-1-thread-2
Task 3 running in: pool-1-thread-3
Task 4 running in: pool-1-thread-1 ← Thread được tái sử dụng
Task 5 running in: pool-1-thread-2
...
All tasks completed

Executors Factory Methods

Class Executors cung cấp các factory methods để tạo thread pools:

1. newFixedThreadPool(n)

Thread pool với số lượng threads cố định:

ExecutorService executor = Executors.newFixedThreadPool(4);

Đặc điểm:

  • Tạo pool với n threads
  • Threads tồn tại cho đến khi shutdown
  • Nếu tất cả threads bận, tasks chờ trong queue
  • Use case: Biết trước số lượng tasks, cần giới hạn threads
Thread Pool (size = 4)
┌─────────┬─────────┬─────────┬─────────┐
│ Thread1 │ Thread2 │ Thread3 │ Thread4 │
└─────────┴─────────┴─────────┴─────────┘

Task Queue
┌────┬────┬────┐
│ T5 │ T6 │ T7 │
└────┴────┴────┘

2. newCachedThreadPool()

Thread pool mở rộng động:

ExecutorService executor = Executors.newCachedThreadPool();

Đặc điểm:

  • Tạo thread mới khi cần
  • Threads idle > 60s sẽ bị terminate
  • Không giới hạn số threads (nguy hiểm nếu lạm dụng!)
  • Use case: Nhiều short-lived tasks, số lượng không xác định
cảnh báo

Cẩn thận với newCachedThreadPool(): Có thể tạo quá nhiều threads → OutOfMemoryError!

3. newSingleThreadExecutor()

Thread pool với chỉ 1 thread:

ExecutorService executor = Executors.newSingleThreadExecutor();

Đặc điểm:

  • Chỉ 1 thread xử lý tasks tuần tự
  • Tasks thực thi theo thứ tự submit (FIFO)
  • Use case: Đảm bảo tasks chạy tuần tự, không đồng thời
public class SingleThreadDemo {
public static void main(String[] args) {
ExecutorService executor = Executors.newSingleThreadExecutor();

for (int i = 1; i <= 5; i++) {
final int taskId = i;
executor.submit(() -> {
System.out.println("Task " + taskId + " - " +
Thread.currentThread().getName());
});
}

executor.shutdown();
}
}

Output (tuần tự):

Task 1 - pool-1-thread-1
Task 2 - pool-1-thread-1
Task 3 - pool-1-thread-1
Task 4 - pool-1-thread-1
Task 5 - pool-1-thread-1

4. newScheduledThreadPool(n)

Thread pool để schedule tasks (thực thi sau delay hoặc định kỳ):

ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);

// Chạy sau 5 giây
executor.schedule(() -> {
System.out.println("Task executed after 5 seconds");
}, 5, TimeUnit.SECONDS);

// Chạy mỗi 2 giây, bắt đầu sau 1 giây
executor.scheduleAtFixedRate(() -> {
System.out.println("Periodic task: " + System.currentTimeMillis());
}, 1, 2, TimeUnit.SECONDS);

// Chạy với delay 3 giây giữa các lần (sau khi task trước hoàn thành)
executor.scheduleWithFixedDelay(() -> {
System.out.println("Task with fixed delay");
}, 0, 3, TimeUnit.SECONDS);

So sánh:

MethodÝ nghĩa
schedule(task, delay, unit)Chạy 1 lần sau delay
scheduleAtFixedRate(task, initialDelay, period, unit)Chạy định kỳ, bỏ qua execution time
scheduleWithFixedDelay(task, initialDelay, delay, unit)Chạy định kỳ, delay sau khi task hoàn thành

So sánh các Executors

TypeThreadsQueueUse Case
FixedThreadPoolCố định (n)UnboundedTasks ổn định, cần giới hạn threads
CachedThreadPoolĐộng (0 - ∞)SynchronousQueueShort-lived tasks, số lượng không xác định
SingleThreadExecutor1UnboundedTasks tuần tự
ScheduledThreadPoolCố định (n)DelayedWorkQueueScheduled/periodic tasks

ThreadPoolExecutor — Cấu hình chi tiết

Các factory methods của Executors thuận tiện nhưng che giấu nhiều chi tiết quan trọng. Trong production, nên tạo ThreadPoolExecutor trực tiếp để kiểm soát đầy đủ.

Constructor đầy đủ

public ThreadPoolExecutor(
int corePoolSize, // Số threads tối thiểu luôn tồn tại
int maximumPoolSize, // Số threads tối đa
long keepAliveTime, // Thời gian thread idle trước khi bị terminate
TimeUnit unit, // Đơn vị của keepAliveTime
BlockingQueue<Runnable> workQueue, // Queue chứa tasks đang chờ
ThreadFactory threadFactory, // Factory tạo threads mới
RejectedExecutionHandler handler // Xử lý khi pool đầy
)

Cách hoạt động:

  1. Nếu số threads < corePoolSize: Tạo thread mới cho task
  2. Nếu số threads ≥ corePoolSize: Đưa task vào queue
  3. Nếu queue đầy và threads < maximumPoolSize: Tạo thêm thread mới
  4. Nếu đạt maximumPoolSize và queue đầy: Gọi RejectedExecutionHandler

Vấn đề với Executors factory methods

newCachedThreadPool() — Nguy hiểm không giới hạn

// Executors.newCachedThreadPool() tạo:
new ThreadPoolExecutor(
0, // corePoolSize = 0
Integer.MAX_VALUE, // maximumPoolSize = ~2 tỷ threads!
60L, TimeUnit.SECONDS,
new SynchronousQueue<>() // Không lưu trữ tasks
);

Vấn đề: SynchronousQueue không có capacity — mỗi task phải có thread sẵn sàng, nếu không sẽ tạo thread mới → có thể tạo vô số threads → OutOfMemoryError.

Cảnh báo

newCachedThreadPool() rất nguy hiểm với spike traffic hoặc slow tasks. Có thể crash JVM nếu tạo hàng nghìn threads cùng lúc!

newFixedThreadPool(n) — Risk của unbounded queue

// Executors.newFixedThreadPool(n) tạo:
new ThreadPoolExecutor(
n, n, // corePoolSize = maximumPoolSize = n
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>() // Unbounded queue — không giới hạn!
);

Vấn đề: LinkedBlockingQueue không giới hạn capacity → nếu tasks đến nhanh hơn xử lý, queue phình to vô hạn → OutOfMemoryError.

Cấu hình ThreadPoolExecutor an toàn

import java.util.concurrent.*;

public class SafeThreadPoolExample {
public static void main(String[] args) {
// Bounded queue với capacity giới hạn
BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(100);

// Custom ThreadFactory để đặt tên threads
ThreadFactory threadFactory = new ThreadFactory() {
private int counter = 0;
public Thread newThread(Runnable r) {
Thread t = new Thread(r, "Worker-" + counter++);
t.setDaemon(false); // Non-daemon
return t;
}
};

// Rejection handler — log khi pool đầy
RejectedExecutionHandler handler = new ThreadPoolExecutor.CallerRunsPolicy();

// Tạo pool an toàn
ThreadPoolExecutor executor = new ThreadPoolExecutor(
5, // corePoolSize: 5 threads luôn tồn tại
10, // maximumPoolSize: tối đa 10 threads
60L, // keepAliveTime: 60 giây
TimeUnit.SECONDS,
workQueue, // Bounded queue, max 100 tasks
threadFactory,
handler
);

// Submit tasks
for (int i = 0; i < 200; i++) {
final int taskId = i;
try {
executor.submit(() -> {
System.out.println("Task " + taskId + " in " +
Thread.currentThread().getName());
try { Thread.sleep(100); } catch (InterruptedException e) {}
});
} catch (RejectedExecutionException e) {
System.err.println("Task " + taskId + " rejected!");
}
}

// Theo dõi pool
System.out.println("Active threads: " + executor.getActiveCount());
System.out.println("Queue size: " + executor.getQueue().size());
System.out.println("Completed tasks: " + executor.getCompletedTaskCount());

executor.shutdown();
}
}

RejectedExecutionHandler — Xử lý khi Pool đầy

Khi pool đạt maximumPoolSize và queue đầy, RejectedExecutionHandler quyết định xử lý task bị từ chối (rejection/từ chối).

4 built-in policies

PolicyHành viKhi nào dùng
AbortPolicy (default)Throw RejectedExecutionExceptionMuốn biết ngay khi pool quá tải, fail-fast
CallerRunsPolicyThread gọi execute() tự chạy taskTạo backpressure (áp lực ngược), làm chậm producer
DiscardPolicyIm lặng bỏ qua taskTask không quan trọng, chấp nhận mất data
DiscardOldestPolicyBỏ task cũ nhất trong queue, thêm task mớiƯu tiên tasks mới hơn

Ví dụ các policies

// 1. AbortPolicy (default): Throw exception
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
// Task bị reject → RejectedExecutionException

// 2. CallerRunsPolicy: Caller chạy task (tạo backpressure)
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
// Main thread sẽ chạy task nếu pool đầy → làm chậm submission

// 3. DiscardPolicy: Im lặng drop task
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
// Task bị drop, không log, không exception

// 4. DiscardOldestPolicy: Drop task cũ nhất
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy());
// Bỏ task đầu queue, thêm task mới

Custom RejectedExecutionHandler với logging + metrics

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;

public class LoggingRejectionHandler implements RejectedExecutionHandler {
private final AtomicLong rejectedCount = new AtomicLong(0);

@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
long count = rejectedCount.incrementAndGet();

// Log rejection
System.err.println(String.format(
"[REJECTED] Task: %s | Pool: [active=%d, pool=%d, queue=%d] | Total rejected: %d",
r.toString(),
executor.getActiveCount(),
executor.getPoolSize(),
executor.getQueue().size(),
count
));

// Gửi metrics đến monitoring system
// MetricsRegistry.counter("threadpool.rejected").increment();

// Fallback: Chạy task trong caller thread (như CallerRunsPolicy)
if (!executor.isShutdown()) {
r.run();
}
}

public long getRejectedCount() {
return rejectedCount.get();
}
}

// Sử dụng
LoggingRejectionHandler handler = new LoggingRejectionHandler();
ThreadPoolExecutor executor = new ThreadPoolExecutor(
2, 4, 60L, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(10),
handler
);
Best Practice

Trong production, luôn tạo ThreadPoolExecutor trực tiếp thay vì dùng Executors factory methods:

  • Đặt maximumPoolSize hợp lý (không phải Integer.MAX_VALUE)
  • Dùng bounded queue (không phải LinkedBlockingQueue mặc định)
  • Chọn RejectedExecutionHandler phù hợp với business logic
  • Đặt tên threads rõ ràng với custom ThreadFactory
  • Monitor metrics: active threads, queue size, rejected count

Future<V> Interface

Future đại diện cho kết quả của async computation:

public interface Future<V> {
V get() throws InterruptedException, ExecutionException;
V get(long timeout, TimeUnit unit) throws TimeoutException;
boolean isDone();
boolean isCancelled();
boolean cancel(boolean mayInterruptIfRunning);
}

Ví dụ với Future

import java.util.concurrent.*;

public class FutureDemo {
public static void main(String[] args) throws Exception {
ExecutorService executor = Executors.newSingleThreadExecutor();

// Submit Callable task
Future<Integer> future = executor.submit(() -> {
System.out.println("Task started...");
Thread.sleep(3000); // Giả lập công việc lâu
return 42;
});

System.out.println("Task submitted. Doing other work...");

// Kiểm tra xem task đã xong chưa
while (!future.isDone()) {
System.out.println("Task still running...");
Thread.sleep(500);
}

// Lấy kết quả (blocking)
Integer result = future.get();
System.out.println("Result: " + result);

executor.shutdown();
}
}

Output:

Task submitted. Doing other work...
Task still running...
Task started...
Task still running...
Task still running...
Task still running...
Task still running...
Result: 42

Future.get() với timeout

try {
Integer result = future.get(2, TimeUnit.SECONDS); // Chờ tối đa 2 giây
System.out.println("Result: " + result);
} catch (TimeoutException e) {
System.out.println("Task timeout! Cancelling...");
future.cancel(true); // Interrupt task
}

Cancel task

Future<Integer> future = executor.submit(() -> {
for (int i = 0; i < 10; i++) {
if (Thread.interrupted()) {
System.out.println("Task cancelled");
return -1;
}
System.out.println("Working... " + i);
Thread.sleep(1000);
}
return 100;
});

Thread.sleep(3000);
future.cancel(true); // Cancel và interrupt thread

CompletableFuture (Java 8+)

CompletableFutureFuture nâng cao, hỗ trợ:

  • Non-blocking operations
  • Functional programming (chaining)
  • Exception handling tốt hơn
  • Kết hợp nhiều futures

Tạo CompletableFuture

import java.util.concurrent.CompletableFuture;

// 1. Completed future
CompletableFuture<String> future1 = CompletableFuture.completedFuture("Hello");

// 2. Async computation
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
// Chạy trong ForkJoinPool.commonPool()
try { Thread.sleep(1000); } catch (InterruptedException e) { }
return "Result from async task";
});

// 3. Async với custom executor
ExecutorService executor = Executors.newFixedThreadPool(2);
CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> {
return "Result from custom executor";
}, executor);

Chaining với thenApply, thenAccept, thenRun

CompletableFuture.supplyAsync(() -> {
System.out.println("Step 1: Fetch data");
return "data";
})
.thenApply(data -> {
System.out.println("Step 2: Process " + data);
return data.toUpperCase();
})
.thenAccept(result -> {
System.out.println("Step 3: Result = " + result);
})
.thenRun(() -> {
System.out.println("Step 4: Cleanup");
});

Output:

Step 1: Fetch data
Step 2: Process data
Step 3: Result = DATA
Step 4: Cleanup
MethodInputOutputUse Case
thenApply(fn)Kết quả trướcFuture mớiTransform kết quả
thenAccept(consumer)Kết quả trướcVoidXử lý kết quả, không return
thenRun(runnable)KhôngVoidChạy action, không quan tâm kết quả

thenCompose() vs thenApply() — Khi nào dùng gì?

Hai methods này thường gây nhầm lẫn. Hiểu sự khác biệt rất quan trọng khi chain async operations.

Quy tắc đơn giản:

  • thenApply(): Transform kết quả (giống map trong Stream)
  • thenCompose(): Chain async operation phụ thuộc (giống flatMap trong Stream)

thenApply: Synchronous transformation

Dùng khi bạn transform kết quả thành giá trị khác ngay lập tức (không phải async):

CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 5)
.thenApply(x -> x * 2) // Transform: 5 → 10
.thenApply(x -> x + 3); // Transform: 10 → 13

System.out.println(future.get()); // 13

Signature:

<U> CompletableFuture<U> thenApply(Function<T, U> fn)
// T → U: Transform trực tiếp

thenCompose: Async chaining

Dùng khi bạn chain một async operation khác (trả về CompletableFuture):

// Service methods trả về CompletableFuture
CompletableFuture<User> getUserById(int id) {
return CompletableFuture.supplyAsync(() -> {
// Fetch user from DB
return new User(id, "Alice");
});
}

CompletableFuture<List<Order>> getOrdersByUser(User user) {
return CompletableFuture.supplyAsync(() -> {
// Fetch orders from DB
return List.of(new Order(1), new Order(2));
});
}

// Sai: thenApply tạo nested Future
CompletableFuture<CompletableFuture<List<Order>>> wrong =
getUserById(1)
.thenApply(user -> getOrdersByUser(user)); // ← Trả về Future[Future[List]]

// Đúng: thenCompose flatten Future
CompletableFuture<List<Order>> correct =
getUserById(1)
.thenCompose(user -> getOrdersByUser(user)); // ← Trả về Future[List]

List<Order> orders = correct.get();
System.out.println("Orders: " + orders.size());

Signature:

<U> CompletableFuture<U> thenCompose(Function<T, CompletableFuture<U>> fn)
// T → CompletableFuture<U>: Flatten nested Future

So sánh trực quan

// Ví dụ: Fetch user → Fetch profile → Process

// 1. thenApply: Sync transformation
CompletableFuture.supplyAsync(() -> "userId-123")
.thenApply(id -> id.toUpperCase()) // String → String
.thenApply(id -> "User: " + id) // String → String
.thenAccept(System.out::println); // User: USERID-123

// 2. thenCompose: Async chaining
CompletableFuture.supplyAsync(() -> "userId-123")
.thenCompose(id -> fetchUser(id)) // String → Future<User>
.thenCompose(user -> fetchProfile(user)) // User → Future<Profile>
.thenAccept(profile -> System.out.println("Profile: " + profile));

Bảng so sánh

thenApply(fn)thenCompose(fn)
Input functionT → UT → CompletableFuture<U>
OutputCompletableFuture<U>CompletableFuture<U>
Use caseSync transformationAsync chaining (dependent operations)
Stream analogymap()flatMap()
Nested Future?Tạo Future<Future<U>> nếu fn trả về FutureFlatten thành Future<U>
Ghi nhớ
  • thenApply: "Transform giá trị này"
  • thenCompose: "Chạy async operation tiếp theo phụ thuộc vào giá trị này"

Kết hợp nhiều Futures

thenCombine: Kết hợp 2 futures

CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> 10);
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> 20);

CompletableFuture<Integer> combined = future1.thenCombine(future2, (a, b) -> {
System.out.println("Combining " + a + " and " + b);
return a + b;
});

System.out.println("Result: " + combined.get()); // 30

allOf: Chờ tất cả futures

CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> {
sleep(1000);
return "Task 1";
});

CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> {
sleep(2000);
return "Task 2";
});

CompletableFuture<String> f3 = CompletableFuture.supplyAsync(() -> {
sleep(1500);
return "Task 3";
});

// Chờ tất cả hoàn thành
CompletableFuture<Void> allOf = CompletableFuture.allOf(f1, f2, f3);

allOf.join(); // Blocking wait

System.out.println("All completed:");
System.out.println(f1.get());
System.out.println(f2.get());
System.out.println(f3.get());

anyOf: Lấy kết quả đầu tiên

CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> {
sleep(2000);
return "Slow task";
});

CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> {
sleep(1000);
return "Fast task";
});

CompletableFuture<Object> anyOf = CompletableFuture.anyOf(f1, f2);
System.out.println("First result: " + anyOf.get()); // "Fast task"

Exception Handling

CompletableFuture.supplyAsync(() -> {
if (Math.random() > 0.5) {
throw new RuntimeException("Random error!");
}
return "Success";
})
.exceptionally(ex -> {
// Xử lý exception
System.out.println("Error: " + ex.getMessage());
return "Default value";
})
.thenAccept(result -> {
System.out.println("Result: " + result);
});

handle(): Xử lý cả success và failure

CompletableFuture.supplyAsync(() -> {
if (Math.random() > 0.5) {
throw new RuntimeException("Error!");
}
return 10;
})
.handle((result, ex) -> {
if (ex != null) {
System.out.println("Error: " + ex.getMessage());
return 0; // Default value
} else {
return result * 2;
}
})
.thenAccept(result -> System.out.println("Final: " + result));

Ví dụ thực tế: Parallel API Calls

import java.util.concurrent.*;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class ParallelAPICallsDemo {

// Giả lập API calls
static CompletableFuture<String> fetchUser(int id) {
return CompletableFuture.supplyAsync(() -> {
sleep(1000);
return "User-" + id;
});
}

static CompletableFuture<List<String>> fetchPosts(String user) {
return CompletableFuture.supplyAsync(() -> {
sleep(1500);
return Arrays.asList(user + "-Post1", user + "-Post2");
});
}

static CompletableFuture<String> fetchComments(String post) {
return CompletableFuture.supplyAsync(() -> {
sleep(800);
return post + "-Comments";
});
}

public static void main(String[] args) {
long start = System.currentTimeMillis();

// Fetch users song song
List<CompletableFuture<String>> userFutures = Arrays.asList(
fetchUser(1),
fetchUser(2),
fetchUser(3)
);

// Chờ tất cả users
CompletableFuture<Void> allUsers = CompletableFuture.allOf(
userFutures.toArray(new CompletableFuture[0])
);

CompletableFuture<List<String>> allData = allUsers.thenApply(v -> {
// Lấy kết quả tất cả users
return userFutures.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList());
});

List<String> users = allData.join();
System.out.println("Users: " + users);

long end = System.currentTimeMillis();
System.out.println("Time: " + (end - start) + "ms");
// Chỉ ~1000ms thay vì 3000ms (nếu tuần tự)
}

static void sleep(int ms) {
try { Thread.sleep(ms); } catch (InterruptedException e) { }
}
}

ForkJoinPool và Work-Stealing

ForkJoinPool là một ExecutorService đặc biệt được thiết kế cho recursive, divide-and-conquer (chia để trị) tasks. Java 7+ sử dụng nó làm engine cho parallel streams và CompletableFuture.supplyAsync().

Khái niệm Work-Stealing

Work-stealing (đánh cắp công việc) là thuật toán scheduling đặc biệt:

  1. Mỗi worker thread có một deque riêng (double-ended queue)
  2. Thread xử lý tasks từ đầu deque của mình (LIFO)
  3. Khi một thread rảnh, nó "đánh cắp" tasks từ cuối deque của thread khác (FIFO)
  4. Giảm contention (tranh chấp) — mỗi thread chủ yếu làm việc với queue riêng
Work-Stealing Visualization:

Thread 1 Deque Thread 2 Deque Thread 3 Deque (idle)
┌──────────────┐ ┌──────────────┐ ┌──────────────┐
│ Task A ←── │ own │ Task D ←── │ own │ (empty) │
│ Task B │work │ Task E │work │ │
│ Task C │ │ Task F │ │ │
└──────┬───────┘ └──────┬───────┘ └──────────────┘
↑ ↑ ↓
(LIFO) (LIFO) Steal from
Process Process others (FIFO)
own tasks own tasks ↓
┌──────────────┐
│ Steal Task C │
│ from T1 │
└──────────────┘

Lợi ích:

  • Load balancing tự động — threads bận không bị block vì threads rảnh giúp
  • Cache locality tốt — thread xử lý tasks liên quan (từ cùng một split)
  • Ít contention — mỗi thread chủ yếu làm việc với queue riêng

RecursiveTask vs RecursiveAction

ForkJoinPool chạy các tasks extend từ ForkJoinTask, thường dùng 2 subclass:

ClassTrả về giá trị?Use case
RecursiveTask<V>Có (compute() trả về V)Tính toán có kết quả (sum, max, search)
RecursiveActionKhông (compute() void)Side effects (logging, update array)

Ví dụ: Parallel Array Sum với RecursiveTask

import java.util.concurrent.*;

public class ParallelSumExample {

// RecursiveTask: Tính tổng array song song
static class SumTask extends RecursiveTask<Long> {
private final int[] array;
private final int start;
private final int end;
private static final int THRESHOLD = 1000; // Ngưỡng để chia nhỏ

public SumTask(int[] array, int start, int end) {
this.array = array;
this.start = start;
this.end = end;
}

@Override
protected Long compute() {
int length = end - start;

// Base case: Array nhỏ → tính trực tiếp
if (length <= THRESHOLD) {
long sum = 0;
for (int i = start; i < end; i++) {
sum += array[i];
}
return sum;
}

// Recursive case: Chia array thành 2 nửa
int mid = start + length / 2;

// Fork: Tạo subtask cho nửa trái
SumTask leftTask = new SumTask(array, start, mid);
leftTask.fork(); // Async execution

// Compute: Xử lý nửa phải trong thread hiện tại
SumTask rightTask = new SumTask(array, mid, end);
long rightResult = rightTask.compute(); // Sync

// Join: Chờ kết quả nửa trái
long leftResult = leftTask.join(); // Blocking wait

// Combine results
return leftResult + rightResult;
}
}

public static void main(String[] args) {
int size = 10_000_000;
int[] array = new int[size];
for (int i = 0; i < size; i++) {
array[i] = i + 1;
}

// 1. Sequential sum
long start = System.currentTimeMillis();
long sum1 = 0;
for (int value : array) {
sum1 += value;
}
long time1 = System.currentTimeMillis() - start;
System.out.println("Sequential: " + sum1 + " in " + time1 + "ms");

// 2. Parallel sum với ForkJoinPool
start = System.currentTimeMillis();
ForkJoinPool pool = new ForkJoinPool(); // Default: Runtime.getRuntime().availableProcessors() threads
SumTask task = new SumTask(array, 0, array.length);
long sum2 = pool.invoke(task); // Submit và chờ kết quả
long time2 = System.currentTimeMillis() - start;
System.out.println("Parallel: " + sum2 + " in " + time2 + "ms");

// Speedup
System.out.printf("Speedup: %.2fx\n", (double) time1 / time2);

pool.shutdown();
}
}

Output (8 cores):

Sequential: 50000005000000 in 15ms
Parallel: 50000005000000 in 5ms
Speedup: 3.00x

Pattern:

  1. Base case: Nếu problem nhỏ → solve trực tiếp
  2. Divide: Chia problem thành subproblems
  3. Fork: Submit subtasks async (fork())
  4. Compute: Xử lý một phần sync (compute())
  5. Join: Chờ kết quả (join())
  6. Combine: Merge kết quả

ForkJoinPool.commonPool()

Java 8+ cung cấp shared common pool cho toàn bộ JVM:

ForkJoinPool commonPool = ForkJoinPool.commonPool();

// CompletableFuture.supplyAsync() dùng common pool mặc định
CompletableFuture.supplyAsync(() -> "Task"); // ← Chạy trong common pool

// Parallel streams dùng common pool
List.of(1, 2, 3, 4, 5)
.parallelStream() // ← Chạy trong common pool
.map(x -> x * 2)
.forEach(System.out::println);

Số threads trong common pool:

int parallelism = ForkJoinPool.commonPool().getParallelism();
// Mặc định: Runtime.getRuntime().availableProcessors() - 1
System.out.println("Common pool parallelism: " + parallelism);

Tùy chỉnh:

# JVM flag để set common pool size
java -Djava.util.concurrent.ForkJoinPool.common.parallelism=4 MyApp

Khi nào dùng ForkJoinPool?

Phù hợp

  • Recursive algorithms: Quick sort, merge sort, tree traversal
  • Data parallelism: Process large array/collection song song
  • Compute-intensive tasks: CPU-bound, không I/O
  • Divide-and-conquer: Problems chia nhỏ được thành subproblems độc lập
// Ví dụ: Parallel merge sort, parallel file tree traversal
class MergeSortTask extends RecursiveAction {
private int[] array;
private int left, right;
// ...
}

Không phù hợp

Cảnh báo

KHÔNG dùng ForkJoinPool cho blocking I/O tasks (database, network, file I/O)!

  • ForkJoinPool có ít threads (default = CPU cores)
  • Blocking tasks làm threads bận rỗi → starve các tasks khác
  • Work-stealing không hiệu quả khi threads bị block

Dùng standard ThreadPoolExecutor cho I/O tasks với nhiều threads hơn.

// ❌ SAI: Blocking I/O trong ForkJoinPool
ForkJoinPool.commonPool().submit(() -> {
// Đọc file → block thread!
Files.readAllLines(Paths.get("large-file.txt"));
});

// ✅ ĐÚNG: Dùng ThreadPoolExecutor cho I/O
ExecutorService ioPool = Executors.newFixedThreadPool(20);
ioPool.submit(() -> {
Files.readAllLines(Paths.get("large-file.txt"));
});

Parallel Streams và ForkJoinPool

Parallel streams sử dụng ForkJoinPool.commonPool() — điều này có implications:

// Parallel stream dùng common pool
List<Integer> list = IntStream.range(0, 1000).boxed().toList();

list.parallelStream()
.map(x -> {
System.out.println(Thread.currentThread().getName());
return x * 2;
})
.forEach(System.out::println);

// Output: Tất cả chạy trong ForkJoinPool.commonPool-worker-X

Vấn đề: Nếu code của bạn và library bên thứ 3 cùng dùng parallel streams → share chung common pool → có thể starve lẫn nhau.

Giải pháp: Dùng custom ForkJoinPool:

ForkJoinPool customPool = new ForkJoinPool(4);

customPool.submit(() -> {
// Parallel stream trong custom pool
list.parallelStream()
.map(x -> x * 2)
.forEach(System.out::println);
}).get(); // Chờ complete

customPool.shutdown();

Best Practices

mẹo

1. Always shutdown executor correctly

Canonical Oracle shutdown pattern:

ExecutorService executor = Executors.newFixedThreadPool(10);

try {
// Submit tasks
executor.submit(() -> doWork());
} finally {
// Bước 1: Graceful shutdown — không nhận tasks mới
executor.shutdown();

try {
// Bước 2: Chờ tasks hiện tại hoàn thành (tối đa 60s)
if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
// Bước 3: Timeout → Force shutdown
executor.shutdownNow();

// Bước 4: Chờ thêm lần nữa sau shutdownNow
if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
System.err.println("Pool did not terminate");
}
}
} catch (InterruptedException ex) {
// Bước 5: Thread bị interrupt → Force shutdown ngay
executor.shutdownNow();

// CRITICAL: Preserve interrupt status
Thread.currentThread().interrupt();
}
}

Tại sao Thread.currentThread().interrupt() quan trọng?

Khi catch InterruptedException, interrupt flag bị clear. Nếu không restore, caller không biết thread đã bị interrupt:

// Nếu không restore interrupt
catch (InterruptedException ex) {
executor.shutdownNow();
// ❌ Caller không biết thread bị interrupt!
}

// Đúng: Restore interrupt status
catch (InterruptedException ex) {
executor.shutdownNow();
Thread.currentThread().interrupt(); // ✅ Preserve interrupt
// Caller có thể check: Thread.interrupted()
}

2. Calculate pool size correctly

Goetz formula (từ "Java Concurrency in Practice"):

N = Ncpu × Ucpu × (1 + W/C)

Trong đó:

  • Ncpu = Số CPU cores (Runtime.getRuntime().availableProcessors())
  • Ucpu = CPU utilization target (0.0 - 1.0, ví dụ 0.8 = 80%)
  • W = Wait time per task (thời gian chờ I/O, network, etc.)
  • C = Compute time per task (thời gian CPU xử lý)

Ví dụ tính toán:

// Hệ thống: 8 cores, target 80% CPU utilization
int Ncpu = Runtime.getRuntime().availableProcessors(); // 8
double Ucpu = 0.8;

// Case 1: CPU-bound tasks (W/C ≈ 0, hầu như không I/O)
// N = 8 × 0.8 × (1 + 0) = 6.4 ≈ 6-8 threads
int cpuBoundPoolSize = Ncpu; // hoặc Ncpu + 1

// Case 2: I/O-bound tasks (W/C = 9, 90% thời gian chờ I/O, 10% CPU)
// N = 8 × 0.8 × (1 + 9) = 64 threads
double W_C_ratio = 9.0; // 9:1 wait-to-compute
int ioBoundPoolSize = (int) (Ncpu * Ucpu * (1 + W_C_ratio)); // 64

// Case 3: Mixed workload (W/C = 1, 50% I/O, 50% CPU)
// N = 8 × 0.8 × (1 + 1) = 12.8 ≈ 12-16 threads
int mixedPoolSize = (int) (Ncpu * Ucpu * (1 + 1.0)); // ~13

Rule of thumb:

  • CPU-bound: poolSize = Ncpu hoặc Ncpu + 1
  • I/O-bound: poolSize = Ncpu * 2 (hoặc nhiều hơn tùy I/O intensity)
  • Database calls: poolSize = 20-50 (hoặc theo DB connection pool size)
  • Pure blocking I/O: poolSize = 50-200+
cẩn thận

Formula chỉ là heuristic — luôn cần đo đạc và tune dựa trên monitoring production!

3. Handle exceptions properly

Đặc biệt trong CompletableFuture:

CompletableFuture.supplyAsync(() -> riskyOperation())
.exceptionally(ex -> {
log.error("Operation failed", ex);
return defaultValue;
})
.thenAccept(result -> process(result));

// Hoặc dùng handle() để xử lý cả success và failure
future.handle((result, ex) -> {
if (ex != null) {
return handleError(ex);
}
return processResult(result);
});

4. Avoid blocking in CompletableFuture

Dùng async methods để tránh block threads:

// ❌ BAD: Block thread pool
CompletableFuture.supplyAsync(() -> fetchData())
.thenApply(data -> {
blockingDatabaseCall(data); // ← Block worker thread!
return result;
});

// ✅ GOOD: Async all the way
CompletableFuture.supplyAsync(() -> fetchData())
.thenApplyAsync(data -> blockingDatabaseCall(data), ioExecutor);
// ↑ Chạy trong dedicated I/O pool

5. Use custom executor for critical tasks

Tránh dùng ForkJoinPool.commonPool() cho business-critical tasks:

// ❌ BAD: Dùng shared common pool
CompletableFuture.supplyAsync(() -> criticalTask());

// ✅ GOOD: Custom executor riêng
ExecutorService criticalExecutor = Executors.newFixedThreadPool(10);
CompletableFuture.supplyAsync(() -> criticalTask(), criticalExecutor);

6. Always set timeouts

Tránh chờ vô hạn:

// Future với timeout
try {
result = future.get(5, TimeUnit.SECONDS);
} catch (TimeoutException e) {
future.cancel(true);
// Handle timeout
}

// CompletableFuture với timeout (Java 9+)
future.orTimeout(5, TimeUnit.SECONDS)
.exceptionally(ex -> {
if (ex instanceof TimeoutException) {
// Handle timeout
}
return defaultValue;
});

7. Name your threads

Giúp debugging dễ dàng hơn:

ThreadFactory namedFactory = new ThreadFactory() {
private AtomicInteger counter = new AtomicInteger(0);
public Thread newThread(Runnable r) {
return new Thread(r, "MyApp-Worker-" + counter.incrementAndGet());
}
};

ExecutorService executor = new ThreadPoolExecutor(
10, 20, 60L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(100),
namedFactory // ← Custom thread names
);

8. Monitor your thread pools

Track metrics trong production:

ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(10);

// Periodic monitoring
ScheduledExecutorService monitor = Executors.newScheduledThreadPool(1);
monitor.scheduleAtFixedRate(() -> {
System.out.println("=== Thread Pool Stats ===");
System.out.println("Active threads: " + executor.getActiveCount());
System.out.println("Pool size: " + executor.getPoolSize());
System.out.println("Queue size: " + executor.getQueue().size());
System.out.println("Completed tasks: " + executor.getCompletedTaskCount());
System.out.println("Total tasks: " + executor.getTaskCount());
}, 0, 10, TimeUnit.SECONDS);

:::

OCP Exam Tips

OCP Exam Tips

Những điểm thường xuất hiện trong bài thi Oracle Certified Professional Java:

1. submit() vs execute()

ExecutorService executor = Executors.newFixedThreadPool(2);

// execute(): Chỉ nhận Runnable, không trả về gì
executor.execute(() -> System.out.println("Task"));
// Không thể get result, không thể cancel

// submit(): Nhận Runnable hoặc Callable, trả về Future
Future<String> future = executor.submit(() -> "Result");
String result = future.get(); // Có thể get result
future.cancel(true); // Có thể cancel
execute(Runnable)submit(Callable<T>)submit(Runnable)
Return typevoidFuture<T>Future<?>
Get result✅ (null)
Cancel task
Exception handlingIn ra consoleWrapped trong ExecutionExceptionWrapped trong ExecutionException

2. Future.get() throws ExecutionException

Future.get() wrap exception gốc trong ExecutionException:

Future<Integer> future = executor.submit(() -> {
throw new IllegalArgumentException("Invalid input"); // ← Exception gốc
});

try {
future.get();
} catch (ExecutionException e) {
// ← Bắt ExecutionException, không phải IllegalArgumentException!
System.out.println("Caught: " + e.getClass().getName());
// ExecutionException

Throwable cause = e.getCause(); // ← Lấy exception gốc
System.out.println("Cause: " + cause.getClass().getName());
// IllegalArgumentException
}

Quy tắc:

  • get() throw ExecutionException (checked) nếu task throw exception
  • get() throw InterruptedException (checked) nếu thread bị interrupt
  • get() throw TimeoutException (checked) nếu dùng timeout
  • Exception gốc nằm trong ExecutionException.getCause()

3. shutdown() vs shutdownNow()

ExecutorService executor = Executors.newFixedThreadPool(5);

// shutdown(): Graceful shutdown
executor.shutdown();
// - Không nhận tasks mới (reject với RejectedExecutionException)
// - Tasks đang chạy: Tiếp tục chạy đến hết
// - Tasks trong queue: Vẫn được chạy
// - Trả về: void

// shutdownNow(): Force shutdown
List<Runnable> notStarted = executor.shutdownNow();
// - Không nhận tasks mới
// - Tasks đang chạy: Interrupt (nếu task hỗ trợ interruption)
// - Tasks trong queue: KHÔNG chạy, trả về list
// - Trả về: List<Runnable> (tasks chưa chạy)
shutdown()shutdownNow()
Nhận tasks mới
Tasks đang chạyHoàn thànhInterrupt
Tasks trong queueChạy hếtBỏ qua, return list
Return typevoidList<Runnable>
Blocking❌ (non-blocking)❌ (non-blocking)

Lưu ý: shutdown()shutdownNow() không block — dùng awaitTermination() để chờ:

executor.shutdown();
executor.awaitTermination(60, TimeUnit.SECONDS); // ← Blocking wait

4. CompletableFuture.supplyAsync() default pool

// Không truyền executor → dùng ForkJoinPool.commonPool()
CompletableFuture.supplyAsync(() -> "Task");
// ↑ Chạy trong ForkJoinPool.commonPool()

// Parallelism của common pool
int parallelism = ForkJoinPool.commonPool().getParallelism();
// = Runtime.getRuntime().availableProcessors() - 1

Exam gotcha:

// Q: Đoạn code này chạy trong bao nhiêu threads?
CompletableFuture.supplyAsync(() -> task1());
CompletableFuture.supplyAsync(() -> task2());
CompletableFuture.supplyAsync(() -> task3());

// A: ForkJoinPool.commonPool() threads (shared)
// Không phải 3 threads mới!

5. ScheduledExecutorService: scheduleAtFixedRate vs scheduleWithFixedDelay

ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);

// scheduleAtFixedRate: Period cố định, BỎ QUA execution time
scheduler.scheduleAtFixedRate(
() -> { sleep(1500); }, // Task chạy 1.5s
0, // Initial delay
1, // Period = 1s
TimeUnit.SECONDS
);
// Timeline: 0s → 1s → 2s → 3s (start times, ignore 1.5s execution)
// Nếu task chạy lâu hơn period → task chạy liền nhau (không delay)

// scheduleWithFixedDelay: Delay SAU KHI task hoàn thành
scheduler.scheduleWithFixedDelay(
() -> { sleep(1500); }, // Task chạy 1.5s
0, // Initial delay
1, // Delay after completion = 1s
TimeUnit.SECONDS
);
// Timeline: 0s → 2.5s (1.5s run + 1s delay) → 5s → 7.5s

So sánh:

scheduleAtFixedRatescheduleWithFixedDelay
Period/Delay tính từStart time của taskEnd time của task
Execution time < periodChờ đủ period rồi chạyChờ delay sau khi done
Execution time > periodChạy liền (no delay)Vẫn chờ delay
Use casePeriodic polls (fixed rate)Tasks phụ thuộc nhau (với delay)

6. invokeAll() vs invokeAny()

List<Callable<String>> tasks = List.of(
() -> { sleep(1000); return "Task 1"; },
() -> { sleep(2000); return "Task 2"; },
() -> { sleep(3000); return "Task 3"; }
);

// invokeAll(): Chờ TẤT CẢ hoàn thành
List<Future<String>> results = executor.invokeAll(tasks);
// Chờ 3000ms (task chậm nhất)
// Return: List<Future> cho tất cả tasks

// invokeAny(): Trả về kết quả ĐẦU TIÊN hoàn thành
String result = executor.invokeAny(tasks);
// Chờ ~1000ms (task nhanh nhất)
// Return: String (kết quả trực tiếp, không phải Future)
// Các tasks khác bị cancel

7. Callable vs Runnable

// Runnable: Không return, không throw checked exception
Runnable r = () -> System.out.println("No return");
executor.execute(r);

// Callable: Return value, throw checked exception
Callable<Integer> c = () -> {
if (error) throw new Exception("Checked!"); // ✅ OK
return 42;
};
Future<Integer> future = executor.submit(c);
RunnableCallable<V>
Methodvoid run()V call() throws Exception
Return value✅ (type V)
Throw checked exception
Submit với execute()❌ (compile error)
Submit với submit()✅ (return Future<?>)✅ (return Future<V>)

Common Exam Traps

// Trap 1: shutdown() không block
executor.shutdown();
System.out.println("Done"); // ← In ngay, không chờ tasks
// Fix: dùng awaitTermination()

// Trap 2: Future.get() wrap exception
try {
future.get();
} catch (IllegalStateException e) { // ← SAI! Không bắt được
// ExecutionException bao ngoài!
}

// Trap 3: submit(Runnable) return Future<?>, get() return null
Future<?> f = executor.submit(() -> System.out.println("Task"));
Object result = f.get(); // ← null, không phải "Task"!

// Trap 4: CachedThreadPool có thể tạo vô số threads
ExecutorService e = Executors.newCachedThreadPool();
// Không giới hạn threads → nguy hiểm!

Bài tập

Bài 1: Thread Pool Comparison

So sánh hiệu suất:

  • Tạo 1000 threads thủ công vs FixedThreadPool(10)
  • Measure time và memory usage

Bài 2: CompletableFuture Chaining

Tạo pipeline:

  1. Fetch user data (1s)
  2. Fetch user posts (2s)
  3. Fetch comments for each post (1s)
  4. Aggregate results

Bài 3: Parallel File Processing

Xử lý 10 files song song:

  • Read file
  • Count words
  • Aggregate total word count Dùng CompletableFuture.allOf()

Tóm tắt

  • Executor Framework: Quản lý thread pools, tách task submission khỏi execution
  • ExecutorService: Interface chính, cung cấp submit, shutdown
  • 4 loại pools: FixedThreadPool, CachedThreadPool, SingleThreadExecutor, ScheduledThreadPool
  • Future: Đại diện async result, blocking get()
  • CompletableFuture: Non-blocking, chaining, exception handling tốt

Bài tiếp theo: Concurrent Collections - Thread-safe data structures!

Đọc thêm

Tài liệu chính thức

Sách chuyên sâu

  • Java Concurrency in Practice (Brian Goetz)
    • Chapter 6: Task Execution
    • Chapter 8: Applying Thread Pools
    • Section 8.1: Implicit Couplings Between Tasks and Execution Policies
    • Section 8.2: Sizing Thread Pools (Goetz formula)
  • Effective Java (Joshua Bloch)
    • Item 80: Prefer executors, tasks, and streams to threads
    • Item 81: Prefer concurrency utilities to wait and notify

Bài học liên quan

Chủ đề nâng cao

  • Parallel Streams internals và ForkJoinPool
  • Virtual Threads (Project Loom, Java 19+)
  • Reactive Programming với Project Reactor/RxJava
  • Distributed task execution với frameworks như Apache Spark