2017-02-22 4 views
1

У меня есть следующая запись в файле conf. Но я не уверен, если это диспетчерская установка подхватила и что конечная стоимость параллелизма используетсяКак проверить, какой диспетчер настроен в приложении akka

 akka{ 
     actor{ 
     default-dispatcher { 
       type = Dispatcher 
       executor = "fork-join-executor" 
       throughput = 3 
       fork-join-executor { 
       parallelism-min = 40 
       parallelism-factor = 10 
       parallelism-max = 100 
       } 
      } 
     } 
    } 

Я 8 процессорных ядра, так что я ожидаю параллельных потоков, чтобы быть в состоянии готовности 40мина < 80 (коэффициент 8 * 10) < 100max. Я хотел бы узнать, какое значение использует akka для максимального параллельного потока. Я создал 45 дочерних исполнителей, и в моих журналах я печатаю идентификатор потока [application-akka.actor.default-dispatcher-xx], и я не вижу более 20 потоков, работающих параллельно.

ответ

0

Предполагая, что у вас есть экземпляр ActorSystem, вы можете проверить значения, установленные в его конфигурации. Вот как вы могли бы получить свою руку от значений, которые вы установили в файле конфигурации:

val system = ActorSystem() 
val config = system.settings.config.getConfig("akka.actor.default-dispatcher") 
config.getString("type") 
config.getString("executor") 
config.getString("throughput") 
config.getInt("fork-join-executor.parallelism-min") 
config.getInt("fork-join-executor.parallelism-max") 
config.getDouble("fork-join-executor.parallelism-factor") 

Надеюсь, это поможет. Вы также можете обратиться к странице this для получения дополнительной информации об определенных настройках конфигурации.

Update

Я выкопал немного больше в Акку, чтобы узнать точно, что он использует для настройки. Как вы уже могли ожидать, он использует ForkJoinPool. Параллелизм используется для построения его определяется по формуле:

object ThreadPoolConfig { 
    ... 
    def scaledPoolSize(floor: Int, multiplier: Double, ceiling: Int): Int = 
math.min(math.max((Runtime.getRuntime.availableProcessors * multiplier).ceil.toInt, floor), ceiling) 
    ... 
} 

Эта функция используется в какой-то момент, чтобы построить ForkJoinExecutorServiceFactory:

new ForkJoinExecutorServiceFactory(
    validate(tf), 
    ThreadPoolConfig.scaledPoolSize(
    config.getInt("parallelism-min"), 
    config.getDouble("parallelism-factor"), 
    config.getInt("parallelism-max")), 
    asyncMode) 

Во всяком случае, это параллелизм, который будет использоваться для создания ForkJoinPool, который фактически является экземпляром java.lang.ForkJoinPool. Теперь мы должны спросить, сколько потоков использует этот пул? Короткий ответ заключается в том, что он будет использовать всю емкость (80 потоков в нашем случае), только если это понадобится.

Чтобы проиллюстрировать этот сценарий, я провел несколько тестов с различными способами использования Thread.sleep внутри актера. То, что я выяснил, это то, что он может использовать примерно из 10 потоков (если вызов сна не выполняется) вокруг потоков максимум 80 (если я вызываю сон на 1 секунду). Испытания проводились на машине с 8 сердечниками.

Подводя итог, вам нужно будет проверить реализацию, используемую Akka, чтобы увидеть, как именно используется этот параллелизм, поэтому я заглянул в ForkJoinPool. Кроме того, что я смотрю на файл конфигурации, а затем проверяю эту конкретную реализацию, я не думаю, что вы, к сожалению, можете сделать следующее :(

Надеюсь, это прояснит ответ - изначально я думал, что вы хотите посмотреть, как настроен диспетчер системы актера.

+0

Да, это даст значения, которые были прочитаны из config системой akka. Однако мне интересно узнать, есть ли способ узнать, какой параллелизм выбрал диспетчер akka, в конечном счете, из трех полей - min, factor & max. – Sree

+0

Я обновил ответ после запуска некоторых тестов. Кроме того, @Stefano Bonetti прав - чтобы максимизировать параллелизм, все актеры должны работать одновременно :) –

0

Чтобы максимизировать коэффициент параллелизма, все участники должны обрабатывать некоторые сообщения одновременно. Вы уверены, что это так в вашем приложении?

Возьмем, например, следующий код

object Test extends App { 

    val system = ActorSystem() 

    (1 to 80).foreach{ _ => 
    val ref = system.actorOf(Props[Sleeper]) 
    ref ! "hello" 
    } 
} 

class Sleeper extends Actor { 
    override def receive: Receive = { 
    case msg => 
     //Thread.sleep(60000) 
     println(msg) 
    } 
} 

Если вы считаете, ваш конфиг и 8 ядер, вы увидите небольшое количество потоков которые породил (4, 5?), Как обработка сообщений слишком быстро для какого-то реального параллелизма для наращивания.

Наоборот, если вы держите своих актеров в CPU занято, раскомментируя неприятный Thread.sleep, вы увидите, что количество потоков увеличится до 80. Однако это будет продолжаться только 1 минуту, после чего потоки будут постепенно выбыл из бассейна.

Я предполагаю, что основной трюк: не думайте, что каждый актер работает на отдельной теме. Это когда одно или несколько сообщений появляются в почтовом ящике актера, который диспетчер просыпается и, действительно, отправляет задачу обработки сообщений в назначенный пул.

+0

У меня очень похожий код. Я создаю 45 актеров в цикле for. Согласно моим журналам, для вычисления, выполняемого в дочернем актере, требуется от 1 до 1,5 секунд. Я надеялся, что будет поток, посланный каждому дочернему актеру параллельно. Думаю, этого не происходит. Любой способ обеспечить его соблюдение? – Sree

+0

Невозможно обеспечить выполнение 45 нитей, если для них недостаточно задач. Скорее всего, в вашем случае нет. Не видя своего кода, трудно сказать. –

Смежные вопросы