Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,14 @@
## [Unreleased](https://github.com/openfga/js-sdk/compare/v0.9.0...HEAD)

- feat: add support for handling Retry-After header (#267)
- feat: streamedListObjects (streaming ListObjects)
- Supports both Node.js (axios streaming) and browser (Fetch API)
- Enables retrieving >1000 objects beyond standard listObjects limit
- Node path uses API layer; telemetry preserved; returns raw stream
- Browser path uses retries (exponential backoff with jitter) and SDK error mapping
- Resilient NDJSON parsing (async-iterable and Buffer/string compatibility)
- Adds example for usage: `example/streamed-list-objects`
- Adds example for local usage: `example/streamed-list-objects-local`

## v0.9.0

Expand Down
197 changes: 141 additions & 56 deletions api.ts

Large diffs are not rendered by default.

13 changes: 13 additions & 0 deletions apiModel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -860,6 +860,19 @@ export interface ListObjectsResponse {
*/
objects: Array<string>;
}
/**
* The response for a StreamedListObjects RPC.
* @export
* @interface StreamedListObjectsResponse
*/
export interface StreamedListObjectsResponse {
/**
*
* @type {string}
* @memberof StreamedListObjectsResponse
*/
object: string;
}
/**
*
* @export
Expand Down
128 changes: 112 additions & 16 deletions client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import {
GetStoreResponse,
ListObjectsRequest,
ListObjectsResponse,
StreamedListObjectsResponse,
ListStoresResponse,
ListUsersRequest,
ListUsersResponse,
Expand All @@ -54,6 +55,7 @@ import { BaseAPI } from "./base";
import { CallResult, PromiseResult } from "./common";
import { Configuration, RetryParams, UserConfigurationParams } from "./configuration";
import { FgaApiAuthenticationError, FgaRequiredParamError, FgaValidationError } from "./errors";
import { parseNDJSONStream, parseNDJSONReadableStream, isNodeEnvironment, attemptFetchRequest } from "./streaming";
import {
chunkArray,
generateRandomIdWithNonUniqueFallback,
Expand All @@ -66,6 +68,9 @@ export type UserClientConfigurationParams = UserConfigurationParams & {
authorizationModelId?: string;
}

export const DEFAULT_MAX_STREAM_RETRY = 3;
export const DEFAULT_MIN_STREAM_WAIT_MS = 100;

export class ClientConfiguration extends Configuration {
/**
* provide storeId
Expand Down Expand Up @@ -130,8 +135,8 @@ export type ClientRequestOptsWithConsistency = ClientRequestOpts & StoreIdOpts &
export type PaginationOptions = { pageSize?: number, continuationToken?: string, name?: string; };

export type ClientCheckRequest = CheckRequestTupleKey &
Pick<CheckRequest, "context"> &
{ contextualTuples?: Array<TupleKey> };
Pick<CheckRequest, "context"> &
{ contextualTuples?: Array<TupleKey> };

export type ClientBatchCheckClientRequest = ClientCheckRequest[];

Expand Down Expand Up @@ -170,17 +175,17 @@ export type ClientBatchCheckRequest = {

// for server batch check
export interface ClientBatchCheckRequestOpts {
maxParallelRequests?: number;
maxBatchSize?: number;
maxParallelRequests?: number;
maxBatchSize?: number;
}


// for server batch check
export type ClientBatchCheckSingleResponse = {
allowed: boolean;
request: ClientBatchCheckItem;
correlationId: string;
error?: CheckError;
allowed: boolean;
request: ClientBatchCheckItem;
correlationId: string;
error?: CheckError;
}

export interface ClientBatchCheckResponse {
Expand Down Expand Up @@ -226,17 +231,17 @@ export interface ClientReadChangesRequest {
}

export type ClientExpandRequest = ExpandRequestTupleKey & Omit<ExpandRequest, "tuple_key" | "authorization_model_id" | "contextual_tuples" | "consistency"> & {
contextualTuples?: Array<TupleKey>
contextualTuples?: Array<TupleKey>
};
export type ClientReadRequest = ReadRequestTupleKey;
export type ClientListObjectsRequest = Omit<ListObjectsRequest, "authorization_model_id" | "contextual_tuples" | "consistency"> & {
contextualTuples?: Array<TupleKey>
contextualTuples?: Array<TupleKey>
};
export type ClientListUsersRequest = Omit<ListUsersRequest, "authorization_model_id" | "contextual_tuples" | "consistency"> & {
contextualTuples?: Array<TupleKey>
contextualTuples?: Array<TupleKey>
};
export type ClientListRelationsRequest = Omit<ClientCheckRequest, "relation" | "consistency"> & {
relations?: string[],
relations?: string[],
};
export type ClientWriteAssertionsRequest = (CheckRequestTupleKey & Pick<Assertion, "expectation">)[];

Expand Down Expand Up @@ -521,7 +526,7 @@ export class OpenFgaClient extends BaseAPI {
const writeResponses: ClientWriteSingleResponse[][] = [];
if (writes?.length) {
for await (const singleChunkResponse of asyncPool(maxParallelRequests, chunkArray(writes, maxPerChunk),
(chunk) => this.writeTuples(chunk,{ ...options, headers, transaction: undefined }).catch(err => {
(chunk) => this.writeTuples(chunk, { ...options, headers, transaction: undefined }).catch(err => {
if (err instanceof FgaApiAuthenticationError) {
throw err;
}
Expand Down Expand Up @@ -672,7 +677,7 @@ export class OpenFgaClient extends BaseAPI {



private singleBatchCheck(body: BatchCheckRequest, options: ClientRequestOptsWithConsistency & ClientBatchCheckRequestOpts = {}): Promise<BatchCheckResponse> {
private singleBatchCheck(body: BatchCheckRequest, options: ClientRequestOptsWithConsistency & ClientBatchCheckRequestOpts = {}): Promise<BatchCheckResponse> {
return this.api.batchCheck(this.getStoreId(options)!, body, options);
}

Expand Down Expand Up @@ -750,7 +755,7 @@ export class OpenFgaClient extends BaseAPI {

// Collect the responses and associate them with their correlation IDs
for await (const response of batchResponses) {
if (response) {
if (response) {
for (const [correlationId, result] of Object.entries(response)) {
const check = correlationIdToCheck.get(correlationId);
if (check && result) {
Expand All @@ -766,7 +771,7 @@ export class OpenFgaClient extends BaseAPI {
}

return { result: results };
}
}

/**
* Expand - Expands the relationships in userset tree format (evaluates)
Expand Down Expand Up @@ -816,6 +821,97 @@ export class OpenFgaClient extends BaseAPI {
}, options);
}

/**
* StreamedListObjects - Stream all objects of a particular type that the user has a certain relation to (evaluates)
*
* Note: This method supports both Node.js and browser environments.
* The response will be streamed as newline-delimited JSON objects.
*
* @param {ClientListObjectsRequest} body
* @param {ClientRequestOptsWithConsistency} [options]
* @param {string} [options.authorizationModelId] - Overrides the authorization model id in the configuration
* @param {object} [options.headers] - Custom headers to send alongside the request
* @param {ConsistencyPreference} [options.consistency] - The consistency preference to use
* @param {object} [options.retryParams] - Override the retry parameters for this request
* @param {number} [options.retryParams.maxRetry] - Override the max number of retries on each API request
* @param {number} [options.retryParams.minWaitInMs] - Override the minimum wait before a retry is initiated
* @returns {AsyncGenerator<StreamedListObjectsResponse>} An async generator that yields objects as they are received
*/
async *streamedListObjects(body: ClientListObjectsRequest, options: ClientRequestOptsWithConsistency = {}): AsyncGenerator<StreamedListObjectsResponse> {
// Detect the environment and use the appropriate streaming method
if (isNodeEnvironment()) {
// Node.js: Use the API layer which properly handles auth, retries, and telemetry
const stream = await this.api.streamedListObjects(this.getStoreId(options)!, {
authorization_model_id: this.getAuthorizationModelId(options),
user: body.user,
relation: body.relation,
type: body.type,
context: body.context,
contextual_tuples: { tuple_keys: body.contextualTuples || [] },
consistency: options.consistency
}, options);

// Unwrap axios CallResult to get the raw Node.js stream when needed
const source = (stream as any)?.$response?.data ?? stream;

// Parse the Node.js stream
for await (const item of parseNDJSONStream(source as any)) {
if (item && item.result && item.result.object) {
yield { object: item.result.object } as StreamedListObjectsResponse;
}
}
} else {
// Browser: Use Fetch API streaming with retry support
const storeId = this.getStoreId(options);
const requestBody: ListObjectsRequest = {
authorization_model_id: this.getAuthorizationModelId(options),
user: body.user,
relation: body.relation,
type: body.type,
context: body.context,
contextual_tuples: { tuple_keys: body.contextualTuples || [] },
consistency: options.consistency
};

// Set up authentication
const accessTokenHeader = await this.credentials.getAccessTokenHeader();
const headers: Record<string, string> = { ...options.headers };
if (accessTokenHeader) {
headers[accessTokenHeader.name] = accessTokenHeader.value;
}
headers["Content-Type"] = "application/json";

const url = this.configuration.getBasePath() + `/stores/${storeId}/streamed-list-objects`;

const retryParams = options.retryParams || this.configuration.retryParams || { maxRetry: DEFAULT_MAX_STREAM_RETRY, minWaitInMs: DEFAULT_MIN_STREAM_WAIT_MS };

const response = await attemptFetchRequest(
url,
{
method: "POST",
headers: headers,
body: JSON.stringify(requestBody),
},
requestBody,
{
maxRetry: retryParams.maxRetry || DEFAULT_MAX_STREAM_RETRY,
minWaitInMs: retryParams.minWaitInMs || DEFAULT_MIN_STREAM_WAIT_MS
}
);

if (!response.body) {
throw new FgaValidationError("response.body", "Response body is not readable");
}

// Parse the browser ReadableStream
for await (const item of parseNDJSONReadableStream(response.body)) {
if (item && item.result && item.result.object) {
yield { object: item.result.object } as StreamedListObjectsResponse;
}
}
}
}

/**
* ListRelations - List all the relations a user has with an object (evaluates)
* @param {object} listRelationsRequest
Expand Down
Loading