2015-10-28 3 views
5

Я создаю REST API, который запускает некоторые вычисления в кластере Spark и отвечает фрагментированным потоком результатов. Учитывая поток искры с результатами расчета, я могу использоватьИдиоматический способ использования Spark DStream в качестве источника потока Akka

dstream.foreachRDD() 

для отправки данных из Искры. Я посылаю блочный ответ HTTP с Аккой-HTTP:

val requestHandler: HttpRequest => HttpResponse = { 
    case HttpRequest(HttpMethods.GET, Uri.Path("/data"), _, _, _) => 
    HttpResponse(entity = HttpEntity.Chunked(ContentTypes.`text/plain`, source)) 
} 

Для простоты я пытаюсь получить простой текст работает первым, добавит JSON маршалинга позже.

Но каков идиоматический способ использования Spark DStream в качестве источника потока Akka? Я решил, что смогу сделать это через сокет, но так как драйвер Spark и конечная точка REST сидят на одном и том же JVM, открывая сокет только для этого, кажется, немного перебор.

ответ

1

Редактировать: Этот ответ применим только к старой версии искры и акка. Ответ PH88 - правильный метод для последних версий.

Вы можете использовать промежуточный akka.actor.Actor, который подает источник (аналогично this question). Нижеприведенное решение не является «реактивным», потому что базовому Актеру нужно будет поддерживать буфер сообщений RDD, которые могут быть удалены, если клиент http-клиента нисходящего потока не потребляет куски достаточно быстро. Но эта проблема возникает независимо от деталей реализации, поскольку вы не можете подключить «дросселирование» обратного давления потока akka к DStream, чтобы замедлить данные. Это связано с тем, что DStream не реализует org.reactivestreams.Publisher.

Основная топология:

DStream --> Actor with buffer --> Source 

Для построения этого toplogy вы должны создать актер, похожий на реализацию here:

//JobManager definition is provided in the link 
val actorRef = actorSystem actorOf JobManager.props 

создать поток Источник байтовых строк (сообщения) на основе JobManager. Кроме того, конвертировать ByteString в HttpEntity.ChunkStreamPart, который является то, что HttpResponse требует:

import akka.stream.actor.ActorPublisher 
import akka.stream.scaladsl.Source 
import akka.http.scaladsl.model.HttpEntity 
import akka.util.ByteString 

type Message = ByteString 

val messageToChunkPart = 
    Flow[Message].map(HttpEntity.ChunkStreamPart(_)) 

//Actor with buffer --> Source 
val source : Source[HttpEntity.ChunkStreamPart, Unit] = 
    Source(ActorPublisher[Message](actorRef)) via messageToChunkPart 

Ссылка Спарк DStream на актера, так что каждый incomining РДД преобразуется в Iterable из байтовой строки, а затем направляется в Actor:

import org.apache.spark.streaming.dstream.Dstream 
import org.apache.spark.rdd.RDD 

val dstream : DStream = ??? 

//This function converts your RDDs to messages being sent 
//via the http response 
def rddToMessages[T](rdd : RDD[T]) : Iterable[Message] = ??? 

def sendMessageToActor(message : Message) = actorRef ! message 

//DStream --> Actor with buffer 
dstream foreachRDD {rddToMessages(_) foreach sendMessageToActor} 

обеспечить источник в HttpResponse:

val requestHandler: HttpRequest => HttpResponse = { 
    case HttpRequest(HttpMethods.GET, Uri.Path("/data"), _, _, _) => 
    HttpResponse(entity = HttpEntity.Chunked(ContentTypes.`text/plain`, source)) 
} 

Примечание: там должно быть очень мало времени/трески e между линией dstream foreachRDD и HttpReponse, так как внутренний буфер Actor немедленно начнет заполнять сообщением ByteString, исходящим из DStream после выполнения строки foreach.

7

Не уверен относительно версии api во время вопроса. Но теперь, с akka-stream 2.0.3, я считаю, что вы можете это сделать:

val source = Source 
    .actorRef[T](/* buffer size */ 100, OverflowStrategy.dropHead) 
    .mapMaterializedValue[Unit] { actorRef => 
    dstream.foreach(actorRef ! _) 
    } 
Смежные вопросы