2016-10-21 5 views
-1

Ниже приведен мой класс, в котором я запускаю задачи одновременно. Моя проблема заключается в том, что мое приложение никогда не заканчивается даже после получения результата для всех функций. Я подозреваю, что пул потоков не закрывается, что приводит мое приложение к жизни даже после моих задач. Поверьте мне, я много искал, но не повезло. Что мне здесь не хватает?Scala Future concurrency Issue

import scala.concurrent.ExecutionContext.Implicits.global 
    import scala.concurrent.Future 
    import scala.collection.mutable.ListBuffer 
    import scala.util.Failure 
    import scala.util.Success 

    object AppLauncher{ 

     def launchAll(): ListBuffer[Future[String]] = { 
     // My code logic where I launch all my threads say 50 
     null 
     } 

def main(args:Array[String]):Unit= { 
register(launchAll()) 
} 



     def register(futureList: ListBuffer[Future[String]]): Unit = 
     { 
      futureList.foreach { future => 
      { 
       future.onComplete { 
       case Success(successResult) => { 
        println(successResult) 
       } 
       case Failure(failureResult) => { println(failureResult) } 
       } 
      } 
      } 
     } 
    } 
+0

Как вы создаете эти фьючерсы? Я попытался запустить пример. Я создал фиктивные фьючерсы со списком ListBuffer.fill (50) (Future («asd»)). Основной корпус закончился, прежде чем что-либо было напечатано на консоли. Я добавил Thread.sleep (1000) после вызова метода регистрации. Программа напечатала asd двадцать раз и закрылась через 1000 секунд. Так что, может быть, есть проблема с вашим будущим или что-то в этом роде? Не могли бы вы предоставить более подробную информацию о ваших потоках? – NieMaszNic

ответ

-1

Наконец мне удалось выяснить вопрос issue.The является certianly из пула потоков не было прекращено даже после того, как мои фьючерсы были завершены успешно. Я попытался изолировать проблему, слегка изменив мою реализацию, как показано ниже.

// импорт scala.concurrent.ExecutionContext.Implicits.global

import scala.concurrent.Future 
    import scala.collection.mutable.ListBuffer 
    import scala.util.Failure 
    import scala.util.Success 

    //Added ExecutionContex explicitly 
    import java.util.concurrent.Executors 
    import concurrent.ExecutionContext 

    object AppLauncher { 

    //Implemented EC explicitly 
    private val pool = Executors.newFixedThreadPool(1000) 
    private implicit val executionContext = ExecutionContext.fromExecutorService(pool) 

    def launchAll(): ListBuffer[Future[String]] = { 
     // My code logic where I launch all my threads say 50 
     null 
    } 

    def main(args: Array[String]): Unit = { 
     register(launchAll()) 
    } 

    def register(futureList: ListBuffer[Future[String]]): Unit = 
     { 
     futureList.foreach { future => 
      { 

      println("Waiting...") 
      val result = Await.result(future, scala.concurrent.duration.Duration.Inf) 

      println(result) 

      } 

     } 
 pool.shutdownNow() 
     executionContext.shutdownNow() 
 println(pool.isTerminated() + " Pool terminated") 
     println(pool.isShutdown() + " Pool shutdown") 

     println(executionContext.isTerminated() + " executionContext terminated") 
     println(executionContext.isShutdown() + " executionContext shutdown") 
     } 

    } 

Результат Перед добавлением подсвеченный код для выключения бассейнов

ложный бассейн прекращен

правда бассейн отключения

ложные ExecutionContext прекращено

правда ExecutionContext выключение

После добавления выделен код решить мою проблему. Я не обеспечил утечку ресурсов в моем коде. Мой сценарий позволяет мне убивать пул, когда все фьючерсы выполняются. Я знаю, что я изменил элегантную реализацию обратного вызова на блокировку реализации, но все-таки решил мою проблему.

0

Обычно, когда вы работаете на итератор в Future с, вы должны использовать Future.sequence, который изменяет скажем, Seq[Future[T]] к Future[Seq[T]].

Таким образом, использовать что-то вроде:

def register(futureList: Seq[Future[String]]) = Future.sequence(futureList) foreach { results => 
    println("received result") 
} 

, если вы хотите, чтобы отобразить каждый из будущих и печати результатов по мере выполнения, вы также можете сделать что-то на линии;

def register(futureList: Seq[Future[String]]) = Future.sequence (
    futureList.map(f => f.map { v => 
    println(s"$v is complete") 
    v 
    })) map { vs => 
    println("all values done $vs") 
    vs 
} 
+0

Благодарим вас за ответ. Я пытался с Future.sequence (futureList), но результат такой же – BDR

+0

вы можете рассказать мне, как ваш метод launchAll() запускает все фьючерсы? – Ashesh

+0

Будущее [String] { // Мои логические логики идут здесь } Давайте скажем, что у меня будет несколько фьючерсов, которые будут называться динамически исходя из обстоятельств. – BDR

Смежные вопросы