Заявление о проблемах: Мы добавляем все входящие параметры запроса пользователя для определенного модуля в таблицу DB DB в виде строки (это огромные данные). Теперь мы хотим разработать процесс, который будет считывать каждую запись из этой таблицы и получать дополнительную информацию об этом запросе пользователя, вызывая сторонние API, и после этого он будет помещать эту возвращенную метаинформацию в другую таблицу.Проблема с использованием Scala + Slick + MySQL + Akka + Stream
Текущие попытки:
Я использую Scala + Slick, чтобы сделать это. Поскольку данные для чтения огромны, я хочу читать эту таблицу по одной строке за раз и обрабатывать ее. Я попытался с помощью скользким + AKKA потоков, однако я получаю «java.util.concurrent.RejectedExecutionException»
Ниже будет грубой логикой, что я пытался,
implicit val system = ActorSystem("Example")
import system.dispatcher
implicit val materializer = ActorMaterializer()
val future = db.stream(SomeQuery.result)
Source.fromPublisher(future).map(row => {
id = dataEnrichmentAPI.process(row)
}).runForeach(id => println("Processed row : "+ id))
dataEnrichmentAPI.process: Эта функция делает сторонний вызов REST, а также выполняет некоторые запросы БД для получения требуемых данных. Этот DB запрос делается с помощью метода «db.run», и он также ждет, пока он не закончит (Использование ОЖИДАНИЯ)
например,
def process(row: RequestRecord): Int = {
// SomeQuery2 = Check if data is already there in DB
val retId: Seq[Int] = Await.result(db.run(SomeQuery2.result), Duration.Inf)
if(retId.isEmpty){
val metaData = RestCall()
// SomeQuery3 = Store this metaData in DB
Await.result(db.run(SomeQuery3.result), Duration.Inf)
return metaData.id;
}else{
// SomeQuery4 = Get meta data id
return Await.result(db.run(SomeQuery4.result), Duration.Inf)
}
}
Я получаю это исключение, когда я использую блокирующий вызов к БД. Я не думаю, что если я смогу избавиться от него, как возвращаемое значение необходимо для продолжения последующего потока.
Является ли «блокирующий вызов» причиной этого исключения? Какова наилучшая практика для решения этой проблемы?
Спасибо.
Не скользкий 3,0 имеют поддержку Akka Streams из коробки? – Martijn
да, это так, поэтому я использовал метод Source.fromPublisher библиотеки akka, чтобы потреблять ручку потока, заданную slick – aks
, почему бы вам не сделать это в методе процесса, а? Вместо того, чтобы ждать просто flapmap над потоками – Martijn