Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions .changeset/collection-lifecycle-management.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
---
"@tanstack/db": patch
"@tanstack/vue-db": patch
"@tanstack/react-db": patch
"@tanstack/db-collections": patch
---

feat: implement Collection Lifecycle Management

Adds automatic lifecycle management for collections to optimize resource usage.

**New Features:**

- Added `startSync` option (defaults to `false`, set to `true` to start syncing immediately)
- Automatic garbage collection after `gcTime` (default 5 minutes) of inactivity
- Collection status tracking: "idle" | "loading" | "ready" | "error" | "cleaned-up"
- Manual `preload()` and `cleanup()` methods for lifecycle control

**Usage:**

```typescript
const collection = createCollection({
startSync: false, // Enable lazy loading
gcTime: 300000, // Cleanup timeout (default: 5 minutes)
})

console.log(collection.status) // Current state
await collection.preload() // Ensure ready
await collection.cleanup() // Manual cleanup
```
39 changes: 34 additions & 5 deletions packages/db-collections/src/electric.ts
Original file line number Diff line number Diff line change
Expand Up @@ -290,14 +290,30 @@ function createElectricSync<T extends object>(
}
}

// Abort controller for the stream - wraps the signal if provided
const abortController = new AbortController()
if (shapeOptions.signal) {
shapeOptions.signal.addEventListener(`abort`, () => {
abortController.abort()
})
if (shapeOptions.signal.aborted) {
abortController.abort()
}
}

let unsubscribeStream: () => void

return {
sync: (params: Parameters<SyncConfig<T>[`sync`]>[0]) => {
const { begin, write, commit } = params
const stream = new ShapeStream(shapeOptions)
const stream = new ShapeStream({
...shapeOptions,
signal: abortController.signal,
})
let transactionStarted = false
let newTxids = new Set<string>()

stream.subscribe((messages: Array<Message<Row>>) => {
unsubscribeStream = stream.subscribe((messages: Array<Message<Row>>) => {
let hasUpToDate = false

for (const message of messages) {
Expand Down Expand Up @@ -338,18 +354,31 @@ function createElectricSync<T extends object>(
}
}

if (hasUpToDate && transactionStarted) {
commit()
if (hasUpToDate) {
// Commit transaction if one was started
if (transactionStarted) {
commit()
transactionStarted = false
}

// Always commit txids when we receive up-to-date, regardless of transaction state
seenTxids.setState((currentTxids) => {
const clonedSeen = new Set(currentTxids)
newTxids.forEach((txid) => clonedSeen.add(String(txid)))

newTxids = new Set()
return clonedSeen
})
transactionStarted = false
}
})

// Return the unsubscribe function
return () => {
// Unsubscribe from the stream
unsubscribeStream()
// Abort the abort controller to stop the stream
abortController.abort()
}
},
// Expose the getSyncMetadata function
getSyncMetadata,
Expand Down
7 changes: 6 additions & 1 deletion packages/db-collections/src/query.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ export interface QueryCollectionConfig<
getKey: CollectionConfig<TItem>[`getKey`]
schema?: CollectionConfig<TItem>[`schema`]
sync?: CollectionConfig<TItem>[`sync`]
startSync?: CollectionConfig<TItem>[`startSync`]

// Direct persistence handlers
/**
Expand Down Expand Up @@ -250,7 +251,11 @@ export function queryCollectionOptions<
}
})

return actualUnsubscribeFn
return async () => {
actualUnsubscribeFn()
await queryClient.cancelQueries({ queryKey })
queryClient.removeQueries({ queryKey })
}
}

/**
Expand Down
Loading