2011-01-24 3 views
0

Мне интересно, какой лучший подход к этой проблеме будет. У меня есть (абстрактно говоря) простой метод, который вызывает веб-сервис и сохраняет результат в локальном кэше в оперативной памяти, что-то вроде:Синхронизация одного конкретного вызова webservice

public Document callWebservice(SomeObject parameter) { 
    Document result = cache.get(parameter); 
    if (result == null) { 
     result = parse(retrieve(parameter)); 
     cache.put(result); 
    } 
    return result; 
} 

Теперь, если документ находится в кэше, он может просто вернуться без проблемы, хорошо. В однопотоковой среде этот подход отлично работает. Однако в многопоточной среде получается, что каждый поток обращается к биту else, и вызывает веб-службу несколько раз.

Я мог бы вырезать синхронизированный блок в части «else», но я считаю, что это слишком «широкий» блокировки - весь метод будет недоступен для вызова потоков, хотя они называют совершенно разные вещи.

Это нормально, если веб-сервис вызывается дважды, если запрос отличается (т. Е. Параметр SomeObject, в данном случае).

Теперь вопрос: какой лучший подход принять в этом случае?

Я думал о сохранении параметра в объекте коллекции (threadafe). Если содержимое параметра одинаков, он будет вызывать тот же результат hashCode/equals и будет найден в объекте коллекции, указывая, что другой поток уже обрабатывает этот запрос. Если это будет так, вызывающий поток может быть приостановлен до тех пор, пока веб-служба не вернется. (Я должен был бы выяснить, как заставить вызывающий поток ждать, хотя). Будет ли это работать с блокировкой на объекте параметра SomeObject? например:

private Map<SomeObject, SomeObject> currentlyProcessingItems = new ConcurrentHashMap<SomeObject, SomeObject>(); 
public Document callWebservice(SomeObject parameter) { 
    if (currentlyProcessedItems.contains(parameter)) { 
     parameter = currentlyProcessedItems.get(parameter); 
    } else { 
     currentlyProcessedItems.putIfAbsent(parameter); 
    } 
    synchronized(parameter) { 
     Document result = cache.get(parameter); 
     if (result == null) { 
      Document result = parse(retrieve(parameter)); 
      cache.put(result); 
     } 
     currentlyProcessedItems.remove(parameter); 
     return result; 
    } 
} 

(примечание: логика для отслеживания в настоящее время обработки запросов, использование ConcurrentHashMap и замок может быть неоптимальным или откровенная неправильно)

Нет, я никогда не закончил читать книгу о многопоточности , Я должен.

Я уверен, что эта конкретная проблема довольно распространена, я просто не смог найти ответ. Что такое ситуация, подобная этой (например, блокировка на конкретном объекте), если я могу спросить?

+0

У вас есть equals и hashCode прямо на SomeObject? –

+0

Да, параметр имеет хороший 'hasCode()' и 'equals()' метод, основанный на Commons Lang's HashCodeBuilder и EqualsBuilder. Внутри он также имеет некоторые объекты, которые также правильно реализуют 'hashCode()' и 'equals()'. – fwielstra

ответ

0

Я сам думал о проблеме, особенно когда retrieve (parameter) занимает много времени (для меня он подключается к серверу для проверки подлинности, запроса процесса/фильтра на задней панели , и т.д.). Теперь я еще не пробовал это сам, но ради обсуждения, как это звучит?

Чтобы использовать кеш, нужно вызвать новый MyCache (ключ) .getValue() из потока, поскольку getValue будет заблокирован в методе getCacheValue(), пока это значение не станет доступно (для всех ожидающих потоков).

public class MyCache { 

    private final static HashMap cacheMap = new HashMap(); // One Map for all 
    private final static Vector fetchList = new Vector(); // One List for all 

    private Object cacheValue; 
    private boolean waitingState; 

    public MyCache (Object key) { 
     if (cacheMap.containsKey (key)) {  // somebody has done it 
      cacheValue = cacheMap.get (key); 
     } else { 
      waitingState = true; 
      if (fetchInProgress (key, false)) // someone else is doing it 
       return; 
      new Thread (new MyFetch (key)).start(); 
    }} 

    synchronized private static boolean fetchInProgress (Object key, boolean remove) { 
     if (remove) { 
      fetchList.remove (key); 
     } else { 
      boolean fetchingNow = fetchList.contains (key); 
      if (fetchingNow) 
       return true; 
      fetchList.addElement (key); 
     } 
     return false; 
    } 

    public Object getValue() { 
     if (waitingState) 
      getCacheValue (true); 
     return cacheValue; 
    } 

    synchronized private void getCacheValue (boolean waitOnLock) { 
     if (waitOnLock) { 
      while (waitingState) { 
       try { 
        wait(); 
       } catch (InterruptedException intex) { 
     }}} else { 
      waitingState = false; 
      notifyAll(); 
    }} 

    public class MyFetch implements Runnable { 
     private Object fetchKey; 
     public MyFetch (Object key) { 
      fetchKey = key;  // run() needs to know what to fetch 
     } 

     public void run() {  // Grab the resource, handle exception, etc. 
      Object fetchedValue = null; 
      // so it compiles, need to replace by callWebService (fetchKey); 
      cacheMap.put (fetchKey, fetchedValue); // Save for future use 
      fetchInProgress (fetchKey, true); // Remove from list 
      getCacheValue (false);   // Notify waiting threads 
}}} 
1

Внимание, объекты XML DOM являются не потокобезопасной. поэтому (игнорируя проблемы безопасности потока самого кеша), вы не можете совместно использовать экземпляр Document из нескольких потоков. мы получили это в нашей собственной базе кода, поэтому я знаю, что это проблематично (не только какая-то теоретическая проблема).

, как для самого кэширования, я считаю, гуава имеет то, что вы ищете: a computing map

+0

+1 на предупреждение, а еще один +1 указывает мне на ComputingMap, который выглядит так же, как и то, что я искал. – fwielstra

0

Я дал свою собственную идею идти, и это на самом деле, кажется, работает нормально - первый, он использует ConcurrentHashMap (хотя хороший параллельный набор был бы лучше, пока не нашел правильной реализации) для поддержания списка обрабатываемого объекта.

В самом методе он смотрит вверх, если карта уже содержит экземпляр идентификатора. Если это не так, он пытается вставить идентификатор с помощью putIfAbsent(). Если между проверкой и вставкой вставлен другой поток, метод putIfAbsent возвращает вставленный элемент другого потока и тот, который используется.

Затем, либо с новым идентификатором, либо с существующим, начинается синхронизированный блок, блокирующий экземпляр идентификатора. В синхронизированном блоке запрашивается кеш, и если это не возвращает результаты, вызывается веб-служба. Фактически это означает, что все процессы, выполняющие один и тот же запрос (идентифицированный идентификатором), могут обрабатывать только один из этих запросов одновременно, поэтому, когда первый вызывает веб-сервис, другие ожидают очереди. Когда настанет очередь, результат будет в кеше, и ожидающие потоки могут извлечь его оттуда.

Что касается объекта Document, который не является потокобезопасным, я пока не уверен, что делать с этим. На моей локальной машине это, кажется, не вызывает никаких проблем, но такие вещи имеют привычку появляться в производстве наугад. Grr. Похоже, мне придется это понять.

Смежные вопросы