2015-08-19 2 views
0

У меня есть текстовый файл, который содержит некоторые данные. Данные следующийValueError: не удалось преобразовать строку в float

join2_train = sc.textFile('join2_train.csv',4) 
join2_train.take(3) 

[u'21.9059,TA-00002,S-0066,7/7/2013,0,0,Yes,1,SP-0019,6.35,0.71,137,8,19.05,N,N,N,N,EF-008,EF-008,0,0,0', 
    u'12.3412,TA-00002,S-0066,7/7/2013,0,0,Yes,2,SP-0019,6.35,0.71,137,8,19.05,N,N,N,N,EF-008,EF-008,0,0,0', 
    u'6.60183,TA-00002,S-0066,7/7/2013,0,0,Yes,5,SP-0019,6.35,0.71,137,8,19.05,N,N,N,N,EF-008,EF-008,0,0,0'] 

Теперь я пытаюсь разобрать эту строку в функцию, которая расщепляет каждую из строк текста и преобразовать в LabeledPoint. Я также включил линию для преобразования строковых элементов плавать

функция заключается в следующем

from pyspark.mllib.regression import LabeledPoint 
import numpy as np 

def parsePoint(line): 


    """Converts a comma separated unicode string into a `LabeledPoint`. 

    Args: 
     line (unicode): Comma separated unicode string where the first element is the label and the 
      remaining elements are features. 

    Returns: 
     LabeledPoint: The line is converted into a `LabeledPoint`, which consists of a label and 
      features. 
    """ 
    values = line.split(',') 
    value1 = [map(float,i) for i in values] 
    return LabeledPoint(value1[0],value1[1:]) 

Теперь, когда я пытаюсь сделать некоторые действия на этом разобранной линию, и я получаю ValueError. Действие, которое я пытаюсь сделать это, как показано ниже

parse_train = join2_train.map(parsePoint) 

parse_train.take(5) 

сообщение об ошибке я получаю, как показано ниже

--------------------------------------------------------------------------- 
Py4JJavaError        Traceback (most recent call last) 
<ipython-input-63-f53b10964381> in <module>() 
     1 parse_train = join2_train.map(parsePoint) 
     2 
----> 3 parse_train.take(5) 

/usr/local/bin/spark-1.3.1-bin-hadoop2.6/python/pyspark/rdd.py in take(self, num) 
    1222 
    1223    p = range(partsScanned, min(partsScanned + numPartsToTry, totalParts)) 
-> 1224    res = self.context.runJob(self, takeUpToNumLeft, p, True) 
    1225 
    1226    items += res 

/usr/local/bin/spark-1.3.1-bin-hadoop2.6/python/pyspark/context.py in runJob(self, rdd, partitionFunc, partitions, allowLocal) 
    840   mappedRDD = rdd.mapPartitions(partitionFunc) 
    841   port = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, javaPartitions, 
--> 842           allowLocal) 
    843   return list(_load_from_socket(port, mappedRDD._jrdd_deserializer)) 
    844 

/usr/local/bin/spark-1.3.1-bin-hadoop2.6/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in __call__(self, *args) 
    536   answer = self.gateway_client.send_command(command) 
    537   return_value = get_return_value(answer, self.gateway_client, 
--> 538     self.target_id, self.name) 
    539 
    540   for temp_arg in temp_args: 

/usr/local/bin/spark-1.3.1-bin-hadoop2.6/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name) 
    298     raise Py4JJavaError(
    299      'An error occurred while calling {0}{1}{2}.\n'. 
--> 300      format(target_id, '.', name), value) 
    301    else: 
    302     raise Py4JError(

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob. 
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 22.0 failed 1 times, most recent failure: Lost task 0.0 in stage 22.0 (TID 31, localhost): org.apache.spark.api.python.PythonException: Traceback (most recent call last): 
    File "/usr/local/bin/spark-1.3.1-bin-hadoop2.6/python/pyspark/worker.py", line 101, in main 
    process() 
    File "/usr/local/bin/spark-1.3.1-bin-hadoop2.6/python/pyspark/worker.py", line 96, in process 
    serializer.dump_stream(func(split_index, iterator), outfile) 
    File "/usr/local/bin/spark-1.3.1-bin-hadoop2.6/python/pyspark/serializers.py", line 236, in dump_stream 
    vs = list(itertools.islice(iterator, batch)) 
    File "/usr/local/bin/spark-1.3.1-bin-hadoop2.6/python/pyspark/rdd.py", line 1220, in takeUpToNumLeft 
    yield next(iterator) 
    File "<ipython-input-62-0243c4dd1876>", line 18, in parsePoint 
ValueError: could not convert string to float: . 

    at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:135) 
    at org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:176) 
    at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:94) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) 
    at org.apache.spark.scheduler.Task.run(Task.scala:64) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
    at java.lang.Thread.run(Thread.java:745) 

Driver stacktrace: 
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1204) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1193) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192) 
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) 
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1192) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693) 
    at scala.Option.foreach(Option.scala:236) 
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1393) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354) 
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) 
+1

Вещь после двоеточия в ошибке ('.') сообщает вам, что она пытается конвертировать что-то ('.'), который не является чем-то, что он распознает как число. Очистите свои данные или не передавайте плохие данные в эту функцию. – chicks

+0

@chicks Это не распознавание десятичной точки. Если вы удалите все данные и просто оставьте первый номер в приведенном выше примере: «21 .9059», все еще есть ошибка. Любая идея почему? – Nadine

ответ

0

Добавьте эту функцию, чтобы проверить, если строка может быть преобразован в поплавок:

def isfloat(string): 
    try: 
     float(string) 
     return True 
    except ValueError: 
     return False 

, а затем в parsePoint:

value1 = [map(float,i) for i in values if isfloat(i)] 

Изменяя флоят линии следующего

value1 = [float(i) for i in values] 

, а затем разбирает строку только с числовыми значениями, мы можем получить обратно правильное LabeledPoints. Однако реальная проблема заключается в том, чтобы сделать объекты LabeledPoint из строк, которые не могут быть преобразованы в float, например TA-00002, как в объекте join2_train

+0

Интересно, что это не работает над значением '21 .9059. ' По какой-то причине он не распознает десятичную точку. isfloat вернет True, но карта все равно вызовет ошибку. – Nadine

+0

Вы имеете в виду, когда * values ​​* = '21 .9059. ' или * i * = '21 .9059 '? – svfat

+0

Я имею в виду, когда файл csv join2_train.csv (как и в сообщении OP) содержит только одну строку с номером 21.9059. – Nadine