Uploading files directly to S3 chunk-by-chunk with Play Scala using Iteratees

I have been trying, without success, to upload files directly to S3 using Iteratees. I am still new to functional programming and am finding it hard to put together working code.

I wrote an Iteratee that processes the chunks of an uploaded file and sends them to S3, but the upload fails at the end with an error.

Please help me fix this.

Below is the code I came up with.

Controller handler:

def uploadFile = Action.async(BodyParser(rh => S3UploadHelper("bucket-name").s3Iteratee())) { implicit request =>
  Future {
    if (uploadLogger.isInfoEnabled) uploadLogger.info(s"Contents of Uploaded file :: \n " + request.body)
    Ok(views.html.index("File uploaded"))
  }
}

Helper class:

import java.io.ByteArrayInputStream
import java.util.UUID
import play.api.Logger
import play.api.libs.concurrent.Execution.Implicits.defaultContext
import play.api.libs.iteratee._
import play.api.libs.iteratee.Input.{El, Empty, EOF}
import play.api.mvc.Result
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import com.amazonaws.auth.BasicAWSCredentials
import com.amazonaws.services.s3.AmazonS3Client
import com.amazonaws.services.s3.model._

case class S3UploadHelper(bucket: String, key: String = UUID.randomUUID().toString) {

  private val AWS_ACCESS_KEY = ""
  private val AWS_SECRET_KEY = ""
  private val yourAWSCredentials = new BasicAWSCredentials(AWS_ACCESS_KEY, AWS_SECRET_KEY)
  val amazonS3Client = new AmazonS3Client(yourAWSCredentials)

  private val initRequest = new InitiateMultipartUploadRequest(bucket, key)
  private val initResponse = amazonS3Client.initiateMultipartUpload(initRequest)
  val uploadId = initResponse.getUploadId

  val uploadLogger = Logger("upload")

  def s3Iteratee(etags: Seq[PartETag] = Seq.empty[PartETag]): Iteratee[Array[Byte], Either[Result, CompleteMultipartUploadResult]] = Cont {
    case in: El[Array[Byte]] =>
      // upload the current chunk as the next part of the multipart upload
      val uploadRequest = new UploadPartRequest()
        .withBucketName(bucket)
        .withKey(key)
        .withPartNumber(etags.length + 1)
        .withUploadId(uploadId)
        .withInputStream(new ByteArrayInputStream(in.e))
        .withPartSize(in.e.length)
      if (uploadLogger.isDebugEnabled) uploadLogger.debug(">> " + String.valueOf(in.e))
      val etag = Future { amazonS3Client.uploadPart(uploadRequest).getPartETag }
      etag.map(etags :+ _)
      Await.result(etag, 1.seconds)
      s3Iteratee(etags)
    case in @ Empty => s3Iteratee(etags)
    case in @ EOF =>
      import scala.collection.JavaConversions._
      val compRequest = new CompleteMultipartUploadRequest(bucket, key, uploadId, etags.toList)
      val result = amazonS3Client.completeMultipartUpload(compRequest)
      Done(Right(result), in)
    case in => s3Iteratee(etags)
  }

}

Although the Iteratee seems to work and I am able to process the file chunk by chunk, the upload fails at the end with an unusual error. Here are the logs:

[debug] upload - >> [B@...
[debug] upload - >> [B@...
[debug] upload - >> [B@...
... (the same debug line repeats once per uploaded chunk)
[error] play - Cannot invoke the action, eventually got an error: Status Code: 400, AWS Service: Amazon S3, AWS Request ID: 98h72s0EBA7653AD, AWS Error Code: MalformedXML, AWS Error Message: The XML you provided was not well-formed or did not validate against our published schema, S3 Extended Request ID: R7e44g8oRy5b4xd7MU++atibwrBSRFezeMxNCXE38gyzcwci5Zf 
[error] application - 

! @6k2maob49 - Internal server error, for (POST) [/v1/file_upload] -> 

play.api.Application$$anon$1: Execution exception[[AmazonS3Exception: The XML you provided was not well-formed or did not validate against our published schema]] 
     at play.api.Application$class.handleError(Application.scala:296) ~[play_2.10-2.3.2.jar:2.3.2] 
     at play.api.DefaultApplication.handleError(Application.scala:402) [play_2.10-2.3.2.jar:2.3.2] 
     at play.core.server.netty.PlayDefaultUpstreamHandler$$anonfun$3$$anonfun$applyOrElse$4.apply(PlayDefaultUpstreamHandler.scala:320) [play_2.10-2.3.2.jar:2.3.2] 
     at play.core.server.netty.PlayDefaultUpstreamHandler$$anonfun$3$$anonfun$applyOrElse$4.apply(PlayDefaultUpstreamHandler.scala:320) [play_2.10-2.3.2.jar:2.3.2] 
     at scala.Option.map(Option.scala:145) [scala-library.jar:na] 
Caused by: com.amazonaws.services.s3.model.AmazonS3Exception: The XML you provided was not well-formed or did not validate against our published schema 
     at com.amazonaws.http.AmazonHttpClient.handleErrorResponse(AmazonHttpClient.java:556) ~[aws-java-sdk-1.3.11.jar:na] 
     at com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:289) ~[aws-java-sdk-1.3.11.jar:na] 
     at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:170) ~[aws-java-sdk-1.3.11.jar:na] 
     at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:2723) ~[aws-java-sdk-1.3.11.jar:na] 
     at com.amazonaws.services.s3.AmazonS3Client.completeMultipartUpload(AmazonS3Client.java:1964) ~[aws-java-sdk-1.3.11.jar:na] 

Answer


I have done this in the past. Amazon S3 requires 5 MB chunks: every part of a multipart upload except the last must be at least 5 MB, so the incoming stream has to be regrouped before the parts are uploaded. I return a tuple at the end; you can change that to suit your requirements.

import play.api.libs.concurrent.Execution.Implicits.defaultContext
import play.api.libs.iteratee._
import play.api.mvc.BodyParsers.parse._
import scala.concurrent.Future
import com.amazonaws.auth.BasicAWSCredentials
import com.amazonaws.services.s3.AmazonS3Client
import com.amazonaws.services.s3.model._

val client = new AmazonS3Client(new BasicAWSCredentials(access_key, secret_key))

def my_parser = BodyParser { rh =>

  // S3 rejects multipart parts smaller than 5 MB (except the last one),
  // so regroup the incoming stream into 5 MB chunks before uploading.
  val consume_5MB = Traversable.takeUpTo[Array[Byte]](1024 * 1024 * 5) &>> Iteratee.consume()
  val rechunkAdapter: Enumeratee[Array[Byte], Array[Byte]] = Enumeratee.grouped(consume_5MB)

  multipartFormData(Multipart.handleFilePart({

    case Multipart.FileInfo(partName, file_name, content_type) =>

      val object_id = java.util.UUID.randomUUID().toString().replaceAll("-", "")
      // keep the original file extension unless the upload is audio or video
      val object_id_key =
        if (content_type.getOrElse("").contains("video") || content_type.getOrElse("").contains("audio")) object_id
        else object_id + file_name.substring(file_name.lastIndexOf('.'))
      var position = 0
      val etags = new java.util.ArrayList[PartETag]()

      val initRequest = new InitiateMultipartUploadRequest(bucket, object_id_key)
      val initResponse = client.initiateMultipartUpload(initRequest)
      println("fileName = " + file_name)
      println("contentType = " + content_type)

      // upload each 5 MB chunk as one part, numbering parts from 1,
      // and collect the returned ETags for the final complete request
      (rechunkAdapter &>> Iteratee.foldM[Array[Byte], Int](1) { (c, bytes) =>
        Future {
          println("got a chunk! :" + c + " size in KB: " + (bytes.length / 1024))
          val is = new java.io.ByteArrayInputStream(bytes)

          val uploadRequest = new UploadPartRequest()
            .withBucketName(bucket).withKey(object_id_key)
            .withUploadId(initResponse.getUploadId())
            .withPartNumber(c)
            .withFileOffset(position)
            .withInputStream(is)
            .withPartSize(bytes.length)

          etags.add(client.uploadPart(uploadRequest).getPartETag)
          position = position + bytes.length

          c + 1
        }
      }).map { v =>
        try {
          val compRequest = new CompleteMultipartUploadRequest(
            bucket,
            object_id_key,
            initResponse.getUploadId(),
            etags)
          client.completeMultipartUpload(compRequest)
          println("Finished uploading " + file_name)
          client.setObjectAcl(bucket, object_id_key, CannedAccessControlList.PublicRead)
          (object_id_key, file_name, content_type.getOrElse("application/octet-stream"))
        } catch {
          case e: Exception =>
            println("S3 upload Ex " + e.getMessage())
            val abortMPURequest = new AbortMultipartUploadRequest("xxxxxxx", object_id, initResponse.getUploadId())
            client.abortMultipartUpload(abortMPURequest)
            ("error", file_name, content_type.getOrElse("application/octet-stream"))
        }
      }
  }))(rh)
}
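
For completeness, here is a minimal sketch of how the parser might be wired into a controller action. The action name and the response formatting are assumptions for illustration, not part of the original answer; it only assumes my_parser and bucket are in scope as above.

def upload = Action(my_parser) { request =>
  // each uploaded file part resolves to the (object_id_key, file_name, content_type)
  // tuple produced by the part handler above
  val uploaded = request.body.files.map(_.ref)
  Ok(uploaded.map { case (key, name, ct) => s"$name -> s3://$bucket/$key ($ct)" }.mkString("\n"))
}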


Thanks Karthik. Using the rechunker seems to work. – Pradeep
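
For reference, the regrouping step can be exercised on its own. This is a minimal sketch, assuming made-up test values (a stream of 200 chunks of 64 KB each); it shows that Enumeratee.grouped(consume_5MB) re-emits the stream as exact 5 MB pieces plus a smaller tail, which is the shape S3 multipart upload accepts.

import play.api.libs.iteratee._
import play.api.libs.concurrent.Execution.Implicits.defaultContext
import scala.concurrent.Await
import scala.concurrent.duration._

// same rechunker as in the answer: consume input until 5 MB have been
// taken, concatenate into one Array[Byte], repeat for the whole stream
val consume_5MB = Traversable.takeUpTo[Array[Byte]](1024 * 1024 * 5) &>> Iteratee.consume()
val rechunk: Enumeratee[Array[Byte], Array[Byte]] = Enumeratee.grouped(consume_5MB)

// simulate a network stream of 200 small 64 KB chunks (12.8 MB in total)
val smallChunks = Enumerator.enumerate(Seq.fill(200)(new Array[Byte](64 * 1024)))

// collect the regrouped chunk sizes: two full 5 MB parts plus the remainder
val sizes = Await.result(smallChunks &> rechunk |>>> Iteratee.getChunks, 10.seconds).map(_.length)
println(sizes) // List(5242880, 5242880, 2621440)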