Chuyển tới nội dung chính

Concurrent Collections

Mục tiêu bài học

Sau bài này, bạn sẽ:

  • Hiểu vấn đề ConcurrentModificationException và cách giải quyết
  • Sử dụng được ConcurrentHashMap cho thread-safe map operations
  • Biết cách dùng CopyOnWriteArrayList cho read-heavy scenarios
  • Nắm được các BlockingQueue implementations và use cases
  • So sánh được các concurrent collections và chọn đúng loại cho từng tình huống

Bài trước: Executor Framework — Đã học cách quản lý thread pools với ExecutorService. Bài này sẽ giới thiệu các thread-safe data structures trong java.util.concurrent.

Vấn đề với Collections trong Multithreading

ConcurrentModificationException

import java.util.*;

public class CollectionProblem {
public static void main(String[] args) {
List<Integer> list = new ArrayList<>(Arrays.asList(1, 2, 3, 4, 5));

// Thread 1: Iterate
Thread t1 = new Thread(() -> {
try {
for (Integer num : list) {
System.out.println(num);
Thread.sleep(100);
}
} catch (Exception e) {
e.printStackTrace();
}
});

// Thread 2: Modify
Thread t2 = new Thread(() -> {
try {
Thread.sleep(50);
list.add(6); // ConcurrentModificationException!
} catch (Exception e) {
e.printStackTrace();
}
});

t1.start();
t2.start();
}
}

Exception:

java.util.ConcurrentModificationException
cảnh báo

Các Collections thông thường (ArrayList, HashMap, HashSet) không thread-safe!

Race Condition với HashMap

Map<String, Integer> map = new HashMap<>();

// 2 threads cùng put
Thread t1 = new Thread(() -> {
for (int i = 0; i < 1000; i++) {
map.put("key-" + i, i);
}
});

Thread t2 = new Thread(() -> {
for (int i = 1000; i < 2000; i++) {
map.put("key-" + i, i);
}
});

t1.start();
t2.start();
t1.join();
t2.join();

System.out.println("Size: " + map.size());
// Có thể < 2000 hoặc thậm chí infinite loop trong Java 7!

Collections.synchronized Wrappers

Cách đơn giản nhất: Wrap collections với synchronized:

List<String> list = Collections.synchronizedList(new ArrayList<>());
Map<String, Integer> map = Collections.synchronizedMap(new HashMap<>());
Set<String> set = Collections.synchronizedSet(new HashSet<>());

Ví dụ

import java.util.*;

public class SynchronizedWrapperDemo {
public static void main(String[] args) throws InterruptedException {
List<Integer> syncList = Collections.synchronizedList(new ArrayList<>());

// 10 threads cùng add
Thread[] threads = new Thread[10];
for (int i = 0; i < 10; i++) {
final int threadId = i;
threads[i] = new Thread(() -> {
for (int j = 0; j < 100; j++) {
syncList.add(threadId * 100 + j);
}
});
threads[i].start();
}

for (Thread t : threads) {
t.join();
}

System.out.println("Size: " + syncList.size()); // 1000 (đúng)
}
}

Hạn chế của Synchronized Wrappers

cảnh báo
  1. Iteration không thread-safe: Phải synchronized thủ công

    synchronized (syncList) {
    for (String item : syncList) {
    System.out.println(item);
    }
    }
  2. Performance: Lock toàn bộ collection → chỉ 1 thread tại 1 thời điểm

  3. No compound operations: put-if-absent, replace, etc. không atomic

ConcurrentHashMap

ConcurrentHashMap là HashMap thread-safe với hiệu suất cao:

Đặc điểm

  • CAS + per-bin locking (Java 8+): Từ Java 8, ConcurrentHashMap sử dụng CAS (Compare-And-Swap) và synchronized per-bin (từng node đầu bucket (thùng/ngăn chứa))
  • Lock-free reads: Đọc không cần lock
  • Atomic operations: putIfAbsent, replace, compute, merge
  • Thread-safe iteration: Không throw ConcurrentModificationException
  • Null safety: Không cho phép null key hoặc null value (throws NullPointerException)
Lịch sử cơ chế locking

Java 7 và trước đó: Sử dụng segment-based locking (16 segments mặc định). Mỗi segment là một lock riêng, cho phép 16 threads write đồng thời.

Java 8+: Thay đổi hoàn toàn cơ chế:

  • Empty bin (ngăn) → CAS insert (không cần lock)
  • Occupied bin → synchronized trên head node của bin đó
  • Khi bin có > 8 entries VÀ table size >= 64 → treeify (chuyển đổi thành cây): bin chuyển từ linked list sang red-black tree
Traditional HashMap (synchronized)
┌──────────────────────────────┐
│ Lock toàn bộ map │ ← Chỉ 1 thread tại 1 thời điểm
└──────────────────────────────┘

ConcurrentHashMap (Java 8+)
┌──────┬──────┬──────┬──────┐
│ Bin1 │ Bin2 │ Bin3 │ Bin4 │ ← Mỗi bin lock độc lập
└──────┴──────┴──────┴──────┘
↑ ↑ ↑ ↑
Thread1 Thread2 Thread3 Thread4 ← Nhiều threads đồng thời!
Empty bin → CAS (no lock)
Occupied bin → synchronized(head node)

Ví dụ cơ bản

import java.util.concurrent.*;

public class ConcurrentHashMapDemo {
public static void main(String[] args) throws InterruptedException {
ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();

// 10 threads cùng thao tác
Thread[] threads = new Thread[10];
for (int i = 0; i < 10; i++) {
final int threadId = i;
threads[i] = new Thread(() -> {
for (int j = 0; j < 100; j++) {
String key = "key-" + (threadId * 100 + j);
map.put(key, j);
}
});
threads[i].start();
}

for (Thread t : threads) {
t.join();
}

System.out.println("Size: " + map.size()); // 1000
System.out.println("Mapping count: " + map.mappingCount()); // 1000

// Iteration thread-safe (không cần synchronized block)
map.forEach((key, value) -> {
if (value > 95) {
System.out.println(key + " = " + value);
}
});
}
}

Null Key/Value Không Được Phép

ConcurrentHashMap không cho phép null keys hoặc null values:

ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();

// NullPointerException!
try {
map.put(null, 100);
} catch (NullPointerException e) {
System.out.println("Cannot put null key: " + e.getMessage());
}

// NullPointerException!
try {
map.put("key1", null);
} catch (NullPointerException e) {
System.out.println("Cannot put null value: " + e.getMessage());
}
Tại sao không cho phép null?

Lý do: Tránh ambiguity (tính mơ hồ) trong môi trường concurrent:

// Với HashMap (cho phép null value):
HashMap<String, Integer> hashMap = new HashMap<>();
hashMap.put("key1", null);

Integer value = hashMap.get("key1"); // null - nhưng key tồn tại!
Integer value2 = hashMap.get("key2"); // null - key không tồn tại!

Trong single-threaded, bạn có thể dùng containsKey() để phân biệt. Nhưng trong concurrent environment:

// Thread 1
if (map.containsKey("key1")) {
// Thread 2 có thể remove("key1") ở đây!
Integer value = map.get("key1"); // null - vì key bị xóa hay value là null?
}

Không thể phân biệt được get(key) == null là do:

  1. Key không tồn tại
  2. Value thực sự là null

ConcurrentHashMap cấm null để tránh tình huống này.

size() vs mappingCount()

ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();

// Thêm dữ liệu
for (int i = 0; i < 1000; i++) {
map.put("key-" + i, i);
}

// size() trả về int (có thể truncate nếu > Integer.MAX_VALUE)
int size = map.size();

// mappingCount() trả về long (chính xác hơn)
long count = map.mappingCount();

System.out.println("size(): " + size);
System.out.println("mappingCount(): " + count);
size() vs mappingCount()
  • size(): Trả về int → nếu map có > 2^31 - 1 entries, giá trị sẽ bị truncate
  • mappingCount(): Trả về long → chính xác hơn cho large maps
  • Best practice: Dùng mappingCount() cho production code

Bulk Operations (Java 8+)

ConcurrentHashMap cung cấp các bulk operations (forEach, search, reduce) song song:

ConcurrentHashMap<String, Integer> scores = new ConcurrentHashMap<>();
scores.put("Alice", 95);
scores.put("Bob", 87);
scores.put("Charlie", 92);
scores.put("David", 78);

// forEach - duyệt qua tất cả entries song song
scores.forEach(1, (key, value) -> {
System.out.println(key + ": " + value);
});

// search - tìm entry đầu tiên thỏa điều kiện
String topStudent = scores.search(1, (key, value) -> {
return value > 90 ? key : null;
});
System.out.println("Top student: " + topStudent);

// reduce - tính tổng tất cả scores
Integer totalScore = scores.reduce(1,
(key, value) -> value, // Transformer
(v1, v2) -> v1 + v2 // Reducer
);
System.out.println("Total score: " + totalScore);
Parallelism Threshold

Tham số đầu tiên (1 trong ví dụ trên) là parallelism threshold:

  • Nếu map size > threshold → operation thực hiện song song (parallel)
  • Nếu map size ≤ threshold → operation thực hiện tuần tự (sequential)
  • 1 = luôn song song, Long.MAX_VALUE = luôn tuần tự

Atomic Operations

1. putIfAbsent()

ConcurrentHashMap<String, Integer> cache = new ConcurrentHashMap<>();

// Thread-safe: chỉ put nếu key chưa tồn tại
Integer oldValue = cache.putIfAbsent("user-1", 100);

if (oldValue == null) {
System.out.println("Inserted");
} else {
System.out.println("Already exists: " + oldValue);
}

2. replace()

// Replace chỉ khi key tồn tại
cache.replace("user-1", 200);

// Replace chỉ khi value hiện tại match
boolean replaced = cache.replace("user-1", 100, 200);
System.out.println("Replaced: " + replaced);

3. compute()

// Update value dựa trên key và value hiện tại
cache.compute("user-1", (key, oldValue) -> {
return (oldValue == null) ? 1 : oldValue + 1;
});

// Tương tự với computeIfAbsent, computeIfPresent
cache.computeIfAbsent("user-2", key -> expensiveComputation(key));

4. merge()

// Word count example
ConcurrentHashMap<String, Integer> wordCount = new ConcurrentHashMap<>();

String[] words = {"apple", "banana", "apple", "cherry", "banana", "apple"};

for (String word : words) {
// Merge: nếu key tồn tại thì merge, không thì insert
wordCount.merge(word, 1, (oldCount, newCount) -> oldCount + newCount);
}

wordCount.forEach((word, count) -> {
System.out.println(word + ": " + count);
});
// apple: 3, banana: 2, cherry: 1

Ví dụ thực tế: URL Visit Counter

import java.util.concurrent.*;

public class URLVisitCounter {
private ConcurrentHashMap<String, Integer> visitCount = new ConcurrentHashMap<>();

public void recordVisit(String url) {
visitCount.merge(url, 1, Integer::sum);
}

public int getVisitCount(String url) {
return visitCount.getOrDefault(url, 0);
}

public void printTopVisited(int n) {
visitCount.entrySet().stream()
.sorted((e1, e2) -> e2.getValue().compareTo(e1.getValue()))
.limit(n)
.forEach(entry -> {
System.out.println(entry.getKey() + ": " + entry.getValue() + " visits");
});
}

public static void main(String[] args) throws InterruptedException {
URLVisitCounter counter = new URLVisitCounter();

// Giả lập 100 requests đồng thời
Thread[] threads = new Thread[100];
String[] urls = {"/home", "/products", "/about", "/contact", "/home"};

for (int i = 0; i < 100; i++) {
final int index = i;
threads[i] = new Thread(() -> {
String url = urls[index % urls.length];
counter.recordVisit(url);
});
threads[i].start();
}

for (Thread t : threads) {
t.join();
}

System.out.println("Top visited URLs:");
counter.printTopVisited(3);
}
}

CopyOnWriteArrayList

CopyOnWriteArrayList tạo copy mới của array mỗi khi modify:

Đặc điểm

  • Read không cần lock: Cực kỳ nhanh
  • Write tốn kém: Tạo copy mới → chậm, tốn memory
  • Snapshot (ảnh chụp nhanh) iterator: Iterator hoạt động trên snapshot của array tại thời điểm tạo iterator
  • Thread-safe iteration: Iterator không bao giờ throw ConcurrentModificationException
  • Use case: Đọc nhiều, ghi ít (listeners, observers)

Cơ chế Copy-on-Write

Khi có write operation (add, set, remove), toàn bộ internal array được copy:

// Pseudo-code của add() trong CopyOnWriteArrayList
public boolean add(E e) {
synchronized (lock) {
Object[] oldArray = getArray();
int len = oldArray.length;

// Tạo array mới lớn hơn 1 element
Object[] newArray = Arrays.copyOf(oldArray, len + 1);
newArray[len] = e;

// Swap reference
setArray(newArray);
return true;
}
}

Tại sao read không cần lock?

  • Readers luôn đọc từ current array reference
  • Writers tạo copy mới và swap reference
  • Array reference assignment là atomic operation
  • Readers không bao giờ thấy incomplete state
import java.util.concurrent.*;
import java.util.List;

public class CopyOnWriteArrayListDemo {
public static void main(String[] args) throws InterruptedException {
List<String> list = new CopyOnWriteArrayList<>();

// Thread 1: Add items
Thread writer = new Thread(() -> {
for (int i = 0; i < 5; i++) {
list.add("Item-" + i);
System.out.println("Added: Item-" + i);
try { Thread.sleep(100); } catch (InterruptedException e) {}
}
});

// Thread 2: Iterate (không cần synchronized)
Thread reader = new Thread(() -> {
for (int i = 0; i < 3; i++) {
System.out.print("Reading: ");
for (String item : list) {
System.out.print(item + " ");
}
System.out.println();
try { Thread.sleep(150); } catch (InterruptedException e) {}
}
});

writer.start();
Thread.sleep(50);
reader.start();

writer.join();
reader.join();
}
}

Output:

Added: Item-0
Reading: Item-0
Added: Item-1
Reading: Item-0 Item-1
Added: Item-2
Added: Item-3
Reading: Item-0 Item-1 Item-2 Item-3
Added: Item-4

Snapshot Iterator

Iterator của CopyOnWriteArrayList hoạt động trên snapshot:

import java.util.concurrent.*;
import java.util.*;

public class SnapshotIteratorDemo {
public static void main(String[] args) {
CopyOnWriteArrayList<String> list = new CopyOnWriteArrayList<>();
list.add("A");
list.add("B");
list.add("C");

// Tạo iterator → snapshot tại thời điểm này
Iterator<String> iterator = list.iterator();

// Modify list AFTER tạo iterator
list.add("D");
list.add("E");
list.remove("A");

System.out.println("Iterator sees (snapshot):");
while (iterator.hasNext()) {
System.out.println(iterator.next());
}
// Output: A, B, C (snapshot tại thời điểm tạo iterator)

System.out.println("\nActual list:");
list.forEach(System.out::println);
// Output: B, C, D, E (current state)

// Iterator KHÔNG BAO GIỜ throw ConcurrentModificationException
}
}
Iterator là Snapshot
  • Iterator thấy snapshot của list tại thời điểm tạo iterator
  • Modifications sau đó KHÔNG phản ánh trong iterator
  • Điều này khác với ArrayList iterator (throw ConcurrentModificationException)

Performance Characteristics

import java.util.*;
import java.util.concurrent.*;

public class CopyOnWritePerformanceDemo {
public static void main(String[] args) {
CopyOnWriteArrayList<Integer> cowList = new CopyOnWriteArrayList<>();

// Write performance: O(n) - phải copy toàn bộ array
long start = System.nanoTime();
for (int i = 0; i < 10000; i++) {
cowList.add(i); // Mỗi add() copy toàn bộ array!
}
long writeTime = System.nanoTime() - start;
System.out.println("Write time: " + writeTime / 1_000_000 + " ms");

// Read performance: O(1) - không lock
start = System.nanoTime();
for (int i = 0; i < 10000; i++) {
int value = cowList.get(i);
}
long readTime = System.nanoTime() - start;
System.out.println("Read time: " + readTime / 1_000_000 + " ms");

// Write time >> Read time
}
}
Khi nào dùng CopyOnWriteArrayList?

Phù hợp:

  • Event listeners/Observers: Danh sách listeners ít thay đổi, fire events nhiều
  • Configuration data: Config ít update, đọc thường xuyên
  • Small collections với read >> write (ratio 100:1 hoặc cao hơn)

KHÔNG phù hợp:

  • Large lists: Copy 10,000 elements mỗi lần add → rất tốn kém
  • Write-heavy workloads: Mỗi modification copy toàn bộ → memory và CPU cao
  • Memory-constrained environments: Tạm thời có 2 copies của array

Rule of thumb: Chỉ dùng khi read:write ratio >= 10:1 VÀ list size < 1000.

BlockingQueue Interface

BlockingQueue là queue hỗ trợ blocking operations:

  • put(): Chờ nếu queue đầy
  • take(): Chờ nếu queue rỗng

Các implementations

ImplementationĐặc điểm
ArrayBlockingQueueBounded queue, array-based
LinkedBlockingQueueOptionally bounded, linked nodes
PriorityBlockingQueueUnbounded, sorted by priority
SynchronousQueueKhông có capacity, hand-off trực tiếp
DelayQueueElements available after delay

Methods: Sự khác biệt giữa add/offer/put

BlockingQueue có 3 nhóm methods cho insertion và removal:

MethodBlockingTimeoutFull/Empty Behavior
Insertion
add(e)KhôngKhôngThrows IllegalStateException nếu full
offer(e)KhôngKhôngReturns false nếu full
put(e)KhôngBlocks cho đến khi có space
offer(e, timeout, unit)Blocks tối đa timeout, returns false nếu hết thời gian
Removal
remove()KhôngKhôngThrows NoSuchElementException nếu empty
poll()KhôngKhôngReturns null nếu empty
take()KhôngBlocks cho đến khi có element
poll(timeout, unit)Blocks tối đa timeout, returns null nếu hết thời gian
Examination
element()KhôngKhôngThrows exception nếu empty
peek()KhôngKhôngReturns null nếu empty

So sánh add() vs offer() vs put()

import java.util.concurrent.*;

public class BlockingQueueMethodsDemo {
public static void main(String[] args) {
BlockingQueue<String> queue = new ArrayBlockingQueue<>(2);

// 1. add() - throws exception nếu full
queue.add("A");
queue.add("B");
try {
queue.add("C"); // IllegalStateException: Queue full
} catch (IllegalStateException e) {
System.out.println("add() failed: " + e.getMessage());
}

// 2. offer() - returns false nếu full
queue.clear();
queue.offer("A");
queue.offer("B");
boolean result = queue.offer("C"); // Returns false
System.out.println("offer() result: " + result); // false

// 3. put() - blocks cho đến khi có space
queue.clear();
Thread producer = new Thread(() -> {
try {
queue.put("A");
queue.put("B");
System.out.println("Before put C - queue is full");
queue.put("C"); // BLOCKS ở đây!
System.out.println("After put C - space available");
} catch (InterruptedException e) {
e.printStackTrace();
}
});

Thread consumer = new Thread(() -> {
try {
Thread.sleep(2000); // Đợi 2 giây
String item = queue.take(); // Lấy 1 item → tạo space
System.out.println("Consumed: " + item);
} catch (InterruptedException e) {
e.printStackTrace();
}
});

producer.start();
consumer.start();

// 4. offer() với timeout
queue.clear();
try {
queue.offer("A");
queue.offer("B");
boolean success = queue.offer("C", 1, TimeUnit.SECONDS);
System.out.println("offer() with timeout: " + success); // false sau 1 giây
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
Khi nào dùng method nào?

add():

  • Dùng khi queue không bao giờ full (hoặc full là error)
  • Bounded queue với guaranteed space
  • Muốn exception nếu fail

offer():

  • Dùng khi không muốn block và có thể handle failure
  • Non-blocking producer
  • Có fallback strategy khi queue full

put():

  • Dùng khi producer phải chờ nếu queue full
  • Producer-consumer pattern chuẩn
  • Backpressure mechanism

offer() with timeout:

  • Dùng khi chờ có giới hạn
  • Tránh infinite blocking
  • Producer có deadline

ArrayBlockingQueue

Fixed-size queue:

import java.util.concurrent.*;

public class ArrayBlockingQueueDemo {
public static void main(String[] args) {
BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(5);

// Producer
Thread producer = new Thread(() -> {
try {
for (int i = 1; i <= 10; i++) {
queue.put(i); // Chờ nếu queue đầy
System.out.println("Produced: " + i + " | Queue size: " + queue.size());
Thread.sleep(500);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
});

// Consumer
Thread consumer = new Thread(() -> {
try {
while (true) {
Integer item = queue.take(); // Chờ nếu queue rỗng
System.out.println(" Consumed: " + item + " | Queue size: " + queue.size());
Thread.sleep(1000); // Consumer chậm hơn producer
}
} catch (InterruptedException e) {
e.printStackTrace();
}
});

producer.start();
consumer.start();
}
}

Output:

Produced: 1 | Queue size: 1
Consumed: 1 | Queue size: 0
Produced: 2 | Queue size: 1
Produced: 3 | Queue size: 2
Consumed: 2 | Queue size: 1
Produced: 4 | Queue size: 2
Produced: 5 | Queue size: 3
Produced: 6 | Queue size: 4
Consumed: 3 | Queue size: 3
...

LinkedBlockingQueue

Optionally bounded (mặc định unbounded):

// Unbounded
BlockingQueue<String> queue1 = new LinkedBlockingQueue<>();

// Bounded
BlockingQueue<String> queue2 = new LinkedBlockingQueue<>(100);

Producer-Consumer với BlockingQueue

import java.util.concurrent.*;

class Task {
private int id;

public Task(int id) {
this.id = id;
}

public void process() {
System.out.println("Processing task " + id + " in " +
Thread.currentThread().getName());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}

@Override
public String toString() {
return "Task-" + id;
}
}

public class ProducerConsumerBlockingQueue {
public static void main(String[] args) throws InterruptedException {
BlockingQueue<Task> queue = new ArrayBlockingQueue<>(10);

// Producer
Thread producer = new Thread(() -> {
for (int i = 1; i <= 20; i++) {
try {
Task task = new Task(i);
queue.put(task);
System.out.println("Produced: " + task);
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "Producer");

// 3 Consumers
for (int i = 1; i <= 3; i++) {
Thread consumer = new Thread(() -> {
while (true) {
try {
Task task = queue.take();
task.process();
} catch (InterruptedException e) {
break;
}
}
}, "Consumer-" + i);
consumer.start();
}

producer.start();
producer.join();

Thread.sleep(5000); // Chờ consumers xử lý xong
}
}

ConcurrentLinkedQueue

Non-blocking queue sử dụng CAS (Compare-And-Swap):

import java.util.concurrent.*;

public class ConcurrentLinkedQueueDemo {
public static void main(String[] args) throws InterruptedException {
ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>();

// 5 threads cùng add
Thread[] threads = new Thread[5];
for (int i = 0; i < 5; i++) {
final int threadId = i;
threads[i] = new Thread(() -> {
for (int j = 0; j < 100; j++) {
queue.offer("Thread-" + threadId + "-Item-" + j);
}
});
threads[i].start();
}

for (Thread t : threads) {
t.join();
}

System.out.println("Queue size: " + queue.size()); // 500

// Poll items
int count = 0;
while (!queue.isEmpty() && count < 10) {
System.out.println(queue.poll());
count++;
}
}
}

So sánh Concurrent Collections

CollectionThread-Safety MechanismPerformanceUse Case
ConcurrentHashMapCAS + per-bin locking (Java 8+)High (reads lock-free)General purpose map, high concurrency
CopyOnWriteArrayListCopy-on-writeReads: High, Writes: LowRead-heavy, few writes (listeners)
ConcurrentLinkedQueueLock-free (CAS)HighNon-blocking queue
ArrayBlockingQueueLock-based (ReentrantLock)MediumBounded producer-consumer
LinkedBlockingQueueTwo-lock (put/take separate)MediumUnbounded/bounded producer-consumer
synchronized MapFull lock on entire mapLowLegacy code, low concurrency

Weakly Consistent Iterators

Tất cả concurrent collections có weakly consistent (nhất quán yếu) iterators:

ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
map.put("A", 1);
map.put("B", 2);
map.put("C", 3);

// Iterator được tạo
Iterator<String> iterator = map.keySet().iterator();

// Modifications SAU khi tạo iterator
map.put("D", 4);
map.remove("A");

// Iterator CÓ THỂ hoặc KHÔNG phản ánh modifications
// Nó KHÔNG throw ConcurrentModificationException
while (iterator.hasNext()) {
System.out.println(iterator.next());
}
Weakly Consistent vs Fail-Fast

Fail-Fast (ArrayList, HashMap):

  • Iterator check modCount mỗi lần operation
  • Throw ConcurrentModificationException nếu collection bị modify
  • Không thread-safe

Weakly Consistent (Concurrent Collections):

  • Iterator KHÔNG throw exception
  • CÓ THỂ phản ánh concurrent modifications (không guarantee)
  • Thread-safe
  • Trade-off: consistency yếu hơn để đổi lấy performance và thread-safety

Ví dụ thực tế: Task Queue System

import java.util.concurrent.*;
import java.util.*;

class WorkerTask {
private String taskName;
private int priority;

public WorkerTask(String taskName, int priority) {
this.taskName = taskName;
this.priority = priority;
}

public void execute() {
System.out.println("[" + Thread.currentThread().getName() +
"] Executing: " + taskName);
try {
Thread.sleep(1000 + new Random().nextInt(2000));
} catch (InterruptedException e) {
e.printStackTrace();
}
}

@Override
public String toString() {
return taskName + " (priority: " + priority + ")";
}
}

public class TaskQueueSystem {
private BlockingQueue<WorkerTask> taskQueue;
private ConcurrentHashMap<String, Integer> taskStatus;
private ExecutorService executor;

public TaskQueueSystem(int queueSize, int workers) {
this.taskQueue = new ArrayBlockingQueue<>(queueSize);
this.taskStatus = new ConcurrentHashMap<>();
this.executor = Executors.newFixedThreadPool(workers);

// Start worker threads
for (int i = 0; i < workers; i++) {
executor.submit(() -> {
while (!Thread.interrupted()) {
try {
WorkerTask task = taskQueue.take();
taskStatus.put(task.toString(), 1); // Processing
task.execute();
taskStatus.put(task.toString(), 2); // Completed
} catch (InterruptedException e) {
break;
}
}
});
}
}

public void submitTask(WorkerTask task) throws InterruptedException {
taskStatus.put(task.toString(), 0); // Queued
taskQueue.put(task);
System.out.println("Submitted: " + task);
}

public void printStatus() {
System.out.println("\n=== Task Status ===");
taskStatus.forEach((task, status) -> {
String statusStr = status == 0 ? "Queued" :
status == 1 ? "Processing" : "Completed";
System.out.println(task + " -> " + statusStr);
});
System.out.println("Queue size: " + taskQueue.size());
}

public void shutdown() {
executor.shutdown();
}

public static void main(String[] args) throws InterruptedException {
TaskQueueSystem system = new TaskQueueSystem(10, 3);

// Submit 10 tasks
for (int i = 1; i <= 10; i++) {
system.submitTask(new WorkerTask("Task-" + i, i));
Thread.sleep(200);
}

Thread.sleep(2000);
system.printStatus();

Thread.sleep(5000);
system.printStatus();

system.shutdown();
}
}
OCP Exam Tips

ConcurrentHashMap:

  • Null key/value KHÔNG được phép → throws NullPointerException
  • size() trả về int, mappingCount() trả về long (dùng cho large maps)
  • size() có thể không chính xác trong concurrent environment (weakly consistent)
  • Từ Java 8+: CAS + synchronized per-bin, KHÔNG phải segment locking
  • Bulk operations: forEach(), search(), reduce() với parallelism threshold

CopyOnWriteArrayList:

  • Iterator là snapshot → KHÔNG BAO GIỜ throw ConcurrentModificationException
  • Write operations (add, set, remove) copy TOÀN BỘ internal array
  • Phù hợp: read >> write (ratio >= 10:1), list size nhỏ (< 1000)
  • Không phù hợp: large lists, write-heavy workloads

BlockingQueue:

  • add() → throws IllegalStateException nếu full
  • offer() → returns false nếu full (non-blocking)
  • put()BLOCKS cho đến khi có space
  • take()BLOCKS cho đến khi có element
  • offer(e, timeout, unit)poll(timeout, unit) → block với timeout

Collections.synchronizedMap() vs ConcurrentHashMap:

  • synchronizedMap(): Lock toàn bộ map → chỉ 1 thread tại 1 thời điểm
  • ConcurrentHashMap: Fine-grained locking (per-bin) → nhiều threads đồng thời
  • synchronizedMap() iteration phải synchronized thủ công
  • ConcurrentHashMap iteration thread-safe

Weakly Consistent Iterators:

  • Concurrent collections có weakly consistent iterators
  • KHÔNG throw ConcurrentModificationException
  • CÓ THỂ hoặc KHÔNG phản ánh concurrent modifications (không guarantee)

Best Practices

mẹo
  1. Chọn đúng collection:

    • Map: ConcurrentHashMap (hầu hết trường hợp)
    • List (read-heavy): CopyOnWriteArrayList
    • Queue: BlockingQueue cho producer-consumer
  2. Tránh synchronized wrappers: Dùng concurrent collections thực sự

  3. Dùng atomic operations: putIfAbsent, compute, merge thay vì check-then-act

  4. Bounded queues: Tránh OutOfMemoryError với unbounded queues

  5. Graceful shutdown: Luôn shutdown ExecutorService

  6. Monitor queue size: Detect bottlenecks sớm

  7. Null safety: ConcurrentHashMap không cho phép null → validate input

  8. Use mappingCount(): Thay vì size() cho large maps

Bài tập

Bài 1: Thread-Safe Cache

Implement cache với ConcurrentHashMap:

  • put(key, value, ttl): Cache với time-to-live
  • get(key): Return null nếu expired
  • Background thread dọn dẹp expired entries

Bài 2: Multi-Producer Multi-Consumer

Tạo hệ thống:

  • 5 producers tạo tasks
  • 10 consumers xử lý tasks
  • Dùng BlockingQueue
  • Track số tasks processed bởi mỗi consumer

Bài 3: Event Listeners

Implement Observer pattern với CopyOnWriteArrayList:

  • addListener(listener)
  • removeListener(listener)
  • notifyListeners(event)
  • Test với 100 threads đồng thời

Tóm tắt

  • Concurrent Collections: Thread-safe, hiệu suất cao
  • ConcurrentHashMap: CAS + per-bin locking (Java 8+), không cho phép null, mappingCount() > size()
  • CopyOnWriteArrayList: Snapshot iterator, read-heavy use cases (ratio >= 10:1)
  • BlockingQueue: Producer-consumer pattern, put() vs offer() vs add()
  • Weakly consistent iterators: Không throw ConcurrentModificationException
  • Tránh Collections.synchronized: Dùng concurrent collections thực sự

Bài tiếp theo: Locks và Conditions - Kiểm soát synchronization linh hoạt hơn!

Đọc thêm