Вот псевдокод:Почему используется увеличение времени трансляции?
case class IpDB(startIp: Long, endIp: Long, company: String)
def ipMap(line: Array[String]):
val Array(startIp, endIp, company) = line
IpDB(startIp.toLong, endIp.toLong, company)
// The ip-db is just 300M in raw format.
// format of ip-data from s3:
// 100000 200000 GOOGLE
val ipData = sc.broadcast(sc.textFile("s3://....").map(_.split("\t", -1)).map(ipMap).collect().sortWith(_.startIp < _.startIp)).value
val dataA = sc.textFile("s3://...").map((ip, 1)).reduceByKey(_+_)
val dataB = sc.textFile("s3://...").map((ip, 1)).reduceByKey(_+_)
// will do ip company lookup here
dataA.fullOuterJoin(dataB).map(doIpCompanyLookUp(ipData, _)).collect()
код просто получает данные из IP-источников входного сигнала, а затем найти его компании после объединения. вот мой вопрос:
Этот код будет работать в течение 2-3 минут, но при удалении широковещательных данных (просто подключитесь к двум данным) это будет стоить меньше 1 минуты. И когда я смотрю ui искры, я обнаружил, что проблема gc может быть проблемой.
Вот настройки для запуска этого задания:
spark-submit --master yarn --deploy-mode client --driver-memory 4g --num-executors 10 --executor-memory 8800m --executor-cores 4 --class ... XX.jar
Этой работа выполняется на AWS ОЙ искры кластер
spark version: 1.6.1
10 m3.xlarge.
- Как можно решить эту проблему (сократить время работы)?
- Какая память передает данные в искры?
- Почему время работы не меняется, когда я меняю память-исполнитель? я пытаюсь использовать 5 * m3.2xlarge и --executor-memory 16g, никаких существенных изменений в общем времени работы с широковещательными данными.
обновление:
case class IpDB(startIp: Long, endIp: Long, company: String)
def ipMap(line: Array[String]):
val Array(startIp, endIp, company) = line
IpDB(startIp.toLong, endIp.toLong, company)
// The ip-db is just 300M in raw format.
// format of ip-data from s3:
// 100000 200000 GOOGLE
val dataA = sc.textFile("s3://...").map((ip, 1)).reduceByKey(_+_)
val dataB = sc.textFile("s3://...").map((ip, 1)).reduceByKey(_+_)
// will do ip company lookup here
val joinResult = dataA.fullOuterJoin(dataB)
val ipData = sc.broadcast(sc.textFile("s3://....").map(_.split("\t", -1)).map(ipMap).collect().sortWith(_.startIp < _.startIp)).value
joinResult.map(doIpCompanyLookUp(ipData, _)).collect()
просто переместить генерацию и передачу данных IP-компании после того, как dataA.fullOuterJoin(dataB)
.The работает время уменьшить много.
обновление2. , так как производственный код довольно сложный, который отличается от псевдокода выше, после небольшого изменения порядка кода, программа работает быстрее, но я не уверен, что ключом к этой проблеме является позиция для запуска широковещательных данных.
Может быть, это немного наивно, но радиопередача начало отправки 300M по сети и закупоривает исполнители. Можете ли вы попытаться найти DAG? – Vale