2015-12-30 8 views
0

Я отправил некоторые Runnables в ExecutorService. Внутри этих Runnables вызывается wait() и notify(). Код работает с newFixedThreadPool как ExecutorService. С помощью newWorkStealingPool процесс неожиданно завершается без сообщения об ошибке.WorkStealingPool неожиданно завершает работу

import java.net.URL; 
import java.io.BufferedReader; 
import java.io.InputStreamReader; 
import java.io.IOException; 

// For regular expressions 
import java.util.regex.Matcher; 
import java.util.regex.MatchResult; 
import java.util.regex.Pattern; 

import java.util.*; 
import java.util.concurrent.*; 

public class TestPipeline { 
    public static void main(String[] args) { 
    runAsThreads(); 
    } 

    private static void runAsThreads() { 
    final BlockingQueue<String> urls = new OneItemQueue<String>(); 
    final BlockingQueue<Webpage> pages = new OneItemQueue<Webpage>(); 
    final BlockingQueue<Link> refPairs = new OneItemQueue<Link>(); 
    final BlockingQueue<Link> uniqRefPairs = new OneItemQueue<Link>(); 

    final ExecutorService executor = Executors.newWorkStealingPool(6); 
// final ExecutorService executor = Executors.newFixedThreadPool(6); 

    executor.submit(new UrlProducer(urls)); 
    executor.submit(new PageGetter(urls, pages)); 
    executor.submit(new LinkScanner(pages,refPairs)); 
    executor.submit(new Uniquifier<Link>(refPairs,uniqRefPairs)); 
    executor.submit(new LinkPrinter(uniqRefPairs)); 
    } 
} 

class UrlProducer implements Runnable { 
    private final BlockingQueue<String> output; 

    public UrlProducer(BlockingQueue<String> output) { 
    this.output = output; 
    } 

    public void run() { 
    System.out.println("in producer"); 
    for (int i=0; i<urls.length; i++) 
     output.put(urls[i]); 
    } 

    private static final String[] urls = 
    { "http://www.itu.dk", "http://www.di.ku.dk", "http://www.miele.de", 
    "http://www.microsoft.com", "http://www.cnn.com", "http://www.dr.dk", 
    "http://www.vg.no", "http://www.tv2.dk", "http://www.google.com", 
    "http://www.ing.dk", "http://www.dtu.dk", "http://www.bbc.co.uk" 
    }; 
} 

class PageGetter implements Runnable { 
    private final BlockingQueue<String> input; 
    private final BlockingQueue<Webpage> output; 

    public PageGetter(BlockingQueue<String> input, BlockingQueue<Webpage> output) { 
    this.input = input; 
    this.output = output; 
    } 

    public void run() { 
    while (true) { 
    System.out.println("in pagegetter"); 
     String url = input.take(); 
     //  System.out.println("PageGetter: " + url); 
     try { 
     String contents = getPage(url, 200); 
     output.put(new Webpage(url, contents)); 
     } catch (IOException exn) { System.out.println(exn); } 
    } 
    } 

    public static String getPage(String url, int maxLines) throws IOException { 
    // This will close the streams after use (JLS 8 para 14.20.3): 
    try (BufferedReader in 
     = new BufferedReader(new InputStreamReader(new URL(url).openStream()))) { 
     StringBuilder sb = new StringBuilder(); 
     for (int i=0; i<maxLines; i++) { 
     String inputLine = in.readLine(); 
     if (inputLine == null) 
      break; 
     else 
     sb.append(inputLine).append("\n"); 
     } 
     return sb.toString(); 
    } 
    } 
} 

class Uniquifier<T> implements Runnable{ 
    private final Set<T> set = new HashSet<T>(); 
    private final BlockingQueue<T> input; 
    private final BlockingQueue<T> output; 

    public Uniquifier(BlockingQueue<T> input, BlockingQueue<T> output){ 
    this.input = input; 
    this.output = output; 
    } 


    public void run(){ 
    while(true){ 
     System.out.println("in uniquifier"); 
     T item = input.take(); 
     if(!set.contains(item)){ 
     set.add(item); 
     output.put(item); 
     } 
    } 
    } 

} 

class LinkScanner implements Runnable { 
    private final BlockingQueue<Webpage> input; 
    private final BlockingQueue<Link> output; 

    public LinkScanner(BlockingQueue<Webpage> input, 
        BlockingQueue<Link> output) { 
    this.input = input; 
    this.output = output; 
    } 

    private final static Pattern urlPattern 
    = Pattern.compile("a href=\"(\\p{Graph}*)\""); 

    public void run() { 
    while (true) { 
     System.out.println("in link scanner"); 
     Webpage page = input.take(); 
     //  System.out.println("LinkScanner: " + page.url); 
     // Extract links from the page's <a href="..."> anchors 
     Matcher urlMatcher = urlPattern.matcher(page.contents); 
     while (urlMatcher.find()) { 
     String link = urlMatcher.group(1); 
     output.put(new Link(page.url, link)); 
     } 
    } 
    } 
} 

class LinkPrinter implements Runnable { 
    private final BlockingQueue<Link> input; 

    public LinkPrinter(BlockingQueue<Link> input) { 
    this.input = input; 
    } 

    public void run() { 
    while (true) { 
     System.out.println("in link printer"); 
     Link link = input.take(); 
     //  System.out.println("LinkPrinter: " + link.from); 
     System.out.printf("%s links to %s%n", link.from, link.to); 
    } 
    } 
} 


class Webpage { 
    public final String url, contents; 
    public Webpage(String url, String contents) { 
    this.url = url; 
    this.contents = contents; 
    } 
} 

class Link { 
    public final String from, to; 
    public Link(String from, String to) { 
    this.from = from; 
    this.to = to; 
    } 

    // Override hashCode and equals so can be used in HashSet<Link> 

    public int hashCode() { 
    return (from == null ? 0 : from.hashCode()) * 37 
     + (to == null ? 0 : to.hashCode()); 
    } 

    public boolean equals(Object obj) { 
    Link that = obj instanceof Link ? (Link)obj : null; 
    return that != null 
     && (from == null ? that.from == null : from.equals(that.from)) 
     && (to == null ? that.to == null : to.equals(that.to)); 
    } 
} 

// Different from java.util.concurrent.BlockingQueue: Allows null 
// items, and methods do not throw InterruptedException. 

interface BlockingQueue<T> { 
    void put(T item); 
    T take(); 
} 

class OneItemQueue<T> implements BlockingQueue<T> { 
    private T item; 
    private boolean full = false; 

    public void put(T item) { 
    synchronized (this) { 
     while (full) { 
     try { this.wait(); } 
     catch (InterruptedException exn) { } 
     } 
     full = true; 
     this.item = item; 
     this.notifyAll(); 
    } 
    } 

    public T take() { 
    synchronized (this) { 
     while (!full) { 
     try { this.wait(); } 
     catch (InterruptedException exn) { } 
     } 
     full = false; 
     this.notifyAll(); 
     return item; 
    } 
    } 
} 
+0

попробуйте добавить заявление отладки после последнего отправления. Я предполагаю, что вы увидите это до выхода программы. Я думаю, что происходит, связано с тем, что пул workstealing может изменять размеры потоков. Вероятно, нет, когда вы дойдете до конца метода runAsThreads, а затем выйдет основной поток. Вы видите свои другие sysouts? – JimmyJames

+0

Вы на месте. Постановка ожиданий. После последней подачи решает проблему. – PNT

+0

Прохладный, я отвечу. – JimmyJames

ответ

0

Поскольку бассейн выделяет темы динамически, нет никаких нитей в живых после runAsThreads выходов, потому что это конец основного потока. Для поддержания работоспособности приложения должно быть по крайней мере на потоке. Необходимо добавить вызов для ожидания. Он не нужен для фиксированного пула, потому что он всегда будет иметь активные потоки, пока он не будет явно закрыт, как указано в JavaDocs.

Смежные вопросы