Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
## [Unreleased](https://github.com/openfga/js-sdk/compare/v0.9.0...HEAD)

- feat: add support for handling Retry-After header (#267)
- feat: add support for [StreamedListObjects](https://openfga.dev/api/service#/Relationship%20Queries/StreamedListObjects)
with streaming semantics. See [documentation](https://github.com/openfga/js-sdk/blob/main/README.md#streamed-list-objects) for more.
- feat: add support for conflict options for Write operations**: (#276)
The client now supports setting `conflict` on `ClientWriteRequestOpts` to control behavior when writing duplicate tuples or deleting non-existent tuples. This feature requires OpenFGA server [v1.10.0](https://github.com/openfga/openfga/releases/tag/v1.10.0) or later.
See [Conflict Options for Write Operations](./README.md#conflict-options-for-write-operations) for more.
Expand Down
26 changes: 26 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -739,6 +739,32 @@ const response = await fgaClient.listObjects({
// response.objects = ["document:0192ab2a-d83f-756d-9397-c5ed9f3cb69a"]
```

##### Streamed List Objects

The Streamed ListObjects API is very similar to the ListObjects API, with two differences:

1. Instead of collecting all objects before returning a response, it streams them to the client as they are collected.
2. The number of results returned is only limited by the execution timeout specified in the flag `OPENFGA_LIST_OBJECTS_DEADLINE`.

[API Documentation](https://openfga.dev/api/service#/Relationship%20Queries/StreamedListObjects)

```javascript
const options = {};

// To override the authorization model id for this request
options.authorizationModelId = "01GXSA8YR785C4FYS3C0RTG7B1";

const objects = [];
for await (const response of fgaClient.streamedListObjects(
{ user: "user:anne", relation: "can_read", type: "document" },
{ consistency: ConsistencyPreference.HigherConsistency }
)) {
objects.push(response.object);
}

// objects = ["document:0192ab2a-d83f-756d-9397-c5ed9f3cb69a"]
```

##### List Relations

List the relations a user has with an object. This wraps around [BatchCheck](#batchcheck) to allow checking multiple relationships at once.
Expand Down
83 changes: 83 additions & 0 deletions api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import {
serializeDataIfNeeded,
toPathString,
createRequestFunction,
createStreamingRequestFunction,
RequestArgs,
CallResult,
PromiseResult
Expand Down Expand Up @@ -383,6 +384,45 @@ export const OpenFgaApiAxiosParamCreator = function (configuration: Configuratio
options: localVarRequestOptions,
};
},
/**
* The Streamed ListObjects API is very similar to the ListObjects API, with two differences:
* 1. Instead of collecting all objects before returning a response, it streams them to the client as they are collected.
* 2. The number of results returned is only limited by the execution timeout specified in the flag OPENFGA_LIST_OBJECTS_DEADLINE.
* @summary Stream all objects of the given type that the user has a relation with
* @param {string} storeId
* @param {ListObjectsRequest} body
* @param {*} [options] Override http request option.
* @throws { FgaError }
*/
streamedListObjects: (storeId: string, body: ListObjectsRequest, options: any = {}): RequestArgs => {
// verify required parameter 'storeId' is not null or undefined
assertParamExists("streamedListObjects", "storeId", storeId);
// verify required parameter 'body' is not null or undefined
assertParamExists("streamedListObjects", "body", body);
const localVarPath = "/stores/{store_id}/streamed-list-objects"
.replace(`{${"store_id"}}`, encodeURIComponent(String(storeId)));
// use dummy base URL string because the URL constructor only accepts absolute URLs.
const localVarUrlObj = new URL(localVarPath, DUMMY_BASE_URL);
let baseOptions;
if (configuration) {
baseOptions = configuration.baseOptions;
}

const localVarRequestOptions = { method: "POST", ...baseOptions, ...options };
const localVarHeaderParameter = {} as any;
const localVarQueryParameter = {} as any;

localVarHeaderParameter["Content-Type"] = "application/json";

setSearchParams(localVarUrlObj, localVarQueryParameter, options.query);
localVarRequestOptions.headers = { ...localVarHeaderParameter, ...options.headers };
localVarRequestOptions.data = serializeDataIfNeeded(body, localVarRequestOptions);

return {
url: toPathString(localVarUrlObj),
options: localVarRequestOptions,
};
},
/**
* Returns a paginated list of OpenFGA stores and a continuation token to get additional stores. The continuation token will be empty if there are no more stores.
* @summary List all stores
Expand Down Expand Up @@ -912,6 +952,22 @@ export const OpenFgaApiFp = function(configuration: Configuration, credentials:
...TelemetryAttributes.fromRequestBody(body)
});
},
/**
* The Streamed ListObjects API is very similar to the ListObjects API, with two differences:
* 1. Instead of collecting all objects before returning a response, it streams them to the client as they are collected.
* 2. The number of results returned is only limited by the execution timeout specified in the flag OPENFGA_LIST_OBJECTS_DEADLINE.
* @summary Stream all objects of the given type that the user has a relation with
* @param {string} storeId
* @param {ListObjectsRequest} body
* @param {*} [options] Override http request option.
* @throws { FgaError }
*/
async streamedListObjects(storeId: string, body: ListObjectsRequest, options?: any): Promise<(axios?: AxiosInstance) => Promise<any>> {
const localVarAxiosArgs = localVarAxiosParamCreator.streamedListObjects(storeId, body, options);
return createStreamingRequestFunction(localVarAxiosArgs, globalAxios, configuration, credentials, {
[TelemetryAttribute.FgaClientRequestMethod]: "StreamedListObjects"
});
},
/**
* Returns a paginated list of OpenFGA stores and a continuation token to get additional stores. The continuation token will be empty if there are no more stores.
* @summary List all stores
Expand Down Expand Up @@ -1156,6 +1212,19 @@ export const OpenFgaApiFactory = function (configuration: Configuration, credent
listObjects(storeId: string, body: ListObjectsRequest, options?: any): PromiseResult<ListObjectsResponse> {
return localVarFp.listObjects(storeId, body, options).then((request) => request(axios));
},
/**
* The Streamed ListObjects API is very similar to the ListObjects API, with two differences:
* 1. Instead of collecting all objects before returning a response, it streams them to the client as they are collected.
* 2. The number of results returned is only limited by the execution timeout specified in the flag OPENFGA_LIST_OBJECTS_DEADLINE.
* @summary Stream all objects of the given type that the user has a relation with
* @param {string} storeId
* @param {ListObjectsRequest} body
* @param {*} [options] Override http request option.
* @throws { FgaError }
*/
streamedListObjects(storeId: string, body: ListObjectsRequest, options?: any): Promise<any> {
return localVarFp.streamedListObjects(storeId, body, options).then((request) => request(axios));
},
/**
* Returns a paginated list of OpenFGA stores and a continuation token to get additional stores. The continuation token will be empty if there are no more stores.
* @summary List all stores
Expand Down Expand Up @@ -1370,6 +1439,20 @@ export class OpenFgaApi extends BaseAPI {
return OpenFgaApiFp(this.configuration, this.credentials).listObjects(storeId, body, options).then((request) => request(this.axios));
}

/**
* The Streamed ListObjects API is very similar to the ListObjects API, with two differences:
* 1. Instead of collecting all objects before returning a response, it streams them to the client as they are collected.
* 2. The number of results returned is only limited by the execution timeout specified in the flag OPENFGA_LIST_OBJECTS_DEADLINE.
* @summary Stream all objects of the given type that the user has a relation with
* @param {string} storeId
* @param {ListObjectsRequest} body
* @param {*} [options] Override http request option.
* @throws { FgaError }
*/
public streamedListObjects(storeId: string, body: ListObjectsRequest, options?: any): Promise<any> {
return OpenFgaApiFp(this.configuration, this.credentials).streamedListObjects(storeId, body, options).then((request) => request(this.axios));
}

/**
* Returns a paginated list of OpenFGA stores and a continuation token to get additional stores. The continuation token will be empty if there are no more stores.
* @summary List all stores
Expand Down
15 changes: 15 additions & 0 deletions apiModel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -860,6 +860,21 @@ export interface ListObjectsResponse {
*/
objects: Array<string>;
}

/**
* The response for a StreamedListObjects RPC.
* @export
* @interface StreamedListObjectsResponse
*/
export interface StreamedListObjectsResponse {
/**
*
* @type {string}
* @memberof StreamedListObjectsResponse
*/
object: string;
}

/**
*
* @export
Expand Down
47 changes: 47 additions & 0 deletions client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import {
GetStoreResponse,
ListObjectsRequest,
ListObjectsResponse,
StreamedListObjectsResponse,
ListStoresResponse,
ListUsersRequest,
ListUsersResponse,
Expand Down Expand Up @@ -50,6 +51,7 @@ import {
} from "./utils";
import { isWellFormedUlidString } from "./validation";
import SdkConstants from "./constants";
import { parseNDJSONStream } from "./streaming";

export type UserClientConfigurationParams = UserConfigurationParams & {
storeId?: string;
Expand Down Expand Up @@ -847,6 +849,51 @@ export class OpenFgaClient extends BaseAPI {
}, options);
}

/**
* StreamedListObjects - Stream all objects of a particular type that the user has a certain relation to (evaluates)
*
* Note: This method is Node.js only. Streams are supported via the axios API layer.
* The response will be streamed as newline-delimited JSON objects.
*
* @param {ClientListObjectsRequest} body
* @param {ClientRequestOptsWithConsistency} [options]
* @param {string} [options.authorizationModelId] - Overrides the authorization model id in the configuration
* @param {object} [options.headers] - Custom headers to send alongside the request
* @param {ConsistencyPreference} [options.consistency] - The consistency preference to use
* @param {object} [options.retryParams] - Override the retry parameters for this request
* @param {number} [options.retryParams.maxRetry] - Override the max number of retries on each API request
* @param {number} [options.retryParams.minWaitInMs] - Override the minimum wait before a retry is initiated
* @returns {AsyncGenerator<StreamedListObjectsResponse>} An async generator that yields objects as they are received
*/
async *streamedListObjects(body: ClientListObjectsRequest, options: ClientRequestOptsWithConsistency = {}): AsyncGenerator<StreamedListObjectsResponse> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you add the relevant tests to client.test.js?

Note include in the tests, tests for:

  • retry handling
  • custom headers
  • error handling

const stream = await this.api.streamedListObjects(this.getStoreId(options)!, {
authorization_model_id: this.getAuthorizationModelId(options),
user: body.user,
relation: body.relation,
type: body.type,
context: body.context,
contextual_tuples: { tuple_keys: body.contextualTuples || [] },
consistency: options.consistency
}, options);

// Unwrap axios CallResult to get the raw Node.js stream when needed
const source = stream?.$response?.data ?? stream;

// Parse the Node.js stream
try {
for await (const item of parseNDJSONStream(source as any)) {
if (item && item.result && item.result.object) {
yield { object: item.result.object } as StreamedListObjectsResponse;
}
}
} finally {
// Ensure underlying HTTP connection closes if consumer stops early
if (source && typeof source.destroy === "function") {
try { source.destroy(); } catch { }
}
}
}

/**
* ListRelations - List all the relations a user has with an object (evaluates)
* @param {object} listRelationsRequest
Expand Down
71 changes: 71 additions & 0 deletions common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,77 @@ export const createRequestFunction = function (axiosArgs: RequestArgs, axiosInst
);
}

return result;
};
};

/**
* creates an axios streaming request function that returns the raw response stream
* for incremental parsing (used by streamedListObjects)
*/
export const createStreamingRequestFunction = function (axiosArgs: RequestArgs, axiosInstance: AxiosInstance, configuration: Configuration, credentials: Credentials, methodAttributes: Record<string, string | number> = {}) {
configuration.isValid();

const retryParams = axiosArgs.options?.retryParams ? axiosArgs.options?.retryParams : configuration.retryParams;
const maxRetry: number = retryParams ? retryParams.maxRetry : 0;
const minWaitInMs: number = retryParams ? retryParams.minWaitInMs : 0;

const start = performance.now();

return async (axios: AxiosInstance = axiosInstance): Promise<any> => {
await setBearerAuthToObject(axiosArgs.options.headers, credentials!);

const url = configuration.getBasePath() + axiosArgs.url;

const axiosRequestArgs = { ...axiosArgs.options, responseType: "stream", url: url };
const wrappedResponse = await attemptHttpRequest(axiosRequestArgs, {
maxRetry,
minWaitInMs,
}, axios);
const response = wrappedResponse?.response;

const result: any = response?.data; // raw stream

let attributes: StringIndexable = {};

attributes = TelemetryAttributes.fromRequest({
userAgent: configuration.baseOptions?.headers["User-Agent"],
httpMethod: axiosArgs.options?.method,
url,
resendCount: wrappedResponse?.retries,
start: start,
credentials: credentials,
attributes: methodAttributes,
});

attributes = TelemetryAttributes.fromResponse({
response,
attributes,
});

const serverRequestDuration = attributes[TelemetryAttribute.HttpServerRequestDuration];
if (configuration.telemetry?.metrics?.histogramQueryDuration && typeof serverRequestDuration !== "undefined") {
configuration.telemetry.recorder.histogram(
TelemetryHistograms.queryDuration,
parseInt(attributes[TelemetryAttribute.HttpServerRequestDuration] as string, 10),
TelemetryAttributes.prepare(
attributes,
configuration.telemetry.metrics.histogramQueryDuration.attributes
)
);
}

if (configuration.telemetry?.metrics?.histogramRequestDuration) {
configuration.telemetry.recorder.histogram(
TelemetryHistograms.requestDuration,
attributes[TelemetryAttribute.HttpClientRequestDuration],
TelemetryAttributes.prepare(
attributes,
configuration.telemetry.metrics.histogramRequestDuration.attributes
)
);
}

return result;
};
};
22 changes: 22 additions & 0 deletions example/streamed-list-objects/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# Streamed List Objects Example
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need for this example I think


Demonstrates using `streamedListObjects` to retrieve objects via the streaming API.

## Prerequisites
- OpenFGA server running on `http://localhost:8080` (or set `FGA_API_URL`)

## Running
```bash
# From repo root
npm run build
cd example/streamed-list-objects
npm install
npm start
```

## What it does
- Creates a temporary store
- Writes a simple authorization model
- Adds 3 tuples
- Streams results via `streamedListObjects`
- Cleans up the store
19 changes: 19 additions & 0 deletions example/streamed-list-objects/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
{
"name": "streamed-list-objects",
"private": "true",
"version": "1.0.0",
"description": "Example demonstrating streamedListObjects",
"author": "OpenFGA",
"license": "Apache-2.0",
"scripts": {
"start": "node streamedListObjects.mjs"
},
"dependencies": {
"@openfga/sdk": "^0.9.0",
"@openfga/syntax-transformer": "^0.2.0"
},
"engines": {
"node": ">=16.15.0"
}
}

Loading