2016-03-03 2 views
1

[Мы тестируем драйвер, который может обеспечить отличный параллелизм при оптимизации. Трюк заключается в том, что он не распараллеливается (при доступе к DB2) внутри разделов Spark, поэтому требование заключается в том, что мы говорим о том, сколько параллельных потоков мы хотим, и мы бросаем запрос для каждого потока. Хотя я надеялся сделать это в цикле с массивом объектов DataFrame, я не мог понять, как написать scala с массивом объектов DataFrame. Для теста грубой силы я сделал:Scala/Spark массив данных кадров

val DF1 = sqlContext.read.format("jdbc"). ...yada yada 
    val DF2 = sqlContext.read.format("jdbc"). ...yada yada 
    val DF3 = sqlContext.read.format("jdbc"). ...yada yada 
    val DF4 = sqlContext.read.format("jdbc"). ...yada yada 

    val unionDF=(((DF1.unionAll(DF2)).unionAll(DF3)).unionAll(DF4)) 

И это отлично работало для распараллеливания на 4 раздела. Я бы предпочел сделать это в цикле, но тогда мне кажется, что мне нужно что-то вроде:

var myDF = new Array [DataFrame] (parallelBreakdown) ... и DataFrame не является типом. Любые мысли о том, как это сделать, с использованием метода грубой силы? Спасибо,

+2

_it не распараллеливает (при доступе к DB2) внутри Spark partition_ - почему бы просто не увеличить количество разделов? То, что вы хотите, - это просто вопрос стандартных операций с коллекциями Scala, но они выглядят как проблема XY. – zero323

+0

Цикл ... вы говорите о чем-то вроде 'Seq (DF1, DF2, DF3, DF4) .reduce (_.unionAll (_))'? –

+0

Прежде всего, спасибо за ответ. Причина, по которой мне нужно сделать это отдельно, это то, что я тестирую новый драйвер, который работает с его собственной формой распараллеливания. Но, в отличие от Spark, где я могу указать раздел, нижнюю границу, yada yada, мне нужно идентифицировать # .. и отправить отдельный запрос для каждого раздела. Моя мысль заключалась в том, что было бы неплохо определить массив объектов DataFrame, а затем просто запустить цикл через диапазон от 1 до NumPartitions. Тогда unionAll каждый элемент массива. Это сработало, когда я просто написал один и тот же объект DF в цикле, но он не распараллеливался. еще раз спасибо –

ответ

0

DataFrame действительно тип

import org.apache.spark.sql.DataFrame 

я был в состоянии определить функцию

def querier(dim_vals: Array[String]): = { 
    dim_vals.flatMap(dim_val => 
     sql(MY_QUERY)) 
    } 

который возвращает Array[DataFrame], и я был в состоянии использовать Robert Congiu «s ответ, чтобы создать единый dataframe , и позвоните по телефону .show().