diff --git a/akka-persistence/src/main/scala/iw/akka/AkkaProjectionSupport.scala b/akka-persistence/src/main/scala/iw/akka/AkkaProjectionSupport.scala new file mode 100644 index 0000000..12a3ec7 --- /dev/null +++ b/akka-persistence/src/main/scala/iw/akka/AkkaProjectionSupport.scala @@ -0,0 +1,75 @@ +package works.iterative.akka + +import zio.* +import akka.Done +import akka.actor.typed.ActorSystem +import akka.persistence.jdbc.query.scaladsl.JdbcReadJournal +import akka.persistence.query.Offset +import akka.projection.ProjectionId +import akka.projection.eventsourced.EventEnvelope +import akka.projection.eventsourced.scaladsl.EventSourcedProvider +import akka.projection.scaladsl.SourceProvider +import akka.projection.slick.{SlickHandler, SlickProjection} +import slick.basic.DatabaseConfig +import slick.jdbc.MySQLProfile +import akka.projection.ProjectionBehavior +import akka.cluster.sharding.typed.scaladsl.ShardedDaemonProcess +import works.iterative.entity.ViewProcessor + +object AkkaProjectionSupport: + def runSingle[E]( + projectionName: String, + projectionKey: String, + tag: String, + processor: ViewProcessor[E] + ): ZIO[AkkaActorSystem, Throwable, Unit] = + ZIO.serviceWith[AkkaActorSystem](_.system).flatMap { system => + given ActorSystem[?] = system + for + given Runtime[Any] <- ZIO.runtime[Any] + sourceProvider <- ZIO + .attempt[SourceProvider[Offset, EventEnvelope[E]]] { + EventSourcedProvider.eventsByTag[E]( + system, + readJournalPluginId = JdbcReadJournal.Identifier, + tag = tag + ) + } + dbConfig <- ZIO.attempt[DatabaseConfig[MySQLProfile]] { + DatabaseConfig.forConfig( + "projection.slick", + system.settings.config + ) + } + proj <- ZIO.attempt { + SlickProjection.exactlyOnce( + projectionId = ProjectionId(projectionName, projectionKey), + sourceProvider, + dbConfig, + handler = () => new ProjectionHandler(dbConfig, processor) + ) + } + _ <- ZIO.attempt { + ShardedDaemonProcess(system).init[ProjectionBehavior.Command]( + name = s"$projectionName-$projectionKey", + numberOfInstances = 1, + behaviorFactory = _ => ProjectionBehavior(proj), + stopMessage = ProjectionBehavior.Stop + ) + } + yield () + } + +class ProjectionHandler[E]( + dbConfig: DatabaseConfig[MySQLProfile], + processor: ViewProcessor[E] +)(using runtime: Runtime[Any]) + extends SlickHandler[EventEnvelope[E]](): + import dbConfig.profile.api.* + + override def process(envelope: EventEnvelope[E]): DBIO[Done] = + DBIO.from( + Unsafe.unsafe(implicit unsafe => + runtime.unsafe.runToFuture(processor.process(envelope.event).as(Done)) + ) + ) diff --git a/akka-persistence/src/main/scala/iw/akka/AkkaProjectionSupport.scala b/akka-persistence/src/main/scala/iw/akka/AkkaProjectionSupport.scala new file mode 100644 index 0000000..12a3ec7 --- /dev/null +++ b/akka-persistence/src/main/scala/iw/akka/AkkaProjectionSupport.scala @@ -0,0 +1,75 @@ +package works.iterative.akka + +import zio.* +import akka.Done +import akka.actor.typed.ActorSystem +import akka.persistence.jdbc.query.scaladsl.JdbcReadJournal +import akka.persistence.query.Offset +import akka.projection.ProjectionId +import akka.projection.eventsourced.EventEnvelope +import akka.projection.eventsourced.scaladsl.EventSourcedProvider +import akka.projection.scaladsl.SourceProvider +import akka.projection.slick.{SlickHandler, SlickProjection} +import slick.basic.DatabaseConfig +import slick.jdbc.MySQLProfile +import akka.projection.ProjectionBehavior +import akka.cluster.sharding.typed.scaladsl.ShardedDaemonProcess +import works.iterative.entity.ViewProcessor + +object AkkaProjectionSupport: + def runSingle[E]( + projectionName: String, + projectionKey: String, + tag: String, + processor: ViewProcessor[E] + ): ZIO[AkkaActorSystem, Throwable, Unit] = + ZIO.serviceWith[AkkaActorSystem](_.system).flatMap { system => + given ActorSystem[?] = system + for + given Runtime[Any] <- ZIO.runtime[Any] + sourceProvider <- ZIO + .attempt[SourceProvider[Offset, EventEnvelope[E]]] { + EventSourcedProvider.eventsByTag[E]( + system, + readJournalPluginId = JdbcReadJournal.Identifier, + tag = tag + ) + } + dbConfig <- ZIO.attempt[DatabaseConfig[MySQLProfile]] { + DatabaseConfig.forConfig( + "projection.slick", + system.settings.config + ) + } + proj <- ZIO.attempt { + SlickProjection.exactlyOnce( + projectionId = ProjectionId(projectionName, projectionKey), + sourceProvider, + dbConfig, + handler = () => new ProjectionHandler(dbConfig, processor) + ) + } + _ <- ZIO.attempt { + ShardedDaemonProcess(system).init[ProjectionBehavior.Command]( + name = s"$projectionName-$projectionKey", + numberOfInstances = 1, + behaviorFactory = _ => ProjectionBehavior(proj), + stopMessage = ProjectionBehavior.Stop + ) + } + yield () + } + +class ProjectionHandler[E]( + dbConfig: DatabaseConfig[MySQLProfile], + processor: ViewProcessor[E] +)(using runtime: Runtime[Any]) + extends SlickHandler[EventEnvelope[E]](): + import dbConfig.profile.api.* + + override def process(envelope: EventEnvelope[E]): DBIO[Done] = + DBIO.from( + Unsafe.unsafe(implicit unsafe => + runtime.unsafe.runToFuture(processor.process(envelope.event).as(Done)) + ) + ) diff --git a/build.sbt b/build.sbt index 2e1fdd8..6e994de 100644 --- a/build.sbt +++ b/build.sbt @@ -86,6 +86,7 @@ IWDeps.akka.profiles.eventsourcedJdbcProjection, libraryDependencies += "com.github.ghik" %% "silencer-lib" % "1.4.2" % Provided cross CrossVersion.for3Use2_13 ) + .dependsOn(core.jvm) lazy val ui = crossProject(JSPlatform, JVMPlatform) .crossType(CrossType.Full) diff --git a/akka-persistence/src/main/scala/iw/akka/AkkaProjectionSupport.scala b/akka-persistence/src/main/scala/iw/akka/AkkaProjectionSupport.scala new file mode 100644 index 0000000..12a3ec7 --- /dev/null +++ b/akka-persistence/src/main/scala/iw/akka/AkkaProjectionSupport.scala @@ -0,0 +1,75 @@ +package works.iterative.akka + +import zio.* +import akka.Done +import akka.actor.typed.ActorSystem +import akka.persistence.jdbc.query.scaladsl.JdbcReadJournal +import akka.persistence.query.Offset +import akka.projection.ProjectionId +import akka.projection.eventsourced.EventEnvelope +import akka.projection.eventsourced.scaladsl.EventSourcedProvider +import akka.projection.scaladsl.SourceProvider +import akka.projection.slick.{SlickHandler, SlickProjection} +import slick.basic.DatabaseConfig +import slick.jdbc.MySQLProfile +import akka.projection.ProjectionBehavior +import akka.cluster.sharding.typed.scaladsl.ShardedDaemonProcess +import works.iterative.entity.ViewProcessor + +object AkkaProjectionSupport: + def runSingle[E]( + projectionName: String, + projectionKey: String, + tag: String, + processor: ViewProcessor[E] + ): ZIO[AkkaActorSystem, Throwable, Unit] = + ZIO.serviceWith[AkkaActorSystem](_.system).flatMap { system => + given ActorSystem[?] = system + for + given Runtime[Any] <- ZIO.runtime[Any] + sourceProvider <- ZIO + .attempt[SourceProvider[Offset, EventEnvelope[E]]] { + EventSourcedProvider.eventsByTag[E]( + system, + readJournalPluginId = JdbcReadJournal.Identifier, + tag = tag + ) + } + dbConfig <- ZIO.attempt[DatabaseConfig[MySQLProfile]] { + DatabaseConfig.forConfig( + "projection.slick", + system.settings.config + ) + } + proj <- ZIO.attempt { + SlickProjection.exactlyOnce( + projectionId = ProjectionId(projectionName, projectionKey), + sourceProvider, + dbConfig, + handler = () => new ProjectionHandler(dbConfig, processor) + ) + } + _ <- ZIO.attempt { + ShardedDaemonProcess(system).init[ProjectionBehavior.Command]( + name = s"$projectionName-$projectionKey", + numberOfInstances = 1, + behaviorFactory = _ => ProjectionBehavior(proj), + stopMessage = ProjectionBehavior.Stop + ) + } + yield () + } + +class ProjectionHandler[E]( + dbConfig: DatabaseConfig[MySQLProfile], + processor: ViewProcessor[E] +)(using runtime: Runtime[Any]) + extends SlickHandler[EventEnvelope[E]](): + import dbConfig.profile.api.* + + override def process(envelope: EventEnvelope[E]): DBIO[Done] = + DBIO.from( + Unsafe.unsafe(implicit unsafe => + runtime.unsafe.runToFuture(processor.process(envelope.event).as(Done)) + ) + ) diff --git a/build.sbt b/build.sbt index 2e1fdd8..6e994de 100644 --- a/build.sbt +++ b/build.sbt @@ -86,6 +86,7 @@ IWDeps.akka.profiles.eventsourcedJdbcProjection, libraryDependencies += "com.github.ghik" %% "silencer-lib" % "1.4.2" % Provided cross CrossVersion.for3Use2_13 ) + .dependsOn(core.jvm) lazy val ui = crossProject(JSPlatform, JVMPlatform) .crossType(CrossType.Full) diff --git a/core/shared/src/main/scala/works/iterative/entity/ViewProcessor.scala b/core/shared/src/main/scala/works/iterative/entity/ViewProcessor.scala new file mode 100644 index 0000000..946011a --- /dev/null +++ b/core/shared/src/main/scala/works/iterative/entity/ViewProcessor.scala @@ -0,0 +1,6 @@ +package works.iterative.entity + +import zio.* + +trait ViewProcessor[Event]: + def process(event: Event): UIO[Unit]