2010-10-16 3 views
1

Я пишу какое-то клиент-серверное приложение, где мне приходится иметь дело с несколькими потоками. У меня есть несколько серверов, которые посылают живые пакеты каждые несколько секунд. Эти серверы поддерживаются в ConcurrentHashMap, который содержит их конечные точки в паре с временем последнего живого пакета, полученного от соответствующего сервера.Безопасность потоков при повторении через параллельные коллекции

Теперь у меня есть поток, который должен «сортировать» все серверы, которые не отправили живые пакеты за определенное количество времени.

Я думаю, я не могу просто так сделать, не так ли?

for(IPEndPoint server : this.fileservers.keySet()) 
{ 
    Long time = this.fileservers.get(server); 

    //If server's time is updated here, I got a problem 

    if(time > fileserverTimeout) 
     this.fileservers.remove(server); 
} 

Есть ли способ я могу обойти это без aquiring блокировки для всего цикла (что я тогда уважать в других потоках, а)?

+0

Просто указывая, что вы не можете удалить элементы, используя упрощенный синтаксис каждого цикла. Вам нужно использовать итератор явно. – sjlee

+0

yep, я понял это тоже ... Я вроде как набрал это, прежде чем на самом деле написал код ^^ – DeX3

ответ

3

Возможно, здесь нет проблем, в зависимости от того, что именно вы храните на карте. Ваш код выглядит немного странным для меня, поскольку вы, похоже, сохраняете «продолжительность, для которой сервер не был активным».

Моя первая идея для записи этих данных заключалась в том, чтобы сохранить «последнюю временную метку, на которой был активен сервер». Тогда ваш код будет выглядеть следующим образом:

package so3950354; 

import java.util.Iterator; 
import java.util.Map; 
import java.util.concurrent.ConcurrentHashMap; 
import java.util.concurrent.ConcurrentMap; 

public class ServerManager { 

    private final ConcurrentMap<Server, Long> lastActive = new ConcurrentHashMap<Server, Long>(); 

    /** May be overridden by a special method for testing. */ 
    protected long now() { 
    return System.currentTimeMillis(); 
    } 

    public void markActive(Server server) { 
    lastActive.put(server, Long.valueOf(now())); 
    } 

    public void removeInactive(long timeoutMillis) { 
    final long now = now(); 

    Iterator<Map.Entry<Server, Long>> it = lastActive.entrySet().iterator(); 
    while (it.hasNext()) { 
     final Map.Entry<Server, Long> entry = it.next(); 
     final long backThen = entry.getValue().longValue(); 
     /* 
     * Even if some other code updates the timestamp of this server now, 
     * the server had timed out at some point in time, so it may be 
     * removed. It's bad luck, but impossible to avoid. 
     */ 
     if (now - backThen >= timeoutMillis) { 
     it.remove(); 
     } 
    } 
    } 

    static class Server { 

    } 
} 

Если вы действительно хотите, чтобы избежать этого никакого кода никогда не вызывает markActive во время вызова removeInactive, нет никакого пути вокруг явной блокировки. Вероятно, вы хотите:

  • Разрешены одновременные звонки markActive.
  • в течение markActive не разрешено звонить в removeInactive.
  • в течение removeInactive не позвоните по номеру markActive.

Это выглядит как типичный сценарий для ReadWriteLock, где markActive является «чтение» операция и removeInactive является «писать» Операция.

+0

Отлично! Более полное развитие, чем я собрал ... +1 – andersoj

-1

Во-первых, делают карту синхронизируется

this.fileservers = Collections.synchronizedMap(Map) 

затем использовать стратегию, которая используется в классах Singleton

if(time > fileserverTimeout) 
    { 
     synchronized(this.fileservers) 
     { 
      if(time > fileserverTimeout) 
        this.fileservers.remove(server); 
     } 
    } 

Теперь это убеждается, что как только вы внутри синхронизированного блока, обновлений не может произойти. Это связано с тем, что после того, как будет зафиксирована блокировка на карте, карта (синхронизированная оболочка) не будет доступна для обеспечения блокировки потока для нее для обновления, удаления и т. Д.

Проверка времени в два раза гарантирует, что синхронизация используется только тогда, когда есть подлинный случай удаления

+1

Синхронизация параллельного хэшмапа по крайней мере сомнительна. – whiskeysierra

+0

Согласованные @Willi и FindBugs надежно напоминают одно из них, чтобы никто не забыл ... – andersoj

0

(См Roland's answer, который принимает идеи здесь и конкретизирует их в более полной, например, с некоторыми большими дополнительными прозрений.)

Поскольку это одновременно хэш-карта, вам может сделать следующее. Обратите внимание, что итераторы CHM реализуют необязательные методы, в том числе remove(), которые вы хотите. См CHM API docs, в котором говорится:

Этот класс и его взгляды и итераторы реализации всех дополнительных методов интерфейсов Map и Iterator.

Этот код должен работать (я не знаю, типа Key в вашем CHM):

ConcurrentHashMap<K,Long> fileservers = ...; 

for(Iterator<Map.Entry<K,Long>> fsIter = fileservers.entrySet().iterator(); fileservers.hasNext();) 
{ 
    Map.Entry<K,Long> thisEntry = fsIter.next(); 
    Long time = thisEntry.getValue(); 

    if(time > fileserverTimeout) 
     fsIter.remove(server); 
} 

Но обратите внимание, что там может быть условие гонки в другом месте ... Вы должны убедиться, что другие биты кода, обращающиеся к карте, могут справиться с таким спонтанным удалением - т. е., вероятно, если вы коснетесь fileservers.put(), вам понадобится немного логики с участием fileservers.putIfAbsent(). Это решение с меньшей вероятностью создает узкие места, чем использование synchronized, но это также требует немного больше мысли.

Где вы написали «Если время сервера обновляется здесь, у меня возникла проблема», именно там находится putIfAbsent(). Если запись отсутствует, либо вы ее раньше не видели, либо недавно ее сбросили с Таблица. Если обе стороны этого необходимо скоординировать, то вместо этого вы можете захотеть ввести заблокированную запись для записи и выполнить синхронизацию на этом уровне (т. Е. Синхронизировать запись, делая remove(), а не на всей таблице).Затем конец put() может также синхронизироваться на одной записи, исключая потенциальную гонку.

1

Я не вижу, как другой поток может обновлять время сервера в этой точке вашего кода. После того как вы получили время сервера с карты, используя this.fileservers.get(server), другой поток не может изменить его значение, поскольку длинные объекты неизменяемы. Да, другой поток может помещать новый новый объект Long для этого сервера в карту, но это не влияет на этот поток, поскольку он уже получил время сервера.

Так как он стоит, я не вижу ничего плохого в вашем коде. Итераторы в ConcurrentHashMap слабо согласованы, что означает, что они могут терпеть одновременную модификацию, поэтому риск возникновения ConcurrentModificationException также не возникает.