Multithreading Best Practices
Sau bài này, bạn sẽ:
- Nắm được 4 chiến lược đạt thread safety: Immutability, Confinement, Synchronization, Atomic variables
- Sử dụng được Atomic variables cho lock-free operations
- Hiểu được ThreadLocal và use cases của nó
- Biết cách test concurrent code với CountDownLatch và CyclicBarrier
- Nhận biết và tránh các common pitfalls (shared mutable state, deadlock, etc.)
Bài trước: Locks và Conditions — Đã học về ReentrantLock, ReadWriteLock, và StampedLock. Bài này sẽ tổng hợp các best practices và common pitfalls trong lập trình đa luồng.
Thread Safety Strategies
Có 4 cách chính để đạt thread safety:
1. Immutability
Immutable objects tự động thread-safe:
public final class ImmutablePoint {
private final int x;
private final int y;
public ImmutablePoint(int x, int y) {
this.x = x;
this.y = y;
}
public int getX() { return x; }
public int getY() { return y; }
// Không có setters!
public ImmutablePoint move(int deltaX, int deltaY) {
return new ImmutablePoint(x + deltaX, y + deltaY);
}
@Override
public String toString() {
return "(" + x + ", " + y + ")";
}
}
Sử dụng:
public class ImmutableDemo {
public static void main(String[] args) throws InterruptedException {
ImmutablePoint point = new ImmutablePoint(0, 0);
// 10 threads cùng "modify" (thực ra tạo instance mới)
Thread[] threads = new Thread[10];
for (int i = 0; i < 10; i++) {
final int id = i;
threads[i] = new Thread(() -> {
ImmutablePoint newPoint = point.move(id, id * 2);
System.out.println(Thread.currentThread().getName() +
": " + newPoint);
});
threads[i].start();
}
for (Thread t : threads) {
t.join();
}
System.out.println("Original point unchanged: " + point);
// (0, 0) - không bị modify!
}
}
- Thread-safe tự động: Không cần synchronization
- Dễ reasoning: State không thay đổi
- Cacheable: An toàn để cache
- Hashable: Có thể dùng làm HashMap key
Nhược điểm:
- Tạo nhiều objects → garbage collection overhead
- Không phù hợp với mutable state
Ví dụ thực tế: Immutable User
import java.time.LocalDate;
import java.util.*;
public final class ImmutableUser {
private final String username;
private final String email;
private final LocalDate registeredDate;
private final List<String> roles;
public ImmutableUser(String username, String email,
LocalDate registeredDate, List<String> roles) {
this.username = username;
this.email = email;
this.registeredDate = registeredDate;
// Defensive copy để đảm bảo immutability
this.roles = Collections.unmodifiableList(new ArrayList<>(roles));
}
public String getUsername() { return username; }
public String getEmail() { return email; }
public LocalDate getRegisteredDate() { return registeredDate; }
public List<String> getRoles() {
return roles; // Đã unmodifiable
}
// "Modify" bằng cách tạo instance mới
public ImmutableUser withEmail(String newEmail) {
return new ImmutableUser(username, newEmail, registeredDate, roles);
}
public ImmutableUser addRole(String role) {
List<String> newRoles = new ArrayList<>(roles);
newRoles.add(role);
return new ImmutableUser(username, email, registeredDate, newRoles);
}
}
2. Confinement (Thread Confinement)
Đảm bảo data chỉ được truy cập bởi một thread duy nhất:
a) Stack Confinement
Biến local (stack variables) tự động thread-confined:
public void method() {
int localVar = 0; // Mỗi thread có stack riêng
// localVar tự động thread-safe
localVar++;
}
b) ThreadLocal
ThreadLocal cung cấp per-thread instance:
public class ThreadLocalDemo {
// Mỗi thread có ThreadId riêng
private static ThreadLocal<Integer> threadId = ThreadLocal.withInitial(() -> 0);
public static void main(String[] args) {
for (int i = 1; i <= 5; i++) {
final int id = i;
new Thread(() -> {
threadId.set(id);
System.out.println(Thread.currentThread().getName() +
" has ID: " + threadId.get());
// Mỗi thread thấy giá trị riêng của mình
try { Thread.sleep(1000); } catch (InterruptedException e) {}
System.out.println(Thread.currentThread().getName() +
" still has ID: " + threadId.get());
}, "Thread-" + i).start();
}
}
}
Output:
Thread-1 has ID: 1
Thread-2 has ID: 2
Thread-3 has ID: 3
Thread-4 has ID: 4
Thread-5 has ID: 5
Thread-1 still has ID: 1
Thread-2 still has ID: 2
...
Use cases:
- User context trong web applications
- Database connections per thread
- SimpleDateFormat (không thread-safe)
// SimpleDateFormat không thread-safe
private static ThreadLocal<SimpleDateFormat> dateFormat =
ThreadLocal.withInitial(() -> new SimpleDateFormat("yyyy-MM-dd"));
public String formatDate(Date date) {
return dateFormat.get().format(date);
}
ThreadLocal Memory Leak
ThreadLocal có thể gây memory leak nếu không được sử dụng đúng cách. Hiểu rõ cơ chế bên trong là rất quan trọng.
Cơ chế bên trong
ThreadLocal lưu data trong ThreadLocalMap — một map nội bộ của mỗi Thread:
Thread
└─> ThreadLocalMap
└─> Entry[] table
├─> Entry(WeakReference<ThreadLocal> key, Object value)
├─> Entry(WeakReference<ThreadLocal> key, Object value)
└─> ...
Vấn đề:
- Key (ThreadLocal): Là
WeakReference→ GC có thể thu hồi - Value: Là strong reference → GC không thể thu hồi nếu Entry còn tồn tại
- Khi ThreadLocal bị GC, key = null, nhưng value vẫn còn → memory leak
Tại sao Thread Pools làm tệ hơn?
// Thread pool reuse threads
ExecutorService executor = Executors.newFixedThreadPool(10);
for (int i = 0; i < 1000; i++) {
executor.submit(() -> {
ThreadLocal<byte[]> local = ThreadLocal.withInitial(() -> new byte[1024 * 1024]); // 1MB
local.set(new byte[1024 * 1024]);
// Không remove() → memory leak!
});
}
// 10 threads × 1000 tasks = entries tích lũy trong ThreadLocalMap
// Memory leak lên đến hàng GB!
Thread pools reuse threads → ThreadLocalMap không bị xóa → entries tích lũy → memory leak nghiêm trọng.
Pattern đúng: Luôn remove() trong finally
private static ThreadLocal<UserContext> userContext = new ThreadLocal<>();
public void handleRequest() {
try {
// Set value
userContext.set(new UserContext("user123"));
// Use value
processRequest();
} finally {
// CRITICAL: Remove để tránh memory leak
userContext.remove();
}
}
InheritableThreadLocal
InheritableThreadLocal cho phép child threads kế thừa giá trị từ parent thread:
private static InheritableThreadLocal<String> inheritedId =
new InheritableThreadLocal<>();
public static void main(String[] args) {
inheritedId.set("parent-123");
new Thread(() -> {
// Child thread thấy giá trị từ parent
System.out.println("Child sees: " + inheritedId.get()); // parent-123
}).start();
}
Lưu ý: InheritableThreadLocal không work với thread pools (threads được reuse, không phải child threads mới).
3. Synchronization
Dùng locks để bảo vệ shared data (đã học ở các bài trước):
// synchronized, ReentrantLock, ReadWriteLock, etc.
public synchronized void method() {
// Critical section
}
4. Atomic Variables
Dùng atomic operations cho single variables.
Atomic Variables
Package java.util.concurrent.atomic cung cấp atomic variables:
AtomicIntegerAtomicLongAtomicBooleanAtomicReference<V>
Compare-And-Swap (CAS)
CAS là hardware-level operation:
boolean compareAndSet(expectedValue, newValue) {
if (currentValue == expectedValue) {
currentValue = newValue;
return true; // Success
}
return false; // Failed (value changed by other thread)
}
Lock-free (không dùng khóa): Không cần lock, dùng CPU instruction atomic.
CPU-Level Implementation
CAS được implement bằng CPU instruction:
- x86/x64:
CMPXCHG(Compare and Exchange) - ARM:
LDREX/STREX(Load-Exclusive/Store-Exclusive)
; x86 Assembly cho CAS
LOCK CMPXCHG [memory], newValue
; LOCK prefix đảm bảo atomicity trên multi-core
Spin-waiting dưới Contention (tranh chấp)
Khi nhiều threads cạnh tranh (high contention), CAS sẽ spin-wait (chờ quay vòng):
// Simplified AtomicInteger.incrementAndGet()
public final int incrementAndGet() {
for (;;) { // Infinite loop - spin waiting
int current = get();
int next = current + 1;
if (compareAndSet(current, next)) {
return next; // Success
}
// Failed → retry (spin)
}
}
Vấn đề với high contention:
- Threads liên tục retry → waste CPU cycles
- Performance degradation (giảm hiệu suất)
- Lock có thể tốt hơn khi contention cao (threads sleep thay vì spin)
CAS vs Lock Performance
| Contention Level | CAS Performance | Lock Performance |
|---|---|---|
| Low (1-2 threads) | Excellent (no overhead - chi phí phụ) | Good (lock overhead) |
| Medium (3-10 threads) | Good (some spinning) | Good |
| High (100+ threads) | Poor (excessive spinning) | Better (threads sleep) |
Kết luận: CAS tốt cho low-contention, locks tốt hơn cho high-contention.
AtomicInteger
import java.util.concurrent.atomic.*;
public class AtomicIntegerDemo {
private AtomicInteger counter = new AtomicInteger(0);
public void increment() {
counter.incrementAndGet(); // Atomic operation
}
public int get() {
return counter.get();
}
public static void main(String[] args) throws InterruptedException {
AtomicIntegerDemo demo = new AtomicIntegerDemo();
// 10 threads, mỗi thread increment 1000 lần
Thread[] threads = new Thread[10];
for (int i = 0; i < 10; i++) {
threads[i] = new Thread(() -> {
for (int j = 0; j < 1000; j++) {
demo.increment();
}
});
threads[i].start();
}
for (Thread t : threads) {
t.join();
}
System.out.println("Counter: " + demo.get()); // 10000 (đúng!)
}
}
Methods:
| Method | Mô tả |
|---|---|
get() | Lấy giá trị hiện tại |
set(newValue) | Set giá trị mới |
getAndSet(newValue) | Get rồi set (atomic) |
incrementAndGet() | ++counter (atomic) |
getAndIncrement() | counter++ (atomic) |
decrementAndGet() | --counter (atomic) |
getAndDecrement() | counter-- (atomic) |
addAndGet(delta) | counter += delta (atomic) |
getAndAdd(delta) | Get rồi add (atomic) |
compareAndSet(expect, update) | CAS operation |
AtomicReference
import java.util.concurrent.atomic.*;
class User {
String name;
int age;
User(String name, int age) {
this.name = name;
this.age = age;
}
@Override
public String toString() {
return name + " (" + age + ")";
}
}
public class AtomicReferenceDemo {
private AtomicReference<User> userRef = new AtomicReference<>(
new User("Alice", 25)
);
public void updateUser(String name, int age) {
User oldUser = userRef.get();
User newUser = new User(name, age);
// CAS: Chỉ update nếu giá trị chưa thay đổi
if (userRef.compareAndSet(oldUser, newUser)) {
System.out.println("Updated to: " + newUser);
} else {
System.out.println("Update failed, retry...");
}
}
public static void main(String[] args) {
AtomicReferenceDemo demo = new AtomicReferenceDemo();
// 5 threads cùng update
for (int i = 1; i <= 5; i++) {
final int id = i;
new Thread(() -> {
demo.updateUser("User-" + id, 20 + id);
}).start();
}
}
}
ABA Problem trong CAS
ABA Problem là một vấn đề tinh vi với CAS: giá trị thay đổi A → B → A, nhưng CAS nghĩ rằng không có thay đổi.
Ví dụ: Stack Pop
class Node {
int value;
Node next;
Node(int value, Node next) {
this.value = value;
this.next = next;
}
}
class ConcurrentStack {
private AtomicReference<Node> head = new AtomicReference<>();
public void push(int value) {
Node newHead = new Node(value, head.get());
while (!head.compareAndSet(newHead.next, newHead)) {
newHead.next = head.get();
}
}
public Integer pop() {
Node oldHead = head.get();
if (oldHead == null) return null;
// ABA PROBLEM XẢY RA Ở ĐÂY!
// Thread 1: Đọc oldHead = A (next = B)
// Thread 2: Pop A, Pop B, Push A lại → head = A (nhưng next khác!)
// Thread 1: CAS thành công (A == A), nhưng next đã sai!
while (!head.compareAndSet(oldHead, oldHead.next)) {
oldHead = head.get();
if (oldHead == null) return null;
}
return oldHead.value;
}
}
Timeline của ABA Problem:
Initial: head → A → B → C
Thread 1: oldHead = A, next = B (bị interrupt)
Thread 2: Pop A → head = B
Thread 2: Pop B → head = C
Thread 2: Push A → head = A → C (A được reuse!)
Thread 1: CAS(A, B) thành công! (A == A)
Result: head → B (SAI! C bị mất!)
Giải pháp: AtomicStampedReference
AtomicStampedReference thêm version stamp để phát hiện ABA:
import java.util.concurrent.atomic.*;
class SafeConcurrentStack {
// Pair(reference, stamp)
private AtomicStampedReference<Node> head =
new AtomicStampedReference<>(null, 0);
public void push(int value) {
int[] stampHolder = new int[1];
Node oldHead = head.get(stampHolder);
int oldStamp = stampHolder[0];
Node newHead = new Node(value, oldHead);
// CAS với cả reference VÀ stamp
while (!head.compareAndSet(oldHead, newHead, oldStamp, oldStamp + 1)) {
oldHead = head.get(stampHolder);
oldStamp = stampHolder[0];
newHead.next = oldHead;
}
}
public Integer pop() {
int[] stampHolder = new int[1];
Node oldHead = head.get(stampHolder);
int oldStamp = stampHolder[0];
if (oldHead == null) return null;
// CAS kiểm tra cả reference VÀ stamp → ABA detected!
while (!head.compareAndSet(oldHead, oldHead.next, oldStamp, oldStamp + 1)) {
oldHead = head.get(stampHolder);
oldStamp = stampHolder[0];
if (oldHead == null) return null;
}
return oldHead.value;
}
}
Cách hoạt động:
- Stamp tăng mỗi lần update
- Thread 1 thấy
(A, stamp=1) - Thread 2 pop A, push A lại →
(A, stamp=3) - Thread 1 CAS
(A, stamp=1)→ FAILED (stamp khác!)
AtomicMarkableReference
Alternative nhẹ hơn khi chỉ cần boolean flag thay vì version:
AtomicMarkableReference<Node> head = new AtomicMarkableReference<>(null, false);
// CAS với mark bit
boolean success = head.compareAndSet(
expectedRef, newRef,
expectedMark, newMark
);
Use case: Đánh dấu node đã bị deleted trong concurrent data structures.
Atomic vs synchronized
| Feature | Atomic | synchronized |
|---|---|---|
| Locking | Lock-free (CAS) | Lock-based |
| Performance | Tốt hơn (low contention) | Chậm hơn |
| Complexity | Phức tạp hơn với custom logic | Đơn giản |
| Use case | Single variable operations | Complex multi-variable operations |
Dùng Atomic khi:
- Chỉ cần protect single variable
- Cần performance cao
- Low contention (ít threads cạnh tranh)
Dùng synchronized khi:
- Nhiều variables phải update cùng lúc (compound actions)
- Cần simplicity
- High contention
ThreadLocal
Use Case: Request Context
import java.util.*;
public class RequestContext {
// Mỗi thread (request) có context riêng
private static ThreadLocal<Map<String, Object>> context =
ThreadLocal.withInitial(() -> new HashMap<>());
public static void set(String key, Object value) {
context.get().put(key, value);
}
public static Object get(String key) {
return context.get().get(key);
}
public static void clear() {
context.remove(); // QUAN TRỌNG: Tránh memory leak
}
public static void main(String[] args) throws InterruptedException {
// Giả lập 3 requests
for (int i = 1; i <= 3; i++) {
final int requestId = i;
new Thread(() -> {
try {
// Set request context
RequestContext.set("userId", "user-" + requestId);
RequestContext.set("requestId", requestId);
// Simulate processing
processRequest();
// Context vẫn còn
System.out.println("Request " + requestId +
" user: " + RequestContext.get("userId"));
} finally {
// Clean up
RequestContext.clear();
}
}, "Request-" + i).start();
}
}
private static void processRequest() {
// Có thể access context ở bất kỳ đâu trong call stack
System.out.println("Processing: " + RequestContext.get("requestId"));
try { Thread.sleep(1000); } catch (InterruptedException e) {}
}
}
Immutable Objects for Thread Safety
Builder Pattern cho Immutable Objects
import java.time.LocalDateTime;
import java.util.*;
public final class Transaction {
private final String id;
private final String accountId;
private final double amount;
private final LocalDateTime timestamp;
private final String description;
private Transaction(Builder builder) {
this.id = builder.id;
this.accountId = builder.accountId;
this.amount = builder.amount;
this.timestamp = builder.timestamp;
this.description = builder.description;
}
// Getters
public String getId() { return id; }
public String getAccountId() { return accountId; }
public double getAmount() { return amount; }
public LocalDateTime getTimestamp() { return timestamp; }
public String getDescription() { return description; }
// Builder
public static class Builder {
private String id;
private String accountId;
private double amount;
private LocalDateTime timestamp;
private String description;
public Builder id(String id) {
this.id = id;
return this;
}
public Builder accountId(String accountId) {
this.accountId = accountId;
return this;
}
public Builder amount(double amount) {
this.amount = amount;
return this;
}
public Builder timestamp(LocalDateTime timestamp) {
this.timestamp = timestamp;
return this;
}
public Builder description(String description) {
this.description = description;
return this;
}
public Transaction build() {
return new Transaction(this);
}
}
@Override
public String toString() {
return "Transaction{id='" + id + "', amount=" + amount + "}";
}
}
// Usage
Transaction tx = new Transaction.Builder()
.id("TX-001")
.accountId("ACC-123")
.amount(100.50)
.timestamp(LocalDateTime.now())
.description("Payment")
.build();
Testing Concurrent Code
Challenges
- Non-deterministic: Lỗi không tái hiện được
- Race conditions: Phụ thuộc timing
- Hard to debug: Thread interleaving phức tạp
Testing Strategies
1. Stress Testing
import java.util.concurrent.*;
import org.junit.Test;
import static org.junit.Assert.*;
public class ConcurrentCounterTest {
@Test
public void testConcurrentIncrement() throws InterruptedException {
// Counter cần test
AtomicInteger counter = new AtomicInteger(0);
int numThreads = 10;
int incrementsPerThread = 1000;
ExecutorService executor = Executors.newFixedThreadPool(numThreads);
CountDownLatch latch = new CountDownLatch(numThreads);
// Submit tasks
for (int i = 0; i < numThreads; i++) {
executor.submit(() -> {
for (int j = 0; j < incrementsPerThread; j++) {
counter.incrementAndGet();
}
latch.countDown();
});
}
// Wait for completion
latch.await();
executor.shutdown();
// Verify
assertEquals(numThreads * incrementsPerThread, counter.get());
}
}
2. CountDownLatch cho Synchronization
import java.util.concurrent.*;
public class CountDownLatchDemo {
public static void main(String[] args) throws InterruptedException {
int numThreads = 5;
CountDownLatch startSignal = new CountDownLatch(1); // Start gate
CountDownLatch doneSignal = new CountDownLatch(numThreads); // Finish gate
for (int i = 1; i <= numThreads; i++) {
final int threadId = i;
new Thread(() -> {
try {
System.out.println("Thread-" + threadId + " ready");
startSignal.await(); // Chờ start signal
// Do work
System.out.println("Thread-" + threadId + " working");
Thread.sleep(1000 + threadId * 100);
doneSignal.countDown(); // Signal done
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
Thread.sleep(1000);
System.out.println("Starting all threads...");
startSignal.countDown(); // Release all threads
doneSignal.await(); // Wait for all threads to finish
System.out.println("All threads finished");
}
}
3. CyclicBarrier cho Phased Execution
import java.util.concurrent.*;
public class CyclicBarrierDemo {
public static void main(String[] args) {
int numThreads = 3;
CyclicBarrier barrier = new CyclicBarrier(numThreads, () -> {
System.out.println("=== All threads reached barrier ===\n");
});
for (int i = 1; i <= numThreads; i++) {
final int threadId = i;
new Thread(() -> {
try {
for (int phase = 1; phase <= 3; phase++) {
System.out.println("Thread-" + threadId +
" phase " + phase);
Thread.sleep(threadId * 1000);
barrier.await(); // Chờ tất cả threads đến barrier
}
} catch (Exception e) {
e.printStackTrace();
}
}).start();
}
}
}
Common Pitfalls
1. Shared Mutable State
Tránh:
// BAD: Shared mutable state
public class BadCounter {
private int count = 0; // Mutable, shared
public void increment() {
count++; // Race condition!
}
}
Tốt hơn:
// GOOD: Synchronized hoặc Atomic
public class GoodCounter {
private AtomicInteger count = new AtomicInteger(0);
public void increment() {
count.incrementAndGet(); // Thread-safe
}
}
2. Incorrect Synchronization
Tránh:
// BAD: Synchronized trên object có thể thay đổi
private String lock = "lock";
public void method() {
synchronized (lock) { // Nguy hiểm!
lock = "new lock"; // Lock object thay đổi!
}
}
Tốt hơn:
// GOOD: Synchronized trên final object
private final Object lock = new Object();
public void method() {
synchronized (lock) {
// Safe
}
}
3. Double-Checked Locking (Sai)
Tránh (trước Java 5):
// BAD: Broken double-checked locking
private Singleton instance;
public Singleton getInstance() {
if (instance == null) { // Check 1
synchronized (this) {
if (instance == null) { // Check 2
instance = new Singleton(); // Có thể thấy partially constructed object!
}
}
}
return instance;
}
Tại sao cần volatile?
Instruction instance = new Singleton() không phải atomic, gồm 3 bước:
1. Allocate memory cho Singleton object
2. Initialize object (gọi constructor)
3. Assign address vào biến instance
Vấn đề: Instruction reordering (sắp xếp lại lệnh)
CPU/Compiler có thể reorder thành:
1. Allocate memory
3. Assign address vào instance (instance != null!)
2. Initialize object (chưa chạy constructor!)
Timeline gây lỗi:
Thread 1: Allocate memory
Thread 1: instance = address (instance != null, nhưng chưa init!)
Thread 2: if (instance == null) → false
Thread 2: return instance → PARTIALLY CONSTRUCTED OBJECT!
Thread 1: Initialize object (too late!)
Volatile prevents reordering
volatile tạo memory barrier (hàng rào bộ nhớ):
- Write barrier: Đảm bảo tất cả writes trước volatile write được visible
- Read barrier: Đảm bảo tất cả reads sau volatile read thấy latest values
private volatile Singleton instance; // volatile!
// Với volatile:
// 1. Allocate
// 2. Initialize (MUST complete before step 3)
// 3. Assign (volatile write → memory barrier)
// → Thread khác thấy instance != null → object đã initialized
Tốt hơn:
// GOOD: Với volatile (Java 5+)
private volatile Singleton instance;
public Singleton getInstance() {
if (instance == null) {
synchronized (this) {
if (instance == null) {
instance = new Singleton();
}
}
}
return instance;
}
// BEST: Initialization-on-demand holder idiom
private static class Holder {
static final Singleton INSTANCE = new Singleton();
}
public static Singleton getInstance() {
return Holder.INSTANCE;
}
4. Deadlock từ Nested Locks
Tránh:
// BAD: Different lock order → deadlock
public void transfer(Account from, Account to, int amount) {
synchronized (from) {
synchronized (to) {
from.debit(amount);
to.credit(amount);
}
}
}
Tốt hơn:
// GOOD: Consistent lock order
public void transfer(Account from, Account to, int amount) {
Account first = from.getId() < to.getId() ? from : to;
Account second = from.getId() < to.getId() ? to : from;
synchronized (first) {
synchronized (second) {
from.debit(amount);
to.credit(amount);
}
}
}
5. Thread Leaks với ExecutorService
Tránh:
// BAD: Không shutdown executor
ExecutorService executor = Executors.newFixedThreadPool(10);
executor.submit(task);
// Quên shutdown → threads không terminate, JVM không thoát!
Tốt hơn:
// GOOD: Luôn shutdown
ExecutorService executor = Executors.newFixedThreadPool(10);
try {
executor.submit(task);
// Do work
} finally {
executor.shutdown();
executor.awaitTermination(60, TimeUnit.SECONDS);
}
6. Không dùng String làm Lock Object
Tránh:
// BAD: String interning gây deadlock tiềm ẩn
public class BadLocking {
private String lock1 = "LOCK"; // Interned string
private String lock2 = "LOCK"; // Cùng object với lock1!
public void method1() {
synchronized (lock1) { // Lock trên "LOCK"
System.out.println("Method 1");
}
}
public void method2() {
synchronized (lock2) { // Lock trên cùng "LOCK"!
System.out.println("Method 2");
}
}
}
// Vấn đề: lock1 == lock2 → unintended sharing!
String interning làm cho string literals trỏ đến cùng object:
String s1 = "LOCK";
String s2 = "LOCK";
System.out.println(s1 == s2); // true! (cùng object)
// Class khác cũng có thể lock trên cùng string!
class OtherClass {
private String lock = "LOCK"; // Same object!
public void method() {
synchronized (lock) { // Deadlock risk!
// ...
}
}
}
Tốt hơn:
// GOOD: Dùng dedicated lock objects
public class GoodLocking {
private final Object lock1 = new Object(); // Unique object
private final Object lock2 = new Object(); // Unique object
public void method1() {
synchronized (lock1) {
System.out.println("Method 1");
}
}
public void method2() {
synchronized (lock2) {
System.out.println("Method 2");
}
}
}
Best practices cho lock objects:
- Dùng
private final Object lock = new Object() - Không dùng String, Integer, Boolean (interned/cached)
- Không dùng public objects (external code có thể lock)
- Không dùng
thischo public classes (external lock risk)
Performance Considerations
Context Switching Overhead
Quá nhiều threads → context switching overhead:
// BAD: 1000 threads cho 1000 tasks
for (int i = 0; i < 1000; i++) {
new Thread(task).start(); // Context switching overhead!
}
// GOOD: Thread pool với số threads hợp lý
ExecutorService executor = Executors.newFixedThreadPool(
Runtime.getRuntime().availableProcessors()
);
for (int i = 0; i < 1000; i++) {
executor.submit(task);
}
executor.shutdown();
Thread Pool Sizing — Công thức Goetz
Brian Goetz formula (từ "Java Concurrency in Practice"):
N_threads = N_cpu × U_cpu × (1 + W/C)
Trong đó:
N_cpu= số CPU cores (Runtime.getRuntime().availableProcessors())U_cpu= target CPU utilization (0.0 đến 1.0)W= wait time (thời gian chờ I/O, network, etc.)C= compute time (thời gian tính toán thuần túy)
Ví dụ 1: CPU-bound Task
Scenario: Image processing (100% CPU, không có I/O)
int N_cpu = Runtime.getRuntime().availableProcessors(); // 8 cores
double U_cpu = 1.0; // 100% utilization
double W = 0; // Không có waiting
double C = 1; // 100% compute
int poolSize = (int) (N_cpu * U_cpu * (1 + W/C));
// = 8 × 1.0 × (1 + 0/1) = 8 threads
ExecutorService executor = Executors.newFixedThreadPool(poolSize);
Giải thích: CPU-bound tasks tối ưu khi N_threads ≈ N_cpu (hoặc N_cpu + 1 để tận dụng context switching).
Ví dụ 2: I/O-bound Task
Scenario: HTTP API calls (50ms compute, 500ms network I/O)
int N_cpu = 8; // 8 cores
double U_cpu = 0.5; // 50% target utilization
double W = 500; // 500ms waiting (network I/O)
double C = 50; // 50ms compute
double ratio = W / C; // 500/50 = 10
int poolSize = (int) (N_cpu * U_cpu * (1 + ratio));
// = 8 × 0.5 × (1 + 10) = 44 threads
ExecutorService executor = Executors.newFixedThreadPool(poolSize);
Giải thích:
- Ratio W/C = 10 → task chờ 10× lâu hơn compute
- Cần nhiều threads để tận dụng CPU trong lúc threads khác chờ I/O
- 44 threads đảm bảo luôn có ~4 threads active (8 cores × 0.5 utilization)
Công thức đơn giản hóa
Nếu không đo được W/C chính xác:
CPU-bound:
int poolSize = Runtime.getRuntime().availableProcessors() + 1;
I/O-bound (moderate I/O):
int poolSize = Runtime.getRuntime().availableProcessors() * 2;
I/O-bound (heavy I/O - database, network):
int poolSize = Runtime.getRuntime().availableProcessors() * 10; // Hoặc cao hơn
Little's Law cho Queue Sizing
Little's Law: L = λ × W
L= average number of items in systemλ= arrival rate (tasks/second)W= average time in system (seconds)
Ví dụ: 100 requests/sec, mỗi request mất 2s → cần queue size ≥ 200
int queueSize = (int) (arrivalRate * averageProcessingTime);
ThreadPoolExecutor executor = new ThreadPoolExecutor(
corePoolSize, maxPoolSize,
60, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(queueSize)
);
Debugging Concurrent Code
Debugging multithreading issues là một trong những thách thức khó nhất. Dưới đây là các tools và techniques.
Thread Dumps với jstack
Thread dump hiển thị trạng thái của tất cả threads tại một thời điểm:
# Lấy PID của Java process
jps
# Output:
# 12345 MyApplication
# 12346 Jps
# Generate thread dump
jstack 12345 > threaddump.txt
# Hoặc trigger từ bên trong JVM (Linux/Mac)
kill -3 12345 # Gửi SIGQUIT → thread dump to stdout
Ví dụ thread dump:
"Thread-1" #12 prio=5 os_prio=31 tid=0x00007f9a8c800000 nid=0x5603 waiting on condition
java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt
at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued
"Thread-2" #13 prio=5 os_prio=31 tid=0x00007f9a8d000000 nid=0x5803 waiting for monitor entry
java.lang.Thread.State: BLOCKED (on object monitor)
at MyClass.method2(MyClass.java:45)
- waiting to lock <0x000000076ab12345> (a java.lang.Object)
Phân tích:
WAITING: Thread đang chờ (LockSupport.park, Object.wait)BLOCKED: Thread chờ monitor lockRUNNABLE: Thread đang chạy hoặc có thể chạywaiting to lock <0x...>: Đang chờ lock trên object này
ThreadMXBean — Programmatic Detection
import java.lang.management.*;
public class DeadlockDetector {
public static void detectDeadlock() {
ThreadMXBean threadBean = ManagementFactory.getThreadMXBean();
// Detect deadlocked threads
long[] deadlockedThreads = threadBean.findDeadlockedThreads();
if (deadlockedThreads != null) {
System.out.println("DEADLOCK DETECTED!");
ThreadInfo[] infos = threadBean.getThreadInfo(deadlockedThreads, true, true);
for (ThreadInfo info : infos) {
System.out.println("Thread: " + info.getThreadName());
System.out.println(" State: " + info.getThreadState());
System.out.println(" Lock: " + info.getLockName());
System.out.println(" Lock owner: " + info.getLockOwnerName());
System.out.println(" Stack trace:");
for (StackTraceElement element : info.getStackTrace()) {
System.out.println(" " + element);
}
}
}
}
public static void main(String[] args) throws InterruptedException {
// Chạy detector mỗi 10 giây
Timer timer = new Timer(true); // Daemon thread
timer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
detectDeadlock();
}
}, 0, 10000);
// Your application code
}
}
jconsole và jvisualvm
JConsole và VisualVM là GUI tools để monitor JVM:
# Launch JConsole
jconsole
# Launch VisualVM
jvisualvm
Chức năng:
- Threads tab: Xem thread count, detect deadlocks
- CPU/Memory graphs: Monitor resource usage
- Thread dumps: Visual thread dump với filtering
- Heap dumps: Analyze memory leaks
VisualVM plugins hữu ích:
- Thread Inspector: Visual timeline của thread states
- Profiler: CPU/Memory profiling
- Sampler: Lightweight profiling
IntelliJ IDEA Thread Debugging
Breakpoint Suspend Policy:
public void method() {
int x = 0; // Breakpoint ở đây
x++;
}
Cấu hình breakpoint:
- Right-click breakpoint → "More" (Ctrl+Shift+F8)
- Suspend:
All(default): Dừng tất cả threadsThread: Chỉ dừng thread hiện tại (non-blocking - không chặn)
- Condition:
Thread.currentThread().getName().equals("Thread-1")
Frames window:
- Xem call stack của tất cả threads
- Click vào thread → xem variables
- Phát hiện deadlock: 2 threads chờ lẫn nhau
Evaluate Expression (Alt+F8):
// Trong debug, evaluate:
Thread.getAllStackTraces().keySet()
// → Xem tất cả active threads
Common Patterns trong Thread Dumps
1. Deadlock Pattern
"Thread-1" waiting to lock <0x12345>
- locked <0x67890>
"Thread-2" waiting to lock <0x67890>
- locked <0x12345>
→ Circular wait = deadlock
2. Thread Starvation
100 threads: RUNNABLE (active)
10 threads: WAITING (idle)
→ Thread pool quá nhỏ → starvation
3. Busy Spinning
"Thread-1" RUNNABLE
at MyClass.spinWait(MyClass.java:10)
at MyClass.spinWait(MyClass.java:10) // Same line nhiều lần
at MyClass.spinWait(MyClass.java:10)
→ Infinite loop hoặc busy-wait
4. Lock Contention (tranh chấp)
50 threads: BLOCKED waiting to lock <0x12345>
1 thread: RUNNABLE holding <0x12345>
→ High lock contention → performance issue
Solution: Reduce lock scope, use concurrent collections, atomic variables.
Bài tập: Multi-threaded File Downloader
Implement file downloader:
Yêu cầu
- Download nhiều files song song
- Progress tracking cho mỗi file
- Thread pool để giới hạn concurrent downloads
- Retry logic khi download thất bại
- Graceful cancellation
- Thread-safe statistics (total downloaded, speed)
Skeleton Code
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
class DownloadTask {
private String url;
private String destination;
private AtomicLong bytesDownloaded = new AtomicLong(0);
private volatile boolean cancelled = false;
public DownloadTask(String url, String destination) {
this.url = url;
this.destination = destination;
}
public void download() throws Exception {
// TODO: Implement download logic
// - Simulate download với sleep
// - Update bytesDownloaded periodically
// - Check cancelled flag
// - Throw exception để trigger retry
}
public void cancel() {
cancelled = true;
}
public long getBytesDownloaded() {
return bytesDownloaded.get();
}
}
public class MultiThreadedDownloader {
private ExecutorService executor;
private ConcurrentHashMap<String, DownloadTask> activeTasks;
private AtomicLong totalDownloaded = new AtomicLong(0);
public MultiThreadedDownloader(int maxConcurrentDownloads) {
this.executor = Executors.newFixedThreadPool(maxConcurrentDownloads);
this.activeTasks = new ConcurrentHashMap<>();
}
public Future<?> submitDownload(String url, String destination) {
// TODO: Submit download task với retry logic
return null;
}
public void cancelDownload(String url) {
// TODO: Cancel specific download
}
public void printProgress() {
// TODO: Print progress của tất cả downloads
}
public void shutdown() {
// TODO: Graceful shutdown
}
public static void main(String[] args) {
// TODO: Test với multiple downloads
}
}
Hints
- Dùng
CompletableFuturecho retry logic - Dùng
ScheduledExecutorServicecho progress updates - Dùng
CountDownLatchđể chờ tất cả downloads - Dùng
AtomicLongcho thread-safe counters - Dùng
ConcurrentHashMapcho active tasks tracking
Exam topics về concurrency (OCP Java SE 17):
-
Thread Safety:
- Nhớ 4 strategies: Immutability, Confinement, Synchronization, Atomic
synchronizedmethods vs blocksvolatilekeyword: visibility, không đảm bảo atomicity
-
Atomic Classes:
AtomicInteger,AtomicLong,AtomicBoolean,AtomicReference- Methods:
get(),set(),getAndSet(),compareAndSet() - incrementAndGet() vs getAndIncrement() (++i vs i++)
-
ThreadLocal:
- Mỗi thread có copy riêng
- Phải remove() trong finally để tránh memory leak
ThreadLocal.withInitial(() -> initialValue)
-
Common Pitfalls:
- Shared mutable state → race condition
- Synchronized trên String/Integer (interned) → unintended sharing
- Double-checked locking cần
volatile - Nested locks → deadlock risk
-
ExecutorService:
submit()vsexecute(): submit returns Future- Luôn shutdown() trong finally
awaitTermination()để chờ tasks complete
-
Concurrent Collections:
CopyOnWriteArrayList: Tốt cho read-heavy, write-lightConcurrentHashMap: Better thanCollections.synchronizedMap()- Không dùng
HashtablehoặcVector(legacy, synchronized)
Câu hỏi trick thường gặp:
- AtomicInteger vs synchronized int (khi nào dùng cái nào?)
- volatile có đảm bảo atomicity không? (Không! Chỉ visibility)
- ThreadLocal có thread-safe không? (Có, mỗi thread riêng copy)
- CopyOnWriteArrayList iterator có throw ConcurrentModificationException không? (Không!)
Tóm tắt
Thread Safety Strategies
- Immutability: Tự động thread-safe
- Confinement: ThreadLocal, stack variables
- Synchronization: synchronized, locks
- Atomic variables: Lock-free operations
Best Practices
- Prefer immutability
- Minimize scope of synchronization
- Use thread pools, not manual threads
- Always shutdown ExecutorService
- Avoid shared mutable state
- Use concurrent collections
- Test with stress tests
Common Pitfalls
- Shared mutable state
- Incorrect synchronization
- Deadlocks from nested locks
- Thread leaks
- Race conditions
Testing
- CountDownLatch, CyclicBarrier
- Stress testing với nhiều threads
- Monitor for deadlocks, race conditions
Chúc mừng bạn đã hoàn thành Module 9: Multithreading!
Đọc thêm
Sách chuyên sâu
- Java Concurrency in Practice (Brian Goetz) — Chapter 12: Testing Concurrent Programs
- Effective Java (Joshua Bloch) — Item 79: Avoid excessive synchronization, Item 81: Prefer concurrency utilities to wait and notify
- Modern Java in Action — Chapter 7: Parallel data processing and performance
Tài liệu chính thức
- Oracle: Java Concurrency Tutorial
- java.util.concurrent API Docs
- Java Language Specification: Memory Model
Bài học liên quan trong course
- Thread Creation — Tạo và quản lý threads
- Synchronization — synchronized, wait/notify
- Locks và Conditions — ReentrantLock, ReadWriteLock
- Executor Framework — Thread pools và ExecutorService
- Concurrent Collections — Thread-safe collections