2013-06-09 5 views
0

У меня есть система на основе актера, в которой я читаю внешний файл, сидящий в ведре S3, и перемещаясь, беря каждую из строк файла и отправляя его другому актеру, который обрабатывает эту конкретную строку. Мне сложно понять, что происходит, когда при чтении файла возникает исключение.Исключение, возникающее при повторении в scala

Мой код выглядит следующим образом:

import akka.actor._ 
import akka.actor.ActorSystem 

class FileWorker(processorWorker: ActorRef) extends Actor with ActorLogging { 

    val fileUtils = new S3Utils() 

    private def processFile(fileLocation: String): Unit = { 
    try{ 
     fileUtils.getLinesFromLocation(fileLocation).foreach { 
     r => 
     { 
      //Some processing happens for the line 
      } 
      } 
     } 
    } 
    }catch{ 
     case e:Exception => log.error("Issue processing files from the following location %s".format(fileLocation)) 
    } 
    } 

    def receive() = { 
    case fileLocation: String => { 
     processFile(fileLocation) 
    } 
    } 
} 

В моем S3Utils классе я определил метод getLinesFromLocation следующим образом:

def getLinesFromLocation(fileLocation: String): Iterator[String] = { 
    try{ 
     for { 
      fileEntry <- getFileInfo(root,fileLocation) 
      } yield fileEntry 
    }catch{ 
     case e:Exception => logger.error("Issue with file location %s:   %s".format(fileLocation,e.getStackTraceString));throw e 
    } 
    } 

метод, где я на самом деле чтение файла определяется в частный метод getFileInfo

private def getFileInfo(rootBucket: String,fileLocation: String): Iterator[String] = { 
    implicit val codec = Codec(Codec.UTF8) 
    codec.onMalformedInput(CodingErrorAction.IGNORE) 
    codec.onUnmappableCharacter(CodingErrorAction.IGNORE) 
    Source.fromInputStream(s3Client. 
         getObject(rootBucket,fileLocation). 
         getObjectContent()).getLines 
    } 

Я написал вышеприведенные части с предположением, что основной файл, сидящий на S3, не будет кэшироваться нигде, и я буду просто перебирать отдельные строки в постоянном пространстве и обрабатывать их. Если есть проблема с чтением определенной строки, итератор будет двигаться дальше, не затрагивая Актера.

Моим первым вопросом было бы, правильно ли я понял итераторы? Во всей действительности я действительно просматриваю строки из базовой файловой системы (в данном случае ведро S3), не применяя никакого давления к памяти/или не создавая утечек памяти.

Следующий вопрос: если итератор сталкивается с ошибкой при чтении отдельной записи, весь процесс итерации убит или переходит к следующей записи.

Мой последний вопрос был бы, правильно ли написана логика обработки файлов?

Будет полезно ознакомиться с этим.

Благодаря

+0

Сообщение stacktrace и типа исключений +, вероятно, могло бы помочь людям понять, что здесь происходит. – drexin

ответ

1

Похоже амазонка s3 не имеет реализации асинхронной и мы застряли с возлагали актерами. Таким образом, ваша реализация верна, предоставляя вам выделение потока для каждого соединения и не будет блокировать ввод и не будет использовать слишком много соединений.

Важные шаги предпринять:

1) ProcessFile не должен блокировать текущий поток. Предпочтительно он должен передать это на вход другого актера:

private def processFile(fileLocation: String): Unit = { 
    ... 
     fileUtils.getLinesFromLocation(fileLocation).foreach { r => 
      lineWorker ! FileLine(fileLocation, r) 
     } 

    ... 
} 

2) Сделать FileWorker закрепленный актер:

## in application.config: 
my-pinned-dispatcher { 
    executor = "thread-pool-executor" 
    type = PinnedDispatcher 
} 

// in the code: 
val fileWorker = context.actorOf(Props(classOf[FileWorker], lineWorker).withDispatcher("my-pinned-dispatcher"), "FileWorker") 

если итератор обнаруживает ошибку при чтении отдельной записи, делает цельные процесс итерации убит?

Да, весь ваш процесс будет убит, и актер займет следующую работу из своего почтового ящика.

+0

Спасибо.getLinesFromLocation была записана для изменения текущей строки в другую форму, класс case, представляющий модель домена, в этой конкретной ситуации. После прочтения вашего поста я более озадачен нынешним дизайном. Моя цель проекта состояла в том, чтобы просканировать весь файл, не сохраняя его в памяти и обрабатывая отдельные записи в строке при чтении файла, который, как мне кажется, Source.fromInputStream (...). Getlines. Но похоже, что мое понимание ошибочно на этом фронте. Есть ли способ достичь того, что я только что описал? –

+0

Или нет, Это просто я глуп. карта над итератором вернет вам итератор, ваш код действительно будет использовать небольшую память. Простите! Я попробую поставить здесь какой-то асинхронный эскиз, чтобы компенсировать это. – vitalii

+0

Это будет здорово. Большое спасибо. –

Смежные вопросы