2016-06-01 3 views
0

Я только начал работать с потоками akka (2.4.6) и slick (3.1.1) в Scala (2.11.7). Я использую Intellij (не уверен, что это важно, но я думал, что выброшу его там).Scala - fromPublisher не входит в объект akka.stream.scaladsl.Source

val scanner: DatabasePublisher[Stage] = db.stream(action.transactionally.withStatementParameters(fetchSize = 5000)) 
val source = Source.fromPublisher(scanner) 

проблема находится на второй линии. Intellij говорит мне, что он не может разрешить символ «fromPublisher» ... Я думал, что это было IJ быть finnicky, но когда я пошел, чтобы построить я получил это:

Error:(38, 10) value fromPublisher is not a member of object akka.stream.scaladsl.Source 
    Source.fromPublisher(scanner) 
     ^

любая идея о том, что я мог бы делать неправильно?

Борьба с этим целая ночь, и я готов вытащить волосы.

Спасибо!

+0

Вы бы описали полный код – Sky

ответ

1

См рабочий пример:

import akka.actor.ActorSystem 
import akka.stream.ActorMaterializer 
import akka.stream.scaladsl.Source 
import slick.backend.DatabasePublisher 
import slick.driver.H2Driver.api._ 

import scala.concurrent.Await 
import scala.concurrent.duration._ 

case class Emp(id: Int, name: String) 

object Demo extends App { 

    implicit val system = ActorSystem("Sys") 

    val db = Database.forConfig("h2mem1") 

    val empTableQuery = TableQuery[EmployeeTable] 
    val insertQuery = empTableQuery ++= Seq(Emp(1, "emp1"), Emp(2, "emp2"), Emp(3, "emp3"), Emp(4, "emp4")) 
    val action = DBIO.seq(empTableQuery.schema.create, insertQuery) 

    //create schema and insert record 
    Await.result(db.run(action), 1000 second) 
    // print db record 
    Await.result(db.run(empTableQuery.result), 1000 second).foreach(println) 
    val publisher: DatabasePublisher[Emp] = db.stream(empTableQuery.result) 

    import system.dispatcher 

    implicit val materializer = ActorMaterializer() 

    //consume using stream 
    println("Steaming data::::::::") 
    val source = Source.fromPublisher(publisher).map(emp => emp.id + " : " + emp.name).runForeach(println) 

    class EmployeeTable(tag: Tag) extends Table[Emp](tag, "emp") { 
    val id = column[Int]("id", O.PrimaryKey) 
    val name = column[String]("name") 

    def * = (id, name) <>(Emp.tupled, Emp.unapply) 
    } 


    source.onComplete(_ => system.terminate()) 

} 

build.sbt

scalaVersion := "2.11.8" 

libraryDependencies ++= Seq(
    "mysql" % "mysql-connector-java" % "5.1.36", 
    "com.typesafe.slick" %% "slick-hikaricp" % "3.1.1", 
    "ch.qos.logback" % "logback-classic" % "1.1.3", 
    "com.typesafe.slick" %% "slick" % "3.1.1", 
    "com.typesafe.akka" %% "akka-stream" % "2.4.6", 
    "org.scalatest" %% "scalatest" % "2.2.5" % "test", 
    "com.h2database" % "h2" % "1.4.187" 
) 

application.conf

h2mem1 = { 
    url = "jdbc:h2:mem:test1" 
    driver = org.h2.Driver 
    connectionPool = disabled 
    keepAliveConnection = true 
} 
0

Спасибо всем за ответ на мой вопрос. Я нашел проблему. Это был не код.

Проблема была в установке библиотек Intellij.

Я скомпилировал проект с помощью SBT, и он работал и работал отлично, несмотря на то, что IJ разворачивался на нем.

Итак, я после того, как пробрался вокруг, я вошел в структуру проекта -> SDK, и я заметил, что в библиотеках SDK, akka-stream-experimental 1.0 jar и целой очереди других барахлов.

Я удалил их (оставил JDK 8 банок). И теперь все работает!

В любом случае, спасибо!

Смежные вопросы