5

Я готовлю игрушку spark.ml пример. Spark version 1.6.0, работает поверх Oracle JDK version 1.8.0_65, pyspark, ipython notebook.spark.ml StringIndexer бросает «Невидимая метка» на подгонку()

Во-первых, он почти не имеет никакого отношения к Spark, ML, StringIndexer: handling unseen labels. Исключение возникает при установке конвейера в набор данных, а не на его преобразование. И подавление исключения, возможно, не является решением здесь, так как, боюсь, набор данных в этом случае становится бесполезным.

Мой набор данных составляет около 800 Мб несжатого, поэтому его трудно воспроизвести (меньшие подмножества, похоже, уклоняются от этой проблемы).

Набор данных выглядит следующим образом:

+--------------------+-----------+-----+-------+-----+--------------------+ 
|     url|   ip| rs| lang|label|     txt| 
+--------------------+-----------+-----+-------+-----+--------------------+ 
|http://3d-detmold...|217.160.215|378.0|  de| 0.0|homwillkommskip c...| 
| http://3davto.ru/| 188.225.16|891.0|  id| 1.0|оформить заказ пе...| 
| http://404.szm.com/| 85.248.42| 58.0|  cs| 0.0|kliknite tu alebo...| 
| http://404.xls.hu/| 212.52.166|168.0|  hu| 0.0|honlapkészítés404...| 
|http://a--m--a--t...| 66.6.43|462.0|  en| 0.0|back top archiv r...| 
|http://a-wrf.ru/c...| 78.108.80|126.0|unknown| 1.0|     | 
|http://a-wrf.ru/s...| 78.108.80|214.0|  ru| 1.0|установк фаркопна...| 
+--------------------+-----------+-----+-------+-----+--------------------+ 

Значение пророчат является label. Весь трубопровод применяется к нему:

from pyspark.ml import Pipeline 
from pyspark.ml.feature import VectorAssembler, StringIndexer, OneHotEncoder, Tokenizer, HashingTF 
from pyspark.ml.classification import LogisticRegression 

train, test = munge(src_dataframe).randomSplit([70., 30.], seed=12345) 

pipe_stages = [ 
    StringIndexer(inputCol='lang', outputCol='lang_idx'), 
    OneHotEncoder(inputCol='lang_idx', outputCol='lang_onehot'), 
    Tokenizer(inputCol='ip', outputCol='ip_tokens'), 
    HashingTF(numFeatures=2**10, inputCol='ip_tokens', outputCol='ip_vector'), 
    Tokenizer(inputCol='txt', outputCol='txt_tokens'), 
    HashingTF(numFeatures=2**18, inputCol='txt_tokens', outputCol='txt_vector'), 
    VectorAssembler(inputCols=['lang_onehot', 'ip_vector', 'txt_vector'], outputCol='features'), 
    LogisticRegression(labelCol='label', featuresCol='features') 
] 

pipe = Pipeline(stages=pipe_stages) 
pipemodel = pipe.fit(train) 

А вот StackTrace:

Py4JJavaError: An error occurred while calling o10793.fit. 
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 18 in stage 627.0 failed 1 times, most recent failure: Lost task 18.0 in stage 627.0 (TID 23259, localhost): org.apache.spark.SparkException: Unseen label: pl-PL. 
    at org.apache.spark.ml.feature.StringIndexerModel$$anonfun$4.apply(StringIndexer.scala:157) 
    at org.apache.spark.ml.feature.StringIndexerModel$$anonfun$4.apply(StringIndexer.scala:153) 
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.evalExpr2$(Unknown Source) 
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source) 
    at org.apache.spark.sql.execution.Project$$anonfun$1$$anonfun$apply$1.apply(basicOperators.scala:51) 
    at org.apache.spark.sql.execution.Project$$anonfun$1$$anonfun$apply$1.apply(basicOperators.scala:49) 
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) 
    at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:389) 
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) 
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) 
    at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:282) 
    at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:171) 
    at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:78) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:268) 
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270) 
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270) 
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73) 
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) 
    at org.apache.spark.scheduler.Task.run(Task.scala:89) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 

Driver stacktrace: 
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418) 
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) 
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799) 
    at scala.Option.foreach(Option.scala:236) 
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588) 
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) 
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1952) 
    at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:1025) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111) 
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:316) 
    at org.apache.spark.rdd.RDD.reduce(RDD.scala:1007) 
    at org.apache.spark.rdd.RDD$$anonfun$treeAggregate$1.apply(RDD.scala:1136) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111) 
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:316) 
    at org.apache.spark.rdd.RDD.treeAggregate(RDD.scala:1113) 
    at org.apache.spark.ml.classification.LogisticRegression.train(LogisticRegression.scala:271) 
    at org.apache.spark.ml.classification.LogisticRegression.train(LogisticRegression.scala:159) 
    at org.apache.spark.ml.Predictor.fit(Predictor.scala:90) 
    at org.apache.spark.ml.Predictor.fit(Predictor.scala:71) 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:497) 
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231) 
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381) 
    at py4j.Gateway.invoke(Gateway.java:259) 
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133) 
    at py4j.commands.CallCommand.execute(CallCommand.java:79) 
    at py4j.GatewayConnection.run(GatewayConnection.java:209) 
    at java.lang.Thread.run(Thread.java:745) 
Caused by: org.apache.spark.SparkException: Unseen label: pl-PL. 
    at org.apache.spark.ml.feature.StringIndexerModel$$anonfun$4.apply(StringIndexer.scala:157) 
    at org.apache.spark.ml.feature.StringIndexerModel$$anonfun$4.apply(StringIndexer.scala:153) 
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.evalExpr2$(Unknown Source) 
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source) 
    at org.apache.spark.sql.execution.Project$$anonfun$1$$anonfun$apply$1.apply(basicOperators.scala:51) 
    at org.apache.spark.sql.execution.Project$$anonfun$1$$anonfun$apply$1.apply(basicOperators.scala:49) 
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) 
    at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:389) 
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) 
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) 
    at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:282) 
    at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:171) 
    at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:78) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:268) 
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270) 
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270) 
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73) 
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) 
    at org.apache.spark.scheduler.Task.run(Task.scala:89) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    ... 1 more 

Самое интересное заключается в следующем:

org.apache.spark.SparkException: Unseen label: pl-PL. 

Понятия не имею, как pl-PL, которая является значением от lang столбец мог быть замешан в колонке label, который является float, а не string отредактированные: некоторые поспешные coclusions, скорректированные благодаря @ zero323

Я смотрел дальше в него и обнаружил, что pl-PL этого значения от испытательной части набора данных, а не обучения. Так что теперь я даже не знаю, где искать виновника: это может быть код randomSplit, а не StringIndexer, и кто знает, что еще.

Как это выяснить?

ответ

7

Unseen labelis a generic message which doesn't correspond to a specific column. Скорее всего, проблема заключается в следующем этапе:

StringIndexer(inputCol='lang', outputCol='lang_idx') 

с pl-PL присутствует в train("lang") и нет в test("lang").

Вы можете исправить это с помощью setHandleInvalid с skip:

from pyspark.ml.feature import StringIndexer 

train = sc.parallelize([(1, "foo"), (2, "bar")]).toDF(["k", "v"]) 
test = sc.parallelize([(3, "foo"), (4, "foobar")]).toDF(["k", "v"]) 

indexer = StringIndexer(inputCol="v", outputCol="vi") 
indexer.fit(train).transform(test).show() 

## Py4JJavaError: An error occurred while calling o112.showString. 
## : org.apache.spark.SparkException: Job aborted due to stage failure: 
## ... 
## org.apache.spark.SparkException: Unseen label: foobar. 

indexer.setHandleInvalid("skip").fit(train).transform(test).show() 

## +---+---+---+ 
## | k| v| vi| 
## +---+---+---+ 
## | 3|foo|1.0| 
## +---+---+---+ 
+0

Где бы вы разместили setHandleInvalid («skip») в конвейере? – mikeL

+0

@mikeL Где бы вы ни устанавливали 'StringIndexer'. Это 'Param' индексатора, а не' Pipeline'. – zero323

2

Хорошо, я думаю, что я получил это. По крайней мере, я получил эту работу.

Кэширование информационной рамы (в том числе поездов/испытаний) решает проблему. Вот что я нашел в этом выпуске JIRA: https://issues.apache.org/jira/browse/SPARK-12590.

Так что это не ошибка, а только тот факт, что randomSample может дать другой результат на одном и том же, но по-разному распределенном наборе данных. И, по-видимому, некоторые из моих функций munging (или Pipeline) связаны с перераспределением, поэтому результаты пересчета поезда от его определения могут расходиться.

Что меня интересует - это воспроизводимость: это всегда строка pl-PL, которая смешивается в неправильной части набора данных, то есть не является случайным перераспределением. Это детерминированный, просто непоследовательный. Интересно, как именно это происходит.

Смежные вопросы