@@ -16,6 +16,12 @@ class ExistingResourceException() extends Exception {
1616 }
1717}
1818
19+ class ExistingSubscriptionException() extends Exception {
20+ fun getMessage(): String {
21+ "A resource instance with specified identifier already subscribed."
22+ }
23+ }
24+
1925class Params(value: Map<String, PValue>) extends SKStore.File uses Orderable {
2026 fun compare(other: Params): Order {
2127 keys = this.value.keys().collect(Array).sorted();
@@ -475,9 +481,7 @@ fun closeService(): Result<void, .Exception> {
475481 resourceHdl.items(context).each(kf -> {
476482 kf.i1.next().each(_f -> todestroy.push(SKStore.SID::keyType(kf.i0)))
477483 });
478- todestroy.each(key -> {
479- resourceHdl.writeArray(context, key, Array[]);
480- });
484+ todestroy.each(key -> destroyReactiveResource(context, key));
481485 context
482486 .getPersistent(kRemoteSpecifiers)
483487 .map(RemoteSpecifiers::type)
@@ -880,19 +884,22 @@ class Collection(
880884
881885 fun subscribe(
882886 context: mutable SKStore.Context,
887+ identifier: String,
883888 session: Int,
884889 from: SKStore.Tick,
885890 notify: (
886891 Array<(SKJSON.CJSON, Array<SKJSON.CJSON>)>,
887892 SKStore.Tick,
888893 Bool,
889894 ) ~> void,
895+ close: () ~> void,
890896 ): void {
891897 keyConverter = this.keyConverter;
892898 fileConverter = this.fileConverter;
893899 context.subscribe(
894900 session,
895901 SKStore.NWatch(
902+ identifier,
896903 from,
897904 (_dirName, values, tick, update) ~> {
898905 notify(
@@ -903,6 +910,7 @@ class Collection(
903910 update,
904911 )
905912 },
913+ close,
906914 true,
907915 ),
908916 None(),
@@ -1241,27 +1249,61 @@ fun getForKey(
12411249 res;
12421250}
12431251
1252+ fun destroyReactiveResource(
1253+ context: mutable SKStore.Context,
1254+ sid: SKStore.SID,
1255+ ): void {
1256+ context
1257+ .getPersistent(`subscription.${sid}`)
1258+ .map(SKStore.IntFile::type) match {
1259+ | Some(subId) ->
1260+ session = subId.value;
1261+ context.sessions.maybeGet(session).each(sub -> {
1262+ close = sub.cmd match {
1263+ | SKStore.NWatch(_, _, _, close, _) -> close
1264+ | _ -> invariant_violation("Not manage session kind")
1265+ };
1266+ context.removePersistent(`subscription.${sid}`);
1267+ close();
1268+ context.unsubscribe(session);
1269+ })
1270+ | _ -> void
1271+ };
1272+ resourceHdl = SKStore.EHandle(
1273+ SKStore.SID::keyType,
1274+ ResourceDef::type,
1275+ kResourceSessionDir,
1276+ );
1277+ resourceHdl.writeArray(context, sid, Array[]);
1278+ }
1279+
12441280fun closeReactiveResource(
12451281 context: mutable SKStore.Context,
12461282 identifier: String,
12471283 update: Bool = true,
12481284): void {
1249- garbageHdl = SKStore.EHandle(
1250- SKStore.SID::keyType,
1251- SKStore.IntFile::type,
1252- kResourceGarbageDir,
1253- );
1254- sid = SKStore.SID(identifier);
1255- if (garbageHdl.maybeGet(context, sid).isSome()) return void;
1256- time = Time.time_ms();
1257- garbageHdl.writeArray(context, sid, Array[SKStore.IntFile(time)]);
1258- if (update) updateContext(context);
1285+ context
1286+ .getPersistent(`subscription.${identifier}`)
1287+ .map(SKStore.IntFile::type) match {
1288+ | Some(subId) -> unsubscribe(context, subId.value, update)
1289+ | _ ->
1290+ garbageHdl = SKStore.EHandle(
1291+ SKStore.SID::keyType,
1292+ SKStore.IntFile::type,
1293+ kResourceGarbageDir,
1294+ );
1295+ sid = SKStore.SID(identifier);
1296+ if (garbageHdl.maybeGet(context, sid).isSome()) return void;
1297+ time = Time.time_ms();
1298+ garbageHdl.writeArray(context, sid, Array[SKStore.IntFile(time)]);
1299+ if (update) updateContext(context)
1300+ };
12591301}
12601302
12611303fun subscribe(
12621304 context: mutable SKStore.Context,
12631305 identifier: String,
1264- notify: (Array<(SKJSON.CJSON, Array<SKJSON.CJSON>)>, String, Bool) ~> void ,
1306+ notifier: Notifier ,
12651307 optWatermark: ?String,
12661308): Int {
12671309 garbageHdl = SKStore.EHandle(
@@ -1282,6 +1324,10 @@ fun subscribe(
12821324 ResourceInfo::type,
12831325 kResourceCollectionsDir,
12841326 );
1327+ subId = `subscription.${identifier}`;
1328+ if (context.getPersistent(subId).isSome()) {
1329+ throw ExistingSubscriptionException()
1330+ };
12851331 resourceHdl.maybeGet(context, sid) match {
12861332 | Some(definition) ->
12871333 info = resourcesCollectionsHdl.get(context, definition);
@@ -1291,21 +1337,38 @@ fun subscribe(
12911337 watermark.stripPrefix(start).toInt()
12921338 | _ -> 0
12931339 };
1340+ notifier.subscribed();
12941341 info.collection.subscribe(
12951342 context,
1343+ identifier,
12961344 session,
12971345 SKStore.Tick(from),
12981346 (values, tick, update) ~> {
1299- notify(values, `${info.session}/${tick}`, update)
1347+ notifier. notify(values, `${info.session}/${tick}`, update)
13001348 },
1349+ notifier.close,
13011350 );
1351+ context.setPersistent(subId, SKStore.IntFile(session));
13021352 session
13031353 | _ -> -1
13041354 }
13051355}
13061356
1307- fun unsubscribe(context: mutable SKStore.Context, session: Int): void {
1308- context.unsubscribe(session)
1357+ fun unsubscribe(
1358+ context: mutable SKStore.Context,
1359+ session: Int,
1360+ update: Bool = true,
1361+ ): void {
1362+ context.sessions.maybeGet(session).each(sub -> {
1363+ (identifier, close) = sub.cmd match {
1364+ | SKStore.NWatch(identifier, _, _, close, _) -> (identifier, close)
1365+ | _ -> invariant_violation("Not manage session kind")
1366+ };
1367+ context.removePersistent(`subscription.${identifier}`);
1368+ close();
1369+ context.unsubscribe(session);
1370+ closeReactiveResource(context, identifier, update);
1371+ });
13091372}
13101373
13111374// WRITES
0 commit comments