2015-02-16 2 views
1

Я использую искру с Кассандрой и я уже выполнен этот скрипт:параллелизовать Resultset

import com.datastax.spark.connector.cql.CassandraConnector 

val cc = CassandraConnector(sc.getConf) 
val select = s"SELECT channel, ctid, cvid , WRITETIME(source) FROM "+CASSANDRA_SCHEMA+"."+table+";" 


val session = cc.openSession() 
val results = session.execute(select) 

PS: Я обязуюсь использовать этот метод из-за WRITETIME(source). Но значение results val является результатом набора данных, и мне интересно, можно ли распараллелить этот результирующий набор, чтобы получить RDD

Спасибо заранее.

+2

Это совсем не похоже на Java. Возможно, вы работаете с Scala. –

+0

да спасибо ^^ –

+0

Вам это нужно в сеансе? Не могли бы вы использовать 'CassandraSQLContext'? –

ответ

1

'results' - это пример ResultSet и не может быть преобразован непосредственно в RDD. Во-первых, вам нужно материализовать полную коллекцию, запрашивая all элементы:

val collection = results.all 

Тем не менее, это не поможет, так как результат является java.util.List и нам нужна коллекция Scala. простое преобразование поможет:

val sCollection = collection.asScala 

Теперь, просто распараллелить его с помощью sparkContext распараллеливания методы:

val rdd = sc.parallelize(sCollection) 

Оттуда вы можете работать на объектов этой коллекции.

+0

Да, спасибо за ваш ответ, но моя проблема - функция 'results.all' возвращает ошибку outOfMemory. Вызывается большим количеством строк. –

+0

После DataStax JIRA можно получить доступ к 'WRITETIME' через искровой драйвер: https://datastax-oss.atlassian.net/browse/SPARKC-8 Вам просто нужно перейти на v1.2.0 – maasg

+0

Да, но я не могу до версии 2.1, потому что я использую Spark Job Server. И нет версии Исправляемый сервер заданий, совместимый с v1.2.0 –

Смежные вопросы