2014-10-10 3 views
5

Я пытаюсь реализовать программу Producer Consumer в scala без использования очереди. Поскольку я думаю, что Actor уже реализовал «почтовую очередь» или что-то еще, было бы лишним снова написать код.Каков правильный способ реализации Producer Consumer в scala

Я попытался написать программу в Актер чисто. Ниже представлена ​​многопользовательская программа для нескольких производителей. Продюсер некоторое время спит для моделирования чего-то. Потребители вообще не спят.

Однако я не знаю, как завершение работы программы, если не добавить супервизора актера контролировать потребителей, а также объект Promise для использования (класс супервизора в коде) «Await»

ли все равно, чтобы избавиться от них?

import akka.actor.Actor.Receive 
import akka.actor._ 
import akka.routing._; 
import akka.util._ 

import scala.concurrent.{Await, Promise} 
import scala.concurrent.duration._ 

class Producer(val pool:ActorRef)(val name:String) extends Actor { 

    def receive = { 
    case _ => 
     while (true) { 
     val sleepTime = scala.util.Random.nextInt(1000) 
     Thread.sleep(sleepTime) 
     println("Producer %s send food" format name) 
     pool ! name 
     } 
    } 
} 

class Consumer(supervisor : ActorRef)(val name:String) extends Actor { 

    var counter = 0 

    def receive = { 
    case s => 
     counter += 1 
     println("%s eat food produced by %s" format (name,s)) 

     if (counter >= 10) { 
     println("%s is full" format name) 

     context.stop(self) 
     supervisor ! 1 
     } 
    } 
} 

class Supervisor(p:Promise[String]) extends Actor { 

    var r = 3 

    def receive = { 
    case _ => 
     r -= 1 
     if (0 == r) { 
     println("All consumer stopped") 
     context.stop(self) 
     p success ("Good") 
     } 
    } 

} 

object Try3 { 

    def work(): Unit = { 
    val system = ActorSystem("sys1") 
    val nProducer = 5; 
    val nConsumer = 3; 
    val p = Promise[String] 
    val supervisor = system.actorOf(Props(new Supervisor(p))); 
    val arrConsumer = for (i <- 1 to nConsumer) yield system.actorOf(Props(new Consumer(supervisor)("Consumer %d" format (i)))) 
    val poolConsumer = system.actorOf(Props.empty.withRouter(RoundRobinRouter(arrConsumer))) 
    val arrProducer = for (i <- 1 to nProducer) yield system.actorOf(Props(new Producer(poolConsumer)("Producer %d" format (i)))) 

    arrProducer foreach (_ ! "start") 

    Await.result(p.future,Duration.Inf) 
    println("great!") 
    system.shutdown 
    } 

    def main(args:Array[String]): Unit = { 
    work() 
    } 
} 

Приемный класс функций Производитель имеет проблему, что она не будет закрыта, потому что в то время, не нарушая условий.

Единственный способ, о котором я могу думать, это «отправить сообщение самому продюсеру». Интересно, это обычный способ реализовать такой запрос?

Вот измененный код:

class Producer(val pool:ActorRef)(val name:String) extends Actor { 

    // original implementation: 
    // def receive = { 
    // case _ => 
    // while (true){ 
    //  val sleepTime = scala.util.Random.nextInt(1000) 
    //  Thread.sleep(sleepTime) 
    //  println("Producer %s send food" format name) 
    //  pool ! name 
    // } 
    // } 

    case object Loop; 

    def receive = { 
    case _ => 
     val sleepTime = scala.util.Random.nextInt(1000) 
     Thread.sleep(sleepTime) 
     println("Producer %s send food" format name) 
     pool ! name 
     self ! Loop //send message to itself 
    } 
} 

Независимо от моей реализации, что является правильным способом для осуществления Производитель потребительских программ в Скале, с актером или будущего/Promise?

ответ

2

Вы никогда не должны блокировать (в вашем случае Thread.sleep, while loop) внутри актера. Блокировка внутри актера свирепствует поток из пула потоков, используемый всеми участниками. Даже небольшое количество продюсеров, подобных вашим, сделало бы актера в ActorSystem лишенным потоков и сделало бы их непригодными для использования.

Вместо этого используйте Scheduler, чтобы запланировать периодическое периодическое уведомление о посещении вашего продюсера.

override def preStart(): Unit = { 
    import scala.concurrent.duration._ 
    import context.dispatcher 
    context.system.scheduler.schedule(
    initialDelay = 0.seconds, 
    interval = 1.second, 
    receiver = pool, 
    message = name 
) 
} 
+1

Спасибо @Martynas. Вы решили мою проблему «Loop». Я все еще ищу ответ для элегантной реализации Продюсер-Потребителя. – worldterminator

0

Что вы думаете о реализации Terminator Актер :)

object Terminator { 
    case class WatchMe(ref: ActorRef) 
} 
class Terminator extends Actor { 
    var consumers: Map[ActorRef, ActorRef] = Map() 

    def receive = { 
     case WatchMe(ref) => { 
     consumers += ref -> ref 
     context.watch(ref) 
     } 
     case Terminated(ref) => { 
     context.unwatch(ref) 
     consumers.get(ref).foreach { ref -> ref ! PoisonPill } 
     consumers -= ref 
     //If all consumers are dead stop.self and delegate NoConsumers message higher in hierarchy 
     if(consumers.size == 0) { 
      delegate() 
      context.stop(self) 
     } 
     } 
    } 
} 
Смежные вопросы