Существует не что-то встроенное, но это не слишком много работы, чтобы добиться этого. Вот простое решение, которое я сделал на основе пользовательских UdpSocketInputDStream[T]
:
import java.io._
import java.net.{ConnectException, DatagramPacket, DatagramSocket, InetAddress}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.receiver.Receiver
import scala.reflect.ClassTag
import scala.util.control.NonFatal
class UdpSocketInputDStream[T: ClassTag](
_ssc: StreamingContext,
host: String,
port: Int,
bytesToObjects: InputStream => Iterator[T],
storageLevel: StorageLevel
) extends ReceiverInputDStream[T](_ssc) {
def getReceiver(): Receiver[T] = {
new UdpSocketReceiver(host, port, bytesToObjects, storageLevel)
}
}
class UdpSocketReceiver[T: ClassTag](host: String,
port: Int,
bytesToObjects: InputStream => Iterator[T],
storageLevel: StorageLevel) extends Receiver[T](storageLevel) {
var udpSocket: DatagramSocket = _
override def onStart(): Unit = {
try {
udpSocket = new DatagramSocket(port, InetAddress.getByName(host))
} catch {
case e: ConnectException =>
restart(s"Error connecting to $port", e)
return
}
// Start the thread that receives data over a connection
new Thread("Udp Socket Receiver") {
setDaemon(true)
override def run() {
receive()
}
}.start()
}
/** Create a socket connection and receive data until receiver is stopped */
def receive() {
try {
val buffer = new Array[Byte](2048)
// Create a packet to receive data into the buffer
val packet = new DatagramPacket(buffer, buffer.length)
udpSocket.receive(packet)
val iterator = bytesToObjects(new ByteArrayInputStream(packet.getData, packet.getOffset, packet.getLength))
// Now loop forever, waiting to receive packets and printing them.
while (!isStopped() && iterator.hasNext) {
store(iterator.next())
}
if (!isStopped()) {
restart("Udp socket data stream had no more data")
}
} catch {
case NonFatal(e) =>
restart("Error receiving data", e)
} finally {
onStop()
}
}
override def onStop(): Unit = {
synchronized {
if (udpSocket != null) {
udpSocket.close()
udpSocket = null
}
}
}
}
Для того, чтобы получить StreamingContext
добавить метод на себе, мы обогатить ее неявный класс:
object Implicits {
implicit class StreamingContextOps(val ssc: StreamingContext) extends AnyVal {
def udpSocketStream[T: ClassTag](host: String,
port: Int,
converter: InputStream => Iterator[T],
storageLevel: StorageLevel): InputDStream[T] = {
new UdpSocketInputDStream(ssc, host, port, converter, storageLevel)
}
}
}
И вот как вы называете все это:
import java.io.{BufferedReader, InputStream, InputStreamReader}
import java.nio.charset.StandardCharsets
import org.apache.spark.SparkContext
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.{Seconds, StreamingContext}
import scala.reflect.ClassTag
object TestRunner {
import Implicits._
def main(args: Array[String]): Unit = {
val sparkContext = new SparkContext("local[*]", "udpTest")
val ssc = new StreamingContext(sparkContext, Seconds(4))
val stream = ssc.udpSocketStream("localhost",
3003,
bytesToLines,
StorageLevel.MEMORY_AND_DISK_SER_2)
stream.print()
ssc.start()
ssc.awaitTermination()
}
def bytesToLines(inputStream: InputStream): Iterator[String] = {
val dataInputStream = new BufferedReader(
new InputStreamReader(inputStream, StandardCharsets.UTF_8))
new NextIterator[String] {
protected override def getNext(): String = {
val nextValue = dataInputStream.readLine()
if (nextValue == null) {
finished = true
}
nextValue
}
protected override def close() {
dataInputStream.close()
}
}
}
abstract class NextIterator[U] extends Iterator[U] {
protected var finished = false
private var gotNext = false
private var nextValue: U = _
private var closed = false
override def next(): U = {
if (!hasNext) {
throw new NoSuchElementException("End of stream")
}
gotNext = false
nextValue
}
override def hasNext: Boolean = {
if (!finished) {
if (!gotNext) {
nextValue = getNext()
if (finished) {
closeIfNeeded()
}
gotNext = true
}
}
!finished
}
def closeIfNeeded() {
if (!closed) {
closed = true
close()
}
}
protected def getNext(): U
protected def close()
}
}
Большая часть этого кода берется из SocketInputDStream[T]
предоставленной Спарк, я просто повторно использовать его. Я также взял код для NextIterator
, который используется bytesToLines
, все, что он делает, потребляет линию из пакета и преобразует ее в String
. Если у вас более сложная логика, вы можете предоставить ее, передав converter: InputStream => Iterator[T]
свою собственную реализацию.
Тестирование его с простым UDP пакета:
echo -n "hello hello hello!" >/dev/udp/localhost/3003
Урожайность:
-------------------------------------------
Time: 1482676728000 ms
-------------------------------------------
hello hello hello!
Конечно, это должно быть дополнительно протестированы. У меня также есть скрытое предположение, что каждый buffer
, созданный с DatagramPacket
, составляет 2048 байт, что, возможно, вы захотите изменить.
любая возможность использования аналогичного кода в Python. i.e Python с Spark Streaming или Python со структурированной потоковой обработкой Spark. – user3698581
Мне нужно, чтобы пыль от моих навыков питона для этого. Я посмотрю, скоро ли у меня будет время. –
большое спасибо. Я отправил на этот вопрос вопрос и получил комментарий http://stackoverflow.com/questions/42458812/spark-streaming-custom-receiver-in-python-receive-udp-over-socket – user3698581