2016-06-13 3 views
6

Я реализовал простой язык для процесса ETL, используя свободную монаду. При использовании List в качестве входных и выходных данных для сбора и хранения данных все работает нормально. Однако я использую асинхронное библиотеки и работать с Future[List]Как использовать свободную монаду с Future [M [_]]

общего импорта и определения

import scala.concurrent.Future 
import scala.concurrent.ExecutionContext.Implicits.global 
import cats.free.Free 
import cats.free.Free._ 

sealed trait Ops[A] 
type OpsF[A] = Free[Ops, A] 

работы с List

case class Fetch(offset: Int, amount: Int) extends Ops[List[Record]] 
case class Store(recs: List[Record]) extends Ops[List[Response]] 

def fetch(offset: Int, amount: Int): OpsF[List[Record]] = 
    liftF[Ops, List[Record]](Fetch(offset, amount)) 
def store(recs: List[Record]): OpsF[List[Response]] = 
    liftF[Ops, List[Response]](Store(recs)) 

def simpleEtl(offset: Int, amount: Int): Free[Ops, List[Response]] = 
    fetch(offset, amount).flatMap(r => store(r)) 

не работает с Future[List]

case class Fetch(offset: Int, amount: Int) extends Ops[Future[List[Record]]] 
case class Store(recs: List[Record]) extends Ops[Future[List[Response]]] 

def fetch(offset: Int, amount: Int): OpsF[Future[List[Record]]] = 
    liftF[Ops, Future[List[Record]]](Fetch(offset, amount)) 
def store(recs: List[Record]): OpsF[Future[List[Response]]] = 
    liftF[Ops, Future[List[Response]]](Store(recs)) 

// explicit types in case I am misunderstanding more than I think 
def simpleEtl(offset: Int, amount: Int): Free[Ops, Future[List[Response]]] = 
fetch(offset, amount).flatMap { rf: Future[List[Record]] => 
    val getResponses: OpsF[Future[List[Response]]] = rf map { r: List[Record] => 
    store(r) 
    } 
    getResponses 
} 

, как и ожидалось, тип, возвращаемый из flatMap/map неправильно - я не получаю OpsF[Future] но Future[OpsF]

Error:(34, 60) type mismatch; 
found : scala.concurrent.Future[OpsF[scala.concurrent.Future[List[Response]]]] 
(which expands to) scala.concurrent.Future[cats.free.Free[Ops,scala.concurrent.Future[List[String]]]] 
required: OpsF[scala.concurrent.Future[List[Response]]] 
(which expands to) cats.free.Free[Ops,scala.concurrent.Future[List[String]]] 
    val getResponses: OpsF[Future[List[Response]]] = rf map { r: List[Record] => 

мой текущий обходной путь, чтобы иметь store принять Future[List[Record]] и дать карту переводчика по Future, но он чувствует себя неуклюжим.

Вопрос не относится к List - например. Option было бы полезно.

Я делаю это неправильно? Для этого есть какой-то монадный трансформатор?

+0

Это похоже на типичный образец для монадного трансформатора, на первый взгляд кажется Haskell каким-то образом «FreeT», но не смог найти его в скалясе или котах. –

+3

У скаляза есть 'FreeT', поскольку [7.2.0] (https://oss.sonatype.org/service/local/repositories/releases/archive/org/scalaz/scalaz_2.11/7.2.0/scalaz_2.11-7.2 0,0-javadoc.jar /!/index.html # scalaz.FreeT). –

+1

Могу ли я указать вам на библиотеку 47 градусов, точно названную http://47deg.github.io/fetch/, которая скоро станет инкубатором типа? Имейте в виду, я не работаю на 47 градусов, но похоже, что у этого уже есть решение для большей части того, что вы хотите сделать. – wheaties

ответ

7

Абстрактный тип данных определяет Ops алгебра для извлечения и магазина множественного Record с. Он описывает две операции, но это тоже единственное, что должна делать алгебра. Как фактически выполняются операции, не имеет значения вообще Fetch и Store, единственная полезная вещь, которую вы ожидаете, - это List[Record] и List[Response].

Посредством формирования ожидаемого результата результата Fetch и Store a Future[List[Record]]] вы ограничиваете возможности интерпретации этой алгебры. Возможно, в ваших тестах вы не хотите подключаться асинхронно к веб-сервису или базе данных и просто хотите протестировать с помощью Map[Int, Result] или Vector[Result], но теперь вам необходимо вернуть Future, что делает тесты более сложными, чем они могли бы быть.

Но, заявив, что вам не нужен ETL[Future[List[Record]]] не решит ваш вопрос: вы используете асинхронные библиотеки, и вы, вероятно, захотите вернуть Future.

Начиная с первой реализации:

import scala.concurrent.Future 
import scala.concurrent.ExecutionContext.Implicits.global 
import cats.implicits._ 
import cats.free.Free 

type Record = String 
type Response = String 

sealed trait EtlOp[T] 
case class Fetch(offset: Int, amount: Int) extends EtlOp[List[Record]] 
case class Store(recs: List[Record]) extends EtlOp[List[Response]] 

type ETL[A] = Free[EtlOp, A] 

def fetch(offset: Int, amount: Int): ETL[List[Record]] = 
    Free.liftF(Fetch(offset, amount)) 
def store(recs: List[Record]): ETL[List[Response]] = 
    Free.liftF(Store(recs)) 

def fetchStore(offset: Int, amount: Int): ETL[List[Response]] = 
    fetch(offset, amount).flatMap(store) 

Но теперь у нас до сих пор нет Future сек?Это работа нашего переводчика:

import cats.~> 

val interpretFutureDumb: EtlOp ~> Future = new (EtlOp ~> Future) { 
    def apply[A](op: EtlOp[A]): Future[A] = op match { 
    case Store(records) => 
     Future.successful(records.map(rec => s"Resp($rec)")) 
     // store in DB, send to webservice, ... 
    case Fetch(offset, amount) => 
     Future.successful(List.fill(amount)(offset.toString)) 
     // get from DB, from webservice, ... 
    } 
} 

С этим переводчиком (где, конечно, вы бы заменить Future.successful(...) с чем-то более полезным), мы можем получить наши Future[List[Response]]:

val responses: Future[List[Response]] = 
    fetchStore(1, 5).foldMap(interpretFutureDumb) 

val records: Future[List[Record]] = 
    fetch(2, 4).foldMap(interpretFutureDumb) 

responses.foreach(println) 
// List(Resp(1), Resp(1), Resp(1), Resp(1), Resp(1)) 
records.foreach(println) 
// List(2, 2, 2, 2) 

Но мы все еще можем создать другой переводчик, который не возвращает Future:

import scala.collection.mutable.ListBuffer 
import cats.Id 

val interpretSync: EtlOp ~> Id = new (EtlOp ~> Id) { 
    val records: ListBuffer[Record] = ListBuffer() 
    def apply[A](op: EtlOp[A]): Id[A] = op match { 
    case Store(recs) => 
     records ++= recs 
     records.toList 
    case Fetch(offset, amount) => 
     records.drop(offset).take(amount).toList 
    } 
} 

val etlResponse: ETL[List[Response]] = 
    for { 
    _  <- store(List("a", "b", "c", "d")) 
    records <- fetch(1, 2) 
    resp <- store(records) 
    } yield resp 

val responses2: List[Response] = etlResponse.foldMap(interpretSync) 
// List(a, b, c, d, b, c) 
+0

А, я вижу, имеет смысл. Похоже, я концептуально путал определения подъема с исполнением интерпретатором. Спасибо, Питер. – kostja

+0

@ peter-neyens Когда мы объединяем Алгебра, можем ли мы объединить два переводчика - один возвращающий Id и другое возвращающееся Будущее? – arjunswaj

+0

Я не уверен, каким образом вы хотите объединить два переводчика. Вы всегда можете интерпретировать программу дважды, один раз - «Id» и один раз - «Будущее». –

Смежные вопросы