diff --git a/build.sbt b/build.sbt index 571c949..d67beb8 100644 --- a/build.sbt +++ b/build.sbt @@ -129,7 +129,11 @@ ) ) .withSourceMap(true) - .withRelativizeSourceMapBase(Some(base.toURI())) + //.withRelativizeSourceMapBase(Some(base.toURI())) + }, + scalacOptions += { + val localRootBase = (LocalRootProject / baseDirectory).value + s"-scalajs-mapSourceURI:${localRootBase.toURI.toString}->/mdr/poptavky/@fs${localRootBase.toString}/", }, scalaJSUseMainModuleInitializer := true ) diff --git a/build.sbt b/build.sbt index 571c949..d67beb8 100644 --- a/build.sbt +++ b/build.sbt @@ -129,7 +129,11 @@ ) ) .withSourceMap(true) - .withRelativizeSourceMapBase(Some(base.toURI())) + //.withRelativizeSourceMapBase(Some(base.toURI())) + }, + scalacOptions += { + val localRootBase = (LocalRootProject / baseDirectory).value + s"-scalajs-mapSourceURI:${localRootBase.toURI.toString}->/mdr/poptavky/@fs${localRootBase.toString}/", }, scalaJSUseMainModuleInitializer := true ) diff --git a/core/shared/src/main/scala/works/iterative/core/service/Repository.scala b/core/shared/src/main/scala/works/iterative/core/service/Repository.scala index 326af94..c450279 100644 --- a/core/shared/src/main/scala/works/iterative/core/service/Repository.scala +++ b/core/shared/src/main/scala/works/iterative/core/service/Repository.scala @@ -1,11 +1,15 @@ package works.iterative.core.service import zio.* +import zio.stream.* trait GenericLoadService[Eff[+_], -Key, +Value]: type Op[A] = Eff[A] def load(id: Key): Op[Option[Value]] +trait GenericUpdateNotifyService[Eff[+_], Str[+_], Key]: + def updates: Eff[Str[Key]] + trait GenericLoadAllService[Eff[+_], Coll[+_], -Key, +Value]: type Op[A] = Eff[A] def loadAll(ids: Seq[Key]): Op[Coll[Value]] @@ -37,6 +41,9 @@ // Inefficient implementation, meant to be overridden ZIO.foreach(ids)(load).map(_.flatten.toList) +trait UpdateNotifyRepository[Key] + extends GenericUpdateNotifyService[UIO, UStream, Key] + trait WriteRepository[-Key, -Value] extends GenericWriteRepository[UIO, Key, Value] diff --git a/build.sbt b/build.sbt index 571c949..d67beb8 100644 --- a/build.sbt +++ b/build.sbt @@ -129,7 +129,11 @@ ) ) .withSourceMap(true) - .withRelativizeSourceMapBase(Some(base.toURI())) + //.withRelativizeSourceMapBase(Some(base.toURI())) + }, + scalacOptions += { + val localRootBase = (LocalRootProject / baseDirectory).value + s"-scalajs-mapSourceURI:${localRootBase.toURI.toString}->/mdr/poptavky/@fs${localRootBase.toString}/", }, scalaJSUseMainModuleInitializer := true ) diff --git a/core/shared/src/main/scala/works/iterative/core/service/Repository.scala b/core/shared/src/main/scala/works/iterative/core/service/Repository.scala index 326af94..c450279 100644 --- a/core/shared/src/main/scala/works/iterative/core/service/Repository.scala +++ b/core/shared/src/main/scala/works/iterative/core/service/Repository.scala @@ -1,11 +1,15 @@ package works.iterative.core.service import zio.* +import zio.stream.* trait GenericLoadService[Eff[+_], -Key, +Value]: type Op[A] = Eff[A] def load(id: Key): Op[Option[Value]] +trait GenericUpdateNotifyService[Eff[+_], Str[+_], Key]: + def updates: Eff[Str[Key]] + trait GenericLoadAllService[Eff[+_], Coll[+_], -Key, +Value]: type Op[A] = Eff[A] def loadAll(ids: Seq[Key]): Op[Coll[Value]] @@ -37,6 +41,9 @@ // Inefficient implementation, meant to be overridden ZIO.foreach(ids)(load).map(_.flatten.toList) +trait UpdateNotifyRepository[Key] + extends GenericUpdateNotifyService[UIO, UStream, Key] + trait WriteRepository[-Key, -Value] extends GenericWriteRepository[UIO, Key, Value] diff --git a/server/http/src/main/scala/works/iterative/server/http/HttpApplication.scala b/server/http/src/main/scala/works/iterative/server/http/HttpApplication.scala index e911f7f..cd8a995 100644 --- a/server/http/src/main/scala/works/iterative/server/http/HttpApplication.scala +++ b/server/http/src/main/scala/works/iterative/server/http/HttpApplication.scala @@ -2,8 +2,15 @@ import works.iterative.tapir.CustomTapir.* import works.iterative.core.auth.CurrentUser +import sttp.capabilities.zio.ZioStreams +import sttp.capabilities.WebSockets final case class HttpApplication[Env]( - secureEndpoints: List[ZServerEndpoint[Env & CurrentUser, Any]], - publicEndpoints: List[ZServerEndpoint[Env, Any]] + secureEndpoints: List[ + ZServerEndpoint[Env & CurrentUser, ZioStreams] + ], + publicEndpoints: List[ZServerEndpoint[Env, ZioStreams]], + wsEndpoints: List[ + ZServerEndpoint[Env, ZioStreams & WebSockets] + ] = Nil ) diff --git a/build.sbt b/build.sbt index 571c949..d67beb8 100644 --- a/build.sbt +++ b/build.sbt @@ -129,7 +129,11 @@ ) ) .withSourceMap(true) - .withRelativizeSourceMapBase(Some(base.toURI())) + //.withRelativizeSourceMapBase(Some(base.toURI())) + }, + scalacOptions += { + val localRootBase = (LocalRootProject / baseDirectory).value + s"-scalajs-mapSourceURI:${localRootBase.toURI.toString}->/mdr/poptavky/@fs${localRootBase.toString}/", }, scalaJSUseMainModuleInitializer := true ) diff --git a/core/shared/src/main/scala/works/iterative/core/service/Repository.scala b/core/shared/src/main/scala/works/iterative/core/service/Repository.scala index 326af94..c450279 100644 --- a/core/shared/src/main/scala/works/iterative/core/service/Repository.scala +++ b/core/shared/src/main/scala/works/iterative/core/service/Repository.scala @@ -1,11 +1,15 @@ package works.iterative.core.service import zio.* +import zio.stream.* trait GenericLoadService[Eff[+_], -Key, +Value]: type Op[A] = Eff[A] def load(id: Key): Op[Option[Value]] +trait GenericUpdateNotifyService[Eff[+_], Str[+_], Key]: + def updates: Eff[Str[Key]] + trait GenericLoadAllService[Eff[+_], Coll[+_], -Key, +Value]: type Op[A] = Eff[A] def loadAll(ids: Seq[Key]): Op[Coll[Value]] @@ -37,6 +41,9 @@ // Inefficient implementation, meant to be overridden ZIO.foreach(ids)(load).map(_.flatten.toList) +trait UpdateNotifyRepository[Key] + extends GenericUpdateNotifyService[UIO, UStream, Key] + trait WriteRepository[-Key, -Value] extends GenericWriteRepository[UIO, Key, Value] diff --git a/server/http/src/main/scala/works/iterative/server/http/HttpApplication.scala b/server/http/src/main/scala/works/iterative/server/http/HttpApplication.scala index e911f7f..cd8a995 100644 --- a/server/http/src/main/scala/works/iterative/server/http/HttpApplication.scala +++ b/server/http/src/main/scala/works/iterative/server/http/HttpApplication.scala @@ -2,8 +2,15 @@ import works.iterative.tapir.CustomTapir.* import works.iterative.core.auth.CurrentUser +import sttp.capabilities.zio.ZioStreams +import sttp.capabilities.WebSockets final case class HttpApplication[Env]( - secureEndpoints: List[ZServerEndpoint[Env & CurrentUser, Any]], - publicEndpoints: List[ZServerEndpoint[Env, Any]] + secureEndpoints: List[ + ZServerEndpoint[Env & CurrentUser, ZioStreams] + ], + publicEndpoints: List[ZServerEndpoint[Env, ZioStreams]], + wsEndpoints: List[ + ZServerEndpoint[Env, ZioStreams & WebSockets] + ] = Nil ) diff --git a/server/http/src/main/scala/works/iterative/server/http/impl/blaze/BlazeHttpServer.scala b/server/http/src/main/scala/works/iterative/server/http/impl/blaze/BlazeHttpServer.scala index 7d38587..63e031c 100644 --- a/server/http/src/main/scala/works/iterative/server/http/impl/blaze/BlazeHttpServer.scala +++ b/server/http/src/main/scala/works/iterative/server/http/impl/blaze/BlazeHttpServer.scala @@ -16,6 +16,7 @@ import cats.arrow.FunctionK import works.iterative.tapir.BaseUri import org.http4s.server.Router +import org.http4s.server.websocket.WebSocketBuilder2 class BlazeHttpServer( config: BlazeServerConfig, @@ -75,6 +76,9 @@ val securedRoutes: HttpRoutes[SecuredTask] = securedInterpreter.from(app.secureEndpoints).toRoutes + val wsRoutes: WebSocketBuilder2[AppTask] => HttpRoutes[AppTask] = + publicInterpreter.fromWebSocket(app.wsEndpoints).toRoutes + val eliminated: HttpRoutes[AppTask] = provideCurrentUser(securedRoutes) def withBaseUri(routes: HttpRoutes[AppTask]): HttpRoutes[AppTask] = @@ -84,9 +88,9 @@ BlazeServerBuilder[AppTask] .bindHttp(config.port, config.host) - .withHttpApp( + .withHttpWebSocketApp(wsb => (pac4jSecurity.route <+> withBaseUri( - publicRoutes <+> eliminated + wsRoutes(wsb) <+> publicRoutes <+> eliminated )).orNotFound ) .serve diff --git a/build.sbt b/build.sbt index 571c949..d67beb8 100644 --- a/build.sbt +++ b/build.sbt @@ -129,7 +129,11 @@ ) ) .withSourceMap(true) - .withRelativizeSourceMapBase(Some(base.toURI())) + //.withRelativizeSourceMapBase(Some(base.toURI())) + }, + scalacOptions += { + val localRootBase = (LocalRootProject / baseDirectory).value + s"-scalajs-mapSourceURI:${localRootBase.toURI.toString}->/mdr/poptavky/@fs${localRootBase.toString}/", }, scalaJSUseMainModuleInitializer := true ) diff --git a/core/shared/src/main/scala/works/iterative/core/service/Repository.scala b/core/shared/src/main/scala/works/iterative/core/service/Repository.scala index 326af94..c450279 100644 --- a/core/shared/src/main/scala/works/iterative/core/service/Repository.scala +++ b/core/shared/src/main/scala/works/iterative/core/service/Repository.scala @@ -1,11 +1,15 @@ package works.iterative.core.service import zio.* +import zio.stream.* trait GenericLoadService[Eff[+_], -Key, +Value]: type Op[A] = Eff[A] def load(id: Key): Op[Option[Value]] +trait GenericUpdateNotifyService[Eff[+_], Str[+_], Key]: + def updates: Eff[Str[Key]] + trait GenericLoadAllService[Eff[+_], Coll[+_], -Key, +Value]: type Op[A] = Eff[A] def loadAll(ids: Seq[Key]): Op[Coll[Value]] @@ -37,6 +41,9 @@ // Inefficient implementation, meant to be overridden ZIO.foreach(ids)(load).map(_.flatten.toList) +trait UpdateNotifyRepository[Key] + extends GenericUpdateNotifyService[UIO, UStream, Key] + trait WriteRepository[-Key, -Value] extends GenericWriteRepository[UIO, Key, Value] diff --git a/server/http/src/main/scala/works/iterative/server/http/HttpApplication.scala b/server/http/src/main/scala/works/iterative/server/http/HttpApplication.scala index e911f7f..cd8a995 100644 --- a/server/http/src/main/scala/works/iterative/server/http/HttpApplication.scala +++ b/server/http/src/main/scala/works/iterative/server/http/HttpApplication.scala @@ -2,8 +2,15 @@ import works.iterative.tapir.CustomTapir.* import works.iterative.core.auth.CurrentUser +import sttp.capabilities.zio.ZioStreams +import sttp.capabilities.WebSockets final case class HttpApplication[Env]( - secureEndpoints: List[ZServerEndpoint[Env & CurrentUser, Any]], - publicEndpoints: List[ZServerEndpoint[Env, Any]] + secureEndpoints: List[ + ZServerEndpoint[Env & CurrentUser, ZioStreams] + ], + publicEndpoints: List[ZServerEndpoint[Env, ZioStreams]], + wsEndpoints: List[ + ZServerEndpoint[Env, ZioStreams & WebSockets] + ] = Nil ) diff --git a/server/http/src/main/scala/works/iterative/server/http/impl/blaze/BlazeHttpServer.scala b/server/http/src/main/scala/works/iterative/server/http/impl/blaze/BlazeHttpServer.scala index 7d38587..63e031c 100644 --- a/server/http/src/main/scala/works/iterative/server/http/impl/blaze/BlazeHttpServer.scala +++ b/server/http/src/main/scala/works/iterative/server/http/impl/blaze/BlazeHttpServer.scala @@ -16,6 +16,7 @@ import cats.arrow.FunctionK import works.iterative.tapir.BaseUri import org.http4s.server.Router +import org.http4s.server.websocket.WebSocketBuilder2 class BlazeHttpServer( config: BlazeServerConfig, @@ -75,6 +76,9 @@ val securedRoutes: HttpRoutes[SecuredTask] = securedInterpreter.from(app.secureEndpoints).toRoutes + val wsRoutes: WebSocketBuilder2[AppTask] => HttpRoutes[AppTask] = + publicInterpreter.fromWebSocket(app.wsEndpoints).toRoutes + val eliminated: HttpRoutes[AppTask] = provideCurrentUser(securedRoutes) def withBaseUri(routes: HttpRoutes[AppTask]): HttpRoutes[AppTask] = @@ -84,9 +88,9 @@ BlazeServerBuilder[AppTask] .bindHttp(config.port, config.host) - .withHttpApp( + .withHttpWebSocketApp(wsb => (pac4jSecurity.route <+> withBaseUri( - publicRoutes <+> eliminated + wsRoutes(wsb) <+> publicRoutes <+> eliminated )).orNotFound ) .serve diff --git a/tapir/js/src/main/scala/works/iterative/tapir/CustomTapirPlatformSpecific.scala b/tapir/js/src/main/scala/works/iterative/tapir/CustomTapirPlatformSpecific.scala index 32b3902..d11a3dd 100644 --- a/tapir/js/src/main/scala/works/iterative/tapir/CustomTapirPlatformSpecific.scala +++ b/tapir/js/src/main/scala/works/iterative/tapir/CustomTapirPlatformSpecific.scala @@ -19,7 +19,7 @@ trait CustomTapirPlatformSpecific extends SttpClientInterpreter: self: CustomTapir => - type Backend = SttpBackend[Task, _] + type Backend = SttpBackend[Task, ZioStreams & WebSockets] val clientLayer: ULayer[Backend] = ZLayer.succeed( FetchZioBackend( diff --git a/build.sbt b/build.sbt index 571c949..d67beb8 100644 --- a/build.sbt +++ b/build.sbt @@ -129,7 +129,11 @@ ) ) .withSourceMap(true) - .withRelativizeSourceMapBase(Some(base.toURI())) + //.withRelativizeSourceMapBase(Some(base.toURI())) + }, + scalacOptions += { + val localRootBase = (LocalRootProject / baseDirectory).value + s"-scalajs-mapSourceURI:${localRootBase.toURI.toString}->/mdr/poptavky/@fs${localRootBase.toString}/", }, scalaJSUseMainModuleInitializer := true ) diff --git a/core/shared/src/main/scala/works/iterative/core/service/Repository.scala b/core/shared/src/main/scala/works/iterative/core/service/Repository.scala index 326af94..c450279 100644 --- a/core/shared/src/main/scala/works/iterative/core/service/Repository.scala +++ b/core/shared/src/main/scala/works/iterative/core/service/Repository.scala @@ -1,11 +1,15 @@ package works.iterative.core.service import zio.* +import zio.stream.* trait GenericLoadService[Eff[+_], -Key, +Value]: type Op[A] = Eff[A] def load(id: Key): Op[Option[Value]] +trait GenericUpdateNotifyService[Eff[+_], Str[+_], Key]: + def updates: Eff[Str[Key]] + trait GenericLoadAllService[Eff[+_], Coll[+_], -Key, +Value]: type Op[A] = Eff[A] def loadAll(ids: Seq[Key]): Op[Coll[Value]] @@ -37,6 +41,9 @@ // Inefficient implementation, meant to be overridden ZIO.foreach(ids)(load).map(_.flatten.toList) +trait UpdateNotifyRepository[Key] + extends GenericUpdateNotifyService[UIO, UStream, Key] + trait WriteRepository[-Key, -Value] extends GenericWriteRepository[UIO, Key, Value] diff --git a/server/http/src/main/scala/works/iterative/server/http/HttpApplication.scala b/server/http/src/main/scala/works/iterative/server/http/HttpApplication.scala index e911f7f..cd8a995 100644 --- a/server/http/src/main/scala/works/iterative/server/http/HttpApplication.scala +++ b/server/http/src/main/scala/works/iterative/server/http/HttpApplication.scala @@ -2,8 +2,15 @@ import works.iterative.tapir.CustomTapir.* import works.iterative.core.auth.CurrentUser +import sttp.capabilities.zio.ZioStreams +import sttp.capabilities.WebSockets final case class HttpApplication[Env]( - secureEndpoints: List[ZServerEndpoint[Env & CurrentUser, Any]], - publicEndpoints: List[ZServerEndpoint[Env, Any]] + secureEndpoints: List[ + ZServerEndpoint[Env & CurrentUser, ZioStreams] + ], + publicEndpoints: List[ZServerEndpoint[Env, ZioStreams]], + wsEndpoints: List[ + ZServerEndpoint[Env, ZioStreams & WebSockets] + ] = Nil ) diff --git a/server/http/src/main/scala/works/iterative/server/http/impl/blaze/BlazeHttpServer.scala b/server/http/src/main/scala/works/iterative/server/http/impl/blaze/BlazeHttpServer.scala index 7d38587..63e031c 100644 --- a/server/http/src/main/scala/works/iterative/server/http/impl/blaze/BlazeHttpServer.scala +++ b/server/http/src/main/scala/works/iterative/server/http/impl/blaze/BlazeHttpServer.scala @@ -16,6 +16,7 @@ import cats.arrow.FunctionK import works.iterative.tapir.BaseUri import org.http4s.server.Router +import org.http4s.server.websocket.WebSocketBuilder2 class BlazeHttpServer( config: BlazeServerConfig, @@ -75,6 +76,9 @@ val securedRoutes: HttpRoutes[SecuredTask] = securedInterpreter.from(app.secureEndpoints).toRoutes + val wsRoutes: WebSocketBuilder2[AppTask] => HttpRoutes[AppTask] = + publicInterpreter.fromWebSocket(app.wsEndpoints).toRoutes + val eliminated: HttpRoutes[AppTask] = provideCurrentUser(securedRoutes) def withBaseUri(routes: HttpRoutes[AppTask]): HttpRoutes[AppTask] = @@ -84,9 +88,9 @@ BlazeServerBuilder[AppTask] .bindHttp(config.port, config.host) - .withHttpApp( + .withHttpWebSocketApp(wsb => (pac4jSecurity.route <+> withBaseUri( - publicRoutes <+> eliminated + wsRoutes(wsb) <+> publicRoutes <+> eliminated )).orNotFound ) .serve diff --git a/tapir/js/src/main/scala/works/iterative/tapir/CustomTapirPlatformSpecific.scala b/tapir/js/src/main/scala/works/iterative/tapir/CustomTapirPlatformSpecific.scala index 32b3902..d11a3dd 100644 --- a/tapir/js/src/main/scala/works/iterative/tapir/CustomTapirPlatformSpecific.scala +++ b/tapir/js/src/main/scala/works/iterative/tapir/CustomTapirPlatformSpecific.scala @@ -19,7 +19,7 @@ trait CustomTapirPlatformSpecific extends SttpClientInterpreter: self: CustomTapir => - type Backend = SttpBackend[Task, _] + type Backend = SttpBackend[Task, ZioStreams & WebSockets] val clientLayer: ULayer[Backend] = ZLayer.succeed( FetchZioBackend( diff --git a/tapir/jvm/src/main/scala/works/iterative/tapir/CustomTapirPlatformSpecific.scala b/tapir/jvm/src/main/scala/works/iterative/tapir/CustomTapirPlatformSpecific.scala index f83d618..f2a3e6d 100644 --- a/tapir/jvm/src/main/scala/works/iterative/tapir/CustomTapirPlatformSpecific.scala +++ b/tapir/jvm/src/main/scala/works/iterative/tapir/CustomTapirPlatformSpecific.scala @@ -11,11 +11,13 @@ import java.net.http.HttpClient import java.net.CookieHandler import java.net.URI +import sttp.capabilities.zio.ZioStreams +import sttp.capabilities.WebSockets trait CustomTapirPlatformSpecific extends ZTapir with SttpClientInterpreter: self: CustomTapir => - type Backend = SttpBackend[Task, _] + type Backend = SttpBackend[Task, ZioStreams & WebSockets] private def addSession( session: String diff --git a/build.sbt b/build.sbt index 571c949..d67beb8 100644 --- a/build.sbt +++ b/build.sbt @@ -129,7 +129,11 @@ ) ) .withSourceMap(true) - .withRelativizeSourceMapBase(Some(base.toURI())) + //.withRelativizeSourceMapBase(Some(base.toURI())) + }, + scalacOptions += { + val localRootBase = (LocalRootProject / baseDirectory).value + s"-scalajs-mapSourceURI:${localRootBase.toURI.toString}->/mdr/poptavky/@fs${localRootBase.toString}/", }, scalaJSUseMainModuleInitializer := true ) diff --git a/core/shared/src/main/scala/works/iterative/core/service/Repository.scala b/core/shared/src/main/scala/works/iterative/core/service/Repository.scala index 326af94..c450279 100644 --- a/core/shared/src/main/scala/works/iterative/core/service/Repository.scala +++ b/core/shared/src/main/scala/works/iterative/core/service/Repository.scala @@ -1,11 +1,15 @@ package works.iterative.core.service import zio.* +import zio.stream.* trait GenericLoadService[Eff[+_], -Key, +Value]: type Op[A] = Eff[A] def load(id: Key): Op[Option[Value]] +trait GenericUpdateNotifyService[Eff[+_], Str[+_], Key]: + def updates: Eff[Str[Key]] + trait GenericLoadAllService[Eff[+_], Coll[+_], -Key, +Value]: type Op[A] = Eff[A] def loadAll(ids: Seq[Key]): Op[Coll[Value]] @@ -37,6 +41,9 @@ // Inefficient implementation, meant to be overridden ZIO.foreach(ids)(load).map(_.flatten.toList) +trait UpdateNotifyRepository[Key] + extends GenericUpdateNotifyService[UIO, UStream, Key] + trait WriteRepository[-Key, -Value] extends GenericWriteRepository[UIO, Key, Value] diff --git a/server/http/src/main/scala/works/iterative/server/http/HttpApplication.scala b/server/http/src/main/scala/works/iterative/server/http/HttpApplication.scala index e911f7f..cd8a995 100644 --- a/server/http/src/main/scala/works/iterative/server/http/HttpApplication.scala +++ b/server/http/src/main/scala/works/iterative/server/http/HttpApplication.scala @@ -2,8 +2,15 @@ import works.iterative.tapir.CustomTapir.* import works.iterative.core.auth.CurrentUser +import sttp.capabilities.zio.ZioStreams +import sttp.capabilities.WebSockets final case class HttpApplication[Env]( - secureEndpoints: List[ZServerEndpoint[Env & CurrentUser, Any]], - publicEndpoints: List[ZServerEndpoint[Env, Any]] + secureEndpoints: List[ + ZServerEndpoint[Env & CurrentUser, ZioStreams] + ], + publicEndpoints: List[ZServerEndpoint[Env, ZioStreams]], + wsEndpoints: List[ + ZServerEndpoint[Env, ZioStreams & WebSockets] + ] = Nil ) diff --git a/server/http/src/main/scala/works/iterative/server/http/impl/blaze/BlazeHttpServer.scala b/server/http/src/main/scala/works/iterative/server/http/impl/blaze/BlazeHttpServer.scala index 7d38587..63e031c 100644 --- a/server/http/src/main/scala/works/iterative/server/http/impl/blaze/BlazeHttpServer.scala +++ b/server/http/src/main/scala/works/iterative/server/http/impl/blaze/BlazeHttpServer.scala @@ -16,6 +16,7 @@ import cats.arrow.FunctionK import works.iterative.tapir.BaseUri import org.http4s.server.Router +import org.http4s.server.websocket.WebSocketBuilder2 class BlazeHttpServer( config: BlazeServerConfig, @@ -75,6 +76,9 @@ val securedRoutes: HttpRoutes[SecuredTask] = securedInterpreter.from(app.secureEndpoints).toRoutes + val wsRoutes: WebSocketBuilder2[AppTask] => HttpRoutes[AppTask] = + publicInterpreter.fromWebSocket(app.wsEndpoints).toRoutes + val eliminated: HttpRoutes[AppTask] = provideCurrentUser(securedRoutes) def withBaseUri(routes: HttpRoutes[AppTask]): HttpRoutes[AppTask] = @@ -84,9 +88,9 @@ BlazeServerBuilder[AppTask] .bindHttp(config.port, config.host) - .withHttpApp( + .withHttpWebSocketApp(wsb => (pac4jSecurity.route <+> withBaseUri( - publicRoutes <+> eliminated + wsRoutes(wsb) <+> publicRoutes <+> eliminated )).orNotFound ) .serve diff --git a/tapir/js/src/main/scala/works/iterative/tapir/CustomTapirPlatformSpecific.scala b/tapir/js/src/main/scala/works/iterative/tapir/CustomTapirPlatformSpecific.scala index 32b3902..d11a3dd 100644 --- a/tapir/js/src/main/scala/works/iterative/tapir/CustomTapirPlatformSpecific.scala +++ b/tapir/js/src/main/scala/works/iterative/tapir/CustomTapirPlatformSpecific.scala @@ -19,7 +19,7 @@ trait CustomTapirPlatformSpecific extends SttpClientInterpreter: self: CustomTapir => - type Backend = SttpBackend[Task, _] + type Backend = SttpBackend[Task, ZioStreams & WebSockets] val clientLayer: ULayer[Backend] = ZLayer.succeed( FetchZioBackend( diff --git a/tapir/jvm/src/main/scala/works/iterative/tapir/CustomTapirPlatformSpecific.scala b/tapir/jvm/src/main/scala/works/iterative/tapir/CustomTapirPlatformSpecific.scala index f83d618..f2a3e6d 100644 --- a/tapir/jvm/src/main/scala/works/iterative/tapir/CustomTapirPlatformSpecific.scala +++ b/tapir/jvm/src/main/scala/works/iterative/tapir/CustomTapirPlatformSpecific.scala @@ -11,11 +11,13 @@ import java.net.http.HttpClient import java.net.CookieHandler import java.net.URI +import sttp.capabilities.zio.ZioStreams +import sttp.capabilities.WebSockets trait CustomTapirPlatformSpecific extends ZTapir with SttpClientInterpreter: self: CustomTapir => - type Backend = SttpBackend[Task, _] + type Backend = SttpBackend[Task, ZioStreams & WebSockets] private def addSession( session: String diff --git a/tapir/shared/src/main/scala/works/iterative/tapir/ClientEndpointFactory.scala b/tapir/shared/src/main/scala/works/iterative/tapir/ClientEndpointFactory.scala index be3e2a3..f232c35 100644 --- a/tapir/shared/src/main/scala/works/iterative/tapir/ClientEndpointFactory.scala +++ b/tapir/shared/src/main/scala/works/iterative/tapir/ClientEndpointFactory.scala @@ -2,6 +2,8 @@ import zio.* import sttp.tapir.PublicEndpoint +import sttp.capabilities.zio.ZioStreams +import sttp.capabilities.WebSockets opaque type Client[I, E, O] = I => IO[E, O] @@ -16,4 +18,22 @@ def umake[I, O]( endpoint: PublicEndpoint[I, Unit, O, Any] ): Client[I, Nothing, O] - def make[I, E, O](endpoint: PublicEndpoint[I, E, O, Any]): Client[I, E, O] + def make[I, E, O]( + endpoint: PublicEndpoint[I, E, O, Any] + ): Client[I, E, O] + def stream[I, E, O]( + endpoint: PublicEndpoint[ + Unit, + E, + ZioStreams.Pipe[I, O], + ZioStreams & WebSockets + ] + ): Client[Unit, E, ZioStreams.Pipe[I, O]] + def ustream[I, O]( + endpoint: PublicEndpoint[ + Unit, + Unit, + ZioStreams.Pipe[I, O], + ZioStreams & WebSockets + ] + ): Client[Unit, Nothing, ZioStreams.Pipe[I, O]] diff --git a/build.sbt b/build.sbt index 571c949..d67beb8 100644 --- a/build.sbt +++ b/build.sbt @@ -129,7 +129,11 @@ ) ) .withSourceMap(true) - .withRelativizeSourceMapBase(Some(base.toURI())) + //.withRelativizeSourceMapBase(Some(base.toURI())) + }, + scalacOptions += { + val localRootBase = (LocalRootProject / baseDirectory).value + s"-scalajs-mapSourceURI:${localRootBase.toURI.toString}->/mdr/poptavky/@fs${localRootBase.toString}/", }, scalaJSUseMainModuleInitializer := true ) diff --git a/core/shared/src/main/scala/works/iterative/core/service/Repository.scala b/core/shared/src/main/scala/works/iterative/core/service/Repository.scala index 326af94..c450279 100644 --- a/core/shared/src/main/scala/works/iterative/core/service/Repository.scala +++ b/core/shared/src/main/scala/works/iterative/core/service/Repository.scala @@ -1,11 +1,15 @@ package works.iterative.core.service import zio.* +import zio.stream.* trait GenericLoadService[Eff[+_], -Key, +Value]: type Op[A] = Eff[A] def load(id: Key): Op[Option[Value]] +trait GenericUpdateNotifyService[Eff[+_], Str[+_], Key]: + def updates: Eff[Str[Key]] + trait GenericLoadAllService[Eff[+_], Coll[+_], -Key, +Value]: type Op[A] = Eff[A] def loadAll(ids: Seq[Key]): Op[Coll[Value]] @@ -37,6 +41,9 @@ // Inefficient implementation, meant to be overridden ZIO.foreach(ids)(load).map(_.flatten.toList) +trait UpdateNotifyRepository[Key] + extends GenericUpdateNotifyService[UIO, UStream, Key] + trait WriteRepository[-Key, -Value] extends GenericWriteRepository[UIO, Key, Value] diff --git a/server/http/src/main/scala/works/iterative/server/http/HttpApplication.scala b/server/http/src/main/scala/works/iterative/server/http/HttpApplication.scala index e911f7f..cd8a995 100644 --- a/server/http/src/main/scala/works/iterative/server/http/HttpApplication.scala +++ b/server/http/src/main/scala/works/iterative/server/http/HttpApplication.scala @@ -2,8 +2,15 @@ import works.iterative.tapir.CustomTapir.* import works.iterative.core.auth.CurrentUser +import sttp.capabilities.zio.ZioStreams +import sttp.capabilities.WebSockets final case class HttpApplication[Env]( - secureEndpoints: List[ZServerEndpoint[Env & CurrentUser, Any]], - publicEndpoints: List[ZServerEndpoint[Env, Any]] + secureEndpoints: List[ + ZServerEndpoint[Env & CurrentUser, ZioStreams] + ], + publicEndpoints: List[ZServerEndpoint[Env, ZioStreams]], + wsEndpoints: List[ + ZServerEndpoint[Env, ZioStreams & WebSockets] + ] = Nil ) diff --git a/server/http/src/main/scala/works/iterative/server/http/impl/blaze/BlazeHttpServer.scala b/server/http/src/main/scala/works/iterative/server/http/impl/blaze/BlazeHttpServer.scala index 7d38587..63e031c 100644 --- a/server/http/src/main/scala/works/iterative/server/http/impl/blaze/BlazeHttpServer.scala +++ b/server/http/src/main/scala/works/iterative/server/http/impl/blaze/BlazeHttpServer.scala @@ -16,6 +16,7 @@ import cats.arrow.FunctionK import works.iterative.tapir.BaseUri import org.http4s.server.Router +import org.http4s.server.websocket.WebSocketBuilder2 class BlazeHttpServer( config: BlazeServerConfig, @@ -75,6 +76,9 @@ val securedRoutes: HttpRoutes[SecuredTask] = securedInterpreter.from(app.secureEndpoints).toRoutes + val wsRoutes: WebSocketBuilder2[AppTask] => HttpRoutes[AppTask] = + publicInterpreter.fromWebSocket(app.wsEndpoints).toRoutes + val eliminated: HttpRoutes[AppTask] = provideCurrentUser(securedRoutes) def withBaseUri(routes: HttpRoutes[AppTask]): HttpRoutes[AppTask] = @@ -84,9 +88,9 @@ BlazeServerBuilder[AppTask] .bindHttp(config.port, config.host) - .withHttpApp( + .withHttpWebSocketApp(wsb => (pac4jSecurity.route <+> withBaseUri( - publicRoutes <+> eliminated + wsRoutes(wsb) <+> publicRoutes <+> eliminated )).orNotFound ) .serve diff --git a/tapir/js/src/main/scala/works/iterative/tapir/CustomTapirPlatformSpecific.scala b/tapir/js/src/main/scala/works/iterative/tapir/CustomTapirPlatformSpecific.scala index 32b3902..d11a3dd 100644 --- a/tapir/js/src/main/scala/works/iterative/tapir/CustomTapirPlatformSpecific.scala +++ b/tapir/js/src/main/scala/works/iterative/tapir/CustomTapirPlatformSpecific.scala @@ -19,7 +19,7 @@ trait CustomTapirPlatformSpecific extends SttpClientInterpreter: self: CustomTapir => - type Backend = SttpBackend[Task, _] + type Backend = SttpBackend[Task, ZioStreams & WebSockets] val clientLayer: ULayer[Backend] = ZLayer.succeed( FetchZioBackend( diff --git a/tapir/jvm/src/main/scala/works/iterative/tapir/CustomTapirPlatformSpecific.scala b/tapir/jvm/src/main/scala/works/iterative/tapir/CustomTapirPlatformSpecific.scala index f83d618..f2a3e6d 100644 --- a/tapir/jvm/src/main/scala/works/iterative/tapir/CustomTapirPlatformSpecific.scala +++ b/tapir/jvm/src/main/scala/works/iterative/tapir/CustomTapirPlatformSpecific.scala @@ -11,11 +11,13 @@ import java.net.http.HttpClient import java.net.CookieHandler import java.net.URI +import sttp.capabilities.zio.ZioStreams +import sttp.capabilities.WebSockets trait CustomTapirPlatformSpecific extends ZTapir with SttpClientInterpreter: self: CustomTapir => - type Backend = SttpBackend[Task, _] + type Backend = SttpBackend[Task, ZioStreams & WebSockets] private def addSession( session: String diff --git a/tapir/shared/src/main/scala/works/iterative/tapir/ClientEndpointFactory.scala b/tapir/shared/src/main/scala/works/iterative/tapir/ClientEndpointFactory.scala index be3e2a3..f232c35 100644 --- a/tapir/shared/src/main/scala/works/iterative/tapir/ClientEndpointFactory.scala +++ b/tapir/shared/src/main/scala/works/iterative/tapir/ClientEndpointFactory.scala @@ -2,6 +2,8 @@ import zio.* import sttp.tapir.PublicEndpoint +import sttp.capabilities.zio.ZioStreams +import sttp.capabilities.WebSockets opaque type Client[I, E, O] = I => IO[E, O] @@ -16,4 +18,22 @@ def umake[I, O]( endpoint: PublicEndpoint[I, Unit, O, Any] ): Client[I, Nothing, O] - def make[I, E, O](endpoint: PublicEndpoint[I, E, O, Any]): Client[I, E, O] + def make[I, E, O]( + endpoint: PublicEndpoint[I, E, O, Any] + ): Client[I, E, O] + def stream[I, E, O]( + endpoint: PublicEndpoint[ + Unit, + E, + ZioStreams.Pipe[I, O], + ZioStreams & WebSockets + ] + ): Client[Unit, E, ZioStreams.Pipe[I, O]] + def ustream[I, O]( + endpoint: PublicEndpoint[ + Unit, + Unit, + ZioStreams.Pipe[I, O], + ZioStreams & WebSockets + ] + ): Client[Unit, Nothing, ZioStreams.Pipe[I, O]] diff --git a/tapir/shared/src/main/scala/works/iterative/tapir/LiveClientEndpointFactory.scala b/tapir/shared/src/main/scala/works/iterative/tapir/LiveClientEndpointFactory.scala index 6e292c6..7657048 100644 --- a/tapir/shared/src/main/scala/works/iterative/tapir/LiveClientEndpointFactory.scala +++ b/tapir/shared/src/main/scala/works/iterative/tapir/LiveClientEndpointFactory.scala @@ -2,7 +2,9 @@ import zio.* import sttp.tapir.PublicEndpoint -import sttp.tapir.client.sttp.WebSocketToPipe +import sttp.capabilities.zio.ZioStreams +import sttp.capabilities.WebSockets +import sttp.tapir.client.sttp.ws.zio.* class LiveClientEndpointFactory(using baseUri: BaseUri, @@ -22,14 +24,46 @@ endpoint: PublicEndpoint[I, E, O, Any] ): Client[I, E, O] = mkClient(endpoint) + override def stream[I, E, O]( + endpoint: PublicEndpoint[ + Unit, + E, + ZioStreams.Pipe[I, O], + ZioStreams & WebSockets + ] + ): Client[Unit, E, ZioStreams.Pipe[I, O]] = + mkClient(endpoint, true) + + override def ustream[I, O]( + endpoint: PublicEndpoint[ + Unit, + Unit, + ZioStreams.Pipe[I, O], + ZioStreams & WebSockets + ] + ): Client[Unit, Nothing, ZioStreams.Pipe[I, O]] = Client( + mkClient(endpoint, true)(_).orDieWith(_ => + new IllegalStateException("Internal Server Error") + ) + ) + private def mkClient[I, E, O]( - endpoint: PublicEndpoint[I, E, O, Any] + endpoint: PublicEndpoint[I, E, O, ZioStreams & WebSockets], + isWebSocket: Boolean = false )(using baseUri: BaseUri, - backend: Backend, - wsToPipe: WebSocketToPipe[Any] + backend: Backend ): Client[I, E, O] = Client((input: I) => - val req = toRequest(endpoint, baseUri.toUri) + val req = toRequest( + endpoint, + if isWebSocket then + baseUri.toUri.map(b => + b.scheme match + case Some("https") => b.scheme("wss") + case _ => b.scheme("ws") + ) + else baseUri.toUri + ) val fetch = req(input).followRedirects(false).send(backend) for resp <- fetch.orDie diff --git a/build.sbt b/build.sbt index 571c949..d67beb8 100644 --- a/build.sbt +++ b/build.sbt @@ -129,7 +129,11 @@ ) ) .withSourceMap(true) - .withRelativizeSourceMapBase(Some(base.toURI())) + //.withRelativizeSourceMapBase(Some(base.toURI())) + }, + scalacOptions += { + val localRootBase = (LocalRootProject / baseDirectory).value + s"-scalajs-mapSourceURI:${localRootBase.toURI.toString}->/mdr/poptavky/@fs${localRootBase.toString}/", }, scalaJSUseMainModuleInitializer := true ) diff --git a/core/shared/src/main/scala/works/iterative/core/service/Repository.scala b/core/shared/src/main/scala/works/iterative/core/service/Repository.scala index 326af94..c450279 100644 --- a/core/shared/src/main/scala/works/iterative/core/service/Repository.scala +++ b/core/shared/src/main/scala/works/iterative/core/service/Repository.scala @@ -1,11 +1,15 @@ package works.iterative.core.service import zio.* +import zio.stream.* trait GenericLoadService[Eff[+_], -Key, +Value]: type Op[A] = Eff[A] def load(id: Key): Op[Option[Value]] +trait GenericUpdateNotifyService[Eff[+_], Str[+_], Key]: + def updates: Eff[Str[Key]] + trait GenericLoadAllService[Eff[+_], Coll[+_], -Key, +Value]: type Op[A] = Eff[A] def loadAll(ids: Seq[Key]): Op[Coll[Value]] @@ -37,6 +41,9 @@ // Inefficient implementation, meant to be overridden ZIO.foreach(ids)(load).map(_.flatten.toList) +trait UpdateNotifyRepository[Key] + extends GenericUpdateNotifyService[UIO, UStream, Key] + trait WriteRepository[-Key, -Value] extends GenericWriteRepository[UIO, Key, Value] diff --git a/server/http/src/main/scala/works/iterative/server/http/HttpApplication.scala b/server/http/src/main/scala/works/iterative/server/http/HttpApplication.scala index e911f7f..cd8a995 100644 --- a/server/http/src/main/scala/works/iterative/server/http/HttpApplication.scala +++ b/server/http/src/main/scala/works/iterative/server/http/HttpApplication.scala @@ -2,8 +2,15 @@ import works.iterative.tapir.CustomTapir.* import works.iterative.core.auth.CurrentUser +import sttp.capabilities.zio.ZioStreams +import sttp.capabilities.WebSockets final case class HttpApplication[Env]( - secureEndpoints: List[ZServerEndpoint[Env & CurrentUser, Any]], - publicEndpoints: List[ZServerEndpoint[Env, Any]] + secureEndpoints: List[ + ZServerEndpoint[Env & CurrentUser, ZioStreams] + ], + publicEndpoints: List[ZServerEndpoint[Env, ZioStreams]], + wsEndpoints: List[ + ZServerEndpoint[Env, ZioStreams & WebSockets] + ] = Nil ) diff --git a/server/http/src/main/scala/works/iterative/server/http/impl/blaze/BlazeHttpServer.scala b/server/http/src/main/scala/works/iterative/server/http/impl/blaze/BlazeHttpServer.scala index 7d38587..63e031c 100644 --- a/server/http/src/main/scala/works/iterative/server/http/impl/blaze/BlazeHttpServer.scala +++ b/server/http/src/main/scala/works/iterative/server/http/impl/blaze/BlazeHttpServer.scala @@ -16,6 +16,7 @@ import cats.arrow.FunctionK import works.iterative.tapir.BaseUri import org.http4s.server.Router +import org.http4s.server.websocket.WebSocketBuilder2 class BlazeHttpServer( config: BlazeServerConfig, @@ -75,6 +76,9 @@ val securedRoutes: HttpRoutes[SecuredTask] = securedInterpreter.from(app.secureEndpoints).toRoutes + val wsRoutes: WebSocketBuilder2[AppTask] => HttpRoutes[AppTask] = + publicInterpreter.fromWebSocket(app.wsEndpoints).toRoutes + val eliminated: HttpRoutes[AppTask] = provideCurrentUser(securedRoutes) def withBaseUri(routes: HttpRoutes[AppTask]): HttpRoutes[AppTask] = @@ -84,9 +88,9 @@ BlazeServerBuilder[AppTask] .bindHttp(config.port, config.host) - .withHttpApp( + .withHttpWebSocketApp(wsb => (pac4jSecurity.route <+> withBaseUri( - publicRoutes <+> eliminated + wsRoutes(wsb) <+> publicRoutes <+> eliminated )).orNotFound ) .serve diff --git a/tapir/js/src/main/scala/works/iterative/tapir/CustomTapirPlatformSpecific.scala b/tapir/js/src/main/scala/works/iterative/tapir/CustomTapirPlatformSpecific.scala index 32b3902..d11a3dd 100644 --- a/tapir/js/src/main/scala/works/iterative/tapir/CustomTapirPlatformSpecific.scala +++ b/tapir/js/src/main/scala/works/iterative/tapir/CustomTapirPlatformSpecific.scala @@ -19,7 +19,7 @@ trait CustomTapirPlatformSpecific extends SttpClientInterpreter: self: CustomTapir => - type Backend = SttpBackend[Task, _] + type Backend = SttpBackend[Task, ZioStreams & WebSockets] val clientLayer: ULayer[Backend] = ZLayer.succeed( FetchZioBackend( diff --git a/tapir/jvm/src/main/scala/works/iterative/tapir/CustomTapirPlatformSpecific.scala b/tapir/jvm/src/main/scala/works/iterative/tapir/CustomTapirPlatformSpecific.scala index f83d618..f2a3e6d 100644 --- a/tapir/jvm/src/main/scala/works/iterative/tapir/CustomTapirPlatformSpecific.scala +++ b/tapir/jvm/src/main/scala/works/iterative/tapir/CustomTapirPlatformSpecific.scala @@ -11,11 +11,13 @@ import java.net.http.HttpClient import java.net.CookieHandler import java.net.URI +import sttp.capabilities.zio.ZioStreams +import sttp.capabilities.WebSockets trait CustomTapirPlatformSpecific extends ZTapir with SttpClientInterpreter: self: CustomTapir => - type Backend = SttpBackend[Task, _] + type Backend = SttpBackend[Task, ZioStreams & WebSockets] private def addSession( session: String diff --git a/tapir/shared/src/main/scala/works/iterative/tapir/ClientEndpointFactory.scala b/tapir/shared/src/main/scala/works/iterative/tapir/ClientEndpointFactory.scala index be3e2a3..f232c35 100644 --- a/tapir/shared/src/main/scala/works/iterative/tapir/ClientEndpointFactory.scala +++ b/tapir/shared/src/main/scala/works/iterative/tapir/ClientEndpointFactory.scala @@ -2,6 +2,8 @@ import zio.* import sttp.tapir.PublicEndpoint +import sttp.capabilities.zio.ZioStreams +import sttp.capabilities.WebSockets opaque type Client[I, E, O] = I => IO[E, O] @@ -16,4 +18,22 @@ def umake[I, O]( endpoint: PublicEndpoint[I, Unit, O, Any] ): Client[I, Nothing, O] - def make[I, E, O](endpoint: PublicEndpoint[I, E, O, Any]): Client[I, E, O] + def make[I, E, O]( + endpoint: PublicEndpoint[I, E, O, Any] + ): Client[I, E, O] + def stream[I, E, O]( + endpoint: PublicEndpoint[ + Unit, + E, + ZioStreams.Pipe[I, O], + ZioStreams & WebSockets + ] + ): Client[Unit, E, ZioStreams.Pipe[I, O]] + def ustream[I, O]( + endpoint: PublicEndpoint[ + Unit, + Unit, + ZioStreams.Pipe[I, O], + ZioStreams & WebSockets + ] + ): Client[Unit, Nothing, ZioStreams.Pipe[I, O]] diff --git a/tapir/shared/src/main/scala/works/iterative/tapir/LiveClientEndpointFactory.scala b/tapir/shared/src/main/scala/works/iterative/tapir/LiveClientEndpointFactory.scala index 6e292c6..7657048 100644 --- a/tapir/shared/src/main/scala/works/iterative/tapir/LiveClientEndpointFactory.scala +++ b/tapir/shared/src/main/scala/works/iterative/tapir/LiveClientEndpointFactory.scala @@ -2,7 +2,9 @@ import zio.* import sttp.tapir.PublicEndpoint -import sttp.tapir.client.sttp.WebSocketToPipe +import sttp.capabilities.zio.ZioStreams +import sttp.capabilities.WebSockets +import sttp.tapir.client.sttp.ws.zio.* class LiveClientEndpointFactory(using baseUri: BaseUri, @@ -22,14 +24,46 @@ endpoint: PublicEndpoint[I, E, O, Any] ): Client[I, E, O] = mkClient(endpoint) + override def stream[I, E, O]( + endpoint: PublicEndpoint[ + Unit, + E, + ZioStreams.Pipe[I, O], + ZioStreams & WebSockets + ] + ): Client[Unit, E, ZioStreams.Pipe[I, O]] = + mkClient(endpoint, true) + + override def ustream[I, O]( + endpoint: PublicEndpoint[ + Unit, + Unit, + ZioStreams.Pipe[I, O], + ZioStreams & WebSockets + ] + ): Client[Unit, Nothing, ZioStreams.Pipe[I, O]] = Client( + mkClient(endpoint, true)(_).orDieWith(_ => + new IllegalStateException("Internal Server Error") + ) + ) + private def mkClient[I, E, O]( - endpoint: PublicEndpoint[I, E, O, Any] + endpoint: PublicEndpoint[I, E, O, ZioStreams & WebSockets], + isWebSocket: Boolean = false )(using baseUri: BaseUri, - backend: Backend, - wsToPipe: WebSocketToPipe[Any] + backend: Backend ): Client[I, E, O] = Client((input: I) => - val req = toRequest(endpoint, baseUri.toUri) + val req = toRequest( + endpoint, + if isWebSocket then + baseUri.toUri.map(b => + b.scheme match + case Some("https") => b.scheme("wss") + case _ => b.scheme("ws") + ) + else baseUri.toUri + ) val fetch = req(input).followRedirects(false).send(backend) for resp <- fetch.orDie diff --git a/ui/js/src/main/scala/works/iterative/ui/components/ReloadableComponent.scala b/ui/js/src/main/scala/works/iterative/ui/components/ReloadableComponent.scala index 174897d..09030d4 100644 --- a/ui/js/src/main/scala/works/iterative/ui/components/ReloadableComponent.scala +++ b/ui/js/src/main/scala/works/iterative/ui/components/ReloadableComponent.scala @@ -1,17 +1,22 @@ package works.iterative.ui.components import com.raquo.laminar.api.L.* +import io.laminext.syntax.core.* import sttp.tapir.PublicEndpoint import works.iterative.core.* import works.iterative.tapir.ClientEndpointFactory import works.iterative.ui.model.Computable import zio.* +import zio.stream.* case class ReloadableComponent[A, I]( fetch: I => IO[UserMessage, A], init: Option[I] = None, + updates: Option[UIO[UStream[ReloadableComponent.Reload[A]]]] = None, loadSchedule: Schedule[Any, Any, ?] = Schedule.stop )(using runtime: Runtime[Any]): + import ReloadableComponent.Reload + private val computable: Var[Computable[A]] = Var(Computable.Uninitialized) private val memo: Var[Option[I]] = Var(init) @@ -24,14 +29,52 @@ load(input) } - val reload: Observer[ReloadableComponent.Reload[A]] = Observer { - case ReloadableComponent.Reload.Once => memo.now().foreach(load) - case ReloadableComponent.Reload.UntilChanged(original) => + val reload: Observer[Reload[A]] = Observer { + case Reload.Once => memo.now().foreach(load) + case Reload.UntilChanged(original) => memo.now().foreach(reloadUntilChanged(_, original)) } - def initMod: HtmlMod = - EventStream.fromValue(ReloadableComponent.Reload.Once) --> reload + private def eventStreamFromStreamEffect[A]( + eff: UIO[UStream[A]] + ): EventStream[A] = + var runningFiber: Option[Fiber.Runtime[Nothing, Unit]] = None + EventStream + .fromCustomSource( + shouldStart = _ => true, + start = (fireValue, _, _, _) => { + runningFiber = Some(Unsafe.unsafely { + runtime.unsafe.fork( + eff.flatMap( + _.runForeach(v => ZIO.succeed(fireValue(v))) + ) + ) + }) + }, + stop = _ => { + runningFiber.foreach { f => + Unsafe.unsafely { + runtime.unsafe.fork(f.interrupt) + } + } + } + ) + + private def updateFromZioStream( + upd: UIO[UStream[Reload[A]]] + ): HtmlMod = + onMountBind { _ => + eventStreamFromStreamEffect(upd) --> reload + } + + private def updateStream: HtmlMod = updates match + case None => emptyMod + case Some(upd) => updateFromZioStream(upd) + + def initMod: HtmlMod = nodeSeq( + EventStream.fromValue(ReloadableComponent.Reload.Once) --> reload, + updateStream + ) def load(input: I): Unit = doLoad( input, diff --git a/build.sbt b/build.sbt index 571c949..d67beb8 100644 --- a/build.sbt +++ b/build.sbt @@ -129,7 +129,11 @@ ) ) .withSourceMap(true) - .withRelativizeSourceMapBase(Some(base.toURI())) + //.withRelativizeSourceMapBase(Some(base.toURI())) + }, + scalacOptions += { + val localRootBase = (LocalRootProject / baseDirectory).value + s"-scalajs-mapSourceURI:${localRootBase.toURI.toString}->/mdr/poptavky/@fs${localRootBase.toString}/", }, scalaJSUseMainModuleInitializer := true ) diff --git a/core/shared/src/main/scala/works/iterative/core/service/Repository.scala b/core/shared/src/main/scala/works/iterative/core/service/Repository.scala index 326af94..c450279 100644 --- a/core/shared/src/main/scala/works/iterative/core/service/Repository.scala +++ b/core/shared/src/main/scala/works/iterative/core/service/Repository.scala @@ -1,11 +1,15 @@ package works.iterative.core.service import zio.* +import zio.stream.* trait GenericLoadService[Eff[+_], -Key, +Value]: type Op[A] = Eff[A] def load(id: Key): Op[Option[Value]] +trait GenericUpdateNotifyService[Eff[+_], Str[+_], Key]: + def updates: Eff[Str[Key]] + trait GenericLoadAllService[Eff[+_], Coll[+_], -Key, +Value]: type Op[A] = Eff[A] def loadAll(ids: Seq[Key]): Op[Coll[Value]] @@ -37,6 +41,9 @@ // Inefficient implementation, meant to be overridden ZIO.foreach(ids)(load).map(_.flatten.toList) +trait UpdateNotifyRepository[Key] + extends GenericUpdateNotifyService[UIO, UStream, Key] + trait WriteRepository[-Key, -Value] extends GenericWriteRepository[UIO, Key, Value] diff --git a/server/http/src/main/scala/works/iterative/server/http/HttpApplication.scala b/server/http/src/main/scala/works/iterative/server/http/HttpApplication.scala index e911f7f..cd8a995 100644 --- a/server/http/src/main/scala/works/iterative/server/http/HttpApplication.scala +++ b/server/http/src/main/scala/works/iterative/server/http/HttpApplication.scala @@ -2,8 +2,15 @@ import works.iterative.tapir.CustomTapir.* import works.iterative.core.auth.CurrentUser +import sttp.capabilities.zio.ZioStreams +import sttp.capabilities.WebSockets final case class HttpApplication[Env]( - secureEndpoints: List[ZServerEndpoint[Env & CurrentUser, Any]], - publicEndpoints: List[ZServerEndpoint[Env, Any]] + secureEndpoints: List[ + ZServerEndpoint[Env & CurrentUser, ZioStreams] + ], + publicEndpoints: List[ZServerEndpoint[Env, ZioStreams]], + wsEndpoints: List[ + ZServerEndpoint[Env, ZioStreams & WebSockets] + ] = Nil ) diff --git a/server/http/src/main/scala/works/iterative/server/http/impl/blaze/BlazeHttpServer.scala b/server/http/src/main/scala/works/iterative/server/http/impl/blaze/BlazeHttpServer.scala index 7d38587..63e031c 100644 --- a/server/http/src/main/scala/works/iterative/server/http/impl/blaze/BlazeHttpServer.scala +++ b/server/http/src/main/scala/works/iterative/server/http/impl/blaze/BlazeHttpServer.scala @@ -16,6 +16,7 @@ import cats.arrow.FunctionK import works.iterative.tapir.BaseUri import org.http4s.server.Router +import org.http4s.server.websocket.WebSocketBuilder2 class BlazeHttpServer( config: BlazeServerConfig, @@ -75,6 +76,9 @@ val securedRoutes: HttpRoutes[SecuredTask] = securedInterpreter.from(app.secureEndpoints).toRoutes + val wsRoutes: WebSocketBuilder2[AppTask] => HttpRoutes[AppTask] = + publicInterpreter.fromWebSocket(app.wsEndpoints).toRoutes + val eliminated: HttpRoutes[AppTask] = provideCurrentUser(securedRoutes) def withBaseUri(routes: HttpRoutes[AppTask]): HttpRoutes[AppTask] = @@ -84,9 +88,9 @@ BlazeServerBuilder[AppTask] .bindHttp(config.port, config.host) - .withHttpApp( + .withHttpWebSocketApp(wsb => (pac4jSecurity.route <+> withBaseUri( - publicRoutes <+> eliminated + wsRoutes(wsb) <+> publicRoutes <+> eliminated )).orNotFound ) .serve diff --git a/tapir/js/src/main/scala/works/iterative/tapir/CustomTapirPlatformSpecific.scala b/tapir/js/src/main/scala/works/iterative/tapir/CustomTapirPlatformSpecific.scala index 32b3902..d11a3dd 100644 --- a/tapir/js/src/main/scala/works/iterative/tapir/CustomTapirPlatformSpecific.scala +++ b/tapir/js/src/main/scala/works/iterative/tapir/CustomTapirPlatformSpecific.scala @@ -19,7 +19,7 @@ trait CustomTapirPlatformSpecific extends SttpClientInterpreter: self: CustomTapir => - type Backend = SttpBackend[Task, _] + type Backend = SttpBackend[Task, ZioStreams & WebSockets] val clientLayer: ULayer[Backend] = ZLayer.succeed( FetchZioBackend( diff --git a/tapir/jvm/src/main/scala/works/iterative/tapir/CustomTapirPlatformSpecific.scala b/tapir/jvm/src/main/scala/works/iterative/tapir/CustomTapirPlatformSpecific.scala index f83d618..f2a3e6d 100644 --- a/tapir/jvm/src/main/scala/works/iterative/tapir/CustomTapirPlatformSpecific.scala +++ b/tapir/jvm/src/main/scala/works/iterative/tapir/CustomTapirPlatformSpecific.scala @@ -11,11 +11,13 @@ import java.net.http.HttpClient import java.net.CookieHandler import java.net.URI +import sttp.capabilities.zio.ZioStreams +import sttp.capabilities.WebSockets trait CustomTapirPlatformSpecific extends ZTapir with SttpClientInterpreter: self: CustomTapir => - type Backend = SttpBackend[Task, _] + type Backend = SttpBackend[Task, ZioStreams & WebSockets] private def addSession( session: String diff --git a/tapir/shared/src/main/scala/works/iterative/tapir/ClientEndpointFactory.scala b/tapir/shared/src/main/scala/works/iterative/tapir/ClientEndpointFactory.scala index be3e2a3..f232c35 100644 --- a/tapir/shared/src/main/scala/works/iterative/tapir/ClientEndpointFactory.scala +++ b/tapir/shared/src/main/scala/works/iterative/tapir/ClientEndpointFactory.scala @@ -2,6 +2,8 @@ import zio.* import sttp.tapir.PublicEndpoint +import sttp.capabilities.zio.ZioStreams +import sttp.capabilities.WebSockets opaque type Client[I, E, O] = I => IO[E, O] @@ -16,4 +18,22 @@ def umake[I, O]( endpoint: PublicEndpoint[I, Unit, O, Any] ): Client[I, Nothing, O] - def make[I, E, O](endpoint: PublicEndpoint[I, E, O, Any]): Client[I, E, O] + def make[I, E, O]( + endpoint: PublicEndpoint[I, E, O, Any] + ): Client[I, E, O] + def stream[I, E, O]( + endpoint: PublicEndpoint[ + Unit, + E, + ZioStreams.Pipe[I, O], + ZioStreams & WebSockets + ] + ): Client[Unit, E, ZioStreams.Pipe[I, O]] + def ustream[I, O]( + endpoint: PublicEndpoint[ + Unit, + Unit, + ZioStreams.Pipe[I, O], + ZioStreams & WebSockets + ] + ): Client[Unit, Nothing, ZioStreams.Pipe[I, O]] diff --git a/tapir/shared/src/main/scala/works/iterative/tapir/LiveClientEndpointFactory.scala b/tapir/shared/src/main/scala/works/iterative/tapir/LiveClientEndpointFactory.scala index 6e292c6..7657048 100644 --- a/tapir/shared/src/main/scala/works/iterative/tapir/LiveClientEndpointFactory.scala +++ b/tapir/shared/src/main/scala/works/iterative/tapir/LiveClientEndpointFactory.scala @@ -2,7 +2,9 @@ import zio.* import sttp.tapir.PublicEndpoint -import sttp.tapir.client.sttp.WebSocketToPipe +import sttp.capabilities.zio.ZioStreams +import sttp.capabilities.WebSockets +import sttp.tapir.client.sttp.ws.zio.* class LiveClientEndpointFactory(using baseUri: BaseUri, @@ -22,14 +24,46 @@ endpoint: PublicEndpoint[I, E, O, Any] ): Client[I, E, O] = mkClient(endpoint) + override def stream[I, E, O]( + endpoint: PublicEndpoint[ + Unit, + E, + ZioStreams.Pipe[I, O], + ZioStreams & WebSockets + ] + ): Client[Unit, E, ZioStreams.Pipe[I, O]] = + mkClient(endpoint, true) + + override def ustream[I, O]( + endpoint: PublicEndpoint[ + Unit, + Unit, + ZioStreams.Pipe[I, O], + ZioStreams & WebSockets + ] + ): Client[Unit, Nothing, ZioStreams.Pipe[I, O]] = Client( + mkClient(endpoint, true)(_).orDieWith(_ => + new IllegalStateException("Internal Server Error") + ) + ) + private def mkClient[I, E, O]( - endpoint: PublicEndpoint[I, E, O, Any] + endpoint: PublicEndpoint[I, E, O, ZioStreams & WebSockets], + isWebSocket: Boolean = false )(using baseUri: BaseUri, - backend: Backend, - wsToPipe: WebSocketToPipe[Any] + backend: Backend ): Client[I, E, O] = Client((input: I) => - val req = toRequest(endpoint, baseUri.toUri) + val req = toRequest( + endpoint, + if isWebSocket then + baseUri.toUri.map(b => + b.scheme match + case Some("https") => b.scheme("wss") + case _ => b.scheme("ws") + ) + else baseUri.toUri + ) val fetch = req(input).followRedirects(false).send(backend) for resp <- fetch.orDie diff --git a/ui/js/src/main/scala/works/iterative/ui/components/ReloadableComponent.scala b/ui/js/src/main/scala/works/iterative/ui/components/ReloadableComponent.scala index 174897d..09030d4 100644 --- a/ui/js/src/main/scala/works/iterative/ui/components/ReloadableComponent.scala +++ b/ui/js/src/main/scala/works/iterative/ui/components/ReloadableComponent.scala @@ -1,17 +1,22 @@ package works.iterative.ui.components import com.raquo.laminar.api.L.* +import io.laminext.syntax.core.* import sttp.tapir.PublicEndpoint import works.iterative.core.* import works.iterative.tapir.ClientEndpointFactory import works.iterative.ui.model.Computable import zio.* +import zio.stream.* case class ReloadableComponent[A, I]( fetch: I => IO[UserMessage, A], init: Option[I] = None, + updates: Option[UIO[UStream[ReloadableComponent.Reload[A]]]] = None, loadSchedule: Schedule[Any, Any, ?] = Schedule.stop )(using runtime: Runtime[Any]): + import ReloadableComponent.Reload + private val computable: Var[Computable[A]] = Var(Computable.Uninitialized) private val memo: Var[Option[I]] = Var(init) @@ -24,14 +29,52 @@ load(input) } - val reload: Observer[ReloadableComponent.Reload[A]] = Observer { - case ReloadableComponent.Reload.Once => memo.now().foreach(load) - case ReloadableComponent.Reload.UntilChanged(original) => + val reload: Observer[Reload[A]] = Observer { + case Reload.Once => memo.now().foreach(load) + case Reload.UntilChanged(original) => memo.now().foreach(reloadUntilChanged(_, original)) } - def initMod: HtmlMod = - EventStream.fromValue(ReloadableComponent.Reload.Once) --> reload + private def eventStreamFromStreamEffect[A]( + eff: UIO[UStream[A]] + ): EventStream[A] = + var runningFiber: Option[Fiber.Runtime[Nothing, Unit]] = None + EventStream + .fromCustomSource( + shouldStart = _ => true, + start = (fireValue, _, _, _) => { + runningFiber = Some(Unsafe.unsafely { + runtime.unsafe.fork( + eff.flatMap( + _.runForeach(v => ZIO.succeed(fireValue(v))) + ) + ) + }) + }, + stop = _ => { + runningFiber.foreach { f => + Unsafe.unsafely { + runtime.unsafe.fork(f.interrupt) + } + } + } + ) + + private def updateFromZioStream( + upd: UIO[UStream[Reload[A]]] + ): HtmlMod = + onMountBind { _ => + eventStreamFromStreamEffect(upd) --> reload + } + + private def updateStream: HtmlMod = updates match + case None => emptyMod + case Some(upd) => updateFromZioStream(upd) + + def initMod: HtmlMod = nodeSeq( + EventStream.fromValue(ReloadableComponent.Reload.Once) --> reload, + updateStream + ) def load(input: I): Unit = doLoad( input, diff --git a/ui/js/src/main/scala/works/iterative/ui/laminar/impl/LaminarUIBuilder.scala b/ui/js/src/main/scala/works/iterative/ui/laminar/impl/LaminarUIBuilder.scala index e100805..26afdd3 100644 --- a/ui/js/src/main/scala/works/iterative/ui/laminar/impl/LaminarUIBuilder.scala +++ b/ui/js/src/main/scala/works/iterative/ui/laminar/impl/LaminarUIBuilder.scala @@ -5,4 +5,4 @@ import works.iterative.ui.model.HtmlUIBuilder object LaminarUIBuilder - extends HtmlUIBuilder[HtmlElement, ComponentContext[Any]] + extends HtmlUIBuilder[HtmlElement, ComponentContext[Any], Signal] diff --git a/build.sbt b/build.sbt index 571c949..d67beb8 100644 --- a/build.sbt +++ b/build.sbt @@ -129,7 +129,11 @@ ) ) .withSourceMap(true) - .withRelativizeSourceMapBase(Some(base.toURI())) + //.withRelativizeSourceMapBase(Some(base.toURI())) + }, + scalacOptions += { + val localRootBase = (LocalRootProject / baseDirectory).value + s"-scalajs-mapSourceURI:${localRootBase.toURI.toString}->/mdr/poptavky/@fs${localRootBase.toString}/", }, scalaJSUseMainModuleInitializer := true ) diff --git a/core/shared/src/main/scala/works/iterative/core/service/Repository.scala b/core/shared/src/main/scala/works/iterative/core/service/Repository.scala index 326af94..c450279 100644 --- a/core/shared/src/main/scala/works/iterative/core/service/Repository.scala +++ b/core/shared/src/main/scala/works/iterative/core/service/Repository.scala @@ -1,11 +1,15 @@ package works.iterative.core.service import zio.* +import zio.stream.* trait GenericLoadService[Eff[+_], -Key, +Value]: type Op[A] = Eff[A] def load(id: Key): Op[Option[Value]] +trait GenericUpdateNotifyService[Eff[+_], Str[+_], Key]: + def updates: Eff[Str[Key]] + trait GenericLoadAllService[Eff[+_], Coll[+_], -Key, +Value]: type Op[A] = Eff[A] def loadAll(ids: Seq[Key]): Op[Coll[Value]] @@ -37,6 +41,9 @@ // Inefficient implementation, meant to be overridden ZIO.foreach(ids)(load).map(_.flatten.toList) +trait UpdateNotifyRepository[Key] + extends GenericUpdateNotifyService[UIO, UStream, Key] + trait WriteRepository[-Key, -Value] extends GenericWriteRepository[UIO, Key, Value] diff --git a/server/http/src/main/scala/works/iterative/server/http/HttpApplication.scala b/server/http/src/main/scala/works/iterative/server/http/HttpApplication.scala index e911f7f..cd8a995 100644 --- a/server/http/src/main/scala/works/iterative/server/http/HttpApplication.scala +++ b/server/http/src/main/scala/works/iterative/server/http/HttpApplication.scala @@ -2,8 +2,15 @@ import works.iterative.tapir.CustomTapir.* import works.iterative.core.auth.CurrentUser +import sttp.capabilities.zio.ZioStreams +import sttp.capabilities.WebSockets final case class HttpApplication[Env]( - secureEndpoints: List[ZServerEndpoint[Env & CurrentUser, Any]], - publicEndpoints: List[ZServerEndpoint[Env, Any]] + secureEndpoints: List[ + ZServerEndpoint[Env & CurrentUser, ZioStreams] + ], + publicEndpoints: List[ZServerEndpoint[Env, ZioStreams]], + wsEndpoints: List[ + ZServerEndpoint[Env, ZioStreams & WebSockets] + ] = Nil ) diff --git a/server/http/src/main/scala/works/iterative/server/http/impl/blaze/BlazeHttpServer.scala b/server/http/src/main/scala/works/iterative/server/http/impl/blaze/BlazeHttpServer.scala index 7d38587..63e031c 100644 --- a/server/http/src/main/scala/works/iterative/server/http/impl/blaze/BlazeHttpServer.scala +++ b/server/http/src/main/scala/works/iterative/server/http/impl/blaze/BlazeHttpServer.scala @@ -16,6 +16,7 @@ import cats.arrow.FunctionK import works.iterative.tapir.BaseUri import org.http4s.server.Router +import org.http4s.server.websocket.WebSocketBuilder2 class BlazeHttpServer( config: BlazeServerConfig, @@ -75,6 +76,9 @@ val securedRoutes: HttpRoutes[SecuredTask] = securedInterpreter.from(app.secureEndpoints).toRoutes + val wsRoutes: WebSocketBuilder2[AppTask] => HttpRoutes[AppTask] = + publicInterpreter.fromWebSocket(app.wsEndpoints).toRoutes + val eliminated: HttpRoutes[AppTask] = provideCurrentUser(securedRoutes) def withBaseUri(routes: HttpRoutes[AppTask]): HttpRoutes[AppTask] = @@ -84,9 +88,9 @@ BlazeServerBuilder[AppTask] .bindHttp(config.port, config.host) - .withHttpApp( + .withHttpWebSocketApp(wsb => (pac4jSecurity.route <+> withBaseUri( - publicRoutes <+> eliminated + wsRoutes(wsb) <+> publicRoutes <+> eliminated )).orNotFound ) .serve diff --git a/tapir/js/src/main/scala/works/iterative/tapir/CustomTapirPlatformSpecific.scala b/tapir/js/src/main/scala/works/iterative/tapir/CustomTapirPlatformSpecific.scala index 32b3902..d11a3dd 100644 --- a/tapir/js/src/main/scala/works/iterative/tapir/CustomTapirPlatformSpecific.scala +++ b/tapir/js/src/main/scala/works/iterative/tapir/CustomTapirPlatformSpecific.scala @@ -19,7 +19,7 @@ trait CustomTapirPlatformSpecific extends SttpClientInterpreter: self: CustomTapir => - type Backend = SttpBackend[Task, _] + type Backend = SttpBackend[Task, ZioStreams & WebSockets] val clientLayer: ULayer[Backend] = ZLayer.succeed( FetchZioBackend( diff --git a/tapir/jvm/src/main/scala/works/iterative/tapir/CustomTapirPlatformSpecific.scala b/tapir/jvm/src/main/scala/works/iterative/tapir/CustomTapirPlatformSpecific.scala index f83d618..f2a3e6d 100644 --- a/tapir/jvm/src/main/scala/works/iterative/tapir/CustomTapirPlatformSpecific.scala +++ b/tapir/jvm/src/main/scala/works/iterative/tapir/CustomTapirPlatformSpecific.scala @@ -11,11 +11,13 @@ import java.net.http.HttpClient import java.net.CookieHandler import java.net.URI +import sttp.capabilities.zio.ZioStreams +import sttp.capabilities.WebSockets trait CustomTapirPlatformSpecific extends ZTapir with SttpClientInterpreter: self: CustomTapir => - type Backend = SttpBackend[Task, _] + type Backend = SttpBackend[Task, ZioStreams & WebSockets] private def addSession( session: String diff --git a/tapir/shared/src/main/scala/works/iterative/tapir/ClientEndpointFactory.scala b/tapir/shared/src/main/scala/works/iterative/tapir/ClientEndpointFactory.scala index be3e2a3..f232c35 100644 --- a/tapir/shared/src/main/scala/works/iterative/tapir/ClientEndpointFactory.scala +++ b/tapir/shared/src/main/scala/works/iterative/tapir/ClientEndpointFactory.scala @@ -2,6 +2,8 @@ import zio.* import sttp.tapir.PublicEndpoint +import sttp.capabilities.zio.ZioStreams +import sttp.capabilities.WebSockets opaque type Client[I, E, O] = I => IO[E, O] @@ -16,4 +18,22 @@ def umake[I, O]( endpoint: PublicEndpoint[I, Unit, O, Any] ): Client[I, Nothing, O] - def make[I, E, O](endpoint: PublicEndpoint[I, E, O, Any]): Client[I, E, O] + def make[I, E, O]( + endpoint: PublicEndpoint[I, E, O, Any] + ): Client[I, E, O] + def stream[I, E, O]( + endpoint: PublicEndpoint[ + Unit, + E, + ZioStreams.Pipe[I, O], + ZioStreams & WebSockets + ] + ): Client[Unit, E, ZioStreams.Pipe[I, O]] + def ustream[I, O]( + endpoint: PublicEndpoint[ + Unit, + Unit, + ZioStreams.Pipe[I, O], + ZioStreams & WebSockets + ] + ): Client[Unit, Nothing, ZioStreams.Pipe[I, O]] diff --git a/tapir/shared/src/main/scala/works/iterative/tapir/LiveClientEndpointFactory.scala b/tapir/shared/src/main/scala/works/iterative/tapir/LiveClientEndpointFactory.scala index 6e292c6..7657048 100644 --- a/tapir/shared/src/main/scala/works/iterative/tapir/LiveClientEndpointFactory.scala +++ b/tapir/shared/src/main/scala/works/iterative/tapir/LiveClientEndpointFactory.scala @@ -2,7 +2,9 @@ import zio.* import sttp.tapir.PublicEndpoint -import sttp.tapir.client.sttp.WebSocketToPipe +import sttp.capabilities.zio.ZioStreams +import sttp.capabilities.WebSockets +import sttp.tapir.client.sttp.ws.zio.* class LiveClientEndpointFactory(using baseUri: BaseUri, @@ -22,14 +24,46 @@ endpoint: PublicEndpoint[I, E, O, Any] ): Client[I, E, O] = mkClient(endpoint) + override def stream[I, E, O]( + endpoint: PublicEndpoint[ + Unit, + E, + ZioStreams.Pipe[I, O], + ZioStreams & WebSockets + ] + ): Client[Unit, E, ZioStreams.Pipe[I, O]] = + mkClient(endpoint, true) + + override def ustream[I, O]( + endpoint: PublicEndpoint[ + Unit, + Unit, + ZioStreams.Pipe[I, O], + ZioStreams & WebSockets + ] + ): Client[Unit, Nothing, ZioStreams.Pipe[I, O]] = Client( + mkClient(endpoint, true)(_).orDieWith(_ => + new IllegalStateException("Internal Server Error") + ) + ) + private def mkClient[I, E, O]( - endpoint: PublicEndpoint[I, E, O, Any] + endpoint: PublicEndpoint[I, E, O, ZioStreams & WebSockets], + isWebSocket: Boolean = false )(using baseUri: BaseUri, - backend: Backend, - wsToPipe: WebSocketToPipe[Any] + backend: Backend ): Client[I, E, O] = Client((input: I) => - val req = toRequest(endpoint, baseUri.toUri) + val req = toRequest( + endpoint, + if isWebSocket then + baseUri.toUri.map(b => + b.scheme match + case Some("https") => b.scheme("wss") + case _ => b.scheme("ws") + ) + else baseUri.toUri + ) val fetch = req(input).followRedirects(false).send(backend) for resp <- fetch.orDie diff --git a/ui/js/src/main/scala/works/iterative/ui/components/ReloadableComponent.scala b/ui/js/src/main/scala/works/iterative/ui/components/ReloadableComponent.scala index 174897d..09030d4 100644 --- a/ui/js/src/main/scala/works/iterative/ui/components/ReloadableComponent.scala +++ b/ui/js/src/main/scala/works/iterative/ui/components/ReloadableComponent.scala @@ -1,17 +1,22 @@ package works.iterative.ui.components import com.raquo.laminar.api.L.* +import io.laminext.syntax.core.* import sttp.tapir.PublicEndpoint import works.iterative.core.* import works.iterative.tapir.ClientEndpointFactory import works.iterative.ui.model.Computable import zio.* +import zio.stream.* case class ReloadableComponent[A, I]( fetch: I => IO[UserMessage, A], init: Option[I] = None, + updates: Option[UIO[UStream[ReloadableComponent.Reload[A]]]] = None, loadSchedule: Schedule[Any, Any, ?] = Schedule.stop )(using runtime: Runtime[Any]): + import ReloadableComponent.Reload + private val computable: Var[Computable[A]] = Var(Computable.Uninitialized) private val memo: Var[Option[I]] = Var(init) @@ -24,14 +29,52 @@ load(input) } - val reload: Observer[ReloadableComponent.Reload[A]] = Observer { - case ReloadableComponent.Reload.Once => memo.now().foreach(load) - case ReloadableComponent.Reload.UntilChanged(original) => + val reload: Observer[Reload[A]] = Observer { + case Reload.Once => memo.now().foreach(load) + case Reload.UntilChanged(original) => memo.now().foreach(reloadUntilChanged(_, original)) } - def initMod: HtmlMod = - EventStream.fromValue(ReloadableComponent.Reload.Once) --> reload + private def eventStreamFromStreamEffect[A]( + eff: UIO[UStream[A]] + ): EventStream[A] = + var runningFiber: Option[Fiber.Runtime[Nothing, Unit]] = None + EventStream + .fromCustomSource( + shouldStart = _ => true, + start = (fireValue, _, _, _) => { + runningFiber = Some(Unsafe.unsafely { + runtime.unsafe.fork( + eff.flatMap( + _.runForeach(v => ZIO.succeed(fireValue(v))) + ) + ) + }) + }, + stop = _ => { + runningFiber.foreach { f => + Unsafe.unsafely { + runtime.unsafe.fork(f.interrupt) + } + } + } + ) + + private def updateFromZioStream( + upd: UIO[UStream[Reload[A]]] + ): HtmlMod = + onMountBind { _ => + eventStreamFromStreamEffect(upd) --> reload + } + + private def updateStream: HtmlMod = updates match + case None => emptyMod + case Some(upd) => updateFromZioStream(upd) + + def initMod: HtmlMod = nodeSeq( + EventStream.fromValue(ReloadableComponent.Reload.Once) --> reload, + updateStream + ) def load(input: I): Unit = doLoad( input, diff --git a/ui/js/src/main/scala/works/iterative/ui/laminar/impl/LaminarUIBuilder.scala b/ui/js/src/main/scala/works/iterative/ui/laminar/impl/LaminarUIBuilder.scala index e100805..26afdd3 100644 --- a/ui/js/src/main/scala/works/iterative/ui/laminar/impl/LaminarUIBuilder.scala +++ b/ui/js/src/main/scala/works/iterative/ui/laminar/impl/LaminarUIBuilder.scala @@ -5,4 +5,4 @@ import works.iterative.ui.model.HtmlUIBuilder object LaminarUIBuilder - extends HtmlUIBuilder[HtmlElement, ComponentContext[Any]] + extends HtmlUIBuilder[HtmlElement, ComponentContext[Any], Signal] diff --git a/ui/shared/src/main/scala/works/iterative/ui/model/HtmlUIBuilder.scala b/ui/shared/src/main/scala/works/iterative/ui/model/HtmlUIBuilder.scala index 9686c74..4cc3f24 100644 --- a/ui/shared/src/main/scala/works/iterative/ui/model/HtmlUIBuilder.scala +++ b/ui/shared/src/main/scala/works/iterative/ui/model/HtmlUIBuilder.scala @@ -2,9 +2,8 @@ import works.iterative.core.* import zio.prelude.* -import com.raquo.airstream.core.Signal -trait HtmlUIBuilder[Node, Context]: +trait HtmlUIBuilder[Node, Context, Sub[+_]]: type Ctx = Context type Output = Node @@ -53,11 +52,11 @@ enum Actions: case Direct(actions: List[Action]) - case Deferred(actions: Signal[List[Action]]) + case Deferred(actions: Sub[List[Action]]) object Actions: given Conversion[List[Action], Actions] = Direct(_) - given Conversion[Signal[List[Action]], Actions] = Deferred(_) + given Conversion[Sub[List[Action]], Actions] = Deferred(_) trait UIInterpreter: def render(el: UIElement): Rendered