2016-06-13 5 views
5

У меня есть список файлов. Я хочу:Akka streams: Чтение нескольких файлов

  1. Чтобы прочитать их все как один источник.
  2. Файлы следует читать последовательно, в порядке. (no round-robin)
  3. Ни в коем случае не следует требовать, чтобы какой-либо файл был полностью в памяти.
  4. Ошибка чтения из файла должна свернуть поток.

Он чувствовал, как это должно работать: (Scala, Акка-потоки v2.4.7)

val sources = Seq("file1", "file2").map(new File(_)).map(f => FileIO.fromPath(f.toPath) 
    .via(Framing.delimiter(ByteString(System.lineSeparator), 10000, allowTruncation = true)) 
    .map(bs => bs.utf8String) 
) 
val source = sources.reduce((a, b) => Source.combine(a, b)(MergePreferred(_))) 
source.map(_ => 1).runWith(Sink.reduce[Int](_ + _)) // counting lines 

Но это приводит к ошибке компиляции, поскольку FileIO имеет материализовалась значение, связанное с ним, и Source.combine Безразлично Не поддерживайте это.

Картирование материализованная значение далеко заставляет меня задаться вопросом, как файл-чтение ошибок получить обрабатываются, но компилируется:

val sources = Seq("file1", "file2").map(new File(_)).map(f => FileIO.fromPath(f.toPath) 
    .via(Framing.delimiter(ByteString(System.lineSeparator), 10000, allowTruncation = true)) 
    .map(bs => bs.utf8String) 
    .mapMaterializedValue(f => NotUsed.getInstance()) 
) 
val source = sources.reduce((a, b) => Source.combine(a, b)(MergePreferred(_))) 
source.map(_ => 1).runWith(Sink.reduce[Int](_ + _)) // counting lines 

Но бросает IllegalArgumentException во время выполнения:

java.lang.IllegalArgumentException: requirement failed: The inlets [] and outlets [MergePreferred.out] must correspond to the inlets [MergePreferred.preferred] and outlets [MergePreferred.out] 

ответ

8

Код, приведенный ниже, не настолько краток, насколько это могло бы быть, чтобы четко разделить различные проблемы.

// Given a stream of bytestrings delimited by the system line separator we can get lines represented as Strings 
val lines = Framing.delimiter(ByteString(System.lineSeparator), 10000, allowTruncation = true).map(bs => bs.utf8String) 

// given as stream of Paths we read those files and count the number of lines 
val lineCounter = Flow[Path].flatMapConcat(path => FileIO.fromPath(path).via(lines)).fold(0l)((count, line) => count + 1).toMat(Sink.head)(Keep.right) 

// Here's our test data source (replace paths with real paths) 
val testFiles = Source(List("somePathToFile1", "somePathToFile2").map(new File(_).toPath)) 

// Runs the line counter over the test files, returns a Future, which contains the number of lines, which we then print out to the console when it completes 
testFiles runWith lineCounter foreach println 
+0

Я искал модульные, поэтому я ценю это. Я использовал количество строк в качестве примера того, что я мог сделать с файлами, и 'lineCounter', как написанные conflates, которые с чтением файла. (Это раковина). Но если я переведу фальцовку и все после нее в другом месте, я остаюсь с Flow [Path, String, NotUsed], который именно то, что я искал. – randomstatistic

+0

Не могли бы вы предоставить импорт с помощью своих примеров, они являются неотъемлемой частью кода. –

+1

@OsskarWerrewka Все должно быть в akka.stream.scaladsl и java IO/NIO. У вас была проблема с этим? –

-1

У меня есть один ответ из ворот - не используйте akka.FileIO. Это, похоже, работает нормально, например:

val sources = Seq("sample.txt", "sample2.txt").map(io.Source.fromFile(_).getLines()).reduce(_ ++ _) 
val source = Source.fromIterator[String](() => sources) 
val lineCount = source.map(_ => 1).runWith(Sink.reduce[Int](_ + _)) 

Я все еще хотел бы знать, есть ли лучшее решение.

+0

С помощью 'io.Source' вы теряете много энергии. Для небольших файлов это может работать, но это не вариант для больших. – jarandaf

+0

@jarandaf Вы можете уточнить? У меня создалось впечатление, что io.Source просто использовал BufferedReader под капотом, а итератор getLines не загружает весь файл сразу или что-то в этом роде. – randomstatistic

+0

Лучше подумайте, возможно, вы правы (хотя 'FileIO' обрабатывает' ByteString' вместо 'String', который должен быть более совершенным). С другой стороны, с 'io.Source' всегда нужно иметь в виду закрыть источник (который не выполняется по умолчанию). – jarandaf

2

Update О, я не видел, принятый ответ, потому что я не обновить страницу> _ <. В любом случае, я оставлю это здесь, так как также добавил некоторые примечания об обработке ошибок.

Я считаю, что следующая программа делает то, что вы хотите:

import akka.NotUsed 
import akka.actor.ActorSystem 
import akka.stream.{ActorMaterializer, IOResult} 
import akka.stream.scaladsl.{FileIO, Flow, Framing, Keep, Sink, Source} 
import akka.util.ByteString 
import scala.concurrent.{Await, Future} 
import scala.util.{Failure, Success} 
import scala.util.control.NonFatal 
import java.nio.file.Paths 
import scala.concurrent.duration._ 

object TestMain extends App { 
    implicit val actorSystem = ActorSystem("test") 
    implicit val materializer = ActorMaterializer() 
    implicit def ec = actorSystem.dispatcher 

    val sources = Vector("build.sbt", ".gitignore") 
    .map(Paths.get(_)) 
    .map(p => 
     FileIO.fromPath(p) 
     .viaMat(Framing.delimiter(ByteString(System.lineSeparator()), Int.MaxValue, allowTruncation = true))(Keep.left) 
     .mapMaterializedValue { f => 
      f.onComplete { 
      case Success(r) if r.wasSuccessful => println(s"Read ${r.count} bytes from $p") 
      case Success(r) => println(s"Something went wrong when reading $p: ${r.getError}") 
      case Failure(NonFatal(e)) => println(s"Something went wrong when reading $p: $e") 
      } 
      NotUsed 
     } 
    ) 
    val finalSource = Source(sources).flatMapConcat(identity) 

    val result = finalSource.map(_ => 1).runWith(Sink.reduce[Int](_ + _)) 
    result.onComplete { 
    case Success(n) => println(s"Read $n lines total") 
    case Failure(e) => println(s"Reading failed: $e") 
    } 
    Await.ready(result, 10.seconds) 

    actorSystem.terminate() 
} 

Ключевым моментом здесь является flatMapConcat() метод: он преобразует каждый элемент потока в источник и возвращает поток элементов, выданное этими источниками, если это они запускаются последовательно.

Что касается обработки ошибок, вы можете добавить обработчик к будущему в mapMaterializedValue аргументе, или вы можете обрабатывать окончательную ошибку бегущего потока, помещая обработчик на Sink.foreach материализовался будущей стоимостью. Я сделал оба в приведенном выше примере, и если вы протестируете его, скажем, в несуществующем файле, вы увидите, что одно и то же сообщение об ошибке будет напечатано дважды. К сожалению, flatMapConcat() не собирает материализованные значения, и, честно говоря, я не вижу, как он может сделать это здорово, поэтому вы должны обращаться с ними отдельно, если это необходимо.

Смежные вопросы