2015-11-06 2 views
2

Я делаю приложение с Spark, которое будет запускать некоторые алгоритмы вытеснения темы. Для этого сначала мне нужно сделать некоторую предварительную обработку, доведя матрицу документа до конца. Ive мог это сделать, но для (не так уж много) большой коллекции документов (всего 2 тысячи, 5 МБ) этот процесс длится вечно.Spark - Слишком длинная операция

Итак, отлаживая, Ive обнаружил, что программа похожа на stacks, и она находится в операции уменьшения. То, что я делаю в этой части кода, - это подсчет того, сколько раз каждый термин происходит в коллекции, поэтому сначала я сделал «карту», ​​вычеркнув ее для каждого rdd, и я «уменьшу» ее, сохранив результат внутри хешмап. Операция карты выполняется очень быстро, но в сокращении ее разделение выполняется в 40 блоков, и каждый блок занимает 5 ~ 10 минут для выполнения.

Так что я пытаюсь выяснить, что я делаю неправильно, или если операции сокращения настолько дорогостоящие.

SparkConf: Автономный режим, используя локальный [2]. Я попытался использовать его как «spark: // master: 7077», и он сработал, но все тот же медленный.

Код:

«Filesin» является JavaPairRDD, где ключевым является путь к файлу, а значение содержание файла. Итак, сначала карта, в которой я беру этот «filesIn», разбиваю слова и подсчитываю их частоту (в этом случае не имеет значения, какой документ) А потом сокращение, где я создаю HashMap (термин, частота).

JavaRDD<HashMap<String, Integer>> termDF_ = filesIn.map(new Function<Tuple2<String, String>, HashMap<String, Integer>>() { 

     @Override 
     public HashMap<String, Integer> call(Tuple2<String, String> t) throws Exception { 
      String[] allWords = t._2.split(" "); 

      HashMap<String, Double> hashTermFreq = new HashMap<String, Double>(); 
      ArrayList<String> words = new ArrayList<String>(); 
      ArrayList<String> terms = new ArrayList<String>(); 
      HashMap<String, Integer> termDF = new HashMap<String, Integer>(); 

      for (String term : allWords) { 

       if (hashTermFreq.containsKey(term)) { 
        Double freq = hashTermFreq.get(term); 
        hashTermFreq.put(term, freq + 1); 
       } else { 
        if (term.length() > 1) { 
         hashTermFreq.put(term, 1.0); 
         if (!terms.contains(term)) { 
          terms.add(term); 
         } 
         if (!words.contains(term)) { 
          words.add(term); 
          if (termDF.containsKey(term)) { 
           int value = termDF.get(term); 
           value++; 
           termDF.put(term, value); 
          } else { 
           termDF.put(term, 1); 
          } 
         } 
        } 
       } 
      } 
      return termDF; 
     } 
    }); 

HashMap<String, Integer> termDF = termDF_.reduce(new Function2<HashMap<String, Integer>, HashMap<String, Integer>, HashMap<String, Integer>>() { 

     @Override 
     public HashMap<String, Integer> call(HashMap<String, Integer> t1, HashMap<String, Integer> t2) throws Exception { 
      HashMap<String, Integer> result = new HashMap<String, Integer>(); 

      Iterator iterator = t1.keySet().iterator(); 

      while (iterator.hasNext()) { 
       String key = (String) iterator.next(); 
       if (result.containsKey(key) == false) { 
        result.put(key, t1.get(key)); 
       } else { 
        result.put(key, result.get(key) + 1); 
       } 

      } 

      iterator = t2.keySet().iterator(); 

      while (iterator.hasNext()) { 
       String key = (String) iterator.next(); 
       if (result.containsKey(key) == false) { 
        result.put(key, t2.get(key)); 
       } else { 
        result.put(key, result.get(key) + 1); 
       } 

      } 

      return result; 
     } 
    }); 

Спасибо!

ответ

1

ОК, так что в непосредственной близости от верхней части моей головы:

  • Свечи преобразования являются ленивыми. Это означает, что map не выполняется, пока вы не вызовете последующее reduce действие так, что вы описали, как медленно reduce, скорее всего, медленно map + reduce
  • ArrayList.contains является O (N), так что все эти words.contains и terms.contains крайне неэффективно
  • map логика запахи рыбные. В частности:
    • если срок уже видели вы никогда не получите в else ветви
    • на первый взгляд words и terms должны иметь точно такое же содержание и должно быть эквивалентно hashTermFreq ключей или termDF ключей.
    • Похоже, что значения в termDF могут принимать только значение 1. Если это то, что вы хотите, и вы игнорируете частоты, то зачем создавать hashTermFreq?
  • reduce Фаза, как реализовано здесь, означает неэффективное линейное сканирование с растущим объектом по данным, в то время как вы, что вы действительно хотите, это reduceByKey.

Использование Scala в псевдокоде весь ваш код может быть эффективно выражена следующим образом:

val termDF = filesIn.flatMap{ 
    case (_, text) => 
    text.split(" ") // Split 
    .toSet // Take unique terms 
    .filter(_.size > 1) // Remove single characters 
    .map(term => (term, 1))} // map to pairs 
    .reduceByKey(_ + _) // Reduce by key 

termDF.collectAsMap // Optionally 

Наконец он выглядит, как вы заново изобретать колесо. По крайней мере, некоторые инструменты, которые вам нужны, уже реализованы в mllib.feature или ml.feature

+0

Я преобразовал ваш код в java, и он работал нормально. Только часть «collectasmap» все еще выглядит медленной, но я думаю, мне не нужно ее собирать, так что все в порядке. Есть две причины, по которым я пытаюсь сам ее создать: 1. Preprocess сильно меняется, поэтому я подумал, что если я создаю собственное приложение с нуля, будет легче сделать изменения/улучшения. 2. Я предполагаю, что основная причина: когда я использовал mllib, я мог легко сделать TFIDF, но я не мог понять, как связать TF в векторе с термином и его документом. Спасибо за вашу помощь ноль. –

Смежные вопросы