2016-09-10 3 views
0

Я новичок в PySpark, ниже мой формат файла JSON от kafka.чтение json файла в pyspark

{ 
     "header": { 
     "platform":"atm", 
     "version":"2.0" 
     } 
     "details":[ 
     { 
     "abc":"3", 
     "def":"4" 
     }, 
     { 
     "abc":"5", 
     "def":"6" 
     }, 
     { 
     "abc":"7", 
     "def":"8" 
     }  
     ] 
    } 

как я могу прочитать значения всех "abc""def" в деталях и добавить это в новый список, как этот [(1,2),(3,4),(5,6),(7,8)]. Новый список будет использоваться для создания кадра искровых данных. как я могу это сделать в pyspark.I пробовал код ниже.

parsed = messages.map(lambda (k,v): json.loads(v)) 
list = [] 
summed = parsed.map(lambda detail:list.append((String(['mcc']), String(['mid']), String(['dsrc'])))) 
output = summed.collect() 
print output 

Он производит ошибку 'слишком много значений для распаковки'

ниже сообщение об ошибке при постановке summed.collect()

16/09/12 12:46:10 INFO устаревания: mapred .task.is.map устарел. Вместо этого используйте mapreduce.task.ismap 16/09/12 12:46:10 Утечка INFO: mapred.task.partition устарела. Вместо этого используйте mapreduce.task.partition 16.09.12 12:46:10 Утечка INFO: mapred.job.id устарел. Вместо этого используйте mapreduce.job.id 16/09/12 12:46:10 ОШИБКА Исполнитель: Исключение в задаче 1.0 в стадии 0.0 (TID 1) org.apache.spark.api.python.PythonException: Traceback (последнее call last): Файл «/usr/hdp/2.3.4.0-3485/spark/python/lib/pyspark.zip/pyspark/worker.py», строка 111, в основном процесс() Файл «/ usr/hdp/2.3.4.0-3485/spark/python/lib/pyspark.zip/pyspark/worker.py ", строка 106, в процессе serializer.dump_stream (func (split_index, iterator), outfile) Файл«/usr/hdp/2.3.4.0-3485/spark/python/lib/pyspark.zip/pyspark/serializers.py ", строка 263, в dump_stream vs = list (itertools.islice (iterator, batch)) Файл" ", строка 1, в ValueError: слишком много значений для распаковки

+0

Каковы результаты: messages.take (3), разобранные = messages.map (lambda (k, v): json.loads (v)), parsed.take (3)? – Yaron

+0

в этом разобранном = message.map (lambda (k, v): json.loads (v)) statement Я получаю ошибку «слишком много значений для распаковки», потому что «подробности» содержит список json string – anusha

+0

, пожалуйста, укажите результат из "messages.take (3)", также сообщите об ошибке, которую вы получили (вы можете отредактировать свой вопрос и добавить эту информацию) – Yaron

ответ

1

Прежде всего, JSON является недействительным. После заголовка отсутствует ,.

Это, как говорится, давайте это JSON:

{"header":{"platform":"atm","version":"2.0"},"details":[{"abc":"3","def":"4"},{"abc":"5","def":"6"},{"abc":"7","def":"8"}]} 

Это может быть обработано:

>>> df = sqlContext.jsonFile('test.json') 
>>> df.first() 
Row(details=[Row(abc='3', def='4'), Row(abc='5', def='6'), Row(abc='7', def='8')], header=Row(platform='atm', version='2.0')) 

>>> df = df.flatMap(lambda row: row['details']) 
PythonRDD[38] at RDD at PythonRDD.scala:43 

>>> df.collect() 
[Row(abc='3', def='4'), Row(abc='5', def='6'), Row(abc='7', def='8')] 

>>> df.map(lambda entry: (int(entry['abc']),  int(entry['def']))).collect() 
[(3, 4), (5, 6), (7, 8)] 

Надеется, что это помогает!

+0

@ Fokko Driesprong Привет Fokko Driesprong: Я столкнулся с следующей ошибкой: @step: df.collect() - сбой при ошибке ниже: org.apache.spark.api.python.PythonException: Traceback (последний вызов last): vs = list (itertools.islice (iterator, batch)) Файл «», строка 1, в ТипError: строковые индексы должны быть целыми, а не str o/p из df.first() ниже: «Строка (подробности = [Row (abc = u'3 ', def = u'4'), Row (abc = u'5 ', def = u'6'), Row (abc = u'7 ', def = u '8')], header = Row (platform = u'atm ', version = u'2.0')) «Я заметил, что значения в« деталях »находятся в Юникоде. Должен ли я что-нибудь там делать? Еще раз спасибо! – anusha

+0

Каков ваш точный ввод? Содержимое моего файла: '{" header ": {" platform ":" atm "," version ":" 2.0 "}," details ": [{" abc ":" 3 "," def ": «4»}, {«abc»: «5», «def»: «6»}, {"abc": "7", "def": "8"}]} ' Только одна строка без символов перевода строки , –

+0

@ Fokko Driesprong да, используя один и тот же ввод, в одной строке без символов новой строки. – anusha

0

Согласно информации в комментариях, каждая строка в сообщениях РДД имеет одну строку из файла JSon

u'{', 
u' "header": {', 
u' "platform":"atm",' 

Ваш код неисправного в следующей строке:

parsed = messages.map(lambda (k,v): json.loads(v)) 

Ваш код принимает например: '{' и попытаться преобразовать его в ключ, значение и выполнить json.loads (значение)

ясно, что python/spark не сможет разделить один символ '{' на key- пара значений.

Команда json.loads() должна быть выполнена на полном JSon-данных объекта

Эта специфическая задача может быть выполнена легче с чистым питона