4

Я только начинаю использовать Spark SQL + Cassandra и, вероятно, не хватает чего-то важного, но один простой запрос занимает ~ 45 секунд. Я использую библиотеку cassanda-spark-connector и запускаю локальный веб-сервер, на котором также находится Spark. Так что моя установка примерно так:Spark SQL + Cassandra: плохая производительность

В SBT:

"org.apache.spark" %% "spark-core" % "1.4.1" excludeAll(ExclusionRule(organization = "org.slf4j")), 
    "org.apache.spark" %% "spark-sql" % "1.4.1" excludeAll(ExclusionRule(organization = "org.slf4j")), 
    "com.datastax.spark" %% "spark-cassandra-connector" % "1.4.0-M3" excludeAll(ExclusionRule(organization = "org.slf4j")) 

В коде у меня есть одиночки, который хозяйничает SparkContext и CassandraSQLContetx. Затем он вызывается из сервлета. Вот как код одноточечно выглядит следующим образом:

object SparkModel { 

    val conf = 
    new SparkConf() 
     .setAppName("core") 
     .setMaster("local") 
     .set("spark.cassandra.connection.host", "127.0.0.1") 

    val sc = new SparkContext(conf) 
    val sqlC = new CassandraSQLContext(sc) 
    sqlC.setKeyspace("core") 

    val df: DataFrame = sqlC.cassandraSql(
    "SELECT email, target_entity_id, target_entity_type " + 
    "FROM tracking_events " + 
    "LEFT JOIN customers " + 
    "WHERE entity_type = 'User' AND entity_id = customer_id") 
} 

А вот как я использую его:

get("/spark") { 
    SparkModel.df.collect().map(r => TrackingEvent(r.getString(0), r.getString(1), r.getString(2))).toList 
} 

Cassandra, Спарк и веб-приложение запустить на одном хосте в виртуальной машине на моем Macbook Pro с приличным спецификации. Запросы Кассандры сами по себе составляют 10-20 миллисекунд.

Когда я вызываю эту конечную точку в первый раз, для возврата результата требуется 70-80 секунд. Последующие запросы занимают ~ 45 секунд. Журнал последующей операции выглядит следующим образом:

12:48:50 INFO org.apache.spark.SparkContext - Starting job: collect at V1Servlet.scala:1146 
12:48:50 INFO o.a.spark.scheduler.DAGScheduler - Got job 1 (collect at V1Servlet.scala:1146) with 1 output partitions (allowLocal=false) 
12:48:50 INFO o.a.spark.scheduler.DAGScheduler - Final stage: ResultStage 1(collect at V1Servlet.scala:1146) 
12:48:50 INFO o.a.spark.scheduler.DAGScheduler - Parents of final stage: List() 
12:48:50 INFO o.a.spark.scheduler.DAGScheduler - Missing parents: List() 
12:48:50 INFO o.a.spark.scheduler.DAGScheduler - Submitting ResultStage 1 (MapPartitionsRDD[29] at collect at V1Servlet.scala:1146), which has no missing parents 
12:48:50 INFO org.apache.spark.storage.MemoryStore - ensureFreeSpace(18696) called with curMem=26661, maxMem=825564856 
12:48:50 INFO org.apache.spark.storage.MemoryStore - Block broadcast_1 stored as values in memory (estimated size 18.3 KB, free 787.3 MB) 
12:48:50 INFO org.apache.spark.storage.MemoryStore - ensureFreeSpace(8345) called with curMem=45357, maxMem=825564856 
12:48:50 INFO org.apache.spark.storage.MemoryStore - Block broadcast_1_piece0 stored as bytes in memory (estimated size 8.1 KB, free 787.3 MB) 
12:48:50 INFO o.a.spark.storage.BlockManagerInfo - Added broadcast_1_piece0 in memory on localhost:56289 (size: 8.1 KB, free: 787.3 MB) 
12:48:50 INFO org.apache.spark.SparkContext - Created broadcast 1 from broadcast at DAGScheduler.scala:874 
12:48:50 INFO o.a.spark.scheduler.DAGScheduler - Submitting 1 missing tasks from ResultStage 1 (MapPartitionsRDD[29] at collect at V1Servlet.scala:1146) 
12:48:50 INFO o.a.s.scheduler.TaskSchedulerImpl - Adding task set 1.0 with 1 tasks 
12:48:50 INFO o.a.spark.scheduler.TaskSetManager - Starting task 0.0 in stage 1.0 (TID 1, localhost, NODE_LOCAL, 59413 bytes) 
12:48:50 INFO org.apache.spark.executor.Executor - Running task 0.0 in stage 1.0 (TID 1) 
12:48:50 INFO com.datastax.driver.core.Cluster - New Cassandra host localhost/127.0.0.1:9042 added 
12:48:50 INFO c.d.s.c.cql.CassandraConnector - Connected to Cassandra cluster: Super Cluster 
12:49:11 INFO o.a.spark.storage.BlockManagerInfo - Removed broadcast_0_piece0 on localhost:56289 in memory (size: 8.0 KB, free: 787.3 MB) 
12:49:35 INFO org.apache.spark.executor.Executor - Finished task 0.0 in stage 1.0 (TID 1). 6124 bytes result sent to driver 
12:49:35 INFO o.a.spark.scheduler.TaskSetManager - Finished task 0.0 in stage 1.0 (TID 1) in 45199 ms on localhost (1/1) 
12:49:35 INFO o.a.s.scheduler.TaskSchedulerImpl - Removed TaskSet 1.0, whose tasks have all completed, from pool 
12:49:35 INFO o.a.spark.scheduler.DAGScheduler - ResultStage 1 (collect at V1Servlet.scala:1146) finished in 45.199 s 

Как видно из журнала, самые длинные паузы между этими 3-х линий (21 + 24 секунд):

12:48:50 INFO c.d.s.c.cql.CassandraConnector - Connected to Cassandra cluster: Super Cluster 
12:49:11 INFO o.a.spark.storage.BlockManagerInfo - Removed broadcast_0_piece0 on localhost:56289 in memory (size: 8.0 KB, free: 787.3 MB) 
12:49:35 INFO org.apache.spark.executor.Executor - Finished task 0.0 in stage 1.0 (TID 1). 6124 bytes result sent to driver 

Видимо, я «Я делаю что-то неправильно. Что это? Как я могу улучшить это?

EDIT: Важное дополнение: размер таблиц крошечное (~ 200 записей для tracking_events, ~ 20 для customers), поэтому чтение их в целом в памяти не должны принимать каких-либо значительных затрат времени. И это локальная установка Cassandra, не кластер, не задействованы сети.

ответ

3
"SELECT email, target_entity_id, target_entity_type " + 
    "FROM tracking_events " + 
    "LEFT JOIN customers " + 
    "WHERE entity_type = 'User' AND entity_id = customer_id") 

Этот запрос будет считывать все данные из таблицы tracking_events и customers. Я бы сравнил производительность и просто выполнил SELECT COUNT (*) на обеих таблицах. Если это существенно отличается, тогда может возникнуть проблема, но я предполагаю, что это всего лишь время, затрачиваемое на чтение обеих таблиц полностью в память.

Существует несколько кнопок для настройки того, как выполняются считывания, и поскольку значения по умолчанию ориентированы на гораздо больший набор данных, вы можете изменить их.

spark.cassandra.input.split.size_in_mb approx amount of data to be fetched into a Spark partition 64 MB 
spark.cassandra.input.fetch.size_in_rows number of CQL rows fetched per driver request 1000 

Я хотел бы убедиться, вы создаете стольких задач, как у вас есть ядра (как минимум), чтобы вы могли воспользоваться всеми своими ресурсами. Для этого сжимайте input.split.size

Размер выборки определяет, сколько строк выгружается за один раз ядром исполнителя, поэтому увеличение этого может увеличить скорость в некоторых случаях.

+0

Отличный ответ Russ! Я заметил тот же самый хит производительности, но предположил, что это потому, что мой Spark-кластер работал на моей локальной виртуальной машине. – Aaron

+0

Я не могу запустить экземпляр Cassandra прямо сейчас по какой-то причине, но главное, эти 2 таблицы крошечные. 'tracking_events' имеет 200 записей, а' customers' - всего около 20. Это не может занять столько времени из-за загрузки данных. – Haspemulator

+0

Почему вы не проверяете пользовательский интерфейс, он должен точно определить время. – RussS

Смежные вопросы