У меня есть список файлов. Я хочу:Akka streams: Чтение нескольких файлов
- Чтобы прочитать их все как один источник.
- Файлы следует читать последовательно, в порядке. (no round-robin)
- Ни в коем случае не следует требовать, чтобы какой-либо файл был полностью в памяти.
- Ошибка чтения из файла должна свернуть поток.
Он чувствовал, как это должно работать: (Scala, Акка-потоки v2.4.7)
val sources = Seq("file1", "file2").map(new File(_)).map(f => FileIO.fromPath(f.toPath)
.via(Framing.delimiter(ByteString(System.lineSeparator), 10000, allowTruncation = true))
.map(bs => bs.utf8String)
)
val source = sources.reduce((a, b) => Source.combine(a, b)(MergePreferred(_)))
source.map(_ => 1).runWith(Sink.reduce[Int](_ + _)) // counting lines
Но это приводит к ошибке компиляции, поскольку FileIO
имеет материализовалась значение, связанное с ним, и Source.combine
Безразлично Не поддерживайте это.
Картирование материализованная значение далеко заставляет меня задаться вопросом, как файл-чтение ошибок получить обрабатываются, но компилируется:
val sources = Seq("file1", "file2").map(new File(_)).map(f => FileIO.fromPath(f.toPath)
.via(Framing.delimiter(ByteString(System.lineSeparator), 10000, allowTruncation = true))
.map(bs => bs.utf8String)
.mapMaterializedValue(f => NotUsed.getInstance())
)
val source = sources.reduce((a, b) => Source.combine(a, b)(MergePreferred(_)))
source.map(_ => 1).runWith(Sink.reduce[Int](_ + _)) // counting lines
Но бросает IllegalArgumentException во время выполнения:
java.lang.IllegalArgumentException: requirement failed: The inlets [] and outlets [MergePreferred.out] must correspond to the inlets [MergePreferred.preferred] and outlets [MergePreferred.out]
Я искал модульные, поэтому я ценю это. Я использовал количество строк в качестве примера того, что я мог сделать с файлами, и 'lineCounter', как написанные conflates, которые с чтением файла. (Это раковина). Но если я переведу фальцовку и все после нее в другом месте, я остаюсь с Flow [Path, String, NotUsed], который именно то, что я искал. – randomstatistic
Не могли бы вы предоставить импорт с помощью своих примеров, они являются неотъемлемой частью кода. –
@OsskarWerrewka Все должно быть в akka.stream.scaladsl и java IO/NIO. У вас была проблема с этим? –