У меня есть rdd с около 50 миллионами диктов в Spark 2.0. Они довольно малы и занимают около 12 ГБ в памяти (на вкладке «Хранилище» в веб-интерфейсе Spark). Я прошел через все обработанные, которые я хочу на этом RDD, и теперь я хочу вытащить его из Spark, потому что теперь мне нужно кормить эти данные в другую систему.Получение данных из Spark - Python
Я никуда не могу с этим и нуждаюсь в некоторой помощи. В идеале, что я хочу сделать, это отправить каждый раздел в драйвер и выгрузить данные локально через другой модуль python. Это потребует минимального дополнительного кодирования.
Я надеялся, что что-то подобное будет работать:
for x in processed_data.toDF().toLocalIterator():
index.add(x)
Но никакой радости, я получил это удобный трассировки стека:
<ipython-input-20-b347e9bd2075> in <module>()
----> 1 for x in processed_data.toDF().toLocalIterator():
2 index.add(x)
/apps/spark2/python/pyspark/rdd.py in _load_from_socket(port, serializer)
140 try:
141 rf = sock.makefile("rb", 65536)
--> 142 for item in serializer.load_stream(rf):
143 yield item
144 finally:
/apps/spark2/python/pyspark/serializers.py in load_stream(self, stream)
137 while True:
138 try:
--> 139 yield self._read_with_length(stream)
140 except EOFError:
141 return
/apps/spark2/python/pyspark/serializers.py in _read_with_length(self, stream)
154
155 def _read_with_length(self, stream):
--> 156 length = read_int(stream)
157 if length == SpecialLengths.END_OF_DATA_SECTION:
158 raise EOFError
/apps/spark2/python/pyspark/serializers.py in read_int(stream)
541
542 def read_int(stream):
--> 543 length = stream.read(4)
544 if not length:
545 raise EOFError
/usr/lib/python3.4/socket.py in readinto(self, b)
372 while True:
373 try:
--> 374 return self._sock.recv_into(b)
375 except timeout:
376 self._timeout_occurred = True
timeout: timed out
Я проверил все файлы журналов, и я понятия не имею, что это могло быть. Я даже пытался переразделить rdd, поэтому у меня есть более мелкие разделы и до сих пор не повезло.
Поскольку мой водитель имеет около 40 Гб оперативной памяти выделяется, я тогда пытался собрать его, а затем я начал получать кучу из них:
ExecutorLostFailure (executor 3 exited caused by one of the running
tasks) Reason: Remote RPC client disassociated. Likely due to
containers exceeding thresholds, or network issues. Check driver logs
for WARN messages.
я проверил журналы и даже не видят каких-либо проблем. Единственное, что даже отдаленно закончил выполнение пишет ФР к HDFS:
processed_data.toDF().write.json()
Однако проблема заключается в том, что тогда я просто получить дамп данных без надлежащего синтаксиса JSON, как запятые после каждого объекта .. ..
Я что-то упустил? Это действительно разочаровывает, потому что я пробовал это с меньшим набором данных, и toLocalIterator работал отлично.
Заранее спасибо
Это скорее всего симптом, а не основная проблема. Также почему вы переходите в 'DataFrame' перед сбором. Это не имеет никакого смысла. – zero323
Я не думаю, что у rdd есть метод записи, его на кадре данных. Что такое хороший способ продолжить устранение некоторых из этих проблем? – browskie
Он предоставляет несколько способов записи, и он до сих пор не объясняет, почему вы конвертируете, когда вы не выполняете запись. Re O 'DataFrame.write.json' - предоставляет действительный документ JSON для каждой строки. Там не должно быть проблем с чтением. – zero323