У меня есть служба переноса данных, которая считывает данные из одной базы данных в кусках, а затем переносит ее в другую базу данных.Java Fork Join Pool get stuck
Чтобы работать через «куски» данных, я пытаюсь использовать пул рекурсивной атаки и вилки. Причина в том, что я хотел бы работать над выполнением работы над этими «кусками» параллельно, затем получить еще один кусок, затем выполнить, затем получить еще один кусок и т. Д.
Что происходит, мой процесс просто прекращается. Я не вижу никаких исключений в журналах, и я не вижу никаких заторможенных потоков. Мой код ниже, мои вопросы:
- Я что-то упустил в своем Работнике? Нужно ли мне возвращать какое-то значение или вызвать какой-то метод для освобождения ресурса?
- Действительно ли это глупо для меня использовать рекурсивную атаку здесь? Должен ли я использовать что-то другое для распараллеливания кусков работы?
- У меня есть один рабочий поток в моей дампе нитей, который, кажется, просто ждет - это нормальный или какой-то индикатор моей проблемы?
ForkJoinPool-1-работник-18 ID = 12191 = состояние ОЖИДАНИЕ - ожидание < 0x1b5ca93e> (а) java.util.concurrent.ForkJoinPool - заперта < 0x1b5ca93e> (с java.util. concurrent.ForkJoinPool) на sun.misc.Unsafe.park (собственный метод) на java.util.concurrent.locks.LockSupport.park (LockSupport.java:186) на java.util.concurrent.ForkJoinPool.tryAwaitWork (ForkJoinPool. java: 864) на java.util.concurrent.ForkJoinPool.work (ForkJoinPool.java:647) на java.util.concurrent.ForkJoinWorkerThread.run (ForkJoinWorkerThread.java:398)
Код:
@Component
public class BulkMigrationService {
final ForkJoinPool pool = new ForkJoinPool();
private static final Logger log = LoggerFactory.getLogger(BulkMigrationService.class);
private SourceDataApi api;
private final Migrator migrator;
private MetadataService metadataService;
@Autowired
public BulkMigrationService(SourceDataApi api, Migrator migrator, MetadataService metadataService) {
this.api = api;
this.migrator = migrator;
this.metadataService = metadataService;
}
public void migrate(Integer batchSize, Long max) throws MigrationException {
Long currentCount = 0l;
Integer currentIndex = 0;
while (currentCount < max) {
List<String> itemsToMigrate = api.findItemRange(currentIndex, currentIndex + batchSize);
if (assetsToMigrate.size() > 0) {
MigrateForkedWorker starter = new MigrateForkedWorker(assetsToMigrate);
pool.invoke(starter);
}
currentCount += assetsToMigrate.size();
currentIndex += batchSize - 1;
if (log.isDebugEnabled()) {
log.debug("Migrated " + currentCount + " Items.");
}
}
}
public class MigrateForkedWorker extends RecursiveAction {
private int max = 10;
private List<String> allItems;
public MigrateForkedWorker(List<String> allItems) {
this.allItems = allItems;
}
@Override
protected void compute() {
if (allItems.size() <= max) {
for (String itemInfo : allItems) {
try {
migrator.migrateAsset(itemInfo);
}
catch (MigrationException e) {
e.printStackTrace();
}
}
}
else {
int targetSize = allItems.size() % 2 == 0 ? allItems.size()/2 : (allItems.size() + 1)/2;
List<List<String>> splits = Lists.partition(allItems, targetSize);
MigrateForkedWorker migrateForkedWorkerOne = new MigrateForkedWorker(splits.get(0));
MigrateForkedWorker migrateForkedWorkerTwo = new MigrateForkedWorker(splits.get(1));
invokeAll(migrateForkedWorkerOne, migrateForkedWorkerTwo);
}
}
}
}
Мне интересно, вижу ли я что-то вроде этого: http://stackoverflow.com/questions/16894929/forkjoinpool-stalls-during-invokeall-join Я, вероятно, просто не понимаю этого материала достаточно хорошо, чтобы его использовать. Прочтите это недавно в Интернете: у разработчика была проблема, и хотя «я знаю, я буду использовать потоки». Теперь у него 10 проблем – eric