Newer
Older
support / domain / proof / query / projection / src / ProofProjection.scala
package mdr.pdb
package proof
package query.projection

import akka.projection.scaladsl.SourceProvider
import akka.projection.eventsourced.scaladsl.EventSourcedProvider
import akka.persistence.query.Offset
import akka.projection.eventsourced.EventEnvelope
import akka.actor.typed.ActorSystem
import akka.persistence.jdbc.query.scaladsl.JdbcReadJournal
import mdr.pdb.proof.query.repo.ProofRepository
import mdr.pdb.proof.query.repo.ProofRepositoryWrite
import akka.projection.scaladsl.Handler
import akka.Done
import scala.concurrent.Future

import zio.*
import works.iterative.akka.UnhandledEvent
import akka.projection.jdbc.scaladsl.JdbcHandler
import akka.projection.ProjectionId
import akka.projection.slick.SlickProjection
import akka.projection.slick.SlickHandler
import slick.basic.DatabaseConfig
import slick.jdbc.MySQLProfile
import akka.projection.Projection

object ProofProjection:

  val live: ZLayer[ActorSystem[
    ?
  ] & ProofRepositoryWrite, Throwable, ProofProjection] =
    (for
      given ActorSystem[?] <- ZIO.service[ActorSystem[?]]
      runtime <- ZIO.runtime[ProofRepositoryWrite]
      sourceProvider <- Task
        .attempt[SourceProvider[Offset, EventEnvelope[ProofEvent]]] {
          EventSourcedProvider.eventsByTag[ProofEvent](
            summon[ActorSystem[?]],
            readJournalPluginId = JdbcReadJournal.Identifier,
            tag = ProofEvent.Tag
          )
        }
      dbConfig <- Task.attempt[DatabaseConfig[MySQLProfile]] {
        DatabaseConfig.forConfig(
          "projection.slick",
          summon[ActorSystem[?]].settings.config
        )
      }
      proj <- Task.attempt {
        SlickProjection.exactlyOnce(
          projectionId = ProjectionId("Proof", "proof"),
          sourceProvider,
          dbConfig,
          handler = () => new ProofProjectionHandler(dbConfig, runtime)
        )
      }
    yield ProofProjection(proj)).toLayer

case class ProofProjection(projection: Projection[EventEnvelope[ProofEvent]])

// TODO: extract TaskHandler with ZIO convertible to DBIO given runtime
class ProofProjectionHandler(
    dbConfig: DatabaseConfig[MySQLProfile],
    runtime: Runtime[ProofRepositoryWrite]
) extends SlickHandler[EventEnvelope[ProofEvent]]():
  import dbConfig.profile.api.*

  override def process(envelope: EventEnvelope[ProofEvent]): DBIO[Done] =
    val prog: RIO[ProofRepositoryWrite, Done] = envelope.event.event match
      case ev: ProofCreated => createProof(envelope.event)
      case ev               => updateProof(envelope.event)
    // TODO: is there a better way?
    DBIO.from(runtime.unsafeRunToFuture(prog))

  private def createProof(ev: ProofEvent): RIO[ProofRepositoryWrite, Done] =
    for
      proof <- handleProof(None)(ev)
      _ <- ProofRepositoryWrite.put(proof)
    yield Done

  private def updateProof(ev: ProofEvent): RIO[ProofRepositoryWrite, Done] =
    for
      orig <- ProofRepositoryWrite
        .matching(ProofRepository.WithId(ev.event.id))
        .map(_.headOption)
      proof <- handleProof(orig)(ev)
      _ <- ProofRepositoryWrite.put(proof)
    yield Done

  private def handleProof(proof: Option[Proof])(ev: ProofEvent): Task[Proof] =
    import ProofEventHandler.*
    ZIO
      .fromOption(proof.handleEvent(ev))
      .mapError(_ => UnhandledEvent(ev, proof))