2015-12-02 4 views
5

Могу ли я установить разную степень параллелизма для разных частей задачи в нашей программе в Flink? Например, как Флинк интерпретирует следующий пример кода? Два пользовательских специалиста MyPartitioner1, MyPartitioner2 разбивают входные данные на два 4 и 2 раздела.Степень параллелизма в Apache Flink

partitionedData1 = inputData1 
    .partitionCustom(new MyPartitioner1(), 1); 
env.setParallelism(4); 
DataSet<Tuple2<Integer, Integer>> output1 = partitionedData1 
    .mapPartition(new calculateFun()); 

partitionedData2 = inputData2 
    .partitionCustom(new MyPartitioner2(), 2); 
env.setParallelism(2); 
DataSet<Tuple2<Integer, Integer>> output2 = partitionedData2 
    .mapPartition(new calculateFun()); 

Я получаю следующее сообщение об ошибке для этого кода:

Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed. 
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:314) 
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33) 
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33) 
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25) 
    at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:36) 
    at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:29) 
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118) 
    at org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:29) 
    at akka.actor.Actor$class.aroundReceive(Actor.scala:465) 
    at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:92) 
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) 
    at akka.actor.ActorCell.invoke(ActorCell.scala:487) 
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254) 
    at akka.dispatch.Mailbox.run(Mailbox.scala:221) 
    at akka.dispatch.Mailbox.exec(Mailbox.scala:231) 
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) 
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) 
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 
Caused by: java.lang.ArrayIndexOutOfBoundsException: 2 
    at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:80) 
    at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65) 
    at org.apache.flink.runtime.operators.NoOpDriver.run(NoOpDriver.java:92) 
    at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:496) 
    at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362) 
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559) 
    at java.lang.Thread.run(Unknown Source) 

ответ

5

ExecutionEnvironment.setParallelism() устанавливает параллелизм для всей программы, т.е. всех операторов программы.

Вы можете указать параллелизм для каждого отдельного оператора, вызвав метод setParallelism() оператора.

Выбрасывается ArrayIndexOutOfBoundsException, потому что ваш пользовательский разделитель возвращает недопустимое количество разделов, вероятно, из-за неожиданной степени параллелизма. Пользовательский разделитель получает фактический параллелизм приемника в качестве параметра в своем методе partition(K key, int numPartitions).