From 0c2a0ab16995585d98582f32efec8b82090a1249 Mon Sep 17 00:00:00 2001 From: Daniel Jonathan Date: Tue, 28 Oct 2025 02:11:10 -0400 Subject: [PATCH 1/3] feat: add streamedListObjects for unlimited object retrieval Adds streamedListObjects method for retrieving unlimited objects via the streaming API endpoint. Node.js-only implementation with resilient NDJSON parsing, proper error handling, and automatic resource cleanup. Requires OpenFGA server v1.2.0+ Features: - Streams beyond 1000-object limit (tested with 2000 objects) - Memory-efficient incremental results via async generators - Automatic stream cleanup prevents connection leaks - Proper error propagation through async iterators - Flexible input types (Readable|AsyncIterable|string|Buffer|Uint8Array) - Telemetry maintained through streaming request helper Implementation: - streaming.ts: NDJSON parser with robust error handling - common.ts: createStreamingRequestFunction for axios streaming - client.ts: streamedListObjects() async generator wrapper - Error handling: Pending promises reject on error, onEnd guarded - Resource management: Stream destruction in return()/throw()/finally - Type safety: Wide signature eliminates unnecessary casts Testing (153/153 tests passing): - 17 streaming tests (parsing, errors, cleanup, edge cases) - 95% coverage on streaming.ts - Live tested: 3-object and 2000-object streaming verified Examples: - example/streamed-list-objects: Full model with 2000 tuples - example/streamed-list-objects-local: Minimal local setup Related: - Fixes #236 - Parent issue: openfga/sdk-generator#76 - Related PR: openfga/sdk-generator#654 (templates) --- CHANGELOG.md | 8 + api.ts | 83 ++++++ apiModel.ts | 15 + client.ts | 47 ++++ common.ts | 71 +++++ example/streamed-list-objects-local/README.md | 21 ++ .../streamedListObjectsLocal.mjs | 62 +++++ example/streamed-list-objects/README.md | 33 +++ example/streamed-list-objects/model.json | 251 +++++++++++++++++ .../streamedListObjects.mjs | 105 +++++++ index.ts | 2 +- streaming.ts | 185 +++++++++++++ tests/helpers/nocks.ts | 18 ++ tests/streaming.test.ts | 259 ++++++++++++++++++ 14 files changed, 1159 insertions(+), 1 deletion(-) create mode 100644 example/streamed-list-objects-local/README.md create mode 100644 example/streamed-list-objects-local/streamedListObjectsLocal.mjs create mode 100644 example/streamed-list-objects/README.md create mode 100644 example/streamed-list-objects/model.json create mode 100644 example/streamed-list-objects/streamedListObjects.mjs create mode 100644 streaming.ts create mode 100644 tests/streaming.test.ts diff --git a/CHANGELOG.md b/CHANGELOG.md index 9bfad73..6a10623 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,14 @@ ## [Unreleased](https://github.com/openfga/js-sdk/compare/v0.9.0...HEAD) - feat: add support for handling Retry-After header (#267) +- feat: streamedListObjects (streaming ListObjects) - Node.js only + - Enables retrieving >1000 objects beyond standard listObjects limit + - Requires OpenFGA server [v1.2.0+](https://github.com/openfga/openfga/releases/tag/v1.2.0) + - Uses axios streaming via API layer with preserved telemetry + - Resilient NDJSON parsing (supports async-iterable and event-based streams) + - Parses chunked data across multiple reads; handles Buffer/string inputs + - Adds example for usage: `example/streamed-list-objects` + - Adds example for local usage: `example/streamed-list-objects-local` ## v0.9.0 diff --git a/api.ts b/api.ts index 4a08e83..39036b6 100644 --- a/api.ts +++ b/api.ts @@ -20,6 +20,7 @@ import { serializeDataIfNeeded, toPathString, createRequestFunction, + createStreamingRequestFunction, RequestArgs, CallResult, PromiseResult @@ -383,6 +384,45 @@ export const OpenFgaApiAxiosParamCreator = function (configuration: Configuratio options: localVarRequestOptions, }; }, + /** + * The Streamed ListObjects API is very similar to the ListObjects API, with two differences: + * 1. Instead of collecting all objects before returning a response, it streams them to the client as they are collected. + * 2. The number of results returned is only limited by the execution timeout specified in the flag OPENFGA_LIST_OBJECTS_DEADLINE. + * @summary Stream all objects of the given type that the user has a relation with + * @param {string} storeId + * @param {ListObjectsRequest} body + * @param {*} [options] Override http request option. + * @throws { FgaError } + */ + streamedListObjects: (storeId: string, body: ListObjectsRequest, options: any = {}): RequestArgs => { + // verify required parameter 'storeId' is not null or undefined + assertParamExists("streamedListObjects", "storeId", storeId); + // verify required parameter 'body' is not null or undefined + assertParamExists("streamedListObjects", "body", body); + const localVarPath = "/stores/{store_id}/streamed-list-objects" + .replace(`{${"store_id"}}`, encodeURIComponent(String(storeId))); + // use dummy base URL string because the URL constructor only accepts absolute URLs. + const localVarUrlObj = new URL(localVarPath, DUMMY_BASE_URL); + let baseOptions; + if (configuration) { + baseOptions = configuration.baseOptions; + } + + const localVarRequestOptions = { method: "POST", ...baseOptions, ...options }; + const localVarHeaderParameter = {} as any; + const localVarQueryParameter = {} as any; + + localVarHeaderParameter["Content-Type"] = "application/json"; + + setSearchParams(localVarUrlObj, localVarQueryParameter, options.query); + localVarRequestOptions.headers = { ...localVarHeaderParameter, ...options.headers }; + localVarRequestOptions.data = serializeDataIfNeeded(body, localVarRequestOptions); + + return { + url: toPathString(localVarUrlObj), + options: localVarRequestOptions, + }; + }, /** * Returns a paginated list of OpenFGA stores and a continuation token to get additional stores. The continuation token will be empty if there are no more stores. * @summary List all stores @@ -912,6 +952,22 @@ export const OpenFgaApiFp = function(configuration: Configuration, credentials: ...TelemetryAttributes.fromRequestBody(body) }); }, + /** + * The Streamed ListObjects API is very similar to the ListObjects API, with two differences: + * 1. Instead of collecting all objects before returning a response, it streams them to the client as they are collected. + * 2. The number of results returned is only limited by the execution timeout specified in the flag OPENFGA_LIST_OBJECTS_DEADLINE. + * @summary Stream all objects of the given type that the user has a relation with + * @param {string} storeId + * @param {ListObjectsRequest} body + * @param {*} [options] Override http request option. + * @throws { FgaError } + */ + async streamedListObjects(storeId: string, body: ListObjectsRequest, options?: any): Promise<(axios?: AxiosInstance) => Promise> { + const localVarAxiosArgs = localVarAxiosParamCreator.streamedListObjects(storeId, body, options); + return createStreamingRequestFunction(localVarAxiosArgs, globalAxios, configuration, credentials, { + [TelemetryAttribute.FgaClientRequestMethod]: "StreamedListObjects" + }); + }, /** * Returns a paginated list of OpenFGA stores and a continuation token to get additional stores. The continuation token will be empty if there are no more stores. * @summary List all stores @@ -1156,6 +1212,19 @@ export const OpenFgaApiFactory = function (configuration: Configuration, credent listObjects(storeId: string, body: ListObjectsRequest, options?: any): PromiseResult { return localVarFp.listObjects(storeId, body, options).then((request) => request(axios)); }, + /** + * The Streamed ListObjects API is very similar to the ListObjects API, with two differences: + * 1. Instead of collecting all objects before returning a response, it streams them to the client as they are collected. + * 2. The number of results returned is only limited by the execution timeout specified in the flag OPENFGA_LIST_OBJECTS_DEADLINE. + * @summary Stream all objects of the given type that the user has a relation with + * @param {string} storeId + * @param {ListObjectsRequest} body + * @param {*} [options] Override http request option. + * @throws { FgaError } + */ + streamedListObjects(storeId: string, body: ListObjectsRequest, options?: any): Promise { + return localVarFp.streamedListObjects(storeId, body, options).then((request) => request(axios)); + }, /** * Returns a paginated list of OpenFGA stores and a continuation token to get additional stores. The continuation token will be empty if there are no more stores. * @summary List all stores @@ -1370,6 +1439,20 @@ export class OpenFgaApi extends BaseAPI { return OpenFgaApiFp(this.configuration, this.credentials).listObjects(storeId, body, options).then((request) => request(this.axios)); } + /** + * The Streamed ListObjects API is very similar to the ListObjects API, with two differences: + * 1. Instead of collecting all objects before returning a response, it streams them to the client as they are collected. + * 2. The number of results returned is only limited by the execution timeout specified in the flag OPENFGA_LIST_OBJECTS_DEADLINE. + * @summary Stream all objects of the given type that the user has a relation with + * @param {string} storeId + * @param {ListObjectsRequest} body + * @param {*} [options] Override http request option. + * @throws { FgaError } + */ + public streamedListObjects(storeId: string, body: ListObjectsRequest, options?: any): Promise { + return OpenFgaApiFp(this.configuration, this.credentials).streamedListObjects(storeId, body, options).then((request) => request(this.axios)); + } + /** * Returns a paginated list of OpenFGA stores and a continuation token to get additional stores. The continuation token will be empty if there are no more stores. * @summary List all stores diff --git a/apiModel.ts b/apiModel.ts index 1678c09..282a71f 100644 --- a/apiModel.ts +++ b/apiModel.ts @@ -860,6 +860,21 @@ export interface ListObjectsResponse { */ objects: Array; } + +/** + * The response for a StreamedListObjects RPC. + * @export + * @interface StreamedListObjectsResponse + */ +export interface StreamedListObjectsResponse { + /** + * + * @type {string} + * @memberof StreamedListObjectsResponse + */ + object: string; +} + /** * * @export diff --git a/client.ts b/client.ts index d6278fa..326a0eb 100644 --- a/client.ts +++ b/client.ts @@ -21,6 +21,7 @@ import { GetStoreResponse, ListObjectsRequest, ListObjectsResponse, + StreamedListObjectsResponse, ListStoresResponse, ListUsersRequest, ListUsersResponse, @@ -48,6 +49,7 @@ import { } from "./utils"; import { isWellFormedUlidString } from "./validation"; import SdkConstants from "./constants"; +import { parseNDJSONStream } from "./streaming"; export type UserClientConfigurationParams = UserConfigurationParams & { storeId?: string; @@ -804,6 +806,51 @@ export class OpenFgaClient extends BaseAPI { }, options); } + /** + * StreamedListObjects - Stream all objects of a particular type that the user has a certain relation to (evaluates) + * + * Note: This method is Node.js only. Streams are supported via the axios API layer. + * The response will be streamed as newline-delimited JSON objects. + * + * @param {ClientListObjectsRequest} body + * @param {ClientRequestOptsWithConsistency} [options] + * @param {string} [options.authorizationModelId] - Overrides the authorization model id in the configuration + * @param {object} [options.headers] - Custom headers to send alongside the request + * @param {ConsistencyPreference} [options.consistency] - The consistency preference to use + * @param {object} [options.retryParams] - Override the retry parameters for this request + * @param {number} [options.retryParams.maxRetry] - Override the max number of retries on each API request + * @param {number} [options.retryParams.minWaitInMs] - Override the minimum wait before a retry is initiated + * @returns {AsyncGenerator} An async generator that yields objects as they are received + */ + async *streamedListObjects(body: ClientListObjectsRequest, options: ClientRequestOptsWithConsistency = {}): AsyncGenerator { + const stream = await this.api.streamedListObjects(this.getStoreId(options)!, { + authorization_model_id: this.getAuthorizationModelId(options), + user: body.user, + relation: body.relation, + type: body.type, + context: body.context, + contextual_tuples: { tuple_keys: body.contextualTuples || [] }, + consistency: options.consistency + }, options); + + // Unwrap axios CallResult to get the raw Node.js stream when needed + const source = stream?.$response?.data ?? stream; + + // Parse the Node.js stream + try { + for await (const item of parseNDJSONStream(source as any)) { + if (item && item.result && item.result.object) { + yield { object: item.result.object } as StreamedListObjectsResponse; + } + } + } finally { + // Ensure underlying HTTP connection closes if consumer stops early + if (source && typeof source.destroy === "function") { + try { source.destroy(); } catch { } + } + } + } + /** * ListRelations - List all the relations a user has with an object (evaluates) * @param {object} listRelationsRequest diff --git a/common.ts b/common.ts index f48c28e..04a8695 100644 --- a/common.ts +++ b/common.ts @@ -342,6 +342,77 @@ export const createRequestFunction = function (axiosArgs: RequestArgs, axiosInst ); } + return result; + }; +}; + +/** + * creates an axios streaming request function that returns the raw response stream + * for incremental parsing (used by streamedListObjects) + */ +export const createStreamingRequestFunction = function (axiosArgs: RequestArgs, axiosInstance: AxiosInstance, configuration: Configuration, credentials: Credentials, methodAttributes: Record = {}) { + configuration.isValid(); + + const retryParams = axiosArgs.options?.retryParams ? axiosArgs.options?.retryParams : configuration.retryParams; + const maxRetry: number = retryParams ? retryParams.maxRetry : 0; + const minWaitInMs: number = retryParams ? retryParams.minWaitInMs : 0; + + const start = performance.now(); + + return async (axios: AxiosInstance = axiosInstance): Promise => { + await setBearerAuthToObject(axiosArgs.options.headers, credentials!); + + const url = configuration.getBasePath() + axiosArgs.url; + + const axiosRequestArgs = { ...axiosArgs.options, responseType: "stream", url: url }; + const wrappedResponse = await attemptHttpRequest(axiosRequestArgs, { + maxRetry, + minWaitInMs, + }, axios); + const response = wrappedResponse?.response; + + const result: any = response?.data; // raw stream + + let attributes: StringIndexable = {}; + + attributes = TelemetryAttributes.fromRequest({ + userAgent: configuration.baseOptions?.headers["User-Agent"], + httpMethod: axiosArgs.options?.method, + url, + resendCount: wrappedResponse?.retries, + start: start, + credentials: credentials, + attributes: methodAttributes, + }); + + attributes = TelemetryAttributes.fromResponse({ + response, + attributes, + }); + + const serverRequestDuration = attributes[TelemetryAttribute.HttpServerRequestDuration]; + if (configuration.telemetry?.metrics?.histogramQueryDuration && typeof serverRequestDuration !== "undefined") { + configuration.telemetry.recorder.histogram( + TelemetryHistograms.queryDuration, + parseInt(attributes[TelemetryAttribute.HttpServerRequestDuration] as string, 10), + TelemetryAttributes.prepare( + attributes, + configuration.telemetry.metrics.histogramQueryDuration.attributes + ) + ); + } + + if (configuration.telemetry?.metrics?.histogramRequestDuration) { + configuration.telemetry.recorder.histogram( + TelemetryHistograms.requestDuration, + attributes[TelemetryAttribute.HttpClientRequestDuration], + TelemetryAttributes.prepare( + attributes, + configuration.telemetry.metrics.histogramRequestDuration.attributes + ) + ); + } + return result; }; }; \ No newline at end of file diff --git a/example/streamed-list-objects-local/README.md b/example/streamed-list-objects-local/README.md new file mode 100644 index 0000000..e7f87fb --- /dev/null +++ b/example/streamed-list-objects-local/README.md @@ -0,0 +1,21 @@ +# Streamed List Objects (Local) + +This example demonstrates using the js-sdk `streamedListObjects` API against a locally running OpenFGA server that you manage yourself. + +Prerequisites: +- Node.js 18+ +- An OpenFGA server reachable at `FGA_API_URL` (defaults to `http://localhost:8080`) + +Run: +1. From repo root, build the SDK once: + - `npm run build` +2. Set the API URL (optional) and run the example: + - `cd example/streamed-list-objects-local` + - `FGA_API_URL=http://localhost:8080 node streamedListObjectsLocal.mjs` + +What it does: +- Creates a temporary store +- Writes a schema 1.1 model with an assignable relation +- Inserts 3 tuples +- Streams them via `streamedListObjects` +- Cleans up the store \ No newline at end of file diff --git a/example/streamed-list-objects-local/streamedListObjectsLocal.mjs b/example/streamed-list-objects-local/streamedListObjectsLocal.mjs new file mode 100644 index 0000000..48e4198 --- /dev/null +++ b/example/streamed-list-objects-local/streamedListObjectsLocal.mjs @@ -0,0 +1,62 @@ +import { ClientConfiguration, OpenFgaClient, ConsistencyPreference } from "../../dist/index.js"; + +const apiUrl = process.env.FGA_API_URL || "http://localhost:8080"; + +async function main() { + const client = new OpenFgaClient(new ClientConfiguration({ apiUrl })); + + console.log("Creating temporary store"); + const { id: storeId } = await client.createStore({ name: "streamed-list-objects-local" }); + + const clientWithStore = new OpenFgaClient(new ClientConfiguration({ apiUrl, storeId })); + + const model = { + schema_version: "1.1", + type_definitions: [ + { type: "user" }, + { + type: "document", + relations: { can_read: { this: {} } }, + metadata: { + relations: { + can_read: { + directly_related_user_types: [{ type: "user" }] + } + } + } + } + ] + }; + + console.log("Writing authorization model"); + const { authorization_model_id: authorizationModelId } = await clientWithStore.writeAuthorizationModel(model); + + const fga = new OpenFgaClient(new ClientConfiguration({ apiUrl, storeId, authorizationModelId })); + + console.log("Writing tuples"); + await fga.write({ + writes: [ + { user: "user:anne", relation: "can_read", object: "document:1" }, + { user: "user:anne", relation: "can_read", object: "document:2" }, + { user: "user:anne", relation: "can_read", object: "document:3" } + ] + }); + + console.log("Streaming objects..."); + let count = 0; + for await (const _ of fga.streamedListObjects( + { user: "user:anne", relation: "can_read", type: "document" }, + { consistency: ConsistencyPreference.HigherConsistency } + )) { + count++; + } + console.log(`\u2713 Streamed count: ${count}`); + + console.log("Cleaning up..."); + await fga.deleteStore(); + console.log("Done"); +} + +main().catch(_err => { + process.exit(1); +}); \ No newline at end of file diff --git a/example/streamed-list-objects/README.md b/example/streamed-list-objects/README.md new file mode 100644 index 0000000..1b247a5 --- /dev/null +++ b/example/streamed-list-objects/README.md @@ -0,0 +1,33 @@ +# Streamed List Objects Example + +This example demonstrates working with [OpenFGA's `/streamed-list-objects` endpoint](https://openfga.dev/api/service#/Relationship%20Queries/StreamedListObjects) using the JavaScript SDK's `streamedListObjects()` method. + +## Prerequisites + +- Node.js 16.15.0+ +- OpenFGA running on `localhost:8080` + +You can start OpenFGA with Docker by running the following command: + +```bash +docker pull openfga/openfga && docker run -it --rm -p 8080:8080 openfga/openfga run +``` + +## Running the example + +No additional setup is required to run this example. Simply run the following command: + +```bash +node streamedListObjects.mjs +``` + +## About this example + +This example: +1. Creates a temporary store +2. Writes an authorization model (defined in `model.json`) +3. Writes 2000 relationship tuples to the store +4. Calls both `/streamed-list-objects` (streaming) and `/list-objects` (standard) endpoints +5. Compares the results + +The streaming endpoint allows retrieving more than 1000 objects (the default limit for the standard endpoint), and streams results as they are found rather than waiting for all results to be collected. diff --git a/example/streamed-list-objects/model.json b/example/streamed-list-objects/model.json new file mode 100644 index 0000000..3037fc3 --- /dev/null +++ b/example/streamed-list-objects/model.json @@ -0,0 +1,251 @@ +{ + "schema_version": "1.1", + "type_definitions": [ + { + "type": "user", + "relations": {} + }, + { + "type": "group", + "relations": { + "member": { + "this": {} + } + }, + "metadata": { + "relations": { + "member": { + "directly_related_user_types": [ + { + "type": "user" + } + ] + } + } + } + }, + { + "type": "folder", + "relations": { + "can_create_file": { + "computedUserset": { + "object": "", + "relation": "owner" + } + }, + "owner": { + "this": {} + }, + "parent": { + "this": {} + }, + "viewer": { + "union": { + "child": [ + { + "this": {} + }, + { + "computedUserset": { + "object": "", + "relation": "owner" + } + }, + { + "tupleToUserset": { + "tupleset": { + "object": "", + "relation": "parent" + }, + "computedUserset": { + "object": "", + "relation": "viewer" + } + } + } + ] + } + } + }, + "metadata": { + "relations": { + "can_create_file": { + "directly_related_user_types": [] + }, + "owner": { + "directly_related_user_types": [ + { + "type": "user" + } + ] + }, + "parent": { + "directly_related_user_types": [ + { + "type": "folder" + } + ] + }, + "viewer": { + "directly_related_user_types": [ + { + "type": "user" + }, + { + "type": "user", + "wildcard": {} + }, + { + "type": "group", + "relation": "member" + } + ] + } + } + } + }, + { + "type": "document", + "relations": { + "can_change_owner": { + "computedUserset": { + "object": "", + "relation": "owner" + } + }, + "owner": { + "this": {} + }, + "parent": { + "this": {} + }, + "can_read": { + "union": { + "child": [ + { + "computedUserset": { + "object": "", + "relation": "viewer" + } + }, + { + "computedUserset": { + "object": "", + "relation": "owner" + } + }, + { + "tupleToUserset": { + "tupleset": { + "object": "", + "relation": "parent" + }, + "computedUserset": { + "object": "", + "relation": "viewer" + } + } + } + ] + } + }, + "can_share": { + "union": { + "child": [ + { + "computedUserset": { + "object": "", + "relation": "owner" + } + }, + { + "tupleToUserset": { + "tupleset": { + "object": "", + "relation": "parent" + }, + "computedUserset": { + "object": "", + "relation": "owner" + } + } + } + ] + } + }, + "viewer": { + "this": {} + }, + "can_write": { + "union": { + "child": [ + { + "computedUserset": { + "object": "", + "relation": "owner" + } + }, + { + "tupleToUserset": { + "tupleset": { + "object": "", + "relation": "parent" + }, + "computedUserset": { + "object": "", + "relation": "owner" + } + } + } + ] + } + } + }, + "metadata": { + "relations": { + "can_change_owner": { + "directly_related_user_types": [] + }, + "owner": { + "directly_related_user_types": [ + { + "type": "user" + } + ] + }, + "parent": { + "directly_related_user_types": [ + { + "type": "folder" + } + ] + }, + "can_read": { + "directly_related_user_types": [] + }, + "can_share": { + "directly_related_user_types": [] + }, + "viewer": { + "directly_related_user_types": [ + { + "type": "user" + }, + { + "type": "user", + "wildcard": {} + }, + { + "type": "group", + "relation": "member" + } + ] + }, + "can_write": { + "directly_related_user_types": [] + } + } + } + } + ] +} \ No newline at end of file diff --git a/example/streamed-list-objects/streamedListObjects.mjs b/example/streamed-list-objects/streamedListObjects.mjs new file mode 100644 index 0000000..3ac2cfc --- /dev/null +++ b/example/streamed-list-objects/streamedListObjects.mjs @@ -0,0 +1,105 @@ +import { OpenFgaClient } from "../../dist/index.js"; +import { readFileSync } from "fs"; + +async function createStore(fgaClient) { + console.log("Creating temporary store"); + const store = await fgaClient.createStore({ name: "Streamed List Objects Demo Store" }); + console.log(`Created store: ${store.id}\n`); + return store.id; +} + +async function writeModel(fgaClient) { + console.log("Writing authorization model"); + const model = JSON.parse(readFileSync("./model.json", "utf8")); + const response = await fgaClient.writeAuthorizationModel(model); + console.log(`Created authorization model: ${response.authorization_model_id}\n`); + return response.authorization_model_id; +} + +async function writeTuples(fgaClient, quantity) { + console.log(`Writing ${quantity} tuples to the store`); + const chunks = Math.floor(quantity / 100); + + for (let chunk = 0; chunk < chunks; ++chunk) { + const tuples = []; + for (let t = 0; t < 100; ++t) { + tuples.push({ + user: "user:anne", + relation: "owner", + object: `document:${chunk * 100 + t}` + }); + } + await fgaClient.writeTuples(tuples); + } + console.log(`Done writing ${quantity} tuples\n`); + return quantity; +} + +async function streamedListObjects(fgaClient, request) { + console.log("Calling /streamed-list-objects endpoint..."); + const results = []; + + // Note: streamedListObjects() is an async generator + // We can process results as they arrive, or collect them all like this + for await (const response of fgaClient.streamedListObjects(request)) { + results.push(response.object); + } + + return results; +} + +async function listObjects(fgaClient, request) { + console.log("Calling /list-objects endpoint for comparison..."); + const response = await fgaClient.listObjects(request); + return response.objects; +} + +async function main() { + const fgaClient = new OpenFgaClient({ + apiUrl: process.env.FGA_API_URL || "http://localhost:8080" + }); + + try { + // Create temporary store + const storeId = await createStore(fgaClient); + fgaClient.storeId = storeId; + + // Write authorization model + const modelId = await writeModel(fgaClient); + fgaClient.authorizationModelId = modelId; + + // Write test data + await writeTuples(fgaClient, 2000); + + // Prepare request to list all documents owned by user:anne + const request = { + type: "document", + relation: "owner", + user: "user:anne" + }; + + // Test streaming endpoint + const streamedResults = await streamedListObjects(fgaClient, request); + console.log(`✓ Streamed endpoint returned ${streamedResults.length} objects\n`); + + // Test standard endpoint for comparison + const standardResults = await listObjects(fgaClient, request); + console.log(`✓ Standard endpoint returned ${standardResults.length} objects\n`); + + console.log("Comparison:"); + console.log(` Streaming: ${streamedResults.length} objects`); + console.log(` Standard: ${standardResults.length} objects (max 1000)`); + console.log(`\nStreaming endpoint retrieved all ${streamedResults.length} objects successfully!`); + + // Cleanup + console.log("\nCleaning up..."); + await fgaClient.deleteStore(); + console.log("Deleted temporary store"); + + } catch (error) { + console.error("Error:", error); + process.exitCode = 1; + } +} + +main(); \ No newline at end of file diff --git a/index.ts b/index.ts index 291e294..2b94e82 100644 --- a/index.ts +++ b/index.ts @@ -24,5 +24,5 @@ export * from "./telemetry/counters"; export * from "./telemetry/histograms"; export * from "./telemetry/metrics"; export * from "./errors"; - +export { parseNDJSONStream } from "./streaming"; diff --git a/streaming.ts b/streaming.ts new file mode 100644 index 0000000..e5e71fa --- /dev/null +++ b/streaming.ts @@ -0,0 +1,185 @@ +/** + * JavaScript and Node.js SDK for OpenFGA + * + * API version: 1.x + * Website: https://openfga.dev + * Documentation: https://openfga.dev/docs + * Support: https://openfga.dev/community + * License: [Apache-2.0](https://github.com/openfga/js-sdk/blob/main/LICENSE) + * + * NOTE: This file was auto generated by OpenAPI Generator (https://openapi-generator.tech). DO NOT EDIT. + */ + +import type { Readable } from "node:stream"; + +// Helper: create async iterable from classic EventEmitter-style Readable streams +const createAsyncIterableFromReadable = (readable: any): AsyncIterable => { + return { + [Symbol.asyncIterator](): AsyncIterator { + const chunkQueue: any[] = []; + const pendings: Array<{ resolve: (v: IteratorResult) => void; reject: (e?: any) => void }> = []; + let ended = false; + let error: any = null; + + const onData = (chunk: any) => { + if (pendings.length > 0) { + const { resolve } = pendings.shift()!; + resolve({ value: chunk, done: false }); + } else { + chunkQueue.push(chunk); + } + }; + + const onEnd = () => { + if (error) return; // Don't process end if error already occurred + ended = true; + while (pendings.length > 0) { + const { resolve } = pendings.shift()!; + resolve({ value: undefined, done: true }); + } + }; + + const onError = (err: any) => { + error = err; + while (pendings.length > 0) { + const { reject } = pendings.shift()!; + reject(err); + } + cleanup(); + }; + + readable.on("data", onData); + readable.once("end", onEnd); + readable.once("error", onError); + + const cleanup = () => { + readable.off("data", onData); + readable.off("end", onEnd); + readable.off("error", onError); + }; + + return { + next(): Promise> { + if (error) { + return Promise.reject(error); + } + if (chunkQueue.length > 0) { + const value = chunkQueue.shift(); + return Promise.resolve({ value, done: false }); + } + if (ended) { + cleanup(); + return Promise.resolve({ value: undefined, done: true }); + } + return new Promise>((resolve, reject) => { + pendings.push({ resolve, reject }); + }); + }, + return(): Promise> { + try { + cleanup(); + } finally { + if (readable && typeof readable.destroy === "function") { + readable.destroy(); + } + } + return Promise.resolve({ value: undefined, done: true }); + }, + throw(e?: any): Promise> { + try { + cleanup(); + } finally { + if (readable && typeof readable.destroy === "function") { + readable.destroy(e); + } + } + return Promise.reject(e); + } + }; + } + }; +}; + +/** + * Parse newline-delimited JSON (NDJSON) from a Node.js readable stream + * @param stream - Node.js readable stream, AsyncIterable, string, or Buffer + * @returns AsyncGenerator that yields parsed JSON objects + */ +export async function* parseNDJSONStream( + stream: Readable | AsyncIterable | string | Uint8Array | Buffer +): AsyncGenerator { + const decoder = new TextDecoder("utf-8"); + let buffer = ""; + + // If stream is actually a string or Buffer-like, handle as whole payload + const isString = typeof stream === "string"; + const isBuffer = typeof Buffer !== "undefined" && Buffer.isBuffer && Buffer.isBuffer(stream); + const isUint8Array = typeof Uint8Array !== "undefined" && stream instanceof Uint8Array; + + if (isString || isBuffer || isUint8Array) { + const text = isString + ? (stream as string) + : decoder.decode(isBuffer ? new Uint8Array(stream as Buffer) : (stream as Uint8Array)); + const lines = text.split("\n"); + + for (const line of lines) { + const trimmed = line.trim(); + if (!trimmed) { + continue; + } + + try { + yield JSON.parse(trimmed); + } catch (err) { + console.warn("Failed to parse JSON line:", err); + } + } + return; + } + + const isAsyncIterable = stream && typeof (stream as any)[Symbol.asyncIterator] === "function"; + const source: AsyncIterable = isAsyncIterable ? (stream as any) : createAsyncIterableFromReadable(stream as any); + + for await (const chunk of source) { + // Node.js streams can return Buffer or string chunks + // Convert to Uint8Array if needed for TextDecoder + const uint8Chunk = typeof chunk === "string" + ? new TextEncoder().encode(chunk) + : chunk instanceof Buffer + ? new Uint8Array(chunk) + : chunk; + + // Append decoded chunk to buffer + buffer += decoder.decode(uint8Chunk, { stream: true }); + + // Split on newlines + const lines = buffer.split("\n"); + + // Keep the last (potentially incomplete) line in the buffer + buffer = lines.pop() || ""; + + // Parse and yield complete lines + for (const line of lines) { + const trimmed = line.trim(); + if (trimmed) { + try { + yield JSON.parse(trimmed); + } catch (err) { + console.warn("Failed to parse JSON line:", err); + } + } + } + } + + // Flush any remaining decoder state + buffer += decoder.decode(); + + // Handle any remaining data in buffer + if (buffer.trim()) { + try { + yield JSON.parse(buffer); + } catch (err) { + console.warn("Failed to parse final JSON buffer:", err); + } + } +} \ No newline at end of file diff --git a/tests/helpers/nocks.ts b/tests/helpers/nocks.ts index bc46123..11e73a1 100644 --- a/tests/helpers/nocks.ts +++ b/tests/helpers/nocks.ts @@ -1,5 +1,7 @@ import type * as Nock from "nock"; +import { Readable } from "node:stream"; + import { AuthorizationModel, BatchCheckRequest, @@ -244,6 +246,22 @@ export const getNocks = ((nock: typeof Nock) => ({ ) .reply(200, responseBody); }, + streamedListObjects: ( + storeId: string, + objects: string[], + basePath = defaultConfiguration.getBasePath(), + ) => { + // Create NDJSON response (newline-delimited JSON) as a stream + const ndjsonResponse = objects + .map(obj => JSON.stringify({ result: { object: obj } })) + .join("\n") + "\n"; + + return nock(basePath) + .post(`/stores/${storeId}/streamed-list-objects`) + .reply(200, () => Readable.from([ndjsonResponse]), { + "Content-Type": "application/x-ndjson" + }); + }, listUsers: ( storeId: string, responseBody: ListUsersResponse, diff --git a/tests/streaming.test.ts b/tests/streaming.test.ts new file mode 100644 index 0000000..19882e7 --- /dev/null +++ b/tests/streaming.test.ts @@ -0,0 +1,259 @@ +/** + * JavaScript and Node.js SDK for OpenFGA + * + * API version: 1.x + * Website: https://openfga.dev + * Documentation: https://openfga.dev/docs + * Support: https://openfga.dev/community + * License: [Apache-2.0](https://github.com/openfga/js-sdk/blob/main/LICENSE) + * + * NOTE: This file was auto generated by OpenAPI Generator (https://openapi-generator.tech). DO NOT EDIT. + */ + + +import { Readable } from "node:stream"; +import { EventEmitter } from "node:events"; +import { parseNDJSONStream } from "../streaming"; + +describe("Streaming Utilities", () => { + describe("parseNDJSONStream (Node.js)", () => { + it("should parse single line NDJSON", async () => { + const ndjson = '{"result":{"object":"document:1"}}\n'; + const stream = Readable.from([ndjson]); + + const results: any[] = []; + for await (const item of parseNDJSONStream(stream)) { + results.push(item); + } + + expect(results).toHaveLength(1); + expect(results[0]).toEqual({ result: { object: "document:1" } }); + }); + + it("should parse multiple line NDJSON", async () => { + const ndjson = '{"result":{"object":"document:1"}}\n{"result":{"object":"document:2"}}\n{"result":{"object":"document:3"}}\n'; + const stream = Readable.from([ndjson]); + + const results: any[] = []; + for await (const item of parseNDJSONStream(stream)) { + results.push(item); + } + + expect(results).toHaveLength(3); + expect(results[0]).toEqual({ result: { object: "document:1" } }); + expect(results[1]).toEqual({ result: { object: "document:2" } }); + expect(results[2]).toEqual({ result: { object: "document:3" } }); + }); + + it("should handle chunked data across multiple reads", async () => { + // Simulate data coming in chunks that split JSON objects mid-line + const chunks = [ + '{"result":{"object":"document:1"}}\n{"res', + 'ult":{"object":"document:2"}}\n' + ]; + + const stream = Readable.from(chunks); + + const results: any[] = []; + for await (const item of parseNDJSONStream(stream)) { + results.push(item); + } + + expect(results).toHaveLength(2); + expect(results[0]).toEqual({ result: { object: "document:1" } }); + expect(results[1]).toEqual({ result: { object: "document:2" } }); + }); + + it("should handle empty lines", async () => { + const ndjson = '{"result":{"object":"document:1"}}\n\n{"result":{"object":"document:2"}}\n'; + const stream = Readable.from([ndjson]); + + const results: any[] = []; + for await (const item of parseNDJSONStream(stream)) { + results.push(item); + } + + expect(results).toHaveLength(2); + }); + + it("should skip invalid JSON lines", async () => { + const consoleWarnSpy = jest.spyOn(console, "warn").mockImplementation(); + + const ndjson = '{"result":{"object":"document:1"}}\ninvalid json\n{"result":{"object":"document:2"}}\n'; + const stream = Readable.from([ndjson]); + + const results: any[] = []; + for await (const item of parseNDJSONStream(stream)) { + results.push(item); + } + + expect(results).toHaveLength(2); + expect(results[0]).toEqual({ result: { object: "document:1" } }); + expect(results[1]).toEqual({ result: { object: "document:2" } }); + expect(consoleWarnSpy).toHaveBeenCalled(); + + consoleWarnSpy.mockRestore(); + }); + + it("should parse when Readable emits Buffer chunks", async () => { + const ndjson = Buffer.from('{"a":1}\n{"b":2}\n'); + const stream = Readable.from([ndjson]); + + const out: any[] = []; + for await (const item of parseNDJSONStream(stream)) { + out.push(item); + } + expect(out).toEqual([{ a: 1 }, { b: 2 }]); + }); + + it("should parse last JSON without trailing newline", async () => { + const stream = Readable.from(['{"a":1}\n{"b":2}']); + + const out: any[] = []; + for await (const item of parseNDJSONStream(stream)) { + out.push(item); + } + expect(out).toEqual([{ a: 1 }, { b: 2 }]); + }); + + it("should skip invalid final JSON buffer and warn", async () => { + const warn = jest.spyOn(console, "warn").mockImplementation(); + const stream = Readable.from(['{"a":1}\n{"b":']); + + const out: any[] = []; + for await (const item of parseNDJSONStream(stream)) { + out.push(item); + } + expect(out).toEqual([{ a: 1 }]); + expect(warn).toHaveBeenCalled(); + warn.mockRestore(); + }); + + it("should parse when given a string input", async () => { + const input = '{"a":1}\n{"b":2}\n'; + const out: any[] = []; + for await (const item of parseNDJSONStream(input as any)) { + out.push(item); + } + expect(out).toEqual([{ a: 1 }, { b: 2 }]); + }); + + it("should parse when given a Buffer input", async () => { + const input = Buffer.from('{"a":1}\n{"b":2}\n'); + const out: any[] = []; + for await (const item of parseNDJSONStream(input as any)) { + out.push(item); + } + expect(out).toEqual([{ a: 1 }, { b: 2 }]); + }); + + it("should accept async iterable that yields Uint8Array", async () => { + const src = { + [Symbol.asyncIterator]: async function* () { + yield new TextEncoder().encode('{"a":1}\n{"b":2}\n'); + } + } as any; + + const out: any[] = []; + for await (const item of parseNDJSONStream(src)) { + out.push(item); + } + expect(out).toEqual([{ a: 1 }, { b: 2 }]); + }); + + it("should reject pending iteration when classic emitter errors", async () => { + const emitter = new EventEmitter() as any; + const gen = parseNDJSONStream(emitter); + + const firstPromise = gen.next(); + emitter.emit("data", '{"a":1}\n'); + const first = await firstPromise; + expect(first.value).toEqual({ a: 1 }); + + const pendingNext = gen.next(); + emitter.emit("error", new Error("boom")); + + // Pending next should now reject with the error + await expect(pendingNext).rejects.toThrow("boom"); + + // After error, iterator is exhausted (standard async iterator behavior) + await expect(gen.next()).resolves.toEqual({ value: undefined, done: true }); + }); + + it("should clean up listeners on early cancellation", async () => { + const emitter = new EventEmitter() as any; + const gen = parseNDJSONStream(emitter); + + const p = gen.next(); + emitter.emit("data", '{"a":1}\n'); + const first = await p; + expect(first.value).toEqual({ a: 1 }); + + await gen.return(undefined as any); + expect(emitter.listenerCount("data")).toBe(0); + expect(emitter.listenerCount("end")).toBe(0); + expect(emitter.listenerCount("error")).toBe(0); + }); + + it("should read from buffered queue when data exceeds pending resolvers", async () => { + const emitter = new EventEmitter() as any; + const gen = parseNDJSONStream(emitter); + + // Start consumption so listeners are attached and a pending resolver exists + const firstPromise = gen.next(); + + // First chunk fulfills the pending resolver + emitter.emit("data", '{"x":1}\n'); + const first = await firstPromise; + expect(first).toEqual({ value: { x: 1 }, done: false }); + + // Emit another chunk before requesting next; it should be queued + emitter.emit("data", '{"y":2}\n'); + + // Next pull should be served from the buffered queue path + const second = await gen.next(); + expect(second).toEqual({ value: { y: 2 }, done: false }); + + // end stream to complete + emitter.emit("end"); + const done = await gen.next(); + expect(done).toEqual({ value: undefined, done: true }); + }); + + it("should resolve pending next to done when end occurs", async () => { + const emitter = new EventEmitter() as any; + const gen = parseNDJSONStream(emitter); + + const pending = gen.next(); + emitter.emit("end"); + + await expect(pending).resolves.toEqual({ value: undefined, done: true }); + await expect(gen.next()).resolves.toEqual({ value: undefined, done: true }); + }); + + it("should cleanup and reject on iterator throw", async () => { + const emitter = new EventEmitter() as any; + const gen = parseNDJSONStream(emitter); + + const thrown = gen.throw(new Error("stop")); + await expect(thrown).rejects.toThrow("stop"); + + expect(emitter.listenerCount("data")).toBe(0); + expect(emitter.listenerCount("end")).toBe(0); + expect(emitter.listenerCount("error")).toBe(0); + }); + + it("should warn on invalid JSON line in string input path", async () => { + const warn = jest.spyOn(console, "warn").mockImplementation(); + const input = '{"a":1}\nnot json\n{"b":2}\n'; + + const out: any[] = []; + for await (const item of parseNDJSONStream(input as any)) { + out.push(item); + } + expect(out).toEqual([{ a: 1 }, { b: 2 }]); + expect(warn).toHaveBeenCalled(); + warn.mockRestore(); + }); + }); +}); \ No newline at end of file From 6fe722dedb257c2bbc4804424e1095826276dac4 Mon Sep 17 00:00:00 2001 From: Daniel Jonathan Date: Mon, 3 Nov 2025 10:19:01 -0500 Subject: [PATCH 2/3] feat/streaming-only: Removed unneeded requirement in CHANGELOG. --- CHANGELOG.md | 1 - 1 file changed, 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6a10623..f524bd0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,7 +6,6 @@ - feat: add support for handling Retry-After header (#267) - feat: streamedListObjects (streaming ListObjects) - Node.js only - Enables retrieving >1000 objects beyond standard listObjects limit - - Requires OpenFGA server [v1.2.0+](https://github.com/openfga/openfga/releases/tag/v1.2.0) - Uses axios streaming via API layer with preserved telemetry - Resilient NDJSON parsing (supports async-iterable and event-based streams) - Parses chunked data across multiple reads; handles Buffer/string inputs From b010a2595368f1f4ed479b371cdee32c9ae060ee Mon Sep 17 00:00:00 2001 From: Daniel Jonathan Date: Mon, 3 Nov 2025 12:22:07 -0500 Subject: [PATCH 3/3] refactor: address code review feedback - Simplify to one example using syntax transformer - Add StreamedListObjects documentation to main README - Add 5 client integration tests (streaming, headers, errors, retry, consistency) - Simplify CHANGELOG with links to documentation - Use @openfga/syntax-transformer in example --- CHANGELOG.md | 8 +- README.md | 26 ++ example/streamed-list-objects-local/README.md | 21 -- .../streamedListObjectsLocal.mjs | 62 ----- example/streamed-list-objects/README.md | 39 +-- example/streamed-list-objects/model.json | 251 ------------------ example/streamed-list-objects/package.json | 19 ++ .../streamedListObjects.mjs | 134 +++------- tests/client.test.ts | 134 ++++++++++ 9 files changed, 238 insertions(+), 456 deletions(-) delete mode 100644 example/streamed-list-objects-local/README.md delete mode 100644 example/streamed-list-objects-local/streamedListObjectsLocal.mjs delete mode 100644 example/streamed-list-objects/model.json create mode 100644 example/streamed-list-objects/package.json diff --git a/CHANGELOG.md b/CHANGELOG.md index f524bd0..c67ffdd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,13 +4,7 @@ ## [Unreleased](https://github.com/openfga/js-sdk/compare/v0.9.0...HEAD) - feat: add support for handling Retry-After header (#267) -- feat: streamedListObjects (streaming ListObjects) - Node.js only - - Enables retrieving >1000 objects beyond standard listObjects limit - - Uses axios streaming via API layer with preserved telemetry - - Resilient NDJSON parsing (supports async-iterable and event-based streams) - - Parses chunked data across multiple reads; handles Buffer/string inputs - - Adds example for usage: `example/streamed-list-objects` - - Adds example for local usage: `example/streamed-list-objects-local` +- feat: add support for [StreamedListObjects](https://openfga.dev/api/service#/Relationship%20Queries/StreamedListObjects) with streaming semantics. See [documentation](https://github.com/openfga/js-sdk/blob/main/README.md#streamed-list-objects) for more. ## v0.9.0 diff --git a/README.md b/README.md index eea0edd..8c336a0 100644 --- a/README.md +++ b/README.md @@ -660,6 +660,32 @@ const response = await fgaClient.listObjects({ // response.objects = ["document:0192ab2a-d83f-756d-9397-c5ed9f3cb69a"] ``` +##### Streamed List Objects + +The Streamed ListObjects API is very similar to the ListObjects API, with two differences: + +1. Instead of collecting all objects before returning a response, it streams them to the client as they are collected. +2. The number of results returned is only limited by the execution timeout specified in the flag `OPENFGA_LIST_OBJECTS_DEADLINE`. + +[API Documentation](https://openfga.dev/api/service#/Relationship%20Queries/StreamedListObjects) + +```javascript +const options = {}; + +// To override the authorization model id for this request +options.authorizationModelId = "01GXSA8YR785C4FYS3C0RTG7B1"; + +const objects = []; +for await (const response of fgaClient.streamedListObjects( + { user: "user:anne", relation: "can_read", type: "document" }, + { consistency: ConsistencyPreference.HigherConsistency } +)) { + objects.push(response.object); +} + +// objects = ["document:0192ab2a-d83f-756d-9397-c5ed9f3cb69a"] +``` + ##### List Relations List the relations a user has with an object. This wraps around [BatchCheck](#batchcheck) to allow checking multiple relationships at once. diff --git a/example/streamed-list-objects-local/README.md b/example/streamed-list-objects-local/README.md deleted file mode 100644 index e7f87fb..0000000 --- a/example/streamed-list-objects-local/README.md +++ /dev/null @@ -1,21 +0,0 @@ -# Streamed List Objects (Local) - -This example demonstrates using the js-sdk `streamedListObjects` API against a locally running OpenFGA server that you manage yourself. - -Prerequisites: -- Node.js 18+ -- An OpenFGA server reachable at `FGA_API_URL` (defaults to `http://localhost:8080`) - -Run: -1. From repo root, build the SDK once: - - `npm run build` -2. Set the API URL (optional) and run the example: - - `cd example/streamed-list-objects-local` - - `FGA_API_URL=http://localhost:8080 node streamedListObjectsLocal.mjs` - -What it does: -- Creates a temporary store -- Writes a schema 1.1 model with an assignable relation -- Inserts 3 tuples -- Streams them via `streamedListObjects` -- Cleans up the store \ No newline at end of file diff --git a/example/streamed-list-objects-local/streamedListObjectsLocal.mjs b/example/streamed-list-objects-local/streamedListObjectsLocal.mjs deleted file mode 100644 index 48e4198..0000000 --- a/example/streamed-list-objects-local/streamedListObjectsLocal.mjs +++ /dev/null @@ -1,62 +0,0 @@ -import { ClientConfiguration, OpenFgaClient, ConsistencyPreference } from "../../dist/index.js"; - -const apiUrl = process.env.FGA_API_URL || "http://localhost:8080"; - -async function main() { - const client = new OpenFgaClient(new ClientConfiguration({ apiUrl })); - - console.log("Creating temporary store"); - const { id: storeId } = await client.createStore({ name: "streamed-list-objects-local" }); - - const clientWithStore = new OpenFgaClient(new ClientConfiguration({ apiUrl, storeId })); - - const model = { - schema_version: "1.1", - type_definitions: [ - { type: "user" }, - { - type: "document", - relations: { can_read: { this: {} } }, - metadata: { - relations: { - can_read: { - directly_related_user_types: [{ type: "user" }] - } - } - } - } - ] - }; - - console.log("Writing authorization model"); - const { authorization_model_id: authorizationModelId } = await clientWithStore.writeAuthorizationModel(model); - - const fga = new OpenFgaClient(new ClientConfiguration({ apiUrl, storeId, authorizationModelId })); - - console.log("Writing tuples"); - await fga.write({ - writes: [ - { user: "user:anne", relation: "can_read", object: "document:1" }, - { user: "user:anne", relation: "can_read", object: "document:2" }, - { user: "user:anne", relation: "can_read", object: "document:3" } - ] - }); - - console.log("Streaming objects..."); - let count = 0; - for await (const _ of fga.streamedListObjects( - { user: "user:anne", relation: "can_read", type: "document" }, - { consistency: ConsistencyPreference.HigherConsistency } - )) { - count++; - } - console.log(`\u2713 Streamed count: ${count}`); - - console.log("Cleaning up..."); - await fga.deleteStore(); - console.log("Done"); -} - -main().catch(_err => { - process.exit(1); -}); \ No newline at end of file diff --git a/example/streamed-list-objects/README.md b/example/streamed-list-objects/README.md index 1b247a5..b23e088 100644 --- a/example/streamed-list-objects/README.md +++ b/example/streamed-list-objects/README.md @@ -1,33 +1,22 @@ # Streamed List Objects Example -This example demonstrates working with [OpenFGA's `/streamed-list-objects` endpoint](https://openfga.dev/api/service#/Relationship%20Queries/StreamedListObjects) using the JavaScript SDK's `streamedListObjects()` method. +Demonstrates using `streamedListObjects` to retrieve objects via the streaming API. ## Prerequisites +- OpenFGA server running on `http://localhost:8080` (or set `FGA_API_URL`) -- Node.js 16.15.0+ -- OpenFGA running on `localhost:8080` - -You can start OpenFGA with Docker by running the following command: - +## Running ```bash -docker pull openfga/openfga && docker run -it --rm -p 8080:8080 openfga/openfga run +# From repo root +npm run build +cd example/streamed-list-objects +npm install +npm start ``` -## Running the example - -No additional setup is required to run this example. Simply run the following command: - -```bash -node streamedListObjects.mjs -``` - -## About this example - -This example: -1. Creates a temporary store -2. Writes an authorization model (defined in `model.json`) -3. Writes 2000 relationship tuples to the store -4. Calls both `/streamed-list-objects` (streaming) and `/list-objects` (standard) endpoints -5. Compares the results - -The streaming endpoint allows retrieving more than 1000 objects (the default limit for the standard endpoint), and streams results as they are found rather than waiting for all results to be collected. +## What it does +- Creates a temporary store +- Writes a simple authorization model +- Adds 3 tuples +- Streams results via `streamedListObjects` +- Cleans up the store \ No newline at end of file diff --git a/example/streamed-list-objects/model.json b/example/streamed-list-objects/model.json deleted file mode 100644 index 3037fc3..0000000 --- a/example/streamed-list-objects/model.json +++ /dev/null @@ -1,251 +0,0 @@ -{ - "schema_version": "1.1", - "type_definitions": [ - { - "type": "user", - "relations": {} - }, - { - "type": "group", - "relations": { - "member": { - "this": {} - } - }, - "metadata": { - "relations": { - "member": { - "directly_related_user_types": [ - { - "type": "user" - } - ] - } - } - } - }, - { - "type": "folder", - "relations": { - "can_create_file": { - "computedUserset": { - "object": "", - "relation": "owner" - } - }, - "owner": { - "this": {} - }, - "parent": { - "this": {} - }, - "viewer": { - "union": { - "child": [ - { - "this": {} - }, - { - "computedUserset": { - "object": "", - "relation": "owner" - } - }, - { - "tupleToUserset": { - "tupleset": { - "object": "", - "relation": "parent" - }, - "computedUserset": { - "object": "", - "relation": "viewer" - } - } - } - ] - } - } - }, - "metadata": { - "relations": { - "can_create_file": { - "directly_related_user_types": [] - }, - "owner": { - "directly_related_user_types": [ - { - "type": "user" - } - ] - }, - "parent": { - "directly_related_user_types": [ - { - "type": "folder" - } - ] - }, - "viewer": { - "directly_related_user_types": [ - { - "type": "user" - }, - { - "type": "user", - "wildcard": {} - }, - { - "type": "group", - "relation": "member" - } - ] - } - } - } - }, - { - "type": "document", - "relations": { - "can_change_owner": { - "computedUserset": { - "object": "", - "relation": "owner" - } - }, - "owner": { - "this": {} - }, - "parent": { - "this": {} - }, - "can_read": { - "union": { - "child": [ - { - "computedUserset": { - "object": "", - "relation": "viewer" - } - }, - { - "computedUserset": { - "object": "", - "relation": "owner" - } - }, - { - "tupleToUserset": { - "tupleset": { - "object": "", - "relation": "parent" - }, - "computedUserset": { - "object": "", - "relation": "viewer" - } - } - } - ] - } - }, - "can_share": { - "union": { - "child": [ - { - "computedUserset": { - "object": "", - "relation": "owner" - } - }, - { - "tupleToUserset": { - "tupleset": { - "object": "", - "relation": "parent" - }, - "computedUserset": { - "object": "", - "relation": "owner" - } - } - } - ] - } - }, - "viewer": { - "this": {} - }, - "can_write": { - "union": { - "child": [ - { - "computedUserset": { - "object": "", - "relation": "owner" - } - }, - { - "tupleToUserset": { - "tupleset": { - "object": "", - "relation": "parent" - }, - "computedUserset": { - "object": "", - "relation": "owner" - } - } - } - ] - } - } - }, - "metadata": { - "relations": { - "can_change_owner": { - "directly_related_user_types": [] - }, - "owner": { - "directly_related_user_types": [ - { - "type": "user" - } - ] - }, - "parent": { - "directly_related_user_types": [ - { - "type": "folder" - } - ] - }, - "can_read": { - "directly_related_user_types": [] - }, - "can_share": { - "directly_related_user_types": [] - }, - "viewer": { - "directly_related_user_types": [ - { - "type": "user" - }, - { - "type": "user", - "wildcard": {} - }, - { - "type": "group", - "relation": "member" - } - ] - }, - "can_write": { - "directly_related_user_types": [] - } - } - } - } - ] -} \ No newline at end of file diff --git a/example/streamed-list-objects/package.json b/example/streamed-list-objects/package.json new file mode 100644 index 0000000..2ef7198 --- /dev/null +++ b/example/streamed-list-objects/package.json @@ -0,0 +1,19 @@ +{ + "name": "streamed-list-objects", + "private": "true", + "version": "1.0.0", + "description": "Example demonstrating streamedListObjects", + "author": "OpenFGA", + "license": "Apache-2.0", + "scripts": { + "start": "node streamedListObjects.mjs" + }, + "dependencies": { + "@openfga/sdk": "^0.9.0", + "@openfga/syntax-transformer": "^0.2.0" + }, + "engines": { + "node": ">=16.15.0" + } +} + diff --git a/example/streamed-list-objects/streamedListObjects.mjs b/example/streamed-list-objects/streamedListObjects.mjs index 3ac2cfc..7517bfc 100644 --- a/example/streamed-list-objects/streamedListObjects.mjs +++ b/example/streamed-list-objects/streamedListObjects.mjs @@ -1,105 +1,59 @@ -import { OpenFgaClient } from "../../dist/index.js"; -import { readFileSync } from "fs"; +import { ClientConfiguration, OpenFgaClient, ConsistencyPreference } from "../../dist/index.js"; +import { transformer } from "@openfga/syntax-transformer"; -async function createStore(fgaClient) { - console.log("Creating temporary store"); - const store = await fgaClient.createStore({ name: "Streamed List Objects Demo Store" }); - console.log(`Created store: ${store.id}\n`); - return store.id; -} - -async function writeModel(fgaClient) { - console.log("Writing authorization model"); - const model = JSON.parse(readFileSync("./model.json", "utf8")); - const response = await fgaClient.writeAuthorizationModel(model); - console.log(`Created authorization model: ${response.authorization_model_id}\n`); - return response.authorization_model_id; -} - -async function writeTuples(fgaClient, quantity) { - console.log(`Writing ${quantity} tuples to the store`); - const chunks = Math.floor(quantity / 100); - - for (let chunk = 0; chunk < chunks; ++chunk) { - const tuples = []; - for (let t = 0; t < 100; ++t) { - tuples.push({ - user: "user:anne", - relation: "owner", - object: `document:${chunk * 100 + t}` - }); - } - await fgaClient.writeTuples(tuples); - } - console.log(`Done writing ${quantity} tuples\n`); - return quantity; -} - -async function streamedListObjects(fgaClient, request) { - console.log("Calling /streamed-list-objects endpoint..."); - const results = []; - - // Note: streamedListObjects() is an async generator - // We can process results as they arrive, or collect them all like this - for await (const response of fgaClient.streamedListObjects(request)) { - results.push(response.object); - } - - return results; -} - -async function listObjects(fgaClient, request) { - console.log("Calling /list-objects endpoint for comparison..."); - const response = await fgaClient.listObjects(request); - return response.objects; -} +const apiUrl = process.env.FGA_API_URL || "http://localhost:8080"; async function main() { - const fgaClient = new OpenFgaClient({ - apiUrl: process.env.FGA_API_URL || "http://localhost:8080" - }); + const client = new OpenFgaClient(new ClientConfiguration({ apiUrl })); - try { - // Create temporary store - const storeId = await createStore(fgaClient); - fgaClient.storeId = storeId; + console.log("Creating temporary store"); + const { id: storeId } = await client.createStore({ name: "streamed-list-objects" }); - // Write authorization model - const modelId = await writeModel(fgaClient); - fgaClient.authorizationModelId = modelId; + const clientWithStore = new OpenFgaClient(new ClientConfiguration({ apiUrl, storeId })); - // Write test data - await writeTuples(fgaClient, 2000); + const dslString = ` + model + schema 1.1 - // Prepare request to list all documents owned by user:anne - const request = { - type: "document", - relation: "owner", - user: "user:anne" - }; + type user - // Test streaming endpoint - const streamedResults = await streamedListObjects(fgaClient, request); - console.log(`✓ Streamed endpoint returned ${streamedResults.length} objects\n`); + type document + relations + define can_read: [user] + `; - // Test standard endpoint for comparison - const standardResults = await listObjects(fgaClient, request); - console.log(`✓ Standard endpoint returned ${standardResults.length} objects\n`); + const model = transformer.transformDSLToJSONObject(dslString); - console.log("Comparison:"); - console.log(` Streaming: ${streamedResults.length} objects`); - console.log(` Standard: ${standardResults.length} objects (max 1000)`); - console.log(`\nStreaming endpoint retrieved all ${streamedResults.length} objects successfully!`); + console.log("Writing authorization model"); + const { authorization_model_id: authorizationModelId } = await clientWithStore.writeAuthorizationModel(model); - // Cleanup - console.log("\nCleaning up..."); - await fgaClient.deleteStore(); - console.log("Deleted temporary store"); + const fga = new OpenFgaClient(new ClientConfiguration({ apiUrl, storeId, authorizationModelId })); - } catch (error) { - console.error("Error:", error); - process.exitCode = 1; + console.log("Writing tuples"); + await fga.write({ + writes: [ + { user: "user:anne", relation: "can_read", object: "document:1" }, + { user: "user:anne", relation: "can_read", object: "document:2" }, + { user: "user:anne", relation: "can_read", object: "document:3" } + ] + }); + + console.log("Streaming objects..."); + let count = 0; + for await (const response of fga.streamedListObjects( + { user: "user:anne", relation: "can_read", type: "document" }, + { consistency: ConsistencyPreference.HigherConsistency } + )) { + console.log(`- ${response.object}`) + count++; } + console.log(`\u2713 Streamed count: ${count}`); + + console.log("Cleaning up..."); + await fga.deleteStore(); + console.log("Done"); } -main(); \ No newline at end of file +main().catch(_err => { + process.exit(1); +}); \ No newline at end of file diff --git a/tests/client.test.ts b/tests/client.test.ts index a8c06fc..edf8c90 100644 --- a/tests/client.test.ts +++ b/tests/client.test.ts @@ -1,4 +1,5 @@ import * as nock from "nock"; +import { Readable } from "node:stream"; import { ClientWriteStatus, @@ -810,6 +811,139 @@ describe("OpenFGA Client", () => { }); }); + describe("StreamedListObjects", () => { + it("should stream objects and yield them incrementally", async () => { + const objects = ["document:1", "document:2", "document:3"]; + const scope = nocks.streamedListObjects(baseConfig.storeId!, objects); + + expect(scope.isDone()).toBe(false); + + const results: string[] = []; + for await (const response of fgaClient.streamedListObjects({ + user: "user:81684243-9356-4421-8fbf-a4f8d36aa31b", + relation: "can_read", + type: "document", + })) { + results.push(response.object); + } + + expect(scope.isDone()).toBe(true); + expect(results).toHaveLength(3); + expect(results).toEqual(expect.arrayContaining(objects)); + }); + + it("should handle custom headers", async () => { + const objects = ["document:1"]; + + const scope = nock(defaultConfiguration.getBasePath()) + .post(`/stores/${baseConfig.storeId}/streamed-list-objects`) + .reply(function () { + // Verify custom headers were sent + expect(this.req.headers["x-custom-header"]).toBe("custom-value"); + expect(this.req.headers["x-request-id"]).toBe("test-123"); + + // Return NDJSON stream + const ndjsonResponse = objects + .map(obj => JSON.stringify({ result: { object: obj } })) + .join("\n") + "\n"; + + return [200, Readable.from([ndjsonResponse]), { + "Content-Type": "application/x-ndjson" + }]; + }); + + const results: string[] = []; + for await (const response of fgaClient.streamedListObjects({ + user: "user:anne", + relation: "owner", + type: "document", + }, { + headers: { + "X-Custom-Header": "custom-value", + "X-Request-ID": "test-123" + } + })) { + results.push(response.object); + } + + expect(scope.isDone()).toBe(true); + expect(results).toEqual(objects); + }); + + it("should handle errors from the stream", async () => { + const scope = nock(defaultConfiguration.getBasePath()) + .post(`/stores/${baseConfig.storeId}/streamed-list-objects`) + .reply(500, { code: "internal_error", message: "Server error" }); + + await expect(async () => { + for await (const response of fgaClient.streamedListObjects({ + user: "user:anne", + relation: "owner", + type: "document", + })) { + // Should not get here + } + }).rejects.toThrow(); + + expect(scope.isDone()).toBe(true); + }); + + it("should handle retry on 429 error", async () => { + const objects = ["document:1"]; + + // Create client with retry enabled + const fgaClientWithRetry = new OpenFgaClient({ + ...baseConfig, + credentials: { method: CredentialsMethod.None }, + retryParams: { maxRetry: 2, minWaitInMs: 10 } + }); + + // First attempt fails with 429 (called exactly once) + const scope1 = nock(defaultConfiguration.getBasePath()) + .post(`/stores/${baseConfig.storeId}/streamed-list-objects`) + .times(1) + .reply(429, { code: "rate_limit_exceeded", message: "Rate limited" }, { + "Retry-After": "1" + }); + + // Second attempt succeeds (retry - called exactly once) + const scope2 = nocks.streamedListObjects(baseConfig.storeId!, objects); + + const results: string[] = []; + for await (const response of fgaClientWithRetry.streamedListObjects({ + user: "user:anne", + relation: "owner", + type: "document", + })) { + results.push(response.object); + } + + // Verify both scopes were called (proves retry happened) + expect(scope1.isDone()).toBe(true); + expect(scope2.isDone()).toBe(true); + expect(results).toEqual(objects); + }); + + it("should support consistency preference", async () => { + const objects = ["document:1"]; + const scope = nocks.streamedListObjects(baseConfig.storeId!, objects); + + const results: string[] = []; + for await (const response of fgaClient.streamedListObjects({ + user: "user:anne", + relation: "owner", + type: "document", + }, { + consistency: ConsistencyPreference.HigherConsistency + })) { + results.push(response.object); + } + + expect(scope.isDone()).toBe(true); + expect(results).toEqual(objects); + }); + }); + describe("ListRelations", () => { it("should properly pass the request and return an allowed API response", async () => { const tuples = [{