diff --git a/CHANGELOG.md b/CHANGELOG.md index d1bd0b9..0845cb8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,8 @@ ## [Unreleased](https://github.com/openfga/js-sdk/compare/v0.9.0...HEAD) - feat: add support for handling Retry-After header (#267) +- feat: add support for [StreamedListObjects](https://openfga.dev/api/service#/Relationship%20Queries/StreamedListObjects) + with streaming semantics. See [documentation](https://github.com/openfga/js-sdk/blob/main/README.md#streamed-list-objects) for more. - feat: add support for conflict options for Write operations**: (#276) The client now supports setting `conflict` on `ClientWriteRequestOpts` to control behavior when writing duplicate tuples or deleting non-existent tuples. This feature requires OpenFGA server [v1.10.0](https://github.com/openfga/openfga/releases/tag/v1.10.0) or later. See [Conflict Options for Write Operations](./README.md#conflict-options-for-write-operations) for more. diff --git a/README.md b/README.md index 70c5b63..d46c340 100644 --- a/README.md +++ b/README.md @@ -739,6 +739,32 @@ const response = await fgaClient.listObjects({ // response.objects = ["document:0192ab2a-d83f-756d-9397-c5ed9f3cb69a"] ``` +##### Streamed List Objects + +The Streamed ListObjects API is very similar to the ListObjects API, with two differences: + +1. Instead of collecting all objects before returning a response, it streams them to the client as they are collected. +2. The number of results returned is only limited by the execution timeout specified in the flag `OPENFGA_LIST_OBJECTS_DEADLINE`. + +[API Documentation](https://openfga.dev/api/service#/Relationship%20Queries/StreamedListObjects) + +```javascript +const options = {}; + +// To override the authorization model id for this request +options.authorizationModelId = "01GXSA8YR785C4FYS3C0RTG7B1"; + +const objects = []; +for await (const response of fgaClient.streamedListObjects( + { user: "user:anne", relation: "can_read", type: "document" }, + { consistency: ConsistencyPreference.HigherConsistency } +)) { + objects.push(response.object); +} + +// objects = ["document:0192ab2a-d83f-756d-9397-c5ed9f3cb69a"] +``` + ##### List Relations List the relations a user has with an object. This wraps around [BatchCheck](#batchcheck) to allow checking multiple relationships at once. diff --git a/api.ts b/api.ts index bee86ba..d04d1a6 100644 --- a/api.ts +++ b/api.ts @@ -20,6 +20,7 @@ import { serializeDataIfNeeded, toPathString, createRequestFunction, + createStreamingRequestFunction, RequestArgs, CallResult, PromiseResult @@ -383,6 +384,45 @@ export const OpenFgaApiAxiosParamCreator = function (configuration: Configuratio options: localVarRequestOptions, }; }, + /** + * The Streamed ListObjects API is very similar to the ListObjects API, with two differences: + * 1. Instead of collecting all objects before returning a response, it streams them to the client as they are collected. + * 2. The number of results returned is only limited by the execution timeout specified in the flag OPENFGA_LIST_OBJECTS_DEADLINE. + * @summary Stream all objects of the given type that the user has a relation with + * @param {string} storeId + * @param {ListObjectsRequest} body + * @param {*} [options] Override http request option. 
+ * @throws { FgaError } + */ + streamedListObjects: (storeId: string, body: ListObjectsRequest, options: any = {}): RequestArgs => { + // verify required parameter 'storeId' is not null or undefined + assertParamExists("streamedListObjects", "storeId", storeId); + // verify required parameter 'body' is not null or undefined + assertParamExists("streamedListObjects", "body", body); + const localVarPath = "/stores/{store_id}/streamed-list-objects" + .replace(`{${"store_id"}}`, encodeURIComponent(String(storeId))); + // use dummy base URL string because the URL constructor only accepts absolute URLs. + const localVarUrlObj = new URL(localVarPath, DUMMY_BASE_URL); + let baseOptions; + if (configuration) { + baseOptions = configuration.baseOptions; + } + + const localVarRequestOptions = { method: "POST", ...baseOptions, ...options }; + const localVarHeaderParameter = {} as any; + const localVarQueryParameter = {} as any; + + localVarHeaderParameter["Content-Type"] = "application/json"; + + setSearchParams(localVarUrlObj, localVarQueryParameter, options.query); + localVarRequestOptions.headers = { ...localVarHeaderParameter, ...options.headers }; + localVarRequestOptions.data = serializeDataIfNeeded(body, localVarRequestOptions); + + return { + url: toPathString(localVarUrlObj), + options: localVarRequestOptions, + }; + }, /** * Returns a paginated list of OpenFGA stores and a continuation token to get additional stores. The continuation token will be empty if there are no more stores. * @summary List all stores @@ -912,6 +952,22 @@ export const OpenFgaApiFp = function(configuration: Configuration, credentials: ...TelemetryAttributes.fromRequestBody(body) }); }, + /** + * The Streamed ListObjects API is very similar to the ListObjects API, with two differences: + * 1. Instead of collecting all objects before returning a response, it streams them to the client as they are collected. + * 2. The number of results returned is only limited by the execution timeout specified in the flag OPENFGA_LIST_OBJECTS_DEADLINE. + * @summary Stream all objects of the given type that the user has a relation with + * @param {string} storeId + * @param {ListObjectsRequest} body + * @param {*} [options] Override http request option. + * @throws { FgaError } + */ + async streamedListObjects(storeId: string, body: ListObjectsRequest, options?: any): Promise<(axios?: AxiosInstance) => Promise> { + const localVarAxiosArgs = localVarAxiosParamCreator.streamedListObjects(storeId, body, options); + return createStreamingRequestFunction(localVarAxiosArgs, globalAxios, configuration, credentials, { + [TelemetryAttribute.FgaClientRequestMethod]: "StreamedListObjects" + }); + }, /** * Returns a paginated list of OpenFGA stores and a continuation token to get additional stores. The continuation token will be empty if there are no more stores. * @summary List all stores @@ -1156,6 +1212,19 @@ export const OpenFgaApiFactory = function (configuration: Configuration, credent listObjects(storeId: string, body: ListObjectsRequest, options?: any): PromiseResult { return localVarFp.listObjects(storeId, body, options).then((request) => request(axios)); }, + /** + * The Streamed ListObjects API is very similar to the ListObjects API, with two differences: + * 1. Instead of collecting all objects before returning a response, it streams them to the client as they are collected. + * 2. The number of results returned is only limited by the execution timeout specified in the flag OPENFGA_LIST_OBJECTS_DEADLINE. 
+ * @summary Stream all objects of the given type that the user has a relation with + * @param {string} storeId + * @param {ListObjectsRequest} body + * @param {*} [options] Override http request option. + * @throws { FgaError } + */ + streamedListObjects(storeId: string, body: ListObjectsRequest, options?: any): Promise { + return localVarFp.streamedListObjects(storeId, body, options).then((request) => request(axios)); + }, /** * Returns a paginated list of OpenFGA stores and a continuation token to get additional stores. The continuation token will be empty if there are no more stores. * @summary List all stores @@ -1370,6 +1439,20 @@ export class OpenFgaApi extends BaseAPI { return OpenFgaApiFp(this.configuration, this.credentials).listObjects(storeId, body, options).then((request) => request(this.axios)); } + /** + * The Streamed ListObjects API is very similar to the ListObjects API, with two differences: + * 1. Instead of collecting all objects before returning a response, it streams them to the client as they are collected. + * 2. The number of results returned is only limited by the execution timeout specified in the flag OPENFGA_LIST_OBJECTS_DEADLINE. + * @summary Stream all objects of the given type that the user has a relation with + * @param {string} storeId + * @param {ListObjectsRequest} body + * @param {*} [options] Override http request option. + * @throws { FgaError } + */ + public streamedListObjects(storeId: string, body: ListObjectsRequest, options?: any): Promise { + return OpenFgaApiFp(this.configuration, this.credentials).streamedListObjects(storeId, body, options).then((request) => request(this.axios)); + } + /** * Returns a paginated list of OpenFGA stores and a continuation token to get additional stores. The continuation token will be empty if there are no more stores. * @summary List all stores diff --git a/apiModel.ts b/apiModel.ts index b5a8090..2ed9fd4 100644 --- a/apiModel.ts +++ b/apiModel.ts @@ -860,6 +860,21 @@ export interface ListObjectsResponse { */ objects: Array; } + +/** + * The response for a StreamedListObjects RPC. + * @export + * @interface StreamedListObjectsResponse + */ +export interface StreamedListObjectsResponse { + /** + * + * @type {string} + * @memberof StreamedListObjectsResponse + */ + object: string; +} + /** * * @export diff --git a/client.ts b/client.ts index f55b301..00805c3 100644 --- a/client.ts +++ b/client.ts @@ -21,6 +21,7 @@ import { GetStoreResponse, ListObjectsRequest, ListObjectsResponse, + StreamedListObjectsResponse, ListStoresResponse, ListUsersRequest, ListUsersResponse, @@ -50,6 +51,7 @@ import { } from "./utils"; import { isWellFormedUlidString } from "./validation"; import SdkConstants from "./constants"; +import { parseNDJSONStream } from "./streaming"; export type UserClientConfigurationParams = UserConfigurationParams & { storeId?: string; @@ -847,6 +849,51 @@ export class OpenFgaClient extends BaseAPI { }, options); } + /** + * StreamedListObjects - Stream all objects of a particular type that the user has a certain relation to (evaluates) + * + * Note: This method is Node.js only. Streams are supported via the axios API layer. + * The response will be streamed as newline-delimited JSON objects. 
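+   *
+   * A minimal usage sketch (values are illustrative; see the README's "Streamed List Objects" section for full usage):
+   * @example
+   * for await (const response of fgaClient.streamedListObjects({ user: "user:anne", relation: "can_read", type: "document" })) {
+   *   console.log(response.object);
+   * }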
+ * + * @param {ClientListObjectsRequest} body + * @param {ClientRequestOptsWithConsistency} [options] + * @param {string} [options.authorizationModelId] - Overrides the authorization model id in the configuration + * @param {object} [options.headers] - Custom headers to send alongside the request + * @param {ConsistencyPreference} [options.consistency] - The consistency preference to use + * @param {object} [options.retryParams] - Override the retry parameters for this request + * @param {number} [options.retryParams.maxRetry] - Override the max number of retries on each API request + * @param {number} [options.retryParams.minWaitInMs] - Override the minimum wait before a retry is initiated + * @returns {AsyncGenerator} An async generator that yields objects as they are received + */ + async *streamedListObjects(body: ClientListObjectsRequest, options: ClientRequestOptsWithConsistency = {}): AsyncGenerator { + const stream = await this.api.streamedListObjects(this.getStoreId(options)!, { + authorization_model_id: this.getAuthorizationModelId(options), + user: body.user, + relation: body.relation, + type: body.type, + context: body.context, + contextual_tuples: { tuple_keys: body.contextualTuples || [] }, + consistency: options.consistency + }, options); + + // Unwrap axios CallResult to get the raw Node.js stream when needed + const source = stream?.$response?.data ?? stream; + + // Parse the Node.js stream + try { + for await (const item of parseNDJSONStream(source as any)) { + if (item && item.result && item.result.object) { + yield { object: item.result.object } as StreamedListObjectsResponse; + } + } + } finally { + // Ensure underlying HTTP connection closes if consumer stops early + if (source && typeof source.destroy === "function") { + try { source.destroy(); } catch { } + } + } + } + /** * ListRelations - List all the relations a user has with an object (evaluates) * @param {object} listRelationsRequest diff --git a/common.ts b/common.ts index f48c28e..04a8695 100644 --- a/common.ts +++ b/common.ts @@ -342,6 +342,77 @@ export const createRequestFunction = function (axiosArgs: RequestArgs, axiosInst ); } + return result; + }; +}; + +/** + * creates an axios streaming request function that returns the raw response stream + * for incremental parsing (used by streamedListObjects) + */ +export const createStreamingRequestFunction = function (axiosArgs: RequestArgs, axiosInstance: AxiosInstance, configuration: Configuration, credentials: Credentials, methodAttributes: Record = {}) { + configuration.isValid(); + + const retryParams = axiosArgs.options?.retryParams ? axiosArgs.options?.retryParams : configuration.retryParams; + const maxRetry: number = retryParams ? retryParams.maxRetry : 0; + const minWaitInMs: number = retryParams ? 
retryParams.minWaitInMs : 0; + + const start = performance.now(); + + return async (axios: AxiosInstance = axiosInstance): Promise => { + await setBearerAuthToObject(axiosArgs.options.headers, credentials!); + + const url = configuration.getBasePath() + axiosArgs.url; + + const axiosRequestArgs = { ...axiosArgs.options, responseType: "stream", url: url }; + const wrappedResponse = await attemptHttpRequest(axiosRequestArgs, { + maxRetry, + minWaitInMs, + }, axios); + const response = wrappedResponse?.response; + + const result: any = response?.data; // raw stream + + let attributes: StringIndexable = {}; + + attributes = TelemetryAttributes.fromRequest({ + userAgent: configuration.baseOptions?.headers["User-Agent"], + httpMethod: axiosArgs.options?.method, + url, + resendCount: wrappedResponse?.retries, + start: start, + credentials: credentials, + attributes: methodAttributes, + }); + + attributes = TelemetryAttributes.fromResponse({ + response, + attributes, + }); + + const serverRequestDuration = attributes[TelemetryAttribute.HttpServerRequestDuration]; + if (configuration.telemetry?.metrics?.histogramQueryDuration && typeof serverRequestDuration !== "undefined") { + configuration.telemetry.recorder.histogram( + TelemetryHistograms.queryDuration, + parseInt(attributes[TelemetryAttribute.HttpServerRequestDuration] as string, 10), + TelemetryAttributes.prepare( + attributes, + configuration.telemetry.metrics.histogramQueryDuration.attributes + ) + ); + } + + if (configuration.telemetry?.metrics?.histogramRequestDuration) { + configuration.telemetry.recorder.histogram( + TelemetryHistograms.requestDuration, + attributes[TelemetryAttribute.HttpClientRequestDuration], + TelemetryAttributes.prepare( + attributes, + configuration.telemetry.metrics.histogramRequestDuration.attributes + ) + ); + } + return result; }; }; \ No newline at end of file diff --git a/example/streamed-list-objects/README.md b/example/streamed-list-objects/README.md new file mode 100644 index 0000000..b23e088 --- /dev/null +++ b/example/streamed-list-objects/README.md @@ -0,0 +1,22 @@ +# Streamed List Objects Example + +Demonstrates using `streamedListObjects` to retrieve objects via the streaming API. 
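+
+The core call the script makes looks roughly like this sketch (the `user`, `relation`, and `type` values mirror the tuples the script writes; `fga` is an `OpenFgaClient` configured with the temporary store and model):
+
+```javascript
+// Illustrative: iterate objects as the server streams them back
+for await (const response of fga.streamedListObjects(
+  { user: "user:anne", relation: "can_read", type: "document" }
+)) {
+  console.log(response.object);
+}
+```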
+ +## Prerequisites +- OpenFGA server running on `http://localhost:8080` (or set `FGA_API_URL`) + +## Running +```bash +# From repo root +npm run build +cd example/streamed-list-objects +npm install +npm start +``` + +## What it does +- Creates a temporary store +- Writes a simple authorization model +- Adds 3 tuples +- Streams results via `streamedListObjects` +- Cleans up the store \ No newline at end of file diff --git a/example/streamed-list-objects/package.json b/example/streamed-list-objects/package.json new file mode 100644 index 0000000..2ef7198 --- /dev/null +++ b/example/streamed-list-objects/package.json @@ -0,0 +1,19 @@ +{ + "name": "streamed-list-objects", + "private": "true", + "version": "1.0.0", + "description": "Example demonstrating streamedListObjects", + "author": "OpenFGA", + "license": "Apache-2.0", + "scripts": { + "start": "node streamedListObjects.mjs" + }, + "dependencies": { + "@openfga/sdk": "^0.9.0", + "@openfga/syntax-transformer": "^0.2.0" + }, + "engines": { + "node": ">=16.15.0" + } +} + diff --git a/example/streamed-list-objects/streamedListObjects.mjs b/example/streamed-list-objects/streamedListObjects.mjs new file mode 100644 index 0000000..7517bfc --- /dev/null +++ b/example/streamed-list-objects/streamedListObjects.mjs @@ -0,0 +1,59 @@ +import { ClientConfiguration, OpenFgaClient, ConsistencyPreference } from "../../dist/index.js"; +import { transformer } from "@openfga/syntax-transformer"; + +const apiUrl = process.env.FGA_API_URL || "http://localhost:8080"; + +async function main() { + const client = new OpenFgaClient(new ClientConfiguration({ apiUrl })); + + console.log("Creating temporary store"); + const { id: storeId } = await client.createStore({ name: "streamed-list-objects" }); + + const clientWithStore = new OpenFgaClient(new ClientConfiguration({ apiUrl, storeId })); + + const dslString = ` + model + schema 1.1 + + type user + + type document + relations + define can_read: [user] + `; + + const model = transformer.transformDSLToJSONObject(dslString); + + console.log("Writing authorization model"); + const { authorization_model_id: authorizationModelId } = await clientWithStore.writeAuthorizationModel(model); + + const fga = new OpenFgaClient(new ClientConfiguration({ apiUrl, storeId, authorizationModelId })); + + console.log("Writing tuples"); + await fga.write({ + writes: [ + { user: "user:anne", relation: "can_read", object: "document:1" }, + { user: "user:anne", relation: "can_read", object: "document:2" }, + { user: "user:anne", relation: "can_read", object: "document:3" } + ] + }); + + console.log("Streaming objects..."); + let count = 0; + for await (const response of fga.streamedListObjects( + { user: "user:anne", relation: "can_read", type: "document" }, + { consistency: ConsistencyPreference.HigherConsistency } + )) { + console.log(`- ${response.object}`) + count++; + } + console.log(`\u2713 Streamed count: ${count}`); + + console.log("Cleaning up..."); + await fga.deleteStore(); + console.log("Done"); +} + +main().catch(_err => { + process.exit(1); +}); \ No newline at end of file diff --git a/index.ts b/index.ts index 291e294..2b94e82 100644 --- a/index.ts +++ b/index.ts @@ -24,5 +24,5 @@ export * from "./telemetry/counters"; export * from "./telemetry/histograms"; export * from "./telemetry/metrics"; export * from "./errors"; - +export { parseNDJSONStream } from "./streaming"; diff --git a/streaming.ts b/streaming.ts new file mode 100644 index 0000000..e5e71fa --- /dev/null +++ b/streaming.ts @@ -0,0 +1,185 @@ +/** + * JavaScript 
and Node.js SDK for OpenFGA + * + * API version: 1.x + * Website: https://openfga.dev + * Documentation: https://openfga.dev/docs + * Support: https://openfga.dev/community + * License: [Apache-2.0](https://github.com/openfga/js-sdk/blob/main/LICENSE) + * + * NOTE: This file was auto generated by OpenAPI Generator (https://openapi-generator.tech). DO NOT EDIT. + */ + +import type { Readable } from "node:stream"; + +// Helper: create async iterable from classic EventEmitter-style Readable streams +const createAsyncIterableFromReadable = (readable: any): AsyncIterable => { + return { + [Symbol.asyncIterator](): AsyncIterator { + const chunkQueue: any[] = []; + const pendings: Array<{ resolve: (v: IteratorResult) => void; reject: (e?: any) => void }> = []; + let ended = false; + let error: any = null; + + const onData = (chunk: any) => { + if (pendings.length > 0) { + const { resolve } = pendings.shift()!; + resolve({ value: chunk, done: false }); + } else { + chunkQueue.push(chunk); + } + }; + + const onEnd = () => { + if (error) return; // Don't process end if error already occurred + ended = true; + while (pendings.length > 0) { + const { resolve } = pendings.shift()!; + resolve({ value: undefined, done: true }); + } + }; + + const onError = (err: any) => { + error = err; + while (pendings.length > 0) { + const { reject } = pendings.shift()!; + reject(err); + } + cleanup(); + }; + + readable.on("data", onData); + readable.once("end", onEnd); + readable.once("error", onError); + + const cleanup = () => { + readable.off("data", onData); + readable.off("end", onEnd); + readable.off("error", onError); + }; + + return { + next(): Promise> { + if (error) { + return Promise.reject(error); + } + if (chunkQueue.length > 0) { + const value = chunkQueue.shift(); + return Promise.resolve({ value, done: false }); + } + if (ended) { + cleanup(); + return Promise.resolve({ value: undefined, done: true }); + } + return new Promise>((resolve, reject) => { + pendings.push({ resolve, reject }); + }); + }, + return(): Promise> { + try { + cleanup(); + } finally { + if (readable && typeof readable.destroy === "function") { + readable.destroy(); + } + } + return Promise.resolve({ value: undefined, done: true }); + }, + throw(e?: any): Promise> { + try { + cleanup(); + } finally { + if (readable && typeof readable.destroy === "function") { + readable.destroy(e); + } + } + return Promise.reject(e); + } + }; + } + }; +}; + +/** + * Parse newline-delimited JSON (NDJSON) from a Node.js readable stream + * @param stream - Node.js readable stream, AsyncIterable, string, or Buffer + * @returns AsyncGenerator that yields parsed JSON objects + */ +export async function* parseNDJSONStream( + stream: Readable | AsyncIterable | string | Uint8Array | Buffer +): AsyncGenerator { + const decoder = new TextDecoder("utf-8"); + let buffer = ""; + + // If stream is actually a string or Buffer-like, handle as whole payload + const isString = typeof stream === "string"; + const isBuffer = typeof Buffer !== "undefined" && Buffer.isBuffer && Buffer.isBuffer(stream); + const isUint8Array = typeof Uint8Array !== "undefined" && stream instanceof Uint8Array; + + if (isString || isBuffer || isUint8Array) { + const text = isString + ? (stream as string) + : decoder.decode(isBuffer ? 
new Uint8Array(stream as Buffer) : (stream as Uint8Array)); + const lines = text.split("\n"); + + for (const line of lines) { + const trimmed = line.trim(); + if (!trimmed) { + continue; + } + + try { + yield JSON.parse(trimmed); + } catch (err) { + console.warn("Failed to parse JSON line:", err); + } + } + return; + } + + const isAsyncIterable = stream && typeof (stream as any)[Symbol.asyncIterator] === "function"; + const source: AsyncIterable = isAsyncIterable ? (stream as any) : createAsyncIterableFromReadable(stream as any); + + for await (const chunk of source) { + // Node.js streams can return Buffer or string chunks + // Convert to Uint8Array if needed for TextDecoder + const uint8Chunk = typeof chunk === "string" + ? new TextEncoder().encode(chunk) + : chunk instanceof Buffer + ? new Uint8Array(chunk) + : chunk; + + // Append decoded chunk to buffer + buffer += decoder.decode(uint8Chunk, { stream: true }); + + // Split on newlines + const lines = buffer.split("\n"); + + // Keep the last (potentially incomplete) line in the buffer + buffer = lines.pop() || ""; + + // Parse and yield complete lines + for (const line of lines) { + const trimmed = line.trim(); + if (trimmed) { + try { + yield JSON.parse(trimmed); + } catch (err) { + console.warn("Failed to parse JSON line:", err); + } + } + } + } + + // Flush any remaining decoder state + buffer += decoder.decode(); + + // Handle any remaining data in buffer + if (buffer.trim()) { + try { + yield JSON.parse(buffer); + } catch (err) { + console.warn("Failed to parse final JSON buffer:", err); + } + } +} \ No newline at end of file diff --git a/tests/client.test.ts b/tests/client.test.ts index 77119cf..82f270a 100644 --- a/tests/client.test.ts +++ b/tests/client.test.ts @@ -1,4 +1,5 @@ import * as nock from "nock"; +import { Readable } from "node:stream"; import { ClientWriteStatus, @@ -1597,6 +1598,139 @@ describe("OpenFGA Client", () => { }); }); + describe("StreamedListObjects", () => { + it("should stream objects and yield them incrementally", async () => { + const objects = ["document:1", "document:2", "document:3"]; + const scope = nocks.streamedListObjects(baseConfig.storeId!, objects); + + expect(scope.isDone()).toBe(false); + + const results: string[] = []; + for await (const response of fgaClient.streamedListObjects({ + user: "user:81684243-9356-4421-8fbf-a4f8d36aa31b", + relation: "can_read", + type: "document", + })) { + results.push(response.object); + } + + expect(scope.isDone()).toBe(true); + expect(results).toHaveLength(3); + expect(results).toEqual(expect.arrayContaining(objects)); + }); + + it("should handle custom headers", async () => { + const objects = ["document:1"]; + + const scope = nock(defaultConfiguration.getBasePath()) + .post(`/stores/${baseConfig.storeId}/streamed-list-objects`) + .reply(function () { + // Verify custom headers were sent + expect(this.req.headers["x-custom-header"]).toBe("custom-value"); + expect(this.req.headers["x-request-id"]).toBe("test-123"); + + // Return NDJSON stream + const ndjsonResponse = objects + .map(obj => JSON.stringify({ result: { object: obj } })) + .join("\n") + "\n"; + + return [200, Readable.from([ndjsonResponse]), { + "Content-Type": "application/x-ndjson" + }]; + }); + + const results: string[] = []; + for await (const response of fgaClient.streamedListObjects({ + user: "user:anne", + relation: "owner", + type: "document", + }, { + headers: { + "X-Custom-Header": "custom-value", + "X-Request-ID": "test-123" + } + })) { + results.push(response.object); + } + + 
expect(scope.isDone()).toBe(true); + expect(results).toEqual(objects); + }); + + it("should handle errors from the stream", async () => { + const scope = nock(defaultConfiguration.getBasePath()) + .post(`/stores/${baseConfig.storeId}/streamed-list-objects`) + .reply(500, { code: "internal_error", message: "Server error" }); + + await expect(async () => { + for await (const response of fgaClient.streamedListObjects({ + user: "user:anne", + relation: "owner", + type: "document", + })) { + // Should not get here + } + }).rejects.toThrow(); + + expect(scope.isDone()).toBe(true); + }); + + it("should handle retry on 429 error", async () => { + const objects = ["document:1"]; + + // Create client with retry enabled + const fgaClientWithRetry = new OpenFgaClient({ + ...baseConfig, + credentials: { method: CredentialsMethod.None }, + retryParams: { maxRetry: 2, minWaitInMs: 10 } + }); + + // First attempt fails with 429 (called exactly once) + const scope1 = nock(defaultConfiguration.getBasePath()) + .post(`/stores/${baseConfig.storeId}/streamed-list-objects`) + .times(1) + .reply(429, { code: "rate_limit_exceeded", message: "Rate limited" }, { + "Retry-After": "1" + }); + + // Second attempt succeeds (retry - called exactly once) + const scope2 = nocks.streamedListObjects(baseConfig.storeId!, objects); + + const results: string[] = []; + for await (const response of fgaClientWithRetry.streamedListObjects({ + user: "user:anne", + relation: "owner", + type: "document", + })) { + results.push(response.object); + } + + // Verify both scopes were called (proves retry happened) + expect(scope1.isDone()).toBe(true); + expect(scope2.isDone()).toBe(true); + expect(results).toEqual(objects); + }); + + it("should support consistency preference", async () => { + const objects = ["document:1"]; + const scope = nocks.streamedListObjects(baseConfig.storeId!, objects); + + const results: string[] = []; + for await (const response of fgaClient.streamedListObjects({ + user: "user:anne", + relation: "owner", + type: "document", + }, { + consistency: ConsistencyPreference.HigherConsistency + })) { + results.push(response.object); + } + + expect(scope.isDone()).toBe(true); + expect(results).toEqual(objects); + }); + }); + describe("ListRelations", () => { it("should properly pass the request and return an allowed API response", async () => { const tuples = [{ diff --git a/tests/helpers/nocks.ts b/tests/helpers/nocks.ts index bc46123..11e73a1 100644 --- a/tests/helpers/nocks.ts +++ b/tests/helpers/nocks.ts @@ -1,5 +1,7 @@ import type * as Nock from "nock"; +import { Readable } from "node:stream"; + import { AuthorizationModel, BatchCheckRequest, @@ -244,6 +246,22 @@ export const getNocks = ((nock: typeof Nock) => ({ ) .reply(200, responseBody); }, + streamedListObjects: ( + storeId: string, + objects: string[], + basePath = defaultConfiguration.getBasePath(), + ) => { + // Create NDJSON response (newline-delimited JSON) as a stream + const ndjsonResponse = objects + .map(obj => JSON.stringify({ result: { object: obj } })) + .join("\n") + "\n"; + + return nock(basePath) + .post(`/stores/${storeId}/streamed-list-objects`) + .reply(200, () => Readable.from([ndjsonResponse]), { + "Content-Type": "application/x-ndjson" + }); + }, listUsers: ( storeId: string, responseBody: ListUsersResponse, diff --git a/tests/streaming.test.ts b/tests/streaming.test.ts new file mode 100644 index 0000000..19882e7 --- /dev/null +++ b/tests/streaming.test.ts @@ -0,0 +1,259 @@ +/** + * JavaScript and Node.js SDK for OpenFGA + * + * API 
version: 1.x + * Website: https://openfga.dev + * Documentation: https://openfga.dev/docs + * Support: https://openfga.dev/community + * License: [Apache-2.0](https://github.com/openfga/js-sdk/blob/main/LICENSE) + * + * NOTE: This file was auto generated by OpenAPI Generator (https://openapi-generator.tech). DO NOT EDIT. + */ + + +import { Readable } from "node:stream"; +import { EventEmitter } from "node:events"; +import { parseNDJSONStream } from "../streaming"; + +describe("Streaming Utilities", () => { + describe("parseNDJSONStream (Node.js)", () => { + it("should parse single line NDJSON", async () => { + const ndjson = '{"result":{"object":"document:1"}}\n'; + const stream = Readable.from([ndjson]); + + const results: any[] = []; + for await (const item of parseNDJSONStream(stream)) { + results.push(item); + } + + expect(results).toHaveLength(1); + expect(results[0]).toEqual({ result: { object: "document:1" } }); + }); + + it("should parse multiple line NDJSON", async () => { + const ndjson = '{"result":{"object":"document:1"}}\n{"result":{"object":"document:2"}}\n{"result":{"object":"document:3"}}\n'; + const stream = Readable.from([ndjson]); + + const results: any[] = []; + for await (const item of parseNDJSONStream(stream)) { + results.push(item); + } + + expect(results).toHaveLength(3); + expect(results[0]).toEqual({ result: { object: "document:1" } }); + expect(results[1]).toEqual({ result: { object: "document:2" } }); + expect(results[2]).toEqual({ result: { object: "document:3" } }); + }); + + it("should handle chunked data across multiple reads", async () => { + // Simulate data coming in chunks that split JSON objects mid-line + const chunks = [ + '{"result":{"object":"document:1"}}\n{"res', + 'ult":{"object":"document:2"}}\n' + ]; + + const stream = Readable.from(chunks); + + const results: any[] = []; + for await (const item of parseNDJSONStream(stream)) { + results.push(item); + } + + expect(results).toHaveLength(2); + expect(results[0]).toEqual({ result: { object: "document:1" } }); + expect(results[1]).toEqual({ result: { object: "document:2" } }); + }); + + it("should handle empty lines", async () => { + const ndjson = '{"result":{"object":"document:1"}}\n\n{"result":{"object":"document:2"}}\n'; + const stream = Readable.from([ndjson]); + + const results: any[] = []; + for await (const item of parseNDJSONStream(stream)) { + results.push(item); + } + + expect(results).toHaveLength(2); + }); + + it("should skip invalid JSON lines", async () => { + const consoleWarnSpy = jest.spyOn(console, "warn").mockImplementation(); + + const ndjson = '{"result":{"object":"document:1"}}\ninvalid json\n{"result":{"object":"document:2"}}\n'; + const stream = Readable.from([ndjson]); + + const results: any[] = []; + for await (const item of parseNDJSONStream(stream)) { + results.push(item); + } + + expect(results).toHaveLength(2); + expect(results[0]).toEqual({ result: { object: "document:1" } }); + expect(results[1]).toEqual({ result: { object: "document:2" } }); + expect(consoleWarnSpy).toHaveBeenCalled(); + + consoleWarnSpy.mockRestore(); + }); + + it("should parse when Readable emits Buffer chunks", async () => { + const ndjson = Buffer.from('{"a":1}\n{"b":2}\n'); + const stream = Readable.from([ndjson]); + + const out: any[] = []; + for await (const item of parseNDJSONStream(stream)) { + out.push(item); + } + expect(out).toEqual([{ a: 1 }, { b: 2 }]); + }); + + it("should parse last JSON without trailing newline", async () => { + const stream = Readable.from(['{"a":1}\n{"b":2}']); + 
+ const out: any[] = []; + for await (const item of parseNDJSONStream(stream)) { + out.push(item); + } + expect(out).toEqual([{ a: 1 }, { b: 2 }]); + }); + + it("should skip invalid final JSON buffer and warn", async () => { + const warn = jest.spyOn(console, "warn").mockImplementation(); + const stream = Readable.from(['{"a":1}\n{"b":']); + + const out: any[] = []; + for await (const item of parseNDJSONStream(stream)) { + out.push(item); + } + expect(out).toEqual([{ a: 1 }]); + expect(warn).toHaveBeenCalled(); + warn.mockRestore(); + }); + + it("should parse when given a string input", async () => { + const input = '{"a":1}\n{"b":2}\n'; + const out: any[] = []; + for await (const item of parseNDJSONStream(input as any)) { + out.push(item); + } + expect(out).toEqual([{ a: 1 }, { b: 2 }]); + }); + + it("should parse when given a Buffer input", async () => { + const input = Buffer.from('{"a":1}\n{"b":2}\n'); + const out: any[] = []; + for await (const item of parseNDJSONStream(input as any)) { + out.push(item); + } + expect(out).toEqual([{ a: 1 }, { b: 2 }]); + }); + + it("should accept async iterable that yields Uint8Array", async () => { + const src = { + [Symbol.asyncIterator]: async function* () { + yield new TextEncoder().encode('{"a":1}\n{"b":2}\n'); + } + } as any; + + const out: any[] = []; + for await (const item of parseNDJSONStream(src)) { + out.push(item); + } + expect(out).toEqual([{ a: 1 }, { b: 2 }]); + }); + + it("should reject pending iteration when classic emitter errors", async () => { + const emitter = new EventEmitter() as any; + const gen = parseNDJSONStream(emitter); + + const firstPromise = gen.next(); + emitter.emit("data", '{"a":1}\n'); + const first = await firstPromise; + expect(first.value).toEqual({ a: 1 }); + + const pendingNext = gen.next(); + emitter.emit("error", new Error("boom")); + + // Pending next should now reject with the error + await expect(pendingNext).rejects.toThrow("boom"); + + // After error, iterator is exhausted (standard async iterator behavior) + await expect(gen.next()).resolves.toEqual({ value: undefined, done: true }); + }); + + it("should clean up listeners on early cancellation", async () => { + const emitter = new EventEmitter() as any; + const gen = parseNDJSONStream(emitter); + + const p = gen.next(); + emitter.emit("data", '{"a":1}\n'); + const first = await p; + expect(first.value).toEqual({ a: 1 }); + + await gen.return(undefined as any); + expect(emitter.listenerCount("data")).toBe(0); + expect(emitter.listenerCount("end")).toBe(0); + expect(emitter.listenerCount("error")).toBe(0); + }); + + it("should read from buffered queue when data exceeds pending resolvers", async () => { + const emitter = new EventEmitter() as any; + const gen = parseNDJSONStream(emitter); + + // Start consumption so listeners are attached and a pending resolver exists + const firstPromise = gen.next(); + + // First chunk fulfills the pending resolver + emitter.emit("data", '{"x":1}\n'); + const first = await firstPromise; + expect(first).toEqual({ value: { x: 1 }, done: false }); + + // Emit another chunk before requesting next; it should be queued + emitter.emit("data", '{"y":2}\n'); + + // Next pull should be served from the buffered queue path + const second = await gen.next(); + expect(second).toEqual({ value: { y: 2 }, done: false }); + + // end stream to complete + emitter.emit("end"); + const done = await gen.next(); + expect(done).toEqual({ value: undefined, done: true }); + }); + + it("should resolve pending next to done when end occurs", async 
() => { + const emitter = new EventEmitter() as any; + const gen = parseNDJSONStream(emitter); + + const pending = gen.next(); + emitter.emit("end"); + + await expect(pending).resolves.toEqual({ value: undefined, done: true }); + await expect(gen.next()).resolves.toEqual({ value: undefined, done: true }); + }); + + it("should cleanup and reject on iterator throw", async () => { + const emitter = new EventEmitter() as any; + const gen = parseNDJSONStream(emitter); + + const thrown = gen.throw(new Error("stop")); + await expect(thrown).rejects.toThrow("stop"); + + expect(emitter.listenerCount("data")).toBe(0); + expect(emitter.listenerCount("end")).toBe(0); + expect(emitter.listenerCount("error")).toBe(0); + }); + + it("should warn on invalid JSON line in string input path", async () => { + const warn = jest.spyOn(console, "warn").mockImplementation(); + const input = '{"a":1}\nnot json\n{"b":2}\n'; + + const out: any[] = []; + for await (const item of parseNDJSONStream(input as any)) { + out.push(item); + } + expect(out).toEqual([{ a: 1 }, { b: 2 }]); + expect(warn).toHaveBeenCalled(); + warn.mockRestore(); + }); + }); +}); \ No newline at end of file