From 48808f8e41fbb50f61651fbf76a9831b2aa0c16d Mon Sep 17 00:00:00 2001 From: Daniel Jonathan Date: Sat, 25 Oct 2025 05:23:20 -0400 Subject: [PATCH 1/2] feat(streamedListObjects): cross-platform streaming with retries and telemetry - Node: axios stream path via API layer; unwraps to raw Readable - Browser: fetch + NDJSON ReadableStream parser with exponential backoff & jitter - Unified error handling across axios/fetch; maps to SDK error classes - Robust NDJSON parsing (async-iterable and event-based fallback) - New examples: streamed-list-objects and streamed-list-objects-local (FGA_API_URL) - Tests and nock stream mocks; generator templates updated to include streaming helper - CHANGELOG and PR summaries updated --- CHANGELOG.md | 8 + api.ts | 197 +++++++--- apiModel.ts | 13 + client.ts | 128 ++++++- common.ts | 103 ++++- example/streamed-list-objects-local/README.md | 21 + .../streamedListObjectsLocal.mjs | 62 +++ example/streamed-list-objects/README.md | 34 ++ example/streamed-list-objects/model.json | 251 ++++++++++++ .../streamedListObjects.mjs | 106 ++++++ index.ts | 1 + streaming.ts | 356 +++++++++++++++++ tests/client.test.ts | 35 +- tests/helpers/nocks.ts | 41 +- tests/streaming.test.ts | 360 ++++++++++++++++++ 15 files changed, 1615 insertions(+), 101 deletions(-) create mode 100644 example/streamed-list-objects-local/README.md create mode 100644 example/streamed-list-objects-local/streamedListObjectsLocal.mjs create mode 100644 example/streamed-list-objects/README.md create mode 100644 example/streamed-list-objects/model.json create mode 100644 example/streamed-list-objects/streamedListObjects.mjs create mode 100644 streaming.ts create mode 100644 tests/streaming.test.ts diff --git a/CHANGELOG.md b/CHANGELOG.md index 9bfad73..bcd81fd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,14 @@ ## [Unreleased](https://github.com/openfga/js-sdk/compare/v0.9.0...HEAD) - feat: add support for handling Retry-After header (#267) +- feat: streamedListObjects (streaming ListObjects) + - Supports both Node.js (axios streaming) and browser (Fetch API) + - Enables retrieving >1000 objects beyond standard listObjects limit + - Node path uses API layer; telemetry preserved; returns raw stream + - Browser path uses retries (exponential backoff with jitter) and SDK error mapping + - Resilient NDJSON parsing (async-iterable and Buffer/string compatibility) + - Adds example for usage: `example/streamed-list-objects` + - Adds example for local usage: `example/streamed-list-objects-local` ## v0.9.0 diff --git a/api.ts b/api.ts index 4a08e83..76b1268 100644 --- a/api.ts +++ b/api.ts @@ -20,6 +20,7 @@ import { serializeDataIfNeeded, toPathString, createRequestFunction, + createStreamingRequestFunction, RequestArgs, CallResult, PromiseResult @@ -147,16 +148,16 @@ export const OpenFgaApiAxiosParamCreator = function (configuration: Configuratio baseOptions = configuration.baseOptions; } - const localVarRequestOptions = { method: "POST", ...baseOptions, ...options}; + const localVarRequestOptions = { method: "POST", ...baseOptions, ...options }; const localVarHeaderParameter = {} as any; const localVarQueryParameter = {} as any; - + localVarHeaderParameter["Content-Type"] = "application/json"; setSearchParams(localVarUrlObj, localVarQueryParameter, options.query); - localVarRequestOptions.headers = {...localVarHeaderParameter, ...options.headers}; + localVarRequestOptions.headers = { ...localVarHeaderParameter, ...options.headers }; localVarRequestOptions.data = serializeDataIfNeeded(body, localVarRequestOptions); return { @@ -186,16 +187,16 @@ export const OpenFgaApiAxiosParamCreator = function (configuration: Configuratio baseOptions = configuration.baseOptions; } - const localVarRequestOptions = { method: "POST", ...baseOptions, ...options}; + const localVarRequestOptions = { method: "POST", ...baseOptions, ...options }; const localVarHeaderParameter = {} as any; const localVarQueryParameter = {} as any; - + localVarHeaderParameter["Content-Type"] = "application/json"; setSearchParams(localVarUrlObj, localVarQueryParameter, options.query); - localVarRequestOptions.headers = {...localVarHeaderParameter, ...options.headers}; + localVarRequestOptions.headers = { ...localVarHeaderParameter, ...options.headers }; localVarRequestOptions.data = serializeDataIfNeeded(body, localVarRequestOptions); return { @@ -214,24 +215,24 @@ export const OpenFgaApiAxiosParamCreator = function (configuration: Configuratio // verify required parameter 'body' is not null or undefined assertParamExists("createStore", "body", body); const localVarPath = "/stores" - ; - // use dummy base URL string because the URL constructor only accepts absolute URLs. + ; + // use dummy base URL string because the URL constructor only accepts absolute URLs. const localVarUrlObj = new URL(localVarPath, DUMMY_BASE_URL); let baseOptions; if (configuration) { baseOptions = configuration.baseOptions; } - const localVarRequestOptions = { method: "POST", ...baseOptions, ...options}; + const localVarRequestOptions = { method: "POST", ...baseOptions, ...options }; const localVarHeaderParameter = {} as any; const localVarQueryParameter = {} as any; - + localVarHeaderParameter["Content-Type"] = "application/json"; setSearchParams(localVarUrlObj, localVarQueryParameter, options.query); - localVarRequestOptions.headers = {...localVarHeaderParameter, ...options.headers}; + localVarRequestOptions.headers = { ...localVarHeaderParameter, ...options.headers }; localVarRequestOptions.data = serializeDataIfNeeded(body, localVarRequestOptions); return { @@ -258,14 +259,14 @@ export const OpenFgaApiAxiosParamCreator = function (configuration: Configuratio baseOptions = configuration.baseOptions; } - const localVarRequestOptions = { method: "DELETE", ...baseOptions, ...options}; + const localVarRequestOptions = { method: "DELETE", ...baseOptions, ...options }; const localVarHeaderParameter = {} as any; const localVarQueryParameter = {} as any; - + setSearchParams(localVarUrlObj, localVarQueryParameter, options.query); - localVarRequestOptions.headers = {...localVarHeaderParameter, ...options.headers}; + localVarRequestOptions.headers = { ...localVarHeaderParameter, ...options.headers }; return { url: toPathString(localVarUrlObj), @@ -294,16 +295,16 @@ export const OpenFgaApiAxiosParamCreator = function (configuration: Configuratio baseOptions = configuration.baseOptions; } - const localVarRequestOptions = { method: "POST", ...baseOptions, ...options}; + const localVarRequestOptions = { method: "POST", ...baseOptions, ...options }; const localVarHeaderParameter = {} as any; const localVarQueryParameter = {} as any; - + localVarHeaderParameter["Content-Type"] = "application/json"; setSearchParams(localVarUrlObj, localVarQueryParameter, options.query); - localVarRequestOptions.headers = {...localVarHeaderParameter, ...options.headers}; + localVarRequestOptions.headers = { ...localVarHeaderParameter, ...options.headers }; localVarRequestOptions.data = serializeDataIfNeeded(body, localVarRequestOptions); return { @@ -330,14 +331,14 @@ export const OpenFgaApiAxiosParamCreator = function (configuration: Configuratio baseOptions = configuration.baseOptions; } - const localVarRequestOptions = { method: "GET", ...baseOptions, ...options}; + const localVarRequestOptions = { method: "GET", ...baseOptions, ...options }; const localVarHeaderParameter = {} as any; const localVarQueryParameter = {} as any; - + setSearchParams(localVarUrlObj, localVarQueryParameter, options.query); - localVarRequestOptions.headers = {...localVarHeaderParameter, ...options.headers}; + localVarRequestOptions.headers = { ...localVarHeaderParameter, ...options.headers }; return { url: toPathString(localVarUrlObj), @@ -366,16 +367,57 @@ export const OpenFgaApiAxiosParamCreator = function (configuration: Configuratio baseOptions = configuration.baseOptions; } - const localVarRequestOptions = { method: "POST", ...baseOptions, ...options}; + const localVarRequestOptions = { method: "POST", ...baseOptions, ...options }; const localVarHeaderParameter = {} as any; const localVarQueryParameter = {} as any; - + localVarHeaderParameter["Content-Type"] = "application/json"; setSearchParams(localVarUrlObj, localVarQueryParameter, options.query); - localVarRequestOptions.headers = {...localVarHeaderParameter, ...options.headers}; + localVarRequestOptions.headers = { ...localVarHeaderParameter, ...options.headers }; + localVarRequestOptions.data = serializeDataIfNeeded(body, localVarRequestOptions); + + return { + url: toPathString(localVarUrlObj), + options: localVarRequestOptions, + }; + }, + /** + * The Streamed ListObjects API is very similar to the ListObjects API, with two differences: + * 1. Instead of collecting all objects before returning a response, it streams them to the client as they are collected. + * 2. The number of results returned is only limited by the execution timeout specified in the flag OPENFGA_LIST_OBJECTS_DEADLINE. + * @summary Stream all objects of the given type that the user has a relation with + * @param {string} storeId + * @param {ListObjectsRequest} body + * @param {*} [options] Override http request option. + * @throws { FgaError } + */ + streamedListObjects: (storeId: string, body: ListObjectsRequest, options: any = {}): RequestArgs => { + // verify required parameter 'storeId' is not null or undefined + assertParamExists("streamedListObjects", "storeId", storeId); + // verify required parameter 'body' is not null or undefined + assertParamExists("streamedListObjects", "body", body); + const localVarPath = "/stores/{store_id}/streamed-list-objects" + .replace(`{${"store_id"}}`, encodeURIComponent(String(storeId))); + // use dummy base URL string because the URL constructor only accepts absolute URLs. + const localVarUrlObj = new URL(localVarPath, DUMMY_BASE_URL); + let baseOptions; + if (configuration) { + baseOptions = configuration.baseOptions; + } + + const localVarRequestOptions = { method: "POST", ...baseOptions, ...options }; + const localVarHeaderParameter = {} as any; + const localVarQueryParameter = {} as any; + + + + localVarHeaderParameter["Content-Type"] = "application/json"; + + setSearchParams(localVarUrlObj, localVarQueryParameter, options.query); + localVarRequestOptions.headers = { ...localVarHeaderParameter, ...options.headers }; localVarRequestOptions.data = serializeDataIfNeeded(body, localVarRequestOptions); return { @@ -394,15 +436,15 @@ export const OpenFgaApiAxiosParamCreator = function (configuration: Configuratio */ listStores: (pageSize?: number, continuationToken?: string, name?: string, options: any = {}): RequestArgs => { const localVarPath = "/stores" - ; - // use dummy base URL string because the URL constructor only accepts absolute URLs. + ; + // use dummy base URL string because the URL constructor only accepts absolute URLs. const localVarUrlObj = new URL(localVarPath, DUMMY_BASE_URL); let baseOptions; if (configuration) { baseOptions = configuration.baseOptions; } - const localVarRequestOptions = { method: "GET", ...baseOptions, ...options}; + const localVarRequestOptions = { method: "GET", ...baseOptions, ...options }; const localVarHeaderParameter = {} as any; const localVarQueryParameter = {} as any; @@ -419,9 +461,9 @@ export const OpenFgaApiAxiosParamCreator = function (configuration: Configuratio } - + setSearchParams(localVarUrlObj, localVarQueryParameter, options.query); - localVarRequestOptions.headers = {...localVarHeaderParameter, ...options.headers}; + localVarRequestOptions.headers = { ...localVarHeaderParameter, ...options.headers }; return { url: toPathString(localVarUrlObj), @@ -450,16 +492,16 @@ export const OpenFgaApiAxiosParamCreator = function (configuration: Configuratio baseOptions = configuration.baseOptions; } - const localVarRequestOptions = { method: "POST", ...baseOptions, ...options}; + const localVarRequestOptions = { method: "POST", ...baseOptions, ...options }; const localVarHeaderParameter = {} as any; const localVarQueryParameter = {} as any; - + localVarHeaderParameter["Content-Type"] = "application/json"; setSearchParams(localVarUrlObj, localVarQueryParameter, options.query); - localVarRequestOptions.headers = {...localVarHeaderParameter, ...options.headers}; + localVarRequestOptions.headers = { ...localVarHeaderParameter, ...options.headers }; localVarRequestOptions.data = serializeDataIfNeeded(body, localVarRequestOptions); return { @@ -489,16 +531,16 @@ export const OpenFgaApiAxiosParamCreator = function (configuration: Configuratio baseOptions = configuration.baseOptions; } - const localVarRequestOptions = { method: "POST", ...baseOptions, ...options}; + const localVarRequestOptions = { method: "POST", ...baseOptions, ...options }; const localVarHeaderParameter = {} as any; const localVarQueryParameter = {} as any; - + localVarHeaderParameter["Content-Type"] = "application/json"; setSearchParams(localVarUrlObj, localVarQueryParameter, options.query); - localVarRequestOptions.headers = {...localVarHeaderParameter, ...options.headers}; + localVarRequestOptions.headers = { ...localVarHeaderParameter, ...options.headers }; localVarRequestOptions.data = serializeDataIfNeeded(body, localVarRequestOptions); return { @@ -528,14 +570,14 @@ export const OpenFgaApiAxiosParamCreator = function (configuration: Configuratio baseOptions = configuration.baseOptions; } - const localVarRequestOptions = { method: "GET", ...baseOptions, ...options}; + const localVarRequestOptions = { method: "GET", ...baseOptions, ...options }; const localVarHeaderParameter = {} as any; const localVarQueryParameter = {} as any; - + setSearchParams(localVarUrlObj, localVarQueryParameter, options.query); - localVarRequestOptions.headers = {...localVarHeaderParameter, ...options.headers}; + localVarRequestOptions.headers = { ...localVarHeaderParameter, ...options.headers }; return { url: toPathString(localVarUrlObj), @@ -564,14 +606,14 @@ export const OpenFgaApiAxiosParamCreator = function (configuration: Configuratio baseOptions = configuration.baseOptions; } - const localVarRequestOptions = { method: "GET", ...baseOptions, ...options}; + const localVarRequestOptions = { method: "GET", ...baseOptions, ...options }; const localVarHeaderParameter = {} as any; const localVarQueryParameter = {} as any; - + setSearchParams(localVarUrlObj, localVarQueryParameter, options.query); - localVarRequestOptions.headers = {...localVarHeaderParameter, ...options.headers}; + localVarRequestOptions.headers = { ...localVarHeaderParameter, ...options.headers }; return { url: toPathString(localVarUrlObj), @@ -599,7 +641,7 @@ export const OpenFgaApiAxiosParamCreator = function (configuration: Configuratio baseOptions = configuration.baseOptions; } - const localVarRequestOptions = { method: "GET", ...baseOptions, ...options}; + const localVarRequestOptions = { method: "GET", ...baseOptions, ...options }; const localVarHeaderParameter = {} as any; const localVarQueryParameter = {} as any; @@ -612,9 +654,9 @@ export const OpenFgaApiAxiosParamCreator = function (configuration: Configuratio } - + setSearchParams(localVarUrlObj, localVarQueryParameter, options.query); - localVarRequestOptions.headers = {...localVarHeaderParameter, ...options.headers}; + localVarRequestOptions.headers = { ...localVarHeaderParameter, ...options.headers }; return { url: toPathString(localVarUrlObj), @@ -644,7 +686,7 @@ export const OpenFgaApiAxiosParamCreator = function (configuration: Configuratio baseOptions = configuration.baseOptions; } - const localVarRequestOptions = { method: "GET", ...baseOptions, ...options}; + const localVarRequestOptions = { method: "GET", ...baseOptions, ...options }; const localVarHeaderParameter = {} as any; const localVarQueryParameter = {} as any; @@ -667,9 +709,9 @@ export const OpenFgaApiAxiosParamCreator = function (configuration: Configuratio } - + setSearchParams(localVarUrlObj, localVarQueryParameter, options.query); - localVarRequestOptions.headers = {...localVarHeaderParameter, ...options.headers}; + localVarRequestOptions.headers = { ...localVarHeaderParameter, ...options.headers }; return { url: toPathString(localVarUrlObj), @@ -698,16 +740,16 @@ export const OpenFgaApiAxiosParamCreator = function (configuration: Configuratio baseOptions = configuration.baseOptions; } - const localVarRequestOptions = { method: "POST", ...baseOptions, ...options}; + const localVarRequestOptions = { method: "POST", ...baseOptions, ...options }; const localVarHeaderParameter = {} as any; const localVarQueryParameter = {} as any; - + localVarHeaderParameter["Content-Type"] = "application/json"; setSearchParams(localVarUrlObj, localVarQueryParameter, options.query); - localVarRequestOptions.headers = {...localVarHeaderParameter, ...options.headers}; + localVarRequestOptions.headers = { ...localVarHeaderParameter, ...options.headers }; localVarRequestOptions.data = serializeDataIfNeeded(body, localVarRequestOptions); return { @@ -740,16 +782,16 @@ export const OpenFgaApiAxiosParamCreator = function (configuration: Configuratio baseOptions = configuration.baseOptions; } - const localVarRequestOptions = { method: "PUT", ...baseOptions, ...options}; + const localVarRequestOptions = { method: "PUT", ...baseOptions, ...options }; const localVarHeaderParameter = {} as any; const localVarQueryParameter = {} as any; - + localVarHeaderParameter["Content-Type"] = "application/json"; setSearchParams(localVarUrlObj, localVarQueryParameter, options.query); - localVarRequestOptions.headers = {...localVarHeaderParameter, ...options.headers}; + localVarRequestOptions.headers = { ...localVarHeaderParameter, ...options.headers }; localVarRequestOptions.data = serializeDataIfNeeded(body, localVarRequestOptions); return { @@ -779,16 +821,16 @@ export const OpenFgaApiAxiosParamCreator = function (configuration: Configuratio baseOptions = configuration.baseOptions; } - const localVarRequestOptions = { method: "POST", ...baseOptions, ...options}; + const localVarRequestOptions = { method: "POST", ...baseOptions, ...options }; const localVarHeaderParameter = {} as any; const localVarQueryParameter = {} as any; - + localVarHeaderParameter["Content-Type"] = "application/json"; setSearchParams(localVarUrlObj, localVarQueryParameter, options.query); - localVarRequestOptions.headers = {...localVarHeaderParameter, ...options.headers}; + localVarRequestOptions.headers = { ...localVarHeaderParameter, ...options.headers }; localVarRequestOptions.data = serializeDataIfNeeded(body, localVarRequestOptions); return { @@ -803,7 +845,7 @@ export const OpenFgaApiAxiosParamCreator = function (configuration: Configuratio * OpenFgaApi - functional programming interface * @export */ -export const OpenFgaApiFp = function(configuration: Configuration, credentials: Credentials) { +export const OpenFgaApiFp = function (configuration: Configuration, credentials: Credentials) { const localVarAxiosParamCreator = OpenFgaApiAxiosParamCreator(configuration, credentials); return { /** @@ -912,6 +954,22 @@ export const OpenFgaApiFp = function(configuration: Configuration, credentials: ...TelemetryAttributes.fromRequestBody(body) }); }, + /** + * The Streamed ListObjects API is very similar to the ListObjects API, with two differences: + * 1. Instead of collecting all objects before returning a response, it streams them to the client as they are collected. + * 2. The number of results returned is only limited by the execution timeout specified in the flag OPENFGA_LIST_OBJECTS_DEADLINE. + * @summary Stream all objects of the given type that the user has a relation with + * @param {string} storeId + * @param {ListObjectsRequest} body + * @param {*} [options] Override http request option. + * @throws { FgaError } + */ + async streamedListObjects(storeId: string, body: ListObjectsRequest, options?: any): Promise<(axios?: AxiosInstance) => Promise> { + const localVarAxiosArgs = localVarAxiosParamCreator.streamedListObjects(storeId, body, options); + return createStreamingRequestFunction(localVarAxiosArgs, globalAxios, configuration, credentials, { + [TelemetryAttribute.FgaClientRequestMethod]: "StreamedListObjects" + }); + }, /** * Returns a paginated list of OpenFGA stores and a continuation token to get additional stores. The continuation token will be empty if there are no more stores. * @summary List all stores @@ -1156,6 +1214,19 @@ export const OpenFgaApiFactory = function (configuration: Configuration, credent listObjects(storeId: string, body: ListObjectsRequest, options?: any): PromiseResult { return localVarFp.listObjects(storeId, body, options).then((request) => request(axios)); }, + /** + * The Streamed ListObjects API is very similar to the ListObjects API, with two differences: + * 1. Instead of collecting all objects before returning a response, it streams them to the client as they are collected. + * 2. The number of results returned is only limited by the execution timeout specified in the flag OPENFGA_LIST_OBJECTS_DEADLINE. + * @summary Stream all objects of the given type that the user has a relation with + * @param {string} storeId + * @param {ListObjectsRequest} body + * @param {*} [options] Override http request option. + * @throws { FgaError } + */ + streamedListObjects(storeId: string, body: ListObjectsRequest, options?: any): Promise { + return localVarFp.streamedListObjects(storeId, body, options).then((request) => request(axios)); + }, /** * Returns a paginated list of OpenFGA stores and a continuation token to get additional stores. The continuation token will be empty if there are no more stores. * @summary List all stores @@ -1370,6 +1441,20 @@ export class OpenFgaApi extends BaseAPI { return OpenFgaApiFp(this.configuration, this.credentials).listObjects(storeId, body, options).then((request) => request(this.axios)); } + /** + * The Streamed ListObjects API is very similar to the ListObjects API, with two differences: + * 1. Instead of collecting all objects before returning a response, it streams them to the client as they are collected. + * 2. The number of results returned is only limited by the execution timeout specified in the flag OPENFGA_LIST_OBJECTS_DEADLINE. + * @summary Stream all objects of the given type that the user has a relation with + * @param {string} storeId + * @param {ListObjectsRequest} body + * @param {*} [options] Override http request option. + * @throws { FgaError } + */ + public streamedListObjects(storeId: string, body: ListObjectsRequest, options?: any): Promise { + return OpenFgaApiFp(this.configuration, this.credentials).streamedListObjects(storeId, body, options).then((request) => request(this.axios)); + } + /** * Returns a paginated list of OpenFGA stores and a continuation token to get additional stores. The continuation token will be empty if there are no more stores. * @summary List all stores diff --git a/apiModel.ts b/apiModel.ts index 1678c09..e3ddf6b 100644 --- a/apiModel.ts +++ b/apiModel.ts @@ -860,6 +860,19 @@ export interface ListObjectsResponse { */ objects: Array; } +/** + * The response for a StreamedListObjects RPC. + * @export + * @interface StreamedListObjectsResponse + */ +export interface StreamedListObjectsResponse { + /** + * + * @type {string} + * @memberof StreamedListObjectsResponse + */ + object: string; +} /** * * @export diff --git a/client.ts b/client.ts index c5cdf28..a8ab34a 100644 --- a/client.ts +++ b/client.ts @@ -34,6 +34,7 @@ import { GetStoreResponse, ListObjectsRequest, ListObjectsResponse, + StreamedListObjectsResponse, ListStoresResponse, ListUsersRequest, ListUsersResponse, @@ -54,6 +55,7 @@ import { BaseAPI } from "./base"; import { CallResult, PromiseResult } from "./common"; import { Configuration, RetryParams, UserConfigurationParams } from "./configuration"; import { FgaApiAuthenticationError, FgaRequiredParamError, FgaValidationError } from "./errors"; +import { parseNDJSONStream, parseNDJSONReadableStream, isNodeEnvironment, attemptFetchRequest } from "./streaming"; import { chunkArray, generateRandomIdWithNonUniqueFallback, @@ -66,6 +68,9 @@ export type UserClientConfigurationParams = UserConfigurationParams & { authorizationModelId?: string; } +export const DEFAULT_MAX_STREAM_RETRY = 3; +export const DEFAULT_MIN_STREAM_WAIT_MS = 100; + export class ClientConfiguration extends Configuration { /** * provide storeId @@ -130,8 +135,8 @@ export type ClientRequestOptsWithConsistency = ClientRequestOpts & StoreIdOpts & export type PaginationOptions = { pageSize?: number, continuationToken?: string, name?: string; }; export type ClientCheckRequest = CheckRequestTupleKey & - Pick & - { contextualTuples?: Array }; + Pick & +{ contextualTuples?: Array }; export type ClientBatchCheckClientRequest = ClientCheckRequest[]; @@ -170,17 +175,17 @@ export type ClientBatchCheckRequest = { // for server batch check export interface ClientBatchCheckRequestOpts { - maxParallelRequests?: number; - maxBatchSize?: number; + maxParallelRequests?: number; + maxBatchSize?: number; } // for server batch check export type ClientBatchCheckSingleResponse = { - allowed: boolean; - request: ClientBatchCheckItem; - correlationId: string; - error?: CheckError; + allowed: boolean; + request: ClientBatchCheckItem; + correlationId: string; + error?: CheckError; } export interface ClientBatchCheckResponse { @@ -226,17 +231,17 @@ export interface ClientReadChangesRequest { } export type ClientExpandRequest = ExpandRequestTupleKey & Omit & { - contextualTuples?: Array + contextualTuples?: Array }; export type ClientReadRequest = ReadRequestTupleKey; export type ClientListObjectsRequest = Omit & { - contextualTuples?: Array + contextualTuples?: Array }; export type ClientListUsersRequest = Omit & { - contextualTuples?: Array + contextualTuples?: Array }; export type ClientListRelationsRequest = Omit & { - relations?: string[], + relations?: string[], }; export type ClientWriteAssertionsRequest = (CheckRequestTupleKey & Pick)[]; @@ -521,7 +526,7 @@ export class OpenFgaClient extends BaseAPI { const writeResponses: ClientWriteSingleResponse[][] = []; if (writes?.length) { for await (const singleChunkResponse of asyncPool(maxParallelRequests, chunkArray(writes, maxPerChunk), - (chunk) => this.writeTuples(chunk,{ ...options, headers, transaction: undefined }).catch(err => { + (chunk) => this.writeTuples(chunk, { ...options, headers, transaction: undefined }).catch(err => { if (err instanceof FgaApiAuthenticationError) { throw err; } @@ -672,7 +677,7 @@ export class OpenFgaClient extends BaseAPI { - private singleBatchCheck(body: BatchCheckRequest, options: ClientRequestOptsWithConsistency & ClientBatchCheckRequestOpts = {}): Promise { + private singleBatchCheck(body: BatchCheckRequest, options: ClientRequestOptsWithConsistency & ClientBatchCheckRequestOpts = {}): Promise { return this.api.batchCheck(this.getStoreId(options)!, body, options); } @@ -750,7 +755,7 @@ export class OpenFgaClient extends BaseAPI { // Collect the responses and associate them with their correlation IDs for await (const response of batchResponses) { - if (response) { + if (response) { for (const [correlationId, result] of Object.entries(response)) { const check = correlationIdToCheck.get(correlationId); if (check && result) { @@ -766,7 +771,7 @@ export class OpenFgaClient extends BaseAPI { } return { result: results }; - } + } /** * Expand - Expands the relationships in userset tree format (evaluates) @@ -816,6 +821,97 @@ export class OpenFgaClient extends BaseAPI { }, options); } + /** + * StreamedListObjects - Stream all objects of a particular type that the user has a certain relation to (evaluates) + * + * Note: This method supports both Node.js and browser environments. + * The response will be streamed as newline-delimited JSON objects. + * + * @param {ClientListObjectsRequest} body + * @param {ClientRequestOptsWithConsistency} [options] + * @param {string} [options.authorizationModelId] - Overrides the authorization model id in the configuration + * @param {object} [options.headers] - Custom headers to send alongside the request + * @param {ConsistencyPreference} [options.consistency] - The consistency preference to use + * @param {object} [options.retryParams] - Override the retry parameters for this request + * @param {number} [options.retryParams.maxRetry] - Override the max number of retries on each API request + * @param {number} [options.retryParams.minWaitInMs] - Override the minimum wait before a retry is initiated + * @returns {AsyncGenerator} An async generator that yields objects as they are received + */ + async *streamedListObjects(body: ClientListObjectsRequest, options: ClientRequestOptsWithConsistency = {}): AsyncGenerator { + // Detect the environment and use the appropriate streaming method + if (isNodeEnvironment()) { + // Node.js: Use the API layer which properly handles auth, retries, and telemetry + const stream = await this.api.streamedListObjects(this.getStoreId(options)!, { + authorization_model_id: this.getAuthorizationModelId(options), + user: body.user, + relation: body.relation, + type: body.type, + context: body.context, + contextual_tuples: { tuple_keys: body.contextualTuples || [] }, + consistency: options.consistency + }, options); + + // Unwrap axios CallResult to get the raw Node.js stream when needed + const source = (stream as any)?.$response?.data ?? stream; + + // Parse the Node.js stream + for await (const item of parseNDJSONStream(source as any)) { + if (item && item.result && item.result.object) { + yield { object: item.result.object } as StreamedListObjectsResponse; + } + } + } else { + // Browser: Use Fetch API streaming with retry support + const storeId = this.getStoreId(options); + const requestBody: ListObjectsRequest = { + authorization_model_id: this.getAuthorizationModelId(options), + user: body.user, + relation: body.relation, + type: body.type, + context: body.context, + contextual_tuples: { tuple_keys: body.contextualTuples || [] }, + consistency: options.consistency + }; + + // Set up authentication + const accessTokenHeader = await this.credentials.getAccessTokenHeader(); + const headers: Record = { ...options.headers }; + if (accessTokenHeader) { + headers[accessTokenHeader.name] = accessTokenHeader.value; + } + headers["Content-Type"] = "application/json"; + + const url = this.configuration.getBasePath() + `/stores/${storeId}/streamed-list-objects`; + + const retryParams = options.retryParams || this.configuration.retryParams || { maxRetry: 3, minWaitInMs: 100 }; + + const response = await attemptFetchRequest( + url, + { + method: "POST", + headers: headers, + body: JSON.stringify(requestBody), + }, + requestBody, + { + maxRetry: retryParams.maxRetry || DEFAULT_MAX_STREAM_RETRY, + minWaitInMs: retryParams.minWaitInMs || DEFAULT_MIN_STREAM_WAIT_MS + } + ); + + if (!response.body) { + throw new FgaValidationError("response.body", "Response body is not readable"); + } + + // Parse the browser ReadableStream + for await (const item of parseNDJSONReadableStream(response.body)) { + if (item && item.result && item.result.object) { + yield { object: item.result.object } as StreamedListObjectsResponse; + } + } + } + } + /** * ListRelations - List all the relations a user has with an object (evaluates) * @param {object} listRelationsRequest diff --git a/common.ts b/common.ts index 4bdd79e..805af6f 100644 --- a/common.ts +++ b/common.ts @@ -46,8 +46,8 @@ const MAX_EXPONENTIAL_BACKOFF_MS = 120_000; // 120 seconds * @interface RequestArgs */ export interface RequestArgs { - url: string; - options: any; + url: string; + options: any; } @@ -141,11 +141,15 @@ export type PromiseResult = Promise>; function isAxiosError(err: any): boolean { return err && typeof err === "object" && err.isAxiosError === true; } -function calculateExponentialBackoffWithJitter(retryAttempt: number, minWaitInMs: number): number { - const minDelayMs = Math.ceil(2 ** retryAttempt * minWaitInMs); - const maxDelayMs = Math.ceil(2 ** (retryAttempt + 1) * minWaitInMs); - const randomDelayMs = Math.floor(Math.random() * (maxDelayMs - minDelayMs) + minDelayMs); - return Math.min(randomDelayMs, MAX_EXPONENTIAL_BACKOFF_MS); + +function randomTime(loopCount: number, minWaitInMs: number): number { + const min = Math.ceil(2 ** loopCount * minWaitInMs); + const max = Math.ceil(2 ** (loopCount + 1) * minWaitInMs); + return Math.floor(Math.random() * (max - min) + min); //The maximum is exclusive and the minimum is inclusive +} + +export function calculateExponentialBackoffWithJitter(retryAttempt: number, minWaitInMs: number): number { + return Math.min(randomTime(retryAttempt, minWaitInMs), MAX_EXPONENTIAL_BACKOFF_MS); } /** @@ -273,7 +277,7 @@ export async function attemptHttpRequest( let retryDelayMs: number | undefined; if ((status && - (status === 429 || (status >= 500 && status !== 501))) && + (status === 429 || (status >= 500 && status !== 501))) && err.response?.headers) { retryDelayMs = parseRetryAfterHeader(err.response.headers); } @@ -286,6 +290,77 @@ export async function attemptHttpRequest( } while (iterationCount < config.maxRetry + 1); } +/** + * creates an axios streaming request function that returns the raw response stream + * for incremental parsing (used by streamedListObjects) + */ +export const createStreamingRequestFunction = function (axiosArgs: RequestArgs, axiosInstance: AxiosInstance, configuration: Configuration, credentials: Credentials, methodAttributes: Record = {}) { + configuration.isValid(); + + const retryParams = axiosArgs.options?.retryParams ? axiosArgs.options?.retryParams : configuration.retryParams; + const maxRetry: number = retryParams ? retryParams.maxRetry : 0; + const minWaitInMs: number = retryParams ? retryParams.minWaitInMs : 0; + + const start = performance.now(); + + return async (axios: AxiosInstance = axiosInstance): Promise => { + await setBearerAuthToObject(axiosArgs.options.headers, credentials!); + + const url = configuration.getBasePath() + axiosArgs.url; + + const axiosRequestArgs = { ...axiosArgs.options, responseType: "stream", url: url }; + const wrappedResponse = await attemptHttpRequest(axiosRequestArgs, { + maxRetry, + minWaitInMs, + }, axios); + const response = wrappedResponse?.response; + + const result: any = response?.data; // raw stream + + let attributes: StringIndexable = {}; + + attributes = TelemetryAttributes.fromRequest({ + userAgent: configuration.baseOptions?.headers["User-Agent"], + httpMethod: axiosArgs.options?.method, + url, + resendCount: wrappedResponse?.retries, + start: start, + credentials: credentials, + attributes: methodAttributes, + }); + + attributes = TelemetryAttributes.fromResponse({ + response, + attributes, + }); + + const serverRequestDuration = attributes[TelemetryAttribute.HttpServerRequestDuration]; + if (configuration.telemetry?.metrics?.histogramQueryDuration && typeof serverRequestDuration !== "undefined") { + configuration.telemetry.recorder.histogram( + TelemetryHistograms.queryDuration, + parseInt(attributes[TelemetryAttribute.HttpServerRequestDuration] as string, 10), + TelemetryAttributes.prepare( + attributes, + configuration.telemetry.metrics.histogramQueryDuration.attributes + ) + ); + } + + if (configuration.telemetry?.metrics?.histogramRequestDuration) { + configuration.telemetry.recorder.histogram( + TelemetryHistograms.requestDuration, + attributes[TelemetryAttribute.HttpClientRequestDuration], + TelemetryAttributes.prepare( + attributes, + configuration.telemetry.metrics.histogramRequestDuration.attributes + ) + ); + } + + return result; + }; +}; + /** * creates an axios request function */ @@ -293,24 +368,26 @@ export const createRequestFunction = function (axiosArgs: RequestArgs, axiosInst configuration.isValid(); const retryParams = axiosArgs.options?.retryParams ? axiosArgs.options?.retryParams : configuration.retryParams; - const maxRetry:number = retryParams ? retryParams.maxRetry : 0; - const minWaitInMs:number = retryParams ? retryParams.minWaitInMs : 0; + const maxRetry: number = retryParams ? retryParams.maxRetry : 0; + const minWaitInMs: number = retryParams ? retryParams.minWaitInMs : 0; const start = performance.now(); - return async (axios: AxiosInstance = axiosInstance) : PromiseResult => { + return async (axios: AxiosInstance = axiosInstance): PromiseResult => { await setBearerAuthToObject(axiosArgs.options.headers, credentials!); const url = configuration.getBasePath() + axiosArgs.url; - const axiosRequestArgs = {...axiosArgs.options, url: url}; + const axiosRequestArgs = { ...axiosArgs.options, url: url }; const wrappedResponse = await attemptHttpRequest(axiosRequestArgs, { maxRetry, minWaitInMs, }, axios); const response = wrappedResponse?.response; + + // Regular requests: spread the data into a result object const data = typeof response?.data === "undefined" ? {} : response?.data; - const result: CallResult = { ...data }; + const result: any = { ...data }; setNotEnumerableProperty(result, "$response", response); let attributes: StringIndexable = {}; diff --git a/example/streamed-list-objects-local/README.md b/example/streamed-list-objects-local/README.md new file mode 100644 index 0000000..7879816 --- /dev/null +++ b/example/streamed-list-objects-local/README.md @@ -0,0 +1,21 @@ +# Streamed List Objects (Local) + +This example demonstrates using the js-sdk `streamedListObjects` API against a locally running OpenFGA server that you manage yourself. + +Prerequisites: +- Node.js 18+ +- An OpenFGA server reachable at `FGA_API_URL` (defaults to `http://localhost:8080`) + +Run: +1. From repo root, build the SDK once: + - `cd js-sdk && npm run build` +2. Set the API URL (optional) and run the example: + - `cd js-sdk/example/streamed-list-objects-local` + - `FGA_API_URL=http://localhost:8080 node run.mjs` + +What it does: +- Creates a temporary store +- Writes a schema 1.1 model with an assignable relation +- Inserts 3 tuples +- Streams them via `streamedListObjects` +- Cleans up the store diff --git a/example/streamed-list-objects-local/streamedListObjectsLocal.mjs b/example/streamed-list-objects-local/streamedListObjectsLocal.mjs new file mode 100644 index 0000000..abc6fb2 --- /dev/null +++ b/example/streamed-list-objects-local/streamedListObjectsLocal.mjs @@ -0,0 +1,62 @@ +import { ClientConfiguration, OpenFgaClient, ConsistencyPreference } from "../../dist/index.js"; + +const apiUrl = process.env.FGA_API_URL || "http://localhost:8080"; + +async function main() { + const client = new OpenFgaClient(new ClientConfiguration({ apiUrl })); + + console.log("Creating temporary store"); + const { id: storeId } = await client.createStore({ name: "streamed-list-objects-local" }); + + const clientWithStore = new OpenFgaClient(new ClientConfiguration({ apiUrl, storeId })); + + const model = { + schema_version: "1.1", + type_definitions: [ + { type: "user" }, + { + type: "document", + relations: { can_read: { this: {} } }, + metadata: { + relations: { + can_read: { + directly_related_user_types: [{ type: "user" }] + } + } + } + } + ] + }; + + console.log("Writing authorization model"); + const { authorization_model_id: authorizationModelId } = await clientWithStore.writeAuthorizationModel(model); + + const fga = new OpenFgaClient(new ClientConfiguration({ apiUrl, storeId, authorizationModelId })); + + console.log("Writing tuples"); + await fga.write({ + writes: [ + { user: "user:anne", relation: "can_read", object: "document:1" }, + { user: "user:anne", relation: "can_read", object: "document:2" }, + { user: "user:anne", relation: "can_read", object: "document:3" } + ] + }); + + console.log("Streaming objects..."); + let count = 0; + for await (const _ of fga.streamedListObjects( + { user: "user:anne", relation: "can_read", type: "document" }, + { consistency: ConsistencyPreference.HigherConsistency } + )) { + count++; + } + console.log(`\u2713 Streamed count: ${count}`); + + console.log("Cleaning up..."); + await fga.deleteStore(); + console.log("Done"); +} + +main().catch(_err => { + process.exit(1); +}); diff --git a/example/streamed-list-objects/README.md b/example/streamed-list-objects/README.md new file mode 100644 index 0000000..4c5df6d --- /dev/null +++ b/example/streamed-list-objects/README.md @@ -0,0 +1,34 @@ +# Streamed List Objects Example + +This example demonstrates working with [OpenFGA's `/streamed-list-objects` endpoint](https://openfga.dev/api/service#/Relationship%20Queries/StreamedListObjects) using the JavaScript SDK's `streamedListObjects()` method. + +## Prerequisites + +- Node.js 16.15.0+ +- OpenFGA running on `localhost:8080` + +You can start OpenFGA with Docker by running the following command: + +```bash +docker pull openfga/openfga && docker run -it --rm -p 8080:8080 openfga/openfga run +``` + +## Running the example + +No additional setup is required to run this example. Simply run the following command: + +```bash +node streamedListObjects.mjs +``` + +## About this example + +This example: +1. Creates a temporary store +2. Writes an authorization model (defined in `model.json`) +3. Writes 2000 relationship tuples to the store +4. Calls both `/streamed-list-objects` (streaming) and `/list-objects` (standard) endpoints +5. Compares the results + +The streaming endpoint allows retrieving more than 1000 objects (the default limit for the standard endpoint), and streams results as they are found rather than waiting for all results to be collected. + diff --git a/example/streamed-list-objects/model.json b/example/streamed-list-objects/model.json new file mode 100644 index 0000000..3037fc3 --- /dev/null +++ b/example/streamed-list-objects/model.json @@ -0,0 +1,251 @@ +{ + "schema_version": "1.1", + "type_definitions": [ + { + "type": "user", + "relations": {} + }, + { + "type": "group", + "relations": { + "member": { + "this": {} + } + }, + "metadata": { + "relations": { + "member": { + "directly_related_user_types": [ + { + "type": "user" + } + ] + } + } + } + }, + { + "type": "folder", + "relations": { + "can_create_file": { + "computedUserset": { + "object": "", + "relation": "owner" + } + }, + "owner": { + "this": {} + }, + "parent": { + "this": {} + }, + "viewer": { + "union": { + "child": [ + { + "this": {} + }, + { + "computedUserset": { + "object": "", + "relation": "owner" + } + }, + { + "tupleToUserset": { + "tupleset": { + "object": "", + "relation": "parent" + }, + "computedUserset": { + "object": "", + "relation": "viewer" + } + } + } + ] + } + } + }, + "metadata": { + "relations": { + "can_create_file": { + "directly_related_user_types": [] + }, + "owner": { + "directly_related_user_types": [ + { + "type": "user" + } + ] + }, + "parent": { + "directly_related_user_types": [ + { + "type": "folder" + } + ] + }, + "viewer": { + "directly_related_user_types": [ + { + "type": "user" + }, + { + "type": "user", + "wildcard": {} + }, + { + "type": "group", + "relation": "member" + } + ] + } + } + } + }, + { + "type": "document", + "relations": { + "can_change_owner": { + "computedUserset": { + "object": "", + "relation": "owner" + } + }, + "owner": { + "this": {} + }, + "parent": { + "this": {} + }, + "can_read": { + "union": { + "child": [ + { + "computedUserset": { + "object": "", + "relation": "viewer" + } + }, + { + "computedUserset": { + "object": "", + "relation": "owner" + } + }, + { + "tupleToUserset": { + "tupleset": { + "object": "", + "relation": "parent" + }, + "computedUserset": { + "object": "", + "relation": "viewer" + } + } + } + ] + } + }, + "can_share": { + "union": { + "child": [ + { + "computedUserset": { + "object": "", + "relation": "owner" + } + }, + { + "tupleToUserset": { + "tupleset": { + "object": "", + "relation": "parent" + }, + "computedUserset": { + "object": "", + "relation": "owner" + } + } + } + ] + } + }, + "viewer": { + "this": {} + }, + "can_write": { + "union": { + "child": [ + { + "computedUserset": { + "object": "", + "relation": "owner" + } + }, + { + "tupleToUserset": { + "tupleset": { + "object": "", + "relation": "parent" + }, + "computedUserset": { + "object": "", + "relation": "owner" + } + } + } + ] + } + } + }, + "metadata": { + "relations": { + "can_change_owner": { + "directly_related_user_types": [] + }, + "owner": { + "directly_related_user_types": [ + { + "type": "user" + } + ] + }, + "parent": { + "directly_related_user_types": [ + { + "type": "folder" + } + ] + }, + "can_read": { + "directly_related_user_types": [] + }, + "can_share": { + "directly_related_user_types": [] + }, + "viewer": { + "directly_related_user_types": [ + { + "type": "user" + }, + { + "type": "user", + "wildcard": {} + }, + { + "type": "group", + "relation": "member" + } + ] + }, + "can_write": { + "directly_related_user_types": [] + } + } + } + } + ] +} \ No newline at end of file diff --git a/example/streamed-list-objects/streamedListObjects.mjs b/example/streamed-list-objects/streamedListObjects.mjs new file mode 100644 index 0000000..87145f6 --- /dev/null +++ b/example/streamed-list-objects/streamedListObjects.mjs @@ -0,0 +1,106 @@ +import { OpenFgaClient } from "../../dist/index.js"; +import { readFileSync } from "fs"; + +async function createStore(fgaClient) { + console.log("Creating temporary store"); + const store = await fgaClient.createStore({ name: "Streamed List Objects Demo Store" }); + console.log(`Created store: ${store.id}\n`); + return store.id; +} + +async function writeModel(fgaClient) { + console.log("Writing authorization model"); + const model = JSON.parse(readFileSync("./model.json", "utf8")); + const response = await fgaClient.writeAuthorizationModel(model); + console.log(`Created authorization model: ${response.authorization_model_id}\n`); + return response.authorization_model_id; +} + +async function writeTuples(fgaClient, quantity) { + console.log(`Writing ${quantity} tuples to the store`); + const chunks = Math.floor(quantity / 100); + + for (let chunk = 0; chunk < chunks; ++chunk) { + const tuples = []; + for (let t = 0; t < 100; ++t) { + tuples.push({ + user: "user:anne", + relation: "owner", + object: `document:${chunk * 100 + t}` + }); + } + await fgaClient.writeTuples(tuples); + } + console.log(`Done writing ${quantity} tuples\n`); + return quantity; +} + +async function streamedListObjects(fgaClient, request) { + console.log("Calling /streamed-list-objects endpoint..."); + const results = []; + + // Note: streamedListObjects() is an async generator + // We can process results as they arrive, or collect them all like this + for await (const response of fgaClient.streamedListObjects(request)) { + results.push(response.object); + } + + return results; +} + +async function listObjects(fgaClient, request) { + console.log("Calling /list-objects endpoint for comparison..."); + const response = await fgaClient.listObjects(request); + return response.objects; +} + +async function main() { + const fgaClient = new OpenFgaClient({ + apiUrl: process.env.FGA_API_URL || "http://localhost:8080" + }); + + try { + // Create temporary store + const storeId = await createStore(fgaClient); + fgaClient.storeId = storeId; + + // Write authorization model + const modelId = await writeModel(fgaClient); + fgaClient.authorizationModelId = modelId; + + // Write test data + await writeTuples(fgaClient, 2000); + + // Prepare request to list all documents owned by user:anne + const request = { + type: "document", + relation: "owner", + user: "user:anne" + }; + + // Test streaming endpoint + const streamedResults = await streamedListObjects(fgaClient, request); + console.log(`✓ Streamed endpoint returned ${streamedResults.length} objects\n`); + + // Test standard endpoint for comparison + const standardResults = await listObjects(fgaClient, request); + console.log(`✓ Standard endpoint returned ${standardResults.length} objects\n`); + + console.log("Comparison:"); + console.log(` Streaming: ${streamedResults.length} objects`); + console.log(` Standard: ${standardResults.length} objects (max 1000)`); + console.log(`\nStreaming endpoint retrieved all ${streamedResults.length} objects successfully!`); + + // Cleanup + console.log("\nCleaning up..."); + await fgaClient.deleteStore(); + console.log("Deleted temporary store"); + + } catch (error) { + console.error("Error:", error); + process.exitCode = 1; + } +} + +main(); + diff --git a/index.ts b/index.ts index 291e294..6625ecb 100644 --- a/index.ts +++ b/index.ts @@ -24,5 +24,6 @@ export * from "./telemetry/counters"; export * from "./telemetry/histograms"; export * from "./telemetry/metrics"; export * from "./errors"; +export { parseNDJSONStream, parseNDJSONReadableStream, isNodeEnvironment } from "./streaming"; diff --git a/streaming.ts b/streaming.ts new file mode 100644 index 0000000..c520208 --- /dev/null +++ b/streaming.ts @@ -0,0 +1,356 @@ +/** + * JavaScript and Node.js SDK for OpenFGA + * + * API version: 1.x + * Website: https://openfga.dev + * Documentation: https://openfga.dev/docs + * Support: https://openfga.dev/community + * License: [Apache-2.0](https://github.com/openfga/js-sdk/blob/main/LICENSE) + * + * NOTE: This file was auto generated by OpenAPI Generator (https://openapi-generator.tech). DO NOT EDIT. + */ + + +import type { Readable } from "node:stream"; +import { + FgaError, + FgaApiError, + FgaApiValidationError, + FgaApiAuthenticationError, + FgaApiNotFoundError, + FgaApiRateLimitExceededError, + FgaApiInternalError +} from "./errors"; +import { calculateExponentialBackoffWithJitter } from "./common"; + +// Helper: create async iterable from classic EventEmitter-style Readable streams +const createAsyncIterableFromReadable = (readable: any): AsyncIterable => { + return { + [Symbol.asyncIterator](): AsyncIterator { + const chunkQueue: any[] = []; + const pendingResolvers: Array<(value: IteratorResult) => void> = []; + let ended = false; + let error: any = null; + + const onData = (chunk: any) => { + if (pendingResolvers.length > 0) { + const resolve = pendingResolvers.shift()!; + resolve({ value: chunk, done: false }); + } else { + chunkQueue.push(chunk); + } + }; + + const onEnd = () => { + ended = true; + while (pendingResolvers.length > 0) { + const resolve = pendingResolvers.shift()!; + resolve({ value: undefined, done: true }); + } + }; + + const onError = (err: any) => { + error = err; + while (pendingResolvers.length > 0) { + const resolve = pendingResolvers.shift()!; + // Rejecting inside async iterator isn't straightforward; surface as end and throw later + resolve({ value: undefined, done: true }); + } + }; + + readable.on("data", onData); + readable.once("end", onEnd); + readable.once("error", onError); + + const cleanup = () => { + readable.off("data", onData); + readable.off("end", onEnd); + readable.off("error", onError); + }; + + return { + next(): Promise> { + if (error) { + cleanup(); + return Promise.reject(error); + } + if (chunkQueue.length > 0) { + const value = chunkQueue.shift(); + return Promise.resolve({ value, done: false }); + } + if (ended) { + cleanup(); + return Promise.resolve({ value: undefined, done: true }); + } + return new Promise>((resolve) => { + pendingResolvers.push(resolve); + }); + }, + return(): Promise> { + cleanup(); + return Promise.resolve({ value: undefined, done: true }); + }, + throw(e?: any): Promise> { + cleanup(); + return Promise.reject(e); + } + }; + } + }; +}; + +/** + * Parse newline-delimited JSON (NDJSON) from a Node.js readable stream + * @param stream - Node.js readable stream + * @returns AsyncGenerator that yields parsed JSON objects + */ +export async function* parseNDJSONStream(stream: Readable): AsyncGenerator { + const decoder = new TextDecoder("utf-8"); + let buffer = ""; + + // If stream is actually a string or Buffer-like, handle as whole payload + const isString = typeof (stream as unknown) === "string"; + const isBuffer = typeof Buffer !== "undefined" && Buffer.isBuffer && Buffer.isBuffer(stream as unknown); + if (isString || isBuffer) { + const text = isString ? (stream as unknown as string) : new TextDecoder("utf-8").decode(new Uint8Array(stream as unknown as ArrayBufferLike)); + const lines = text.split("\n"); + for (const line of lines) { + const trimmed = line.trim(); + if (!trimmed) continue; + try { + yield JSON.parse(trimmed); + } catch (err) { + console.warn("Failed to parse JSON line:", err); + } + } + return; + } + + const isAsyncIterable = stream && typeof (stream as any)[Symbol.asyncIterator] === "function"; + const source: AsyncIterable = isAsyncIterable ? (stream as any) : createAsyncIterableFromReadable(stream as any); + + for await (const chunk of source) { + // Node.js streams can return Buffer or string chunks + // Convert to Uint8Array if needed for TextDecoder + const uint8Chunk = typeof chunk === "string" + ? new TextEncoder().encode(chunk) + : chunk instanceof Buffer + ? new Uint8Array(chunk) + : chunk; + + // Append decoded chunk to buffer + buffer += decoder.decode(uint8Chunk, { stream: true }); + + // Split on newlines + const lines = buffer.split("\n"); + + // Keep the last (potentially incomplete) line in the buffer + buffer = lines.pop() || ""; + + // Parse and yield complete lines + for (const line of lines) { + const trimmed = line.trim(); + if (trimmed) { + try { + yield JSON.parse(trimmed); + } catch (err) { + console.warn("Failed to parse JSON line:", err); + } + } + } + } + + // Flush any remaining decoder state + buffer += decoder.decode(); + + // Handle any remaining data in buffer + if (buffer.trim()) { + try { + yield JSON.parse(buffer); + } catch (err) { + console.warn("Failed to parse final JSON buffer:", err); + } + } +} + +/** + * Parse newline-delimited JSON (NDJSON) from a browser ReadableStream + * @param stream - Browser ReadableStream (from Fetch API) + * @returns AsyncGenerator that yields parsed JSON objects + */ +export async function* parseNDJSONReadableStream(stream: ReadableStream): AsyncGenerator { + const reader = stream.getReader(); + const decoder = new TextDecoder("utf-8"); + let buffer = ""; + + try { + while (true) { + const { done, value } = await reader.read(); + + if (done) { + break; + } + + // Decode chunk and append to buffer + buffer += decoder.decode(value, { stream: true }); + + // Split on newlines + const lines = buffer.split("\n"); + + // Keep the last (potentially incomplete) line in the buffer + buffer = lines.pop() || ""; + + // Parse and yield complete lines + for (const line of lines) { + const trimmed = line.trim(); + if (trimmed) { + try { + yield JSON.parse(trimmed); + } catch (err) { + // Skip invalid JSON lines + console.warn("Failed to parse JSON line:", err); + } + } + } + } + + // Handle any remaining data in buffer + if (buffer.trim()) { + try { + yield JSON.parse(buffer); + } catch (err) { + console.warn("Failed to parse final JSON buffer:", err); + } + } + } finally { + reader.releaseLock(); + } +} + +/** + * Detect if we're running in a Node.js environment + * @returns true if Node.js, false if browser + */ +export function isNodeEnvironment(): boolean { + return typeof process === "object" && process.versions?.node !== undefined; +} + +/** + * Create an SDK error from a fetch Response + * + * Note: This function adapts a Fetch API Response to the SDK's AxiosError-based error classes. + * The SDK error classes expect an AxiosError structure, so we create a compatible object + * that contains the same fields (response.status, response.data, config.url, etc.). + * This allows browser fetch requests to use the same error types as axios requests. + * + * @param response - Fetch API Response object + * @param url - Request URL + * @param requestBody - Request body + * @returns Appropriate SDK error based on HTTP status code + */ +export async function createFetchError(response: Response, url: string, requestBody: any): Promise { + const status = response.status; + let errorData: any; + + try { + errorData = await response.json(); + } catch { + errorData = { message: response.statusText }; + } + + // Adapt Fetch Response to AxiosError structure for SDK error classes + // The error classes extract status, data, headers, etc. from this structure + const axiosErrorAdapter = { + response: { + status, + statusText: response.statusText, + data: errorData, + headers: Object.fromEntries(response.headers.entries()) + }, + config: { + url, + method: "POST" as const, + data: JSON.stringify(requestBody) + }, + request: { + path: new URL(url).pathname + }, + isAxiosError: true + } as any; // Type assertion needed as error constructors expect AxiosError + + // Map HTTP status codes to appropriate SDK error types + if (status === 400 || status === 422) { + return new FgaApiValidationError(axiosErrorAdapter); + } else if (status === 401 || status === 403) { + return new FgaApiAuthenticationError(axiosErrorAdapter); + } else if (status === 404) { + return new FgaApiNotFoundError(axiosErrorAdapter); + } else if (status === 429) { + return new FgaApiRateLimitExceededError(axiosErrorAdapter); + } else if (status >= 500 && status !== 501) { + return new FgaApiInternalError(axiosErrorAdapter); + } else { + return new FgaApiError(axiosErrorAdapter); + } +} + +/** + * Attempt a fetch request with retry logic + * @param url - Request URL + * @param options - Fetch options + * @param requestBody - Request body + * @param retryConfig - Retry configuration + * @returns Response object + */ +export async function attemptFetchRequest( + url: string, + options: RequestInit, + requestBody: any, + retryConfig: { maxRetry: number; minWaitInMs: number } +): Promise { + let iterationCount = 0; + const { maxRetry, minWaitInMs } = retryConfig; + + do { + ++iterationCount; + + try { + const response = await fetch(url, options); + + if (response.ok) { + return response; + } + + const status = response.status; + const isRetryable = status === 429 || (status >= 500 && status !== 501); + + if (isRetryable && iterationCount <= maxRetry) { + // Use SDK's standard exponential backoff with jitter + const delayMs = calculateExponentialBackoffWithJitter(iterationCount, minWaitInMs); + await new Promise(r => setTimeout(r, delayMs)); + continue; + } + + // Not retryable or out of retries + throw await createFetchError(response, url, requestBody); + } catch (err) { + // If it's already an SDK error, re-throw + if ((err as any)?.name?.startsWith("FgaApi") || (err as any)?.name === "FgaError") { + throw err; + } + + // Network error + if (iterationCount > maxRetry) { + throw new FgaError(err); + } + + // Retry on network error using SDK's standard backoff + const delayMs = calculateExponentialBackoffWithJitter(iterationCount, minWaitInMs); + await new Promise(r => setTimeout(r, delayMs)); + } + } while (iterationCount <= maxRetry); + + // Should never reach here, but TypeScript needs it + throw new FgaError("Maximum retries exceeded"); +} + diff --git a/tests/client.test.ts b/tests/client.test.ts index 39c09c2..fc8972f 100644 --- a/tests/client.test.ts +++ b/tests/client.test.ts @@ -299,7 +299,7 @@ describe("OpenFGA Client", () => { const continuationToken = "eyJwayI6IkxBVEVTVF9OU0NPTkZJR19hdXRoMHN0b3JlIiwic2siOiIxem1qbXF3MWZLZExTcUoyN01MdTdqTjh0cWgifQ=="; const startTime = "2022-01-01T00:00:00Z"; - const scope = nocks.readChanges(baseConfig.storeId!, "", pageSize, continuationToken,""); + const scope = nocks.readChanges(baseConfig.storeId!, "", pageSize, continuationToken, ""); expect(scope.isDone()).toBe(false); const response = await fgaClient.readChanges(undefined, { pageSize, continuationToken }); @@ -319,7 +319,7 @@ describe("OpenFGA Client", () => { const scope = nocks.read(baseConfig.storeId!, tuple, undefined, ConsistencyPreference.HigherConsistency); expect(scope.isDone()).toBe(false); - const data = await fgaClient.read(tuple, { consistency: ConsistencyPreference.HigherConsistency}); + const data = await fgaClient.read(tuple, { consistency: ConsistencyPreference.HigherConsistency }); expect(scope.isDone()).toBe(true); expect(data).toMatchObject({}); @@ -772,7 +772,7 @@ describe("OpenFGA Client", () => { } finally { expect(scope.isDone()).toBe(true); } - }); + }); }); describe("Expand", () => { @@ -823,6 +823,33 @@ describe("OpenFGA Client", () => { }); }); + describe("StreamedListObjects", () => { + it("should stream objects and return them via async generator", async () => { + const mockObjects = [ + "document:0192ab2a-d83f-756d-9397-c5ed9f3cb69a", + "document:0192ab2a-d83f-756d-9397-c5ed9f3cb69b", + "document:0192ab2a-d83f-756d-9397-c5ed9f3cb69c" + ]; + const scope = nocks.streamedListObjects(baseConfig.storeId!, mockObjects); + + expect(scope.isDone()).toBe(false); + const results: string[] = []; + for await (const response of fgaClient.streamedListObjects({ + user: "user:81684243-9356-4421-8fbf-a4f8d36aa31b", + relation: "can_read", + type: "document", + }, { + authorizationModelId: "01GAHCE4YVKPQEKZQHT2R89MQV", + })) { + results.push(response.object); + } + + expect(scope.isDone()).toBe(true); + expect(results).toHaveLength(mockObjects.length); + expect(results).toEqual(expect.arrayContaining(mockObjects)); + }); + }); + describe("ListRelations", () => { it("should properly pass the request and return an allowed API response", async () => { const tuples = [{ @@ -961,7 +988,7 @@ describe("OpenFGA Client", () => { object: "workspace:1", }]; - const scope0 = nocks.check(baseConfig.storeId!, tuples[0], defaultConfiguration.getBasePath(), {} as any,401); + const scope0 = nocks.check(baseConfig.storeId!, tuples[0], defaultConfiguration.getBasePath(), {} as any, 401); const scope1 = nock(defaultConfiguration.getBasePath()) .get(`/stores/${defaultConfiguration.storeId!}/authorization-models`) .query({ page_size: 1 }) diff --git a/tests/helpers/nocks.ts b/tests/helpers/nocks.ts index f865c90..641cc64 100644 --- a/tests/helpers/nocks.ts +++ b/tests/helpers/nocks.ts @@ -12,6 +12,7 @@ import type * as Nock from "nock"; +import { Readable } from "node:stream"; import { AuthorizationModel, @@ -48,7 +49,7 @@ export const getNocks = ((nock: typeof Nock) => ({ statusCode = 200, headers = {}, ) => { - return nock(`https://${apiTokenIssuer}`, { reqheaders: { "Content-Type": "application/x-www-form-urlencoded"} }) + return nock(`https://${apiTokenIssuer}`, { reqheaders: { "Content-Type": "application/x-www-form-urlencoded" } }) .post("/oauth/token") .reply(statusCode, { access_token: accessToken, @@ -155,8 +156,8 @@ export const getNocks = ((nock: typeof Nock) => ({ .query({ page_size: pageSize, continuation_token: contToken, - ...(type ? { type } : { }), - ...(startTime ? {start_time: startTime } :{}) + ...(type ? { type } : {}), + ...(startTime ? { start_time: startTime } : {}) }) .reply(200, { changes: [{ @@ -175,10 +176,10 @@ export const getNocks = ((nock: typeof Nock) => ({ storeId: string, tuple: TupleKey, basePath = defaultConfiguration.getBasePath(), - consistency: ConsistencyPreference|undefined = undefined, + consistency: ConsistencyPreference | undefined = undefined, ) => { return nock(basePath) - .post(`/stores/${storeId}/read`, (body: ReadRequest) => + .post(`/stores/${storeId}/read`, (body: ReadRequest) => body.consistency === consistency ) .reply(200, { tuples: [], continuation_token: "" } as ReadResponse); @@ -208,7 +209,7 @@ export const getNocks = ((nock: typeof Nock) => ({ basePath = defaultConfiguration.getBasePath(), response: { allowed: boolean } | { code: string, message: string } = { allowed: true }, statusCode = 200, - consistency: ConsistencyPreference|undefined = undefined, + consistency: ConsistencyPreference | undefined = undefined, ) => { return nock(basePath) .post(`/stores/${storeId}/check`, (body: CheckRequest) => @@ -223,7 +224,7 @@ export const getNocks = ((nock: typeof Nock) => ({ storeId: string, responseBody: BatchCheckResponse, basePath = defaultConfiguration.getBasePath(), - consistency: ConsistencyPreference|undefined | undefined, + consistency: ConsistencyPreference | undefined | undefined, authorizationModelId = "auth-model-id", ) => { return nock(basePath) @@ -237,10 +238,10 @@ export const getNocks = ((nock: typeof Nock) => ({ storeId: string, tuple: TupleKey, basePath = defaultConfiguration.getBasePath(), - consistency: ConsistencyPreference|undefined = undefined, + consistency: ConsistencyPreference | undefined = undefined, ) => { return nock(basePath) - .post(`/stores/${storeId}/expand`, (body: ExpandRequest) => + .post(`/stores/${storeId}/expand`, (body: ExpandRequest) => body.consistency === consistency ) .reply(200, { tree: {} } as ExpandResponse); @@ -249,7 +250,7 @@ export const getNocks = ((nock: typeof Nock) => ({ storeId: string, responseBody: ListObjectsResponse, basePath = defaultConfiguration.getBasePath(), - consistency: ConsistencyPreference|undefined = undefined, + consistency: ConsistencyPreference | undefined = undefined, ) => { return nock(basePath) .post(`/stores/${storeId}/list-objects`, (body: ListUsersRequest) => @@ -257,14 +258,30 @@ export const getNocks = ((nock: typeof Nock) => ({ ) .reply(200, responseBody); }, + streamedListObjects: ( + storeId: string, + objects: string[], + basePath = defaultConfiguration.getBasePath(), + ) => { + // Create NDJSON response (newline-delimited JSON) as a stream + const ndjsonResponse = objects + .map(obj => JSON.stringify({ result: { object: obj } })) + .join("\n") + "\n"; + + return nock(basePath) + .post(`/stores/${storeId}/streamed-list-objects`) + .reply(200, () => Readable.from([ndjsonResponse]), { + "Content-Type": "application/x-ndjson" + }); + }, listUsers: ( storeId: string, responseBody: ListUsersResponse, basePath = defaultConfiguration.getBasePath(), - consistency: ConsistencyPreference|undefined = undefined + consistency: ConsistencyPreference | undefined = undefined ) => { return nock(basePath) - .post(`/stores/${storeId}/list-users`, (body: ListUsersRequest) => + .post(`/stores/${storeId}/list-users`, (body: ListUsersRequest) => body.consistency === consistency ) .reply(200, responseBody); diff --git a/tests/streaming.test.ts b/tests/streaming.test.ts new file mode 100644 index 0000000..bb5264d --- /dev/null +++ b/tests/streaming.test.ts @@ -0,0 +1,360 @@ +/** + * JavaScript and Node.js SDK for OpenFGA + * + * API version: 1.x + * Website: https://openfga.dev + * Documentation: https://openfga.dev/docs + * Support: https://openfga.dev/community + * License: [Apache-2.0](https://github.com/openfga/js-sdk/blob/main/LICENSE) + * + * NOTE: This file was auto generated by OpenAPI Generator (https://openapi-generator.tech). DO NOT EDIT. + */ + + +import { Readable } from "node:stream"; +import { parseNDJSONStream, parseNDJSONReadableStream, isNodeEnvironment, createFetchError, attemptFetchRequest } from "../streaming"; + +// Helper to create a mock ReadableStream (used by multiple test suites) +function createMockReadableStream(data: string[]): ReadableStream { + const encoder = new TextEncoder(); + let index = 0; + + return new ReadableStream({ + async pull(controller) { + if (index < data.length) { + controller.enqueue(encoder.encode(data[index])); + index++; + } else { + controller.close(); + } + } + }); +} + +describe("Streaming Utilities", () => { + describe("isNodeEnvironment", () => { + it("should return true in Node.js environment", () => { + expect(isNodeEnvironment()).toBe(true); + }); + }); + + describe("parseNDJSONStream (Node.js)", () => { + it("should parse single line NDJSON", async () => { + const ndjson = '{"result":{"object":"document:1"}}\n'; + const stream = Readable.from([ndjson]); + + const results: any[] = []; + for await (const item of parseNDJSONStream(stream)) { + results.push(item); + } + + expect(results).toHaveLength(1); + expect(results[0]).toEqual({ result: { object: "document:1" } }); + }); + + it("should parse multiple line NDJSON", async () => { + const ndjson = '{"result":{"object":"document:1"}}\n{"result":{"object":"document:2"}}\n{"result":{"object":"document:3"}}\n'; + const stream = Readable.from([ndjson]); + + const results: any[] = []; + for await (const item of parseNDJSONStream(stream)) { + results.push(item); + } + + expect(results).toHaveLength(3); + expect(results[0]).toEqual({ result: { object: "document:1" } }); + expect(results[1]).toEqual({ result: { object: "document:2" } }); + expect(results[2]).toEqual({ result: { object: "document:3" } }); + }); + + it("should handle chunked data across multiple reads", async () => { + // Simulate data coming in chunks that split JSON objects mid-line + const chunks = [ + '{"result":{"object":"document:1"}}\n{"res', + 'ult":{"object":"document:2"}}\n' + ]; + + const stream = Readable.from(chunks); + + const results: any[] = []; + for await (const item of parseNDJSONStream(stream)) { + results.push(item); + } + + expect(results).toHaveLength(2); + expect(results[0]).toEqual({ result: { object: "document:1" } }); + expect(results[1]).toEqual({ result: { object: "document:2" } }); + }); + + it("should handle empty lines", async () => { + const ndjson = '{"result":{"object":"document:1"}}\n\n{"result":{"object":"document:2"}}\n'; + const stream = Readable.from([ndjson]); + + const results: any[] = []; + for await (const item of parseNDJSONStream(stream)) { + results.push(item); + } + + expect(results).toHaveLength(2); + }); + + it("should skip invalid JSON lines", async () => { + const consoleWarnSpy = jest.spyOn(console, "warn").mockImplementation(); + + const ndjson = '{"result":{"object":"document:1"}}\ninvalid json\n{"result":{"object":"document:2"}}\n'; + const stream = Readable.from([ndjson]); + + const results: any[] = []; + for await (const item of parseNDJSONStream(stream)) { + results.push(item); + } + + expect(results).toHaveLength(2); + expect(results[0]).toEqual({ result: { object: "document:1" } }); + expect(results[1]).toEqual({ result: { object: "document:2" } }); + expect(consoleWarnSpy).toHaveBeenCalled(); + + consoleWarnSpy.mockRestore(); + }); + }); + + describe("parseNDJSONReadableStream (Browser)", () => { + it("should parse single line NDJSON", async () => { + const ndjson = '{"result":{"object":"document:1"}}\n'; + const stream = createMockReadableStream([ndjson]); + + const results: any[] = []; + for await (const item of parseNDJSONReadableStream(stream)) { + results.push(item); + } + + expect(results).toHaveLength(1); + expect(results[0]).toEqual({ result: { object: "document:1" } }); + }); + + it("should parse multiple line NDJSON", async () => { + const stream = createMockReadableStream([ + '{"result":{"object":"document:1"}}\n', + '{"result":{"object":"document:2"}}\n', + '{"result":{"object":"document:3"}}\n' + ]); + + const results: any[] = []; + for await (const item of parseNDJSONReadableStream(stream)) { + results.push(item); + } + + expect(results).toHaveLength(3); + expect(results[0]).toEqual({ result: { object: "document:1" } }); + expect(results[1]).toEqual({ result: { object: "document:2" } }); + expect(results[2]).toEqual({ result: { object: "document:3" } }); + }); + + it("should handle chunked data across multiple reads", async () => { + // Simulate data coming in chunks that split JSON objects mid-line + const stream = createMockReadableStream([ + '{"result":{"object":"document:1"}}\n{"res', + 'ult":{"object":"document:2"}}\n' + ]); + + const results: any[] = []; + for await (const item of parseNDJSONReadableStream(stream)) { + results.push(item); + } + + expect(results).toHaveLength(2); + expect(results[0]).toEqual({ result: { object: "document:1" } }); + expect(results[1]).toEqual({ result: { object: "document:2" } }); + }); + + it("should handle empty lines", async () => { + const stream = createMockReadableStream([ + '{"result":{"object":"document:1"}}\n\n{"result":{"object":"document:2"}}\n' + ]); + + const results: any[] = []; + for await (const item of parseNDJSONReadableStream(stream)) { + results.push(item); + } + + expect(results).toHaveLength(2); + }); + + it("should skip invalid JSON lines", async () => { + const consoleWarnSpy = jest.spyOn(console, "warn").mockImplementation(); + + const stream = createMockReadableStream([ + '{"result":{"object":"document:1"}}\ninvalid json\n{"result":{"object":"document:2"}}\n' + ]); + + const results: any[] = []; + for await (const item of parseNDJSONReadableStream(stream)) { + results.push(item); + } + + expect(results).toHaveLength(2); + expect(results[0]).toEqual({ result: { object: "document:1" } }); + expect(results[1]).toEqual({ result: { object: "document:2" } }); + expect(consoleWarnSpy).toHaveBeenCalled(); + + consoleWarnSpy.mockRestore(); + }); + + it("should release the reader lock on completion", async () => { + const stream = createMockReadableStream(['{"result":{"object":"document:1"}}\n']); + + const results: any[] = []; + for await (const item of parseNDJSONReadableStream(stream)) { + results.push(item); + } + + expect(results).toHaveLength(1); + // If reader lock wasn't released, this would throw + expect(() => stream.getReader()).not.toThrow(); + }); + + it("should handle final buffer with incomplete data gracefully", async () => { + const consoleWarnSpy = jest.spyOn(console, "warn").mockImplementation(); + + const stream = createMockReadableStream([ + '{"result":{"object":"document:1"}}\n{"incomplete' + ]); + + const results: any[] = []; + for await (const item of parseNDJSONReadableStream(stream)) { + results.push(item); + } + + expect(results).toHaveLength(1); + expect(results[0]).toEqual({ result: { object: "document:1" } }); + expect(consoleWarnSpy).toHaveBeenCalled(); + + consoleWarnSpy.mockRestore(); + }); + }); + + describe("createFetchError", () => { + it("should create FgaApiValidationError for 400 status", async () => { + const mockResponse = { + status: 400, + statusText: "Bad Request", + headers: new Map([["content-type", "application/json"]]), + json: async () => ({ code: "validation_error", message: "Invalid request" }) + } as unknown as Response; + + const error = await createFetchError(mockResponse, "http://localhost/test", {}); + + expect(error.name).toBe("FgaApiValidationError"); + }); + + it("should create FgaApiAuthenticationError for 401 status", async () => { + const mockResponse = { + status: 401, + statusText: "Unauthorized", + headers: new Map(), + json: async () => ({ message: "Unauthorized" }) + } as unknown as Response; + + const error = await createFetchError(mockResponse, "http://localhost/test", {}); + + expect(error.name).toBe("FgaApiAuthenticationError"); + }); + + it("should create FgaApiRateLimitExceededError for 429 status", async () => { + const mockResponse = { + status: 429, + statusText: "Too Many Requests", + headers: new Map(), + json: async () => ({ message: "Rate limited" }) + } as unknown as Response; + + const error = await createFetchError(mockResponse, "http://localhost/test", {}); + + expect(error.name).toBe("FgaApiRateLimitExceededError"); + }); + + it("should create FgaApiInternalError for 500 status", async () => { + const mockResponse = { + status: 500, + statusText: "Internal Server Error", + headers: new Map(), + json: async () => ({ message: "Server error" }) + } as unknown as Response; + + const error = await createFetchError(mockResponse, "http://localhost/test", {}); + + expect(error.name).toBe("FgaApiInternalError"); + }); + }); + + describe("attemptFetchRequest", () => { + it("should return response on success", async () => { + const mockFetch = jest.fn().mockResolvedValue({ + ok: true, + body: createMockReadableStream(['{"result":{"object":"document:1"}}\n']), + headers: new Map() + }); + global.fetch = mockFetch as any; + + const response = await attemptFetchRequest( + "http://localhost/test", + { method: "POST" }, + {}, + { maxRetry: 3, minWaitInMs: 10 } + ); + + expect(response.ok).toBe(true); + expect(mockFetch).toHaveBeenCalledTimes(1); + }); + + it("should retry on 429 and eventually succeed", async () => { + const mockFetch = jest.fn() + .mockResolvedValueOnce({ + ok: false, + status: 429, + statusText: "Too Many Requests", + headers: new Map(), + json: async () => ({ message: "Rate limited" }) + }) + .mockResolvedValueOnce({ + ok: true, + body: createMockReadableStream(['{"result":{"object":"document:1"}}\n']), + headers: new Map() + }); + global.fetch = mockFetch as any; + + const response = await attemptFetchRequest( + "http://localhost/test", + { method: "POST" }, + {}, + { maxRetry: 3, minWaitInMs: 10 } + ); + + expect(response.ok).toBe(true); + expect(mockFetch).toHaveBeenCalledTimes(2); + }); + + it("should throw FgaApiValidationError on 400 without retry", async () => { + const mockFetch = jest.fn().mockResolvedValue({ + ok: false, + status: 400, + statusText: "Bad Request", + headers: new Map(), + json: async () => ({ code: "validation_error", message: "Invalid" }) + }); + global.fetch = mockFetch as any; + + await expect( + attemptFetchRequest( + "http://localhost/test", + { method: "POST" }, + {}, + { maxRetry: 3, minWaitInMs: 10 } + ) + ).rejects.toThrow(); + + expect(mockFetch).toHaveBeenCalledTimes(1); // Should not retry on 400 + }); + }); +}); + From 5c3657d0e8b53861bdc07e68cb9f295cb9e60c5f Mon Sep 17 00:00:00 2001 From: Daniel Jonathan Date: Mon, 27 Oct 2025 12:44:07 -0400 Subject: [PATCH 2/2] feat(236): Added missing defaults in maxRetry & minWaitInMs --- client.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client.ts b/client.ts index a8ab34a..2e146a2 100644 --- a/client.ts +++ b/client.ts @@ -883,7 +883,7 @@ export class OpenFgaClient extends BaseAPI { const url = this.configuration.getBasePath() + `/stores/${storeId}/streamed-list-objects`; - const retryParams = options.retryParams || this.configuration.retryParams || { maxRetry: 3, minWaitInMs: 100 }; + const retryParams = options.retryParams || this.configuration.retryParams || { maxRetry: DEFAULT_MAX_STREAM_RETRY, minWaitInMs: DEFAULT_MIN_STREAM_WAIT_MS }; const response = await attemptFetchRequest( url,