Я новичок в PySpark, ниже мой формат файла JSON от kafka.чтение json файла в pyspark
{
"header": {
"platform":"atm",
"version":"2.0"
}
"details":[
{
"abc":"3",
"def":"4"
},
{
"abc":"5",
"def":"6"
},
{
"abc":"7",
"def":"8"
}
]
}
как я могу прочитать значения всех "abc"
"def"
в деталях и добавить это в новый список, как этот [(1,2),(3,4),(5,6),(7,8)]
. Новый список будет использоваться для создания кадра искровых данных. как я могу это сделать в pyspark.I пробовал код ниже.
parsed = messages.map(lambda (k,v): json.loads(v))
list = []
summed = parsed.map(lambda detail:list.append((String(['mcc']), String(['mid']), String(['dsrc']))))
output = summed.collect()
print output
Он производит ошибку 'слишком много значений для распаковки'
ниже сообщение об ошибке при постановке summed.collect()
16/09/12 12:46:10 INFO устаревания: mapred .task.is.map устарел. Вместо этого используйте mapreduce.task.ismap 16/09/12 12:46:10 Утечка INFO: mapred.task.partition устарела. Вместо этого используйте mapreduce.task.partition 16.09.12 12:46:10 Утечка INFO: mapred.job.id устарел. Вместо этого используйте mapreduce.job.id 16/09/12 12:46:10 ОШИБКА Исполнитель: Исключение в задаче 1.0 в стадии 0.0 (TID 1) org.apache.spark.api.python.PythonException: Traceback (последнее call last): Файл «/usr/hdp/2.3.4.0-3485/spark/python/lib/pyspark.zip/pyspark/worker.py», строка 111, в основном процесс() Файл «/ usr/hdp/2.3.4.0-3485/spark/python/lib/pyspark.zip/pyspark/worker.py ", строка 106, в процессе serializer.dump_stream (func (split_index, iterator), outfile) Файл«/usr/hdp/2.3.4.0-3485/spark/python/lib/pyspark.zip/pyspark/serializers.py ", строка 263, в dump_stream vs = list (itertools.islice (iterator, batch)) Файл" ", строка 1, в ValueError: слишком много значений для распаковки
Каковы результаты: messages.take (3), разобранные = messages.map (lambda (k, v): json.loads (v)), parsed.take (3)? – Yaron
в этом разобранном = message.map (lambda (k, v): json.loads (v)) statement Я получаю ошибку «слишком много значений для распаковки», потому что «подробности» содержит список json string – anusha
, пожалуйста, укажите результат из "messages.take (3)", также сообщите об ошибке, которую вы получили (вы можете отредактировать свой вопрос и добавить эту информацию) – Yaron