Concurrent Collections
Sau bài này, bạn sẽ:
- Hiểu vấn đề ConcurrentModificationException và cách giải quyết
- Sử dụng được ConcurrentHashMap cho thread-safe map operations
- Biết cách dùng CopyOnWriteArrayList cho read-heavy scenarios
- Nắm được các BlockingQueue implementations và use cases
- So sánh được các concurrent collections và chọn đúng loại cho từng tình huống
Bài trước: Executor Framework — Đã học cách quản lý thread pools với ExecutorService. Bài này sẽ giới thiệu các thread-safe data structures trong java.util.concurrent.
Vấn đề với Collections trong Multithreading
ConcurrentModificationException
import java.util.*;
public class CollectionProblem {
public static void main(String[] args) {
List<Integer> list = new ArrayList<>(Arrays.asList(1, 2, 3, 4, 5));
// Thread 1: Iterate
Thread t1 = new Thread(() -> {
try {
for (Integer num : list) {
System.out.println(num);
Thread.sleep(100);
}
} catch (Exception e) {
e.printStackTrace();
}
});
// Thread 2: Modify
Thread t2 = new Thread(() -> {
try {
Thread.sleep(50);
list.add(6); // ConcurrentModificationException!
} catch (Exception e) {
e.printStackTrace();
}
});
t1.start();
t2.start();
}
}
Exception:
java.util.ConcurrentModificationException
Các Collections thông thường (ArrayList, HashMap, HashSet) không thread-safe!
Race Condition với HashMap
Map<String, Integer> map = new HashMap<>();
// 2 threads cùng put
Thread t1 = new Thread(() -> {
for (int i = 0; i < 1000; i++) {
map.put("key-" + i, i);
}
});
Thread t2 = new Thread(() -> {
for (int i = 1000; i < 2000; i++) {
map.put("key-" + i, i);
}
});
t1.start();
t2.start();
t1.join();
t2.join();
System.out.println("Size: " + map.size());
// Có thể < 2000 hoặc thậm chí infinite loop trong Java 7!
Collections.synchronized Wrappers
Cách đơn giản nhất: Wrap collections với synchronized:
List<String> list = Collections.synchronizedList(new ArrayList<>());
Map<String, Integer> map = Collections.synchronizedMap(new HashMap<>());
Set<String> set = Collections.synchronizedSet(new HashSet<>());
Ví dụ
import java.util.*;
public class SynchronizedWrapperDemo {
public static void main(String[] args) throws InterruptedException {
List<Integer> syncList = Collections.synchronizedList(new ArrayList<>());
// 10 threads cùng add
Thread[] threads = new Thread[10];
for (int i = 0; i < 10; i++) {
final int threadId = i;
threads[i] = new Thread(() -> {
for (int j = 0; j < 100; j++) {
syncList.add(threadId * 100 + j);
}
});
threads[i].start();
}
for (Thread t : threads) {
t.join();
}
System.out.println("Size: " + syncList.size()); // 1000 (đúng)
}
}
Hạn chế của Synchronized Wrappers
-
Iteration không thread-safe: Phải synchronized thủ công
synchronized (syncList) {
for (String item : syncList) {
System.out.println(item);
}
} -
Performance: Lock toàn bộ collection → chỉ 1 thread tại 1 thời điểm
-
No compound operations: put-if-absent, replace, etc. không atomic
ConcurrentHashMap
ConcurrentHashMap là HashMap thread-safe với hiệu suất cao:
Đặc điểm
- CAS + per-bin locking (Java 8+): Từ Java 8, ConcurrentHashMap sử dụng CAS (Compare-And-Swap) và synchronized per-bin (từng node đầu bucket (thùng/ngăn chứa))
- Lock-free reads: Đọc không cần lock
- Atomic operations: putIfAbsent, replace, compute, merge
- Thread-safe iteration: Không throw ConcurrentModificationException
- Null safety: Không cho phép null key hoặc null value (throws NullPointerException)
Java 7 và trước đó: Sử dụng segment-based locking (16 segments mặc định). Mỗi segment là một lock riêng, cho phép 16 threads write đồng thời.
Java 8+: Thay đổi hoàn toàn cơ chế:
- Empty bin (ngăn) → CAS insert (không cần lock)
- Occupied bin → synchronized trên head node của bin đó
- Khi bin có > 8 entries VÀ table size >= 64 → treeify (chuyển đổi thành cây): bin chuyển từ linked list sang red-black tree
Traditional HashMap (synchronized)
┌──────────────────────────────┐
│ Lock toàn bộ map │ ← Chỉ 1 thread tại 1 thời điểm
└──────────────────────────────┘
ConcurrentHashMap (Java 8+)
┌──────┬──────┬──────┬──────┐
│ Bin1 │ Bin2 │ Bin3 │ Bin4 │ ← Mỗi bin lock độc lập
└──────┴──────┴──────┴──────┘
↑ ↑ ↑ ↑
Thread1 Thread2 Thread3 Thread4 ← Nhiều threads đồng thời!
Empty bin → CAS (no lock)
Occupied bin → synchronized(head node)
Ví dụ cơ bản
import java.util.concurrent.*;
public class ConcurrentHashMapDemo {
public static void main(String[] args) throws InterruptedException {
ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
// 10 threads cùng thao tác
Thread[] threads = new Thread[10];
for (int i = 0; i < 10; i++) {
final int threadId = i;
threads[i] = new Thread(() -> {
for (int j = 0; j < 100; j++) {
String key = "key-" + (threadId * 100 + j);
map.put(key, j);
}
});
threads[i].start();
}
for (Thread t : threads) {
t.join();
}
System.out.println("Size: " + map.size()); // 1000
System.out.println("Mapping count: " + map.mappingCount()); // 1000
// Iteration thread-safe (không cần synchronized block)
map.forEach((key, value) -> {
if (value > 95) {
System.out.println(key + " = " + value);
}
});
}
}
Null Key/Value Không Được Phép
ConcurrentHashMap không cho phép null keys hoặc null values:
ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
// NullPointerException!
try {
map.put(null, 100);
} catch (NullPointerException e) {
System.out.println("Cannot put null key: " + e.getMessage());
}
// NullPointerException!
try {
map.put("key1", null);
} catch (NullPointerException e) {
System.out.println("Cannot put null value: " + e.getMessage());
}
Lý do: Tránh ambiguity (tính mơ hồ) trong môi trường concurrent:
// Với HashMap (cho phép null value):
HashMap<String, Integer> hashMap = new HashMap<>();
hashMap.put("key1", null);
Integer value = hashMap.get("key1"); // null - nhưng key tồn tại!
Integer value2 = hashMap.get("key2"); // null - key không tồn tại!
Trong single-threaded, bạn có thể dùng containsKey() để phân biệt. Nhưng trong concurrent environment:
// Thread 1
if (map.containsKey("key1")) {
// Thread 2 có thể remove("key1") ở đây!
Integer value = map.get("key1"); // null - vì key bị xóa hay value là null?
}
Không thể phân biệt được get(key) == null là do:
- Key không tồn tại
- Value thực sự là null
→ ConcurrentHashMap cấm null để tránh tình huống này.
size() vs mappingCount()
ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
// Thêm dữ liệu
for (int i = 0; i < 1000; i++) {
map.put("key-" + i, i);
}
// size() trả về int (có thể truncate nếu > Integer.MAX_VALUE)
int size = map.size();
// mappingCount() trả về long (chính xác hơn)
long count = map.mappingCount();
System.out.println("size(): " + size);
System.out.println("mappingCount(): " + count);
size(): Trả vềint→ nếu map có > 2^31 - 1 entries, giá trị sẽ bị truncatemappingCount(): Trả vềlong→ chính xác hơn cho large maps- Best practice: Dùng
mappingCount()cho production code
Bulk Operations (Java 8+)
ConcurrentHashMap cung cấp các bulk operations (forEach, search, reduce) song song:
ConcurrentHashMap<String, Integer> scores = new ConcurrentHashMap<>();
scores.put("Alice", 95);
scores.put("Bob", 87);
scores.put("Charlie", 92);
scores.put("David", 78);
// forEach - duyệt qua tất cả entries song song
scores.forEach(1, (key, value) -> {
System.out.println(key + ": " + value);
});
// search - tìm entry đầu tiên thỏa điều kiện
String topStudent = scores.search(1, (key, value) -> {
return value > 90 ? key : null;
});
System.out.println("Top student: " + topStudent);
// reduce - tính tổng tất cả scores
Integer totalScore = scores.reduce(1,
(key, value) -> value, // Transformer
(v1, v2) -> v1 + v2 // Reducer
);
System.out.println("Total score: " + totalScore);
Tham số đầu tiên (1 trong ví dụ trên) là parallelism threshold:
- Nếu map size > threshold → operation thực hiện song song (parallel)
- Nếu map size ≤ threshold → operation thực hiện tuần tự (sequential)
1= luôn song song,Long.MAX_VALUE= luôn tuần tự
Atomic Operations
1. putIfAbsent()
ConcurrentHashMap<String, Integer> cache = new ConcurrentHashMap<>();
// Thread-safe: chỉ put nếu key chưa tồn tại
Integer oldValue = cache.putIfAbsent("user-1", 100);
if (oldValue == null) {
System.out.println("Inserted");
} else {
System.out.println("Already exists: " + oldValue);
}
2. replace()
// Replace chỉ khi key tồn tại
cache.replace("user-1", 200);
// Replace chỉ khi value hiện tại match
boolean replaced = cache.replace("user-1", 100, 200);
System.out.println("Replaced: " + replaced);
3. compute()
// Update value dựa trên key và value hiện tại
cache.compute("user-1", (key, oldValue) -> {
return (oldValue == null) ? 1 : oldValue + 1;
});
// Tương tự với computeIfAbsent, computeIfPresent
cache.computeIfAbsent("user-2", key -> expensiveComputation(key));
4. merge()
// Word count example
ConcurrentHashMap<String, Integer> wordCount = new ConcurrentHashMap<>();
String[] words = {"apple", "banana", "apple", "cherry", "banana", "apple"};
for (String word : words) {
// Merge: nếu key tồn tại thì merge, không thì insert
wordCount.merge(word, 1, (oldCount, newCount) -> oldCount + newCount);
}
wordCount.forEach((word, count) -> {
System.out.println(word + ": " + count);
});
// apple: 3, banana: 2, cherry: 1
Ví dụ thực tế: URL Visit Counter
import java.util.concurrent.*;
public class URLVisitCounter {
private ConcurrentHashMap<String, Integer> visitCount = new ConcurrentHashMap<>();
public void recordVisit(String url) {
visitCount.merge(url, 1, Integer::sum);
}
public int getVisitCount(String url) {
return visitCount.getOrDefault(url, 0);
}
public void printTopVisited(int n) {
visitCount.entrySet().stream()
.sorted((e1, e2) -> e2.getValue().compareTo(e1.getValue()))
.limit(n)
.forEach(entry -> {
System.out.println(entry.getKey() + ": " + entry.getValue() + " visits");
});
}
public static void main(String[] args) throws InterruptedException {
URLVisitCounter counter = new URLVisitCounter();
// Giả lập 100 requests đồng thời
Thread[] threads = new Thread[100];
String[] urls = {"/home", "/products", "/about", "/contact", "/home"};
for (int i = 0; i < 100; i++) {
final int index = i;
threads[i] = new Thread(() -> {
String url = urls[index % urls.length];
counter.recordVisit(url);
});
threads[i].start();
}
for (Thread t : threads) {
t.join();
}
System.out.println("Top visited URLs:");
counter.printTopVisited(3);
}
}
CopyOnWriteArrayList
CopyOnWriteArrayList tạo copy mới của array mỗi khi modify:
Đặc điểm
- Read không cần lock: Cực kỳ nhanh
- Write tốn kém: Tạo copy mới → chậm, tốn memory
- Snapshot (ảnh chụp nhanh) iterator: Iterator hoạt động trên snapshot của array tại thời điểm tạo iterator
- Thread-safe iteration: Iterator không bao giờ throw ConcurrentModificationException
- Use case: Đọc nhiều, ghi ít (listeners, observers)
Cơ chế Copy-on-Write
Khi có write operation (add, set, remove), toàn bộ internal array được copy:
// Pseudo-code của add() trong CopyOnWriteArrayList
public boolean add(E e) {
synchronized (lock) {
Object[] oldArray = getArray();
int len = oldArray.length;
// Tạo array mới lớn hơn 1 element
Object[] newArray = Arrays.copyOf(oldArray, len + 1);
newArray[len] = e;
// Swap reference
setArray(newArray);
return true;
}
}
Tại sao read không cần lock?
- Readers luôn đọc từ current array reference
- Writers tạo copy mới và swap reference
- Array reference assignment là atomic operation
- Readers không bao giờ thấy incomplete state
import java.util.concurrent.*;
import java.util.List;
public class CopyOnWriteArrayListDemo {
public static void main(String[] args) throws InterruptedException {
List<String> list = new CopyOnWriteArrayList<>();
// Thread 1: Add items
Thread writer = new Thread(() -> {
for (int i = 0; i < 5; i++) {
list.add("Item-" + i);
System.out.println("Added: Item-" + i);
try { Thread.sleep(100); } catch (InterruptedException e) {}
}
});
// Thread 2: Iterate (không cần synchronized)
Thread reader = new Thread(() -> {
for (int i = 0; i < 3; i++) {
System.out.print("Reading: ");
for (String item : list) {
System.out.print(item + " ");
}
System.out.println();
try { Thread.sleep(150); } catch (InterruptedException e) {}
}
});
writer.start();
Thread.sleep(50);
reader.start();
writer.join();
reader.join();
}
}
Output:
Added: Item-0
Reading: Item-0
Added: Item-1
Reading: Item-0 Item-1
Added: Item-2
Added: Item-3
Reading: Item-0 Item-1 Item-2 Item-3
Added: Item-4
Snapshot Iterator
Iterator của CopyOnWriteArrayList hoạt động trên snapshot:
import java.util.concurrent.*;
import java.util.*;
public class SnapshotIteratorDemo {
public static void main(String[] args) {
CopyOnWriteArrayList<String> list = new CopyOnWriteArrayList<>();
list.add("A");
list.add("B");
list.add("C");
// Tạo iterator → snapshot tại thời điểm này
Iterator<String> iterator = list.iterator();
// Modify list AFTER tạo iterator
list.add("D");
list.add("E");
list.remove("A");
System.out.println("Iterator sees (snapshot):");
while (iterator.hasNext()) {
System.out.println(iterator.next());
}
// Output: A, B, C (snapshot tại thời điểm tạo iterator)
System.out.println("\nActual list:");
list.forEach(System.out::println);
// Output: B, C, D, E (current state)
// Iterator KHÔNG BAO GIỜ throw ConcurrentModificationException
}
}
- Iterator thấy snapshot của list tại thời điểm tạo iterator
- Modifications sau đó KHÔNG phản ánh trong iterator
- Điều này khác với ArrayList iterator (throw ConcurrentModificationException)
Performance Characteristics
import java.util.*;
import java.util.concurrent.*;
public class CopyOnWritePerformanceDemo {
public static void main(String[] args) {
CopyOnWriteArrayList<Integer> cowList = new CopyOnWriteArrayList<>();
// Write performance: O(n) - phải copy toàn bộ array
long start = System.nanoTime();
for (int i = 0; i < 10000; i++) {
cowList.add(i); // Mỗi add() copy toàn bộ array!
}
long writeTime = System.nanoTime() - start;
System.out.println("Write time: " + writeTime / 1_000_000 + " ms");
// Read performance: O(1) - không lock
start = System.nanoTime();
for (int i = 0; i < 10000; i++) {
int value = cowList.get(i);
}
long readTime = System.nanoTime() - start;
System.out.println("Read time: " + readTime / 1_000_000 + " ms");
// Write time >> Read time
}
}
Phù hợp:
- Event listeners/Observers: Danh sách listeners ít thay đổi, fire events nhiều
- Configuration data: Config ít update, đọc thường xuyên
- Small collections với read >> write (ratio 100:1 hoặc cao hơn)
KHÔNG phù hợp:
- Large lists: Copy 10,000 elements mỗi lần add → rất tốn kém
- Write-heavy workloads: Mỗi modification copy toàn bộ → memory và CPU cao
- Memory-constrained environments: Tạm thời có 2 copies của array
Rule of thumb: Chỉ dùng khi read:write ratio >= 10:1 VÀ list size < 1000.
BlockingQueue Interface
BlockingQueue là queue hỗ trợ blocking operations:
put(): Chờ nếu queue đầytake(): Chờ nếu queue rỗng
Các implementations
| Implementation | Đặc điểm |
|---|---|
| ArrayBlockingQueue | Bounded queue, array-based |
| LinkedBlockingQueue | Optionally bounded, linked nodes |
| PriorityBlockingQueue | Unbounded, sorted by priority |
| SynchronousQueue | Không có capacity, hand-off trực tiếp |
| DelayQueue | Elements available after delay |
Methods: Sự khác biệt giữa add/offer/put
BlockingQueue có 3 nhóm methods cho insertion và removal:
| Method | Blocking | Timeout | Full/Empty Behavior |
|---|---|---|---|
| Insertion | |||
add(e) | Không | Không | Throws IllegalStateException nếu full |
offer(e) | Không | Không | Returns false nếu full |
put(e) | Có | Không | Blocks cho đến khi có space |
offer(e, timeout, unit) | Có | Có | Blocks tối đa timeout, returns false nếu hết thời gian |
| Removal | |||
remove() | Không | Không | Throws NoSuchElementException nếu empty |
poll() | Không | Không | Returns null nếu empty |
take() | Có | Không | Blocks cho đến khi có element |
poll(timeout, unit) | Có | Có | Blocks tối đa timeout, returns null nếu hết thời gian |
| Examination | |||
element() | Không | Không | Throws exception nếu empty |
peek() | Không | Không | Returns null nếu empty |
So sánh add() vs offer() vs put()
import java.util.concurrent.*;
public class BlockingQueueMethodsDemo {
public static void main(String[] args) {
BlockingQueue<String> queue = new ArrayBlockingQueue<>(2);
// 1. add() - throws exception nếu full
queue.add("A");
queue.add("B");
try {
queue.add("C"); // IllegalStateException: Queue full
} catch (IllegalStateException e) {
System.out.println("add() failed: " + e.getMessage());
}
// 2. offer() - returns false nếu full
queue.clear();
queue.offer("A");
queue.offer("B");
boolean result = queue.offer("C"); // Returns false
System.out.println("offer() result: " + result); // false
// 3. put() - blocks cho đến khi có space
queue.clear();
Thread producer = new Thread(() -> {
try {
queue.put("A");
queue.put("B");
System.out.println("Before put C - queue is full");
queue.put("C"); // BLOCKS ở đây!
System.out.println("After put C - space available");
} catch (InterruptedException e) {
e.printStackTrace();
}
});
Thread consumer = new Thread(() -> {
try {
Thread.sleep(2000); // Đợi 2 giây
String item = queue.take(); // Lấy 1 item → tạo space
System.out.println("Consumed: " + item);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
producer.start();
consumer.start();
// 4. offer() với timeout
queue.clear();
try {
queue.offer("A");
queue.offer("B");
boolean success = queue.offer("C", 1, TimeUnit.SECONDS);
System.out.println("offer() with timeout: " + success); // false sau 1 giây
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
add():
- Dùng khi queue không bao giờ full (hoặc full là error)
- Bounded queue với guaranteed space
- Muốn exception nếu fail
offer():
- Dùng khi không muốn block và có thể handle failure
- Non-blocking producer
- Có fallback strategy khi queue full
put():
- Dùng khi producer phải chờ nếu queue full
- Producer-consumer pattern chuẩn
- Backpressure mechanism
offer() with timeout:
- Dùng khi chờ có giới hạn
- Tránh infinite blocking
- Producer có deadline
ArrayBlockingQueue
Fixed-size queue:
import java.util.concurrent.*;
public class ArrayBlockingQueueDemo {
public static void main(String[] args) {
BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(5);
// Producer
Thread producer = new Thread(() -> {
try {
for (int i = 1; i <= 10; i++) {
queue.put(i); // Chờ nếu queue đầy
System.out.println("Produced: " + i + " | Queue size: " + queue.size());
Thread.sleep(500);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
});
// Consumer
Thread consumer = new Thread(() -> {
try {
while (true) {
Integer item = queue.take(); // Chờ nếu queue rỗng
System.out.println(" Consumed: " + item + " | Queue size: " + queue.size());
Thread.sleep(1000); // Consumer chậm hơn producer
}
} catch (InterruptedException e) {
e.printStackTrace();
}
});
producer.start();
consumer.start();
}
}
Output:
Produced: 1 | Queue size: 1
Consumed: 1 | Queue size: 0
Produced: 2 | Queue size: 1
Produced: 3 | Queue size: 2
Consumed: 2 | Queue size: 1
Produced: 4 | Queue size: 2
Produced: 5 | Queue size: 3
Produced: 6 | Queue size: 4
Consumed: 3 | Queue size: 3
...
LinkedBlockingQueue
Optionally bounded (mặc định unbounded):
// Unbounded
BlockingQueue<String> queue1 = new LinkedBlockingQueue<>();
// Bounded
BlockingQueue<String> queue2 = new LinkedBlockingQueue<>(100);
Producer-Consumer với BlockingQueue
import java.util.concurrent.*;
class Task {
private int id;
public Task(int id) {
this.id = id;
}
public void process() {
System.out.println("Processing task " + id + " in " +
Thread.currentThread().getName());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@Override
public String toString() {
return "Task-" + id;
}
}
public class ProducerConsumerBlockingQueue {
public static void main(String[] args) throws InterruptedException {
BlockingQueue<Task> queue = new ArrayBlockingQueue<>(10);
// Producer
Thread producer = new Thread(() -> {
for (int i = 1; i <= 20; i++) {
try {
Task task = new Task(i);
queue.put(task);
System.out.println("Produced: " + task);
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "Producer");
// 3 Consumers
for (int i = 1; i <= 3; i++) {
Thread consumer = new Thread(() -> {
while (true) {
try {
Task task = queue.take();
task.process();
} catch (InterruptedException e) {
break;
}
}
}, "Consumer-" + i);
consumer.start();
}
producer.start();
producer.join();
Thread.sleep(5000); // Chờ consumers xử lý xong
}
}
ConcurrentLinkedQueue
Non-blocking queue sử dụng CAS (Compare-And-Swap):
import java.util.concurrent.*;
public class ConcurrentLinkedQueueDemo {
public static void main(String[] args) throws InterruptedException {
ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>();
// 5 threads cùng add
Thread[] threads = new Thread[5];
for (int i = 0; i < 5; i++) {
final int threadId = i;
threads[i] = new Thread(() -> {
for (int j = 0; j < 100; j++) {
queue.offer("Thread-" + threadId + "-Item-" + j);
}
});
threads[i].start();
}
for (Thread t : threads) {
t.join();
}
System.out.println("Queue size: " + queue.size()); // 500
// Poll items
int count = 0;
while (!queue.isEmpty() && count < 10) {
System.out.println(queue.poll());
count++;
}
}
}
So sánh Concurrent Collections
| Collection | Thread-Safety Mechanism | Performance | Use Case |
|---|---|---|---|
| ConcurrentHashMap | CAS + per-bin locking (Java 8+) | High (reads lock-free) | General purpose map, high concurrency |
| CopyOnWriteArrayList | Copy-on-write | Reads: High, Writes: Low | Read-heavy, few writes (listeners) |
| ConcurrentLinkedQueue | Lock-free (CAS) | High | Non-blocking queue |
| ArrayBlockingQueue | Lock-based (ReentrantLock) | Medium | Bounded producer-consumer |
| LinkedBlockingQueue | Two-lock (put/take separate) | Medium | Unbounded/bounded producer-consumer |
| synchronized Map | Full lock on entire map | Low | Legacy code, low concurrency |
Weakly Consistent Iterators
Tất cả concurrent collections có weakly consistent (nhất quán yếu) iterators:
ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
map.put("A", 1);
map.put("B", 2);
map.put("C", 3);
// Iterator được tạo
Iterator<String> iterator = map.keySet().iterator();
// Modifications SAU khi tạo iterator
map.put("D", 4);
map.remove("A");
// Iterator CÓ THỂ hoặc KHÔNG phản ánh modifications
// Nó KHÔNG throw ConcurrentModificationException
while (iterator.hasNext()) {
System.out.println(iterator.next());
}
Fail-Fast (ArrayList, HashMap):
- Iterator check
modCountmỗi lần operation - Throw
ConcurrentModificationExceptionnếu collection bị modify - Không thread-safe
Weakly Consistent (Concurrent Collections):
- Iterator KHÔNG throw exception
- CÓ THỂ phản ánh concurrent modifications (không guarantee)
- Thread-safe
- Trade-off: consistency yếu hơn để đổi lấy performance và thread-safety
Ví dụ thực tế: Task Queue System
import java.util.concurrent.*;
import java.util.*;
class WorkerTask {
private String taskName;
private int priority;
public WorkerTask(String taskName, int priority) {
this.taskName = taskName;
this.priority = priority;
}
public void execute() {
System.out.println("[" + Thread.currentThread().getName() +
"] Executing: " + taskName);
try {
Thread.sleep(1000 + new Random().nextInt(2000));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@Override
public String toString() {
return taskName + " (priority: " + priority + ")";
}
}
public class TaskQueueSystem {
private BlockingQueue<WorkerTask> taskQueue;
private ConcurrentHashMap<String, Integer> taskStatus;
private ExecutorService executor;
public TaskQueueSystem(int queueSize, int workers) {
this.taskQueue = new ArrayBlockingQueue<>(queueSize);
this.taskStatus = new ConcurrentHashMap<>();
this.executor = Executors.newFixedThreadPool(workers);
// Start worker threads
for (int i = 0; i < workers; i++) {
executor.submit(() -> {
while (!Thread.interrupted()) {
try {
WorkerTask task = taskQueue.take();
taskStatus.put(task.toString(), 1); // Processing
task.execute();
taskStatus.put(task.toString(), 2); // Completed
} catch (InterruptedException e) {
break;
}
}
});
}
}
public void submitTask(WorkerTask task) throws InterruptedException {
taskStatus.put(task.toString(), 0); // Queued
taskQueue.put(task);
System.out.println("Submitted: " + task);
}
public void printStatus() {
System.out.println("\n=== Task Status ===");
taskStatus.forEach((task, status) -> {
String statusStr = status == 0 ? "Queued" :
status == 1 ? "Processing" : "Completed";
System.out.println(task + " -> " + statusStr);
});
System.out.println("Queue size: " + taskQueue.size());
}
public void shutdown() {
executor.shutdown();
}
public static void main(String[] args) throws InterruptedException {
TaskQueueSystem system = new TaskQueueSystem(10, 3);
// Submit 10 tasks
for (int i = 1; i <= 10; i++) {
system.submitTask(new WorkerTask("Task-" + i, i));
Thread.sleep(200);
}
Thread.sleep(2000);
system.printStatus();
Thread.sleep(5000);
system.printStatus();
system.shutdown();
}
}
ConcurrentHashMap:
- Null key/value KHÔNG được phép → throws
NullPointerException size()trả vềint,mappingCount()trả vềlong(dùng cho large maps)size()có thể không chính xác trong concurrent environment (weakly consistent)- Từ Java 8+: CAS + synchronized per-bin, KHÔNG phải segment locking
- Bulk operations:
forEach(),search(),reduce()với parallelism threshold
CopyOnWriteArrayList:
- Iterator là snapshot → KHÔNG BAO GIỜ throw
ConcurrentModificationException - Write operations (add, set, remove) copy TOÀN BỘ internal array
- Phù hợp: read >> write (ratio >= 10:1), list size nhỏ (< 1000)
- Không phù hợp: large lists, write-heavy workloads
BlockingQueue:
add()→ throwsIllegalStateExceptionnếu fulloffer()→ returnsfalsenếu full (non-blocking)put()→ BLOCKS cho đến khi có spacetake()→ BLOCKS cho đến khi có elementoffer(e, timeout, unit)vàpoll(timeout, unit)→ block với timeout
Collections.synchronizedMap() vs ConcurrentHashMap:
synchronizedMap(): Lock toàn bộ map → chỉ 1 thread tại 1 thời điểmConcurrentHashMap: Fine-grained locking (per-bin) → nhiều threads đồng thờisynchronizedMap()iteration phải synchronized thủ côngConcurrentHashMapiteration thread-safe
Weakly Consistent Iterators:
- Concurrent collections có weakly consistent iterators
- KHÔNG throw
ConcurrentModificationException - CÓ THỂ hoặc KHÔNG phản ánh concurrent modifications (không guarantee)
Best Practices
-
Chọn đúng collection:
- Map: ConcurrentHashMap (hầu hết trường hợp)
- List (read-heavy): CopyOnWriteArrayList
- Queue: BlockingQueue cho producer-consumer
-
Tránh synchronized wrappers: Dùng concurrent collections thực sự
-
Dùng atomic operations: putIfAbsent, compute, merge thay vì check-then-act
-
Bounded queues: Tránh OutOfMemoryError với unbounded queues
-
Graceful shutdown: Luôn shutdown ExecutorService
-
Monitor queue size: Detect bottlenecks sớm
-
Null safety: ConcurrentHashMap không cho phép null → validate input
-
Use mappingCount(): Thay vì
size()cho large maps
Bài tập
Bài 1: Thread-Safe Cache
Implement cache với ConcurrentHashMap:
put(key, value, ttl): Cache với time-to-liveget(key): Return null nếu expired- Background thread dọn dẹp expired entries
Bài 2: Multi-Producer Multi-Consumer
Tạo hệ thống:
- 5 producers tạo tasks
- 10 consumers xử lý tasks
- Dùng BlockingQueue
- Track số tasks processed bởi mỗi consumer
Bài 3: Event Listeners
Implement Observer pattern với CopyOnWriteArrayList:
addListener(listener)removeListener(listener)notifyListeners(event)- Test với 100 threads đồng thời
Tóm tắt
- Concurrent Collections: Thread-safe, hiệu suất cao
- ConcurrentHashMap: CAS + per-bin locking (Java 8+), không cho phép null,
mappingCount()>size() - CopyOnWriteArrayList: Snapshot iterator, read-heavy use cases (ratio >= 10:1)
- BlockingQueue: Producer-consumer pattern,
put()vsoffer()vsadd() - Weakly consistent iterators: Không throw ConcurrentModificationException
- Tránh Collections.synchronized: Dùng concurrent collections thực sự
Bài tiếp theo: Locks và Conditions - Kiểm soát synchronization linh hoạt hơn!
Đọc thêm
- Oracle/Java API: java.util.concurrent
- Java Concurrency in Practice - Chapter 5: Building Blocks
- Synchronization - Đồng bộ hóa threads với synchronized và locks
- Best Practices - Các best practices cho multithreading