Я только начинаю использовать Spark SQL + Cassandra и, вероятно, не хватает чего-то важного, но один простой запрос занимает ~ 45 секунд. Я использую библиотеку cassanda-spark-connector
и запускаю локальный веб-сервер, на котором также находится Spark. Так что моя установка примерно так:Spark SQL + Cassandra: плохая производительность
В SBT:
"org.apache.spark" %% "spark-core" % "1.4.1" excludeAll(ExclusionRule(organization = "org.slf4j")),
"org.apache.spark" %% "spark-sql" % "1.4.1" excludeAll(ExclusionRule(organization = "org.slf4j")),
"com.datastax.spark" %% "spark-cassandra-connector" % "1.4.0-M3" excludeAll(ExclusionRule(organization = "org.slf4j"))
В коде у меня есть одиночки, который хозяйничает SparkContext
и CassandraSQLContetx
. Затем он вызывается из сервлета. Вот как код одноточечно выглядит следующим образом:
object SparkModel {
val conf =
new SparkConf()
.setAppName("core")
.setMaster("local")
.set("spark.cassandra.connection.host", "127.0.0.1")
val sc = new SparkContext(conf)
val sqlC = new CassandraSQLContext(sc)
sqlC.setKeyspace("core")
val df: DataFrame = sqlC.cassandraSql(
"SELECT email, target_entity_id, target_entity_type " +
"FROM tracking_events " +
"LEFT JOIN customers " +
"WHERE entity_type = 'User' AND entity_id = customer_id")
}
А вот как я использую его:
get("/spark") {
SparkModel.df.collect().map(r => TrackingEvent(r.getString(0), r.getString(1), r.getString(2))).toList
}
Cassandra, Спарк и веб-приложение запустить на одном хосте в виртуальной машине на моем Macbook Pro с приличным спецификации. Запросы Кассандры сами по себе составляют 10-20 миллисекунд.
Когда я вызываю эту конечную точку в первый раз, для возврата результата требуется 70-80 секунд. Последующие запросы занимают ~ 45 секунд. Журнал последующей операции выглядит следующим образом:
12:48:50 INFO org.apache.spark.SparkContext - Starting job: collect at V1Servlet.scala:1146
12:48:50 INFO o.a.spark.scheduler.DAGScheduler - Got job 1 (collect at V1Servlet.scala:1146) with 1 output partitions (allowLocal=false)
12:48:50 INFO o.a.spark.scheduler.DAGScheduler - Final stage: ResultStage 1(collect at V1Servlet.scala:1146)
12:48:50 INFO o.a.spark.scheduler.DAGScheduler - Parents of final stage: List()
12:48:50 INFO o.a.spark.scheduler.DAGScheduler - Missing parents: List()
12:48:50 INFO o.a.spark.scheduler.DAGScheduler - Submitting ResultStage 1 (MapPartitionsRDD[29] at collect at V1Servlet.scala:1146), which has no missing parents
12:48:50 INFO org.apache.spark.storage.MemoryStore - ensureFreeSpace(18696) called with curMem=26661, maxMem=825564856
12:48:50 INFO org.apache.spark.storage.MemoryStore - Block broadcast_1 stored as values in memory (estimated size 18.3 KB, free 787.3 MB)
12:48:50 INFO org.apache.spark.storage.MemoryStore - ensureFreeSpace(8345) called with curMem=45357, maxMem=825564856
12:48:50 INFO org.apache.spark.storage.MemoryStore - Block broadcast_1_piece0 stored as bytes in memory (estimated size 8.1 KB, free 787.3 MB)
12:48:50 INFO o.a.spark.storage.BlockManagerInfo - Added broadcast_1_piece0 in memory on localhost:56289 (size: 8.1 KB, free: 787.3 MB)
12:48:50 INFO org.apache.spark.SparkContext - Created broadcast 1 from broadcast at DAGScheduler.scala:874
12:48:50 INFO o.a.spark.scheduler.DAGScheduler - Submitting 1 missing tasks from ResultStage 1 (MapPartitionsRDD[29] at collect at V1Servlet.scala:1146)
12:48:50 INFO o.a.s.scheduler.TaskSchedulerImpl - Adding task set 1.0 with 1 tasks
12:48:50 INFO o.a.spark.scheduler.TaskSetManager - Starting task 0.0 in stage 1.0 (TID 1, localhost, NODE_LOCAL, 59413 bytes)
12:48:50 INFO org.apache.spark.executor.Executor - Running task 0.0 in stage 1.0 (TID 1)
12:48:50 INFO com.datastax.driver.core.Cluster - New Cassandra host localhost/127.0.0.1:9042 added
12:48:50 INFO c.d.s.c.cql.CassandraConnector - Connected to Cassandra cluster: Super Cluster
12:49:11 INFO o.a.spark.storage.BlockManagerInfo - Removed broadcast_0_piece0 on localhost:56289 in memory (size: 8.0 KB, free: 787.3 MB)
12:49:35 INFO org.apache.spark.executor.Executor - Finished task 0.0 in stage 1.0 (TID 1). 6124 bytes result sent to driver
12:49:35 INFO o.a.spark.scheduler.TaskSetManager - Finished task 0.0 in stage 1.0 (TID 1) in 45199 ms on localhost (1/1)
12:49:35 INFO o.a.s.scheduler.TaskSchedulerImpl - Removed TaskSet 1.0, whose tasks have all completed, from pool
12:49:35 INFO o.a.spark.scheduler.DAGScheduler - ResultStage 1 (collect at V1Servlet.scala:1146) finished in 45.199 s
Как видно из журнала, самые длинные паузы между этими 3-х линий (21 + 24 секунд):
12:48:50 INFO c.d.s.c.cql.CassandraConnector - Connected to Cassandra cluster: Super Cluster
12:49:11 INFO o.a.spark.storage.BlockManagerInfo - Removed broadcast_0_piece0 on localhost:56289 in memory (size: 8.0 KB, free: 787.3 MB)
12:49:35 INFO org.apache.spark.executor.Executor - Finished task 0.0 in stage 1.0 (TID 1). 6124 bytes result sent to driver
Видимо, я «Я делаю что-то неправильно. Что это? Как я могу улучшить это?
EDIT: Важное дополнение: размер таблиц крошечное (~ 200 записей для tracking_events
, ~ 20 для customers
), поэтому чтение их в целом в памяти не должны принимать каких-либо значительных затрат времени. И это локальная установка Cassandra, не кластер, не задействованы сети.
Отличный ответ Russ! Я заметил тот же самый хит производительности, но предположил, что это потому, что мой Spark-кластер работал на моей локальной виртуальной машине. – Aaron
Я не могу запустить экземпляр Cassandra прямо сейчас по какой-то причине, но главное, эти 2 таблицы крошечные. 'tracking_events' имеет 200 записей, а' customers' - всего около 20. Это не может занять столько времени из-за загрузки данных. – Haspemulator
Почему вы не проверяете пользовательский интерфейс, он должен точно определить время. – RussS