Я делаю сравнение между реализациями Scala vs Java Reactive Spec с использованием akka-stream и RxJava соответственно. Мой пример использования упрощен grep
: Учитывая каталог, фильтр файлов и текст поиска, я смотрю в этом каталоге все соответствующие файлы, содержащие текст. Затем я передаю пару (filename -> matching line). Это отлично работает для Java, но для Scala ничего не печатается. Исключений нет, но выхода нет. Данные для теста загружаются из Интернета, но, как вы можете видеть, код может быть легко протестирован с любым локальным каталогом.Scala vs Java Streaming: Scala ничего не печатает, работает Java
Scala:
object Transformer {
implicit val system = ActorSystem("transformer")
implicit val materializer = ActorMaterializer()
implicit val executionContext: ExecutionContext = {
implicitly
}
import collection.JavaConverters._
def run(path: String, text: String, fileFilter: String) = {
Source.fromIterator {() =>
Files.newDirectoryStream(Paths.get(path), fileFilter).iterator().asScala
}.map(p => {
val lines = io.Source.fromFile(p.toFile).getLines().filter(_.contains(text)).map(_.trim).to[ImmutableList]
(p, lines)
})
.runWith(Sink.foreach(e => println(s"${e._1} -> ${e._2}")))
}
}
Java:
public class Transformer {
public static void run(String path, String text, String fileFilter) {
Observable.from(files(path, fileFilter)).flatMap(p -> {
try {
return Observable.from((Iterable<Map.Entry<String, List<String>>>) Files.lines(p)
.filter(line -> line.contains(text))
.map(String::trim)
.collect(collectingAndThen(groupingBy(pp -> p.toAbsolutePath().toString()), Map::entrySet)));
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}).toBlocking().forEach(e -> System.out.printf("%s -> %s.%n", e.getKey(), e.getValue()));
}
private static Iterable<Path> files(String path, String fileFilter) {
try {
return Files.newDirectoryStream(Paths.get(path), fileFilter);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
}
тест Unit с помощью Scala Test:
class TransformerSpec extends FlatSpec with Matchers {
"Transformer" should "extract temperature" in {
Transformer.run(NoaaClient.currentConditionsPath(), "temp_f", "*.xml")
}
"Java Transformer" should "extract temperature" in {
JavaTransformer.run(JavaNoaaClient.currentConditionsPath(false), "temp_f", "*.xml")
}
}
Я не знаю, о AKKA потоков, но в вашем Java версии вы преобразования Observable 'toBlocking()', чтобы ваш поток выполнения не закончит, пока цепь не будет завершена, но это выглядит как Акку версия все работает async. Может быть, это просто факт, что ваш виртуальный/тестовый тест заканчивается до того, как версия Akka получит шанс завершить? – tddmonkey
Вы пробовали включать в себя заявление о печати здесь и там в версии scala и посмотреть, действительно ли он работает? –
@MrWiggles Вы были рядом, см. Мой ответ ниже. Спасибо. –