2016-02-18 2 views
3

У меня есть Спарк Streaming в Pyspark с «Batch интервал» = 30 секСпарк Streaming

ssc = StreamingContext(sc, 30) 

Тогда я хотел бы использовать окно() функции для получения данных за последний час и нарезка каждые 30 сек по этим данным.

kvs = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers}) 

counts = kvs.map(lambda (k, v): json.loads(v))\ 
      .map(TransformInData).window(108000) 

и я получил ошибку

16/02/18 10:23:01 INFO JobScheduler: Added jobs for time 1455790980000 ms 
16/02/18 10:23:30 INFO PythonTransformedDStream: Slicing from 1455683040000 ms to 1455791010000 ms (aligned to 1455683040000 ms and 1455791010000 ms) 
16/02/18 10:23:30 INFO PythonTransformedDStream: Time 1455790650000 ms is invalid as zeroTime is 1455790650000 ms and slideDuration is 30000 ms and difference is 0 ms 
16/02/18 10:23:31 INFO JobScheduler: Added jobs for time 1455791010000 ms 

Я прочитал эту https://groups.google.com/forum/#!topic/spark-users/GQoxJHAAtX4 , но я не понимаю, почему он не работает

+1

Какую версию Спарк вы используете? – sgvd

+0

1.6.0 версия spark –

+0

Same Issue: Я предполагаю, что это «время недействительно», поскольку «время - ZeroTIme» (разность) метода isTimeValid() - это 0 класса Dstream, а это означает «time = zeroTime» 'и поэтому' time <= zeroTime' возвращает true, приводя к методу 'isTimeValid', чтобы' return false' с указанным выше сообщением. –

ответ

1

У меня была та же проблема. Обновление до Spark 2.0.1 исправлено.

0

Да пожалуйста, обновите Спарк 2.1 Затем добавить kafka frequency = 1000 в ms и добавьте offset в вас Dstream()

Смежные вопросы