diff --git a/skipruntime-ts/core/native/src/BaseTypes.sk b/skipruntime-ts/core/native/src/BaseTypes.sk index 489362440..c0329d035 100644 --- a/skipruntime-ts/core/native/src/BaseTypes.sk +++ b/skipruntime-ts/core/native/src/BaseTypes.sk @@ -91,4 +91,20 @@ base class Service( ): Map; } +value class ReactiveResponse(id: String, watermark: SKStore.Tick) + +value class Values( + values: Array<(SKJSON.CJSON, Array)>, + reactive: ReactiveResponse, +) + +base class Request { + children = + | Identifier(id: String) +} + +base class Checker extends Request { + fun check(request: String): void; +} + module end; diff --git a/skipruntime-ts/core/native/src/Extern.sk b/skipruntime-ts/core/native/src/Extern.sk index 00349406c..78b84ac39 100644 --- a/skipruntime-ts/core/native/src/Extern.sk +++ b/skipruntime-ts/core/native/src/Extern.sk @@ -234,6 +234,24 @@ fun updateOfCollectionWriter( }; } +@export("SkipRuntime_CollectionWriter__loading") +fun loadingOfCollectionWriter(collection: String): Float { + writer = CollectionWriter(SKStore.DirName::create(collection)); + writer.loading() match { + | Success _ -> 0.0 + | Failure(err) -> getErrorHdl(err) + }; +} + +@export("SkipRuntime_CollectionWriter__error") +fun errorOfCollectionWriter(collection: String, error: SKJSON.CJSON): Float { + writer = CollectionWriter(SKStore.DirName::create(collection)); + writer.error(error) match { + | Success _ -> 0.0 + | Failure(err) -> getErrorHdl(err) + }; +} + /************ Resource ****************/ @cpp_extern("SkipRuntime_Resource__reactiveCompute") @@ -295,6 +313,32 @@ class ExternResourceBuilder( } } +/************ Checker ****************/ + +@cpp_extern("SkipRuntime_Checker__check") +@debug +native fun checkOfChecker(executor: UInt32, request: String): void; + +@cpp_extern("SkipRuntime_deleteChecker") +@debug +native fun deleteChecker(mapper: UInt32): void; + +@export("SkipRuntime_createChecker") +fun createChecker(executor: UInt32): ExternChecker { + ExternChecker(SKStore.ExternalPointer::create(executor, deleteChecker)) +} + +@export("SkipRuntime_createIdentifier") +fun createIdentifier(request: String): Identifier { + Identifier(request) +} + +class ExternChecker(eptr: SKStore.ExternalPointer) extends Checker { + fun check(request: String): void { + checkOfChecker(this.eptr.value, request) + } +} + /************ Service ****************/ @export("SkipRuntime_createService") @@ -598,28 +642,63 @@ fun getAllOfRuntime( resource: String, jsonParams: SKJSON.CJObject, reactiveAuth: ?Array, + optRequest: ?Request, ): SKJSON.CJSON { - SKStore.runWithResult(context ~> { - getAll(context, resource, params(jsonParams), reactiveAuth) + (getContext() match { + | Some(context) -> + try { + Success( + getAll(context, resource, params(jsonParams), reactiveAuth, optRequest), + ) + } catch { + | ex -> Failure(ex) + } + | _ -> + SKStore.runWithResult(context ~> { + getAll(context, resource, params(jsonParams), reactiveAuth, optRequest) + }) }) match { - | Success(values) -> - SKJSON.CJArray( - Array[ + | Success(result) -> + reactive = Array<(String, SKJSON.CJSON)>[ + ("collection", SKJSON.CJString(result.values.reactive.id)), + ( + "watermark", + SKJSON.CJString(result.values.reactive.watermark.value.toString()), + ), + ]; + response = Array<(String, SKJSON.CJSON)>[ + ( + "values", SKJSON.CJArray( - values.values.map(v -> + result.values.values.map(v -> SKJSON.CJArray(Array[v.i0, SKJSON.CJArray(v.i1)]) ), ), - SKJSON.CJArray( - Array[ - SKJSON.CJString(values.reactive.id), - SKJSON.CJString(values.reactive.watermark.value.toString()), - ], + ), + ( + "reactive", + SKJSON.CJObject( + SKJSON.CJFields::create(reactive.sortedBy(x ~> x.i0), x -> x), ), - ], + ), + ]; + fields = mutable Vector<(String, SKJSON.CJSON)>[ + ( + "payload", + SKJSON.CJObject( + SKJSON.CJFields::create(response.sortedBy(x ~> x.i0), x -> x), + ), + ), + ("errors", SKJSON.CJArray(result.errors)), + ]; + result.request.each(request -> + fields.push(("request", SKJSON.CJString(request))) + ); + SKJSON.CJObject( + SKJSON.CJFields::create(fields.sortedBy(x ~> x.i0).toArray(), x -> x), ) | Failure(err) -> SKJSON.CJFloat(getErrorHdl(err)) - }; + } } @export("SkipRuntime_Runtime__getForKey") @@ -628,11 +707,47 @@ fun getForKeyOfRuntime( jsonParams: SKJSON.CJObject, key: SKJSON.CJSON, reactiveAuth: ?Array, + optRequest: ?Request, ): SKJSON.CJSON { - SKStore.runWithResult(context ~> { - getForKey(context, resource, params(jsonParams), key, reactiveAuth) + (getContext() match { + | Some(context) -> + try { + Success( + getForKey( + context, + resource, + params(jsonParams), + key, + reactiveAuth, + optRequest, + ), + ) + } catch { + | ex -> Failure(ex) + } + | _ -> + SKStore.runWithResult(context ~> { + getForKey( + context, + resource, + params(jsonParams), + key, + reactiveAuth, + optRequest, + ) + }) }) match { - | Success(value) -> SKJSON.CJArray(value) + | Success(result) -> + fields = mutable Vector<(String, SKJSON.CJSON)>[ + ("payload", SKJSON.CJArray(result.values)), + ("errors", SKJSON.CJArray(result.errors)), + ]; + result.request.each(request -> + fields.push(("request", SKJSON.CJString(request))) + ); + SKJSON.CJObject( + SKJSON.CJFields::create(fields.sortedBy(x ~> x.i0).toArray(), x -> x), + ) | Failure(err) -> SKJSON.CJFloat(getErrorHdl(err)) }; } diff --git a/skipruntime-ts/core/native/src/Runtime.sk b/skipruntime-ts/core/native/src/Runtime.sk index 4788fcc82..29f7fcaea 100644 --- a/skipruntime-ts/core/native/src/Runtime.sk +++ b/skipruntime-ts/core/native/src/Runtime.sk @@ -78,8 +78,90 @@ class ResourceKey( } } +class ResourceStatus( + loadable: SKStore.DirName, + status: Status, +) extends SKStore.File {} + class ServiceFile(value: Map) extends SKStore.File +base class Status(created: Int, modified: Int) uses Show { + children = + | Loading() + | Error(error: SKJSON.CJSON) + | Ok() + + static fun create(): Status { + time = Time.time_ms(); + Loading(time, time) + } + + fun ok(): Status { + Ok(this.created, Time.time_ms()) + } + + fun err(error: SKJSON.CJSON): Status { + Error(error, this.created, Time.time_ms()) + } + + fun toString(): String + | Loading(created, modified) -> + `Loading{created: ${created}, modified: ${modified}}` + | Error(error, created, modified) -> + `Error{error: ${error.toJSON()}, created: ${created}, modified: ${modified}}` + | Ok(created, modified) -> `Ok{created: ${created}, modified: ${modified}}` +} + +class StatusFile(status: Status) extends SKStore.File + +class RequestFile( + timestamp: Int, + resource: ResourceInfo, + checker: ?Checker, +) extends SKStore.File + +class RequestStatuses(statuses: Array) extends SKStore.File { + // + fun getResult(request: String, values: T): GetResult { + errors = mutable Vector[]; + loading: Bool = false; + for (resourceStatus in this.statuses) { + resourceStatus.status match { + | Error(error, _, _) -> errors.push(error) + | Loading _ -> !loading = true + | Ok _ -> void + } + }; + GetResult(if (loading) Some(request) else None(), values, errors.toArray()) + } + + fun isLoading(): Bool { + for (resourceStatus in this.statuses) { + resourceStatus.status match { + | Loading _ -> break true + | Error _ + | Ok _ -> + void + } + } else { + false + } + } + + fun getErrors(): Array { + errors = mutable Vector[]; + for (resourceStatus in this.statuses) { + resourceStatus.status match { + | Error(error, _, _) -> errors.push(error) + | Loading _ + | Ok _ -> + void + } + }; + errors.toArray() + } +} + class RemoteSpecifiers( value: Map, ) extends SKStore.File @@ -88,7 +170,51 @@ class ResourceInfo( name: String, collection: Collection, reactiveAuth: ?Array, -) extends SKStore.File + statusRef: SKStore.DirName, +) extends SKStore.File { + /** + * Create a status request + */ + fun createRequest( + context: mutable SKStore.Context, + optChecker: ?Checker, + ): String { + requestId = Ksuid::create().toString(); + SKStore.EHandle( + SKStore.SID::keyType, + RequestFile::type, + this.statusRef.sub("requests"), + ).writeArray( + context, + SKStore.SID(requestId), + Array[RequestFile(Time.time_ms(), this, optChecker)], + ); + context.update(); + requestId; + } + + fun getResult( + context: mutable SKStore.Context, + request: String, + values: T, + ): GetResult { + SKStore.EHandle( + SKStore.SID::keyType, + RequestStatuses::type, + this.statusRef.sub("all"), + ) + .get(context, SKStore.SID(request)) + .getResult(request, values) + } + + fun clearRequest(context: mutable SKStore.Context, requestId: String): void { + SKStore.EHandle( + SKStore.SID::keyType, + RequestFile::type, + this.statusRef.sub("requests"), + ).writeArray(context, SKStore.SID(requestId), Array[]); + } +} fun initCtx(): SKStore.Context { SKStore.Context{} @@ -105,7 +231,7 @@ fun initService(service: Service): Result { if (SKStore.gHasContext() == 0) { SKStore.gContextInit(initCtx()); }; - session = `${Ksuid::create().toString()}`; + session = Ksuid::create().toString(); SKStore.runWithResult(context ~> { context.setPersistent( kRemoteSpecifiers, @@ -161,6 +287,24 @@ fun initService(service: Service): Result { } }, ); + // Service status + statusHdl = context + .mkdir( + SKStore.DirName::keyType, + StatusFile::type, + kSessionDir.sub("status"), + Array[], + ) + .map( + SKStore.IID::keyType, + ResourceStatus::type, + context, + kSessionDir.sub("statuses"), + (_ctx, writer, key, it) ~> { + writer.set(SKStore.IID(0), ResourceStatus(key, it.first.status)); + }, + ); + rSessionHdl = context.mkdir( SKStore.SID::keyType, SKStore.IntFile::type, @@ -175,21 +319,12 @@ fun initService(service: Service): Result { kResourceGraphDir, (context, writer, key, _it) ~> { rsession = key.value; - dirname = SKStore.DirName::create( - `/sk_prv/resources/session/${rsession}/`, - ); - nDirname = SKStore.DirName::create( - `/sk_prv/resources/session/${rsession}/names/`, - ); - gDirname = SKStore.DirName::create( - `/sk_prv/resources/session/${rsession}/graph/`, - ); - cDirname = SKStore.DirName::create( - `/sk_prv/resources/session/${rsession}/collections/`, - ); - dDirname = SKStore.DirName::create( - `/sk_prv/resources/session/${rsession}/data/`, - ); + // Resource graph + dirname = resourceSessionDirName(rsession); + nDirname = dirname.sub("names"); + gDirname = dirname.sub("graph"); + cDirname = dirname.sub("collections"); + dDirname = dirname.sub("data"); namesHdl = context .mkdir( ResourceKey::keyType, @@ -220,6 +355,57 @@ fun initService(service: Service): Result { pushContext(context); try { resourceId = it.first.value; + statusRef = dirname.sub(resourceId); + // Status graph + sStatusHdl = context + .mkdir( + SKStore.DirName::keyType, + StatusFile::type, + statusRef.sub("status"), + Array[], + true, + ) + .map( + SKStore.IID::keyType, + ResourceStatus::type, + context, + statusRef.sub("statuses"), + (_ctx, writer, key, it) ~> { + writer.set( + SKStore.IID(0), + ResourceStatus(key, it.first.status), + ); + }, + ); + _sRequestsHdl = context + .mkdir( + SKStore.SID::keyType, + RequestFile::type, + statusRef.sub("requests"), + Array[], + true, + ) + .map( + SKStore.SID::keyType, + RequestStatuses::type, + context, + statusRef.sub("all"), + (ctx, writer, key, it) ~> { + rf = it.first; + timestamp = rf.timestamp; + statuses = statusHdl + .getArray(ctx, SKStore.IID(0)) + .concat(sStatusHdl.getArray(ctx, SKStore.IID(0))) + .filter(rs -> rs.status.created <= timestamp); + requestStatuses = RequestStatuses(statuses); + writer.set(key, requestStatuses); + rf.checker match { + | Some(checker) if (!requestStatuses.isLoading()) -> + ctx.postpone(CheckRequest(statusRef, key, checker)) + | _ -> void + } + }, + ); resourceBuilder = service.resources.get(key.name); resource = resourceBuilder.build(key.params.value); allCollections = mutable Map[]; @@ -247,6 +433,7 @@ fun initService(service: Service): Result { resourceId, collection with {hdl => resourceData}, key.reactiveAuth, + statusRef, ), ); popContext() @@ -317,6 +504,24 @@ fun closeService(): Result { }) } +class CheckRequest( + statusRef: SKStore.DirName, + request: SKStore.SID, + checker: Checker, +) extends SKStore.Postponable { + // + fun perform(context: mutable SKStore.Context): void { + pushContext(context); + this.checker.check(this.request.value); + popContext(); + SKStore.EHandle( + SKStore.SID::keyType, + RequestFile::type, + this.statusRef.sub("requests"), + ).writeArray(context, this.request, Array[]); + } +} + class LinkToResource( supplier: ExternalSupplier, writer: CollectionWriter, @@ -324,7 +529,9 @@ class LinkToResource( params: Map, reactiveAuth: ?Array, ) extends SKStore.Postponable { + // fun perform(context: mutable SKStore.Context): void { + this.writer.status(context, Status::create()); pushContext(context); this.supplier.subscribe( this.writer, @@ -869,6 +1076,7 @@ class CollectionWriter(dirName: SKStore.DirName) { | Some(context) -> try { this.update_(context, values, isInit); + this.updateStatus(context, status ~> status.ok()); Success(void) } catch { | ex -> Failure(ex) @@ -876,11 +1084,84 @@ class CollectionWriter(dirName: SKStore.DirName) { | _ -> SKStore.runWithResult(context ~> { this.update_(context, values, isInit); + this.updateStatus(context, status ~> status.ok()); + context.update(); + }) + } + } + + fun error(error: SKJSON.CJSON): Result { + getContext() match { + | Some(context) -> + try { + this.updateStatus(context, status ~> status.err(error)); + Success(void) + } catch { + | ex -> Failure(ex) + } + | _ -> + SKStore.runWithResult(context ~> { + this.updateStatus(context, status ~> status.err(error)); + context.update(); + }) + } + } + + fun loading(): Result { + getContext() match { + | Some(context) -> + try { + this.status(context, Status::create()); + Success(void) + } catch { + | ex -> Failure(ex) + } + | _ -> + SKStore.runWithResult(context ~> { + this.status(context, Status::create()); context.update(); }) } } + fun status(context: mutable SKStore.Context, status: Status): void { + context.maybeGetEagerDir(this.dirName) match { + | Some(dir) -> + dirname = this.sessionId(context, dir) match { + | Some(sessionId) -> resourceSessionDirName(sessionId).sub("status") + | _ -> kSessionDir.sub("status") + }; + shdl = SKStore.EHandle( + SKStore.DirName::keyType, + StatusFile::type, + dirname, + ); + shdl.writeArray(context, this.dirName, Array[StatusFile(status)]) + | _ -> void + }; + } + + fun updateStatus( + context: mutable SKStore.Context, + update: Status ~> Status, + ): void { + context.maybeGetEagerDir(this.dirName) match { + | Some(dir) -> + dirname = this.sessionId(context, dir) match { + | Some(sessionId) -> resourceSessionDirName(sessionId).sub("status") + | _ -> kSessionDir.sub("status") + }; + shdl = SKStore.EHandle( + SKStore.DirName::keyType, + StatusFile::type, + dirname, + ); + status = update(shdl.get(context, this.dirName).status); + shdl.writeArray(context, this.dirName, Array[StatusFile(status)]) + | _ -> void + }; + } + private fun update_( context: mutable SKStore.Context, values: Array<(SKJSON.CJSON, Array)>, @@ -904,14 +1185,61 @@ class CollectionWriter(dirName: SKStore.DirName) { }); keys.each(key -> chdl.writeArray(context, key, Array[])); } -} -value class ReactiveResponse(id: String, watermark: SKStore.Tick) + private fun sessionId( + context: mutable SKStore.Context, + dir: SKStore.EagerDir, + ): ?String { + dir.creator match { + | Some(arrow) -> + this.extractSessionId(arrow.parentName) match { + | Some(dn) -> Some(dn) + | _ -> + context.maybeGetEagerDir(arrow.parentName) match { + | Some(sdir) -> this.sessionId(context, sdir) + | _ -> None() + } + } + | _ -> None() + } + } -value class Values( - values: Array<(SKJSON.CJSON, Array)>, - reactive: ReactiveResponse, -) + private const kSessionDir: Array = Array[ + "", + "sk_prv", + "resources", + "session", + "#", + "graph", + "", + ]; + + private fun extractSessionId(dirName: SKStore.DirName): ?String { + elements = dirName.toString().split("/"); + if (elements.size() == static::kSessionDir.size()) { + session: ?String = None(); + for (idx in Range(0, elements.size())) { + if (idx == 4) { + !session = Some(elements[idx]); + } else if (elements[idx] != static::kSessionDir[idx]) { + return None() + } + }; + return session; + }; + None() + } +} + +value class GetResult( + request: ?String, + values: T, + errors: Array, +) { + fun loading(): Bool { + this.request.isSome() + } +} fun createReactiveResource( context: mutable SKStore.Context, @@ -925,27 +1253,64 @@ fun createReactiveResource( fun getAll( context: mutable SKStore.Context, - resource: String, + resourceName: String, params: Map, reactiveAuth: ?Array, -): Values { - names = createResource_(context, resource, params, reactiveAuth); - values = names.collection.getAll(context); - Values(values, ReactiveResponse(names.name, context.tick)) + optRequest: ?Request, +): GetResult { + resource = createResource_(context, resourceName, params, reactiveAuth); + // create requests + request = optRequest match { + | Some(checker @ Checker _) -> resource.createRequest(context, Some(checker)) + | Some(Identifier(req)) -> req + | _ -> resource.createRequest(context, None()) + }; + values = Values( + resource.collection.getAll(context), + ReactiveResponse(resource.name, context.tick), + ); + // return result type + res = resource.getResult(context, request, values); + if (!res.loading()) { + optRequest match { + | Some(Identifier _) + | None() -> + resource.clearRequest(context, request) + | Some(Checker _) -> void + }; + }; + res; } fun getForKey( context: mutable SKStore.Context, - resource: String, + resourceName: String, params: Map, key: SKJSON.CJSON, reactiveAuth: ?Array, -): Array { - names = createResource_(context, resource, params, reactiveAuth); + optRequest: ?Request, +): GetResult> { + resource = createResource_(context, resourceName, params, reactiveAuth); + // create requests + request = optRequest match { + | Some(checker @ Checker _) -> resource.createRequest(context, Some(checker)) + | Some(Identifier(req)) -> req + | _ -> resource.createRequest(context, None()) + }; pushContext(context); - res = names.collection.getArray(key); + values = resource.collection.getArray(key); popContext(); - res + // return result type + res = resource.getResult(context, request, values); + if (!res.loading()) { + optRequest match { + | Some(Identifier _) + | None() -> + resource.clearRequest(context, request) + | Some(Checker _) -> void + }; + }; + res; } fun closeReactiveResource( @@ -955,7 +1320,7 @@ fun closeReactiveResource( reactiveAuth: ?Array, ): void { session = toSessionName(reactiveAuth.default(Array[])); - rDirName = SKStore.DirName::create(`/sk_prv/resources/session/${session}/`); + rDirName = resourceSessionDirName(session); context.maybeGetEagerDir(rDirName).each(_ -> { rObject = ResourceKey(resource, Params(params), reactiveAuth); resourceHdl = SKStore.EHandle( @@ -969,7 +1334,7 @@ fun closeReactiveResource( } fun closeSession_(context: mutable SKStore.Context, session: String): void { - rDirName = SKStore.DirName::create(`/sk_prv/resources/session/${session}/`); + rDirName = resourceSessionDirName(session); resourceHdl = SKStore.EHandle( ResourceKey::keyType, SKStore.IntFile::type, @@ -1095,7 +1460,7 @@ private fun createResource_( Array[SKStore.IntFile(0)], ); context.update(); - rDirName = SKStore.DirName::create(`/sk_prv/resources/session/${session}/`); + rDirName = resourceSessionDirName(session); rObject = ResourceKey(resource, Params(params), reactiveAuth); resourceHdl = SKStore.EHandle( ResourceKey::keyType, @@ -1104,9 +1469,7 @@ private fun createResource_( ); resourceHdl.writeArray(context, rObject, Array[SKStore.IntFile(0)]); context.update(); - gDirName = SKStore.DirName::create( - `/sk_prv/resources/session/${session}/graph/`, - ); + gDirName = rDirName.sub("graph"); graphHdl = SKStore.EHandle( ResourceKey::keyType, ResourceInfo::type, diff --git a/skipruntime-ts/core/native/src/Utils.sk b/skipruntime-ts/core/native/src/Utils.sk index e45943afd..34d1ef237 100644 --- a/skipruntime-ts/core/native/src/Utils.sk +++ b/skipruntime-ts/core/native/src/Utils.sk @@ -17,6 +17,10 @@ const kResourceCollectionsDir: SKStore.DirName = SKStore.DirName::create( const kRemoteSpecifiers: String = "SkipRuntime.RemoteSpecifiers"; +fun resourceSessionDirName(sessionId: String): SKStore.DirName { + SKStore.DirName::create(`/sk_prv/resources/session/${sessionId}/`) +} + class JSONKeyConverter() extends KeyConverter { fun toJSON(value: SKStore.Key): SKJSON.CJSON { JSONID::keyType(value).value diff --git a/skipruntime-ts/core/src/internals/skipruntime_internal_types.ts b/skipruntime-ts/core/src/internals/skipruntime_internal_types.ts index 65d0a0d7a..44acf7f51 100644 --- a/skipruntime-ts/core/src/internals/skipruntime_internal_types.ts +++ b/skipruntime-ts/core/src/internals/skipruntime_internal_types.ts @@ -51,3 +51,6 @@ export type Accumulator = T; declare const notifier: unique symbol; export type Notifier = T; + +declare const request: unique symbol; +export type Request = T; diff --git a/skipruntime-ts/core/src/internals/skipruntime_module.ts b/skipruntime-ts/core/src/internals/skipruntime_module.ts index b0a7b7fb2..fcc217384 100644 --- a/skipruntime-ts/core/src/internals/skipruntime_module.ts +++ b/skipruntime-ts/core/src/internals/skipruntime_module.ts @@ -32,9 +32,7 @@ import type { SubscriptionID, } from "../skipruntime_api.js"; -import type { ServiceInstance } from "../skipruntime_init.js"; - -import type { SKJSON } from "skjson"; +import type { Exportable, SKJSON } from "skjson"; import { UnknownCollectionError } from "../skipruntime_errors.js"; export type Handle = Internal.Opaque; @@ -179,6 +177,15 @@ export interface FromWasm { isInit: boolean, ): Handle; + SkipRuntime_CollectionWriter__error( + name: ptr, + error: ptr, + ): Handle; + + SkipRuntime_CollectionWriter__loading( + name: ptr, + ): Handle; + // Resource SkipRuntime_createResource(ref: Handle): ptr; @@ -290,14 +297,16 @@ export interface FromWasm { resource: ptr, jsonParams: ptr, reactiveAuth: ptr> | null, - ): ptr; + request: ptr | null, + ): ptr; SkipRuntime_Runtime__getForKey( resource: ptr, jsonParams: ptr, key: ptr, reactiveAuth: ptr> | null, - ): ptr; + request: ptr | null, + ): ptr; SkipRuntime_Runtime__closeResource( resource: ptr, @@ -310,8 +319,8 @@ export interface FromWasm { ): Handle; SkipRuntime_Runtime__subscribe( - reactiveId: ptr, - from: bigint, + collection: ptr, + fromWatermark: bigint, notifier: ptr, reactiveAuth: ptr> | null, ): bigint; @@ -352,6 +361,14 @@ export interface FromWasm { params: ptr, reactiveAuth: ptr> | null, ): ptr; + + // Checker + + SkipRuntime_createIdentifier( + supplier: ptr, + ): ptr; + + SkipRuntime_createChecker(ref: Handle): ptr; } interface ToWasm { @@ -462,6 +479,15 @@ interface ToWasm { SkipRuntime_deleteAccumulator( notifier: Handle>, ): void; + + // Checker + + SkipRuntime_Checker__check( + checker: Handle, + request: ptr, + ): void; + + SkipRuntime_deleteChecker(checker: Handle): void; } class Handles { @@ -782,7 +808,11 @@ class LinksImpl implements Links { supplier.subscribe( resource, params, - writer.update.bind(writer), + { + update: writer.update.bind(writer), + error: writer.error.bind(writer), + loading: writer.loading.bind(writer), + }, reactiveAuth, ); } @@ -812,6 +842,18 @@ class LinksImpl implements Links { this.handles.deleteHandle(supplier); } + // Checker + + checkOfChecker(skchecker: Handle, skrequest: ptr) { + const skjson = this.getSkjson(); + const checker = this.handles.get(skchecker); + checker.check(skjson.importString(skrequest)); + } + + deleteChecker(checker: Handle) { + this.handles.deleteHandle(checker); + } + initService(service: SkipService): ServiceInstance { const skjson = this.getSkjson(); const result = skjson.runWithGC(() => { @@ -854,7 +896,7 @@ class LinksImpl implements Links { if (result != 0) { throw this.handles.deleteAsError(result as Handle); } - return new ServiceInstanceImpl( + return new ServiceInstance( new Refs(skjson, this.fromWasm, this.handles, this.needGC.bind(this)), ); } @@ -1032,17 +1074,46 @@ class CollectionWriter { ) {} update(values: Entry[], isInit: boolean): void { - const todo = () => { + const update_ = () => { return this.refs.fromWasm.SkipRuntime_CollectionWriter__update( this.refs.skjson.exportString(this.collection), this.refs.skjson.exportJSON(values), isInit, ); }; - const needGC = this.refs.needGC(); - let result: Handle; - if (needGC) result = this.refs.skjson.runWithGC(todo); - else result = todo(); + const result: Handle = this.refs.needGC() + ? this.refs.skjson.runWithGC(update_) + : update_(); + + if (result != 0) { + throw this.refs.handles.deleteAsError(result); + } + } + + loading(): void { + const loading_ = () => { + return this.refs.fromWasm.SkipRuntime_CollectionWriter__loading( + this.refs.skjson.exportString(this.collection), + ); + }; + const result: Handle = this.refs.needGC() + ? this.refs.skjson.runWithGC(loading_) + : loading_(); + if (result != 0) { + throw this.refs.handles.deleteAsError(result); + } + } + + error(error: TJSON): void { + const error_ = () => { + return this.refs.fromWasm.SkipRuntime_CollectionWriter__error( + this.refs.skjson.exportString(this.collection), + this.refs.skjson.exportJSON(error), + ); + }; + const result: Handle = this.refs.needGC() + ? this.refs.skjson.runWithGC(error_) + : error_(); if (result != 0) { throw this.refs.handles.deleteAsError(result); } @@ -1140,10 +1211,91 @@ export class ServiceInstanceFactory implements Shared { } } -class ServiceInstanceImpl implements ServiceInstance { +export type Values = { + values: Entry[]; + reactive?: ReactiveResponse; +}; + +export type GetResult = { + request?: string; + payload: T; + errors: TJSON[]; +}; + +export type Executor = { + resolve: (value: T) => void; + reject: (reason?: any) => void; +}; + +interface Checker { + check(request: string): void; +} + +class AllChecker implements Checker { + constructor( + private service: ServiceInstance, + private executor: Executor>, + private resource: string, + private params: Record, + private reactiveAuth?: Uint8Array, + ) {} + + check(request: string): void { + const result = this.service.getAll( + this.resource, + this.params, + this.reactiveAuth, + request, + ); + if (result.errors.length > 0) { + this.executor.reject(new Error(JSON.stringify(result.errors))); + } else { + this.executor.resolve(result.payload); + } + } +} + +class OneChecker implements Checker { + constructor( + private service: ServiceInstance, + private executor: Executor, + private resource: string, + private params: Record, + private key: string | number, + private reactiveAuth?: Uint8Array, + ) {} + + check(request: string): void { + const result = this.service.getArray( + this.resource, + this.key, + this.params, + this.reactiveAuth, + request, + ); + if (result.errors.length > 0) { + this.executor.reject(new Error(JSON.stringify(result.errors))); + } else { + this.executor.resolve(result.payload); + } + } +} + +/** + * A `ServiceInstance` is a running instance of a `SkipService`, providing access to its resources + * and operations to manage susbscriptions and the service itself. + */ +export class ServiceInstance { constructor(private refs: Refs) {} - createResource( + /** + * Instantiate a resource with some parameters and client session authentication token + * @param resource - A resource name, which must correspond to a key in this `SkipService`'s `resources` field + * @param params - Resource parameters, which will be passed to the resource constructor specified in this `SkipService`'s `resources` field + * @param reactiveAuth - A client-generated Skip session authentication token + * @returns A response token which can be used to initiate reactive subscription + */ + instantiateResource( resource: string, params: Record, reactiveAuth?: Uint8Array, @@ -1165,56 +1317,117 @@ class ServiceInstanceImpl implements ServiceInstance { return { collection, watermark: BigInt(watermark) as Watermark }; } + /** + * Creates if not exists and get all current values of specified resource + * @param resource - the resource name corresponding to a key in remotes field of SkipService + * @param params - the parameters of the resource used to build the resource with the corresponding constructor specified in remotes field of SkipService + * @param reactiveAuth - the client user Skip session authentification + * @returns The current values of the corresponding resource with reactive responce token to allow subscription + */ getAll( resource: string, - params: Record, + params: Record = {}, reactiveAuth?: Uint8Array, - ): { values: Entry[]; reactive?: ReactiveResponse } { - const result = this.refs.skjson.runWithGC(() => { + request?: string | Executor>, + ): GetResult> { + const get_ = () => { return this.refs.skjson.importJSON( this.refs.fromWasm.SkipRuntime_Runtime__getAll( this.refs.skjson.exportString(resource), this.refs.skjson.exportJSON(params), reactiveAuth ? this.refs.skjson.exportBytes(reactiveAuth) : null, + request !== undefined + ? typeof request == "string" + ? this.refs.fromWasm.SkipRuntime_createIdentifier( + this.refs.skjson.exportString(request), + ) + : this.refs.fromWasm.SkipRuntime_createChecker( + this.refs.handles.register( + new AllChecker( + this, + request, + resource, + params, + reactiveAuth, + ), + ), + ) + : null, ), true, ); - }); + }; + const result: Exportable = this.refs.needGC() + ? this.refs.skjson.runWithGC(get_) + : get_(); + if (typeof result == "number") { throw this.refs.handles.deleteAsError(result as Handle); } - const [values, reactive] = result as [Entry[], [string, string]]; - const [collection, watermark] = reactive; - return { - values, - reactive: { collection, watermark: BigInt(watermark) as Watermark }, - }; - } - - getOne( + return result as GetResult>; + } + + /** + * Get the current value of a key in the specified resource instance, creating it if it doesn't already exist + * @param resource - A resource name, which must correspond to a key in this `SkipService`'s `resources` field + * @param key - A key to look up in the resource instance + * @param params - Resource parameters, passed to the resource constructor specified in this `SkipService`'s `resources` field + * @param reactiveAuth - the client Skip session authentication token + * @returns The current value(s) for this key in the specified resource instance + */ + getArray( resource: string, - params: Record, key: string | number, + params: Record = {}, reactiveAuth?: Uint8Array, - ): V[] { - const result = this.refs.skjson.runWithGC(() => { + request?: string | Executor, + ): GetResult { + const get_ = () => { return this.refs.skjson.importJSON( this.refs.fromWasm.SkipRuntime_Runtime__getForKey( this.refs.skjson.exportString(resource), this.refs.skjson.exportJSON(params), this.refs.skjson.exportJSON(key), reactiveAuth ? this.refs.skjson.exportBytes(reactiveAuth) : null, + request !== undefined + ? typeof request == "string" + ? this.refs.fromWasm.SkipRuntime_createIdentifier( + this.refs.skjson.exportString(request), + ) + : this.refs.fromWasm.SkipRuntime_createChecker( + this.refs.handles.register( + new OneChecker( + this, + request, + resource, + params, + key, + reactiveAuth, + ), + ), + ) + : null, ), true, ); - }); + }; + const needGC = this.refs.needGC(); + const result: Exportable = needGC + ? this.refs.skjson.runWithGC(get_) + : get_(); if (typeof result == "number") { throw this.refs.handles.deleteAsError(result as Handle); } - return result as V[]; + return result as GetResult; } - closeResource( + /** + * Close the specified resource instance + * @param resource - The resource name, which must correspond to a key in this `SkipService`'s `resources` field + * @param params - Resource parameters which were used to instantiate the resource + * @param reactiveAuth - The client Skip session authentication for this resource instance + */ + closeResourceInstance( resource: string, params: Record, reactiveAuth?: Uint8Array, @@ -1231,6 +1444,10 @@ class ServiceInstanceImpl implements ServiceInstance { } } + /** + * Close all resource instances maintained for the specified `reactiveAuth` session + * @param reactiveAuth - A client Skip session authentication token + */ closeSession(reactiveAuth?: Uint8Array): void { const result = this.refs.skjson.runWithGC(() => { return this.refs.fromWasm.SkipRuntime_Runtime__closeSession( @@ -1242,9 +1459,15 @@ class ServiceInstanceImpl implements ServiceInstance { } } + /** + * Initiate reactive subscription on a resource instance + * @param reactiveResponse - the reactive response + * @param f - A callback to execute on collection updates + * @param reactiveAuth The client Skip session authentication token corresponding to the reactive response + * @returns A subcription identifier + */ subscribe( - reactiveId: string, - since: Watermark, + reactiveResponse: ReactiveResponse, f: (update: CollectionUpdate) => void, reactiveAuth?: Uint8Array, ): SubscriptionID { @@ -1253,17 +1476,19 @@ class ServiceInstanceImpl implements ServiceInstance { this.refs.handles.register(f), ); return this.refs.fromWasm.SkipRuntime_Runtime__subscribe( - this.refs.skjson.exportString(reactiveId), - since, + this.refs.skjson.exportString(reactiveResponse.collection), + reactiveResponse.watermark, sknotifier, reactiveAuth ? this.refs.skjson.exportBytes(reactiveAuth) : null, ); }); if (session == -1n) { - throw new UnknownCollectionError(`Unknown collection '${reactiveId}'`); + throw new UnknownCollectionError( + `Unknown collection '${reactiveResponse.collection}'`, + ); } else if (session == -2n) { throw new UnknownCollectionError( - `Access to collection '${reactiveId}' refused.`, + `Access to collection '${reactiveResponse.collection}' refused.`, ); } else if (session < 0n) { throw new Error("Unknown error"); @@ -1271,6 +1496,10 @@ class ServiceInstanceImpl implements ServiceInstance { return session as SubscriptionID; } + /** + * Terminate a client's subscription to a reactive resource instance + * @param id - The subcription identifier returned by a call to `subscribe` + */ unsubscribe(id: SubscriptionID): void { const result = this.refs.skjson.runWithGC(() => { return this.refs.fromWasm.SkipRuntime_Runtime__unsubscribe(id); @@ -1280,14 +1509,19 @@ class ServiceInstanceImpl implements ServiceInstance { } } + /** + * Update an input collection + * @param collection - the name of the input collection to update + * @param entries - entries to update in the collection. + */ update( collection: string, - values: Entry[], + entries: Entry[], ): void { const result = this.refs.skjson.runWithGC(() => { return this.refs.fromWasm.SkipRuntime_Runtime__update( this.refs.skjson.exportString(collection), - this.refs.skjson.exportJSON(values), + this.refs.skjson.exportJSON(entries), ); }); if (result != 0) { @@ -1295,6 +1529,10 @@ class ServiceInstanceImpl implements ServiceInstance { } } + /** + * Close all resources and shut down the service. + * Any subsequent calls on the service will result in errors. + */ close(): void { const result = this.refs.skjson.runWithGC(() => { return this.refs.fromWasm.SkipRuntime_closeService(); @@ -1442,6 +1680,11 @@ class Manager implements ToWasmManager { links.dismissOfAccumulator.bind(links); toWasm.SkipRuntime_deleteAccumulator = links.deleteAccumulator.bind(links); + // Checker + + toWasm.SkipRuntime_Checker__check = links.checkOfChecker.bind(links); + toWasm.SkipRuntime_deleteChecker = links.deleteChecker.bind(links); + return links; } } diff --git a/skipruntime-ts/core/src/skip-runtime.ts b/skipruntime-ts/core/src/skip-runtime.ts index 35e2182a7..adbde28e5 100644 --- a/skipruntime-ts/core/src/skip-runtime.ts +++ b/skipruntime-ts/core/src/skip-runtime.ts @@ -1,4 +1,4 @@ -export { type ServiceInstance, initService } from "./skipruntime_init.js"; +export { initService } from "./skipruntime_init.js"; export type { Mapper, @@ -21,6 +21,10 @@ export type { SubscriptionID, } from "./skipruntime_api.js"; +export type { + Values, + ServiceInstance, +} from "./internals/skipruntime_module.js"; export { UnknownCollectionError } from "./skipruntime_errors.js"; export { freeze, ValueMapper } from "./skipruntime_api.js"; export { diff --git a/skipruntime-ts/core/src/skipruntime_api.ts b/skipruntime-ts/core/src/skipruntime_api.ts index c623d4db0..234fe4e1b 100644 --- a/skipruntime-ts/core/src/skipruntime_api.ts +++ b/skipruntime-ts/core/src/skipruntime_api.ts @@ -305,13 +305,20 @@ export interface ExternalSupplier { * Subscribe to the external resource * @param resource - the name of the external resource * @param params - the parameters of the external resource - * @param cb - the callback called on collection updates + * @param callbacks - the callbacks to react on update/error/loading + * @param callbacks.update - the update callback + * @param callbacks.error - the error callback + * @param callbacks.loading - the loading callback * @param reactiveAuth - the client user Skip session authentification of the caller */ subscribe( resource: string, params: Record, - cb: (updates: Entry[], isInit: boolean) => void, + callbacks: { + update: (updates: Entry[], isInit: boolean) => void; + error: (error: TJSON) => void; + loading: () => void; + }, reactiveAuth?: Uint8Array, ): void; diff --git a/skipruntime-ts/core/src/skipruntime_helpers.ts b/skipruntime-ts/core/src/skipruntime_helpers.ts index fb3b227b5..5edec4f57 100644 --- a/skipruntime-ts/core/src/skipruntime_helpers.ts +++ b/skipruntime-ts/core/src/skipruntime_helpers.ts @@ -4,7 +4,11 @@ import { fetchJSON } from "./skipruntime_rest.js"; export interface ExternalResource { open( params: Record, - cb: (updates: Entry[], isInit: boolean) => void, + callbacks: { + update: (updates: Entry[], isInit: boolean) => void; + error: (error: TJSON) => void; + loading: () => void; + }, reactiveAuth?: Uint8Array, ): void; @@ -20,7 +24,11 @@ export class ExternalService implements ExternalSupplier { subscribe( resourceName: string, params: Record, - cb: (updates: Entry[], isInit: boolean) => void, + callbacks: { + update: (updates: Entry[], isInit: boolean) => void; + error: (error: TJSON) => void; + loading: () => void; + }, reactiveAuth?: Uint8Array, ): void { const resource = this.resources[resourceName] as @@ -29,7 +37,7 @@ export class ExternalService implements ExternalSupplier { if (!resource) { throw new Error(`Unkonwn resource named '${resourceName}'`); } - resource.open(params, cb, reactiveAuth); + resource.open(params, callbacks, reactiveAuth); } unsubscribe( @@ -58,7 +66,11 @@ export class TimeCollection implements ExternalResource { open( params: Record, - cb: (updates: Entry[], isInit: boolean) => void, + callbacks: { + update: (updates: Entry[], isInit: boolean) => void; + error: (error: TJSON) => void; + loading: () => void; + }, reactiveAuth?: Uint8Array, ) { const time = new Date().getTime(); @@ -66,7 +78,7 @@ export class TimeCollection implements ExternalResource { for (const name of Object.keys(params)) { values.push([name, [time]]); } - cb(values, true); + callbacks.update(values, true); const id = toId(params, reactiveAuth); const intervals: Record = {}; for (const [name, duration] of Object.entries(params)) { @@ -74,7 +86,7 @@ export class TimeCollection implements ExternalResource { if (ms > 0) { intervals[name] = setInterval(() => { const newvalue: Entry = [name, [new Date().getTime()]]; - cb([newvalue], false); + callbacks.update([newvalue], true); }, ms); } } @@ -107,7 +119,11 @@ export class Polled open( params: Record, - cb: (updates: Entry[], isInit: boolean) => void, + callbacks: { + update: (updates: Entry[], isInit: boolean) => void; + error: (error: TJSON) => void; + loading: () => void; + }, reactiveAuth?: Uint8Array, ): void { this.close(params, reactiveAuth); @@ -118,11 +134,15 @@ export class Polled const strParams = new URLSearchParams(querieParams).toString(); const uri = `${this.uri}?${strParams}`; const call = () => { + callbacks.loading(); fetchJSON(uri, "GET", {}) .then((r) => { - cb(this.conv(r[0] as S), true); + callbacks.update(this.conv(r[0] as S), true); }) - .catch((e: unknown) => console.error(e)); + .catch((e: unknown) => { + callbacks.error(e instanceof Error ? e.message : JSON.stringify(e)); + console.error(e); + }); }; call(); this.intervals.set( diff --git a/skipruntime-ts/core/src/skipruntime_init.ts b/skipruntime-ts/core/src/skipruntime_init.ts index 7810d5246..786ff60a0 100644 --- a/skipruntime-ts/core/src/skipruntime_init.ts +++ b/skipruntime-ts/core/src/skipruntime_init.ts @@ -1,14 +1,9 @@ import { run, type ModuleInit } from "std"; +import type { SkipService } from "./skipruntime_api.js"; import type { - CollectionUpdate, - Entry, - ReactiveResponse, - SkipService, - SubscriptionID, - TJSON, - Watermark, -} from "./skipruntime_api.js"; -import type { ServiceInstanceFactory } from "./internals/skipruntime_module.js"; + ServiceInstanceFactory, + ServiceInstance, +} from "./internals/skipruntime_module.js"; import { init as runtimeInit } from "std/runtime.js"; import { init as posixInit } from "std/posix.js"; @@ -22,13 +17,18 @@ const modules: ModuleInit[] = [ skruntimeInit, ]; -async function wasmUrl(): Promise { +interface Imported { + default: string; +} + +async function wasmUrl(): Promise { //@ts-expect-error ImportMeta is incomplete if (import.meta.env || import.meta.webpack) { - /* eslint-disable @typescript-eslint/no-unsafe-return */ - //@ts-expect-error Cannot find module './skstore.wasm?url' or its corresponding type declarations. - return await import("./libskip-runtime-ts.wasm?url"); - /* eslint-enable @typescript-eslint/no-unsafe-return */ + const imported = (await import( + //@ts-expect-error Cannot find module './skstore.wasm?url' or its corresponding type declarations. + "./libskip-runtime-ts.wasm?url" + )) as Imported; + return imported.default; } return new URL("./libskip-runtime-ts.wasm", import.meta.url); @@ -43,104 +43,3 @@ export async function initService( ) as ServiceInstanceFactory; return factory.initService(service); } - -/** - * A `ServiceInstance` is the result of `initService` - * It gives access to a service's reactively computed resources, and allows to manage sessions or shut down the service. - */ -export interface ServiceInstance { - /** - * Creates if not exists and get all current values of specified resource - * @param resource - the resource name corresponding to a key in remotes field of SkipService - * @param params - the parameters of the resource used to build the resource with the corresponding constructor specified in remotes field of SkipService - * @param reactiveAuth - the client user Skip session authentification - * @returns The current values of the corresponding resource with reactive responce token to allow subscription - */ - getAll( - resource: string, - params: Record, - reactiveAuth?: Uint8Array, - ): { values: Entry[]; reactive?: ReactiveResponse }; - - /** - * Creates specified resource - * @param resource - the resource name correspond to the a key in remotes field of SkipService - * @param params - the parameters of the resource used to build the resource with the corresponding constructor specified in remotes field of SkipService - * @param reactiveAuth - the client user Skip session authentification - * @returns The reactive responce token to allow subscription - */ - createResource( - resource: string, - params: Record, - reactiveAuth?: Uint8Array, - ): ReactiveResponse; - - /** - * Creates if not exists and get the current value of specified key in specified resource - * @param resource - the resource name correspond to the a key in remotes field of SkipService - * @param params - the parameters of the resource used to build the resource with the corresponding constructor specified in remotes field of SkipService - * @param key - the key of value to return - * @param reactiveAuth - the client user Skip session authentification - * @returns The current value of specified key in the corresponding resource - */ - getOne( - resource: string, - params: Record, - key: string | number, - reactiveAuth?: Uint8Array, - ): V[]; - - /** - * Close the specified resource - * @param resource - the resource name correspond to the a key in remotes field of SkipService - * @param params - the parameters of the resource used to build the resource with the corresponding constructor specified in remotes field of SkipService - * @param reactiveAuth - the client user Skip session authentification - */ - closeResource( - resource: string, - params: Record, - reactiveAuth?: Uint8Array, - ): void; - - /** - * Close of the resources corresponding the specified reactiveAuth - * @param reactiveAuth - the client user Skip session authentification - */ - closeSession(reactiveAuth?: Uint8Array): void; - - /** - * Subscribe to a reactive ressource according a given reactive response - * @param reactiveId - the reactive response collection - * @param since - the reactive response watermark - * @param f - the callback called on collection updates - * @param reactiveAuth The client user Skip session authentification corresponding to the reactive response - * @returns The subcription identifier - */ - subscribe( - reactiveId: string, - since: Watermark, - f: (update: CollectionUpdate) => void, - reactiveAuth?: Uint8Array, - ): SubscriptionID; - - /** - * Unsubscribe to a reactive ressource according a given subcription identifier - * @param id - the subcription identifier - */ - unsubscribe(id: SubscriptionID): void; - - /** - * Update an inout collection - * @param input - the name of the input collection to update - * @param values - the values of the input collection to update - */ - update( - input: string, - values: Entry[], - ): void; - - /** - * Close all the resource and shutdown the SkipService - */ - close(): void; -} diff --git a/skipruntime-ts/core/src/skipruntime_remote.ts b/skipruntime-ts/core/src/skipruntime_remote.ts index 7aef7a3f7..d29448634 100644 --- a/skipruntime-ts/core/src/skipruntime_remote.ts +++ b/skipruntime-ts/core/src/skipruntime_remote.ts @@ -74,7 +74,11 @@ export class ExternalSkipService implements ExternalSupplier { subscribe( resource: string, params: Record, - cb: (updates: Entry[], isInit: boolean) => void, + callbacks: { + update: (updates: Entry[], isInit: boolean) => void; + error: (error: TJSON) => void; + loading: () => void; + }, reactiveAuth?: Uint8Array, ): void { if (!this.client) { @@ -89,9 +93,11 @@ export class ExternalSkipService implements ExternalSupplier { }); } } - this.link_(resource, params, cb, reactiveAuth).catch((e: unknown) => { - console.error(e); - }); + this.subscribe_(resource, params, callbacks, reactiveAuth).catch( + (e: unknown) => { + console.error(e); + }, + ); } unsubscribe( @@ -115,19 +121,24 @@ export class ExternalSkipService implements ExternalSupplier { } } - private async link_( + private async subscribe_( resource: string, params: Record, - cb: (updates: Entry[], isInit: boolean) => void, + callbacks: { + update: (updates: Entry[], isInit: boolean) => void; + error: (error: TJSON) => void; + loading: () => void; + }, reactiveAuth?: Uint8Array, ): Promise { const [client, creds] = await this.client!; const publicKey = new Uint8Array(await Protocol.exportKey(creds.publicKey)); const reactive = await this.auth(resource, params, publicKey); + // TODO Manage Status const close = client.subscribe( reactive.collection, BigInt(reactive.watermark), - cb, + callbacks.update, ); this.resources.set(this.toId(resource, params, reactiveAuth), close); } diff --git a/skipruntime-ts/core/test/runtime.spec.ts b/skipruntime-ts/core/test/runtime.spec.ts index 97c7cd37f..a84165393 100644 --- a/skipruntime-ts/core/test/runtime.spec.ts +++ b/skipruntime-ts/core/test/runtime.spec.ts @@ -52,9 +52,9 @@ class Map1Service implements SkipService { } it("testMap1", async () => { - const runtime = await initService(new Map1Service()); - runtime.update("input", [["1", [10]]]); - expect(runtime.getOne("map1", {}, "1")).toEqual([12]); + const service = await initService(new Map1Service()); + service.update("input", [["1", [10]]]); + expect(service.getArray("map1", "1").payload).toEqual([12]); }); //// testMap2 @@ -106,14 +106,14 @@ class Map2Service implements SkipService { } it("testMap2", async () => { - const runtime = await initService(new Map2Service()); + const service = await initService(new Map2Service()); const resource = "map2"; - runtime.update("input1", [["1", [10]]]); - runtime.update("input2", [["1", [20]]]); - expect(runtime.getOne(resource, {}, "1")).toEqual([30]); - runtime.update("input1", [["2", [3]]]); - runtime.update("input2", [["2", [7]]]); - expect(runtime.getAll(resource, {}).values).toEqual([ + service.update("input1", [["1", [10]]]); + service.update("input2", [["1", [20]]]); + expect(service.getArray(resource, "1").payload).toEqual([30]); + service.update("input1", [["2", [3]]]); + service.update("input2", [["2", [7]]]); + expect(service.getAll(resource).payload.values).toEqual([ ["1", [30]], ["2", [10]], ]); @@ -158,14 +158,14 @@ class Map3Service implements SkipService { } it("testMap3", async () => { - const runtime = await initService(new Map3Service()); + const service = await initService(new Map3Service()); const resource = "map3"; - runtime.update("input1", [["1", [1, 2, 3]]]); - runtime.update("input2", [["1", [10]]]); - expect(runtime.getOne(resource, {}, "1")).toEqual([36]); - runtime.update("input1", [["2", [3]]]); - runtime.update("input2", [["2", [7]]]); - expect(runtime.getAll(resource, {}).values).toEqual([ + service.update("input1", [["1", [1, 2, 3]]]); + service.update("input2", [["1", [10]]]); + expect(service.getArray(resource, "1").payload).toEqual([36]); + service.update("input1", [["2", [3]]]); + service.update("input2", [["2", [7]]]); + expect(service.getAll(resource).payload.values).toEqual([ ["1", [36]], ["2", [10]], ]); @@ -211,15 +211,15 @@ class ValueMapperService implements SkipService { } it("valueMapper", async () => { - const runtime = await initService(new ValueMapperService()); + const service = await initService(new ValueMapperService()); const resource = "valueMapper"; - runtime.update("input", [ + service.update("input", [ [1, [1]], [2, [2]], [5, [5]], [10, [10]], ]); - expect(runtime.getAll(resource, {}).values).toEqual([ + expect(service.getAll(resource).payload.values).toEqual([ [1, [2]], [2, [6]], [5, [30]], @@ -268,26 +268,26 @@ class SizeService implements SkipService { } it("testSize", async () => { - const runtime = await initService(new SizeService()); + const service = await initService(new SizeService()); const resource = "size"; - runtime.update("input1", [ + service.update("input1", [ [1, [0]], [2, [2]], ]); - expect(runtime.getAll(resource, {}).values).toEqual([ + expect(service.getAll(resource).payload.values).toEqual([ [1, [0]], [2, [2]], ]); - runtime.update("input2", [ + service.update("input2", [ [1, [10]], [2, [5]], ]); - expect(runtime.getAll(resource, {}).values).toEqual([ + expect(service.getAll(resource).payload.values).toEqual([ [1, [2]], [2, [4]], ]); - runtime.update("input2", [[1, []]]); - expect(runtime.getAll(resource, {}).values).toEqual([ + service.update("input2", [[1, []]]); + expect(service.getAll(resource).payload.values).toEqual([ [1, [1]], [2, [3]], ]); @@ -335,14 +335,14 @@ class SlicedMap1Service implements SkipService { } it("testSlicedMap1", async () => { - const runtime = await initService(new SlicedMap1Service()); + const service = await initService(new SlicedMap1Service()); const resource = "slice"; // Inserts [[0, 0], ..., [30, 30] const values = Array.from({ length: 31 }, (_, i): Entry => { return [i, [i]]; }); - runtime.update("input", values); - expect(runtime.getAll(resource, {}).values).toEqual([ + service.update("input", values); + expect(service.getAll(resource).payload.values).toEqual([ [1, [1]], [3, [9]], [4, [16]], @@ -405,24 +405,24 @@ class LazyService implements SkipService { } it("testLazy", async () => { - const runtime = await initService(new LazyService()); + const service = await initService(new LazyService()); const resource = "lazy"; - runtime.update("input", [ + service.update("input", [ [0, [10]], [1, [20]], ]); - expect(runtime.getAll(resource, {}).values).toEqual([ + expect(service.getAll(resource).payload.values).toEqual([ [0, [2]], [1, [2]], ]); - runtime.update("input", [[2, [4]]]); - expect(runtime.getAll(resource, {}).values).toEqual([ + service.update("input", [[2, [4]]]); + expect(service.getAll(resource).payload.values).toEqual([ [0, [2]], [1, [2]], [2, [2]], ]); - runtime.update("input", [[2, []]]); - expect(runtime.getAll(resource, {}).values).toEqual([ + service.update("input", [[2, []]]); + expect(service.getAll(resource).payload.values).toEqual([ [0, [2]], [1, [2]], ]); @@ -465,33 +465,33 @@ class MapReduceService implements SkipService { } it("testMapReduce", async () => { - const runtime = await initService(new MapReduceService()); + const service = await initService(new MapReduceService()); const resource = "mapReduce"; - runtime.update("input", [ + service.update("input", [ [0, [1]], [1, [1]], [2, [1]], ]); - expect(runtime.getAll(resource, {}).values).toEqual([ + expect(service.getAll(resource).payload.values).toEqual([ [0, [2]], [1, [1]], ]); - runtime.update("input", [[3, [2]]]); - expect(runtime.getAll(resource, {}).values).toEqual([ + service.update("input", [[3, [2]]]); + expect(service.getAll(resource).payload.values).toEqual([ [0, [2]], [1, [3]], ]); - runtime.update("input", [ + service.update("input", [ [0, [2]], [1, [2]], ]); - expect(runtime.getAll(resource, {}).values).toEqual([ + expect(service.getAll(resource).payload.values).toEqual([ [0, [3]], [1, [4]], ]); - runtime.update("input", [[3, []]]); - expect(runtime.getAll(resource, {}).values).toEqual([ + service.update("input", [[3, []]]); + expect(service.getAll(resource).payload.values).toEqual([ [0, [3]], [1, [2]], ]); @@ -534,19 +534,21 @@ function sorted(entries: Entry[]): Entry[] { } it("testMerge1", async () => { - const runtime = await initService(new Merge1Service()); + const service = await initService(new Merge1Service()); const resource = "merge1"; - runtime.update("input1", [[1, [10]]]); - runtime.update("input2", [[1, [20]]]); - expect(sorted(runtime.getAll(resource, {}).values)).toEqual([[1, [10, 20]]]); - runtime.update("input1", [[2, [3]]]); - runtime.update("input2", [[2, [7]]]); - expect(sorted(runtime.getAll(resource, {}).values)).toEqual([ + service.update("input1", [[1, [10]]]); + service.update("input2", [[1, [20]]]); + expect(sorted(service.getAll(resource).payload.values)).toEqual([ + [1, [10, 20]], + ]); + service.update("input1", [[2, [3]]]); + service.update("input2", [[2, [7]]]); + expect(sorted(service.getAll(resource).payload.values)).toEqual([ [1, [10, 20]], [2, [3, 7]], ]); - runtime.update("input1", [[1, []]]); - expect(sorted(runtime.getAll(resource, {}).values)).toEqual([ + service.update("input1", [[1, []]]); + expect(sorted(service.getAll(resource).payload.values)).toEqual([ [1, [20]], [2, [3, 7]], ]); @@ -588,19 +590,19 @@ class MergeReduceService implements SkipService { } it("testMergeReduce", async () => { - const runtime = await initService(new MergeReduceService()); + const service = await initService(new MergeReduceService()); const resource = "mergeReduce"; - runtime.update("input1", [[1, [10]]]); - runtime.update("input2", [[1, [20]]]); - expect(runtime.getAll(resource, {}).values).toEqual([[1, [30]]]); - runtime.update("input1", [[2, [3]]]); - runtime.update("input2", [[2, [7]]]); - expect(runtime.getAll(resource, {}).values).toEqual([ + service.update("input1", [[1, [10]]]); + service.update("input2", [[1, [20]]]); + expect(service.getAll(resource).payload.values).toEqual([[1, [30]]]); + service.update("input1", [[2, [3]]]); + service.update("input2", [[2, [7]]]); + expect(service.getAll(resource).payload.values).toEqual([ [1, [30]], [2, [10]], ]); - runtime.update("input1", [[1, []]]); - expect(runtime.getAll(resource, {}).values).toEqual([ + service.update("input1", [[1, []]]); + expect(service.getAll(resource).payload.values).toEqual([ [1, [20]], [2, [10]], ]); @@ -650,9 +652,9 @@ class JSONExtractService implements SkipService { } it("testJSONExtract", async () => { - const runtime = await initService(new JSONExtractService()); + const service = await initService(new JSONExtractService()); const resource = "jsonExtract"; - runtime.update("input", [ + service.update("input", [ [ 0, [ @@ -682,8 +684,7 @@ it("testJSONExtract", async () => { ], ]); // - const res = runtime.getAll(resource, {}).values; - expect(res).toEqual([ + expect(service.getAll(resource).payload.values).toEqual([ [ 0, [ @@ -718,11 +719,17 @@ class External implements ExternalSupplier { subscribe( resource: string, params: { v1: string; v2: string }, - cb: (updates: Entry[], isInit: boolean) => void, + callbacks: { + update: (updates: Entry[], isInit: boolean) => void; + error: (error: TJSON) => void; + loading: () => void; + }, _reactiveAuth?: Uint8Array, ) { if (resource == "mock") { - this.mock(params, cb).catch((e: unknown) => console.error(e)); + this.mock(params, callbacks.update).catch((e: unknown) => + console.error(e), + ); } return; } @@ -809,38 +816,38 @@ class TestExternalService implements SkipService { it("testExternal", async () => { const resource = "external"; - const runtime = await initService(new TestExternalService()); - runtime.update("input1", [ + const service = await initService(new TestExternalService()); + service.update("input1", [ [0, [10]], [1, [20]], ]); - runtime.update("input2", [ + service.update("input2", [ [0, [5]], [1, [10]], ]); // No value registered in external mock resource - expect(runtime.getAll(resource, {}).values).toEqual([ + expect(service.getAll(resource).payload.values).toEqual([ [0, [[10]]], [1, [[20]]], ]); await timeout(1); // After 1ms values are added to external mock resource - expect(runtime.getAll(resource, {}).values).toEqual([ + expect(service.getAll(resource).payload.values).toEqual([ [0, [[10, 15]]], [1, [[20, 30]]], ]); - runtime.update("input2", [ + service.update("input2", [ [0, [6]], [1, [11]], ]); // New params => No value registered in external mock resource - expect(runtime.getAll(resource, {}).values).toEqual([ + expect(service.getAll(resource).payload.values).toEqual([ [0, [[10]]], [1, [[20]]], ]); await timeout(6); // After 5ms values are added to external mock resource - expect(runtime.getAll(resource, {}).values).toEqual([ + expect(service.getAll(resource).payload.values).toEqual([ [0, [[10, 16]]], [1, [[20, 31]]], ]); @@ -879,15 +886,18 @@ class TokensService implements SkipService { } it("testCloseSession", async () => { - const runtime = await initService(new TokensService()); + const service = await initService(new TokensService()); const resource = "tokens"; - const start = runtime.getOne(resource, {}, "5ms"); + const start = service.getArray(resource, "5ms").payload; await timeout(2); - expect(runtime.getOne(resource, {}, "5ms")).toEqual(start); - await timeout(4); - const current = runtime.getOne(resource, {}, "5ms"); - expect(current == start).toEqual(false); - runtime.closeResource(resource, {}); + try { + expect(service.getArray(resource, "5ms").payload).toEqual(start); + await timeout(4); + const current = service.getArray(resource, "5ms").payload; + expect(current == start).toEqual(false); + } finally { + service.closeResourceInstance(resource, {}); + } }); //// testMultipleResources @@ -932,11 +942,11 @@ class MultipleResourcesService implements SkipService { it("testMultipleResources", async () => { const service = await initService(new MultipleResourcesService()); service.update("input1", [["1", [10]]]); - expect(service.getOne("resource1", {}, "1")).toEqual([10]); + expect(service.getArray("resource1", "1").payload).toEqual([10]); service.update("input2", [["1", [20]]]); - expect(service.getOne("resource2", {}, "1")).toEqual([20]); + expect(service.getArray("resource2", "1").payload).toEqual([20]); service.update("input1", [["1", [30]]]); - expect(service.getOne("resource1", {}, "1")).toEqual([30]); + expect(service.getArray("resource1", "1").payload).toEqual([30]); service.update("input2", [["1", [40]]]); - expect(service.getOne("resource2", {}, "1")).toEqual([40]); + expect(service.getArray("resource2", "1").payload).toEqual([40]); }); diff --git a/skipruntime-ts/examples/departures-client.ts b/skipruntime-ts/examples/departures-client.ts index 0373fb9ef..2f1ce0889 100644 --- a/skipruntime-ts/examples/departures-client.ts +++ b/skipruntime-ts/examples/departures-client.ts @@ -3,6 +3,10 @@ import { run, type Step } from "./utils.js"; function scenarios(): Step[][] { return [ [ + { + type: "log", + payload: { resource: "departures" }, + }, { type: "request", payload: { resource: "departures" }, @@ -34,6 +38,30 @@ function scenarios(): Step[][] { }, ], }, + { + type: "log", + payload: { resource: "departures" }, + }, + { + type: "log", + payload: { resource: "departures" }, + }, + { + type: "log", + payload: { resource: "departures" }, + }, + { + type: "log", + payload: { resource: "departures" }, + }, + { + type: "log", + payload: { resource: "departures" }, + }, + { + type: "log", + payload: { resource: "departures" }, + }, ], ]; } diff --git a/skipruntime-ts/examples/utils.ts b/skipruntime-ts/examples/utils.ts index 04e8f03fd..6ee15834d 100644 --- a/skipruntime-ts/examples/utils.ts +++ b/skipruntime-ts/examples/utils.ts @@ -1,4 +1,4 @@ -import type { TJSON, Entrypoint, Entry, JSONObject } from "@skipruntime/core"; +import type { TJSON, Entrypoint, Entry } from "@skipruntime/core"; import { SkipRESTRuntime } from "@skipruntime/core"; import { createInterface } from "readline"; import { connect, Protocol, Client } from "@skipruntime/client"; @@ -71,6 +71,20 @@ class SkipHttpAccessV1 { return Promise.allSettled(promises); } + async log(resource: string, params: Record, port?: number) { + const publicKey = new Uint8Array( + await Protocol.exportKey(this.creds.publicKey), + ); + const runtime = this.runtimes[port ?? this.defaultPort]; + if (runtime === undefined) throw new Error(`Invalid port ${port}`); + const result = await runtime.getAll(resource, params, publicKey); + console.log( + JSON.stringify(result, (_key: string, value: unknown) => + typeof value === "bigint" ? value.toString() : value, + ), + ); + } + async request( resource: string, params: Record, @@ -100,18 +114,33 @@ class SkipHttpAccessV1 { interface RequestQuery { type: "request"; - payload: JSONObject; + payload: { + resource: string; + params?: Record; + port?: number; + }; } + +interface LogQuery { + type: "log"; + payload: { + resource: string; + params?: Record; + port?: number; + }; +} + interface WriteQuery { type: "write"; payload: Write[]; } + interface DeleteQuery { type: "delete"; payload: Delete[]; } -export type Step = RequestQuery | WriteQuery | DeleteQuery; +export type Step = RequestQuery | LogQuery | WriteQuery | DeleteQuery; class Session { scenario: Step[]; @@ -166,6 +195,7 @@ class Player { private perform: (l: string) => void, private send: (l: Step) => void, private error: (e: string) => void, + private exit: () => void, ) {} start(idx: number) { @@ -266,6 +296,12 @@ class Player { this.stop(); }, ], + [ + /^exit$/g, + () => { + this.exit(); + }, + ], ]; let done = false; for (const pattern of patterns) { @@ -301,7 +337,7 @@ export async function run(scenarios: Step[][], ports: number[] = [3587]) { const jsquery = JSON.parse(query) as { resource: string; params?: Record; - port: number; + port?: number; }; access .request(jsquery.resource, jsquery.params ?? {}, jsquery.port) @@ -311,6 +347,21 @@ export async function run(scenarios: Step[][], ports: number[] = [3587]) { }); }, ], + [ + /^log (.*)$/g, + (query: string) => { + const jsquery = JSON.parse(query) as { + resource: string; + params?: Record; + port?: number; + }; + access + .log(jsquery.resource, jsquery.params ?? {}, jsquery.port) + .catch((e: unknown) => { + console.error(e); + }); + }, + ], [ /^write (.*)$/g, (query: string) => { @@ -359,13 +410,12 @@ export async function run(scenarios: Step[][], ports: number[] = [3587]) { online, (step) => { if (step.type == "request") { - const jsquery = step.payload as { - resource: string; - params?: Record; - port?: number; - }; access - .request(jsquery.resource, jsquery.params ?? {}, jsquery.port) + .request( + step.payload.resource, + step.payload.params ?? {}, + step.payload.port, + ) .then(console.log) .catch((e: unknown) => { console.error(e); @@ -377,6 +427,12 @@ export async function run(scenarios: Step[][], ports: number[] = [3587]) { .catch((e: unknown) => { console.error(e); }); + } else if (step.type == "log") { + access + .log(step.payload.resource, step.payload.params ?? {}) + .catch((e: unknown) => { + console.error(e); + }); } else { access .deleteMany(step.payload) @@ -387,6 +443,7 @@ export async function run(scenarios: Step[][], ports: number[] = [3587]) { } }, console.error, + access.close.bind(access), ); const rl = createInterface({ input: process.stdin, diff --git a/skipruntime-ts/server/src/replication.ts b/skipruntime-ts/server/src/replication.ts index 45a6df626..22b351656 100644 --- a/skipruntime-ts/server/src/replication.ts +++ b/skipruntime-ts/server/src/replication.ts @@ -3,6 +3,7 @@ import { type TJSON, type CollectionUpdate, type Watermark, + type ReactiveResponse, type SubscriptionID, type ServiceInstance, UnknownCollectionError, @@ -18,17 +19,15 @@ class TailingSession { ) {} subscribe( - collection: string, - since: Watermark, + reactiveResponse: ReactiveResponse, callback: (update: CollectionUpdate) => void, ) { const subsession = this.replication.subscribe( - collection, - since, + reactiveResponse, callback, new Uint8Array(this.pubkey), ); - this.subsessions.set(collection, subsession); + this.subsessions.set(reactiveResponse.collection, subsession); } unsubscribe(collection: string) { @@ -81,7 +80,11 @@ function handleMessage( } case "tail": { try { - session.subscribe(msg.collection, msg.since as Watermark, (update) => { + const reactiveResponse = { + collection: msg.collection, + watermark: msg.since as Watermark, + }; + session.subscribe(reactiveResponse, (update) => { ws.send( Protocol.encodeMsg({ type: "data", diff --git a/skipruntime-ts/server/src/rest.ts b/skipruntime-ts/server/src/rest.ts index 111fb3f7f..528ccc660 100644 --- a/skipruntime-ts/server/src/rest.ts +++ b/skipruntime-ts/server/src/rest.ts @@ -1,5 +1,5 @@ import express from "express"; -import type { ServiceInstance, Entry, TJSON } from "@skipruntime/core"; +import type { Entry, TJSON, ServiceInstance, Values } from "@skipruntime/core"; import { UnknownCollectionError, reactiveResponseHeader, @@ -16,7 +16,7 @@ export function createRESTServer(service: ServiceInstance): express.Express { if (!strReactiveAuth) throw new Error("X-Reactive-Auth must be specified."); const reactiveAuth = new Uint8Array(Buffer.from(strReactiveAuth, "base64")); try { - const data = service.createResource( + const data = service.instantiateResource( resourceName, req.query as Record, reactiveAuth, @@ -31,13 +31,28 @@ export function createRESTServer(service: ServiceInstance): express.Express { app.get("/v1/:resource/:key", (req, res) => { const key = req.params.key; const resourceName = req.params.resource; + const strReactiveAuth = req.headers["x-reactive-auth"] as string; + const reactiveAuth = strReactiveAuth + ? new Uint8Array(Buffer.from(strReactiveAuth, "base64")) + : undefined; try { - const data = service.getOne( - resourceName, - req.query as Record, - key, - ); - res.status(200).json(data); + const promise = new Promise(function (resolve, reject) { + service.getArray( + resourceName, + key, + req.query as Record, + reactiveAuth, + { + resolve, + reject, + }, + ); + }); + promise + .then((data) => res.status(200).json(data)) + .catch((e: unknown) => + res.status(500).json(e instanceof Error ? e.message : e), + ); } catch (e: unknown) { res.status(500).json(e instanceof Error ? e.message : e); } @@ -49,16 +64,32 @@ export function createRESTServer(service: ServiceInstance): express.Express { ? new Uint8Array(Buffer.from(strReactiveAuth, "base64")) : undefined; try { - const data = service.getAll( - resourceName, - req.query as Record, - reactiveAuth, - ); - if (data.reactive) { - const [name, value] = reactiveResponseHeader(data.reactive); - res.set(name, value); - } - res.status(200).json(data.values); + const promise = new Promise>(function ( + resolve, + reject, + ) { + service.getAll( + resourceName, + req.query as Record, + reactiveAuth, + { + resolve, + reject, + }, + ); + }); + promise + .then((data) => { + const reactive = data.reactive; + if (reactive) { + const [name, value] = reactiveResponseHeader(reactive); + res.set(name, value); + } + res.status(200).json(data.values); + }) + .catch((e: unknown) => + res.status(500).json(e instanceof Error ? e.message : e), + ); } catch (e: unknown) { res.status(500).json(e instanceof Error ? e.message : e); }