2016-08-12 2 views
4

Добрый день.ExecutorService никогда не останавливается. При выполнении новой задачи внутри другой выполняемой задачи

У меня проблема с блокером с моим проектом веб-гусениц. Логика проста. Сначала создает один Runnable, он загружает html-документ, просматривает все ссылки, а затем по всем финансируемым ссылкам создает новые объекты Runnable. Каждый новый созданный Runnable, в свою очередь, создает новые объекты Runnable для каждой ссылки и выполняет их.

Проблема в том, что ExecutorService никогда не останавливается.

CrawlerTest.java

public class CrawlerTest { 

    public static void main(String[] args) throws InterruptedException { 
     new CrawlerService().crawlInternetResource("https://jsoup.org/"); 
    } 
} 

CrawlerService.java

import java.io.IOException; 
import java.util.Collections; 
import java.util.Set; 
import java.util.concurrent.ConcurrentHashMap; 
import java.util.concurrent.ExecutorService; 
import java.util.concurrent.Executors; 
import java.util.concurrent.TimeUnit; 

import org.jsoup.Jsoup; 
import org.jsoup.nodes.Document; 
import org.jsoup.nodes.Element; 
import org.jsoup.select.Elements; 

public class CrawlerService { 

    private Set<String> uniqueUrls = Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>(10000)); 
    private ExecutorService executorService = Executors.newFixedThreadPool(8); 
    private String baseDomainUrl; 

    public void crawlInternetResource(String baseDomainUrl) throws InterruptedException { 
     this.baseDomainUrl = baseDomainUrl; 
     System.out.println("Start"); 
     executorService.execute(new Crawler(baseDomainUrl)); //Run first thread and scan main domain page. This thread produce new threads. 
     executorService.awaitTermination(10, TimeUnit.MINUTES); 
     System.out.println("End"); 
    } 

    private class Crawler implements Runnable { // Inner class that encapsulates thread and scan for links 

     private String urlToCrawl; 

     public Crawler(String urlToCrawl) { 
      this.urlToCrawl = urlToCrawl; 
     } 

     public void run() { 
      try { 
       findAllLinks(); 
      } catch (InterruptedException e) { 
       e.printStackTrace(); 
      } 
     } 

     private void findAllLinks() throws InterruptedException { 
      /*Try to add new url in collection, if url is unique adds it to collection, 
      * scan document and start new thread for finded links*/ 
      if (uniqueUrls.add(urlToCrawl)) { 
       System.out.println(urlToCrawl); 

       Document htmlDocument = loadHtmlDocument(urlToCrawl); 
       Elements findedLinks = htmlDocument.select("a[href]"); 

       for (Element link : findedLinks) { 
        String absLink = link.attr("abs:href"); 
        if (absLink.contains(baseDomainUrl) && !absLink.contains("#")) { //Check that we are don't go out of domain 
         executorService.execute(new Crawler(absLink)); //Start new thread for each funded link 
        } 
       } 
      } 
     } 

     private Document loadHtmlDocument(String internetResourceUrl) { 
      Document document = null; 
      try { 
       document = Jsoup.connect(internetResourceUrl).ignoreHttpErrors(true).ignoreContentType(true) 
         .userAgent("Mozilla/5.0 (Windows NT 6.1; WOW64; rv:48.0) Gecko/20100101 Firefox/48.0") 
         .timeout(10000).get(); 
      } catch (IOException e) { 
       System.out.println("Page load error"); 
       e.printStackTrace(); 
      } 
      return document; 
     } 
    } 
} 

Это приложение потребуется около 20 секунд для сканирования jsoup.org для всех уникальных ссылок. Но подождите 10 минут executorService.awaitTermination(10, TimeUnit.MINUTES); , а затем я вижу мертвую главную нить и все еще работающий исполнитель.

Threads

Как правильно заставить ExecutorService работу?

Я думаю, проблема заключается в том, что он вызывает executorService.execute внутри другой задачи вместо основного потока.

+0

Обработайте 'executorService' в try catch и напишите' executorService.shutdown(); 'в' finally' блоке. [Ссылка] (https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ExecutorService.html) – Imran

+0

@Imran Не работает. Он все еще ждет 10 минут, пока главный нить не умрет. Я думаю, проблема заключается в том, что он вызывает executorService.execute внутри другой задачи вместо основного потока. – Redeemer

ответ

2

Я вижу свой комментарий от ранее:

Я не могу использовать CountDownLatch, потому что я не знаю заранее, сколько уникальных ссылок соберу из ресурса.

Прежде всего, vsminkov находится на месте с ответом о том, почему awaitTermniation будет сидеть и ждать 10 минут. Я предложу альтернативное решение.

Вместо использования CountDownLatch используйте Phaser. Для каждой новой задачи вы можете зарегистрироваться и дождаться завершения.

Создание единого фазер и register каждый раз execute.submit вызывается и arrive каждый раз, когда Runnable завершается.

public void crawlInternetResource(String baseDomainUrl) { 
    this.baseDomainUrl = baseDomainUrl; 

    Phaser phaser = new Phaser(); 
    executorService.execute(new Crawler(phaser, baseDomainUrl)); 
    int phase = phaser.getPhase(); 
    phase.awaitAdvance(phase); 
} 

private class Crawler implements Runnable { 

    private final Phaser phaser; 
    private String urlToCrawl; 

    public Crawler(Phaser phaser, String urlToCrawl) { 
     this.urlToCrawl = urlToCrawl; 
     this.phaser = phaser; 
     phaser.register(); // register new task 
    } 

    public void run(){ 
     ... 
     phaser.arrive(); //may want to surround this in try/finally 
    } 
3

Вы неправильно используете awaitTermination. По словам Javadoc вы должны назвать shutdown первыми:

блоков, пока все задачи не будут завершены выполнение после запроса завершения работы, или тайм-аут происходит, или текущий поток прерывается, в зависимости от того происходит первым.

Для достижения своей цели я предлагаю использовать CountDownLatch (или защелка, что приращения поддержки как this one), чтобы определить точный момент, когда нет задачи осталось, так что вы смело можете сделать shutdown.

+0

Я не могу использовать CountDownLatch, потому что заранее не знаю, сколько уникальных ссылок я соберу с ресурса. – Redeemer

+0

@Redeemer Я отредактировал свой ответ – vsminkov

+0

Если вызов executorService.shutdown(); перед выполнением службы. Служба поддержки (10, TimeUnit.MINUTES); он просто ждет только первого потока, а crawller собирает только первую ссылку https://jsoup.org/. Я думаю, проблема заключается в том, что он вызывает executorService.execute внутри другой задачи вместо основного потока. – Redeemer

0

Вы не вызываете выключение.

Это может работать - переменная AtomicLong в CrawlerService. Инкремент перед каждой новой подзадачей передается службе исполнителя.

Измените метод Run() для уменьшения этого счетчика, и если 0, завершение работы службы исполнитель

public void run() { 
    try { 
     findAllLinks(); 
    } catch (InterruptedException e) { 
     e.printStackTrace(); 
    } finally { 
     //decrements counter 
     //If 0, shutdown executor from here or just notify CrawlerService who would be doing wait(). 
    } 
} 

В «наконец», уменьшить счетчик и когда счетчик равен нулю, остановка исполнитель или просто уведомить CrawlerService. 0 означает, что это последний, никто другой не работает, ни один из них не находится в очереди. Никакая задача не представит никаких новых подзадач.

0

Как правильно заставить ExecutorService работу?

Я думаю, проблема в том, что он вызывает executorService.execute внутри другой задачи вместо основного потока.

Нет. Проблема не в ExecutorService. Вы используете API-интерфейсы некорректно и, следовательно, не получаете правильного результата.

Вы должны использовать три API в определенном порядке, чтобы получить правильный результат.

1. shutdown 
2. awaitTermination 
3. shutdownNow 

Рекомендуемый способ от оракула документации странице ExecutorService:

void shutdownAndAwaitTermination(ExecutorService pool) { 
    pool.shutdown(); // Disable new tasks from being submitted 
    try { 
    // Wait a while for existing tasks to terminate 
    if (!pool.awaitTermination(60, TimeUnit.SECONDS)) { 
     pool.shutdownNow(); // Cancel currently executing tasks 
     // Wait a while for tasks to respond to being cancelled 
     if (!pool.awaitTermination(60, TimeUnit.SECONDS)) 
      System.err.println("Pool did not terminate"); 
    } 
    } catch (InterruptedException ie) { 
    // (Re-)Cancel if current thread also interrupted 
    pool.shutdownNow(); 
    // Preserve interrupt status 
    Thread.currentThread().interrupt(); 
    } 

shutdown(): Инициирует процедуру завершения работы, в котором выполняются ранее представленные задачи, но никакие новые задачи не будут приняты.

shutdownNow(): Пытается прекратить все активное выполнение задач, останавливает обработку ожидающих задач и возвращает список задач, ожидающих выполнения.

awaitTermination(): Блокирует до тех пор, пока все задачи не будут выполнены после запроса на завершение работы или произойдет тайм-аут, или текущий поток будет прерван, в зависимости от того, что произойдет раньше.

На другой ноте: Если вы хотите ждать, пока все задачи для завершения, обратитесь к этому связанные SE вопрос:

wait until all threads finish their work in java

Я предпочитаю использовать invokeAll() или ForkJoinPool(), которые лучше всего подходят для использования дело.

Смежные вопросы