2013-05-26 4 views
1

Я работаю с потоком данных akka, и я хотел бы знать, есть ли способ заставить конкретный блок кода ждать завершения будущего, без явного использования значения этого будущее.поток данных akka и побочные эффекты

Фактический вариант использования заключается в том, что у меня есть файл, и я хочу, чтобы файл был удален, когда определенное будущее завершено, но не раньше. Вот пример. Сначала представьте, у меня есть этот сервис:

trait ASync { 
    def pull: Future[File] 
    def process(input : File): Future[File] 
    def push(input : File): Future[URI] 
} 

И у меня есть рабочий процесс, я хочу работать в неблокирующей образом:

val uriFuture = flow { 
    val pulledFile = async.pull(uri) 
    val processedile = async.process(pulledFile()) 
    val storedUri = async.push(processedFile()) 

    // I'd like the following line executed only after storedUri is completed, 
    // not as soon as pulled file is ready. 
    pulledFile().delete() 

    storedUri() 
} 

ответ

1

Вы могли бы попробовать что-то вроде этого:

val uriFuture = flow { 
    val pulledFile = async.pull(uri) 
    val processedile = async.process(pulledFile()) 
    val storedUri = for(uri <- async.push(processedFile())) yield { 
    pulledFile().delete() 
    uri 
    } 
    storedUri() 
} 

В этом примере pulledFile.delete будет вызываться только в том случае, если Future от push удался. Если это не удается, delete не будет вызываться. Результат будущего storedUri по-прежнему будет результатом вызова push.

Или другой путь будет:

val uriFuture = flow { 
    val pulledFile = async.pull(uri) 
    val processedile = async.process(pulledFile()) 
    val storedUri = async.push(processedFile()) andThen{ 
    case whatever => pulledFile().delete() 
    } 
    storedUri() 
} 

Разница здесь в том, что delete будет вызван независимо от того, если push успеха или неудачи. Результат storedUri по-прежнему будет результатом вызова push.

+0

Проблема в том, что если я попытаюсь поместить drawFile() внутри любого блока, переданного функции Future в async.push, я получаю тонну ошибок компиляции. Тем не менее, я обнаружил, что мне нужно его скомпилировать, если я назначил pullFile() для val вне блока, а затем использовать его внутри блока. Я проверю, правильно ли это работает, а затем обновить ответ. –

0

Вы можете использовать обратные вызовы для неблокируемого процесса:

future onSuccess { 
    case _ => file.delete() //Deal with cases obviously... 

} 

Источник: http://doc.akka.io/docs/akka/snapshot/scala/futures.html

В качестве альтернативы, вы можете заблокировать с Await.result:

val result = Await.result(future, timeout.duration).asInstanceOf[String] 

последнего, как правило, используются когда вам НЕОБХОДИМО блокировать - например, в тестовых случаях - в то время как не блокировка более эффективна, поскольку вы не припарковываете нить, чтобы развернуть другой поток, только чтобы возобновить другой поток снова - это медленнее, чем асинхронная активность из-за служебных ресурсов управления ресурсами.

Типичные сотрудники называют его «реактивным». Это немного модное слово. Я бы смеялся, если бы вы использовали его на рабочем месте.

Смежные вопросы