7

У меня искра 2.0 dataframe example со следующей структурой:Spark DataFrame: делает groupBy после заказа. Поддерживайте этот заказ?

id, hour, count 
id1, 0, 12 
id1, 1, 55 
.. 
id1, 23, 44 
id2, 0, 12 
id2, 1, 89 
.. 
id2, 23, 34 
etc. 

Он содержит 24 записей для каждого идентификатора (по одному на каждый час дня) и упорядочен идентификатором, час с помощью функции OrderBy.

Я создал агрегатор groupConcat:

def groupConcat(separator: String, columnToConcat: Int) = new Aggregator[Row, String, String] with Serializable { 
    override def zero: String = "" 

    override def reduce(b: String, a: Row) = b + separator + a.get(columnToConcat) 

    override def merge(b1: String, b2: String) = b1 + b2 

    override def finish(b: String) = b.substring(1) 

    override def bufferEncoder: Encoder[String] = Encoders.STRING 

    override def outputEncoder: Encoder[String] = Encoders.STRING 
    }.toColumn 

Это помогает мне конкатенации столбцов в строки, чтобы получить эту последнюю dataframe:

id, hourly_count 
id1, 12:55:..:44 
id2, 12:89:..:34 
etc. 

Мой вопрос, если я example.orderBy($"id",$"hour").groupBy("id").agg(groupConcat(":",2) as "hourly_count"), делает эту гарантию что почасовые подсчеты будут правильно упорядочены в соответствующих ведрах?

Я читал, что это не обязательно для RDD (см. Spark sort by key and then group by to get ordered iterable?), но, возможно, это отличается от DataFrames?

Если нет, то как я могу обойти это?

ответ

3

Короткий ответ: Да, часовой подсчет будет поддерживать тот же порядок.

Чтобы обобщить, важно, чтобы вы сортировали, прежде чем группировать. Также сортировка должна быть такой же, как и группа + столбец, для которого вы действительно хотите сортировку.

Примером может быть, как:

employees 
    .sort("company_id", "department_id", "employee_role") 
    .groupBy("company_id", "department_id") 
    .agg(Aggregators.groupConcat(":", 2) as "count_per_role") 
+1

Есть ли у вас какие-либо ссылки о том, что groupBy поддерживает порядок? Я не мог найти ничего в официальных документах –

+0

У меня нет официальных документов, но у меня есть эта статья, которая объясняет немного лучше механизм https://bzhangusc.wordpress.com/2015/05/28/groupby-on -dataframe-is-not-the-groupby-on-rdd /. Замечания также интересны. – Interfector

+1

Интересно, что даже сам Шон Оуэн утверждает, что заказ не может быть сохранен (https://issues.apache.org/jira/browse/SPARK-16207?focusedCommentId=15356725&page=com.atlassian.jira.plugin.system.issuetabpanels% 3Acomment-tabpanel # comment-15356725) –

1

У меня есть случай, когда порядок не всегда хранятся: иногда да, в основном нет.

Мой dataframe имеет 200 разделов, работающих на Спарк 1,6

df_group_sort = data.orderBy(times).groupBy(group_key).agg(
                F.sort_array(F.collect_list(times)), 
                F.collect_list(times) 
                  ) 

проверить порядок сравниваю возвращаемые значения

F.sort_array(F.collect_list(times)) 

и

F.collect_list(times) 

давая, например, (Слева: sort_array (collect_list()); справа: collect_list())

2016-12-19 08:20:27.172000 2016-12-19 09:57:03.764000 
2016-12-19 08:20:30.163000 2016-12-19 09:57:06.763000 
2016-12-19 08:20:33.158000 2016-12-19 09:57:09.763000 
2016-12-19 08:20:36.158000 2016-12-19 09:57:12.763000 
2016-12-19 08:22:27.090000 2016-12-19 09:57:18.762000 
2016-12-19 08:22:30.089000 2016-12-19 09:57:33.766000 
2016-12-19 08:22:57.088000 2016-12-19 09:57:39.811000 
2016-12-19 08:23:03.085000 2016-12-19 09:57:45.770000 
2016-12-19 08:23:06.086000 2016-12-19 09:57:57.809000 
2016-12-19 08:23:12.085000 2016-12-19 09:59:56.333000 
2016-12-19 08:23:15.086000 2016-12-19 10:00:11.329000 
2016-12-19 08:23:18.087000 2016-12-19 10:00:14.331000 
2016-12-19 08:23:21.085000 2016-12-19 10:00:17.329000 
2016-12-19 08:23:24.085000 2016-12-19 10:00:20.326000 

В левой колонке всегда сортируются, в то время как правый столбец состоит только из отсортированных блоков. Для разных исполнений take() порядок блоков в правом столбце отличается.

+0

В принятом ответе указано, что вам нужно сортировать как по столбцу, который вы хотите отсортировать, так и по столбцам, которые вы группируете, т.е. 'orderBy (times, group_key) .groupBy (group_key)'. Вы попробовали это? – Shaido

0

порядок может быть или не быть одинаковым, в зависимости от количества разделов и распределения данных. Мы можем решить, используя rdd.

Например ::

Я сохранил следующие выборочные данные в файл и загрузить его в HDFS.

1,type1,300 
2,type1,100 
3,type2,400 
4,type2,500 
5,type1,400 
6,type3,560 
7,type2,200 
8,type3,800 

и выполнил следующую команду:

sc.textFile("/spark_test/test.txt").map(x=>x.split(",")).filter(x=>x.length==3).groupBy(_(1)).mapValues(x=>x.toList.sortBy(_(2)).map(_(0)).mkString("~")).collect() 

выход:

Array[(String, String)] = Array((type3,6~8), (type1,2~1~5), (type2,7~3~4)) 

То есть, мы сгруппировали данные по типу, затем отсортированы по цене, и сцепляются идентификаторы с «~» в качестве разделителя. Приведенная выше команда может быть разбита, как показано ниже:

val validData=sc.textFile("/spark_test/test.txt").map(x=>x.split(",")).filter(x=>x.length==3) 

val groupedData=validData.groupBy(_(1)) //group data rdds 

val sortedJoinedData=groupedData.mapValues(x=>{ 
    val list=x.toList 
    val sortedList=list.sortBy(_(2)) 
    val idOnlyList=sortedList.map(_(0)) 
    idOnlyList.mkString("~") 
} 
) 
sortedJoinedData.collect() 

мы можем взять определенную группу с помощью команды

sortedJoinedData.filter(_._1=="type1").collect() 

выход:

Array[(String, String)] = Array((type1,2~1~5)) 
Смежные вопросы