2015-07-10 3 views
3

У меня очень простая настройка SparkSQL для подключения к базе данных Postgres, и я пытаюсь получить DataFrame из таблицы DataFrame с числом X разделов (скажем 2). Код будет выглядеть следующим образом:SparkSQL PostgresQL разделов Dataframe

Map<String, String> options = new HashMap<String, String>(); 
options.put("url", DB_URL); 
options.put("driver", POSTGRES_DRIVER); 
options.put("dbtable", "select ID, OTHER from TABLE limit 1000"); 
options.put("partitionColumn", "ID"); 
options.put("lowerBound", "100"); 
options.put("upperBound", "500"); 
options.put("numPartitions","2"); 
DataFrame housingDataFrame = sqlContext.read().format("jdbc").options(options).load(); 

По какой-то причине один раздел DataFrame содержит почти все строки.

Для того, что я могу понять, lowerBound/upperBound - это параметры, используемые для этого. В документации SparkSQL (Spark 1.4.0 - spark-sql_2.11) говорится, что они используются для определения шага, а не для фильтрации/диапазона столбца раздела. Но это вызывает несколько вопросов:

  1. Штраем является частота (количество элементов, возвращаемых каждым запросом), с которыми Spark будет запрашивать БД для каждого исполнителя (раздела)?
  2. Если нет, какова цель этих параметров, от чего они зависят и как я могу стабильно сбалансировать разделы DataFrame (не спрашивая, что все разделы содержат одинаковое количество элементов, просто есть равновесие - например, 2 перегородки 100 элементов 55/45, 60/40 или даже 65/35)

Не может показаться, что я нашел ответ на эти вопросы, и мне было интересно, может быть, некоторые из вас могут очистить это указывает на меня, потому что прямо сейчас влияет на производительность моего кластера при обработке X миллионов строк, и весь тяжелый подъем переходит к одному исполнителю.

Приветствия и благодарности за ваше время.

ответ

2

Нижняя граница действительно используется против столбца разделения; обратитесь к этому кода (текущая версия на момент написания этого):

https://github.com/apache/spark/blob/40ed2af587cedadc6e5249031857a922b3b234ca/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala

Функция columnPartition содержит код для логического разделения и использование нижней/верхней границы.

5

По существу нижняя и верхняя граница и количество разделов используются для вычисления приращения или разделения для каждой параллельной задачи.

Скажем, таблица содержит раздел столбец «год», и есть данные с 2006 по 2016 год

Если определить количество разделов, как 10, с нижней границей 2006 и верхней границы 2016 года, вы будете иметь каждый задача выборки данных за собственный год - идеальный случай.

Даже если вы неправильно указали нижнюю и/или верхнюю границу, например. set lower = 0 и upper = 2016, будет некоторая перекос в передаче данных, но вы не потеряете или не сможете получить какие-либо данные, потому что:

Первая задача будет получать данные за год < 0.

Вторая задача будет получать данные за год между 0 и 2016/10.

Третья задача будет получать данные за год между 2016/10 и 2 * 2016/10.

...

И последняя задача будет иметь где состояние с года-> 2016 года.

Т.

0

LowerBound и UpperBound были в настоящее время определены, чтобы сделать то, что они делают в предыдущих ответах. Следующим шагом в этом будет то, как сбалансировать данные по разделам, не глядя на значения min max или если ваши данные сильно искажены.

Если ваша база данных поддерживает функцию «хэш», это может сделать трюк.

partitionColumn = "хэш (column_name)% num_partitions"

numPartitions = 10 // все, что вы хотите

LowerBound = 0

UpperBound = numPartitions

Это будет работать до тех пор, операция модуля возвращает равномерное распределение по [0, numPartitions)

Смежные вопросы