2015-04-08 3 views
3

У меня есть простой файл сценария я пытаюсь выполнить в свече-оболочки, которая имитирует учебник hereСпарк BlockManager работает на локальном хосте

import org.apache.spark.SparkConf 
import org.apache.spark.SparkContext 

sc.stop(); 

val conf = new SparkConf().setAppName("MyApp").setMaster("mesos://zk://172.24.51.171:2181/mesos").set("spark.executor.uri", "hdfs://172.24.51.171:8020/spark-1.3.0-bin-hadoop2.4.tgz").set("spark.driver.host", "172.24.51.142") 

val sc2 = new SparkContext(conf) 

val file = sc2.textFile("hdfs://172.24.51.171:8020/input/pg4300.txt") 

val errors = file.filter(line => line.contains("ERROR")) 

errors.count() 

Мой NameNode и Mesos мастера находятся на 172.24.51.171, мой IP-адрес 172.24.51.142. У меня есть эти строки, сохраненный в файл, который я затем запустить с помощью команды:

/opt/spark-1.3.0-bin-hadoop2.4/bin/spark-shell -i WordCount.scala 

Мои удаленных исполнителей все умирающего с ошибками, похожими на следующее:

15/04/08 14:30:39 ERROR RetryingBlockFetcher: Exception while beginning fetch of 1 outstanding blocks 
java.io.IOException: Failed to connect to localhost/127.0.0.1:48554 
    at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:191) 
    at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:156) 
    at org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:78) 
    at org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140) 
    at org.apache.spark.network.shuffle.RetryingBlockFetcher.start(RetryingBlockFetcher.java:120) 
    at org.apache.spark.network.netty.NettyBlockTransferService.fetchBlocks(NettyBlockTransferService.scala:87) 
    at org.apache.spark.network.BlockTransferService.fetchBlockSync(BlockTransferService.scala:89) 
    at org.apache.spark.storage.BlockManager$$anonfun$doGetRemote$2.apply(BlockManager.scala:594) 
    at org.apache.spark.storage.BlockManager$$anonfun$doGetRemote$2.apply(BlockManager.scala:592) 
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) 
    at org.apache.spark.storage.BlockManager.doGetRemote(BlockManager.scala:592) 
    at org.apache.spark.storage.BlockManager.getRemoteBytes(BlockManager.scala:586) 
    at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.org$apache$spark$broadcast$TorrentBroadcast$$anonfun$$getRemote$1(TorrentBroadcast.scala:126) 
    at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1$$anonfun$1.apply(TorrentBroadcast.scala:136) 
    at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1$$anonfun$1.apply(TorrentBroadcast.scala:136) 
    at scala.Option.orElse(Option.scala:257) 
    at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply$mcVI$sp(TorrentBroadcast.scala:136) 
    at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply(TorrentBroadcast.scala:119) 
    at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply(TorrentBroadcast.scala:119) 
    at scala.collection.immutable.List.foreach(List.scala:318) 
    at org.apache.spark.broadcast.TorrentBroadcast.org$apache$spark$broadcast$TorrentBroadcast$$readBlocks(TorrentBroadcast.scala:119) 
    at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:174) 
    at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1152) 
    at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:164) 
    at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:64) 
    at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:64) 
    at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:87) 
    at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:58) 
    at org.apache.spark.scheduler.Task.run(Task.scala:64) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
    at java.lang.Thread.run(Thread.java:745) 
Caused by: java.net.ConnectException: Connection refused: localhost/127.0.0.1:48554 
    at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) 
    at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:739) 
    at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:208) 
    at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:287) 
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:528) 
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468) 
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382) 
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354) 
    at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116) 
    ... 1 more 

Эта неудача происходит после того, как я бегу команда errors.count(). Ранее в моей оболочке, после того, как я создаю новые SparkContext я вижу строки:

15/04/08 14:31:18 INFO NettyBlockTransferService: Server created on 48554 
15/04/08 14:31:18 INFO BlockManagerMaster: Trying to register BlockManager 
15/04/08 14:31:18 INFO BlockManagerMasterActor: Registering block manager localhost:48554 with 265.4 MB RAM, BlockManagerId(<driver>, localhost, 48554) 
15/04/08 14:31:18 INFO BlockManagerMaster: Registered BlockManager 

Я думаю, Что происходит искра записывает адрес BlockManager как локальный хост: 48554, который затем послан всем исполнитель которые пытаются поговорить со своими локальными хостами: 48554, вместо ip-адреса водителя в порту 48554. Почему искра использует localhost как адрес BlockManager, а не spark.driver.host?

Дополнительная информация

  1. В Spark Config есть spark.blockManager.port но не spark.blockManager.host? Существует только spark.driver.host, который вы можете видеть, что я установлен в моем SparkConf.

  2. Возможно, связано с этим JIRA Ticket, хотя это похоже на проблему с сетью. Моя сеть настроена с DNS просто отлично.

ответ

2

Можете ли вы попробовать, указав адрес Spark Master, используя параметр -master при вызове искровой оболочки (или добавить в spark-defaults.conf). У меня была аналогичная проблема (см. Мой пост Spark Shell Listens on localhost instead of configured IP address), и похоже, что BlockManager прослушивает локальный хост, когда контекст динамически создается в оболочке.

Журналы:

  • Когда оригинальный контекст используется (прослушивает имя хоста) BlockManagerInfo: Добавлен broadcast_1_piece0 в памяти о ubuntu64server2: 33301

  • Когда создается новый контекст (прослушивает локальный хост) BlockManagerInfo: Добавлено broadcast_1_piece0 в памяти на localhost: 40235

Мне пришлось подключиться к кластеру Cassandra и смог запросить его, предоставив spark.cassandra.connection.host в spark-defaults.conf и импортируя пакеты com.datastax.spark.connector._ в искровую оболочку.

+0

«BlockManager слушает на локальном хосте, когда контекст динамически создается в оболочке», это тоже мой вывод, но, похоже, противоречит интуитивному поведению. Установка spark.master в spark-defaults.conf исправила это для меня – JHowIX

0

Попробуйте установить SPARK_LOCAL_IP (в командной строке) или spark.local.ip через sparkConf объекта.

+0

Hi maasg - не действует, в соответствии с конфигурационными страницами, spark.local.ip не является частью конфигурации в Spark 1.2.1 или Spark 1.3.0.См. Мой вопрос по ссылке – JHowIX

Смежные вопросы