2016-08-30 10 views
0

Вот псевдокод:Почему используется увеличение времени трансляции?

case class IpDB(startIp: Long, endIp: Long, company: String) 
def ipMap(line: Array[String]): 
    val Array(startIp, endIp, company) = line 
    IpDB(startIp.toLong, endIp.toLong, company) 
// The ip-db is just 300M in raw format. 
// format of ip-data from s3: 
// 100000 200000 GOOGLE 
val ipData = sc.broadcast(sc.textFile("s3://....").map(_.split("\t", -1)).map(ipMap).collect().sortWith(_.startIp < _.startIp)).value 

val dataA = sc.textFile("s3://...").map((ip, 1)).reduceByKey(_+_) 
val dataB = sc.textFile("s3://...").map((ip, 1)).reduceByKey(_+_) 

// will do ip company lookup here 
dataA.fullOuterJoin(dataB).map(doIpCompanyLookUp(ipData, _)).collect() 

код просто получает данные из IP-источников входного сигнала, а затем найти его компании после объединения. вот мой вопрос:

Этот код будет работать в течение 2-3 минут, но при удалении широковещательных данных (просто подключитесь к двум данным) это будет стоить меньше 1 минуты. И когда я смотрю ui искры, я обнаружил, что проблема gc может быть проблемой.

Вот настройки для запуска этого задания:

spark-submit --master yarn --deploy-mode client --driver-memory 4g --num-executors 10 --executor-memory 8800m --executor-cores 4 --class ... XX.jar 

Этой работа выполняется на AWS ОЙ искры кластер

spark version: 1.6.1 
10 m3.xlarge. 
  1. Как можно решить эту проблему (сократить время работы)?
  2. Какая память передает данные в искры?
  3. Почему время работы не меняется, когда я меняю память-исполнитель? я пытаюсь использовать 5 * m3.2xlarge и --executor-memory 16g, никаких существенных изменений в общем времени работы с широковещательными данными.

обновление:

case class IpDB(startIp: Long, endIp: Long, company: String) 
def ipMap(line: Array[String]): 
    val Array(startIp, endIp, company) = line 
    IpDB(startIp.toLong, endIp.toLong, company) 
// The ip-db is just 300M in raw format. 
// format of ip-data from s3: 
// 100000 200000 GOOGLE 
val dataA = sc.textFile("s3://...").map((ip, 1)).reduceByKey(_+_) 
val dataB = sc.textFile("s3://...").map((ip, 1)).reduceByKey(_+_) 

// will do ip company lookup here 
val joinResult = dataA.fullOuterJoin(dataB) 
val ipData = sc.broadcast(sc.textFile("s3://....").map(_.split("\t", -1)).map(ipMap).collect().sortWith(_.startIp < _.startIp)).value 
joinResult.map(doIpCompanyLookUp(ipData, _)).collect() 

просто переместить генерацию и передачу данных IP-компании после того, как dataA.fullOuterJoin(dataB) .The работает время уменьшить много.

обновление2. , так как производственный код довольно сложный, который отличается от псевдокода выше, после небольшого изменения порядка кода, программа работает быстрее, но я не уверен, что ключом к этой проблеме является позиция для запуска широковещательных данных.

+0

Может быть, это немного наивно, но радиопередача начало отправки 300M по сети и закупоривает исполнители. Можете ли вы попытаться найти DAG? – Vale

ответ

0

Без долгих раздумий о коде линии:

val ipData = sc.broadcast(sc.textFile("s3://....").map(_.split("\t", -1)).map(ipMap).collect().sortWith(_.startIp < _.startIp)).value 

беспокоит меня много.

Вы строите распределенный RDD с помощью sc.textFile только, чтобы сделать его местным (с водителем) по collect() затем, сделав раздали снова и доступны для исполнителей на sc.broadcast (!) Как вы можете видеть, что есть много отправка данных туда и обратно.

Вы бы лучше cache ИНГ данные IP, так что остается в памяти, но делать:

sc.textFile("s3://....").map(_.split("\t", -1)).map(ipMap).cache 
+0

Что произойдет, если вы замените 'broadcast' на' cache' для всех данных ip-компании? Как это влияет на время? –

+0

Поскольку мне нужно выполнить поиск ip-компании, тип данных RDD [IpDB] не может с этим поделать. Вот почему мне нужно транслировать все данные, чтобы убедиться, что у всех исполнителей есть копия данных ip-компании. Просто убедитесь, что я получаю разницу между 'broadcast' и' cache'. Поэтому я не мог использовать «кеш» в программе, чтобы он работал. – lee

Смежные вопросы