2015-06-17 2 views
3

Я развиваю шахматный движок с помощью Scala и Apache Spark (и мне нужно подчеркнуть, что мое рассудочное отношение не является предметом этого вопроса). Моя проблема заключается в том, что алгоритм Negamax является рекурсивным по своей сути, и когда я пытаюсь наивный подход:Возможны ли рекурсивные вычисления с Apache Spark?

class NegaMaxSparc(@transient val sc: SparkContext) extends Serializable { 
    val movesOrdering = new Ordering[Tuple2[Move, Double]]() { 
    override def compare(x: (Move, Double), y: (Move, Double)): Int = 
     Ordering[Double].compare(x._2, y._2) 
    } 

    def negaMaxSparkHelper(game: Game, color: PieceColor, depth: Int, previousMovesPar: RDD[Move]): (Move, Double) = { 
    val board = game.board 

    if (depth == 0) { 
     (null, NegaMax.evaluateDefault(game, color)) 
    } else { 
     val moves = board.possibleMovesForColor(color) 
     val movesPar = previousMovesPar.context.parallelize(moves) 

     val moveMappingFunc = (m: Move) => { negaMaxSparkHelper(new Game(board.boardByMakingMove(m), color.oppositeColor, null), color.oppositeColor, depth - 1, movesPar) } 
     val movesWithScorePar = movesPar.map(moveMappingFunc) 
     val move = movesWithScorePar.min()(movesOrdering) 

     (move._1, -move._2) 
    } 
    } 

    def negaMaxSpark(game: Game, color: PieceColor, depth: Int): (Move, Double) = { 
    if (depth == 0) { 
     (null, NegaMax.evaluateDefault(game, color)) 
    } else { 
     val movesPar = sc.parallelize(new Array[Move](0)) 

     negaMaxSparkHelper(game, color, depth, movesPar) 
    } 
    } 
} 

class NegaMaxSparkBot(val maxDepth: Int, sc: SparkContext) extends Bot { 
    def nextMove(game: Game): Move = { 
    val nms = new NegaMaxSparc(sc) 
    nms.negaMaxSpark(game, game.colorToMove, maxDepth)._1 
    } 
} 

я получаю:

org.apache.spark.SparkException: RDD transformations and actions can only be invoked by the driver, not inside of other transformations; for example, rdd1.map(x => rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063. 

Возникает вопрос: может этот алгоритм будет реализован рекурсивно с использованием Спарк? Если нет, то каков правильный Spark-способ решения этой проблемы?

ответ

2

Это ограничение, которое имеет смысл с точки зрения реализации, но это может быть болью для работы.

Вы можете попробовать вытащить рекурсию на верхний уровень, как раз в код «драйвера», который создает и работает с RDD? Что-то вроде:

def step(rdd: Rdd[Move], limit: Int) = 
    if(0 == limit) rdd 
    else { 
    val newRdd = rdd.flatMap(...) 
    step(newRdd, limit - 1) 
    } 

Поочередно всегда можно перевести рекурсию в итерацию, путем управления «стек» явно вручную (хотя это может привести к более громоздкого кода).

2

Только драйвер может запускать вычисления на RDD. Причина в том, что даже если RDD «чувствует», как регулярные коллекции данных, за сценой они все еще распределены коллекциями, поэтому запуск на них требует координации выполнения задач на всех удаленных ведомых, которые искры скрываются от нас большую часть времени.

Таким образом, возврат из подчиненных устройств, т. Е. Запуск новых распределенных задач динамически непосредственно из ведомых, невозможен: только диск может позаботиться о такой координации.

Вот возможная альтернатива упрощению вашей проблемы (если я правильно понял). Идея состоит в том, чтобы последовательно создавать экземпляры Moves, каждый из которых представляет полную последовательность Move из исходного состояния.

Каждый экземпляр Moves может превратиться в набор Moves, каждый из которых соответствует одной и той же последовательности Move плюс один возможный следующий Move.

Оттуда водитель должен просто плотно свернуть Moves так глубоко, как мы хотим, и в результате RDD [Moves] выполнит все операции параллельно для нас.

Недостатком такого подхода является то, что все уровень глубины сохраняются синхронизированы, то есть мы должны вычислить все ходы на уровне n (т.е. RDD[Moves] на уровне n) перед переходом к следующему.

Код ниже не проверен, он, вероятно, имеет недостатки и даже не компилируется, но, надеюсь, он дает представление о том, как подойти к проблеме.

/* one modification to the board */ 
case class Move(from: String, to: String) 

case class PieceColor(color: String) 

/* state of the game */ 
case class Board { 

    // TODO 
    def possibleMovesForColor(color: PieceColor): Seq[Move] = 
     Move("here", "there") :: Move("there", "over there") :: Move("there", "here") :: Nil 

    // TODO: compute a new instance of board here, based on current + this move 
    def update(move: Move): Board = new Board 
} 


/** Solution, i.e. a sequence of moves*/ 
case class Moves(moves: Seq[Move], game: Board, color: PieceColor) {  
    lazy val score = NegaMax.evaluateDefault(game, color) 

    /** @return all valid next Moves */ 
    def nextPossibleMoves: Seq[Moves] = 
     board.possibleMovesForColor(color).map { 
      nextMove => 
       play.copy(moves = nextMove :: play.moves, 
         game = play.game.update(nextMove) 
     } 

} 

/** Driver code: negaMax: looks for the best next move from a give game state */ 
def negaMax(sc: SparkContext, game: Board, color: PieceColor, maxDepth: Int):Moves = { 

    val initialSolution = Moves(Seq[moves].empty, game, color) 

    val allPlays: rdd[Moves] = 
     (1 to maxDepth).foldLeft (sc.parallelize(Seq(initialSolution))) { 
     rdd => rdd.flatMap(_.nextPossibleMoves) 
    } 

    allPlays.reduce { case (m1, m2) => if (m1.score < m2.score) m1 else m2} 

} 
Смежные вопросы