2016-12-23 2 views
3

В нашем коде Dataframe был создан как:Преобразование Dataframe в РДУ уменьшает разделы

DataFrame DF = hiveContext.sql("select * from table_instance"); 

Когда я конвертировать мой dataframe в РДУ и попытаться получить его количество разделов, как

RDD<Row> newRDD = Df.rdd(); 
System.out.println(newRDD.getNumPartitions()); 

Это уменьшает количество разделов до 1 (1 печатается на консоли). Изначально у моего DataFrame было 102 раздела.

UPDATE:

Во время чтения я repartitoned в dataframe:

DataFrame DF = hiveContext.sql("select * from table_instance").repartition(200); 

, а затем преобразуется в РДД, поэтому он дал мне только 200 разделов. ли

JavaSparkContext

имеет определенную роль в этом? Когда мы преобразуем dataframe в rdd, по умолчанию минимальный флаг разделов также рассматривается на уровне искрового контекста?

UPDATE:

Я сделал отдельный пример программы, в которой я читал ту же самую таблицу в dataframe и преобразуется в РДУ. Никакой дополнительный этап не был создан для преобразования RDD, и количество разделов также было правильным. Теперь я задаюсь вопросом, что я делаю в своей основной программе.

Пожалуйста, дайте мне знать, если мое понимание здесь не так.

ответ

1

В основном это зависит от реализации hiveContext.sql(). Поскольку я новичок в Hive, моя догадка hiveContext.sql не знает ИЛИ не может разделить данные, присутствующие в таблице.

Например, когда вы читаете текстовый файл из HDFS, искровой контекст рассматривает количество блоков, используемых этим файлом для определения разделов.

То, что вы сделали с repartition является очевидным решением для такого рода проблем. (Примечание: Передел может привести к операции воспроизведения в случайном порядке, если надлежащее разметки не используются, хэш Разметка используется по умолчанию)

Приходя к вашему сомнению , hiveContext может рассмотреть свойство минимального значения по умолчанию. Но, полагаясь на свойство по умолчанию, не будет решить все ваши проблемы. Например, если размер вашей таблицы улей увеличивается, ваша программа по-прежнему использует количество разделов по умолчанию.

Update: Избегайте перетасовку во переделе

Определите пользовательские разметки:

public class MyPartitioner extends HashPartitioner { 
    private final int partitions; 
    public MyPartitioner(int partitions) { 
     super(); 
     this.partitions = partitions; 
    } 
    @Override 
    public int numPartitions() { 
     return this.partitions; 
    } 

    @Override 
    public int getPartition(Object key) { 
     if (key instanceof String) { 
      return super.getPartition(key); 
     } else if (key instanceof Integer) { 
      return (Integer.valueOf(key.toString()) % this.partitions); 
     } else if (key instanceof Long) { 
      return (int)(Long.valueOf(key.toString()) % this.partitions); 
     } 
     //TOD ... add more types 
    } 
} 

Используйте свои собственные разметки:

JavaPairRDD<Long, SparkDatoinDoc> pairRdd = hiveContext.sql("select * from table_instance") 
.mapToPair(//TODO ... expose the column as key) 

rdd = rdd.partitionBy(new MyPartitioner(200)); 
//... rest of processing 
+0

Спасибо за ответ @code. Я когда-нибудь застрял в этом вопросе. Разделы из hivecontext.sql() считываются как 102.Я запустил счетное действие на dataframe и узнал, что запущено 102 задачи и, следовательно, 102 раздела. Теперь, когда я делаю перераспределение, это вызывает много перетасовки. Я хочу сделать разбивку на основе некоторых столбцов, пожалуйста, предложите технику перераспределения, которая может выполнять минимальный перетасовка –

+0

Не могли бы вы объяснить, какой тип столбца и какой диапазон значений? – code

+0

В основном это определяется пользователем. Это может быть строка, long, bigint. Любой тип данных столбца может присутствовать. –

Смежные вопросы