2017-01-21 1 views
1

Я хотел бы пополнить zip, созданный «на лету» (не помещая его полностью в память) с помощью play-framework 2.5, используя поток akka с противодавлением. Здесь мой код с небольшим zip, созданным «на лету» (16 КБ). Когда клиент загружает URL-адрес, связанный с действием, загрузка не запускается.поток zip, созданный на ходу с игрой 2.5 и аккой с обратным протиранием

import java.util.zip.{ ZipEntry, ZipOutputStream, GZIPOutputStream } 
import akka.stream.scaladsl._ 
import akka.util.ByteString 
import play.api.mvc._ 
import scala.concurrent.duration._ 
import java.io.{ BufferedOutputStream, ByteArrayOutputStream } 
import scala.concurrent.{ Promise, Future } 
import akka.stream.OverflowStrategy 
class ZipController extends Controller { 

    def getStreamedZip = Action { 
    val source: Source[ByteString, java.io.OutputStream] = StreamConverters.asOutputStream() 
    val result = source.mapMaterializedValue(x => { 
     val zip = new ZipOutputStream(x) 
     (0 to 100).map { i => 
     zip.putNextEntry(new ZipEntry("test-zip/README-" + i + ".txt")) 
     zip.write("This is the line:\n".map(_.toByte).toArray) 
     zip.closeEntry() 
     } 
     zip.close 
     x 
    }) 
    Ok.chunked(result).withHeaders(
     "Content-Type" -> "application/zip", 
     "Content-Disposition" -> "attachment; filename=test.zip" 
    ) 
    } 


} 

В принципе, я хочу передать zip-файл 2 ГБ на 1 ГБ сервер памяти. И этот zip будет состоять из файлов размером около 15 МБ. Можно ли написать zip без загрузки полностью каждого файла в память? Если скажем, что 3 клиента загружают zip со скоростью 1MB/second. Примерно, сколько памяти будут загружены эти загрузки? Заранее спасибо.

+0

Вы могли бы поразить это: https://github.com/playframework/playframework/issues/6743 – Somatik

ответ

1

Вот реализация источников в https://gist.github.com/kirked/412b5156f94419e71ce4a84ec1d54761

/* License: MIT */ 

import com.typesafe.scalalogging.slf4j.StrictLogging 
import java.io.{ByteArrayOutputStream, InputStream, IOException} 
import java.util.zip.{ZipEntry, ZipOutputStream} 
import play.api.libs.iteratee.{Enumeratee, Enumerator} 
import scala.concurrent.{Future, ExecutionContext} 

/** 
* Play iteratee-based reactive zip-file generation. 
*/ 
object ZipEnumerator extends StrictLogging { 

    /** 
    * A source to zip. 
    * 
    * @param filepath The zip-file path at which to store the data. 
    * @param stream The data stream provider. 
    */ 
    case class Source(filepath: String, stream:() => Future[Option[InputStream]]) 

    /** 
    * Given sources, returns an Enumerator that feeds a zip-file of the source contents. 
    */ 
    def apply(sources: Iterable[Source])(implicit ec: ExecutionContext): Enumerator[Array[Byte]] = { 
    val resolveSources: Enumerator[ResolvedSource] = Enumerator.unfoldM(sources) { sources => 
     sources.headOption match { 
     case None         => Future(None) 
     case Some(Source(filepath, futureStream)) => 
      futureStream().map { _.map(stream => (sources.tail, ResolvedSource(filepath, stream))) } 
     } 
    } 

    val buffer = new ZipBuffer(8192) 

    val writeCentralDirectory = Enumerator.generateM(Future { 
     if (buffer.isClosed) None 
     else { 
     buffer.close 
     Some(buffer.bytes) 
     } 
    }) 

    resolveSources &> zipeach(buffer) andThen writeCentralDirectory 
    } 


    private def zipeach(buffer: ZipBuffer)(implicit ec: ExecutionContext): Enumeratee[ResolvedSource, Array[Byte]] = { 
    Enumeratee.mapConcat[ResolvedSource] { source => 
     buffer.zipStream.putNextEntry(new ZipEntry(source.filepath)) 
     var done = false 

     def entryDone: Unit = { 
     done = true 
     buffer.zipStream.closeEntry 
     source.stream.close 
     } 

     def restOfStream: Stream[Array[Byte]] = { 
     if (done) Stream.empty 
     else { 
      while (!done && !buffer.full) { 
      try { 
       val byte = source.stream.read 
       if (byte == -1) entryDone 
       else buffer.zipStream.write(byte) 
      } 
      catch { 
       case e: IOException => 
       logger.error(s"reading/zipping stream [${source.filepath}]", e) 
       entryDone 
      } 
      } 
      buffer.bytes #:: restOfStream 
     } 
     } 

     restOfStream 
    } 
    } 


    private case class ResolvedSource(filepath: String, stream: InputStream) 


    private class ZipBuffer(capacity: Int) { 
    private val buf = new ByteArrayOutputStream(capacity) 
    private var closed = false 
    val zipStream = new ZipOutputStream(buf) 

    def close(): Unit = { 
     if (!closed) { 
     closed = true 
     reset 
     zipStream.close // writes central directory 
     } 
    } 

    def isClosed = closed 

    def reset: Unit = buf.reset 

    def full: Boolean = buf.size >= capacity 

    def bytes: Array[Byte] = { 
     val result = buf.toByteArray 
     reset 
     result 
    } 
    } 
} 

Использование выглядит примерно так:

val s3 = ... 
val sources = items.map(item => ZipEnumerator.Source(item.filename, {() => s3.getInputStream(item.storagePath) })) 
Ok.chunked(ZipEnumerator(sources))(play.api.http.Writeable.wBytes).withHeaders(
      CONTENT_TYPE -> "application/zip", 
      CONTENT_DISPOSITION -> s"attachment; filename=MyFiles.zip; filename*=UTF-8''My%20Files.zip" 
     ) 
+0

Akka потоки реализации здесь: https://gist.github.com/kirked/03c7f111de0e9a1f74377bf95d3f0f60 – WeaponsGrade

+0

Реализация Акко в вашей сутью (см. комментарий по gist) – Felix

0

Каким-то образом эти методы не работали для меня. Вот мой код, который работает для zip-файлов на лету и загружает их через фреймворк.

import java.io.{BufferedOutputStream, ByteArrayInputStream, ByteArrayOutputStream} 
import java.util.zip.{ZipEntry, ZipOutputStream} 
import akka.stream.scaladsl.{StreamConverters} 
import org.apache.commons.io.FileUtils 
import play.api.mvc.{Action, Controller} 

class HomeController extends Controller { 
    def single() = Action { 
         Ok.sendFile(
          content = new java.io.File("C:\\Users\\a.csv"), 
          fileName = _ => "a.csv" 
         ) 
         } 

    def zip() = Action { 
        Ok.chunked(StreamConverters.fromInputStream(fileByteData)).withHeaders(
         CONTENT_TYPE -> "application/zip", 
         CONTENT_DISPOSITION -> s"attachment; filename = test.zip" 
        ) 
        } 

    def fileByteData(): ByteArrayInputStream = { 
    val fileList = List(
     new java.io.File("C:\\Users\\a.csv"), 
     new java.io.File("C:\\Users\\b.csv") 
    ) 

    val baos = new ByteArrayOutputStream() 
    val zos = new ZipOutputStream(new BufferedOutputStream(baos)) 

    try { 
     fileList.map(file => { 
     zos.putNextEntry(new ZipEntry(file.toPath.getFileName.toString)) 
     zos.write(FileUtils.readFileToByteArray(file)) 
     zos.closeEntry() 
     }) 
    } finally { 
     zos.close() 
    } 

    new ByteArrayInputStream(baos.toByteArray) 
    } 
} 

Основная идея молнии() конвертируют файлы в ByteArrayInputStream и использовать StreamConverter, чтобы отправить его в качестве блочных данных.

+1

Это будет работать для небольших файлов, но с большими ошибками OutOfMemoryErrors будет работать с большими файлами, так как вы держите все данные в памяти. Вы также не передаете данные из своего источника всем клиентам. – WeaponsGrade

+0

@WeaponsGrade Спасибо, что указали это! Я очень новичок в этом. Есть ли у вас предложения по улучшению? Одним из требований является то, что файлы не могут быть записаны на локальный диск. – rileyss

+0

См. Ответ выше для решения Play Iteratee или комментарий ниже для решения Akka Streams. – WeaponsGrade

Смежные вопросы