2015-10-06 3 views
4

У меня есть приложение в SparkSQL, которое возвращает большое количество строк, которые очень трудно поместиться в память, поэтому я не смогу использовать функцию сбора в DataFrame, есть ли с помощью которого я могу получить все эти строки как Iterable, установленный для всех строк в виде списка.Как получить Итератор строк с использованием Dataframe в SparkSQL

Примечание: Я уверен, выполнение этой SparkSQL приложение с использованием пряжи-клиент

ответ

5

Вообще передача всех данных водитель выглядит довольно плохая идея и большую часть времени есть лучшее решение там, но если вы на самом деле хочу пойти с этим вы можете использовать метод toLocalIterator на РДУ:

val df: org.apache.spark.sql.DataFrame = ??? 
df.cache // Optional, to avoid repeated computation, see docs for details 
val iter: Iterator[org.apache.spark.sql.Row] = df.rdd.toLocalIterator 
+0

Немного поздно, но не могли бы вы подробнее рассказать о нескольких лучших решениях? – irregular

+0

@irregular Это зависит от конкретного приложения, но большую часть времени «mapPartitions», 'foreachPartition' или аналогичный метод более чем достаточно. У вас есть какой-то конкретный случай использования? – zero323

+0

Я также смотрю на набор данных, который слишком велик для памяти. К сожалению, я столкнулся с этой ошибкой https://issues.apache.org/jira/browse/SPARK-10189 при использовании toLocalIterator. Таким образом, я изучал добавление ROW_NUMBER для запроса через db https://paste.pound-python.org/show/sk4bPE5P9QsKhmcYlsK0/. Я не совсем уверен, как настроить разделы, так вот как я иду это atm – irregular

0

на самом деле вы можете просто использовать: df.toLocalIterator, вот ссылка на источник Спарк код:

/** 
* Return an iterator that contains all of [[Row]]s in this Dataset. 
* 
* The iterator will consume as much memory as the largest partition in this Dataset. 
* 
* Note: this results in multiple Spark jobs, and if the input Dataset is the result 
* of a wide transformation (e.g. join with different partitioners), to avoid 
* recomputing the input Dataset should be cached first. 
* 
* @group action 
* @since 2.0.0 
*/ 
def toLocalIterator(): java.util.Iterator[T] = withCallback("toLocalIterator", toDF()) { _ => 
withNewExecutionId { 
    queryExecution.executedPlan.executeToIterator().map(boundEnc.fromRow).asJava 
    } 
} 
+0

Спасибо @Kehe CAI за ваш ответ, но этот вопрос довольно старый, и я попросил об этом конкретном искрах 1.2 –

Смежные вопросы