2016-07-21 2 views
0

Ниже код работает нормально, но требуется время, чтобы написать в cassandra, когда у нас огромный приток транзакций.Spark Streaming - использование foreachPartition и saveToCassandra для лучшего распараллеливания

val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics) 
val parsedStream = stream.map(_._2).map(EmpParser.parse(_)).cache() 

Ниже код записывается в cassandra в последовательном порядке и выполняется в одном исполнителе.

parsedStream.saveToCassandra("test", "ct_table", SomeColumns("emp_id","emp_name","emp_sal","emp_dept")) 

Но я хотел распараллелить запись в cassandra, выполнив foreachPartition. Но я не вижу вариант saveToCassandra в foreachPartition.

parsedStream.foreachRDD{rdd => 
    rdd.foreachPartition { partition => 
     partition.saveToCassandra("test", "ct_table", SomeColumns("emp_id","emp_name","emp_sal","emp_dept")) 
    } 
} 

Любой способ добиться этого?

+0

'saveToCassandra' определен на уровне' RDD'/'DStream', а' partition' - простой scala 'Iterator', поэтому вы не видите его определенного. –

+0

ОК. Любая идея, как мы могли добиться письменности к кассандре в параллельных исполнениях всеми моими исполнителями? – JKPEAK

+0

вы можете сделать 'parseStream.repartition (num) .saveToCassandra' – Knight71

ответ

0

Поскольку вы уже используете прямой поток, есть два способа увеличить параллелизм.

  1. Увеличить количество разделов Kafka. При использовании прямого потока Spark автоматически создает столько разделов, как Kafka. Однако, в зависимости от вашей настройки, это может оказаться невыполнимым.

  2. Использование Spark's repartition. В большинстве случаев лучше использовать repartition на входе, а не на выходе.

    val num: Int = ? // Number of partitition 
    val parsedStream = stream.repartition(num).map(_._2).map(EmpParser.parse(_)) 
    parsedStream.saveToCassandra(...) 
    

Если вам нужно только использовать parsedStream один раз, это не нужно кэшировать.

+0

У меня есть требование написать те же данные и для поиска эластичности. parsedStream.saveToCassandra («test», «ct_table», SomeColumns («emp_id», «emp_name», «emp_sal», «emp_dept»)) В настоящее время я сохраняю эластичность, как показано ниже. пожалуйста, предложите parrlelized подход, чтобы сохранить ElasticSearch. parsedStream.foreachRDD (rdd => {rdd.saveToEs ("test/emp")}) – JKPEAK

+0

Когда мой код записывается как в cassandra, так и в elasticsearch, время обработки увеличивается до 3 секунд (это зависит от размера данных). Но если я пытаюсь писать только на cassandra, время обработки сокращается до 0,3 - 0,7 секунды. – JKPEAK

Смежные вопросы