diff --git a/.changeset/gold-friends-pull.md b/.changeset/gold-friends-pull.md new file mode 100644 index 000000000..6a4ca921f --- /dev/null +++ b/.changeset/gold-friends-pull.md @@ -0,0 +1,5 @@ +--- +"@tanstack/db": patch +--- + +Refactored the way we compute change events over the synced state and the optimistic changes. This fixes a couple of issues where the change events were not being emitted correctly. diff --git a/packages/db/src/collection.ts b/packages/db/src/collection.ts index 2d6e66ad5..a22f12980 100644 --- a/packages/db/src/collection.ts +++ b/packages/db/src/collection.ts @@ -138,9 +138,11 @@ export class CollectionImpl< T extends object = Record, TKey extends string | number = string | number, > { - public transactions: SortedMap> + public config: CollectionConfig // Core state - make public for testing + public transactions: SortedMap> + public pendingSyncedTransactions: Array> = [] public syncedData: Map | SortedMap public syncedMetadata = new Map() @@ -159,10 +161,12 @@ export class CollectionImpl< // This is populated by createCollection public utils: Record = {} - private pendingSyncedTransactions: Array> = [] + // State used for computing the change events private syncedKeys = new Set() - public config: CollectionConfig + private preSyncVisibleState = new Map() + private recentlySyncedKeys = new Set() private hasReceivedFirstCommit = false + private isCommittingSyncTransactions = false // Array to store one-time commit listeners private onFirstCommitCallbacks: Array<() => void> = [] @@ -280,6 +284,11 @@ export class CollectionImpl< * Recompute optimistic state from active transactions */ private recomputeOptimisticState(): void { + // Skip redundant recalculations when we're in the middle of committing sync transactions + if (this.isCommittingSyncTransactions) { + return + } + const previousState = new Map(this.derivedUpserts) const previousDeletes = new Set(this.derivedDeletes) @@ -287,23 +296,31 @@ export class CollectionImpl< this.derivedUpserts.clear() this.derivedDeletes.clear() - // Apply active transactions - const activeTransactions = Array.from(this.transactions.values()) + const activeTransactions: Array> = [] + const completedTransactions: Array> = [] + + for (const transaction of this.transactions.values()) { + if (transaction.state === `completed`) { + completedTransactions.push(transaction) + } else if (![`completed`, `failed`].includes(transaction.state)) { + activeTransactions.push(transaction) + } + } + + // Apply active transactions only (completed transactions are handled by sync operations) for (const transaction of activeTransactions) { - if (![`completed`, `failed`].includes(transaction.state)) { - for (const mutation of transaction.mutations) { - if (mutation.collection === this) { - switch (mutation.type) { - case `insert`: - case `update`: - this.derivedUpserts.set(mutation.key, mutation.modified as T) - this.derivedDeletes.delete(mutation.key) - break - case `delete`: - this.derivedUpserts.delete(mutation.key) - this.derivedDeletes.add(mutation.key) - break - } + for (const mutation of transaction.mutations) { + if (mutation.collection === this) { + switch (mutation.type) { + case `insert`: + case `update`: + this.derivedUpserts.set(mutation.key, mutation.modified as T) + this.derivedDeletes.delete(mutation.key) + break + case `delete`: + this.derivedUpserts.delete(mutation.key) + this.derivedDeletes.add(mutation.key) + break } } } @@ -316,8 +333,58 @@ export class CollectionImpl< const events: Array> = [] this.collectOptimisticChanges(previousState, previousDeletes, events) - // Emit all events at once - this.emitEvents(events) + // Filter out events for recently synced keys to prevent duplicates + const filteredEventsBySyncStatus = events.filter( + (event) => !this.recentlySyncedKeys.has(event.key) + ) + + // Filter out redundant delete events if there are pending sync transactions + // that will immediately restore the same data, but only for completed transactions + if (this.pendingSyncedTransactions.length > 0) { + const pendingSyncKeys = new Set() + const completedTransactionMutations = new Set() + + // Collect keys from pending sync operations + for (const transaction of this.pendingSyncedTransactions) { + for (const operation of transaction.operations) { + pendingSyncKeys.add(operation.key as TKey) + } + } + + // Collect mutation IDs from completed transactions + for (const tx of completedTransactions) { + for (const mutation of tx.mutations) { + if (mutation.collection === this) { + completedTransactionMutations.add(mutation.mutationId) + } + } + } + + // Only filter out delete events for keys that: + // 1. Have pending sync operations AND + // 2. Are from completed transactions (being cleaned up) + const filteredEvents = filteredEventsBySyncStatus.filter((event) => { + if (event.type === `delete` && pendingSyncKeys.has(event.key)) { + // Check if this delete is from clearing optimistic state of completed transactions + // We can infer this by checking if we have no remaining optimistic mutations for this key + const hasActiveOptimisticMutation = activeTransactions.some((tx) => + tx.mutations.some( + (m) => m.collection === this && m.key === event.key + ) + ) + + if (!hasActiveOptimisticMutation) { + return false // Skip this delete event as sync will restore the data + } + } + return true + }) + + this.emitEvents(filteredEvents) + } else { + // Emit all events if no pending sync transactions + this.emitEvents(filteredEventsBySyncStatus) + } } /** @@ -526,18 +593,46 @@ export class CollectionImpl< * This method processes operations from pending transactions and applies them to the synced data */ commitPendingTransactions = () => { - if ( - !Array.from(this.transactions.values()).some( - ({ state }) => state === `persisting` - ) - ) { + // Check if there are any persisting transaction + let hasPersistingTransaction = false + for (const transaction of this.transactions.values()) { + if (transaction.state === `persisting`) { + hasPersistingTransaction = true + break + } + } + + if (!hasPersistingTransaction) { + // Set flag to prevent redundant optimistic state recalculations + this.isCommittingSyncTransactions = true + + // First collect all keys that will be affected by sync operations const changedKeys = new Set() + for (const transaction of this.pendingSyncedTransactions) { + for (const operation of transaction.operations) { + changedKeys.add(operation.key as TKey) + } + } + + // Use pre-captured state if available (from optimistic scenarios), + // otherwise capture current state (for pure sync scenarios) + let currentVisibleState = this.preSyncVisibleState + if (currentVisibleState.size === 0) { + // No pre-captured state, capture it now for pure sync operations + currentVisibleState = new Map() + for (const key of changedKeys) { + const currentValue = this.get(key) + if (currentValue !== undefined) { + currentVisibleState.set(key, currentValue) + } + } + } + const events: Array> = [] for (const transaction of this.pendingSyncedTransactions) { for (const operation of transaction.operations) { const key = operation.key as TKey - changedKeys.add(key) this.syncedKeys.add(key) // Update metadata @@ -560,22 +655,10 @@ export class CollectionImpl< break } - // Update synced data and collect events - const previousValue = this.syncedData.get(key) - + // Update synced data switch (operation.type) { case `insert`: this.syncedData.set(key, operation.value) - if ( - !this.derivedDeletes.has(key) && - !this.derivedUpserts.has(key) - ) { - events.push({ - type: `insert`, - key, - value: operation.value, - }) - } break case `update`: { const updatedValue = Object.assign( @@ -584,38 +667,103 @@ export class CollectionImpl< operation.value ) this.syncedData.set(key, updatedValue) - if ( - !this.derivedDeletes.has(key) && - !this.derivedUpserts.has(key) - ) { - events.push({ - type: `update`, - key, - value: updatedValue, - previousValue, - }) - } break } case `delete`: this.syncedData.delete(key) - if ( - !this.derivedDeletes.has(key) && - !this.derivedUpserts.has(key) - ) { - if (previousValue) { - events.push({ - type: `delete`, - key, - value: previousValue, - }) - } - } break } } } + // Clear optimistic state since sync operations will now provide the authoritative data + this.derivedUpserts.clear() + this.derivedDeletes.clear() + + // Reset flag and recompute optimistic state for any remaining active transactions + this.isCommittingSyncTransactions = false + for (const transaction of this.transactions.values()) { + if (![`completed`, `failed`].includes(transaction.state)) { + for (const mutation of transaction.mutations) { + if (mutation.collection === this) { + switch (mutation.type) { + case `insert`: + case `update`: + this.derivedUpserts.set(mutation.key, mutation.modified as T) + this.derivedDeletes.delete(mutation.key) + break + case `delete`: + this.derivedUpserts.delete(mutation.key) + this.derivedDeletes.add(mutation.key) + break + } + } + } + } + } + + // Check for redundant sync operations that match completed optimistic operations + const completedOptimisticOps = new Map() + + for (const transaction of this.transactions.values()) { + if (transaction.state === `completed`) { + for (const mutation of transaction.mutations) { + if (mutation.collection === this && changedKeys.has(mutation.key)) { + completedOptimisticOps.set(mutation.key, { + type: mutation.type, + value: mutation.modified, + }) + } + } + } + } + + // Now check what actually changed in the final visible state + for (const key of changedKeys) { + const previousVisibleValue = currentVisibleState.get(key) + const newVisibleValue = this.get(key) // This returns the new derived state + + // Check if this sync operation is redundant with a completed optimistic operation + const completedOp = completedOptimisticOps.get(key) + const isRedundantSync = + completedOp && + newVisibleValue !== undefined && + this.deepEqual(completedOp.value, newVisibleValue) + + if (!isRedundantSync) { + if ( + previousVisibleValue === undefined && + newVisibleValue !== undefined + ) { + events.push({ + type: `insert`, + key, + value: newVisibleValue, + }) + } else if ( + previousVisibleValue !== undefined && + newVisibleValue === undefined + ) { + events.push({ + type: `delete`, + key, + value: previousVisibleValue, + }) + } else if ( + previousVisibleValue !== undefined && + newVisibleValue !== undefined && + !this.deepEqual(previousVisibleValue, newVisibleValue) + ) { + events.push({ + type: `update`, + key, + value: newVisibleValue, + previousValue: previousVisibleValue, + }) + } + } + } + // Update cached size after synced data changes this._size = this.calculateSize() @@ -624,6 +772,14 @@ export class CollectionImpl< this.pendingSyncedTransactions = [] + // Clear the pre-sync state since sync operations are complete + this.preSyncVisibleState.clear() + + // Clear recently synced keys after a microtask to allow recomputeOptimisticState to see them + Promise.resolve().then(() => { + this.recentlySyncedKeys.clear() + }) + // Call any registered one-time commit listeners if (!this.hasReceivedFirstCommit) { this.hasReceivedFirstCommit = true @@ -659,6 +815,29 @@ export class CollectionImpl< return `KEY::${this.id}/${key}` } + private deepEqual(a: any, b: any): boolean { + if (a === b) return true + if (a == null || b == null) return false + if (typeof a !== typeof b) return false + + if (typeof a === `object`) { + if (Array.isArray(a) !== Array.isArray(b)) return false + + const keysA = Object.keys(a) + const keysB = Object.keys(b) + if (keysA.length !== keysB.length) return false + + const keysBSet = new Set(keysB) + for (const key of keysA) { + if (!keysBSet.has(key)) return false + if (!this.deepEqual(a[key], b[key])) return false + } + return true + } + + return false + } + private validateData( data: unknown, type: `insert` | `update`, @@ -1260,11 +1439,47 @@ export class CollectionImpl< } } + /** + * Capture visible state for keys that will be affected by pending sync operations + * This must be called BEFORE onTransactionStateChange clears optimistic state + */ + private capturePreSyncVisibleState(): void { + if (this.pendingSyncedTransactions.length === 0) return + + // Clear any previous capture + this.preSyncVisibleState.clear() + + // Get all keys that will be affected by sync operations + const syncedKeys = new Set() + for (const transaction of this.pendingSyncedTransactions) { + for (const operation of transaction.operations) { + syncedKeys.add(operation.key as TKey) + } + } + + // Mark keys as about to be synced to suppress intermediate events from recomputeOptimisticState + for (const key of syncedKeys) { + this.recentlySyncedKeys.add(key) + } + + // Only capture current visible state for keys that will be affected by sync operations + // This is much more efficient than capturing the entire collection state + for (const key of syncedKeys) { + const currentValue = this.get(key) + if (currentValue !== undefined) { + this.preSyncVisibleState.set(key, currentValue) + } + } + } + /** * Trigger a recomputation when transactions change * This method should be called by the Transaction class when state changes */ public onTransactionStateChange(): void { + // CRITICAL: Capture visible state BEFORE clearing optimistic state + this.capturePreSyncVisibleState() + this.recomputeOptimisticState() } diff --git a/packages/db/src/transactions.ts b/packages/db/src/transactions.ts index d2e5792e7..3c394c2fa 100644 --- a/packages/db/src/transactions.ts +++ b/packages/db/src/transactions.ts @@ -156,7 +156,12 @@ export class Transaction> { for (const mutation of this.mutations) { if (!hasCalled.has(mutation.collection.id)) { mutation.collection.onTransactionStateChange() - mutation.collection.commitPendingTransactions() + + // Only call commitPendingTransactions if there are pending sync transactions + if (mutation.collection.pendingSyncedTransactions.length > 0) { + mutation.collection.commitPendingTransactions() + } + hasCalled.add(mutation.collection.id) } } diff --git a/packages/db/tests/collection-subscribe-changes.test.ts b/packages/db/tests/collection-subscribe-changes.test.ts index 8c0b5c34e..c32a9c61c 100644 --- a/packages/db/tests/collection-subscribe-changes.test.ts +++ b/packages/db/tests/collection-subscribe-changes.test.ts @@ -100,7 +100,7 @@ describe(`Collection.subscribeChanges`, () => { unsubscribe() }) - it(`should emit changes from synced operations using mitt emitter`, () => { + it(`should emit changes from synced operations`, () => { const emitter = mitt() const callback = vi.fn() @@ -118,7 +118,7 @@ describe(`Collection.subscribeChanges`, () => { write({ type: change.type, // @ts-expect-error TODO type changes - value: change.changes, + value: change.modified, }) }) commit() @@ -141,7 +141,7 @@ describe(`Collection.subscribeChanges`, () => { emitter.emit(`testEvent`, [ { type: `insert`, - changes: { id: 1, value: `sync value 1` }, + modified: { id: 1, value: `sync value 1` }, }, ]) @@ -152,14 +152,12 @@ describe(`Collection.subscribeChanges`, () => { }> expect(insertChanges).toHaveLength(1) - if (insertChanges.length > 0) { - const insertChange = insertChanges[0]! as ChangeMessage<{ - value: string - }> - expect(insertChange).toBeDefined() - expect(insertChange.type).toBe(`insert`) - expect(insertChange.value).toEqual({ id: 1, value: `sync value 1` }) - } + const insertChange = insertChanges[0]! as ChangeMessage<{ + value: string + }> + expect(insertChange).toBeDefined() + expect(insertChange.type).toBe(`insert`) + expect(insertChange.value).toEqual({ id: 1, value: `sync value 1` }) // Reset mock callback.mockReset() @@ -168,18 +166,18 @@ describe(`Collection.subscribeChanges`, () => { emitter.emit(`testEvent`, [ { type: `update`, - changes: { id: 1, value: `updated sync value` }, + modified: { id: 1, value: `updated sync value` }, }, ]) // Verify that update was emitted expect(callback).toHaveBeenCalledTimes(1) - const undateChanges = callback.mock.calls[0]![0] as ChangesPayload<{ + const updateChanges = callback.mock.calls[0]![0] as ChangesPayload<{ value: string }> - expect(undateChanges).toHaveLength(1) + expect(updateChanges).toHaveLength(1) - const updateChange = undateChanges[0]! as ChangeMessage<{ + const updateChange = updateChanges[0]! as ChangeMessage<{ value: string }> expect(updateChange).toBeDefined() @@ -193,7 +191,7 @@ describe(`Collection.subscribeChanges`, () => { emitter.emit(`testEvent`, [ { type: `delete`, - changes: { id: 1, value: `updated sync value` }, + modified: { id: 1, value: `updated sync value` }, }, ]) @@ -269,17 +267,15 @@ describe(`Collection.subscribeChanges`, () => { }> expect(insertChanges).toHaveLength(1) - if (insertChanges.length > 0) { - const insertChange = insertChanges[0]! as ChangeMessage<{ - value: string - }> - expect(insertChange).toBeDefined() - expect(insertChange).toEqual({ - key: 1, - type: `insert`, - value: { id: 1, value: `optimistic value` }, - }) - } + const insertChange = insertChanges[0]! as ChangeMessage<{ + value: string + }> + expect(insertChange).toBeDefined() + expect(insertChange).toEqual({ + key: 1, + type: `insert`, + value: { id: 1, value: `optimistic value` }, + }) // Reset mock callback.mockReset() @@ -297,8 +293,12 @@ describe(`Collection.subscribeChanges`, () => { }) ) + await waitForChanges() + // Verify that update was emitted expect(callback).toHaveBeenCalledTimes(1) + + // Check that the call contains the correct update const updateChanges = callback.mock.calls[0]![0] as ChangesPayload<{ value: string updated?: boolean @@ -411,7 +411,6 @@ describe(`Collection.subscribeChanges`, () => { tx.mutate(() => collection.insert({ id: 2, value: `optimistic value` })) // Verify optimistic insert was emitted - this is the synchronous optimistic update - // and so we don't await here expect(callback).toHaveBeenCalledTimes(1) expect(callback.mock.calls[0]![0]).toEqual([ { @@ -424,26 +423,20 @@ describe(`Collection.subscribeChanges`, () => { await tx.isPersisted.promise - // Verify synced update was emitted - expect(callback).toHaveBeenCalledTimes(2) // FIXME: this should ideally be 0 - we currently see a delete and an insert - // This is called 1 time when the mutationFn call returns - // and the optimistic state is dropped and the synced state applied. + // Verify no changes were emitted as the sync should match the optimistic state + expect(callback).toHaveBeenCalledTimes(0) callback.mockReset() // Update both items in optimistic and synced ways // First update the optimistic item optimistically - const optItem = collection.state.get(2) - let updateTx - if (optItem) { - updateTx = createTransaction({ mutationFn }) - updateTx.mutate(() => - collection.update(optItem.id, (draft) => { - draft.value = `updated optimistic value` - }) - ) - } - - // We don't await here as the optimistic update is sync + const optItem = collection.state.get(2)! + expect(optItem).toBeDefined() + const updateTx = createTransaction({ mutationFn }) + updateTx.mutate(() => + collection.update(optItem.id, (draft) => { + draft.value = `updated optimistic value` + }) + ) // Verify the optimistic update was emitted expect(callback).toHaveBeenCalledTimes(1) @@ -456,7 +449,6 @@ describe(`Collection.subscribeChanges`, () => { value: `updated optimistic value`, }, previousValue: { - // TODO why isn't this working? id: 2, value: `optimistic value`, }, @@ -464,12 +456,10 @@ describe(`Collection.subscribeChanges`, () => { ]) callback.mockReset() - await updateTx?.isPersisted.promise + await updateTx.isPersisted.promise - // Verify synced update was emitted - expect(callback).toHaveBeenCalledTimes(2) // FIXME: check is we can reduce this - // This is called 1 time when the mutationFn call returns - // and the optimistic state is dropped and the synced state applied. + // Verify no redundant sync events were emitted + expect(callback).toHaveBeenCalledTimes(0) callback.mockReset() // Then update the synced item with a synced update @@ -582,11 +572,8 @@ describe(`Collection.subscribeChanges`, () => { // Wait for changes to propagate await waitForChanges() - // Verify synced update was emitted - expect(callback).toHaveBeenCalledTimes(2) // FIXME: this should ideally be 0 - we currently see a delete and an insert - // This is called when the mutationFn returns and - // the optimistic state is dropped and synced state is - // applied. + // Verify no changes were emitted as the sync should match the optimistic state + expect(callback).toHaveBeenCalledTimes(0) callback.mockReset() // Update one item only