2016-10-01 2 views
0

У меня есть rdd с около 50 миллионами диктов в Spark 2.0. Они довольно малы и занимают около 12 ГБ в памяти (на вкладке «Хранилище» в веб-интерфейсе Spark). Я прошел через все обработанные, которые я хочу на этом RDD, и теперь я хочу вытащить его из Spark, потому что теперь мне нужно кормить эти данные в другую систему.Получение данных из Spark - Python

Я никуда не могу с этим и нуждаюсь в некоторой помощи. В идеале, что я хочу сделать, это отправить каждый раздел в драйвер и выгрузить данные локально через другой модуль python. Это потребует минимального дополнительного кодирования.

Я надеялся, что что-то подобное будет работать:

for x in processed_data.toDF().toLocalIterator(): 
    index.add(x) 

Но никакой радости, я получил это удобный трассировки стека:

<ipython-input-20-b347e9bd2075> in <module>() 
----> 1 for x in processed_data.toDF().toLocalIterator(): 
     2  index.add(x) 

/apps/spark2/python/pyspark/rdd.py in _load_from_socket(port, serializer) 
    140  try: 
    141   rf = sock.makefile("rb", 65536) 
--> 142   for item in serializer.load_stream(rf): 
    143    yield item 
    144  finally: 

/apps/spark2/python/pyspark/serializers.py in load_stream(self, stream) 
    137   while True: 
    138    try: 
--> 139     yield self._read_with_length(stream) 
    140    except EOFError: 
    141     return 

/apps/spark2/python/pyspark/serializers.py in _read_with_length(self, stream) 
    154 
    155  def _read_with_length(self, stream): 
--> 156   length = read_int(stream) 
    157   if length == SpecialLengths.END_OF_DATA_SECTION: 
    158    raise EOFError 

/apps/spark2/python/pyspark/serializers.py in read_int(stream) 
    541 
    542 def read_int(stream): 
--> 543  length = stream.read(4) 
    544  if not length: 
    545   raise EOFError 

/usr/lib/python3.4/socket.py in readinto(self, b) 
    372   while True: 
    373    try: 
--> 374     return self._sock.recv_into(b) 
    375    except timeout: 
    376     self._timeout_occurred = True 

timeout: timed out 

Я проверил все файлы журналов, и я понятия не имею, что это могло быть. Я даже пытался переразделить rdd, поэтому у меня есть более мелкие разделы и до сих пор не повезло.

Поскольку мой водитель имеет около 40 Гб оперативной памяти выделяется, я тогда пытался собрать его, а затем я начал получать кучу из них:

ExecutorLostFailure (executor 3 exited caused by one of the running 
tasks) Reason: Remote RPC client disassociated. Likely due to 
containers exceeding thresholds, or network issues. Check driver logs 
for WARN messages. 

я проверил журналы и даже не видят каких-либо проблем. Единственное, что даже отдаленно закончил выполнение пишет ФР к HDFS:

processed_data.toDF().write.json() 

Однако проблема заключается в том, что тогда я просто получить дамп данных без надлежащего синтаксиса JSON, как запятые после каждого объекта .. ..

Я что-то упустил? Это действительно разочаровывает, потому что я пробовал это с меньшим набором данных, и toLocalIterator работал отлично.

Заранее спасибо

+0

Это скорее всего симптом, а не основная проблема. Также почему вы переходите в 'DataFrame' перед сбором. Это не имеет никакого смысла. – zero323

+0

Я не думаю, что у rdd есть метод записи, его на кадре данных. Что такое хороший способ продолжить устранение некоторых из этих проблем? – browskie

+0

Он предоставляет несколько способов записи, и он до сих пор не объясняет, почему вы конвертируете, когда вы не выполняете запись. Re O 'DataFrame.write.json' - предоставляет действительный документ JSON для каждой строки. Там не должно быть проблем с чтением. – zero323

ответ

0

Я понимаю, что это известная ошибка: https://issues.apache.org/jira/browse/SPARK-18281

Она должна быть исправлена ​​в версии 2.0.3 и 2.1.1 (ни один из них не будет отпущена еще и 2,1-видимому, еще есть ошибка).

В то же время, если память не является проблемой, замена toLocalIterator на collect должна работать.

Смежные вопросы