-
-
Notifications
You must be signed in to change notification settings - Fork 3.4k
feat(query-core): add custom reducer support to streamedQuery #9532
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
feat(query-core): add custom reducer support to streamedQuery #9532
Conversation
Replace maxChunks parameter with flexible reducer function that delegates data aggregation to consumer code. This provides full control over how streamed chunks are combined into the final data structure. Add support for custom placeholderData that works seamlessly with the reducer function, allowing initialization of complex data types beyond simple arrays. TanStack#9065 BREAKING CHANGE: The maxChunks parameter has been removed from streamedQuery. Use a custom reducer function to control data aggregation behavior instead.
queryFn: streamedQuery<number, Record<number, boolean>>({ | ||
queryFn: () => createAsyncNumberGenerator(2), | ||
reducer: (acc, chunk) => ({ | ||
...acc, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the first time reducer
is invoked, acc
is an empty array, because placeholderData
is not specified.
it seems a bit strange to me. but i'm not sure how to solve it.
i can apply the same logic as in native reduce
...
@@ -1,6 +1,15 @@ | |||
import { addToEnd } from './utils' | |||
import type { QueryFunction, QueryFunctionContext, QueryKey } from './types' | |||
|
|||
type StreamedQueryParams<TQueryFnData, TData, TQueryKey extends QueryKey> = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I created this type just for clarity. should i keep it inline/move to a dedicated file or in types?
About type safety i tried to split the type in 2 like below, but i feel it is over-engineering.
type BaseStreamedQueryParams<TQueryFnData, TData> = {
queryFn: (
context: QueryFunctionContext<TQueryKey>,
) => AsyncIterable<TQueryFnData> | Promise<AsyncIterable<TQueryFnData>>
refetchMode?: 'append' | 'reset' | 'replace'
}
type SimpleStreamedQueryParams<TQueryFnData, TData, TQueryKey extends QueryKey> = BaseStreamedQueryParams<TQueryFnData, TData> & {
reducer?: never;
placeholderData?: never;
}
type ReducibleStreamedQueryParams<TQueryFnData, TData, TQueryKey extends QueryKey> = BaseStreamedQueryParams<TQueryFnData, TData> & {
reducer?: (acc: TData, chunk: TQueryFnData) => TData
placeholderData?: TData
}
type StreamedQueryParams<TQueryFnData, TData, TQueryKey extends QueryKey> =
| SimpleStreamedQueryParams<TQueryFnData, TData, TQueryKey>
| ReducibleStreamedQueryParams<TQueryFnData, TData, TQueryKey>
export function streamedQuery<
TQueryFnData = unknown,
TData = Array<TQueryFnData>,
TQueryKey extends QueryKey = QueryKey,
>(params: StreamedQueryParams<TQueryFnData, TData, TQueryKey>): QueryFunction<TData, TQueryKey> {
const reducer =
'reducer' in params && typeof params.reducer === 'function'
? params.reducer
: (items: TData, chunk: TQueryFnData) =>
addToEnd((items ?? []) as Array<TQueryFnData>, chunk) as TData;
const placeholderData =
'placeholderData' in params ? params.placeholderData : ([] as TData);
// ...
maxChunks?: number | ||
}): QueryFunction<Array<TQueryFnData>, TQueryKey> { | ||
reducer = (items, chunk) => addToEnd((items ?? []) as Array<TQueryFnData>, chunk) as TData, | ||
placeholderData = [] as TData, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i introduced this placeholderData
prop to mimic native reduce
with an initialValue.
My first approach was to use the existing placeholderData
parameter from the Observer, but even though I can access it through the query.options
object, TypeScript throws an error.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Below you can find an implementation about how to mimic Array.prototype.reduce
behavior where the initial value is optional, and if not provided, the first element becomes the accumulator. I didn't applied it to the PR because i think it better to have your point of of view about it.
// ...existing code...
return async (context) => {
const query = context.client
.getQueryCache()
.find({ queryKey: context.queryKey, exact: true })
const isRefetch = !!query && query.state.data !== undefined
if (isRefetch && params.refetchMode === 'reset') {
query.setState({
status: 'pending',
data: undefined,
error: null,
fetchStatus: 'fetching',
})
}
const hasInitialValue = 'placeholderData' in params;
let result: TData;
let isFirstChunk = true;
const stream = await params.queryFn(context)
for await (const chunk of stream) {
if (context.signal.aborted) {
break
}
if (isFirstChunk) {
if (hasInitialValue) {
// If we have placeholderData, use it as initial accumulator
result = reducer(placeholderData, chunk);
} else {
// If no placeholderData, first chunk becomes the accumulator
result = chunk as unknown as TData;
}
isFirstChunk = false;
} else {
result = reducer(result, chunk);
}
// don't append to the cache directly when replace-refetching
if (!isRefetch || params.refetchMode !== 'replace') {
context.client.setQueryData<TData>(
context.queryKey,
(prev) => {
if (prev === undefined) {
return result;
}
return hasInitialValue ? reducer(prev, chunk) : result;
}
)
}
}
// Handle empty stream case
if (isFirstChunk) {
if (hasInitialValue) {
result = placeholderData;
} else {
throw new Error('Reduce of empty stream with no initial value');
}
}
// finalize result: replace-refetching needs to write to the cache
if (isRefetch && params.refetchMode === 'replace' && !context.signal.aborted) {
context.client.setQueryData<TData>(context.queryKey, result)
}
return context.client.getQueryData(context.queryKey)!
}
The key changes:
- Initial value logic: Only use
placeholderData
if it's explicitly provided - First chunk handling: If no
placeholderData
the first chunk becomes the initial accumulator - Empty stream error: Throw an error if the stream is empty and no
placeholderData
is provided
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think what we’d want here is a mandatory initialValue
to the reducer
if you pass a custom reducer. The term placeholderData
is a bit overloaded because here, it does something different than on useQuery
itself so I wouldn’t name it like that.
It could be two separate params, like reducer
and initialValue
, but we’d have to make the types so that initialValue
is required when you pass a reducer. That’s possible but usually needs overloads or conditional types. Or, we could also just use a tuple maybe:
reducer: [initialValue: TData, (accumulator: TData, chunk: TQueryFnData) => TData]
View your CI Pipeline Execution ↗ for commit 890e373
☁️ Nx Cloud last updated this comment at |
Replace maxChunks parameter with flexible reducer function that delegates data aggregation to consumer code. This provides full control over how streamed chunks are combined into the final data structure.
Add support for custom placeholderData that works seamlessly with the reducer function, allowing initialization of complex data types beyond simple arrays.
#9065
BREAKING CHANGE: The maxChunks parameter has been removed from streamedQuery. Use a custom reducer function to control data aggregation behavior instead.