2014-01-31 3 views
1

У меня есть служба переноса данных, которая считывает данные из одной базы данных в кусках, а затем переносит ее в другую базу данных.Java Fork Join Pool get stuck

Чтобы работать через «куски» данных, я пытаюсь использовать пул рекурсивной атаки и вилки. Причина в том, что я хотел бы работать над выполнением работы над этими «кусками» параллельно, затем получить еще один кусок, затем выполнить, затем получить еще один кусок и т. Д.

Что происходит, мой процесс просто прекращается. Я не вижу никаких исключений в журналах, и я не вижу никаких заторможенных потоков. Мой код ниже, мои вопросы:

  • Я что-то упустил в своем Работнике? Нужно ли мне возвращать какое-то значение или вызвать какой-то метод для освобождения ресурса?
  • Действительно ли это глупо для меня использовать рекурсивную атаку здесь? Должен ли я использовать что-то другое для распараллеливания кусков работы?
  • У меня есть один рабочий поток в моей дампе нитей, который, кажется, просто ждет - это нормальный или какой-то индикатор моей проблемы?

ForkJoinPool-1-работник-18 ID = 12191 = состояние ОЖИДАНИЕ - ожидание < 0x1b5ca93e> (а) java.util.concurrent.ForkJoinPool - заперта < 0x1b5ca93e> (с java.util. concurrent.ForkJoinPool) на sun.misc.Unsafe.park (собственный метод) на java.util.concurrent.locks.LockSupport.park (LockSupport.java:186) на java.util.concurrent.ForkJoinPool.tryAwaitWork (ForkJoinPool. java: 864) на java.util.concurrent.ForkJoinPool.work (ForkJoinPool.java:647) на java.util.concurrent.ForkJoinWorkerThread.run (ForkJoinWorkerThread.java:398)

Код:

@Component 
public class BulkMigrationService { 

final ForkJoinPool pool = new ForkJoinPool(); 

private static final Logger log = LoggerFactory.getLogger(BulkMigrationService.class); 

private SourceDataApi api; 
private final Migrator migrator; 
private MetadataService metadataService; 

@Autowired 
public BulkMigrationService(SourceDataApi api, Migrator migrator, MetadataService metadataService) { 
    this.api = api; 
    this.migrator = migrator; 
    this.metadataService = metadataService; 
} 

public void migrate(Integer batchSize, Long max) throws MigrationException { 

    Long currentCount = 0l; 
    Integer currentIndex = 0; 

    while (currentCount < max) { 
     List<String> itemsToMigrate = api.findItemRange(currentIndex, currentIndex + batchSize); 

     if (assetsToMigrate.size() > 0) { 
      MigrateForkedWorker starter = new MigrateForkedWorker(assetsToMigrate); 
      pool.invoke(starter); 
     } 

     currentCount += assetsToMigrate.size(); 
     currentIndex += batchSize - 1; 
     if (log.isDebugEnabled()) { 
      log.debug("Migrated " + currentCount + " Items."); 
     } 
    } 

} 

public class MigrateForkedWorker extends RecursiveAction { 

    private int max = 10; 
    private List<String> allItems; 

    public MigrateForkedWorker(List<String> allItems) { 
     this.allItems = allItems; 

    } 

    @Override 
    protected void compute() { 
     if (allItems.size() <= max) { 
      for (String itemInfo : allItems) { 

       try { 
        migrator.migrateAsset(itemInfo); 

       } 
       catch (MigrationException e) { 
        e.printStackTrace(); 
       } 
      } 
     } 
     else { 
      int targetSize = allItems.size() % 2 == 0 ? allItems.size()/2 : (allItems.size() + 1)/2; 
      List<List<String>> splits = Lists.partition(allItems, targetSize); 
      MigrateForkedWorker migrateForkedWorkerOne = new MigrateForkedWorker(splits.get(0)); 
      MigrateForkedWorker migrateForkedWorkerTwo = new MigrateForkedWorker(splits.get(1)); 

      invokeAll(migrateForkedWorkerOne, migrateForkedWorkerTwo); 
     } 
    } 
} 
} 
+0

Мне интересно, вижу ли я что-то вроде этого: http://stackoverflow.com/questions/16894929/forkjoinpool-stalls-during-invokeall-join Я, вероятно, просто не понимаю этого материала достаточно хорошо, чтобы его использовать. Прочтите это недавно в Интернете: у разработчика была проблема, и хотя «я знаю, я буду использовать потоки». Теперь у него 10 проблем – eric

ответ

1

Ваша первая проблема заключается в том, что вы используете invokeAll(). Это просто отправляет новые запросы в пул и ждет их завершения. Следуйте примеру JavaDoc: используйте fork() migrateForkedWorkerOne.fork(); migrateForkedWorkerTwo.compute(); migrateForkedWorkerOne.join();

Смежные вопросы