Skip to content

Commit e94057e

Browse files
committed
Use versioned bucket ids
1 parent 161e469 commit e94057e

File tree

13 files changed

+142
-23
lines changed

13 files changed

+142
-23
lines changed

modules/module-mongodb-storage/src/storage/implementation/MongoBucketBatch.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ const replicationMutex = new utils.Mutex();
4545
export interface MongoBucketBatchOptions {
4646
db: PowerSyncMongo;
4747
syncRules: SqlSyncRules;
48+
syncRulesId: string;
4849
groupId: number;
4950
slotName: string;
5051
lastCheckpointLsn: string | null;
@@ -72,6 +73,7 @@ export class MongoBucketBatch
7273
public readonly db: PowerSyncMongo;
7374
public readonly session: mongo.ClientSession;
7475
private readonly sync_rules: SqlSyncRules;
76+
private readonly syncRulesId: string;
7577

7678
private readonly group_id: number;
7779

@@ -127,6 +129,7 @@ export class MongoBucketBatch
127129
this.session = this.client.startSession();
128130
this.slot_name = options.slotName;
129131
this.sync_rules = options.syncRules;
132+
this.syncRulesId = options.syncRulesId;
130133
this.storeCurrentData = options.storeCurrentData;
131134
this.skipExistingRows = options.skipExistingRows;
132135
this.markRecordUnavailable = options.markRecordUnavailable;
@@ -461,7 +464,8 @@ export class MongoBucketBatch
461464
if (sourceTable.syncData) {
462465
const { results: evaluated, errors: syncErrors } = this.sync_rules.evaluateRowWithErrors({
463466
record: after,
464-
sourceTable
467+
sourceTable,
468+
bucketIdTransformer: SqlSyncRules.versionedBucketIdTransformer(this.syncRulesId)
465469
});
466470

467471
for (let error of syncErrors) {

modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,7 @@ export class MongoSyncBucketStorage
154154
logger: options.logger,
155155
db: this.db,
156156
syncRules: this.sync_rules.parsed(options).sync_rules,
157+
syncRulesId: `${this.sync_rules.id}`,
157158
groupId: this.group_id,
158159
slotName: this.slot_name,
159160
lastCheckpointLsn: checkpoint_lsn,

modules/module-postgres-storage/src/storage/PostgresSyncRulesStorage.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -330,6 +330,7 @@ export class PostgresSyncRulesStorage
330330
logger: options.logger ?? framework.logger,
331331
db: this.db,
332332
sync_rules: this.sync_rules.parsed(options).sync_rules,
333+
syncRulesId: `${this.sync_rules.id}`,
333334
group_id: this.group_id,
334335
slot_name: this.slot_name,
335336
last_checkpoint_lsn: checkpoint_lsn,

modules/module-postgres-storage/src/storage/batch/PostgresBucketBatch.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ export interface PostgresBucketBatchOptions {
2525
logger: Logger;
2626
db: lib_postgres.DatabaseClient;
2727
sync_rules: sync_rules.SqlSyncRules;
28+
syncRulesId: string;
2829
group_id: number;
2930
slot_name: string;
3031
last_checkpoint_lsn: string | null;
@@ -73,6 +74,7 @@ export class PostgresBucketBatch
7374

7475
protected write_checkpoint_batch: storage.CustomWriteCheckpointOptions[];
7576
protected readonly sync_rules: sync_rules.SqlSyncRules;
77+
private readonly syncRulesId: string;
7678
protected batch: OperationBatch | null;
7779
private lastWaitingLogThrottled = 0;
7880
private markRecordUnavailable: BucketStorageMarkRecordUnavailable | undefined;
@@ -88,6 +90,7 @@ export class PostgresBucketBatch
8890
this.resumeFromLsn = options.resumeFromLsn;
8991
this.write_checkpoint_batch = [];
9092
this.sync_rules = options.sync_rules;
93+
this.syncRulesId = options.syncRulesId;
9194
this.markRecordUnavailable = options.markRecordUnavailable;
9295
this.batch = null;
9396
this.persisted_op = null;
@@ -825,7 +828,8 @@ export class PostgresBucketBatch
825828
if (sourceTable.syncData) {
826829
const { results: evaluated, errors: syncErrors } = this.sync_rules.evaluateRowWithErrors({
827830
record: after,
828-
sourceTable
831+
sourceTable,
832+
bucketIdTransformer: sync_rules.SqlSyncRules.versionedBucketIdTransformer(this.syncRulesId)
829833
});
830834

831835
for (const error of syncErrors) {

packages/sync-rules/src/BaseSqlDataQuery.ts

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import { SourceTableInterface } from './SourceTableInterface.js';
55
import { SqlTools } from './sql_filters.js';
66
import { TablePattern } from './TablePattern.js';
77
import {
8+
BucketIdTransformer,
89
EvaluationResult,
910
QueryParameters,
1011
QuerySchema,
@@ -25,6 +26,7 @@ export interface EvaluateRowOptions {
2526
table: SourceTableInterface;
2627
row: SqliteRow;
2728
bucketIds: (params: QueryParameters) => string[];
29+
bucketIdTransformer: BucketIdTransformer | null;
2830
}
2931

3032
export interface BaseSqlDataQueryOptions {
@@ -177,7 +179,7 @@ export class BaseSqlDataQuery {
177179

178180
evaluateRowWithOptions(options: EvaluateRowOptions): EvaluationResult[] {
179181
try {
180-
const { table, row, bucketIds } = options;
182+
const { table, row, bucketIds, bucketIdTransformer } = options;
181183

182184
const tables = { [this.table]: this.addSpecialParameters(table, row) };
183185
const resolvedBucketIds = bucketIds(tables);
@@ -197,7 +199,7 @@ export class BaseSqlDataQuery {
197199

198200
return resolvedBucketIds.map((bucketId) => {
199201
return {
200-
bucket: bucketId,
202+
bucket: bucketIdTransformer ? bucketIdTransformer(bucketId) : bucketId,
201203
table: outputTable,
202204
id: id,
203205
data

packages/sync-rules/src/SqlBucketDescriptor.ts

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,17 @@ export class SqlBucketDescriptor implements BucketSource {
104104
continue;
105105
}
106106

107-
results.push(...query.evaluateRow(options.sourceTable, applyRowContext(options.record, this.compatibility)));
107+
const bucketIdTransformer = this.compatibility.isFixed(Quirk.versionedBucketIds)
108+
? options.bucketIdTransformer
109+
: null;
110+
111+
results.push(
112+
...query.evaluateRow(
113+
options.sourceTable,
114+
applyRowContext(options.record, this.compatibility),
115+
bucketIdTransformer
116+
)
117+
);
108118
}
109119
return results;
110120
}

packages/sync-rules/src/SqlDataQuery.ts

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ import { checkUnsupportedFeatures, isClauseError } from './sql_support.js';
1010
import { SyncRulesOptions } from './SqlSyncRules.js';
1111
import { TablePattern } from './TablePattern.js';
1212
import { TableQuerySchema } from './TableQuerySchema.js';
13-
import { EvaluationResult, ParameterMatchClause, QuerySchema, SqliteRow } from './types.js';
13+
import { BucketIdTransformer, EvaluationResult, ParameterMatchClause, QuerySchema, SqliteRow } from './types.js';
1414
import { getBucketId, isSelectStatement } from './utils.js';
1515

1616
export interface SqlDataQueryOptions extends BaseSqlDataQueryOptions {
@@ -185,10 +185,15 @@ export class SqlDataQuery extends BaseSqlDataQuery {
185185
this.filter = options.filter;
186186
}
187187

188-
evaluateRow(table: SourceTableInterface, row: SqliteRow): EvaluationResult[] {
188+
evaluateRow(
189+
table: SourceTableInterface,
190+
row: SqliteRow,
191+
bucketIdTransformer: BucketIdTransformer | null
192+
): EvaluationResult[] {
189193
return this.evaluateRowWithOptions({
190194
table,
191195
row,
196+
bucketIdTransformer,
192197
bucketIds: (tables) => {
193198
const bucketParameters = this.filter.filterRow(tables);
194199
return bucketParameters.map((params) => getBucketId(this.descriptorName, this.bucketParameters, params));

packages/sync-rules/src/SqlSyncRules.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -494,4 +494,8 @@ export class SqlSyncRules implements SyncRules {
494494
}
495495
}
496496
}
497+
498+
static versionedBucketIdTransformer(version: string) {
499+
return (bucketId: string) => `${version}#${bucketId}`;
500+
}
497501
}

packages/sync-rules/src/quirks.ts

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,15 @@ export class Quirk {
2626
CompatibilityLevel.SYNC_STREAMS
2727
);
2828

29+
static versionedBucketIds = new Quirk(
30+
'versioned_bucket_ids',
31+
'Old versions of the sync service did not update bucket ids when sync rules change. This can lead to a resync on clients taking longer than necessary, and break client-side progress reports.',
32+
CompatibilityLevel.SYNC_STREAMS
33+
);
34+
2935
static byName: Record<string, Quirk> = (() => {
3036
const byName: Record<string, Quirk> = {};
31-
for (const entry of [this.nonIso8601Timestamps]) {
37+
for (const entry of [this.nonIso8601Timestamps, this.versionedBucketIds]) {
3238
byName[entry.name] = entry;
3339
}
3440

packages/sync-rules/src/types.ts

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -283,7 +283,24 @@ export interface InputParameter {
283283
parametersToLookupValue(parameters: ParameterValueSet): SqliteValue;
284284
}
285285

286-
export interface EvaluateRowOptions extends TableRow<SqliteInputRow> {}
286+
/**
287+
* Transforms bucket ids generated when evaluating the row by e.g. encoding version information.
288+
*
289+
* Because buckets are recreated on a sync rule redeploy, it makes sense to use different bucket ids (otherwise, clients
290+
* may run into checksum errors causing a sync to take longer than necessary or breaking progress).
291+
*
292+
* So, this transformer receives the original bucket id as generated by defined sync rules, and can prepend a version
293+
* identifier.
294+
*
295+
* Note that this transformation has not been present in older versions of the sync service. To preserve backwards
296+
* compatibility, data queries will not invoke this function by default when creating bucket ids. Using this function
297+
* requires an opt-in.
298+
*/
299+
export type BucketIdTransformer = (regularId: string) => string;
300+
301+
export interface EvaluateRowOptions extends TableRow<SqliteInputRow> {
302+
bucketIdTransformer: BucketIdTransformer;
303+
}
287304

288305
/**
289306
* A row associated with the table it's coming from.

0 commit comments

Comments
 (0)