2016-08-30 4 views
2

У меня есть RDD с этой структуройScala: Карта и Flatmap на РДУ

 RDD[((String, String), List[(Int, Timestamp, String)])] 

и данные

((D2,Saad Arif),List((4,2011-10-05 00:00:00.0,C101), (5,2010-01-27 00:00:00.0,C101))) 
    ((D3,Faran Abid),List((7,2016-10-05 00:00:00.0,C101))) 
    ((D1,Atif Shahzad),List((1,2012-04-15 00:00:00.0,C101), (2,2011-10-05 00:00:00.0,C101), (3,2006-12-25 00:00:00.0,C101))) 

рассматривать это как таблицы означает

'(D2,Saad Arif)' 

, как ключ и

'List((4,2011-10-05 00:00:00.0,C101), (5,2010-01-27 00:00:00.0,C101)' 

как строки для этого ключа. Теперь я хочу, чтобы проверить для каждой строки, что если есть запись (история) с кодом «C101» до более или два года, то установить уровень 2 в противном случае 1. Таким образом, в результате РДД должен выглядеть следующим образом

((D2,Saad Arif),List((4,2011-10-05 00:00:00.0,C101, 1), (5,2010-01-27 00:00:00.0,C101, 1))) 
((D3,Faran Abid),List((7,2016-10-05 00:00:00.0,C101, 1))) 
((D1,Atif Shahzad),List((1,2012-04-15 00:00:00.0,C101, 2), (2,2011-10-05 00:00:00.0,C101, 2), (3,2006-12-25 00:00:00.0,C101, 1))) 

Обратите внимание на новый уровень после отметки времени. Как я могу это сделать с помощью карты или карты?

+0

Вы понимаете разницу между 'map' и' flatMap'? Это, очевидно, прецедент для «карты». –

+0

Также ... Пожалуйста, посмотрите на свои прошлые вопросы. И если кто-то правильно ответил на ваш вопрос, не забудьте оценить усилия этого человека, отметив его ответ как принятый. –

+0

@Sarvesh Kumar Singh да, у меня есть общее представление о карте и плоской карте, но я не знаю, как использовать в этом сценарии. –

ответ

1
import java.time.LocalDate 
import java.time.format.DateTimeFormatter 
import java.time.Period  


val df1 = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.S") 

val futureDate = LocalDate.parse("2100-01-01 00:00:00.0", df1) 

val yourRequiredRdd = yourRdd 
    .map({ 
    case (t, list) => { 
     val list1 = list.map({ 
     case (id, dateStr, id2) => (id, LocalDate.parse(dateStr, df1), id2) 
     }) 

     val oldestDate = list1 
     .filter({ case (id, date, id2) => id2.equals("C101") }) 
     .map(_._2) 
     .foldLeft(futureDate)((oldestDate, date) => { 
      val period = Period.between(oldestDate, date) 
      if (!period.isNegative()) oldestDate else date 
     }) 

     val newList = list1 
     .map({ 
      case (id, date, "C101") => { 
      val periodFromOldestDate = Period.between(oldestDate, date) 
      val extraNumber = if (periodFromOldestDate.getYears() >= 2) 2 else 1 
      (id, date, "C101", extraNumber) 
      } 
      case (id, date, id2) => { 
      (id, date, id2, 1) 
      } 
     }) 

     (t, newList) 
    } 
    }) 
    .flatMap({ 
    case ((pid, name), list) => list.map({ 
     case (id, date, code, level) => (id, name, code, pid, date, level) 
    }) 
    }) 
+0

спасибо. Можете ли вы кратко объяснить код, чтобы найти «oldestDate». –

+0

отфильтровывайте записи, отличные от 'C101', затем сопоставляйте, чтобы сохранить даты. Теперь у вас есть список дат. Теперь сверните в этот список, чтобы найти самую старую дату. –

+0

requiredRDD имеет эту структуру 'RDD [((String, String), List [(Int, LocalDate, String, Int)])]' как я сопоставляю его с 'RDD [(Int, String, String, String, LocalDate, Int)] путем использования map {case ((pid, name), (id, date, code, level)) => (id, name, code, pid, date, level)} –

Смежные вопросы