2014-10-21 4 views
0

Я хочу реализовать безопасную по потоку карту очередей.Thread safe Карта очередей

Я намерен начать с пустой карты. Если ключ не существует, я хочу создать новую запись в карте с новой очередью. Если ключ существует, я хочу добавить его в очередь. Предлагаемый мною реализация выглядит следующим образом:

import java.util.Map; 
import java.util.concurrent.ConcurrentHashMap; 
import java.util.concurrent.ConcurrentLinkedQueue; 

public class StackOverFlowExample { 

    private final Map<String, ConcurrentLinkedQueue<String>> map = new ConcurrentHashMap<>(); 

    public void addElementToQueue(String key, String value){ 
     if (map.containsKey(key)){ 
      map.get(key).add(value); 
     } 
     else{ 
      ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>(); 
      queue.add(value); 
      map.put(key, queue); 
     }   
    }  
} 

Меня беспокоит то, что, когда несколько потоков пытаются добавить новое значение в карту, то первым будет поставить новую запись карты с новой очередью, то второй будет ждать, а затем добавьте новую очередь для ключа, а не добавьте в очередь. Знания API параллелизма/параллелизма в лучшем случае невелики. Возможно, параллелизм на месте, чтобы избежать этого? Совет будет высоко оценен.

ответ

2

Эта модель, вероятно, был размещен много раз на SO (эффективно добавление к одновременному карте):

Queue<String> q = map.get(key); 
if(q == null) { 
    q = new ConcurrentLinkedQueue<String>(); 
    Queue<String> curQ = map.putIfAbsent(key, q); 
    if(curQ != null) { 
    q = curQ; 
    } 
} 
q.add(value); 
0

Таким образом, ваш страх, что нить А и нить B будет делать следующее:

thread A: lock ConcurrentHashMap Look for Queue "x" (not found) unlock ConcurrentHashMap create Queue "x" lock ConcurrentHashMap Insert Queue X unlock ConcurrentHashMap Thread B: Lock ConcurrentHashMap (while thread A is in 'create Queue X') look for queue X (not found) unlock ConcurrentHashMap (thread A then gets lock) create Queue "x" v2 lock ConcurrentHashMap Insert Queue X v2 (overwriting the old entry) unlock ConcurrentHashMap

То есть на самом деле реальная проблема, но один, который легко решается путем AddElementToQueue быть синхронизирован методом. Тогда может быть только один поток внутри AddElementToQueue в любой момент времени, и поэтому отверстие синхронизации между первой «разблокировкой» и вторым «замком» закрывается.

Таким образом

public synchronized void addElementToQueue(String key, String value){

должен решить проблему потерь очереди.

+0

Что бы иметь смысл, если ConcurrentHashMap используется блокировка при извлечении. Это не так. – spudone

+0

Ah right, ConcurrentHashMap «просто» использует структуры данных, которые устойчивы к множественному считыванию. Блокировка используется только писателями за короткий промежуток времени, необходимый для добавления или удаления элемента с карты. Я продолжаю забывать эту деталь реализации, потому что это действительно не имеет значения с программной точки зрения. Проблема все еще возникает в одном месте (одновременное «существует ли X в hashmap?» Одновременно с «созданием нового хэшмапа»), и решение одно и то же - синхронизировать метод addElementToQueue. –

+0

Используя синхронизированный блок, ему действительно не нужны параллельные структуры данных, если в коде больше нет. – spudone

0

Если Java 8 вариант:

public void addElementToQueue(String key, String value) { 
    map.merge(key, new ConcurrentLinkedQueue<>(Arrays.asList(value)), (oldValue, coming) -> { 
     oldValue.addAll(coming); 
     return oldValue; 
    }); 
}