2015-05-09 2 views
6

Я должен реализовать интерфейс Iterator (как определено Java API), с методами hasNext() и next(), которые должны возвращать элементы результата которые исходят из асинхронно обработанного HTTP-ответа (обрабатываются актерами Akka).Iterate with hasNext() и next() над асинхронно генерируемым потоком элементов

Этих требования должны быть удовлетворено:

  • не блокируют и ждать асинхронной операцию, чтобы закончить, как поколение большого набора результатов, может занять некоторое время (итератор должен возвращать результирующие элементы, как только они становятся доступными)
  • Iterator.next() должен блокироваться до тех пор, пока не будет доступен следующий элемент (или выбросьте исключение, если больше не будет элементов)
  • Iterator.hasNext() должен возвращать true, пока есть больше элементов (даже если следующий пока недоступен)
  • Общее количество результатов неизвестно заранее. Агенты, производящие результат, отправят определенное «конечное сообщение», когда оно будет завершено.
  • старайтесь избегать использования InterruptedException, например. когда итератор ожидает пустой очереди, но не будет генерироваться больше элементов.

Я еще не изучал потоки Java 8 или потоки Akka. Но поскольку я в основном должен перебирать очередь (конечный поток), я сомневаюсь, что есть какое-то подходящее решение.

В настоящее время моя Scala окурок реализация использует java.util.concurrent.BlockingQueue и выглядит следующим образом:

class ResultStreamIterator extends Iterator[Result] { 
    val resultQueue = new ArrayBlockingQueue[Option[Result]](100) 

    def hasNext(): Boolean = ??? // return true if not done yet 
    def next(): Result = ???  // take() next element if not done yet 

    case class Result(value: Any) // sent by result producing actor 
    case object Done    // sent by result producing actor when finished 

    class ResultCollector extends Actor { 
     def receive = { 
      case Result(value) => resultQueue.put(Some(value)) 
      case Done   => resultQueue.put(None) 
     } 
    } 
} 

Я использую вариант [Результат], чтобы указать конец результирующего потока с None. Я экспериментировал с подглядыванием следующего элемента и использованием флага «done», но я надеюсь, что есть более легкое решение.

Бонус вопросы:

  • Как реализация синхронной/асинхронной быть покрыты юнит тестами, особенно тестирования поколение отсроченного результата?
  • Как можно сделать итератор потокобезопасным?
+0

Если вы используете Java 8, реализовать 'Spliterator' и' StreamSupport.stream (yourSpliterator, ложь/истина) ' – fge

+0

Собираетесь ли вы использовать Java или Scala? – Bubletan

+0

@Bubletan Я бы предпочел Скала. Но алгоритм не должен иметь большого значения в Java и Scala (кроме синтаксиса). – goerlitz

ответ

0

Я следовал предложения Джиро и сделал несколько адаптаций по мере необходимости. В общем, мне нравится подход getNext() и next(), выполненный как сообщения ask, отправленные актеру. Это гарантирует, что в любой момент будет только один поток, который изменяет очередь.

Однако, я не уверен в выполнении этой реализации как ask и Await.result создаст два потока для каждого вызова hasNext() и next().

import scala.concurrent.{Await, Future} 
import scala.concurrent.duration._ 
import scala.language.postfixOps 

import akka.actor.{ActorRef, ActorSystem, Props, Stash} 
import akka.pattern.ask 
import akka.util.Timeout 

case object HasNext 
case object GetNext 

case class Result(value: Any) 
case object Done 

class ResultCollector extends Actor with Stash { 

    val queue = scala.collection.mutable.Queue.empty[Result] 

    def collecting: Actor.Receive = { 
    case HasNext  => if (queue.isEmpty) stash else sender ! true 
    case GetNext  => if (queue.isEmpty) stash else sender ! queue.dequeue 
    case value: Result => unstashAll; queue += value 
    case Done   => unstashAll; context become serving 
    } 

    def serving: Actor.Receive = { 
    case HasNext => sender ! queue.nonEmpty 
    case GetNext => sender ! { if (queue.nonEmpty) queue.dequeue else new NoSuchElementException } 
    } 

    def receive = collecting 
} 

class ResultStreamIteration(resultCollector: ActorRef) extends Iterator { 

    implicit val timeout: Timeout = Timeout(30 seconds) 

    override def hasNext(): Boolean = Await.result(resultCollector ? HasNext, Duration.Inf) match { 
    case b: Boolean => b 
    } 

    override def next(): Any = Await.result(resultCollector ? GetNext, Duration.Inf) match { 
    case Result(value: Any) => value 
    case e: Throwable  => throw e 
    } 
} 

object Test extends App { 
    implicit val exec = scala.concurrent.ExecutionContext.global 
    val system = ActorSystem.create("Test") 
    val actorRef = system.actorOf(Props[ResultCollector]) 
    Future { 
    for (i <- 1 to 10000) actorRef ! Result(s"Result $i"); actorRef ! Done 
    } 
    val iterator = new ResultStreamIteration(actorRef) 
    while (iterator.hasNext()) println(iterator.next) 
    system.shutdown() 
} 
0

Следующий код будет соответствовать требованиям. Поля актера могут быть безопасно изменены в приемнике Актера. Итак, resultQueue не должен находиться в поле Итератора, но находиться в поле Actor.

// ResultCollector should be initialized. 
// Initilize code is like... 
// resultCollector ! Initialize(100) 
class ResultStreamIterator(resultCollector: ActorRef) extends Iterator[Result] { 

    implicit val timeout: Timeout = ??? 

    override def hasNext(): Boolean = Await.result(resultCollector ? HasNext, Duration.Inf) match { 
    case ResponseHasNext(hasNext) => hasNext 
    } 

    @scala.annotation.tailrec 
    final override def next(): Result = Await.result(resultCollector ? RequestResult, Duration.Inf) match { 
    case ResponseResult(result) => result 
    case Finished => throw new NoSuchElementException("There is not result.") 
    case WaitingResult => next()// should be wait for a moment. 
    } 

} 

case object RequestResult 
case object HasNext 

case class ResponseResult(result: Result) 
case class ResponseHasNext(hasNext: Boolean) 
case object Finished 
case object WaitingResult 

case class Initialize(expects: Int) 

// This code may be more ellegant if using Actor FSM 
// Acotr's State is (beforeInitialized)->(collecting)->(allCollected) 
class ResultCollector extends Actor with Stash { 

    val results = scala.collection.mutable.Queue.empty[Result] 

    var expects = 0 

    var counts = 0 

    var isAllCollected = false 

    def beforeInitialized: Actor.Receive = { 
    case Initialize(n) => 
     expects = n 
     if (expects != 0) context become collecting 
     else context become allCollected 
     unstashAll 
    case _ => stash() 
    } 

    def collecting: Actor.Receive = { 
    case RequestResult => 
     if (results.isEmpty) sender ! WaitingResult 
     else sender ! ResponseResult(results.dequeue()) 
    case HasNext => ResponseHasNext(true) 
    case result: Result => 
     results += result 
     counts += 1 
     isAllCollected = counts >= expects 
     if (isAllCollected) context become allCollected 
    } 

    def allCollected: Actor.Receive = { 
    case RequestResult => 
     if (results.isEmpty) sender ! Finished 
     else sender ! ResponseResult(results.dequeue()) 
    case HasNext => ResponseHasNext(!results.isEmpty) 
    } 

    def receive = beforeInitialized 
} 
+0

Спасибо за подсказку, чтобы поставить очередь в Актер. Таким образом, как вы говорите, это намного безопаснее и чище, потому что весь доступ к очереди синхронизируется с помощью обработчика событий актера. Тогда также имеет смысл использовать запросы 'ask', разные состояния (мне нужно будет изучить FSM) и' Stash'. – goerlitz

+0

Вам не нужно использовать BlockingQueue, если очередь является полем актера. Если очередь является полем итератора, а методы итератора реализованы с использованием очереди напрямую, этот метод должен быть объявлен как синхронизированный, потому что потоки актера и потоки итератора разные. – jiro

+0

Единственное, что я не знаю, сколько результатов будет создано (я обновил требования в вопросе, чтобы прояснить это). Следовательно, я должен полагаться на конкретное событие (например, «case object Done») из нисходящего актора, указывая, что он завершил обработку результата. – goerlitz

0

Вы можете хранить следующий элемент, используя переменную и просто ждать его в начале обоих методов:

private var nextNext: Option[Result] = null 

def hasNext(): Boolean = { 
    if (nextNext == null) nextNext = resultQueue.take() 
    return !nextNext.isEmpty 
} 

def next(): Result = { 
    if (nextNext == null) nextNext = resultQueue.take() 
    if (nextNext.isEmpty) throw new NoSuchElementException() 
    val result = nextNext.get 
    nextNext = null 
    return result 
} 
+0

Я пробовал что-то подобное, и с первого взгляда кажется, что код может работать. Hoewer, в Scala нельзя использовать 'null', но инициализировать' nextNext' с 'None'. Но тогда необходимо отличить исходный «Нет» от «Нет», который указывает, что результатов больше не будет (например, с использованием логического флага). Я думаю, что решение jiro более элегантно в смысле Scala/Akka, а также более безопасно в отношении синхронизации - даже если оно намного длиннее. Но спасибо, что посмотрели на это. – goerlitz

+0

@goerlitz Нет проблем. Я согласен с инициализацией 'nextNext', поскольку null является плохим, хотя и не вижу никакой причины, чтобы другое решение было более безопасным. – Bubletan