2016-02-05 2 views
4

Я делаю сравнение между реализациями Scala vs Java Reactive Spec с использованием akka-stream и RxJava соответственно. Мой пример использования упрощен grep: Учитывая каталог, фильтр файлов и текст поиска, я смотрю в этом каталоге все соответствующие файлы, содержащие текст. Затем я передаю пару (filename -> matching line). Это отлично работает для Java, но для Scala ничего не печатается. Исключений нет, но выхода нет. Данные для теста загружаются из Интернета, но, как вы можете видеть, код может быть легко протестирован с любым локальным каталогом.Scala vs Java Streaming: Scala ничего не печатает, работает Java

Scala:

object Transformer { 
    implicit val system = ActorSystem("transformer") 
    implicit val materializer = ActorMaterializer() 
    implicit val executionContext: ExecutionContext = { 
    implicitly 
    } 

    import collection.JavaConverters._ 

    def run(path: String, text: String, fileFilter: String) = { 
    Source.fromIterator {() => 
     Files.newDirectoryStream(Paths.get(path), fileFilter).iterator().asScala 
    }.map(p => { 
     val lines = io.Source.fromFile(p.toFile).getLines().filter(_.contains(text)).map(_.trim).to[ImmutableList] 
     (p, lines) 
    }) 
     .runWith(Sink.foreach(e => println(s"${e._1} -> ${e._2}"))) 
    } 
} 

Java:

public class Transformer { 
    public static void run(String path, String text, String fileFilter) { 
     Observable.from(files(path, fileFilter)).flatMap(p -> { 
      try { 
       return Observable.from((Iterable<Map.Entry<String, List<String>>>) Files.lines(p) 
         .filter(line -> line.contains(text)) 
         .map(String::trim) 
         .collect(collectingAndThen(groupingBy(pp -> p.toAbsolutePath().toString()), Map::entrySet))); 
      } catch (IOException e) { 
       throw new UncheckedIOException(e); 
      } 
     }).toBlocking().forEach(e -> System.out.printf("%s -> %s.%n", e.getKey(), e.getValue())); 
    } 

    private static Iterable<Path> files(String path, String fileFilter) { 
     try { 
      return Files.newDirectoryStream(Paths.get(path), fileFilter); 
     } catch (IOException e) { 
      throw new UncheckedIOException(e); 
     } 
    } 
} 

тест Unit с помощью Scala Test:

class TransformerSpec extends FlatSpec with Matchers { 
    "Transformer" should "extract temperature" in { 
    Transformer.run(NoaaClient.currentConditionsPath(), "temp_f", "*.xml") 
    } 

    "Java Transformer" should "extract temperature" in { 
    JavaTransformer.run(JavaNoaaClient.currentConditionsPath(false), "temp_f", "*.xml") 
    } 
} 
+1

Я не знаю, о AKKA потоков, но в вашем Java версии вы преобразования Observable 'toBlocking()', чтобы ваш поток выполнения не закончит, пока цепь не будет завершена, но это выглядит как Акку версия все работает async. Может быть, это просто факт, что ваш виртуальный/тестовый тест заканчивается до того, как версия Akka получит шанс завершить? – tddmonkey

+0

Вы пробовали включать в себя заявление о печати здесь и там в версии scala и посмотреть, действительно ли он работает? –

+0

@MrWiggles Вы были рядом, см. Мой ответ ниже. Спасибо. –

ответ

1

Dang, я забыл, что Source возвращает Future, что означает, что поток никогда не запускался. Комментарий @MrWiggles дал мне подсказку. Следующий код Scala дает эквивалентный результат в виде версии Java.

Примечание: Код в моем вопросе не закрывал DirectoryStream, который, для каталогов с большим количеством файлов, вызвавшим java.io.IOException: Too many open files in system. Приведенный ниже код правильно закрывает ресурсы.

def run(path: String, text: String, fileFilter: String) = { 
    val files = Files.newDirectoryStream(Paths.get(path), fileFilter) 

    val future = Source(files.asScala.toList).map(p => { 
    val lines = io.Source.fromFile(p.toFile).getLines().filter(_.contains(text)).map(_.trim).to[ImmutableList] 
    (p, lines) 
    }) 
    .filter(!_._2.isEmpty) 
    .runWith(Sink.foreach(e => println(s"${e._1} -> ${e._2}"))) 

    Await.result(future, 10.seconds) 

    files.close 

    true // for testing 
}