2016-12-25 3 views
2

примере, приведенном в http://spark.apache.org/docs/latest/streaming-programming-guide.html Позволяет мне получать пакеты данных в TCP потока и прослушивает порт 9999Спарк Scala UDP получить на прослушивание порта

import org.apache.spark._ 
import org.apache.spark.streaming._ 
import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3 

// Create a local StreamingContext with two working thread and batch interval of 1 second. 
// The master requires 2 cores to prevent from a starvation scenario. 

val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount") 
val ssc = new StreamingContext(conf, Seconds(1)) 


// Create a DStream that will connect to hostname:port, like localhost:9999 
val lines = ssc.socketTextStream("localhost", 9999) 
// Split each line into words 
val words = lines.flatMap(_.split(" ")) 
import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3 
// Count each word in each batch 
val pairs = words.map(word => (word, 1)) 
val wordCounts = pairs.reduceByKey(_ + _) 

// Print the first ten elements of each RDD generated in this DStream to the console 
wordCounts.print() 
ssc.start()    // Start the computation 
ssc.awaitTermination() // Wait for the computation to terminate 

Я могу передавать данные через TCP путем создания сервера данных с помощью в моей системе Linux $ nc -lk 9999

Вопрос
мне нужно получить поток от телефона андроида потокового с использованием UDP и Scala/Спарк
VAL строки = ssc.socketTextStream ("локальным", 9999)
принимает ТОЛЬКО в TCP-потоки.

Как я могу получать потоки UDP таким же простым способом, используя Scala + Spark и создавать Spark DStream.

ответ

3

Существует не что-то встроенное, но это не слишком много работы, чтобы добиться этого. Вот простое решение, которое я сделал на основе пользовательских UdpSocketInputDStream[T]:

import java.io._ 
import java.net.{ConnectException, DatagramPacket, DatagramSocket, InetAddress} 

import org.apache.spark.storage.StorageLevel 
import org.apache.spark.streaming.StreamingContext 
import org.apache.spark.streaming.dstream.ReceiverInputDStream 
import org.apache.spark.streaming.receiver.Receiver 

import scala.reflect.ClassTag 
import scala.util.control.NonFatal 

class UdpSocketInputDStream[T: ClassTag](
              _ssc: StreamingContext, 
              host: String, 
              port: Int, 
              bytesToObjects: InputStream => Iterator[T], 
              storageLevel: StorageLevel 
             ) extends ReceiverInputDStream[T](_ssc) { 

    def getReceiver(): Receiver[T] = { 
    new UdpSocketReceiver(host, port, bytesToObjects, storageLevel) 
    } 
} 

class UdpSocketReceiver[T: ClassTag](host: String, 
            port: Int, 
            bytesToObjects: InputStream => Iterator[T], 
            storageLevel: StorageLevel) extends Receiver[T](storageLevel) { 

    var udpSocket: DatagramSocket = _ 

    override def onStart(): Unit = { 

    try { 
     udpSocket = new DatagramSocket(port, InetAddress.getByName(host)) 
    } catch { 
     case e: ConnectException => 
     restart(s"Error connecting to $port", e) 
     return 
    } 

    // Start the thread that receives data over a connection 
    new Thread("Udp Socket Receiver") { 
     setDaemon(true) 

     override def run() { 
     receive() 
     } 
    }.start() 
    } 

    /** Create a socket connection and receive data until receiver is stopped */ 
    def receive() { 
    try { 
     val buffer = new Array[Byte](2048) 

     // Create a packet to receive data into the buffer 
     val packet = new DatagramPacket(buffer, buffer.length) 

     udpSocket.receive(packet) 

     val iterator = bytesToObjects(new ByteArrayInputStream(packet.getData, packet.getOffset, packet.getLength)) 
     // Now loop forever, waiting to receive packets and printing them. 
     while (!isStopped() && iterator.hasNext) { 
     store(iterator.next()) 
     } 

     if (!isStopped()) { 
     restart("Udp socket data stream had no more data") 
     } 
    } catch { 
     case NonFatal(e) => 
     restart("Error receiving data", e) 
    } finally { 
     onStop() 
    } 
    } 

    override def onStop(): Unit = { 
    synchronized { 
     if (udpSocket != null) { 
     udpSocket.close() 
     udpSocket = null 
     } 
    } 
    } 
} 

Для того, чтобы получить StreamingContext добавить метод на себе, мы обогатить ее неявный класс:

object Implicits { 
    implicit class StreamingContextOps(val ssc: StreamingContext) extends AnyVal { 
    def udpSocketStream[T: ClassTag](host: String, 
            port: Int, 
            converter: InputStream => Iterator[T], 
            storageLevel: StorageLevel): InputDStream[T] = { 
     new UdpSocketInputDStream(ssc, host, port, converter, storageLevel) 
    } 
    } 
} 

И вот как вы называете все это:

import java.io.{BufferedReader, InputStream, InputStreamReader} 
import java.nio.charset.StandardCharsets 

import org.apache.spark.SparkContext 
import org.apache.spark.storage.StorageLevel 
import org.apache.spark.streaming.dstream.InputDStream 
import org.apache.spark.streaming.{Seconds, StreamingContext} 

import scala.reflect.ClassTag 

object TestRunner { 
    import Implicits._ 

    def main(args: Array[String]): Unit = { 
    val sparkContext = new SparkContext("local[*]", "udpTest") 
    val ssc = new StreamingContext(sparkContext, Seconds(4)) 

    val stream = ssc.udpSocketStream("localhost", 
            3003, 
            bytesToLines, 
            StorageLevel.MEMORY_AND_DISK_SER_2) 
    stream.print() 

    ssc.start() 
    ssc.awaitTermination() 
    } 

    def bytesToLines(inputStream: InputStream): Iterator[String] = { 
    val dataInputStream = new BufferedReader(
     new InputStreamReader(inputStream, StandardCharsets.UTF_8)) 
    new NextIterator[String] { 
     protected override def getNext(): String = { 
     val nextValue = dataInputStream.readLine() 
     if (nextValue == null) { 
      finished = true 
     } 
     nextValue 
     } 

     protected override def close() { 
     dataInputStream.close() 
     } 
    } 
    } 

    abstract class NextIterator[U] extends Iterator[U] { 
    protected var finished = false 
    private var gotNext = false 
    private var nextValue: U = _ 
    private var closed = false 

    override def next(): U = { 
     if (!hasNext) { 
     throw new NoSuchElementException("End of stream") 
     } 
     gotNext = false 
     nextValue 
    } 

    override def hasNext: Boolean = { 
     if (!finished) { 
     if (!gotNext) { 
      nextValue = getNext() 
      if (finished) { 
      closeIfNeeded() 
      } 
      gotNext = true 
     } 
     } 
     !finished 
    } 

    def closeIfNeeded() { 
     if (!closed) { 
     closed = true 
     close() 
     } 
    } 

    protected def getNext(): U 
    protected def close() 
    } 
} 

Большая часть этого кода берется из SocketInputDStream[T] предоставленной Спарк, я просто повторно использовать его. Я также взял код для NextIterator, который используется bytesToLines, все, что он делает, потребляет линию из пакета и преобразует ее в String. Если у вас более сложная логика, вы можете предоставить ее, передав converter: InputStream => Iterator[T] свою собственную реализацию.

Тестирование его с простым UDP пакета:

echo -n "hello hello hello!" >/dev/udp/localhost/3003 

Урожайность:

------------------------------------------- 
Time: 1482676728000 ms 
------------------------------------------- 
hello hello hello! 

Конечно, это должно быть дополнительно протестированы. У меня также есть скрытое предположение, что каждый buffer, созданный с DatagramPacket, составляет 2048 байт, что, возможно, вы захотите изменить.

+0

любая возможность использования аналогичного кода в Python. i.e Python с Spark Streaming или Python со структурированной потоковой обработкой Spark. – user3698581

+0

Мне нужно, чтобы пыль от моих навыков питона для этого. Я посмотрю, скоро ли у меня будет время. –

+0

большое спасибо. Я отправил на этот вопрос вопрос и получил комментарий http://stackoverflow.com/questions/42458812/spark-streaming-custom-receiver-in-python-receive-udp-over-socket – user3698581

Смежные вопросы