2016-09-29 4 views
0

Я использую искру с mongodb, я хочу знать, как вход rdd разделялся на разных рабочих узлах кластера, потому что моя задача состоит в том, чтобы соединить две записи (одна - запрос другой - ответ) в один, на основе флагов msg_id, флаг (флаг указывает на запрос или ответ), msg_id одинаково в обоих записях. В то время как разрывающий входной сигнал rdd, каждый разбивается на каждый узел, тогда как обрабатывать регистр, если запись запроса в одном узле и запись ответа в другом узле.Как искровой мастер разбивает входные данные на разные узлы

ответ

0

Во-первых, мастер искры не разделяет данные. Он просто контролирует работников.

Во-вторых, разрывы rdd (при чтении из внешних источников) определяются InputSplits, реализованными в формате ввода. Эта часть довольно похожа на сокращение карты. Таким образом, в вашем случае rdd-расщепления (или разделы, в искровых условиях) определяются форматом ввода mongodb.

В вашем случае я считаю, что вы ищете совместное размещение всех записей для идентификатора msg на одном узле. Этого можно добиться с помощью функции partitionByKey.

+0

Могу ли я иметь один пример разделаBYKey в java, пожалуйста, – anush

+0

Я не человек java .... Прошу пройти через документ ..... Я могу дать вам пример python :) –

0

RDD будут построены на основе ваших преобразований (подлежит сценарию), а мастеру меньше места, чтобы играть роль здесь. Отправьте эту ссылку How does Spark paralellize slices to tasks/executors/workers?. В вашем случае вам может потребоваться реализовать groupby() или groupbykey() (это не рекомендуется) для группировки ваших значений на основе ключей (msg_id).

Например

 val baseRDD = sc.parallelize(Array("1111,REQUEST,abcd","1111,RESPONSE,wxyz","2222,REQUEST,abcd","2222,RESPONSE,wxyz")) 
     //convert your base rdd to keypair RDD  
     val keyValRDD =baseRDD.map { line => (line.split(",")(0),line)} 

     //Group it by message_id 
     val groupedRDD = keyValRDD.groupBy(keyvalue => keyvalue._1) 

     groupedRDD.saveAsTextFile("c:\\result") 

Результат:

(1111, CompactBuffer ((1111,1111, ЗАПРОС, ABCD), (1111,1111, РЕАКЦИЯ, WXYZ))) (2222 , CompactBuffer ((2222,2222, REQUEST, abcd)), (2222,2222, RESPONSE, wxyz)))

В приведенном выше случае возможность наличия всех значений для ключа в одном разделе высока (подвержена объема данных и доступного вычислительного ресурса при запуске ti me)

Смежные вопросы