0

Я довольно новичок в Spark, и мне интересно, как сделать что-то очень простое с каскадной структурой.Пользовательская функция агрегатора до Spark 1.5

Давайте предположим, что у меня есть следующий набор данных:

<date> <cpt_id> <mesure_type> <value> 
20160603093021556 cpt1 idx1 11 
20160603093021556 cpt1 idx2 22 
20160603093021556 cpt1 idx3 33 
20160603093021556 cpt1 idx4 44 
20160603113021556 cpt2 idx1 09 
20160603113021556 cpt2 idx2 45 
20160603113021556 cpt2 idx3 66 
20160603193021556 cpt1 idx1 13 
20160603193021556 cpt1 idx2 25 
20160603193021556 cpt1 idx3 33 
20160603193021556 cpt1 idx4 44 

, и я хочу, чтобы агрегировать, что, чтобы получить следующие результаты (разновидность денормализация):

<date> <cpt_id> <idx1> <idx2> <idx3> <idx4> 
20160603093021556 cpt1 11 22 33 44 
20160603113021556 cpt2 09 45 66 null 
20160603193021556 cpt1 13 25 33 44 

С каскадного Я хотел бы использовать GroupBy (с датой и cpt-id в качестве групп группировки) и каждый буфер для генерации денормализованных кортежей.

С помощью Spark, похоже, потребуется функция пользовательского агрегатора, но она доступна только после того, как Spark 1.5 (и 1.3.1 доступен на моем кластере).

Я не вижу, как это сделать с помощью API 1.3.1.

Спасибо за вашу помощь и предложения

ответ

0

мне удалось сделать это с помощью следующего процесса:

1- Групповые линии с помощью составного ключа (дата, cpt_id) В результате я получаю JavaPairRDD > набор данные

2- Применить преобразование карты для этого набора данных, делая «денормализацию» в функции передается в качестве аргумента к карте

Вот мой код:

@Test 
public void testCustomAggregator2() { 

    DataFrame df = sqlContext.load("src/test/resources/index.json", "json").select("date_mesure", "compteur_id", "type_mesure", "value"); 

    JavaRDD<Row> rows = df.javaRDD(); 

    JavaPairRDD<IndexKey, Iterable<Row>> groupedIndex = rows.groupBy(new Function<Row, IndexKey>() { 
     @Override 
     public IndexKey call(Row row) throws Exception { 
      return new IndexKey(row.getString(0), row.getString(1)); 
     } 
    }); 

    JavaRDD<Row> computedRows = groupedIndex.map(new Function<Tuple2<IndexKey, Iterable<Row>>, Row>() { 

     @Override 
     public Row call(Tuple2<IndexKey, Iterable<Row>> indexKeyIterableTuple2) throws Exception { 

      Row result = null; 

      IndexKey key = indexKeyIterableTuple2._1; 

      Iterable<Row> rowsForKey = indexKeyIterableTuple2._2; 

      String idx1 = null; 

      String idx2 = null; 

      String idx3 = null; 

      for (Row rowForKey : rowsForKey) { 

       String typeMesure = rowForKey.getString(2); 

       String value = rowForKey.getString(3); 

       switch(typeMesure) { 

        case "idx1" : 
         idx1 = value; 
         break; 

        case "idx2" : 
         idx2 = value; 
         break; 

        case "idx3" : 
         idx3 = value; 
         break; 

        default : 
         break; 
       } 
      } 

      result = RowFactory.create(key.getDateMesure(), 
             key.getCompteurId(), 
             idx1, 
             idx2, 
             idx3); 

      return result; 
     } 
    }); 

    List<Row> resultRows = computedRows.collect(); 

    boolean found = false; 

    for (Row resultRow : resultRows) { 

     String dateMesure = resultRow.getString(0); 

     String compteurId = resultRow.getString(1); 

     if ("20160603093021556".equals(dateMesure) 
       && "cpt1".equals(compteurId)) { 

      found = true; 

      String idx1 = resultRow.getString(2); 
      String idx2 = resultRow.getString(3); 
      String idx3 = resultRow.getString(4); 

      Assert.assertEquals("11", idx1); 
      Assert.assertEquals("22", idx2); 
      Assert.assertEquals("33", idx3); 
     } 
    } 

    if (!found) { 

     Assert.fail("Ligne d'index non trouvée"); 
    } 
} 

Надеюсь, что это поможет, и если в коде произойдет что-то не так, сообщите мне.

Как я уже сказал, я совершенно новый для Искры и с нетерпением жду улучшения.