2015-08-03 5 views
3

Я хотел бы получить быстрое приближенное заданное членство, основанное на строковой функции, примененной к большому Spark RDD String Vectors (~ 1B records). В принципе идея заключалась бы в том, чтобы свести к нулю Bloom filter. Этот цветной фильтр затем может быть передан работникам для дальнейшего использования.Уменьшение с помощью фильтра цветения

Более конкретно, я в настоящее время

rdd: RDD[Vector[String]] 
f: Vector[String] => String 
val uniqueVals = rdd.map(f).distinct().collect() 
val uv = sc.broadcast(uniqueVals) 

Но uniqueVals слишком велико, чтобы быть практичными, и я хотел бы заменить его чем-то меньшим (и известный) размера, т.е. в цвете фильтра.

Мои вопросы:

  • является возможным уменьшить в фильтр Блума, или я должен собрать, а затем построить его в драйвере?

  • Есть ли зрелая реализация фильтра Scala/Java Bloom, которая подходит для этого?

ответ

9

Да, фильтры Bloom могут быть уменьшены, потому что у них есть хорошие свойства (это monoids). Это означает, что вы можете выполнять все операции агрегации параллельно, эффективно используя только один проход над данными, чтобы построить BloomFilter для каждого раздела, а затем объединить эти BloomFilters, чтобы получить один BloomFilter, который вы можете запросить для contains.

Существует не менее двух реализаций BloomFilter в Scala, и оба они кажутся зрелыми проектами (фактически не использовали их в производстве). Первый - Breeze, а второй - Twitter's Algebird. Оба варианта содержат реализации разных эскизов и многое другое.

Это пример того, как сделать это с Breeze:

import breeze.util.BloomFilter 

val nums = List(1 to 20: _*).map(_.toString) 
val rdd = sc.parallelize(nums, 5) 

val bf = rdd.mapPartitions { iter => 
    val bf = BloomFilter.optimallySized[String](10000, 0.001) 
    iter.foreach(i => bf += i) 
    Iterator(bf) 
}.reduce(_ | _) 

println(bf.contains("5")) // true 
println(bf.contains("31")) // false 
+1

Одна проблема с этим решением: Он посылает все цветении фильтры для всех разделов к водителю, прежде чем сливать их, которые легко могут вызвать драйвер для запуска недостаточно памяти. 'treeReduce (_ | _, depth = DEPTH)' помогает с этой проблемой уменьшать древовидным образом. – anthonybell

+0

Отличное решение. Вы также должны добавить объединение между картой и сокращением для лучшей производительности. Поскольку есть только один фильтр цветения по разделам, сокращение напрямую отправляет все фильтры цветения водителю для окончательного слияния. Если есть много разделов, это может быть медленным или даже идти OOM. Объедините с числом разбиений k таким образом, что k * k ~ = начальное число разделов будет оптимальным, даже если некоторые исполнители не используются. – Boris

Смежные вопросы