2016-12-13 2 views
8

Я довольно новичок в потоках Akka и Akka HTTP.Как использовать Akka HTTP для генерации содержимого через выходной поток

Я хотел бы создать простой HTTP-сервер, который может генерировать zip-файл из содержимого папки и отправлять ее клиенту.

org.zeroturnaround.zip.ZipUtil делает задачу создания zip-файла очень простой, но ему нужен outputStream.

Вот мое решение (написанный на Scala языке):

  val os = new ByteArrayOutputStream() 
      ZipUtil.pack(myFolder, os) 
      HttpResponse(entity = HttpEntity(
       MediaTypes.`application/zip`, 
       os.toByteArray)) 

Это решение работает, но сохраняет все содержимое памяти, так что это не является масштабируемым.

Я думаю, что ключ к решению этой проблемы является использование этого:

val source = StreamConverters.asOutputStream() 

, но не знаю, как использовать его. :-(

Любая помощь, пожалуйста?

ответ

3

У меня была такая же проблема. Для того, чтобы сделать его противодавление-совместимый я должен был написать искусственный InputStream, который позже преобразуется в Source через StreamConverters.fromInputStream(() => input), который, в свою очередь, когда вы вернетесь из вашего акка -http DSL complete директива.

Вот что я написал.

import java.io.{File, IOException, InputStream} 
import java.nio.charset.StandardCharsets 
import java.time.LocalDate 
import java.time.format.DateTimeFormatter 

import org.apache.commons.compress.archivers.sevenz.{SevenZArchiveEntry, SevenZFile} 

import scala.annotation.tailrec 
import scala.util.{Failure, Success, Try} 

class DownloadStatsZipReader(path: String, password: String) extends InputStream { 

    private val (archive, targetDate) = { 
    val inputFile = new SevenZFile(new File(path), password.getBytes(StandardCharsets.UTF_16LE.displayName())) 

    @tailrec 
    def findValidEntry(): Option[(LocalDate, SevenZArchiveEntry)] = 
     Option(inputFile.getNextEntry) match { 
     case Some(entry) => 
      if (!entry.isDirectory) { 
      val parts = entry.getName.toLowerCase.split("\\.(?=[^\\.]+$)") 
      if (parts(1) == "tab" && entry.getSize > 0) 
       Try(LocalDate.parse(parts(0), DateTimeFormatter.ISO_LOCAL_DATE)) match { 
       case Success(localDate) => 
        Some(localDate -> entry) 
       case Failure(_) => 
        findValidEntry() 
       } 
      else 
       findValidEntry() 
      } else 
      findValidEntry() 
     case None => None 
     } 

    val (date, _) = findValidEntry().getOrElse { 
     throw new RuntimeException(s"$path has no files named as `YYYY-MM-DD.tab`") 
    } 
    inputFile -> date 
    } 

    private val buffer = new Array[Byte](1024) 
    private var offsetBuffer: Int = 0 
    private var sizeBuffer: Int = 0 

    def getTargetDate: LocalDate = targetDate 

    override def read(): Int = 
    sizeBuffer match { 
     case -1 => 
     -1 
     case 0 => 
     loadNextChunk() 
     read() 
     case _ => 
     if (offsetBuffer < sizeBuffer) { 
      val result = buffer(offsetBuffer) 
      offsetBuffer += 1 
      result 
     } else { 
      sizeBuffer = 0 
      read() 
     } 
    } 

    @throws[IOException] 
    override def close(): Unit = { 
    archive.close() 
    } 

    private def loadNextChunk(): Unit = try { 
    val bytesRead = archive.read(buffer) 
    if (bytesRead >= 0) { 
     offsetBuffer = 0 
     sizeBuffer = bytesRead 
    } else { 
     offsetBuffer = -1 
     sizeBuffer = -1 
    } 
    } catch { 
    case ex: Throwable => 
     ex.printStackTrace() 
     throw ex 
    } 
} 

Если вы нашли ошибки в своем коде, пожалуйста, дайте мне знать.

9

Попробуйте

val byteSource: Source[ByteString, Unit] = StreamConverters.asOutputStream() 
    .mapMaterializedValue(os => ZipUtil.pack(myFolder, os)) 
HttpResponse(entity = HttpEntity(
      MediaTypes.`application/zip`, 
      byteSource)) 

Вы только получаете доступ к OutputStream как только источник получает овеществленного, который не может произойти сразу. Теоретически источник может также существовать много раз, поэтому вы должны иметь возможность справиться с этим.

+2

Если бы я знал это раньше :) – expert

+1

пишу это себя также было весьма поучительно и весело ... –

+0

Это выглядит легко, но он не в моем случае: 'java.lang.IllegalStateException: еще не инициализирован: только SetHandler разрешено в GraphStageLogic constructor' –

Смежные вопросы