2013-08-29 2 views
3

Я использую Hazelcast 2.0.1 для частого обновления данных (около 2 минут), который включает в себя сначала удаление, а затем загрузку данных из БД. Однако где-то вниз по линии один из потоков содержит блокировку ключа, которая предотвращает операцию удаления и генерирует исключение (java.util.ConcurrentModificationException: Another thread holds a lock for the key: [email protected]). Пожалуйста, помогите мне в том, чтобы мои карты в кареглайке обновлялись.Как часто обновлять карту Hazelcast

Я даю мой код ниже

DeltaParallelizer

def customerDetails = dataOperations.getDistributedStore(DataStructures.customer_project.name()).keySet() 
ExecutorService service = Hazelcast.getExecutorService() 

def result 
try{ 
    customerDetails?.each{customerEmail-> 
     log.info String.format('Creating delta task for customer:%s',customerEmail) 
     def dTask = new DistributedTask(new EagerDeltaTask(customerEmail)) 
     service.submit(dTask); 
    } 
    customerDetails?.each {customerEmail -> 
     log.info String.format('Creating task customer aggregation for %s',customerEmail) 
     def task = new DistributedTask(new EagerCustomerAggregationTask(customerEmail)) 
     service.submit(task) 
    } 
} 
catch(Exception e){ 
    e.printStackTrace() 
} 

EagerDeltaTask

class EagerDeltaTask implements Callable,Serializable { 
    private final def emailId 
    EagerDeltaTask(email){ 
     emailId = email 
    } 
    @Override 
    public Object call() throws Exception { 
     log.info(String.format("Eagerly computing delta for %s",emailId))  
     def dataOperations = new DataOperator() 
     def tx = Hazelcast.getTransaction() 
     tx.begin() 
     try{ 
      deleteAll(dataOperations) 
      loadAll(dataOperations) 
      tx.commit() 
     } 
     catch(Exception e){ 
      tx.rollback() 
      log.error(String.format('Delta computation is screwed while loading data for the project:%s',emailId),e) 
     }  
    } 

    private void deleteAll(dataOperations){ 
     log.info String.format('Deleting entries for customer %s',emailId)  
     def projects = dataOperations.getDistributedStore(DataStructures.customer_project.name()).get(emailId) 
     projects?.each{project-> 
      log.info String.format('Deleting entries for project %s',project[DataConstants.PROJECT_NUM.name()]) 
      def srs = dataOperations.srs(project[DataConstants.PROJECT_NUM.name()])?.collect{it[DataConstants.SR_NUM.name()]} 
      def activitiesStore = dataOperations.getDistributedStore(DataStructures.sr_activities.name()) 
      srs?.each{sr -> 
       activitiesStore.remove(sr) 
      } 
      dataOperations.getDistributedStore(DataStructures.project_sr_aggregation.name()).remove(project[DataConstants.PROJECT_NUM.name()]) 
     }  
     dataOperations.getDistributedStore(DataStructures.customer_project.name()).remove(emailId) 
    } 

    private void loadAll(dataOperations){ 
     log.info(String.format('Loading entries for customer %s',emailId)) 
     def projects = dataOperations.projects(emailId) 
     projects?.each{project-> 
      log.info String.format('Loading entries for project %s',project[DataConstants.PROJECT_NUM.name()]) 
      def srs = dataOperations.srs(project[DataConstants.PROJECT_NUM.name()]) 
      srs?.each{sr-> 
       dataOperations.activities(sr[DataConstants.SR_NUM.name()]) 
      } 
     }  
    } 
} 

DataOperator

class DataOperator { 
def getDistributedStore(String name){ 
    Hazelcast.getMap(name) 
} 
} 

я получаю исключение в DeleteAll СГД, поэтому некоторые из содержимого карт удаляются и новые данные загружаются только для карты, содержимое которой было удалено, а остальная часть карты имеет старые данные. Поэтому я не получаю обновленные данные на моей карте Hazelcast. Пожалуйста, предложите свои мнения о том, как я могу обновить данные на моей карте Hazelcast.

Также этот Hazelcast.getTransaction клиент работает для этой цели?

Примечание: клиент может иметь много project_num, 1 project_num может совместно использоваться несколькими клиентами слишком -project_num может иметь несколько SR_NUM

ответ

3

Я использовал Hazelcast eviction policy, который решил мою проблему. Я использовал <time-to-live-seconds>300</time-to-live-seconds>, который очищает содержимое карты каждые 5 минут, и когда любой запрос поступает из пользовательского интерфейса для любой карты, он перезагружает это содержимое карты из загрузчика.

Ниже один из карты конфигурации Hazelcast

... 
<map name="customer_project" > 
    <map-store enabled="true"> 
     <class-name>com.abc.arena.datagrid.loader.CustomerProjectData</class-name> 
    </map-store> 
    <time-to-live-seconds>300</time-to-live-seconds> 
</map> 
... 

CustomerProjectData погрузчик класс просто загружает данные в карту из БД. Так что теперь я больше не нужно DeltaParallelizer или EagerDeltaTask класса

Различные подходы также приветствуются :)

Смежные вопросы