diff --git a/src/storages/AbstractSegmentsCacheSync.ts b/src/storages/AbstractMySegmentsCacheSync.ts similarity index 61% rename from src/storages/AbstractSegmentsCacheSync.ts rename to src/storages/AbstractMySegmentsCacheSync.ts index 7e398203..740b9644 100644 --- a/src/storages/AbstractSegmentsCacheSync.ts +++ b/src/storages/AbstractMySegmentsCacheSync.ts @@ -1,5 +1,3 @@ -/* eslint-disable @typescript-eslint/no-unused-vars */ -/* eslint-disable no-unused-vars */ import { IMySegmentsResponse } from '../dtos/types'; import { MySegmentsData } from '../sync/polling/types'; import { ISegmentsCacheSync } from './types'; @@ -8,18 +6,11 @@ import { ISegmentsCacheSync } from './types'; * This class provides a skeletal implementation of the ISegmentsCacheSync interface * to minimize the effort required to implement this interface. */ -export abstract class AbstractSegmentsCacheSync implements ISegmentsCacheSync { - /** - * For server-side synchronizer: add `segmentKeys` list of keys to `name` segment. - * For client-side synchronizer: add `name` segment to the cache. `segmentKeys` is undefined. - */ - abstract addToSegment(name: string, segmentKeys?: string[]): boolean +export abstract class AbstractMySegmentsCacheSync implements ISegmentsCacheSync { - /** - * For server-side synchronizer: remove `segmentKeys` list of keys from `name` segment. - * For client-side synchronizer: remove `name` segment from the cache. `segmentKeys` is undefined. - */ - abstract removeFromSegment(name: string, segmentKeys?: string[]): boolean + protected abstract addSegment(name: string): boolean + protected abstract removeSegment(name: string): boolean + protected abstract setChangeNumber(changeNumber?: number): boolean | void /** * For server-side synchronizer: check if `key` is in `name` segment. @@ -34,11 +25,10 @@ export abstract class AbstractSegmentsCacheSync implements ISegmentsCacheSync { this.resetSegments({}); } - /** - * For server-side synchronizer: add the given list of segments to the cache, with an empty list of keys. The segments that already exist are not modified. - * For client-side synchronizer: the method is not used. - */ - registerSegments(names: string[]): boolean { return false; } + + // No-op. Not used in client-side. + registerSegments(): boolean { return false; } + update() { return false; } /** * For server-side synchronizer: get the list of segments to fetch changes. @@ -52,11 +42,6 @@ export abstract class AbstractSegmentsCacheSync implements ISegmentsCacheSync { */ abstract getKeysCount(): number - /** - * For server-side synchronizer: change number of `name` segment. - * For client-side synchronizer: change number of mySegments. - */ - abstract setChangeNumber(name?: string, changeNumber?: number): boolean | void abstract getChangeNumber(name: string): number /** @@ -64,7 +49,7 @@ export abstract class AbstractSegmentsCacheSync implements ISegmentsCacheSync { * For client-side synchronizer: it resets or updates the cache. */ resetSegments(segmentsData: MySegmentsData | IMySegmentsResponse): boolean { - this.setChangeNumber(undefined, segmentsData.cn); + this.setChangeNumber(segmentsData.cn); const { added, removed } = segmentsData as MySegmentsData; @@ -72,11 +57,11 @@ export abstract class AbstractSegmentsCacheSync implements ISegmentsCacheSync { let isDiff = false; added.forEach(segment => { - isDiff = this.addToSegment(segment) || isDiff; + isDiff = this.addSegment(segment) || isDiff; }); removed.forEach(segment => { - isDiff = this.removeFromSegment(segment) || isDiff; + isDiff = this.removeSegment(segment) || isDiff; }); return isDiff; @@ -97,11 +82,11 @@ export abstract class AbstractSegmentsCacheSync implements ISegmentsCacheSync { // Slowest path => add and/or remove segments for (let removeIndex = index; removeIndex < storedSegmentKeys.length; removeIndex++) { - this.removeFromSegment(storedSegmentKeys[removeIndex]); + this.removeSegment(storedSegmentKeys[removeIndex]); } for (let addIndex = index; addIndex < names.length; addIndex++) { - this.addToSegment(names[addIndex]); + this.addSegment(names[addIndex]); } return true; diff --git a/src/storages/inLocalStorage/MySegmentsCacheInLocal.ts b/src/storages/inLocalStorage/MySegmentsCacheInLocal.ts index 7e01a906..8765db91 100644 --- a/src/storages/inLocalStorage/MySegmentsCacheInLocal.ts +++ b/src/storages/inLocalStorage/MySegmentsCacheInLocal.ts @@ -1,10 +1,10 @@ import { ILogger } from '../../logger/types'; import { isNaNNumber } from '../../utils/lang'; -import { AbstractSegmentsCacheSync } from '../AbstractSegmentsCacheSync'; +import { AbstractMySegmentsCacheSync } from '../AbstractMySegmentsCacheSync'; import type { MySegmentsKeyBuilder } from '../KeyBuilderCS'; import { LOG_PREFIX, DEFINED } from './constants'; -export class MySegmentsCacheInLocal extends AbstractSegmentsCacheSync { +export class MySegmentsCacheInLocal extends AbstractMySegmentsCacheSync { private readonly keys: MySegmentsKeyBuilder; private readonly log: ILogger; @@ -16,7 +16,7 @@ export class MySegmentsCacheInLocal extends AbstractSegmentsCacheSync { // There is not need to flush segments cache like splits cache, since resetSegments receives the up-to-date list of active segments } - addToSegment(name: string): boolean { + protected addSegment(name: string): boolean { const segmentKey = this.keys.buildSegmentNameKey(name); try { @@ -29,7 +29,7 @@ export class MySegmentsCacheInLocal extends AbstractSegmentsCacheSync { } } - removeFromSegment(name: string): boolean { + protected removeSegment(name: string): boolean { const segmentKey = this.keys.buildSegmentNameKey(name); try { @@ -81,7 +81,7 @@ export class MySegmentsCacheInLocal extends AbstractSegmentsCacheSync { return 1; } - setChangeNumber(name?: string, changeNumber?: number) { + protected setChangeNumber(changeNumber?: number) { try { if (changeNumber) localStorage.setItem(this.keys.buildTillKey(), changeNumber + ''); else localStorage.removeItem(this.keys.buildTillKey()); diff --git a/src/storages/inLocalStorage/__tests__/MySegmentsCacheInLocal.spec.ts b/src/storages/inLocalStorage/__tests__/MySegmentsCacheInLocal.spec.ts index aac52cac..813d4e7f 100644 --- a/src/storages/inLocalStorage/__tests__/MySegmentsCacheInLocal.spec.ts +++ b/src/storages/inLocalStorage/__tests__/MySegmentsCacheInLocal.spec.ts @@ -22,7 +22,11 @@ test('SEGMENT CACHE / in LocalStorage', () => { }); caches.forEach(cache => { - cache.removeFromSegment('mocked-segment'); + // @ts-expect-error + cache.resetSegments({ + added: [], + removed: ['mocked-segment'] + }); expect(cache.isInSegment('mocked-segment')).toBe(false); expect(cache.getRegisteredSegments()).toEqual(['mocked-segment-2']); diff --git a/src/storages/inMemory/MySegmentsCacheInMemory.ts b/src/storages/inMemory/MySegmentsCacheInMemory.ts index 1e10c0a6..546a83c3 100644 --- a/src/storages/inMemory/MySegmentsCacheInMemory.ts +++ b/src/storages/inMemory/MySegmentsCacheInMemory.ts @@ -1,15 +1,15 @@ -import { AbstractSegmentsCacheSync } from '../AbstractSegmentsCacheSync'; +import { AbstractMySegmentsCacheSync } from '../AbstractMySegmentsCacheSync'; /** * Default MySegmentsCacheInMemory implementation that stores MySegments in memory. * Supported by all JS runtimes. */ -export class MySegmentsCacheInMemory extends AbstractSegmentsCacheSync { +export class MySegmentsCacheInMemory extends AbstractMySegmentsCacheSync { private segmentCache: Record = {}; private cn?: number; - addToSegment(name: string): boolean { + protected addSegment(name: string): boolean { if (this.segmentCache[name]) return false; this.segmentCache[name] = true; @@ -17,7 +17,7 @@ export class MySegmentsCacheInMemory extends AbstractSegmentsCacheSync { return true; } - removeFromSegment(name: string): boolean { + protected removeSegment(name: string): boolean { if (!this.segmentCache[name]) return false; delete this.segmentCache[name]; @@ -30,7 +30,7 @@ export class MySegmentsCacheInMemory extends AbstractSegmentsCacheSync { } - setChangeNumber(name?: string, changeNumber?: number) { + protected setChangeNumber(changeNumber?: number) { this.cn = changeNumber; } diff --git a/src/storages/inMemory/SegmentsCacheInMemory.ts b/src/storages/inMemory/SegmentsCacheInMemory.ts index a7d52b7c..66cc9c3f 100644 --- a/src/storages/inMemory/SegmentsCacheInMemory.ts +++ b/src/storages/inMemory/SegmentsCacheInMemory.ts @@ -1,36 +1,25 @@ -import { AbstractSegmentsCacheSync } from '../AbstractSegmentsCacheSync'; import { ISet, _Set } from '../../utils/lang/sets'; import { isIntegerNumber } from '../../utils/lang'; +import { ISegmentsCacheSync } from '../types'; /** - * Default ISplitsCacheSync implementation that stores split definitions in memory. - * Supported by all JS runtimes. + * Default ISplitsCacheSync implementation for server-side that stores segments definitions in memory. */ -export class SegmentsCacheInMemory extends AbstractSegmentsCacheSync { +export class SegmentsCacheInMemory implements ISegmentsCacheSync { private segmentCache: Record> = {}; private segmentChangeNumber: Record = {}; - addToSegment(name: string, segmentKeys: string[]): boolean { - const values = this.segmentCache[name]; - const keySet = values ? values : new _Set(); + update(name: string, addedKeys: string[], removedKeys: string[], changeNumber: number) { + const keySet = this.segmentCache[name] || new _Set(); - segmentKeys.forEach(k => keySet.add(k)); + addedKeys.forEach(k => keySet.add(k)); + removedKeys.forEach(k => keySet.delete(k)); this.segmentCache[name] = keySet; + this.segmentChangeNumber[name] = changeNumber; - return true; - } - - removeFromSegment(name: string, segmentKeys: string[]): boolean { - const values = this.segmentCache[name]; - const keySet = values ? values : new _Set(); - - segmentKeys.forEach(k => keySet.delete(k)); - - this.segmentCache[name] = keySet; - - return true; + return addedKeys.length > 0 || removedKeys.length > 0; } isInSegment(name: string, key: string): boolean { @@ -74,16 +63,13 @@ export class SegmentsCacheInMemory extends AbstractSegmentsCacheSync { }, 0); } - setChangeNumber(name: string, changeNumber: number) { - this.segmentChangeNumber[name] = changeNumber; - - return true; - } - getChangeNumber(name: string) { const value = this.segmentChangeNumber[name]; return isIntegerNumber(value) ? value : -1; } + // No-op. Not used in server-side + resetSegments() { return false; } + } diff --git a/src/storages/inMemory/SplitsCacheInMemory.ts b/src/storages/inMemory/SplitsCacheInMemory.ts index 9294cc43..c1aa951b 100644 --- a/src/storages/inMemory/SplitsCacheInMemory.ts +++ b/src/storages/inMemory/SplitsCacheInMemory.ts @@ -5,7 +5,6 @@ import { ISet, _Set } from '../../utils/lang/sets'; /** * Default ISplitsCacheSync implementation that stores split definitions in memory. - * Supported by all JS runtimes. */ export class SplitsCacheInMemory extends AbstractSplitsCacheSync { diff --git a/src/storages/inMemory/__tests__/SegmentsCacheInMemory.spec.ts b/src/storages/inMemory/__tests__/SegmentsCacheInMemory.spec.ts index e6713376..5ee2683c 100644 --- a/src/storages/inMemory/__tests__/SegmentsCacheInMemory.spec.ts +++ b/src/storages/inMemory/__tests__/SegmentsCacheInMemory.spec.ts @@ -2,24 +2,18 @@ import { SegmentsCacheInMemory } from '../SegmentsCacheInMemory'; describe('SEGMENTS CACHE IN MEMORY', () => { - test('isInSegment, set/getChangeNumber, add/removeFromSegment, getKeysCount', () => { + test('isInSegment, getChangeNumber, update, getKeysCount', () => { const cache = new SegmentsCacheInMemory(); - cache.addToSegment('mocked-segment', [ - 'a', 'b', 'c' - ]); - - cache.setChangeNumber('mocked-segment', 1); - - cache.removeFromSegment('mocked-segment', ['d']); + cache.update('mocked-segment', [ 'a', 'b', 'c'], [], 1); + cache.update('mocked-segment', [], ['d'], 1); expect(cache.getChangeNumber('mocked-segment') === 1).toBe(true); - cache.addToSegment('mocked-segment', ['d', 'e']); + cache.update('mocked-segment', [ 'd', 'e'], [], 2); + cache.update('mocked-segment', [], ['a', 'c'], 2); - cache.removeFromSegment('mocked-segment', ['a', 'c']); - - expect(cache.getChangeNumber('mocked-segment') === 1).toBe(true); + expect(cache.getChangeNumber('mocked-segment') === 2).toBe(true); expect(cache.isInSegment('mocked-segment', 'a')).toBe(false); expect(cache.isInSegment('mocked-segment', 'b')).toBe(true); // b @@ -29,7 +23,7 @@ describe('SEGMENTS CACHE IN MEMORY', () => { // getKeysCount expect(cache.getKeysCount()).toBe(3); - cache.addToSegment('mocked-segment-2', ['a', 'b', 'c', 'd', 'e']); + cache.update('mocked-segment-2', ['a', 'b', 'c', 'd', 'e'], [], 2); expect(cache.getKeysCount()).toBe(8); cache.clear(); expect(cache.getKeysCount()).toBe(0); diff --git a/src/storages/inRedis/SegmentsCacheInRedis.ts b/src/storages/inRedis/SegmentsCacheInRedis.ts index 7ec2f20f..42ed3b10 100644 --- a/src/storages/inRedis/SegmentsCacheInRedis.ts +++ b/src/storages/inRedis/SegmentsCacheInRedis.ts @@ -17,24 +17,21 @@ export class SegmentsCacheInRedis implements ISegmentsCacheAsync { this.keys = keys; } - addToSegment(name: string, segmentKeys: string[]) { + /** + * Update the given segment `name` with the lists of `addedKeys`, `removedKeys` and `changeNumber`. + * The returned promise is resolved if the operation success, with `true` if the segment was updated (i.e., some key was added or removed), + * or rejected if it fails (e.g., Redis operation fails). + */ + update(name: string, addedKeys: string[], removedKeys: string[], changeNumber: number) { const segmentKey = this.keys.buildSegmentNameKey(name); - if (segmentKeys.length) { - return this.redis.sadd(segmentKey, segmentKeys).then(() => true); - } else { - return Promise.resolve(true); - } - } - - removeFromSegment(name: string, segmentKeys: string[]) { - const segmentKey = this.keys.buildSegmentNameKey(name); - - if (segmentKeys.length) { - return this.redis.srem(segmentKey, segmentKeys).then(() => true); - } else { - return Promise.resolve(true); - } + return Promise.all([ + addedKeys.length && this.redis.sadd(segmentKey, addedKeys), + removedKeys.length && this.redis.srem(segmentKey, removedKeys), + this.redis.set(this.keys.buildSegmentTillKey(name), changeNumber + '') + ]).then(() => { + return addedKeys.length > 0 || removedKeys.length > 0; + }); } isInSegment(name: string, key: string) { @@ -43,12 +40,6 @@ export class SegmentsCacheInRedis implements ISegmentsCacheAsync { ).then(matches => matches !== 0); } - setChangeNumber(name: string, changeNumber: number) { - return this.redis.set( - this.keys.buildSegmentTillKey(name), changeNumber + '' - ).then(status => status === 'OK'); - } - getChangeNumber(name: string) { return this.redis.get(this.keys.buildSegmentTillKey(name)).then((value: string | null) => { const i = parseInt(value as string, 10); diff --git a/src/storages/inRedis/__tests__/SegmentsCacheInRedis.spec.ts b/src/storages/inRedis/__tests__/SegmentsCacheInRedis.spec.ts index 6222af95..62799bab 100644 --- a/src/storages/inRedis/__tests__/SegmentsCacheInRedis.spec.ts +++ b/src/storages/inRedis/__tests__/SegmentsCacheInRedis.spec.ts @@ -9,25 +9,21 @@ const keys = new KeyBuilderSS(prefix, metadata); describe('SEGMENTS CACHE IN REDIS', () => { - test('isInSegment, set/getChangeNumber, add/removeFromSegment', async () => { + test('isInSegment, getChangeNumber, update', async () => { const connection = new RedisAdapter(loggerMock); const cache = new SegmentsCacheInRedis(loggerMock, keys, connection); - await cache.addToSegment('mocked-segment', ['a', 'b', 'c']); - - await cache.setChangeNumber('mocked-segment', 1); - - await cache.removeFromSegment('mocked-segment', ['d']); + await cache.update('mocked-segment', ['a', 'b', 'c'], ['d'], 1); expect(await cache.getChangeNumber('mocked-segment') === 1).toBe(true); expect(await cache.getChangeNumber('inexistent-segment')).toBe(-1); // -1 if the segment doesn't exist - await cache.addToSegment('mocked-segment', ['d', 'e']); + await cache.update('mocked-segment', ['d', 'e'], [], 2); - await cache.removeFromSegment('mocked-segment', ['a', 'c']); + await cache.update('mocked-segment', [], ['a', 'c'], 2); - expect(await cache.getChangeNumber('mocked-segment') === 1).toBe(true); + expect(await cache.getChangeNumber('mocked-segment') === 2).toBe(true); expect(await cache.isInSegment('mocked-segment', 'a')).toBe(false); expect(await cache.isInSegment('mocked-segment', 'b')).toBe(true); diff --git a/src/storages/pluggable/SegmentsCachePluggable.ts b/src/storages/pluggable/SegmentsCachePluggable.ts index 995c66df..05173938 100644 --- a/src/storages/pluggable/SegmentsCachePluggable.ts +++ b/src/storages/pluggable/SegmentsCachePluggable.ts @@ -23,33 +23,20 @@ export class SegmentsCachePluggable implements ISegmentsCacheAsync { } /** - * Add a list of `segmentKeys` to the given segment `name`. - * The returned promise is resolved when the operation success - * or rejected if wrapper operation fails. - */ - addToSegment(name: string, segmentKeys: string[]) { - const segmentKey = this.keys.buildSegmentNameKey(name); - - if (segmentKeys.length) { - return this.wrapper.addItems(segmentKey, segmentKeys); - } else { - return Promise.resolve(); - } - } - - /** - * Remove a list of `segmentKeys` from the given segment `name`. - * The returned promise is resolved when the operation success - * or rejected if wrapper operation fails. + * Update the given segment `name` with the lists of `addedKeys`, `removedKeys` and `changeNumber`. + * The returned promise is resolved if the operation success, with `true` if the segment was updated (i.e., some key was added or removed), + * or rejected if it fails (e.g., wrapper operation fails). */ - removeFromSegment(name: string, segmentKeys: string[]) { + update(name: string, addedKeys: string[], removedKeys: string[], changeNumber: number) { const segmentKey = this.keys.buildSegmentNameKey(name); - if (segmentKeys.length) { - return this.wrapper.removeItems(segmentKey, segmentKeys); - } else { - return Promise.resolve(); - } + return Promise.all([ + addedKeys.length && this.wrapper.addItems(segmentKey, addedKeys), + removedKeys.length && this.wrapper.removeItems(segmentKey, removedKeys), + this.wrapper.set(this.keys.buildSegmentTillKey(name), changeNumber + '') + ]).then(() => { + return addedKeys.length > 0 || removedKeys.length > 0; + }); } /** @@ -60,17 +47,6 @@ export class SegmentsCachePluggable implements ISegmentsCacheAsync { return this.wrapper.itemContains(this.keys.buildSegmentNameKey(name), key); } - /** - * Set till number for the given segment `name`. - * The returned promise is resolved when the operation success, - * or rejected if it fails (e.g., wrapper operation fails). - */ - setChangeNumber(name: string, changeNumber: number) { - return this.wrapper.set( - this.keys.buildSegmentTillKey(name), changeNumber + '' - ); - } - /** * Get till number or -1 if it's not defined. * The returned promise is resolved with the changeNumber or -1 if it doesn't exist or a wrapper operation fails. diff --git a/src/storages/pluggable/__tests__/SegmentsCachePluggable.spec.ts b/src/storages/pluggable/__tests__/SegmentsCachePluggable.spec.ts index 7fa1c537..eedb8f11 100644 --- a/src/storages/pluggable/__tests__/SegmentsCachePluggable.spec.ts +++ b/src/storages/pluggable/__tests__/SegmentsCachePluggable.spec.ts @@ -13,24 +13,20 @@ describe('SEGMENTS CACHE PLUGGABLE', () => { wrapperMock.mockClear(); }); - test('isInSegment, set/getChangeNumber, add/removeFromSegment', async () => { + test('isInSegment, getChangeNumber, update', async () => { const cache = new SegmentsCachePluggable(loggerMock, keyBuilder, wrapperMock); - await cache.addToSegment('mocked-segment', ['a', 'b', 'c']); - - await cache.setChangeNumber('mocked-segment', 1); - - await cache.removeFromSegment('mocked-segment', ['d']); + await cache.update('mocked-segment', ['a', 'b', 'c'], ['d'], 1); expect(await cache.getChangeNumber('mocked-segment') === 1).toBe(true); expect(await cache.getChangeNumber('inexistent-segment')).toBe(-1); // -1 if the segment doesn't exist - await cache.addToSegment('mocked-segment', ['d', 'e']); + await cache.update('mocked-segment', ['d', 'e'], [], 2); - await cache.removeFromSegment('mocked-segment', ['a', 'c']); + await cache.update('mocked-segment', [], ['a', 'c'], 2); - expect(await cache.getChangeNumber('mocked-segment') === 1).toBe(true); + expect(await cache.getChangeNumber('mocked-segment') === 2).toBe(true); expect(await cache.isInSegment('mocked-segment', 'a')).toBe(false); expect(await cache.isInSegment('mocked-segment', 'b')).toBe(true); diff --git a/src/storages/types.ts b/src/storages/types.ts index b3b1076c..1daa81fb 100644 --- a/src/storages/types.ts +++ b/src/storages/types.ts @@ -251,38 +251,32 @@ export interface ISplitsCacheAsync extends ISplitsCacheBase { /** Segments cache */ export interface ISegmentsCacheBase { - addToSegment(name: string, segmentKeys: string[]): MaybeThenable // different signature on Server and Client-Side - removeFromSegment(name: string, segmentKeys: string[]): MaybeThenable // different signature on Server and Client-Side isInSegment(name: string, key?: string): MaybeThenable // different signature on Server and Client-Side registerSegments(names: string[]): MaybeThenable // only for Server-Side getRegisteredSegments(): MaybeThenable // only for Server-Side - setChangeNumber(name: string, changeNumber: number): MaybeThenable // only for Server-Side getChangeNumber(name: string): MaybeThenable // only for Server-Side + update(name: string, addedKeys: string[], removedKeys: string[], changeNumber: number): MaybeThenable // only for Server-Side clear(): MaybeThenable } // Same API for both variants: SegmentsCache and MySegmentsCache (client-side API) export interface ISegmentsCacheSync extends ISegmentsCacheBase { - addToSegment(name: string, segmentKeys?: string[]): boolean - removeFromSegment(name: string, segmentKeys?: string[]): boolean isInSegment(name: string, key?: string): boolean registerSegments(names: string[]): boolean getRegisteredSegments(): string[] getKeysCount(): number // only used for telemetry - setChangeNumber(name: string, changeNumber: number): boolean | void getChangeNumber(name?: string): number + update(name: string, addedKeys: string[], removedKeys: string[], changeNumber: number): boolean // only for Server-Side resetSegments(segmentsData: MySegmentsData | IMySegmentsResponse): boolean // only for Sync Client-Side clear(): void } export interface ISegmentsCacheAsync extends ISegmentsCacheBase { - addToSegment(name: string, segmentKeys: string[]): Promise - removeFromSegment(name: string, segmentKeys: string[]): Promise isInSegment(name: string, key: string): Promise registerSegments(names: string[]): Promise getRegisteredSegments(): Promise - setChangeNumber(name: string, changeNumber: number): Promise getChangeNumber(name: string): Promise + update(name: string, addedKeys: string[], removedKeys: string[], changeNumber: number): Promise clear(): Promise } diff --git a/src/sync/polling/updaters/segmentChangesUpdater.ts b/src/sync/polling/updaters/segmentChangesUpdater.ts index 39d147ff..a643826b 100644 --- a/src/sync/polling/updaters/segmentChangesUpdater.ts +++ b/src/sync/polling/updaters/segmentChangesUpdater.ts @@ -1,12 +1,9 @@ import { ISegmentChangesFetcher } from '../fetchers/types'; import { ISegmentsCacheBase } from '../../../storages/types'; import { IReadinessManager } from '../../../readiness/types'; -import { MaybeThenable } from '../../../dtos/types'; -import { findIndex } from '../../../utils/lang'; import { SDK_SEGMENTS_ARRIVED } from '../../../readiness/constants'; import { ILogger } from '../../../logger/types'; import { LOG_PREFIX_INSTANTIATION, LOG_PREFIX_SYNC_SEGMENTS } from '../../../logger/constants'; -import { thenable } from '../../../utils/promise/thenable'; type ISegmentChangesUpdater = (fetchOnlyNew?: boolean, segmentName?: string, noCache?: boolean, till?: number) => Promise @@ -30,31 +27,22 @@ export function segmentChangesUpdaterFactory( let readyOnAlreadyExistentState = true; - function updateSegment(segmentName: string, noCache?: boolean, till?: number, fetchOnlyNew?: boolean) { + function updateSegment(segmentName: string, noCache?: boolean, till?: number, fetchOnlyNew?: boolean): Promise { log.debug(`${LOG_PREFIX_SYNC_SEGMENTS}Processing segment ${segmentName}`); let sincePromise = Promise.resolve(segments.getChangeNumber(segmentName)); return sincePromise.then(since => { // if fetchOnlyNew flag, avoid processing already fetched segments - if (fetchOnlyNew && since !== -1) return -1; - - return segmentChangesFetcher(since, segmentName, noCache, till).then(function (changes) { - let changeNumber = -1; - const results: MaybeThenable[] = []; - changes.forEach(x => { - if (x.added.length > 0) results.push(segments.addToSegment(segmentName, x.added)); - if (x.removed.length > 0) results.push(segments.removeFromSegment(segmentName, x.removed)); - if (x.added.length > 0 || x.removed.length > 0) { - results.push(segments.setChangeNumber(segmentName, x.till)); - changeNumber = x.till; - } - - log.debug(`${LOG_PREFIX_SYNC_SEGMENTS}Processed ${segmentName} with till = ${x.till}. Added: ${x.added.length}. Removed: ${x.removed.length}`); + return fetchOnlyNew && since !== -1 ? + false : + segmentChangesFetcher(since, segmentName, noCache, till).then((changes) => { + return Promise.all(changes.map(x => { + log.debug(`${LOG_PREFIX_SYNC_SEGMENTS}Processing ${segmentName} with till = ${x.till}. Added: ${x.added.length}. Removed: ${x.removed.length}`); + return segments.update(x.name, x.added, x.removed, x.till); + })).then((updates) => { + return updates.some(update => update); + }); }); - // If at least one storage operation result is a promise, join all in a single promise. - if (results.some(result => thenable(result))) return Promise.all(results).then(() => changeNumber); - return changeNumber; - }); }); } /** @@ -75,16 +63,12 @@ export function segmentChangesUpdaterFactory( let segmentsPromise = Promise.resolve(segmentName ? [segmentName] : segments.getRegisteredSegments()); return segmentsPromise.then(segmentNames => { - // Async fetchers are collected here. - const updaters: Promise[] = []; - - for (let index = 0; index < segmentNames.length; index++) { - updaters.push(updateSegment(segmentNames[index], noCache, till, fetchOnlyNew)); - } + // Async fetchers + const updaters = segmentNames.map(segmentName => updateSegment(segmentName, noCache, till, fetchOnlyNew)); return Promise.all(updaters).then(shouldUpdateFlags => { // if at least one segment fetch succeeded, mark segments ready - if (findIndex(shouldUpdateFlags, v => v !== -1) !== -1 || readyOnAlreadyExistentState) { + if (shouldUpdateFlags.some(update => update) || readyOnAlreadyExistentState) { readyOnAlreadyExistentState = false; if (readiness) readiness.segments.emit(SDK_SEGMENTS_ARRIVED); } diff --git a/src/sync/streaming/UpdateWorkers/__tests__/SegmentsUpdateWorker.spec.ts b/src/sync/streaming/UpdateWorkers/__tests__/SegmentsUpdateWorker.spec.ts index 5f4068f2..31663fb5 100644 --- a/src/sync/streaming/UpdateWorkers/__tests__/SegmentsUpdateWorker.spec.ts +++ b/src/sync/streaming/UpdateWorkers/__tests__/SegmentsUpdateWorker.spec.ts @@ -12,7 +12,7 @@ function segmentsSyncTaskMock(segmentsStorage: SegmentsCacheInMemory, changeNumb function __resolveSegmentsUpdaterCall(changeNumber: Record) { Object.keys(changeNumber).forEach(segmentName => { - segmentsStorage.setChangeNumber(segmentName, changeNumber[segmentName]); // update changeNumber in storage + segmentsStorage.update(segmentName, [], [], changeNumber[segmentName]); // update changeNumber in storage }); if (__segmentsUpdaterCalls.length) __segmentsUpdaterCalls.shift().res(); // resolve `execute` call else changeNumbers.push(changeNumber);