1

Я запускаю серию различных заданий с использованием Python/Spark по одному. Чтобы избежать создания SparkContex каждый раз, что занимает некоторое время, я хочу отправить контекст как параметр для каждого задания. Кроме того, я хочу, чтобы менеджер (код, который создает контекст и запускает задания) имеет механизм тайм-аута.Передача SparkContext в новый процесс (модуль многопроцессорности python)

У меня странная ошибка при первом запуске работы, после чего она исчезает.

Traceback (most recent call last): 
File "/usr/lib/python3.4/multiprocessing/process.py", line 254, in _bootstrap 
    self.run() 
    File "/usr/lib/python3.4/multiprocessing/process.py", line 93, in run 
    self._target(*self._args, **self._kwargs) 
    File "/home/dev/ComponentEngine/components/ExampleComponent/ExampleComponent.py", line 35, in run 
    numbers = sparkContext.parallelize([1,2,3,4,5,6,7,8,9,10]) 
    File "/home/dev/Programs/spark-1.4.1-bin-hadoop2.6/python/pyspark/context.py", line 395, in parallelize 
    readRDDFromFile = self._jvm.PythonRDD.readRDDFromFile 
    File "/home/dev/Programs/spark-1.4.1-bin-hadoop2.6/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 772, in __getattr__ 
    raise Py4JError('{0} does not exist in the JVM'.format(name)) 
py4j.protocol.Py4JError: PythonRDD does not exist in the JVM 

Код:

#!/bin/python3 
import multiprocessing 
from pyspark import SparkContext 

def doJob(sc): 
    try: 
     sc.parallelize([1,2,3,4,5,6,7,8,9,10]) 
    except Exception as e: 
     print('Got excpetion {}'.format(e)) 

def runWithTimeout(sc): 
    p = multiprocessing.Process(target=doJob, name="runWithTimeout", args=(sc)) 
    p.start() 

    # Wait till the timeout 
    p.join(10) 

    if p.is_alive(): 
     p.terminate() 
     p.join() 


if __name__ == '__main__': 
    sc = SparkContext() 
    for i in range(3): 
     runWithTimeout(sc) 

Почему появляется эта ошибка?
Есть ли проблема передать SparkContext таким образом? Я знаю, что он сериализуется и используется процессом на другом конце. И если задание выполняет какое-либо изменение в контексте, копия движка не будет затронута.
Есть ли такие изменения, которые могут помешать выполнению других заданий?

ответ

1

Сериализация контекста Spark не работает на любом из поддерживаемых языков. В общем, что-то сделано в соответствии с целями IBM Spark Kernel или сервером рабочих мест ooyola, где один процесс * содержит контекст Spark, и несколько клиентов разговаривают с сервером. В python Spark Context содержит сетевые сокеты, которые используются для связи с JVM SparkContext, а сетевые сокеты - не сериализуемые объекты. Глядя на py4j (библиотека Spark использует для связи между python и JVM) многопоточность может работать, поскольку они могут совместно использовать сокеты, но несколько процессов не так много.

+0

Центральный сервер Spark звучит великолепно, но по-прежнему существует проблема с созданием механизма тайм-аута. Я нашел этот пост http://stackoverflow.com/questions/13682249/how-to-terminate-a-thread-in-python-without-loop-in-run-method, поэтому я пришел к выводу, что лучше использовать процесс нить. Я сделал несколько тестов. Если я открою файл/сокет в родительском процессе и передам его потомку, он все равно сможет его записать. И кроме ошибки в моем сообщении, следующий «doJob» отлично работает, используя контекст, переданный ему из родительского процесса. Итак, если сокеты в объекте контекста в порядке, не должно быть никаких проблем? – Rtik88

Смежные вопросы