diff --git a/build.sbt b/build.sbt index 13b2d5f..7c2a248 100644 --- a/build.sbt +++ b/build.sbt @@ -8,9 +8,6 @@ ThisBuild / scalaVersion := scala3Version -// TODO: get rid of the nested empty directories in packages -// MAYBE: get rid of src/main/scala, replace with less nested structure -// We mostly know what kind of project it is, there is no need to have this by default lazy val proof = entityProject("proof", file("domain/proof")) .components(_.dependsOn(ui)) .model(_.dependsOn(core)) @@ -81,8 +78,12 @@ lazy val `akka-persistence-support` = project .in(file("fiftyforms/akka-persistence")) .settings( - libraryDependencies += IWDeps.akka.modules.persistence.cross(CrossVersion.for3Use2_13), - libraryDependencies += "com.typesafe.akka" %% "akka-cluster-sharding-typed" % IWDeps.akka.V cross(CrossVersion.for3Use2_13), + IWDeps.useZIO(Test), + IWDeps.zioJson, + IWDeps.zioConfig, + libraryDependencies += IWDeps.akka.modules.persistence + .cross(CrossVersion.for3Use2_13), + libraryDependencies += "com.typesafe.akka" %% "akka-cluster-sharding-typed" % IWDeps.akka.V cross (CrossVersion.for3Use2_13), IWDeps.akka.profiles.eventsourcedJdbcProjection ) diff --git a/build.sbt b/build.sbt index 13b2d5f..7c2a248 100644 --- a/build.sbt +++ b/build.sbt @@ -8,9 +8,6 @@ ThisBuild / scalaVersion := scala3Version -// TODO: get rid of the nested empty directories in packages -// MAYBE: get rid of src/main/scala, replace with less nested structure -// We mostly know what kind of project it is, there is no need to have this by default lazy val proof = entityProject("proof", file("domain/proof")) .components(_.dependsOn(ui)) .model(_.dependsOn(core)) @@ -81,8 +78,12 @@ lazy val `akka-persistence-support` = project .in(file("fiftyforms/akka-persistence")) .settings( - libraryDependencies += IWDeps.akka.modules.persistence.cross(CrossVersion.for3Use2_13), - libraryDependencies += "com.typesafe.akka" %% "akka-cluster-sharding-typed" % IWDeps.akka.V cross(CrossVersion.for3Use2_13), + IWDeps.useZIO(Test), + IWDeps.zioJson, + IWDeps.zioConfig, + libraryDependencies += IWDeps.akka.modules.persistence + .cross(CrossVersion.for3Use2_13), + libraryDependencies += "com.typesafe.akka" %% "akka-cluster-sharding-typed" % IWDeps.akka.V cross (CrossVersion.for3Use2_13), IWDeps.akka.profiles.eventsourcedJdbcProjection ) diff --git a/domain/proof/query/projection/src/ProofProjection.scala b/domain/proof/query/projection/src/ProofProjection.scala index e761a81..f3c6f94 100644 --- a/domain/proof/query/projection/src/ProofProjection.scala +++ b/domain/proof/query/projection/src/ProofProjection.scala @@ -16,26 +16,61 @@ import zio.* import fiftyforms.akka.UnhandledEvent +import akka.projection.jdbc.scaladsl.JdbcHandler +import akka.projection.ProjectionId +import akka.projection.slick.SlickProjection +import akka.projection.slick.SlickHandler +import slick.basic.DatabaseConfig +import slick.jdbc.MySQLProfile +import akka.projection.Projection -class ProofProjection(system: ActorSystem[_]): +object ProofProjection: - val sourceProvider: SourceProvider[Offset, EventEnvelope[ProofEvent]] = - EventSourcedProvider.eventsByTag[ProofEvent]( - system, - readJournalPluginId = JdbcReadJournal.Identifier, - tag = ProofEvent.Tag - ) + val live: ZLayer[ActorSystem[ + ? + ] & ProofRepositoryWrite, Throwable, ProofProjection] = + (for + given ActorSystem[?] <- ZIO.service[ActorSystem[?]] + runtime <- ZIO.runtime[ProofRepositoryWrite] + sourceProvider <- Task + .attempt[SourceProvider[Offset, EventEnvelope[ProofEvent]]] { + EventSourcedProvider.eventsByTag[ProofEvent]( + summon[ActorSystem[?]], + readJournalPluginId = JdbcReadJournal.Identifier, + tag = ProofEvent.Tag + ) + } + dbConfig <- Task.attempt[DatabaseConfig[MySQLProfile]] { + DatabaseConfig.forConfig( + "projection.slick", + summon[ActorSystem[?]].settings.config + ) + } + proj <- Task.attempt { + SlickProjection.exactlyOnce( + projectionId = ProjectionId("Proof", "proof"), + sourceProvider, + dbConfig, + handler = () => new ProofProjectionHandler(dbConfig, runtime) + ) + } + yield ProofProjection(proj)).toLayer +case class ProofProjection(projection: Projection[EventEnvelope[ProofEvent]]) + +// TODO: extract TaskHandler with ZIO convertible to DBIO given runtime class ProofProjectionHandler( - tag: String, - system: ActorSystem[_], + dbConfig: DatabaseConfig[MySQLProfile], runtime: Runtime[ProofRepositoryWrite] -) extends Handler[EventEnvelope[ProofEvent]](): - override def process(envelope: EventEnvelope[ProofEvent]): Future[Done] = +) extends SlickHandler[EventEnvelope[ProofEvent]](): + import dbConfig.profile.api.* + + override def process(envelope: EventEnvelope[ProofEvent]): DBIO[Done] = val prog: RIO[ProofRepositoryWrite, Done] = envelope.event.event match case ev: ProofCreated => createProof(envelope.event) case ev => updateProof(envelope.event) - runtime.unsafeRunToFuture(prog) + // TODO: is there a better way? + DBIO.from(runtime.unsafeRunToFuture(prog)) private def createProof(ev: ProofEvent): RIO[ProofRepositoryWrite, Done] = for diff --git a/build.sbt b/build.sbt index 13b2d5f..7c2a248 100644 --- a/build.sbt +++ b/build.sbt @@ -8,9 +8,6 @@ ThisBuild / scalaVersion := scala3Version -// TODO: get rid of the nested empty directories in packages -// MAYBE: get rid of src/main/scala, replace with less nested structure -// We mostly know what kind of project it is, there is no need to have this by default lazy val proof = entityProject("proof", file("domain/proof")) .components(_.dependsOn(ui)) .model(_.dependsOn(core)) @@ -81,8 +78,12 @@ lazy val `akka-persistence-support` = project .in(file("fiftyforms/akka-persistence")) .settings( - libraryDependencies += IWDeps.akka.modules.persistence.cross(CrossVersion.for3Use2_13), - libraryDependencies += "com.typesafe.akka" %% "akka-cluster-sharding-typed" % IWDeps.akka.V cross(CrossVersion.for3Use2_13), + IWDeps.useZIO(Test), + IWDeps.zioJson, + IWDeps.zioConfig, + libraryDependencies += IWDeps.akka.modules.persistence + .cross(CrossVersion.for3Use2_13), + libraryDependencies += "com.typesafe.akka" %% "akka-cluster-sharding-typed" % IWDeps.akka.V cross (CrossVersion.for3Use2_13), IWDeps.akka.profiles.eventsourcedJdbcProjection ) diff --git a/domain/proof/query/projection/src/ProofProjection.scala b/domain/proof/query/projection/src/ProofProjection.scala index e761a81..f3c6f94 100644 --- a/domain/proof/query/projection/src/ProofProjection.scala +++ b/domain/proof/query/projection/src/ProofProjection.scala @@ -16,26 +16,61 @@ import zio.* import fiftyforms.akka.UnhandledEvent +import akka.projection.jdbc.scaladsl.JdbcHandler +import akka.projection.ProjectionId +import akka.projection.slick.SlickProjection +import akka.projection.slick.SlickHandler +import slick.basic.DatabaseConfig +import slick.jdbc.MySQLProfile +import akka.projection.Projection -class ProofProjection(system: ActorSystem[_]): +object ProofProjection: - val sourceProvider: SourceProvider[Offset, EventEnvelope[ProofEvent]] = - EventSourcedProvider.eventsByTag[ProofEvent]( - system, - readJournalPluginId = JdbcReadJournal.Identifier, - tag = ProofEvent.Tag - ) + val live: ZLayer[ActorSystem[ + ? + ] & ProofRepositoryWrite, Throwable, ProofProjection] = + (for + given ActorSystem[?] <- ZIO.service[ActorSystem[?]] + runtime <- ZIO.runtime[ProofRepositoryWrite] + sourceProvider <- Task + .attempt[SourceProvider[Offset, EventEnvelope[ProofEvent]]] { + EventSourcedProvider.eventsByTag[ProofEvent]( + summon[ActorSystem[?]], + readJournalPluginId = JdbcReadJournal.Identifier, + tag = ProofEvent.Tag + ) + } + dbConfig <- Task.attempt[DatabaseConfig[MySQLProfile]] { + DatabaseConfig.forConfig( + "projection.slick", + summon[ActorSystem[?]].settings.config + ) + } + proj <- Task.attempt { + SlickProjection.exactlyOnce( + projectionId = ProjectionId("Proof", "proof"), + sourceProvider, + dbConfig, + handler = () => new ProofProjectionHandler(dbConfig, runtime) + ) + } + yield ProofProjection(proj)).toLayer +case class ProofProjection(projection: Projection[EventEnvelope[ProofEvent]]) + +// TODO: extract TaskHandler with ZIO convertible to DBIO given runtime class ProofProjectionHandler( - tag: String, - system: ActorSystem[_], + dbConfig: DatabaseConfig[MySQLProfile], runtime: Runtime[ProofRepositoryWrite] -) extends Handler[EventEnvelope[ProofEvent]](): - override def process(envelope: EventEnvelope[ProofEvent]): Future[Done] = +) extends SlickHandler[EventEnvelope[ProofEvent]](): + import dbConfig.profile.api.* + + override def process(envelope: EventEnvelope[ProofEvent]): DBIO[Done] = val prog: RIO[ProofRepositoryWrite, Done] = envelope.event.event match case ev: ProofCreated => createProof(envelope.event) case ev => updateProof(envelope.event) - runtime.unsafeRunToFuture(prog) + // TODO: is there a better way? + DBIO.from(runtime.unsafeRunToFuture(prog)) private def createProof(ev: ProofEvent): RIO[ProofRepositoryWrite, Done] = for diff --git a/server/src/main/resources/application.conf b/server/src/main/resources/application.conf index d0d4948..bb807b1 100644 --- a/server/src/main/resources/application.conf +++ b/server/src/main/resources/application.conf @@ -55,6 +55,8 @@ docker.host = mysql +projection.slick = ${slick} + slick { profile = "slick.jdbc.MySQLProfile$" db { diff --git a/build.sbt b/build.sbt index 13b2d5f..7c2a248 100644 --- a/build.sbt +++ b/build.sbt @@ -8,9 +8,6 @@ ThisBuild / scalaVersion := scala3Version -// TODO: get rid of the nested empty directories in packages -// MAYBE: get rid of src/main/scala, replace with less nested structure -// We mostly know what kind of project it is, there is no need to have this by default lazy val proof = entityProject("proof", file("domain/proof")) .components(_.dependsOn(ui)) .model(_.dependsOn(core)) @@ -81,8 +78,12 @@ lazy val `akka-persistence-support` = project .in(file("fiftyforms/akka-persistence")) .settings( - libraryDependencies += IWDeps.akka.modules.persistence.cross(CrossVersion.for3Use2_13), - libraryDependencies += "com.typesafe.akka" %% "akka-cluster-sharding-typed" % IWDeps.akka.V cross(CrossVersion.for3Use2_13), + IWDeps.useZIO(Test), + IWDeps.zioJson, + IWDeps.zioConfig, + libraryDependencies += IWDeps.akka.modules.persistence + .cross(CrossVersion.for3Use2_13), + libraryDependencies += "com.typesafe.akka" %% "akka-cluster-sharding-typed" % IWDeps.akka.V cross (CrossVersion.for3Use2_13), IWDeps.akka.profiles.eventsourcedJdbcProjection ) diff --git a/domain/proof/query/projection/src/ProofProjection.scala b/domain/proof/query/projection/src/ProofProjection.scala index e761a81..f3c6f94 100644 --- a/domain/proof/query/projection/src/ProofProjection.scala +++ b/domain/proof/query/projection/src/ProofProjection.scala @@ -16,26 +16,61 @@ import zio.* import fiftyforms.akka.UnhandledEvent +import akka.projection.jdbc.scaladsl.JdbcHandler +import akka.projection.ProjectionId +import akka.projection.slick.SlickProjection +import akka.projection.slick.SlickHandler +import slick.basic.DatabaseConfig +import slick.jdbc.MySQLProfile +import akka.projection.Projection -class ProofProjection(system: ActorSystem[_]): +object ProofProjection: - val sourceProvider: SourceProvider[Offset, EventEnvelope[ProofEvent]] = - EventSourcedProvider.eventsByTag[ProofEvent]( - system, - readJournalPluginId = JdbcReadJournal.Identifier, - tag = ProofEvent.Tag - ) + val live: ZLayer[ActorSystem[ + ? + ] & ProofRepositoryWrite, Throwable, ProofProjection] = + (for + given ActorSystem[?] <- ZIO.service[ActorSystem[?]] + runtime <- ZIO.runtime[ProofRepositoryWrite] + sourceProvider <- Task + .attempt[SourceProvider[Offset, EventEnvelope[ProofEvent]]] { + EventSourcedProvider.eventsByTag[ProofEvent]( + summon[ActorSystem[?]], + readJournalPluginId = JdbcReadJournal.Identifier, + tag = ProofEvent.Tag + ) + } + dbConfig <- Task.attempt[DatabaseConfig[MySQLProfile]] { + DatabaseConfig.forConfig( + "projection.slick", + summon[ActorSystem[?]].settings.config + ) + } + proj <- Task.attempt { + SlickProjection.exactlyOnce( + projectionId = ProjectionId("Proof", "proof"), + sourceProvider, + dbConfig, + handler = () => new ProofProjectionHandler(dbConfig, runtime) + ) + } + yield ProofProjection(proj)).toLayer +case class ProofProjection(projection: Projection[EventEnvelope[ProofEvent]]) + +// TODO: extract TaskHandler with ZIO convertible to DBIO given runtime class ProofProjectionHandler( - tag: String, - system: ActorSystem[_], + dbConfig: DatabaseConfig[MySQLProfile], runtime: Runtime[ProofRepositoryWrite] -) extends Handler[EventEnvelope[ProofEvent]](): - override def process(envelope: EventEnvelope[ProofEvent]): Future[Done] = +) extends SlickHandler[EventEnvelope[ProofEvent]](): + import dbConfig.profile.api.* + + override def process(envelope: EventEnvelope[ProofEvent]): DBIO[Done] = val prog: RIO[ProofRepositoryWrite, Done] = envelope.event.event match case ev: ProofCreated => createProof(envelope.event) case ev => updateProof(envelope.event) - runtime.unsafeRunToFuture(prog) + // TODO: is there a better way? + DBIO.from(runtime.unsafeRunToFuture(prog)) private def createProof(ev: ProofEvent): RIO[ProofRepositoryWrite, Done] = for diff --git a/server/src/main/resources/application.conf b/server/src/main/resources/application.conf index d0d4948..bb807b1 100644 --- a/server/src/main/resources/application.conf +++ b/server/src/main/resources/application.conf @@ -55,6 +55,8 @@ docker.host = mysql +projection.slick = ${slick} + slick { profile = "slick.jdbc.MySQLProfile$" db { diff --git a/server/src/main/scala/mdr/pdb/server/Main.scala b/server/src/main/scala/mdr/pdb/server/Main.scala index 9340477..c5ddb0d 100644 --- a/server/src/main/scala/mdr/pdb/server/Main.scala +++ b/server/src/main/scala/mdr/pdb/server/Main.scala @@ -13,6 +13,9 @@ import akka.actor.typed.scaladsl.Behaviors import akka.cluster.typed.Cluster import akka.cluster.typed.Join +import akka.cluster.sharding.typed.scaladsl.ShardedDaemonProcess +import akka.projection.ProjectionBehavior +import mdr.pdb.proof.query.projection.ProofProjection object Main extends ZIOAppDefault: @@ -56,5 +59,18 @@ server <- ZIO .service[HttpServer] .provideCustom(serverLayer) - _ <- server.serve().provideCustom(appEnvLayer >+> securityLayer) + _ <- (for + system <- ZIO.service[ActorSystem[?]] + _ <- ZIO.serviceWithZIO[ProofProjection] { pp => + Task.attempt { + ShardedDaemonProcess(system).init[ProjectionBehavior.Command]( + name = "proof", + numberOfInstances = 1, + behaviorFactory = _ => ProjectionBehavior(pp.projection), + stopMessage = ProjectionBehavior.Stop + ) + } + } + _ <- server.serve() + yield ()).provideCustom(appEnvLayer >+> securityLayer) } yield ()