2016-04-01 2 views
3

Заявление о проблемах: Мы добавляем все входящие параметры запроса пользователя для определенного модуля в таблицу DB DB в виде строки (это огромные данные). Теперь мы хотим разработать процесс, который будет считывать каждую запись из этой таблицы и получать дополнительную информацию об этом запросе пользователя, вызывая сторонние API, и после этого он будет помещать эту возвращенную метаинформацию в другую таблицу.Проблема с использованием Scala + Slick + MySQL + Akka + Stream

Текущие попытки:

Я использую Scala + Slick, чтобы сделать это. Поскольку данные для чтения огромны, я хочу читать эту таблицу по одной строке за раз и обрабатывать ее. Я попытался с помощью скользким + AKKA потоков, однако я получаю «java.util.concurrent.RejectedExecutionException»

Ниже будет грубой логикой, что я пытался,

implicit val system = ActorSystem("Example") 
import system.dispatcher 
implicit val materializer = ActorMaterializer() 

val future = db.stream(SomeQuery.result) 
Source.fromPublisher(future).map(row => { 
     id = dataEnrichmentAPI.process(row) 

}).runForeach(id => println("Processed row : "+ id)) 

dataEnrichmentAPI.process: Эта функция делает сторонний вызов REST, а также выполняет некоторые запросы БД для получения требуемых данных. Этот DB запрос делается с помощью метода «db.run», и он также ждет, пока он не закончит (Использование ОЖИДАНИЯ)

например,

def process(row: RequestRecord): Int = { 
    // SomeQuery2 = Check if data is already there in DB 
    val retId: Seq[Int] = Await.result(db.run(SomeQuery2.result), Duration.Inf) 
    if(retId.isEmpty){ 
     val metaData = RestCall() 
     // SomeQuery3 = Store this metaData in DB 
     Await.result(db.run(SomeQuery3.result), Duration.Inf) 
     return metaData.id;  
    }else{ 
     // SomeQuery4 = Get meta data id 
     return Await.result(db.run(SomeQuery4.result), Duration.Inf)  
    } 
} 

Я получаю это исключение, когда я использую блокирующий вызов к БД. Я не думаю, что если я смогу избавиться от него, как возвращаемое значение необходимо для продолжения последующего потока.

Является ли «блокирующий вызов» причиной этого исключения? Какова наилучшая практика для решения этой проблемы?

Спасибо.

+0

Не скользкий 3,0 имеют поддержку Akka Streams из коробки? – Martijn

+0

да, это так, поэтому я использовал метод Source.fromPublisher библиотеки akka, чтобы потреблять ручку потока, заданную slick – aks

+0

, почему бы вам не сделать это в методе процесса, а? Вместо того, чтобы ждать просто flapmap над потоками – Martijn

ответ

4

Я не знаю, если это ваша проблема (слишком мало деталей), но вы никогда не должны блокироваться.

Говоря о лучших практиках, мы вместо async stages. Это более или менее то, что ваш код будет выглядеть без использования Await.result:

def process(row: RequestRecord): Future[Int] = { 
    db.run(SomeQuery2.result) flatMap { 
     case retId if retId.isEmpty => 
     // what is this? is it a sync call? if it's a rest call it should return a future 
     val metaData = RestCall() 
     db.run(SomeQuery3.result).map(_ => metaData.id) 

     case _ => db.run(SomeQuery4.result) 
    } 
} 


Source.fromPublisher(db.stream(SomeQuery.result)) 
    // choose your own parallelism 
    .mapAsync(2)(dataEnrichmentAPI.process) 
    .runForeach(id => println("Processed row : "+ id)) 

Таким образом, вы будете обработкой противодавления и параллельности явно и идиоматический.

Попробуйте никогда Await.result вызова в рабочем коде и только compose futures using map, flatMap and for comprehensions

+0

Спасибо за ваш комментарий. Этот технологический метод содержит следующие этапы: def process (строка: RequestRecord): Int = { ... Что-то ... Await.result (db.run (SomeQuery2.result), Duration.Inf) ... Что-то ... } – aks

+0

@aks использует карту/плоскую карту (или для понимания) и заставляет ее возвращать будущее, никогда не используйте 'Await.result' в производственном коде –

+0

Я обновил метод обработки. Как вы видите, есть много зависимостей в функции процесса для выполнения требуемых инструкций. Следовательно, я использую этот блокирующий вызов. Любая идея, как избавиться от этого, используя AsyncCall. – aks

Смежные вопросы