Редактировать: Этот ответ применим только к старой версии искры и акка. Ответ PH88 - правильный метод для последних версий.
Вы можете использовать промежуточный akka.actor.Actor
, который подает источник (аналогично this question). Нижеприведенное решение не является «реактивным», потому что базовому Актеру нужно будет поддерживать буфер сообщений RDD, которые могут быть удалены, если клиент http-клиента нисходящего потока не потребляет куски достаточно быстро. Но эта проблема возникает независимо от деталей реализации, поскольку вы не можете подключить «дросселирование» обратного давления потока akka к DStream, чтобы замедлить данные. Это связано с тем, что DStream не реализует org.reactivestreams.Publisher
.
Основная топология:
DStream --> Actor with buffer --> Source
Для построения этого toplogy вы должны создать актер, похожий на реализацию here:
//JobManager definition is provided in the link
val actorRef = actorSystem actorOf JobManager.props
создать поток Источник байтовых строк (сообщения) на основе JobManager. Кроме того, конвертировать ByteString
в HttpEntity.ChunkStreamPart
, который является то, что HttpResponse требует:
import akka.stream.actor.ActorPublisher
import akka.stream.scaladsl.Source
import akka.http.scaladsl.model.HttpEntity
import akka.util.ByteString
type Message = ByteString
val messageToChunkPart =
Flow[Message].map(HttpEntity.ChunkStreamPart(_))
//Actor with buffer --> Source
val source : Source[HttpEntity.ChunkStreamPart, Unit] =
Source(ActorPublisher[Message](actorRef)) via messageToChunkPart
Ссылка Спарк DStream на актера, так что каждый incomining РДД преобразуется в Iterable из байтовой строки, а затем направляется в Actor:
import org.apache.spark.streaming.dstream.Dstream
import org.apache.spark.rdd.RDD
val dstream : DStream = ???
//This function converts your RDDs to messages being sent
//via the http response
def rddToMessages[T](rdd : RDD[T]) : Iterable[Message] = ???
def sendMessageToActor(message : Message) = actorRef ! message
//DStream --> Actor with buffer
dstream foreachRDD {rddToMessages(_) foreach sendMessageToActor}
обеспечить источник в HttpResponse:
val requestHandler: HttpRequest => HttpResponse = {
case HttpRequest(HttpMethods.GET, Uri.Path("/data"), _, _, _) =>
HttpResponse(entity = HttpEntity.Chunked(ContentTypes.`text/plain`, source))
}
Примечание: там должно быть очень мало времени/трески e между линией dstream foreachRDD
и HttpReponse, так как внутренний буфер Actor немедленно начнет заполнять сообщением ByteString, исходящим из DStream после выполнения строки foreach
.