diff --git a/service/shared/src/main/scala/works/iterative/core/service/Repository.scala b/service/shared/src/main/scala/works/iterative/core/service/Repository.scala index c450279..fa2b41d 100644 --- a/service/shared/src/main/scala/works/iterative/core/service/Repository.scala +++ b/service/shared/src/main/scala/works/iterative/core/service/Repository.scala @@ -7,8 +7,8 @@ type Op[A] = Eff[A] def load(id: Key): Op[Option[Value]] -trait GenericUpdateNotifyService[Eff[+_], Str[+_], Key]: - def updates: Eff[Str[Key]] +trait GenericUpdateNotifyService[Str[+_], Key]: + def updates: Str[Key] trait GenericLoadAllService[Eff[+_], Coll[+_], -Key, +Value]: type Op[A] = Eff[A] @@ -42,7 +42,7 @@ ZIO.foreach(ids)(load).map(_.flatten.toList) trait UpdateNotifyRepository[Key] - extends GenericUpdateNotifyService[UIO, UStream, Key] + extends GenericUpdateNotifyService[UStream, Key] trait WriteRepository[-Key, -Value] extends GenericWriteRepository[UIO, Key, Value] diff --git a/service/shared/src/main/scala/works/iterative/core/service/Repository.scala b/service/shared/src/main/scala/works/iterative/core/service/Repository.scala index c450279..fa2b41d 100644 --- a/service/shared/src/main/scala/works/iterative/core/service/Repository.scala +++ b/service/shared/src/main/scala/works/iterative/core/service/Repository.scala @@ -7,8 +7,8 @@ type Op[A] = Eff[A] def load(id: Key): Op[Option[Value]] -trait GenericUpdateNotifyService[Eff[+_], Str[+_], Key]: - def updates: Eff[Str[Key]] +trait GenericUpdateNotifyService[Str[+_], Key]: + def updates: Str[Key] trait GenericLoadAllService[Eff[+_], Coll[+_], -Key, +Value]: type Op[A] = Eff[A] @@ -42,7 +42,7 @@ ZIO.foreach(ids)(load).map(_.flatten.toList) trait UpdateNotifyRepository[Key] - extends GenericUpdateNotifyService[UIO, UStream, Key] + extends GenericUpdateNotifyService[UStream, Key] trait WriteRepository[-Key, -Value] extends GenericWriteRepository[UIO, Key, Value] diff --git a/tapir/shared/src/main/scala/works/iterative/tapir/ReconnectingUpdateSource.scala b/tapir/shared/src/main/scala/works/iterative/tapir/ReconnectingUpdateSource.scala new file mode 100644 index 0000000..2d2e665 --- /dev/null +++ b/tapir/shared/src/main/scala/works/iterative/tapir/ReconnectingUpdateSource.scala @@ -0,0 +1,37 @@ +package works.iterative.tapir + +import zio.* +import zio.stream.* + +/** Run a read-only source from server, publish whatever comes to a hub. + * Reconnect on failure. + * + * @param updateHub + * hub to publish updates to + * @param retrySchedule + * schedule for retries, defaults to 2 seconds + * @tparam A + * type of updates + */ +class ReconectingUpdateSource[A]( + updateHub: Hub[A], + retrySchedule: Schedule[Any, Any, ?] = Schedule.spaced(2.second) +): + + /** Run the update source. + * + * It will start the process, reconnecting on failure. Meant to be forked + * somewhere scoped, `updateSource.run.forkScoped` + */ + def run[B]( + source: Unit => UIO[ + ZStream[Any, Throwable, B] => ZStream[Any, Throwable, A] + ] + ): UIO[Unit] = + source(()) + .flatMap( + _(ZStream.never) + .foreach(updateHub.publish) + .retry(retrySchedule) + .orDie + ) diff --git a/service/shared/src/main/scala/works/iterative/core/service/Repository.scala b/service/shared/src/main/scala/works/iterative/core/service/Repository.scala index c450279..fa2b41d 100644 --- a/service/shared/src/main/scala/works/iterative/core/service/Repository.scala +++ b/service/shared/src/main/scala/works/iterative/core/service/Repository.scala @@ -7,8 +7,8 @@ type Op[A] = Eff[A] def load(id: Key): Op[Option[Value]] -trait GenericUpdateNotifyService[Eff[+_], Str[+_], Key]: - def updates: Eff[Str[Key]] +trait GenericUpdateNotifyService[Str[+_], Key]: + def updates: Str[Key] trait GenericLoadAllService[Eff[+_], Coll[+_], -Key, +Value]: type Op[A] = Eff[A] @@ -42,7 +42,7 @@ ZIO.foreach(ids)(load).map(_.flatten.toList) trait UpdateNotifyRepository[Key] - extends GenericUpdateNotifyService[UIO, UStream, Key] + extends GenericUpdateNotifyService[UStream, Key] trait WriteRepository[-Key, -Value] extends GenericWriteRepository[UIO, Key, Value] diff --git a/tapir/shared/src/main/scala/works/iterative/tapir/ReconnectingUpdateSource.scala b/tapir/shared/src/main/scala/works/iterative/tapir/ReconnectingUpdateSource.scala new file mode 100644 index 0000000..2d2e665 --- /dev/null +++ b/tapir/shared/src/main/scala/works/iterative/tapir/ReconnectingUpdateSource.scala @@ -0,0 +1,37 @@ +package works.iterative.tapir + +import zio.* +import zio.stream.* + +/** Run a read-only source from server, publish whatever comes to a hub. + * Reconnect on failure. + * + * @param updateHub + * hub to publish updates to + * @param retrySchedule + * schedule for retries, defaults to 2 seconds + * @tparam A + * type of updates + */ +class ReconectingUpdateSource[A]( + updateHub: Hub[A], + retrySchedule: Schedule[Any, Any, ?] = Schedule.spaced(2.second) +): + + /** Run the update source. + * + * It will start the process, reconnecting on failure. Meant to be forked + * somewhere scoped, `updateSource.run.forkScoped` + */ + def run[B]( + source: Unit => UIO[ + ZStream[Any, Throwable, B] => ZStream[Any, Throwable, A] + ] + ): UIO[Unit] = + source(()) + .flatMap( + _(ZStream.never) + .foreach(updateHub.publish) + .retry(retrySchedule) + .orDie + ) diff --git a/ui/js/src/main/scala/works/iterative/ui/components/ReloadableComponent.scala b/ui/js/src/main/scala/works/iterative/ui/components/ReloadableComponent.scala index 7b6c8fa..422c273 100644 --- a/ui/js/src/main/scala/works/iterative/ui/components/ReloadableComponent.scala +++ b/ui/js/src/main/scala/works/iterative/ui/components/ReloadableComponent.scala @@ -15,7 +15,7 @@ case class ReloadableComponent[A, I]( fetch: I => IO[UserMessage, A], init: Option[I] = None, - updates: Option[UIO[UStream[ReloadableComponent.Reload[A]]]] = None, + updates: Option[UStream[ReloadableComponent.Reload[A]]] = None, loadSchedule: Schedule[Any, Any, ?] = Schedule.stop )(using runtime: Runtime[Any]): import ReloadableComponent.Reload @@ -38,8 +38,8 @@ memo.now().foreach(reloadUntilChanged(_, original)) } - private def eventStreamFromStreamEffect[A]( - eff: UIO[UStream[A]] + private def eventStreamFromZioStream[A]( + eff: UStream[A] ): EventStream[A] = var runningFiber: Option[Fiber.Runtime[Nothing, Unit]] = None EventStream @@ -48,9 +48,7 @@ start = (fireValue, _, _, _) => { runningFiber = Some(Unsafe.unsafely { runtime.unsafe.fork( - eff.flatMap( - _.runForeach(v => ZIO.succeed(fireValue(v))) - ) + eff.runForeach(v => ZIO.succeed(fireValue(v))) ) }) }, @@ -64,10 +62,10 @@ ) private def updateFromZioStream( - upd: UIO[UStream[Reload[A]]] + upd: UStream[Reload[A]] ): HtmlMod = onMountBind { _ => - eventStreamFromStreamEffect(upd) --> reload + eventStreamFromZioStream(upd) --> reload } private def updateStream: HtmlMod = updates match