From 16b020ca6f2928df4750344283840dfd9c45f5e1 Mon Sep 17 00:00:00 2001 From: Sam Willis Date: Wed, 25 Jun 2025 13:22:04 +0100 Subject: [PATCH 1/7] checkpoint --- packages/db/src/collection.ts | 290 +++++++++++++++--- packages/db/src/transactions.ts | 7 +- .../collection-subscribe-changes.test.ts | 99 +++--- 3 files changed, 298 insertions(+), 98 deletions(-) diff --git a/packages/db/src/collection.ts b/packages/db/src/collection.ts index 2d6e66ad5..5101c9e2e 100644 --- a/packages/db/src/collection.ts +++ b/packages/db/src/collection.ts @@ -159,10 +159,14 @@ export class CollectionImpl< // This is populated by createCollection public utils: Record = {} - private pendingSyncedTransactions: Array> = [] + public pendingSyncedTransactions: Array> = [] private syncedKeys = new Set() + private preSyncVisibleState = new Map() + private recentlySyncedKeys = new Set() public config: CollectionConfig private hasReceivedFirstCommit = false + private isCommittingSyncTransactions = false + // Array to store one-time commit listeners private onFirstCommitCallbacks: Array<() => void> = [] @@ -280,6 +284,13 @@ export class CollectionImpl< * Recompute optimistic state from active transactions */ private recomputeOptimisticState(): void { + // Skip redundant recalculations when we're in the middle of committing sync transactions + if (this.isCommittingSyncTransactions) { + return + } + + + const previousState = new Map(this.derivedUpserts) const previousDeletes = new Set(this.derivedDeletes) @@ -287,7 +298,7 @@ export class CollectionImpl< this.derivedUpserts.clear() this.derivedDeletes.clear() - // Apply active transactions + // Apply active transactions only (completed transactions are handled by sync operations) const activeTransactions = Array.from(this.transactions.values()) for (const transaction of activeTransactions) { if (![`completed`, `failed`].includes(transaction.state)) { @@ -315,9 +326,65 @@ export class CollectionImpl< // Collect events for changes const events: Array> = [] this.collectOptimisticChanges(previousState, previousDeletes, events) + + // Filter out events for recently synced keys to prevent duplicates + const filteredEventsBySyncStatus = events.filter(event => !this.recentlySyncedKeys.has(event.key)) + + // Filter out redundant delete events if there are pending sync transactions + // that will immediately restore the same data, but only for completed transactions + if (this.pendingSyncedTransactions.length > 0) { + const pendingSyncKeys = new Set() + const completedTransactionMutations = new Set() + + // Collect keys from pending sync operations + for (const transaction of this.pendingSyncedTransactions) { + for (const operation of transaction.operations) { + pendingSyncKeys.add(operation.key as TKey) + } + } + + // Collect mutation IDs from completed transactions + const completedTransactions = Array.from( + this.transactions.values() + ).filter((tx) => tx.state === `completed`) + for (const tx of completedTransactions) { + for (const mutation of tx.mutations) { + if (mutation.collection === this) { + completedTransactionMutations.add(mutation.mutationId) + } + } + } + + // Only filter out delete events for keys that: + // 1. Have pending sync operations AND + // 2. Are from completed transactions (being cleaned up) + const filteredEvents = filteredEventsBySyncStatus.filter((event) => { + if (event.type === `delete` && pendingSyncKeys.has(event.key)) { + // Check if this delete is from clearing optimistic state of completed transactions + // We can infer this by checking if we have no remaining optimistic mutations for this key + const hasActiveOptimisticMutation = Array.from( + this.transactions.values() + ).some( + (tx) => + tx.state !== `completed` && + tx.state !== `failed` && + tx.mutations.some( + (m) => m.collection === this && m.key === event.key + ) + ) + + if (!hasActiveOptimisticMutation) { + return false // Skip this delete event as sync will restore the data + } + } + return true + }) - // Emit all events at once - this.emitEvents(events) + this.emitEvents(filteredEvents) + } else { + // Emit all events if no pending sync transactions + this.emitEvents(filteredEventsBySyncStatus) + } } /** @@ -531,13 +598,26 @@ export class CollectionImpl< ({ state }) => state === `persisting` ) ) { + // Set flag to prevent redundant optimistic state recalculations + this.isCommittingSyncTransactions = true + + // First collect all keys that will be affected by sync operations const changedKeys = new Set() + for (const transaction of this.pendingSyncedTransactions) { + for (const operation of transaction.operations) { + changedKeys.add(operation.key as TKey) + } + } + + // Use the pre-captured visible state (captured before optimistic state was cleared) + // This ensures we correctly detect existing vs new rows + const currentVisibleState = this.preSyncVisibleState + const events: Array> = [] for (const transaction of this.pendingSyncedTransactions) { for (const operation of transaction.operations) { const key = operation.key as TKey - changedKeys.add(key) this.syncedKeys.add(key) // Update metadata @@ -560,22 +640,10 @@ export class CollectionImpl< break } - // Update synced data and collect events - const previousValue = this.syncedData.get(key) - + // Update synced data switch (operation.type) { case `insert`: this.syncedData.set(key, operation.value) - if ( - !this.derivedDeletes.has(key) && - !this.derivedUpserts.has(key) - ) { - events.push({ - type: `insert`, - key, - value: operation.value, - }) - } break case `update`: { const updatedValue = Object.assign( @@ -584,38 +652,108 @@ export class CollectionImpl< operation.value ) this.syncedData.set(key, updatedValue) - if ( - !this.derivedDeletes.has(key) && - !this.derivedUpserts.has(key) - ) { - events.push({ - type: `update`, - key, - value: updatedValue, - previousValue, - }) - } break } case `delete`: this.syncedData.delete(key) - if ( - !this.derivedDeletes.has(key) && - !this.derivedUpserts.has(key) - ) { - if (previousValue) { - events.push({ - type: `delete`, - key, - value: previousValue, - }) - } - } break } } } + // Clear optimistic state since sync operations will now provide the authoritative data + this.derivedUpserts.clear() + this.derivedDeletes.clear() + + + + // Reset flag and recompute optimistic state for any remaining active transactions + this.isCommittingSyncTransactions = false + const activeTransactions = Array.from(this.transactions.values()) + for (const transaction of activeTransactions) { + if (![`completed`, `failed`].includes(transaction.state)) { + for (const mutation of transaction.mutations) { + if (mutation.collection === this) { + switch (mutation.type) { + case `insert`: + case `update`: + this.derivedUpserts.set(mutation.key, mutation.modified as T) + this.derivedDeletes.delete(mutation.key) + break + case `delete`: + this.derivedUpserts.delete(mutation.key) + this.derivedDeletes.add(mutation.key) + break + } + } + } + } + } + + // Check for redundant sync operations that match completed optimistic operations + const completedOptimisticOps = new Map() + const completedTransactions = Array.from(this.transactions.values()).filter( + tx => tx.state === 'completed' + ) + + for (const tx of completedTransactions) { + for (const mutation of tx.mutations) { + if (mutation.collection === this && changedKeys.has(mutation.key)) { + completedOptimisticOps.set(mutation.key, { + type: mutation.type, + value: mutation.modified + }) + } + } + } + + // Now check what actually changed in the final visible state + for (const key of changedKeys) { + const previousVisibleValue = currentVisibleState.get(key) + const newVisibleValue = this.get(key) // This returns the new derived state + + // Check if this sync operation is redundant with a completed optimistic operation + const completedOp = completedOptimisticOps.get(key) + const isRedundantSync = completedOp && + newVisibleValue !== undefined && + this.deepEqual(completedOp.value, newVisibleValue) + + + + if (!isRedundantSync) { + if ( + previousVisibleValue === undefined && + newVisibleValue !== undefined + ) { + events.push({ + type: `insert`, + key, + value: newVisibleValue, + }) + } else if ( + previousVisibleValue !== undefined && + newVisibleValue === undefined + ) { + events.push({ + type: `delete`, + key, + value: previousVisibleValue, + }) + } else if ( + previousVisibleValue !== undefined && + newVisibleValue !== undefined && + !this.deepEqual(previousVisibleValue, newVisibleValue) + ) { + events.push({ + type: `update`, + key, + value: newVisibleValue, + previousValue: previousVisibleValue, + }) + } + } + } + // Update cached size after synced data changes this._size = this.calculateSize() @@ -623,6 +761,16 @@ export class CollectionImpl< this.emitEvents(events) this.pendingSyncedTransactions = [] + + // Clear the pre-sync state since sync operations are complete + this.preSyncVisibleState.clear() + + // Clear recently synced keys after a microtask to allow recomputeOptimisticState to see them + Promise.resolve().then(() => { + this.recentlySyncedKeys.clear() + }) + + // Call any registered one-time commit listeners if (!this.hasReceivedFirstCommit) { @@ -659,6 +807,28 @@ export class CollectionImpl< return `KEY::${this.id}/${key}` } + private deepEqual(a: any, b: any): boolean { + if (a === b) return true + if (a == null || b == null) return false + if (typeof a !== typeof b) return false + + if (typeof a === `object`) { + if (Array.isArray(a) !== Array.isArray(b)) return false + + const keysA = Object.keys(a) + const keysB = Object.keys(b) + if (keysA.length !== keysB.length) return false + + for (const key of keysA) { + if (!keysB.includes(key)) return false + if (!this.deepEqual(a[key], b[key])) return false + } + return true + } + + return false + } + private validateData( data: unknown, type: `insert` | `update`, @@ -1260,11 +1430,49 @@ export class CollectionImpl< } } + + + /** + * Capture visible state for keys that will be affected by pending sync operations + * This must be called BEFORE onTransactionStateChange clears optimistic state + */ + private capturePreSyncVisibleState(): void { + if (this.pendingSyncedTransactions.length === 0) return + + // Clear any previous capture + this.preSyncVisibleState.clear() + + // Get all keys that will be affected by sync operations + const syncedKeys = new Set() + for (const transaction of this.pendingSyncedTransactions) { + for (const operation of transaction.operations) { + syncedKeys.add(operation.key as TKey) + } + } + + // Mark keys as about to be synced to suppress intermediate events from recomputeOptimisticState + for (const key of syncedKeys) { + this.recentlySyncedKeys.add(key) + } + + // Capture current visible state for ALL keys, not just sync keys + // This ensures we have the correct "before" state for event detection + for (const key of this.keys()) { + const currentValue = this.get(key) + if (currentValue !== undefined) { + this.preSyncVisibleState.set(key, currentValue) + } + } + } + /** * Trigger a recomputation when transactions change * This method should be called by the Transaction class when state changes */ public onTransactionStateChange(): void { + // CRITICAL: Capture visible state BEFORE clearing optimistic state + this.capturePreSyncVisibleState() + this.recomputeOptimisticState() } diff --git a/packages/db/src/transactions.ts b/packages/db/src/transactions.ts index d2e5792e7..3c394c2fa 100644 --- a/packages/db/src/transactions.ts +++ b/packages/db/src/transactions.ts @@ -156,7 +156,12 @@ export class Transaction> { for (const mutation of this.mutations) { if (!hasCalled.has(mutation.collection.id)) { mutation.collection.onTransactionStateChange() - mutation.collection.commitPendingTransactions() + + // Only call commitPendingTransactions if there are pending sync transactions + if (mutation.collection.pendingSyncedTransactions.length > 0) { + mutation.collection.commitPendingTransactions() + } + hasCalled.add(mutation.collection.id) } } diff --git a/packages/db/tests/collection-subscribe-changes.test.ts b/packages/db/tests/collection-subscribe-changes.test.ts index 8c0b5c34e..115f0b352 100644 --- a/packages/db/tests/collection-subscribe-changes.test.ts +++ b/packages/db/tests/collection-subscribe-changes.test.ts @@ -100,7 +100,7 @@ describe(`Collection.subscribeChanges`, () => { unsubscribe() }) - it(`should emit changes from synced operations using mitt emitter`, () => { + it(`should emit changes from synced operations`, () => { const emitter = mitt() const callback = vi.fn() @@ -118,7 +118,7 @@ describe(`Collection.subscribeChanges`, () => { write({ type: change.type, // @ts-expect-error TODO type changes - value: change.changes, + value: change.modified, }) }) commit() @@ -141,7 +141,7 @@ describe(`Collection.subscribeChanges`, () => { emitter.emit(`testEvent`, [ { type: `insert`, - changes: { id: 1, value: `sync value 1` }, + modified: { id: 1, value: `sync value 1` }, }, ]) @@ -152,14 +152,12 @@ describe(`Collection.subscribeChanges`, () => { }> expect(insertChanges).toHaveLength(1) - if (insertChanges.length > 0) { - const insertChange = insertChanges[0]! as ChangeMessage<{ - value: string - }> - expect(insertChange).toBeDefined() - expect(insertChange.type).toBe(`insert`) - expect(insertChange.value).toEqual({ id: 1, value: `sync value 1` }) - } + const insertChange = insertChanges[0]! as ChangeMessage<{ + value: string + }> + expect(insertChange).toBeDefined() + expect(insertChange.type).toBe(`insert`) + expect(insertChange.value).toEqual({ id: 1, value: `sync value 1` }) // Reset mock callback.mockReset() @@ -168,18 +166,18 @@ describe(`Collection.subscribeChanges`, () => { emitter.emit(`testEvent`, [ { type: `update`, - changes: { id: 1, value: `updated sync value` }, + modified: { id: 1, value: `updated sync value` }, }, ]) // Verify that update was emitted expect(callback).toHaveBeenCalledTimes(1) - const undateChanges = callback.mock.calls[0]![0] as ChangesPayload<{ + const updateChanges = callback.mock.calls[0]![0] as ChangesPayload<{ value: string }> - expect(undateChanges).toHaveLength(1) + expect(updateChanges).toHaveLength(1) - const updateChange = undateChanges[0]! as ChangeMessage<{ + const updateChange = updateChanges[0]! as ChangeMessage<{ value: string }> expect(updateChange).toBeDefined() @@ -193,7 +191,7 @@ describe(`Collection.subscribeChanges`, () => { emitter.emit(`testEvent`, [ { type: `delete`, - changes: { id: 1, value: `updated sync value` }, + modified: { id: 1, value: `updated sync value` }, }, ]) @@ -269,17 +267,15 @@ describe(`Collection.subscribeChanges`, () => { }> expect(insertChanges).toHaveLength(1) - if (insertChanges.length > 0) { - const insertChange = insertChanges[0]! as ChangeMessage<{ - value: string - }> - expect(insertChange).toBeDefined() - expect(insertChange).toEqual({ - key: 1, - type: `insert`, - value: { id: 1, value: `optimistic value` }, - }) - } + const insertChange = insertChanges[0]! as ChangeMessage<{ + value: string + }> + expect(insertChange).toBeDefined() + expect(insertChange).toEqual({ + key: 1, + type: `insert`, + value: { id: 1, value: `optimistic value` }, + }) // Reset mock callback.mockReset() @@ -297,8 +293,12 @@ describe(`Collection.subscribeChanges`, () => { }) ) + await waitForChanges() + // Verify that update was emitted - expect(callback).toHaveBeenCalledTimes(1) + expect(callback).toHaveBeenCalledTimes(1) // ✅ MUST be 1 - only the optimistic update, no redundant sync events + + // Check that the call contains the correct update const updateChanges = callback.mock.calls[0]![0] as ChangesPayload<{ value: string updated?: boolean @@ -411,7 +411,6 @@ describe(`Collection.subscribeChanges`, () => { tx.mutate(() => collection.insert({ id: 2, value: `optimistic value` })) // Verify optimistic insert was emitted - this is the synchronous optimistic update - // and so we don't await here expect(callback).toHaveBeenCalledTimes(1) expect(callback.mock.calls[0]![0]).toEqual([ { @@ -424,26 +423,20 @@ describe(`Collection.subscribeChanges`, () => { await tx.isPersisted.promise - // Verify synced update was emitted - expect(callback).toHaveBeenCalledTimes(2) // FIXME: this should ideally be 0 - we currently see a delete and an insert - // This is called 1 time when the mutationFn call returns - // and the optimistic state is dropped and the synced state applied. + // Verify no changes were emitted as the sync should match the optimistic state + expect(callback).toHaveBeenCalledTimes(0) callback.mockReset() // Update both items in optimistic and synced ways // First update the optimistic item optimistically - const optItem = collection.state.get(2) - let updateTx - if (optItem) { - updateTx = createTransaction({ mutationFn }) - updateTx.mutate(() => - collection.update(optItem.id, (draft) => { - draft.value = `updated optimistic value` - }) - ) - } - - // We don't await here as the optimistic update is sync + const optItem = collection.state.get(2)! + expect(optItem).toBeDefined() + const updateTx = createTransaction({ mutationFn }) + updateTx.mutate(() => + collection.update(optItem.id, (draft) => { + draft.value = `updated optimistic value` + }) + ) // Verify the optimistic update was emitted expect(callback).toHaveBeenCalledTimes(1) @@ -456,7 +449,6 @@ describe(`Collection.subscribeChanges`, () => { value: `updated optimistic value`, }, previousValue: { - // TODO why isn't this working? id: 2, value: `optimistic value`, }, @@ -464,12 +456,10 @@ describe(`Collection.subscribeChanges`, () => { ]) callback.mockReset() - await updateTx?.isPersisted.promise + await updateTx.isPersisted.promise - // Verify synced update was emitted - expect(callback).toHaveBeenCalledTimes(2) // FIXME: check is we can reduce this - // This is called 1 time when the mutationFn call returns - // and the optimistic state is dropped and the synced state applied. + // Verify no redundant sync events were emitted + expect(callback).toHaveBeenCalledTimes(0) callback.mockReset() // Then update the synced item with a synced update @@ -582,11 +572,8 @@ describe(`Collection.subscribeChanges`, () => { // Wait for changes to propagate await waitForChanges() - // Verify synced update was emitted - expect(callback).toHaveBeenCalledTimes(2) // FIXME: this should ideally be 0 - we currently see a delete and an insert - // This is called when the mutationFn returns and - // the optimistic state is dropped and synced state is - // applied. + // Verify no changes were emitted as the sync should match the optimistic state + expect(callback).toHaveBeenCalledTimes(0) callback.mockReset() // Update one item only From 222aa44382b50f99b8cabe02eee639e6ce0e7b46 Mon Sep 17 00:00:00 2001 From: Sam Willis Date: Wed, 25 Jun 2025 13:30:06 +0100 Subject: [PATCH 2/7] fixed? --- packages/db/src/collection.ts | 16 +++++++++++++--- .../tests/collection-subscribe-changes.test.ts | 1 + 2 files changed, 14 insertions(+), 3 deletions(-) diff --git a/packages/db/src/collection.ts b/packages/db/src/collection.ts index 5101c9e2e..e5c144697 100644 --- a/packages/db/src/collection.ts +++ b/packages/db/src/collection.ts @@ -609,9 +609,19 @@ export class CollectionImpl< } } - // Use the pre-captured visible state (captured before optimistic state was cleared) - // This ensures we correctly detect existing vs new rows - const currentVisibleState = this.preSyncVisibleState + // Use pre-captured state if available (from optimistic scenarios), + // otherwise capture current state (for pure sync scenarios) + let currentVisibleState = this.preSyncVisibleState + if (currentVisibleState.size === 0) { + // No pre-captured state, capture it now for pure sync operations + currentVisibleState = new Map() + for (const key of changedKeys) { + const currentValue = this.get(key) + if (currentValue !== undefined) { + currentVisibleState.set(key, currentValue) + } + } + } const events: Array> = [] diff --git a/packages/db/tests/collection-subscribe-changes.test.ts b/packages/db/tests/collection-subscribe-changes.test.ts index 115f0b352..855640df5 100644 --- a/packages/db/tests/collection-subscribe-changes.test.ts +++ b/packages/db/tests/collection-subscribe-changes.test.ts @@ -226,6 +226,7 @@ describe(`Collection.subscribeChanges`, () => { getKey: (item) => { return item.id }, + startSync: true, sync: { sync: ({ begin, write, commit }) => { // Listen for sync events From 0db1536333b96be5504f32d1368fdf05a6a3276a Mon Sep 17 00:00:00 2001 From: Sam Willis Date: Wed, 25 Jun 2025 13:36:54 +0100 Subject: [PATCH 3/7] tiday tests --- packages/db/src/collection.ts | 48 +++++++++++++++-------------------- 1 file changed, 20 insertions(+), 28 deletions(-) diff --git a/packages/db/src/collection.ts b/packages/db/src/collection.ts index e5c144697..8f95b1313 100644 --- a/packages/db/src/collection.ts +++ b/packages/db/src/collection.ts @@ -167,7 +167,6 @@ export class CollectionImpl< private hasReceivedFirstCommit = false private isCommittingSyncTransactions = false - // Array to store one-time commit listeners private onFirstCommitCallbacks: Array<() => void> = [] @@ -289,8 +288,6 @@ export class CollectionImpl< return } - - const previousState = new Map(this.derivedUpserts) const previousDeletes = new Set(this.derivedDeletes) @@ -326,9 +323,11 @@ export class CollectionImpl< // Collect events for changes const events: Array> = [] this.collectOptimisticChanges(previousState, previousDeletes, events) - + // Filter out events for recently synced keys to prevent duplicates - const filteredEventsBySyncStatus = events.filter(event => !this.recentlySyncedKeys.has(event.key)) + const filteredEventsBySyncStatus = events.filter( + (event) => !this.recentlySyncedKeys.has(event.key) + ) // Filter out redundant delete events if there are pending sync transactions // that will immediately restore the same data, but only for completed transactions @@ -609,7 +608,7 @@ export class CollectionImpl< } } - // Use pre-captured state if available (from optimistic scenarios), + // Use pre-captured state if available (from optimistic scenarios), // otherwise capture current state (for pure sync scenarios) let currentVisibleState = this.preSyncVisibleState if (currentVisibleState.size === 0) { @@ -675,8 +674,6 @@ export class CollectionImpl< this.derivedUpserts.clear() this.derivedDeletes.clear() - - // Reset flag and recompute optimistic state for any remaining active transactions this.isCommittingSyncTransactions = false const activeTransactions = Array.from(this.transactions.values()) @@ -702,16 +699,16 @@ export class CollectionImpl< // Check for redundant sync operations that match completed optimistic operations const completedOptimisticOps = new Map() - const completedTransactions = Array.from(this.transactions.values()).filter( - tx => tx.state === 'completed' - ) - + const completedTransactions = Array.from( + this.transactions.values() + ).filter((tx) => tx.state === `completed`) + for (const tx of completedTransactions) { for (const mutation of tx.mutations) { if (mutation.collection === this && changedKeys.has(mutation.key)) { completedOptimisticOps.set(mutation.key, { type: mutation.type, - value: mutation.modified + value: mutation.modified, }) } } @@ -721,15 +718,14 @@ export class CollectionImpl< for (const key of changedKeys) { const previousVisibleValue = currentVisibleState.get(key) const newVisibleValue = this.get(key) // This returns the new derived state - + // Check if this sync operation is redundant with a completed optimistic operation const completedOp = completedOptimisticOps.get(key) - const isRedundantSync = completedOp && + const isRedundantSync = + completedOp && newVisibleValue !== undefined && this.deepEqual(completedOp.value, newVisibleValue) - - if (!isRedundantSync) { if ( previousVisibleValue === undefined && @@ -771,17 +767,15 @@ export class CollectionImpl< this.emitEvents(events) this.pendingSyncedTransactions = [] - + // Clear the pre-sync state since sync operations are complete this.preSyncVisibleState.clear() - + // Clear recently synced keys after a microtask to allow recomputeOptimisticState to see them Promise.resolve().then(() => { this.recentlySyncedKeys.clear() }) - - // Call any registered one-time commit listeners if (!this.hasReceivedFirstCommit) { this.hasReceivedFirstCommit = true @@ -1440,18 +1434,16 @@ export class CollectionImpl< } } - - /** * Capture visible state for keys that will be affected by pending sync operations * This must be called BEFORE onTransactionStateChange clears optimistic state */ private capturePreSyncVisibleState(): void { if (this.pendingSyncedTransactions.length === 0) return - + // Clear any previous capture this.preSyncVisibleState.clear() - + // Get all keys that will be affected by sync operations const syncedKeys = new Set() for (const transaction of this.pendingSyncedTransactions) { @@ -1459,12 +1451,12 @@ export class CollectionImpl< syncedKeys.add(operation.key as TKey) } } - + // Mark keys as about to be synced to suppress intermediate events from recomputeOptimisticState for (const key of syncedKeys) { this.recentlySyncedKeys.add(key) } - + // Capture current visible state for ALL keys, not just sync keys // This ensures we have the correct "before" state for event detection for (const key of this.keys()) { @@ -1482,7 +1474,7 @@ export class CollectionImpl< public onTransactionStateChange(): void { // CRITICAL: Capture visible state BEFORE clearing optimistic state this.capturePreSyncVisibleState() - + this.recomputeOptimisticState() } From 76c35c9b318699043c8601a0521581bc0b99d6e9 Mon Sep 17 00:00:00 2001 From: Sam Willis Date: Wed, 25 Jun 2025 13:46:56 +0100 Subject: [PATCH 4/7] optimise --- packages/db/src/collection.ts | 100 ++++++++++++++++++---------------- 1 file changed, 52 insertions(+), 48 deletions(-) diff --git a/packages/db/src/collection.ts b/packages/db/src/collection.ts index 8f95b1313..6b40a72bd 100644 --- a/packages/db/src/collection.ts +++ b/packages/db/src/collection.ts @@ -295,23 +295,31 @@ export class CollectionImpl< this.derivedUpserts.clear() this.derivedDeletes.clear() + const activeTransactions: Array> = [] + const completedTransactions: Array> = [] + + for (const transaction of this.transactions.values()) { + if (transaction.state === `completed`) { + completedTransactions.push(transaction) + } else if (![`completed`, `failed`].includes(transaction.state)) { + activeTransactions.push(transaction) + } + } + // Apply active transactions only (completed transactions are handled by sync operations) - const activeTransactions = Array.from(this.transactions.values()) for (const transaction of activeTransactions) { - if (![`completed`, `failed`].includes(transaction.state)) { - for (const mutation of transaction.mutations) { - if (mutation.collection === this) { - switch (mutation.type) { - case `insert`: - case `update`: - this.derivedUpserts.set(mutation.key, mutation.modified as T) - this.derivedDeletes.delete(mutation.key) - break - case `delete`: - this.derivedUpserts.delete(mutation.key) - this.derivedDeletes.add(mutation.key) - break - } + for (const mutation of transaction.mutations) { + if (mutation.collection === this) { + switch (mutation.type) { + case `insert`: + case `update`: + this.derivedUpserts.set(mutation.key, mutation.modified as T) + this.derivedDeletes.delete(mutation.key) + break + case `delete`: + this.derivedUpserts.delete(mutation.key) + this.derivedDeletes.add(mutation.key) + break } } } @@ -343,9 +351,6 @@ export class CollectionImpl< } // Collect mutation IDs from completed transactions - const completedTransactions = Array.from( - this.transactions.values() - ).filter((tx) => tx.state === `completed`) for (const tx of completedTransactions) { for (const mutation of tx.mutations) { if (mutation.collection === this) { @@ -361,15 +366,10 @@ export class CollectionImpl< if (event.type === `delete` && pendingSyncKeys.has(event.key)) { // Check if this delete is from clearing optimistic state of completed transactions // We can infer this by checking if we have no remaining optimistic mutations for this key - const hasActiveOptimisticMutation = Array.from( - this.transactions.values() - ).some( - (tx) => - tx.state !== `completed` && - tx.state !== `failed` && - tx.mutations.some( - (m) => m.collection === this && m.key === event.key - ) + const hasActiveOptimisticMutation = activeTransactions.some((tx) => + tx.mutations.some( + (m) => m.collection === this && m.key === event.key + ) ) if (!hasActiveOptimisticMutation) { @@ -592,11 +592,16 @@ export class CollectionImpl< * This method processes operations from pending transactions and applies them to the synced data */ commitPendingTransactions = () => { - if ( - !Array.from(this.transactions.values()).some( - ({ state }) => state === `persisting` - ) - ) { + // Check if there are any persisting transaction + let hasPersistingTransaction = false + for (const transaction of this.transactions.values()) { + if (transaction.state === `persisting`) { + hasPersistingTransaction = true + break + } + } + + if (!hasPersistingTransaction) { // Set flag to prevent redundant optimistic state recalculations this.isCommittingSyncTransactions = true @@ -676,8 +681,7 @@ export class CollectionImpl< // Reset flag and recompute optimistic state for any remaining active transactions this.isCommittingSyncTransactions = false - const activeTransactions = Array.from(this.transactions.values()) - for (const transaction of activeTransactions) { + for (const transaction of this.transactions.values()) { if (![`completed`, `failed`].includes(transaction.state)) { for (const mutation of transaction.mutations) { if (mutation.collection === this) { @@ -699,17 +703,16 @@ export class CollectionImpl< // Check for redundant sync operations that match completed optimistic operations const completedOptimisticOps = new Map() - const completedTransactions = Array.from( - this.transactions.values() - ).filter((tx) => tx.state === `completed`) - for (const tx of completedTransactions) { - for (const mutation of tx.mutations) { - if (mutation.collection === this && changedKeys.has(mutation.key)) { - completedOptimisticOps.set(mutation.key, { - type: mutation.type, - value: mutation.modified, - }) + for (const transaction of this.transactions.values()) { + if (transaction.state === `completed`) { + for (const mutation of transaction.mutations) { + if (mutation.collection === this && changedKeys.has(mutation.key)) { + completedOptimisticOps.set(mutation.key, { + type: mutation.type, + value: mutation.modified, + }) + } } } } @@ -823,8 +826,9 @@ export class CollectionImpl< const keysB = Object.keys(b) if (keysA.length !== keysB.length) return false + const keysBSet = new Set(keysB) for (const key of keysA) { - if (!keysB.includes(key)) return false + if (!keysBSet.has(key)) return false if (!this.deepEqual(a[key], b[key])) return false } return true @@ -1457,9 +1461,9 @@ export class CollectionImpl< this.recentlySyncedKeys.add(key) } - // Capture current visible state for ALL keys, not just sync keys - // This ensures we have the correct "before" state for event detection - for (const key of this.keys()) { + // Only capture current visible state for keys that will be affected by sync operations + // This is much more efficient than capturing the entire collection state + for (const key of syncedKeys) { const currentValue = this.get(key) if (currentValue !== undefined) { this.preSyncVisibleState.set(key, currentValue) From bb82b4904f86ae3956fc72837bd12f7cde133d18 Mon Sep 17 00:00:00 2001 From: Sam Willis Date: Wed, 25 Jun 2025 13:58:41 +0100 Subject: [PATCH 5/7] fix --- packages/db/tests/collection-subscribe-changes.test.ts | 1 - 1 file changed, 1 deletion(-) diff --git a/packages/db/tests/collection-subscribe-changes.test.ts b/packages/db/tests/collection-subscribe-changes.test.ts index 855640df5..115f0b352 100644 --- a/packages/db/tests/collection-subscribe-changes.test.ts +++ b/packages/db/tests/collection-subscribe-changes.test.ts @@ -226,7 +226,6 @@ describe(`Collection.subscribeChanges`, () => { getKey: (item) => { return item.id }, - startSync: true, sync: { sync: ({ begin, write, commit }) => { // Listen for sync events From fbd275940685f81b968affac77825b3614482c13 Mon Sep 17 00:00:00 2001 From: Sam Willis Date: Wed, 25 Jun 2025 14:01:29 +0100 Subject: [PATCH 6/7] tidy --- packages/db/src/collection.ts | 7 ++++--- packages/db/tests/collection-subscribe-changes.test.ts | 2 +- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/packages/db/src/collection.ts b/packages/db/src/collection.ts index 6b40a72bd..a22f12980 100644 --- a/packages/db/src/collection.ts +++ b/packages/db/src/collection.ts @@ -138,9 +138,11 @@ export class CollectionImpl< T extends object = Record, TKey extends string | number = string | number, > { - public transactions: SortedMap> + public config: CollectionConfig // Core state - make public for testing + public transactions: SortedMap> + public pendingSyncedTransactions: Array> = [] public syncedData: Map | SortedMap public syncedMetadata = new Map() @@ -159,11 +161,10 @@ export class CollectionImpl< // This is populated by createCollection public utils: Record = {} - public pendingSyncedTransactions: Array> = [] + // State used for computing the change events private syncedKeys = new Set() private preSyncVisibleState = new Map() private recentlySyncedKeys = new Set() - public config: CollectionConfig private hasReceivedFirstCommit = false private isCommittingSyncTransactions = false diff --git a/packages/db/tests/collection-subscribe-changes.test.ts b/packages/db/tests/collection-subscribe-changes.test.ts index 115f0b352..c32a9c61c 100644 --- a/packages/db/tests/collection-subscribe-changes.test.ts +++ b/packages/db/tests/collection-subscribe-changes.test.ts @@ -296,7 +296,7 @@ describe(`Collection.subscribeChanges`, () => { await waitForChanges() // Verify that update was emitted - expect(callback).toHaveBeenCalledTimes(1) // ✅ MUST be 1 - only the optimistic update, no redundant sync events + expect(callback).toHaveBeenCalledTimes(1) // Check that the call contains the correct update const updateChanges = callback.mock.calls[0]![0] as ChangesPayload<{ From 6172059a3d7ac5fed797fba652fc9077a888edaf Mon Sep 17 00:00:00 2001 From: Sam Willis Date: Wed, 25 Jun 2025 14:07:11 +0100 Subject: [PATCH 7/7] changeset --- .changeset/gold-friends-pull.md | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 .changeset/gold-friends-pull.md diff --git a/.changeset/gold-friends-pull.md b/.changeset/gold-friends-pull.md new file mode 100644 index 000000000..6a4ca921f --- /dev/null +++ b/.changeset/gold-friends-pull.md @@ -0,0 +1,5 @@ +--- +"@tanstack/db": patch +--- + +Refactored the way we compute change events over the synced state and the optimistic changes. This fixes a couple of issues where the change events were not being emitted correctly.