Я запускаю серию различных заданий с использованием Python/Spark по одному. Чтобы избежать создания SparkContex каждый раз, что занимает некоторое время, я хочу отправить контекст как параметр для каждого задания. Кроме того, я хочу, чтобы менеджер (код, который создает контекст и запускает задания) имеет механизм тайм-аута.Передача SparkContext в новый процесс (модуль многопроцессорности python)
У меня странная ошибка при первом запуске работы, после чего она исчезает.
Traceback (most recent call last):
File "/usr/lib/python3.4/multiprocessing/process.py", line 254, in _bootstrap
self.run()
File "/usr/lib/python3.4/multiprocessing/process.py", line 93, in run
self._target(*self._args, **self._kwargs)
File "/home/dev/ComponentEngine/components/ExampleComponent/ExampleComponent.py", line 35, in run
numbers = sparkContext.parallelize([1,2,3,4,5,6,7,8,9,10])
File "/home/dev/Programs/spark-1.4.1-bin-hadoop2.6/python/pyspark/context.py", line 395, in parallelize
readRDDFromFile = self._jvm.PythonRDD.readRDDFromFile
File "/home/dev/Programs/spark-1.4.1-bin-hadoop2.6/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 772, in __getattr__
raise Py4JError('{0} does not exist in the JVM'.format(name))
py4j.protocol.Py4JError: PythonRDD does not exist in the JVM
Код:
#!/bin/python3
import multiprocessing
from pyspark import SparkContext
def doJob(sc):
try:
sc.parallelize([1,2,3,4,5,6,7,8,9,10])
except Exception as e:
print('Got excpetion {}'.format(e))
def runWithTimeout(sc):
p = multiprocessing.Process(target=doJob, name="runWithTimeout", args=(sc))
p.start()
# Wait till the timeout
p.join(10)
if p.is_alive():
p.terminate()
p.join()
if __name__ == '__main__':
sc = SparkContext()
for i in range(3):
runWithTimeout(sc)
Почему появляется эта ошибка?
Есть ли проблема передать SparkContext таким образом? Я знаю, что он сериализуется и используется процессом на другом конце. И если задание выполняет какое-либо изменение в контексте, копия движка не будет затронута.
Есть ли такие изменения, которые могут помешать выполнению других заданий?
Центральный сервер Spark звучит великолепно, но по-прежнему существует проблема с созданием механизма тайм-аута. Я нашел этот пост http://stackoverflow.com/questions/13682249/how-to-terminate-a-thread-in-python-without-loop-in-run-method, поэтому я пришел к выводу, что лучше использовать процесс нить. Я сделал несколько тестов. Если я открою файл/сокет в родительском процессе и передам его потомку, он все равно сможет его записать. И кроме ошибки в моем сообщении, следующий «doJob» отлично работает, используя контекст, переданный ему из родительского процесса. Итак, если сокеты в объекте контекста в порядке, не должно быть никаких проблем? – Rtik88