2012-02-16 2 views
13

Я пытаюсь построить свои индексы в Lucene с несколькими потоками. Итак, я начал свою кодировку и написал следующий код. Сначала я нахожу файлы и для каждого файла, я создаю поток для его индексации. После этого я присоединяюсь к потокам и оптимизирую индексы. Он работает, но я не уверен ... могу ли я доверять ему в больших масштабах? Есть ли способ улучшить его?Улучшить многопоточное индексирование с помощью lucene

import java.io.File; 
import java.io.FileFilter; 
import java.io.FileReader; 
import java.io.IOException; 
import java.io.File; 
import java.io.FileReader; 
import java.io.BufferedReader; 
import org.apache.lucene.index.IndexWriter; 
import org.apache.lucene.document.Field; 
import org.apache.lucene.document.Document; 
import org.apache.lucene.store.RAMDirectory; 
import org.apache.lucene.analysis.standard.StandardAnalyzer; 
import org.apache.lucene.analysis.StopAnalyzer; 
import org.apache.lucene.index.IndexReader; 
import org.apache.lucene.store.Directory; 
import org.apache.lucene.store.FSDirectory; 
import org.apache.lucene.util.Version; 
import org.apache.lucene.index.TermFreqVector; 

public class mIndexer extends Thread { 

    private File ifile; 
    private static IndexWriter writer; 

    public mIndexer(File f) { 
    ifile = f.getAbsoluteFile(); 
    } 

    public static void main(String args[]) throws Exception { 
    System.out.println("here..."); 

    String indexDir; 
     String dataDir; 
    if (args.length != 2) { 
     dataDir = new String("/home/omid/Ranking/docs/"); 
     indexDir = new String("/home/omid/Ranking/indexes/"); 
    } 
    else { 
     dataDir = args[0]; 
     indexDir = args[1]; 
    } 

    long start = System.currentTimeMillis(); 

    Directory dir = FSDirectory.open(new File(indexDir)); 
    writer = new IndexWriter(dir, 
    new StopAnalyzer(Version.LUCENE_34, new File("/home/omid/Desktop/stopwords.txt")), 
    true, 
    IndexWriter.MaxFieldLength.UNLIMITED); 
    int numIndexed = 0; 
    try { 
     numIndexed = index(dataDir, new TextFilesFilter()); 
    } finally { 
     long end = System.currentTimeMillis(); 
     System.out.println("Indexing " + numIndexed + " files took " + (end - start) + " milliseconds"); 
     writer.optimize(); 
     System.out.println("Optimization took place in " + (System.currentTimeMillis() - end) + " milliseconds"); 
     writer.close(); 
    } 
    System.out.println("Enjoy your day/night"); 
    } 

    public static int index(String dataDir, FileFilter filter) throws Exception { 
    File[] dires = new File(dataDir).listFiles(); 
    for (File d: dires) { 
     if (d.isDirectory()) { 
     File[] files = new File(d.getAbsolutePath()).listFiles(); 
     for (File f: files) { 
      if (!f.isDirectory() && 
      !f.isHidden() && 
      f.exists() && 
      f.canRead() && 
      (filter == null || filter.accept(f))) { 
       Thread t = new mIndexer(f); 
       t.start(); 
       t.join(); 
      } 
     } 
     } 
    } 
    return writer.numDocs(); 
    } 

    private static class TextFilesFilter implements FileFilter { 
    public boolean accept(File path) { 
     return path.getName().toLowerCase().endsWith(".txt"); 
    } 
    } 

    protected Document getDocument() throws Exception { 
    Document doc = new Document(); 
    if (ifile.exists()) { 
     doc.add(new Field("contents", new FileReader(ifile), Field.TermVector.YES)); 
     doc.add(new Field("path", ifile.getAbsolutePath(), Field.Store.YES, Field.Index.NOT_ANALYZED)); 
     String cat = "WIR"; 
     cat = ifile.getAbsolutePath().substring(0, ifile.getAbsolutePath().length()-ifile.getName().length()-1); 
     cat = cat.substring(cat.lastIndexOf('/')+1, cat.length()); 
     //doc.add(new Field("category", cat.subSequence(0, cat.length()), Field.Store.YES)); 
     //System.out.println(cat.subSequence(0, cat.length())); 
    } 
    return doc; 
    } 

    public void run() { 
    try { 
     System.out.println("Indexing " + ifile.getAbsolutePath()); 
     Document doc = getDocument(); 
     writer.addDocument(doc); 
    } catch (Exception e) { 
     System.out.println(e.toString()); 
    } 

    } 
} 

Любой hep считается.

ответ

13

Если вы хотите, чтобы распараллелить индексацию, есть две вещи, которые вы можете сделать:

  • распараллеливания звонки в addDocument,
  • увеличивающие максимальное количество потоков вашего слияния планировщика.

Вы находитесь на правильном пути для распараллеливания вызовов в addDocuments, но нерестование одного потока на документ не будет масштабироваться по мере увеличения количества документов, которое вам нужно индексировать. Вы должны использовать фиксированный размер ThreadPoolExecutor. Поскольку эта задача в основном зависит от процессора (в зависимости от вашего анализатора и способа получения ваших данных), задайте количество процессоров вашего компьютера, так как максимальное количество потоков может быть хорошим началом.

Что касается планировщика слияния, вы можете увеличить максимальное количество потоков, которое может использоваться с setMaxThreadCount method of ConcurrentMergeScheduler. Помните, что диски намного лучше при последовательном чтении/записи, чем случайное чтение/запись, поскольку, как следствие, слишком высокое максимальное количество потоков для вашего планировщика слияния, скорее всего, замедлит индексирование, чем ускоряет его.

Но прежде чем пытаться распараллелить процесс индексирования, вы должны, вероятно, попытаться найти место узкого места. Если ваш диск слишком медленный, узким местом, скорее всего, будет флеш и шаги слияния, в результате параллелизирующие вызовы addDocument (которые по существу состоят в анализе документа и буферизации результата анализа в памяти) не улучшат скорость индексирования вообще.

Некоторые побочные ноты:

  • Существует некоторая постоянная работа в разрабатываемой версии Lucene с целью улучшения индексации параллелизм (промывочный часть особенно это blog entry объясняет, как это работает).

  • У Lucene есть хорошая страница wiki на how to improve indexing speed, где вы найдете другие способы улучшения скорости индексирования.

+0

Я очень ценю ваш Ваш комментарий к количеству потоков был действительно полезен. Я не упоминал об этом раньше ... – orezvani

5

Я думаю, что более современный способ сделать это - использовать ThreadPoolExecutor и отправить Runnable, что делает вашу индексацию. Вы можете дождаться завершения всех потоков с использованием .awaitTermination или CountdownLatch.

Я не большой поклонник расширения вашего основного класса Thread, просто создайте runnable внутренний класс, который принимает его depdencies в конструкторе. Это делает ваш код более читабельным, так как работа, выполняемая потоками, четко отделена от вашего кода установки приложения.

Несколько примечаний по стилю, я не большой поклонник вашего исключительного исключения класса, это обычно означает, что у вас нет четкого представления о разных проверенных случаях исключения, код, который вы используете, может бросать , Обычно это нехорошо, если у вас нет особых причин.

+0

Благодарим вас заранее. На самом деле я внедрил Runnable, который был хорошей идеей и использовал ThreadPoolExecutor, который решил настоящую ошибку в программе, упомянутой jpountz. – orezvani

+0

Недостатком 'awaitTermination' является то, что он не дожидается завершения всех потоков, но выйдет после n единиц времени. :-(Нужна петля. –

+0

согласны с этим, получится, что IndexWriter не будет закрыт должным образом, а writer_lock все равно будет существовать, даже индекс Directory не манипулирует индексиром. – JasonHuang