diff --git a/build.sbt b/build.sbt index 425f5a4..e6f5e40 100644 --- a/build.sbt +++ b/build.sbt @@ -17,6 +17,7 @@ .json(_.dependsOn(json)) .repo(_.dependsOn(`mongo-support`)) .entity(_.dependsOn(`akka-persistence-support`)) + .projection(_.dependsOn(`akka-persistence-support`)) .endpoints(_.dependsOn(`tapir-support`)) lazy val parameters = entityProject("parameters", file("domain/parameters")) @@ -79,7 +80,7 @@ lazy val `akka-persistence-support` = project .in(file("fiftyforms/akka-persistence")) - .settings(IWDeps.akkaPersistence) + .settings(IWDeps.akka.profiles.eventsourcedJdbcProjection) lazy val app = (project in file("app")) .enablePlugins(ScalaJSPlugin, VitePlugin) @@ -164,6 +165,7 @@ users.command.api, proof.query.api, proof.command.api, + proof.query.projection, endpoints.jvm ) diff --git a/build.sbt b/build.sbt index 425f5a4..e6f5e40 100644 --- a/build.sbt +++ b/build.sbt @@ -17,6 +17,7 @@ .json(_.dependsOn(json)) .repo(_.dependsOn(`mongo-support`)) .entity(_.dependsOn(`akka-persistence-support`)) + .projection(_.dependsOn(`akka-persistence-support`)) .endpoints(_.dependsOn(`tapir-support`)) lazy val parameters = entityProject("parameters", file("domain/parameters")) @@ -79,7 +80,7 @@ lazy val `akka-persistence-support` = project .in(file("fiftyforms/akka-persistence")) - .settings(IWDeps.akkaPersistence) + .settings(IWDeps.akka.profiles.eventsourcedJdbcProjection) lazy val app = (project in file("app")) .enablePlugins(ScalaJSPlugin, VitePlugin) @@ -164,6 +165,7 @@ users.command.api, proof.query.api, proof.command.api, + proof.query.projection, endpoints.jvm ) diff --git a/domain/proof/command/entity/src/ProofBehaviour.scala b/domain/proof/command/entity/src/ProofBehaviour.scala index c682550..e8c7e4e 100644 --- a/domain/proof/command/entity/src/ProofBehaviour.scala +++ b/domain/proof/command/entity/src/ProofBehaviour.scala @@ -17,22 +17,24 @@ type ReplyTo = ActorRef[StatusReply[Done]] - type Effect = akka.persistence.typed.scaladsl.Effect[Event, State] - case class ProofCommand(command: Command, meta: WW, replyTo: ReplyTo) - case class ProofEvent(event: Event, meta: WW) + + type Effect = akka.persistence.typed.scaladsl.Effect[Event, State] type ProofReplyEffect = akka.persistence.typed.scaladsl.ReplyEffect[ProofEvent, State] def apply(persistenceId: PersistenceId): Behavior[ProofCommand] = + import ProofEventHandler.* EventSourcedBehavior .withEnforcedReplies[ProofCommand, ProofEvent, State]( persistenceId = persistenceId, emptyState = None, commandHandler = handleProofCommand, - eventHandler = handleProofEvent + eventHandler = (state, event) => + state.handleEvent(event).orElse(unhandledEvent(event, state)) ) + .withTagger(_ => Set(ProofEvent.Tag)) def handleProofCommand( state: State, @@ -42,59 +44,6 @@ case Some(events) => persist(events, command.meta, command.replyTo) case _ => unhandled(command.command, command.replyTo) - type ProofHandler = WW ?=> PartialFunction[Event, Proof] - type ProofModHandler = WW ?=> PartialFunction[Event, Proof => Proof] - - def handleProofEvent(state: State, event: ProofEvent): State = - val ProofEvent(ev, ww) = event - - def handle(h: ProofHandler): State = - Some(h(using ww).applyOrElse(ev, unhandledEvent(event, state))) - - def handleMod(p: Proof)(h: ProofModHandler): State = - Some(h(using ww).applyOrElse(ev, unhandledEvent(event, state))(p)) - - state match - case None => - handle(handleCreateProof) - case Some(proof) => - handleMod(proof) { - handleAuthorizeProof orElse handleUpdateProof orElse handleRevokeProof - } - - def handleCreateProof: ProofHandler = { - case ProofCreated(id, person, parameterId, criterionId, documents) => - Proof( - id, - person, - parameterId, - criterionId, - documents, - Nil, - Nil, - summon[WW] - ) - } - - def handleAuthorizeProof: ProofModHandler = { case AuthorizeProof(id, note) => - proof => - proof.copy(authorizations = - proof.authorizations :+ Authorization(summon[WW], note) - ) - } - - def handleUpdateProof: ProofModHandler = { case UpdateProof(id, documents) => - proof => proof.copy(documents = documents) - } - - def handleRevokeProof: ProofModHandler = { - case RevokeProof(id, reason, since, documents) => - proof => - proof.copy(revocations = - proof.revocations :+ Revocation(summon[WW], since, reason, documents) - ) - } - def handleCommand(state: State, cmd: Command): Option[Seq[Event]] = state match case None => diff --git a/build.sbt b/build.sbt index 425f5a4..e6f5e40 100644 --- a/build.sbt +++ b/build.sbt @@ -17,6 +17,7 @@ .json(_.dependsOn(json)) .repo(_.dependsOn(`mongo-support`)) .entity(_.dependsOn(`akka-persistence-support`)) + .projection(_.dependsOn(`akka-persistence-support`)) .endpoints(_.dependsOn(`tapir-support`)) lazy val parameters = entityProject("parameters", file("domain/parameters")) @@ -79,7 +80,7 @@ lazy val `akka-persistence-support` = project .in(file("fiftyforms/akka-persistence")) - .settings(IWDeps.akkaPersistence) + .settings(IWDeps.akka.profiles.eventsourcedJdbcProjection) lazy val app = (project in file("app")) .enablePlugins(ScalaJSPlugin, VitePlugin) @@ -164,6 +165,7 @@ users.command.api, proof.query.api, proof.command.api, + proof.query.projection, endpoints.jvm ) diff --git a/domain/proof/command/entity/src/ProofBehaviour.scala b/domain/proof/command/entity/src/ProofBehaviour.scala index c682550..e8c7e4e 100644 --- a/domain/proof/command/entity/src/ProofBehaviour.scala +++ b/domain/proof/command/entity/src/ProofBehaviour.scala @@ -17,22 +17,24 @@ type ReplyTo = ActorRef[StatusReply[Done]] - type Effect = akka.persistence.typed.scaladsl.Effect[Event, State] - case class ProofCommand(command: Command, meta: WW, replyTo: ReplyTo) - case class ProofEvent(event: Event, meta: WW) + + type Effect = akka.persistence.typed.scaladsl.Effect[Event, State] type ProofReplyEffect = akka.persistence.typed.scaladsl.ReplyEffect[ProofEvent, State] def apply(persistenceId: PersistenceId): Behavior[ProofCommand] = + import ProofEventHandler.* EventSourcedBehavior .withEnforcedReplies[ProofCommand, ProofEvent, State]( persistenceId = persistenceId, emptyState = None, commandHandler = handleProofCommand, - eventHandler = handleProofEvent + eventHandler = (state, event) => + state.handleEvent(event).orElse(unhandledEvent(event, state)) ) + .withTagger(_ => Set(ProofEvent.Tag)) def handleProofCommand( state: State, @@ -42,59 +44,6 @@ case Some(events) => persist(events, command.meta, command.replyTo) case _ => unhandled(command.command, command.replyTo) - type ProofHandler = WW ?=> PartialFunction[Event, Proof] - type ProofModHandler = WW ?=> PartialFunction[Event, Proof => Proof] - - def handleProofEvent(state: State, event: ProofEvent): State = - val ProofEvent(ev, ww) = event - - def handle(h: ProofHandler): State = - Some(h(using ww).applyOrElse(ev, unhandledEvent(event, state))) - - def handleMod(p: Proof)(h: ProofModHandler): State = - Some(h(using ww).applyOrElse(ev, unhandledEvent(event, state))(p)) - - state match - case None => - handle(handleCreateProof) - case Some(proof) => - handleMod(proof) { - handleAuthorizeProof orElse handleUpdateProof orElse handleRevokeProof - } - - def handleCreateProof: ProofHandler = { - case ProofCreated(id, person, parameterId, criterionId, documents) => - Proof( - id, - person, - parameterId, - criterionId, - documents, - Nil, - Nil, - summon[WW] - ) - } - - def handleAuthorizeProof: ProofModHandler = { case AuthorizeProof(id, note) => - proof => - proof.copy(authorizations = - proof.authorizations :+ Authorization(summon[WW], note) - ) - } - - def handleUpdateProof: ProofModHandler = { case UpdateProof(id, documents) => - proof => proof.copy(documents = documents) - } - - def handleRevokeProof: ProofModHandler = { - case RevokeProof(id, reason, since, documents) => - proof => - proof.copy(revocations = - proof.revocations :+ Revocation(summon[WW], since, reason, documents) - ) - } - def handleCommand(state: State, cmd: Command): Option[Seq[Event]] = state match case None => diff --git a/domain/proof/query/projection/src/ProofProjection.scala b/domain/proof/query/projection/src/ProofProjection.scala index 8b13789..e761a81 100644 --- a/domain/proof/query/projection/src/ProofProjection.scala +++ b/domain/proof/query/projection/src/ProofProjection.scala @@ -1 +1,59 @@ +package mdr.pdb +package proof +package query.projection +import akka.projection.scaladsl.SourceProvider +import akka.projection.eventsourced.scaladsl.EventSourcedProvider +import akka.persistence.query.Offset +import akka.projection.eventsourced.EventEnvelope +import akka.actor.typed.ActorSystem +import akka.persistence.jdbc.query.scaladsl.JdbcReadJournal +import mdr.pdb.proof.query.repo.ProofRepository +import mdr.pdb.proof.query.repo.ProofRepositoryWrite +import akka.projection.scaladsl.Handler +import akka.Done +import scala.concurrent.Future + +import zio.* +import fiftyforms.akka.UnhandledEvent + +class ProofProjection(system: ActorSystem[_]): + + val sourceProvider: SourceProvider[Offset, EventEnvelope[ProofEvent]] = + EventSourcedProvider.eventsByTag[ProofEvent]( + system, + readJournalPluginId = JdbcReadJournal.Identifier, + tag = ProofEvent.Tag + ) + +class ProofProjectionHandler( + tag: String, + system: ActorSystem[_], + runtime: Runtime[ProofRepositoryWrite] +) extends Handler[EventEnvelope[ProofEvent]](): + override def process(envelope: EventEnvelope[ProofEvent]): Future[Done] = + val prog: RIO[ProofRepositoryWrite, Done] = envelope.event.event match + case ev: ProofCreated => createProof(envelope.event) + case ev => updateProof(envelope.event) + runtime.unsafeRunToFuture(prog) + + private def createProof(ev: ProofEvent): RIO[ProofRepositoryWrite, Done] = + for + proof <- handleProof(None)(ev) + _ <- ProofRepositoryWrite.put(proof) + yield Done + + private def updateProof(ev: ProofEvent): RIO[ProofRepositoryWrite, Done] = + for + orig <- ProofRepositoryWrite + .matching(ProofRepository.WithId(ev.event.id)) + .map(_.headOption) + proof <- handleProof(orig)(ev) + _ <- ProofRepositoryWrite.put(proof) + yield Done + + private def handleProof(proof: Option[Proof])(ev: ProofEvent): Task[Proof] = + import ProofEventHandler.* + ZIO + .fromOption(proof.handleEvent(ev)) + .mapError(_ => UnhandledEvent(ev, proof)) diff --git a/build.sbt b/build.sbt index 425f5a4..e6f5e40 100644 --- a/build.sbt +++ b/build.sbt @@ -17,6 +17,7 @@ .json(_.dependsOn(json)) .repo(_.dependsOn(`mongo-support`)) .entity(_.dependsOn(`akka-persistence-support`)) + .projection(_.dependsOn(`akka-persistence-support`)) .endpoints(_.dependsOn(`tapir-support`)) lazy val parameters = entityProject("parameters", file("domain/parameters")) @@ -79,7 +80,7 @@ lazy val `akka-persistence-support` = project .in(file("fiftyforms/akka-persistence")) - .settings(IWDeps.akkaPersistence) + .settings(IWDeps.akka.profiles.eventsourcedJdbcProjection) lazy val app = (project in file("app")) .enablePlugins(ScalaJSPlugin, VitePlugin) @@ -164,6 +165,7 @@ users.command.api, proof.query.api, proof.command.api, + proof.query.projection, endpoints.jvm ) diff --git a/domain/proof/command/entity/src/ProofBehaviour.scala b/domain/proof/command/entity/src/ProofBehaviour.scala index c682550..e8c7e4e 100644 --- a/domain/proof/command/entity/src/ProofBehaviour.scala +++ b/domain/proof/command/entity/src/ProofBehaviour.scala @@ -17,22 +17,24 @@ type ReplyTo = ActorRef[StatusReply[Done]] - type Effect = akka.persistence.typed.scaladsl.Effect[Event, State] - case class ProofCommand(command: Command, meta: WW, replyTo: ReplyTo) - case class ProofEvent(event: Event, meta: WW) + + type Effect = akka.persistence.typed.scaladsl.Effect[Event, State] type ProofReplyEffect = akka.persistence.typed.scaladsl.ReplyEffect[ProofEvent, State] def apply(persistenceId: PersistenceId): Behavior[ProofCommand] = + import ProofEventHandler.* EventSourcedBehavior .withEnforcedReplies[ProofCommand, ProofEvent, State]( persistenceId = persistenceId, emptyState = None, commandHandler = handleProofCommand, - eventHandler = handleProofEvent + eventHandler = (state, event) => + state.handleEvent(event).orElse(unhandledEvent(event, state)) ) + .withTagger(_ => Set(ProofEvent.Tag)) def handleProofCommand( state: State, @@ -42,59 +44,6 @@ case Some(events) => persist(events, command.meta, command.replyTo) case _ => unhandled(command.command, command.replyTo) - type ProofHandler = WW ?=> PartialFunction[Event, Proof] - type ProofModHandler = WW ?=> PartialFunction[Event, Proof => Proof] - - def handleProofEvent(state: State, event: ProofEvent): State = - val ProofEvent(ev, ww) = event - - def handle(h: ProofHandler): State = - Some(h(using ww).applyOrElse(ev, unhandledEvent(event, state))) - - def handleMod(p: Proof)(h: ProofModHandler): State = - Some(h(using ww).applyOrElse(ev, unhandledEvent(event, state))(p)) - - state match - case None => - handle(handleCreateProof) - case Some(proof) => - handleMod(proof) { - handleAuthorizeProof orElse handleUpdateProof orElse handleRevokeProof - } - - def handleCreateProof: ProofHandler = { - case ProofCreated(id, person, parameterId, criterionId, documents) => - Proof( - id, - person, - parameterId, - criterionId, - documents, - Nil, - Nil, - summon[WW] - ) - } - - def handleAuthorizeProof: ProofModHandler = { case AuthorizeProof(id, note) => - proof => - proof.copy(authorizations = - proof.authorizations :+ Authorization(summon[WW], note) - ) - } - - def handleUpdateProof: ProofModHandler = { case UpdateProof(id, documents) => - proof => proof.copy(documents = documents) - } - - def handleRevokeProof: ProofModHandler = { - case RevokeProof(id, reason, since, documents) => - proof => - proof.copy(revocations = - proof.revocations :+ Revocation(summon[WW], since, reason, documents) - ) - } - def handleCommand(state: State, cmd: Command): Option[Seq[Event]] = state match case None => diff --git a/domain/proof/query/projection/src/ProofProjection.scala b/domain/proof/query/projection/src/ProofProjection.scala index 8b13789..e761a81 100644 --- a/domain/proof/query/projection/src/ProofProjection.scala +++ b/domain/proof/query/projection/src/ProofProjection.scala @@ -1 +1,59 @@ +package mdr.pdb +package proof +package query.projection +import akka.projection.scaladsl.SourceProvider +import akka.projection.eventsourced.scaladsl.EventSourcedProvider +import akka.persistence.query.Offset +import akka.projection.eventsourced.EventEnvelope +import akka.actor.typed.ActorSystem +import akka.persistence.jdbc.query.scaladsl.JdbcReadJournal +import mdr.pdb.proof.query.repo.ProofRepository +import mdr.pdb.proof.query.repo.ProofRepositoryWrite +import akka.projection.scaladsl.Handler +import akka.Done +import scala.concurrent.Future + +import zio.* +import fiftyforms.akka.UnhandledEvent + +class ProofProjection(system: ActorSystem[_]): + + val sourceProvider: SourceProvider[Offset, EventEnvelope[ProofEvent]] = + EventSourcedProvider.eventsByTag[ProofEvent]( + system, + readJournalPluginId = JdbcReadJournal.Identifier, + tag = ProofEvent.Tag + ) + +class ProofProjectionHandler( + tag: String, + system: ActorSystem[_], + runtime: Runtime[ProofRepositoryWrite] +) extends Handler[EventEnvelope[ProofEvent]](): + override def process(envelope: EventEnvelope[ProofEvent]): Future[Done] = + val prog: RIO[ProofRepositoryWrite, Done] = envelope.event.event match + case ev: ProofCreated => createProof(envelope.event) + case ev => updateProof(envelope.event) + runtime.unsafeRunToFuture(prog) + + private def createProof(ev: ProofEvent): RIO[ProofRepositoryWrite, Done] = + for + proof <- handleProof(None)(ev) + _ <- ProofRepositoryWrite.put(proof) + yield Done + + private def updateProof(ev: ProofEvent): RIO[ProofRepositoryWrite, Done] = + for + orig <- ProofRepositoryWrite + .matching(ProofRepository.WithId(ev.event.id)) + .map(_.headOption) + proof <- handleProof(orig)(ev) + _ <- ProofRepositoryWrite.put(proof) + yield Done + + private def handleProof(proof: Option[Proof])(ev: ProofEvent): Task[Proof] = + import ProofEventHandler.* + ZIO + .fromOption(proof.handleEvent(ev)) + .mapError(_ => UnhandledEvent(ev, proof)) diff --git a/domain/proof/query/repo/src/ProofRepository.scala b/domain/proof/query/repo/src/ProofRepository.scala index ae34fa2..9ad9719 100644 --- a/domain/proof/query/repo/src/ProofRepository.scala +++ b/domain/proof/query/repo/src/ProofRepository.scala @@ -9,9 +9,20 @@ case class WithId(id: Proof.Id) extends Criteria case class OfPerson(osc: OsobniCislo) extends Criteria + def matching(criteria: Criteria): RIO[ProofRepository, Seq[Proof]] = + ZIO.serviceWithZIO(_.matching(criteria)) + trait ProofRepository: import ProofRepository.* def matching(criteria: Criteria): Task[Seq[Proof]] private[query] trait ProofRepositoryWrite extends ProofRepository: def put(proof: Proof): Task[Unit] + +private[query] object ProofRepositoryWrite: + def matching( + criteria: ProofRepository.Criteria + ): RIO[ProofRepositoryWrite, Seq[Proof]] = + ZIO.serviceWithZIO(_.matching(criteria)) + def put(proof: Proof): RIO[ProofRepositoryWrite, Unit] = + ZIO.serviceWithZIO(_.put(proof)) diff --git a/build.sbt b/build.sbt index 425f5a4..e6f5e40 100644 --- a/build.sbt +++ b/build.sbt @@ -17,6 +17,7 @@ .json(_.dependsOn(json)) .repo(_.dependsOn(`mongo-support`)) .entity(_.dependsOn(`akka-persistence-support`)) + .projection(_.dependsOn(`akka-persistence-support`)) .endpoints(_.dependsOn(`tapir-support`)) lazy val parameters = entityProject("parameters", file("domain/parameters")) @@ -79,7 +80,7 @@ lazy val `akka-persistence-support` = project .in(file("fiftyforms/akka-persistence")) - .settings(IWDeps.akkaPersistence) + .settings(IWDeps.akka.profiles.eventsourcedJdbcProjection) lazy val app = (project in file("app")) .enablePlugins(ScalaJSPlugin, VitePlugin) @@ -164,6 +165,7 @@ users.command.api, proof.query.api, proof.command.api, + proof.query.projection, endpoints.jvm ) diff --git a/domain/proof/command/entity/src/ProofBehaviour.scala b/domain/proof/command/entity/src/ProofBehaviour.scala index c682550..e8c7e4e 100644 --- a/domain/proof/command/entity/src/ProofBehaviour.scala +++ b/domain/proof/command/entity/src/ProofBehaviour.scala @@ -17,22 +17,24 @@ type ReplyTo = ActorRef[StatusReply[Done]] - type Effect = akka.persistence.typed.scaladsl.Effect[Event, State] - case class ProofCommand(command: Command, meta: WW, replyTo: ReplyTo) - case class ProofEvent(event: Event, meta: WW) + + type Effect = akka.persistence.typed.scaladsl.Effect[Event, State] type ProofReplyEffect = akka.persistence.typed.scaladsl.ReplyEffect[ProofEvent, State] def apply(persistenceId: PersistenceId): Behavior[ProofCommand] = + import ProofEventHandler.* EventSourcedBehavior .withEnforcedReplies[ProofCommand, ProofEvent, State]( persistenceId = persistenceId, emptyState = None, commandHandler = handleProofCommand, - eventHandler = handleProofEvent + eventHandler = (state, event) => + state.handleEvent(event).orElse(unhandledEvent(event, state)) ) + .withTagger(_ => Set(ProofEvent.Tag)) def handleProofCommand( state: State, @@ -42,59 +44,6 @@ case Some(events) => persist(events, command.meta, command.replyTo) case _ => unhandled(command.command, command.replyTo) - type ProofHandler = WW ?=> PartialFunction[Event, Proof] - type ProofModHandler = WW ?=> PartialFunction[Event, Proof => Proof] - - def handleProofEvent(state: State, event: ProofEvent): State = - val ProofEvent(ev, ww) = event - - def handle(h: ProofHandler): State = - Some(h(using ww).applyOrElse(ev, unhandledEvent(event, state))) - - def handleMod(p: Proof)(h: ProofModHandler): State = - Some(h(using ww).applyOrElse(ev, unhandledEvent(event, state))(p)) - - state match - case None => - handle(handleCreateProof) - case Some(proof) => - handleMod(proof) { - handleAuthorizeProof orElse handleUpdateProof orElse handleRevokeProof - } - - def handleCreateProof: ProofHandler = { - case ProofCreated(id, person, parameterId, criterionId, documents) => - Proof( - id, - person, - parameterId, - criterionId, - documents, - Nil, - Nil, - summon[WW] - ) - } - - def handleAuthorizeProof: ProofModHandler = { case AuthorizeProof(id, note) => - proof => - proof.copy(authorizations = - proof.authorizations :+ Authorization(summon[WW], note) - ) - } - - def handleUpdateProof: ProofModHandler = { case UpdateProof(id, documents) => - proof => proof.copy(documents = documents) - } - - def handleRevokeProof: ProofModHandler = { - case RevokeProof(id, reason, since, documents) => - proof => - proof.copy(revocations = - proof.revocations :+ Revocation(summon[WW], since, reason, documents) - ) - } - def handleCommand(state: State, cmd: Command): Option[Seq[Event]] = state match case None => diff --git a/domain/proof/query/projection/src/ProofProjection.scala b/domain/proof/query/projection/src/ProofProjection.scala index 8b13789..e761a81 100644 --- a/domain/proof/query/projection/src/ProofProjection.scala +++ b/domain/proof/query/projection/src/ProofProjection.scala @@ -1 +1,59 @@ +package mdr.pdb +package proof +package query.projection +import akka.projection.scaladsl.SourceProvider +import akka.projection.eventsourced.scaladsl.EventSourcedProvider +import akka.persistence.query.Offset +import akka.projection.eventsourced.EventEnvelope +import akka.actor.typed.ActorSystem +import akka.persistence.jdbc.query.scaladsl.JdbcReadJournal +import mdr.pdb.proof.query.repo.ProofRepository +import mdr.pdb.proof.query.repo.ProofRepositoryWrite +import akka.projection.scaladsl.Handler +import akka.Done +import scala.concurrent.Future + +import zio.* +import fiftyforms.akka.UnhandledEvent + +class ProofProjection(system: ActorSystem[_]): + + val sourceProvider: SourceProvider[Offset, EventEnvelope[ProofEvent]] = + EventSourcedProvider.eventsByTag[ProofEvent]( + system, + readJournalPluginId = JdbcReadJournal.Identifier, + tag = ProofEvent.Tag + ) + +class ProofProjectionHandler( + tag: String, + system: ActorSystem[_], + runtime: Runtime[ProofRepositoryWrite] +) extends Handler[EventEnvelope[ProofEvent]](): + override def process(envelope: EventEnvelope[ProofEvent]): Future[Done] = + val prog: RIO[ProofRepositoryWrite, Done] = envelope.event.event match + case ev: ProofCreated => createProof(envelope.event) + case ev => updateProof(envelope.event) + runtime.unsafeRunToFuture(prog) + + private def createProof(ev: ProofEvent): RIO[ProofRepositoryWrite, Done] = + for + proof <- handleProof(None)(ev) + _ <- ProofRepositoryWrite.put(proof) + yield Done + + private def updateProof(ev: ProofEvent): RIO[ProofRepositoryWrite, Done] = + for + orig <- ProofRepositoryWrite + .matching(ProofRepository.WithId(ev.event.id)) + .map(_.headOption) + proof <- handleProof(orig)(ev) + _ <- ProofRepositoryWrite.put(proof) + yield Done + + private def handleProof(proof: Option[Proof])(ev: ProofEvent): Task[Proof] = + import ProofEventHandler.* + ZIO + .fromOption(proof.handleEvent(ev)) + .mapError(_ => UnhandledEvent(ev, proof)) diff --git a/domain/proof/query/repo/src/ProofRepository.scala b/domain/proof/query/repo/src/ProofRepository.scala index ae34fa2..9ad9719 100644 --- a/domain/proof/query/repo/src/ProofRepository.scala +++ b/domain/proof/query/repo/src/ProofRepository.scala @@ -9,9 +9,20 @@ case class WithId(id: Proof.Id) extends Criteria case class OfPerson(osc: OsobniCislo) extends Criteria + def matching(criteria: Criteria): RIO[ProofRepository, Seq[Proof]] = + ZIO.serviceWithZIO(_.matching(criteria)) + trait ProofRepository: import ProofRepository.* def matching(criteria: Criteria): Task[Seq[Proof]] private[query] trait ProofRepositoryWrite extends ProofRepository: def put(proof: Proof): Task[Unit] + +private[query] object ProofRepositoryWrite: + def matching( + criteria: ProofRepository.Criteria + ): RIO[ProofRepositoryWrite, Seq[Proof]] = + ZIO.serviceWithZIO(_.matching(criteria)) + def put(proof: Proof): RIO[ProofRepositoryWrite, Unit] = + ZIO.serviceWithZIO(_.put(proof)) diff --git a/domain/proof/shared/model/src/Event.scala b/domain/proof/shared/model/src/Event.scala index 465d6fd..1818110 100644 --- a/domain/proof/shared/model/src/Event.scala +++ b/domain/proof/shared/model/src/Event.scala @@ -3,7 +3,13 @@ import java.time.Instant -sealed trait Event +case class ProofEvent(event: Event, meta: WW) + +object ProofEvent: + val Tag = "proof" + +sealed trait Event: + def id: Proof.Id case class ProofCreated( id: Proof.Id, diff --git a/build.sbt b/build.sbt index 425f5a4..e6f5e40 100644 --- a/build.sbt +++ b/build.sbt @@ -17,6 +17,7 @@ .json(_.dependsOn(json)) .repo(_.dependsOn(`mongo-support`)) .entity(_.dependsOn(`akka-persistence-support`)) + .projection(_.dependsOn(`akka-persistence-support`)) .endpoints(_.dependsOn(`tapir-support`)) lazy val parameters = entityProject("parameters", file("domain/parameters")) @@ -79,7 +80,7 @@ lazy val `akka-persistence-support` = project .in(file("fiftyforms/akka-persistence")) - .settings(IWDeps.akkaPersistence) + .settings(IWDeps.akka.profiles.eventsourcedJdbcProjection) lazy val app = (project in file("app")) .enablePlugins(ScalaJSPlugin, VitePlugin) @@ -164,6 +165,7 @@ users.command.api, proof.query.api, proof.command.api, + proof.query.projection, endpoints.jvm ) diff --git a/domain/proof/command/entity/src/ProofBehaviour.scala b/domain/proof/command/entity/src/ProofBehaviour.scala index c682550..e8c7e4e 100644 --- a/domain/proof/command/entity/src/ProofBehaviour.scala +++ b/domain/proof/command/entity/src/ProofBehaviour.scala @@ -17,22 +17,24 @@ type ReplyTo = ActorRef[StatusReply[Done]] - type Effect = akka.persistence.typed.scaladsl.Effect[Event, State] - case class ProofCommand(command: Command, meta: WW, replyTo: ReplyTo) - case class ProofEvent(event: Event, meta: WW) + + type Effect = akka.persistence.typed.scaladsl.Effect[Event, State] type ProofReplyEffect = akka.persistence.typed.scaladsl.ReplyEffect[ProofEvent, State] def apply(persistenceId: PersistenceId): Behavior[ProofCommand] = + import ProofEventHandler.* EventSourcedBehavior .withEnforcedReplies[ProofCommand, ProofEvent, State]( persistenceId = persistenceId, emptyState = None, commandHandler = handleProofCommand, - eventHandler = handleProofEvent + eventHandler = (state, event) => + state.handleEvent(event).orElse(unhandledEvent(event, state)) ) + .withTagger(_ => Set(ProofEvent.Tag)) def handleProofCommand( state: State, @@ -42,59 +44,6 @@ case Some(events) => persist(events, command.meta, command.replyTo) case _ => unhandled(command.command, command.replyTo) - type ProofHandler = WW ?=> PartialFunction[Event, Proof] - type ProofModHandler = WW ?=> PartialFunction[Event, Proof => Proof] - - def handleProofEvent(state: State, event: ProofEvent): State = - val ProofEvent(ev, ww) = event - - def handle(h: ProofHandler): State = - Some(h(using ww).applyOrElse(ev, unhandledEvent(event, state))) - - def handleMod(p: Proof)(h: ProofModHandler): State = - Some(h(using ww).applyOrElse(ev, unhandledEvent(event, state))(p)) - - state match - case None => - handle(handleCreateProof) - case Some(proof) => - handleMod(proof) { - handleAuthorizeProof orElse handleUpdateProof orElse handleRevokeProof - } - - def handleCreateProof: ProofHandler = { - case ProofCreated(id, person, parameterId, criterionId, documents) => - Proof( - id, - person, - parameterId, - criterionId, - documents, - Nil, - Nil, - summon[WW] - ) - } - - def handleAuthorizeProof: ProofModHandler = { case AuthorizeProof(id, note) => - proof => - proof.copy(authorizations = - proof.authorizations :+ Authorization(summon[WW], note) - ) - } - - def handleUpdateProof: ProofModHandler = { case UpdateProof(id, documents) => - proof => proof.copy(documents = documents) - } - - def handleRevokeProof: ProofModHandler = { - case RevokeProof(id, reason, since, documents) => - proof => - proof.copy(revocations = - proof.revocations :+ Revocation(summon[WW], since, reason, documents) - ) - } - def handleCommand(state: State, cmd: Command): Option[Seq[Event]] = state match case None => diff --git a/domain/proof/query/projection/src/ProofProjection.scala b/domain/proof/query/projection/src/ProofProjection.scala index 8b13789..e761a81 100644 --- a/domain/proof/query/projection/src/ProofProjection.scala +++ b/domain/proof/query/projection/src/ProofProjection.scala @@ -1 +1,59 @@ +package mdr.pdb +package proof +package query.projection +import akka.projection.scaladsl.SourceProvider +import akka.projection.eventsourced.scaladsl.EventSourcedProvider +import akka.persistence.query.Offset +import akka.projection.eventsourced.EventEnvelope +import akka.actor.typed.ActorSystem +import akka.persistence.jdbc.query.scaladsl.JdbcReadJournal +import mdr.pdb.proof.query.repo.ProofRepository +import mdr.pdb.proof.query.repo.ProofRepositoryWrite +import akka.projection.scaladsl.Handler +import akka.Done +import scala.concurrent.Future + +import zio.* +import fiftyforms.akka.UnhandledEvent + +class ProofProjection(system: ActorSystem[_]): + + val sourceProvider: SourceProvider[Offset, EventEnvelope[ProofEvent]] = + EventSourcedProvider.eventsByTag[ProofEvent]( + system, + readJournalPluginId = JdbcReadJournal.Identifier, + tag = ProofEvent.Tag + ) + +class ProofProjectionHandler( + tag: String, + system: ActorSystem[_], + runtime: Runtime[ProofRepositoryWrite] +) extends Handler[EventEnvelope[ProofEvent]](): + override def process(envelope: EventEnvelope[ProofEvent]): Future[Done] = + val prog: RIO[ProofRepositoryWrite, Done] = envelope.event.event match + case ev: ProofCreated => createProof(envelope.event) + case ev => updateProof(envelope.event) + runtime.unsafeRunToFuture(prog) + + private def createProof(ev: ProofEvent): RIO[ProofRepositoryWrite, Done] = + for + proof <- handleProof(None)(ev) + _ <- ProofRepositoryWrite.put(proof) + yield Done + + private def updateProof(ev: ProofEvent): RIO[ProofRepositoryWrite, Done] = + for + orig <- ProofRepositoryWrite + .matching(ProofRepository.WithId(ev.event.id)) + .map(_.headOption) + proof <- handleProof(orig)(ev) + _ <- ProofRepositoryWrite.put(proof) + yield Done + + private def handleProof(proof: Option[Proof])(ev: ProofEvent): Task[Proof] = + import ProofEventHandler.* + ZIO + .fromOption(proof.handleEvent(ev)) + .mapError(_ => UnhandledEvent(ev, proof)) diff --git a/domain/proof/query/repo/src/ProofRepository.scala b/domain/proof/query/repo/src/ProofRepository.scala index ae34fa2..9ad9719 100644 --- a/domain/proof/query/repo/src/ProofRepository.scala +++ b/domain/proof/query/repo/src/ProofRepository.scala @@ -9,9 +9,20 @@ case class WithId(id: Proof.Id) extends Criteria case class OfPerson(osc: OsobniCislo) extends Criteria + def matching(criteria: Criteria): RIO[ProofRepository, Seq[Proof]] = + ZIO.serviceWithZIO(_.matching(criteria)) + trait ProofRepository: import ProofRepository.* def matching(criteria: Criteria): Task[Seq[Proof]] private[query] trait ProofRepositoryWrite extends ProofRepository: def put(proof: Proof): Task[Unit] + +private[query] object ProofRepositoryWrite: + def matching( + criteria: ProofRepository.Criteria + ): RIO[ProofRepositoryWrite, Seq[Proof]] = + ZIO.serviceWithZIO(_.matching(criteria)) + def put(proof: Proof): RIO[ProofRepositoryWrite, Unit] = + ZIO.serviceWithZIO(_.put(proof)) diff --git a/domain/proof/shared/model/src/Event.scala b/domain/proof/shared/model/src/Event.scala index 465d6fd..1818110 100644 --- a/domain/proof/shared/model/src/Event.scala +++ b/domain/proof/shared/model/src/Event.scala @@ -3,7 +3,13 @@ import java.time.Instant -sealed trait Event +case class ProofEvent(event: Event, meta: WW) + +object ProofEvent: + val Tag = "proof" + +sealed trait Event: + def id: Proof.Id case class ProofCreated( id: Proof.Id, diff --git a/domain/proof/shared/model/src/ProofEventHandler.scala b/domain/proof/shared/model/src/ProofEventHandler.scala new file mode 100644 index 0000000..4de82bd --- /dev/null +++ b/domain/proof/shared/model/src/ProofEventHandler.scala @@ -0,0 +1,52 @@ +package mdr.pdb +package proof + +object ProofEventHandler: + + extension (maybeProof: Option[Proof]) + // Returns None if the event could not be handled, otherwise the result is always some Proof + def handleEvent(event: ProofEvent): Option[Proof] = + type ProofHandler = PartialFunction[Event, Proof] + type ProofModHandler = PartialFunction[Event, Proof => Proof] + + val ProofEvent(ev, ww) = event + + val handleCreateProof: ProofHandler = { + case ProofCreated(id, person, parameterId, criterionId, documents) => + Proof(id, person, parameterId, criterionId, documents, Nil, Nil, ww) + } + + val handleAuthorizeProof: ProofModHandler = { + case ProofAuthorized(id, note) => + proof => + proof.copy(authorizations = + proof.authorizations :+ Authorization(ww, note) + ) + } + + val handleUpdateProof: ProofModHandler = { + case ProofUpdated(id, documents) => + proof => proof.copy(documents = documents) + } + + val handleRevokeProof: ProofModHandler = { + case ProofRevoked(id, reason, since, documents) => + proof => + proof.copy(revocations = + proof.revocations :+ Revocation(ww, since, reason, documents) + ) + } + + def handle(h: ProofHandler): Option[Proof] = + h.lift(ev) + + def handleMod(h: ProofModHandler): Option[Proof => Proof] = + h.lift(ev) + + maybeProof match + case None => + handle(handleCreateProof) + case Some(p) => + handleMod { + handleAuthorizeProof orElse handleUpdateProof orElse handleRevokeProof + } map (_(p))