2016-06-22 4 views
3

Недавно я хотел использовать Cython с Spark, для которого я следовал the following reference.Spark with Cython

я написал следующие программы, как упоминалось, но я получаю:

TypeError: 
fib_mapper_cython() takes exactly 1 argument (0 given) 

spark-tools.py

def spark_cython(module, method): 
    def wrapped(*args, **kwargs): 
     global cython_function_ 
     try: 
      return cython_function_(*args, **kwargs) 
     except: 
      import pyximport 
      pyximport.install() 
      cython_function_ = getattr(__import__(module), method) 
     return cython_function_(*args, **kwargs) 
    return wrapped() 

fib.pyx
def fib_mapper_cython(n): 
    ''' 
    Return the first fibonnaci number > n. 
    ''' 
    cdef int a = 0 
    cdef int b = 0 
    cdef int j = int(n) 
    while b<j: 
     a, b = b, a+b 
    return b, 1 

main.py
from spark_tools import spark_cython 
import pyximport 
import os 
from pyspark import SparkContext 
from pyspark import SparkConf 
pyximport.install() 


os.environ["SPARK_HOME"] = "/home/spark-1.6.0" 
conf = (SparkConf().setMaster('local').setAppName('Fibo')) 

sc = SparkContext() 
sc.addPyFile('file:///home/Cythonize/fib.pyx') 
sc.addPyFile('file:///home/Cythonize/spark_tools.py') 
lines = sc.textFile('file:///home/Cythonize/nums.txt') 

mapper = spark_cython('fib', 'fib_mapper_cython') 
fib_frequency = lines.map(mapper).reduceByKey(lambda a, b: a+b).collect() 
print fib_frequency 

Я получаю TypeError всякий раз, когда запускаю программу. Есть идеи?

+0

Начальные значения fib_mapper_cython будут циклически выполняться бесконечно. Изменение b = 1 должно исправить проблему – MrChristine

ответ

4

Это не Cython или вопрос PySpark, вы, к сожалению, добавили дополнительный вызов функции во время определения spark_cython. В частности, функция, которая завершает вызов к cython_function вызывается без аргументов по возвращении:

return wrapped() # call made, no args supplied. 

В результате вы не возвращает обернутую функцию при выполнении этого вызова. Что вы делаете, звоните wrapped без *args или **kwargs. wrapped затем вызывает fib_mapper_cython без аргументов (с *args, **kwargs не прилагаются), следовательно TypeError.

Вы должны вместо этого:

return wrapped 

и этот вопрос больше не должен присутствовать.

+1

Спасибо за помощь – StarLord