2015-08-14 7 views
0

Ниже кода вычисляет расстояние метрики между двумя пользователями, как указано в прецедентном классе:Правильное ли использование .par в Scala?

case class User(name: String, features: Vector[Double]) 

    val ul = for (a <- 1 to 100) yield (User("a", Vector(1, 2, 4))) 
    var count = 0; 

    def distance(userA: User, userB: User) = { 
    val subElements = (userA.features zip userB.features) map { 
     m => (m._1 - m._2) * (m._1 - m._2) 
    } 
    val summed = subElements.sum 
    val sqRoot = Math.sqrt(summed) 
    count += 1; 
    println("count is " + count) 

    ((userA.name, userB.name), sqRoot) 
    } 

    val distances = ul.par.map(m => ul.map(m2 => { 
    (distance(m, m2)) 
    })).toList.flatten 

    val sortedDistances = distances.groupBy(_._1._1).map(m => (m._1, m._2.sortBy(s => s._2))) 

    println(sortedDistances.get("a").get.size); 

Это выполняет декартово произведение сравнения 100 пользователей: 10000 сравнений. Я рассчитываю на каждое сравнение, представленное bu var count Часто значение счета будет меньше 10000, но количество элементов, переименованных заново, всегда равно 10000. Является ли причиной этого, что, поскольку par генерирует несколько потоков, некоторые из них будут закончены до Выполняется инструкция println. Однако все закончится в блоке пар кода - до вычисления distances.groupBy(_._1._1).map(m => (m._1, m._2.sortBy(s => s._2))).

ответ

0

В вашем примере у вас есть одна несинхронизированная переменная, которую вы мутируете из нескольких потоков, как вы сказали. Это означает, что каждый поток в любой момент может иметь устаревшую копию count, поэтому, когда они увеличивают ее, они будут выдавливать любые другие записи, которые произошли, в результате чего число меньше, чем должно быть.

Вы можете решить эту проблему с помощью функции synchronized,

... 
    val subElements = (userA.features zip userB.features) map { 
     m => (m._1 - m._2) * (m._1 - m._2) 
    } 
    val summed = subElements.sum 
    val sqRoot = Math.sqrt(summed) 
    this.synchronized { 
     count += 1;  
    }  

    println("count is " + count) 

    ((userA.name, userB.name), sqRoot) 
... 

Использование «this.synchronized» будет использовать объект, содержащий в качестве объекта блокировки. Для получения дополнительной информации о синхронизации Scala я предлагаю прочитать Twitter's Scala School.

+0

спасибо, поэтому блок кода .par является потокобезопасным? –

+0

Да, .par сам по себе является потокобезопасным, но мутируя состояние вне функции, которую вы передаете .par никогда не является потоковым, если вы явно не охраняете его. – Chris

+0

В качестве альтернативы можно использовать javas 'AtomicInteger' вместо' symchronized'. –

Смежные вопросы