
Conversation

@nathaliellenaa (Contributor) commented Sep 23, 2025

Description

Add a new predict stream API as an experimental feature to support streaming predictions. The implementation currently uses transport-reactor-netty4 for REST streaming. This is a temporary choice: transport-reactor-netty4 in OpenSearch core is itself still experimental, so more research is needed to determine a long-term approach.

This predict stream API currently supports two model providers: OpenAI (chat completions) and Amazon Bedrock.

Note: This PR only covers predict streaming; a follow-up PR will cover agent streaming.

API Endpoint:

POST /_plugins/_ml/models/{model_id}/_predict/stream

Sample workflow:

// Enable feature flag
PUT /_cluster/settings
{
    "persistent": {
        "plugins.ml_commons.stream_enabled": true
    }
}

// Register OpenAI chat completion model
POST /_plugins/_ml/models/_register
{
    "name": "openai gpt 3.5 turbo",
    "function_name": "remote",
    "description": "openai model",
    "connector": {
        "name": "OpenAI Chat Connector",
        "description": "The connector to public OpenAI model service for GPT 3.5",
        "version": 1,
        "protocol": "http",
        "parameters": {
            "endpoint": "api.openai.com",
            "model": "gpt-3.5-turbo"
        },
        "credential": {
            "openAI_key": "<your_api_key>"
        },
        "actions": [
            {
                "action_type": "predict",
                "method": "POST",
                "url": "https://${parameters.endpoint}/v1/chat/completions",
                "headers": {
                    "Authorization": "Bearer ${credential.openAI_key}"
                },
                "request_body": "{ \"model\": \"${parameters.model}\", \"messages\": ${parameters.messages} }",
                "response_filter": "$.choices[0].delta.content"
            }
        ]
    }
}

// Run predict stream API
POST /_plugins/_ml/models/mm92cpkBiefJiMfDtq8_/_predict/stream
{
  "parameters": {
    "messages": [
      {
        "role": "system",
        "content": "You are a helpful assistant."
      },
      {
        "role": "user",
        "content": "Can you summarize Prince Hamlet of William Shakespeare in around 100 words?"
      }
    ],
    "_llm_interface": "openai/v1/chat/completions"
  }
}

// Sample response
data: {"inference_results":[{"output":[{"name":"response","dataAsMap":{"content":"Sure","is_last":false}}]}]}

data: {"inference_results":[{"output":[{"name":"response","dataAsMap":{"content":"!","is_last":false}}]}]}

data: {"inference_results":[{"output":[{"name":"response","dataAsMap":{"content":" \"","is_last":false}}]}]}

...

data: {"inference_results":[{"output":[{"name":"response","dataAsMap":{"content":" psyche","is_last":false}}]}]}

data: {"inference_results":[{"output":[{"name":"response","dataAsMap":{"content":".","is_last":false}}]}]}

data: {"inference_results":[{"output":[{"name":"response","dataAsMap":{"content":"","is_last":true}}]}]}

Error handling:

  1. Invalid model ID
{
    "error": {
        "root_cause": [
            {
                "type": "status_exception",
                "reason": "Failed to find model"
            }
        ],
        "type": "status_exception",
        "reason": "Failed to find model"
    },
    "status": 404
}
  2. Error from the remote service (e.g., invalid payload or invalid API key)
    The error message is sent as a single chunk and the connection is closed:
data: {"error": "Error from remote service: {\n  \"error\": {\n    \"message\": \"Missing required parameter: 'messages[1].role'. You provided 'roles', did you mean to provide 'role'?\",\n    \"type\": \"invalid_request_error\",\n    \"param\": \"messages[1].role\",\n    \"code\": \"missing_required_parameter\"\n  }\n}"}
  3. Error in the middle of streaming
    The error message is sent as a single chunk and the connection is closed:
data: {"inference_results":[{"output":[{"name":"response","dataAsMap":{"content":"tering his father's ghost, who","is_last":false}}]}]}

data: {"inference_results":[{"output":[{"name":"response","dataAsMap":{"content":" claims Claudius murdered him, Hamlet struggles","is_last":false}}]}]}

data: {"error": "Error from remote service: Too many requests, please wait before trying again. You have sent too many requests.  Wait before trying again. (Service: bedrock, Status Code: 429, Request ID: xxx)"}

Related Issues

Resolves #3630

Check List

  • New functionality includes testing.
  • New functionality has been documented.
  • API changes companion pull request created.
  • Commits are signed per the DCO using --signoff.
  • Public documentation issue/PR created.

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

@nathaliellenaa (Contributor Author)

Will push more commits to add UTs.

if (t != null) {
    log.error("Error: " + t.getMessage(), t);
    if (t instanceof StreamResetException && t.getMessage().contains("NO_ERROR")) {
        // TODO: reconnect
Collaborator

Are we planning to keep it like this?

Contributor Author

Yes, we will keep it like this for the current implementation. We can implement this as an enhancement in the future.

@Override
public void onFailure(Exception e) {
    try {
        channel.sendResponse(e);
Collaborator

Don't we need to completeStream here? What is the actual expectation during failure?

Contributor Author

The connection should be closed during failure, and the error message should be returned in a regular HTTP response, not streamed. Working on enhancing this part.

Contributor Author

The model validation check occurs before streaming starts, so that error is sent as a regular HTTP response. However, a remote service error happens after the streaming connection is established, so it cannot be sent as a regular HTTP response. The current implementation sends the remote service error message as one data chunk and closes the connection. I put more details about the error handling in the PR description.
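To illustrate the split, here is a hypothetical sketch of the two error paths; the class and method names are illustrative, not the PR's actual code:

import java.util.concurrent.atomic.AtomicBoolean;

class StreamingErrorPaths {
    private final AtomicBoolean streamStarted = new AtomicBoolean(false);

    void onError(Exception e) {
        if (!streamStarted.get()) {
            // Validation failed before any chunk was written (e.g. unknown
            // model ID), so a normal HTTP error response can still be sent.
            sendHttpErrorResponse(e);
        } else {
            // The SSE connection is already open: emit the error as one
            // final data chunk, then close the stream.
            sendChunk("{\"error\": \"" + e.getMessage() + "\"}");
            closeStream();
        }
    }

    void sendHttpErrorResponse(Exception e) { /* regular REST error response, e.g. 404 */ }
    void sendChunk(String json) { /* write one SSE "data:" event */ }
    void closeStream() { /* terminate the streaming connection */ }
}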

@Getter
private StreamTransportService streamTransportService;

private AtomicBoolean isStreamClosed = new AtomicBoolean(false);
Collaborator

This will be shared across multiple streaming requests (I assume), which can cause race conditions between concurrent streams?

Contributor Author

Right, I moved the shared instance variable isStreamClosed to a local variable inside the invokeRemoteServiceStream method, so the flag is local to each request. I tested it with concurrent requests and confirmed this change eliminates the race condition.

public void invokeRemoteServiceStream(
    String action,
    MLInput mlInput,
    Map<String, String> parameters,
    String payload,
    ExecutionContext executionContext,
    StreamPredictActionListener<MLTaskResponse, ?> actionListener
) {
    try {
        AtomicBoolean isStreamClosed = new AtomicBoolean(false);

@mingshl (Collaborator) commented Sep 29, 2025

I took a skim through the PR and have a high-level question: the implementations for the OpenAI and Bedrock models are completely different.

  • Bedrock: uses the AWS SDK client (BedrockRuntimeAsyncClient) with native streaming
  • OpenAI: uses OkHttp with SSE parsing and manual event handling

This creates two completely different code paths that are hard to maintain and extend. Have you considered making a generic StreamingHandler interface and using a factory pattern for the handler, so that we have a unified streaming client interface that different model providers can implement?
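For illustration, a minimal sketch of the suggested pattern; the StreamingHandler name matches later snippets in this PR, but the method signatures here are assumptions, not the merged implementation:

// Hypothetical shapes; only the StreamingHandler name also appears in the PR.
interface StreamObserver {
    void onChunk(String content, boolean isLast);
    void onError(Throwable t);
}

interface StreamingHandler {
    void startStream(String payload, StreamObserver observer);
}

final class StreamingHandlerFactory {
    // A single entry point selects the provider; each handler hides its own
    // client (Bedrock SDK, OkHttp SSE, ...) behind the shared interface.
    static StreamingHandler create(String provider) {
        switch (provider) {
            case "bedrock":
                return (payload, observer) -> { /* stream via BedrockRuntimeAsyncClient */ };
            case "openai":
                return (payload, observer) -> { /* stream via OkHttp SSE parsing */ };
            default:
                throw new IllegalArgumentException("Unsupported provider: " + provider);
        }
    }
}

With this shape, adding a new provider means implementing one interface rather than adding a third bespoke code path.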

@codecov (bot) commented Sep 29, 2025

Codecov Report

❌ Patch coverage is 59.00735% with 223 lines in your changes missing coverage. Please review.
✅ Project coverage is 81.73%. Comparing base (87f179b) to head (1e07c3f).
⚠️ Report is 2 commits behind head on main.

Files with missing lines Patch % Lines
...ensearch/ml/rest/RestMLPredictionStreamAction.java 30.12% 107 Missing and 2 partials ⚠️
...e/algorithms/remote/HttpJsonConnectorExecutor.java 50.72% 31 Missing and 3 partials ⚠️
...va/org/opensearch/ml/task/MLPredictTaskRunner.java 29.54% 28 Missing and 3 partials ⚠️
...engine/algorithms/remote/AwsConnectorExecutor.java 67.30% 14 Missing and 3 partials ⚠️
...rediction/TransportPredictionStreamTaskAction.java 88.39% 6 Missing and 7 partials ⚠️
...ine/algorithms/remote/RemoteConnectorExecutor.java 61.53% 4 Missing and 1 partial ⚠️
...sport/prediction/MLPredictionStreamTaskAction.java 0.00% 3 Missing ⚠️
...algorithms/remote/StreamPredictActionListener.java 90.32% 2 Missing and 1 partial ⚠️
.../opensearch/ml/common/connector/HttpConnector.java 88.23% 1 Missing and 1 partial ⚠️
...e/algorithms/remote/AbstractConnectorExecutor.java 83.33% 1 Missing and 1 partial ⚠️
... and 4 more
Additional details and impacted files
@@             Coverage Diff              @@
##               main    #4187      +/-   ##
============================================
- Coverage     82.04%   81.73%   -0.32%     
- Complexity     9306     9372      +66     
============================================
  Files           792      796       +4     
  Lines         40001    40535     +534     
  Branches       4456     4508      +52     
============================================
+ Hits          32818    33130     +312     
- Misses         5263     5464     +201     
- Partials       1920     1941      +21     
Flag Coverage Δ
ml-commons 81.73% <59.00%> (-0.32%) ⬇️

Flags with carried forward coverage won't be shown.



private SdkAsyncHttpClient httpClient;

private BedrockRuntimeAsyncClient bedrockRuntimeAsyncClient;
Collaborator

This creates a hard dependency on the Bedrock SDK for ALL AWS connectors, even if they're not using Bedrock.

Contributor Author

Moved all Bedrock SDK-related imports and implementation into a separate BedrockStreamingHandler class and used reflection to access the class.

private static StreamingHandler createBedrockHandler(SdkAsyncHttpClient httpClient, Connector connector) {
    try {
        // Use reflection to avoid hard dependency
        Class<?> handlerClass = Class.forName("org.opensearch.ml.engine.algorithms.remote.streaming.BedrockStreamingHandler");
        Constructor<?> constructor = handlerClass
            .getConstructor(SdkAsyncHttpClient.class, Class.forName("org.opensearch.ml.common.connector.AwsConnector"));
        return (StreamingHandler) constructor.newInstance(httpClient, connector);

@nathaliellenaa (Contributor Author) commented Sep 29, 2025

This creates two completely different code paths that are hard to maintain and extend. Have you considered making a generic StreamingHandler interface and using a factory pattern for the handler, so that we have a unified streaming client interface that different model providers can implement?

@mingshl Right, I agree with this. We can make a generic stream handler so it will be easier to extend for other model providers. Created a streaming handler factory in the agent stream PR #4212.

if (payload != null) {
    requestBody = okhttp3.RequestBody.create(payload, MediaType.parse("application/json; charset=utf-8"));
} else {
    throw new IllegalArgumentException("Content length is 0. Aborting request to remote model");
Collaborator

What happens after that? What does the customer finally see?

Contributor Author

The connection will be closed and the user will see this error message:

{
    "error": "Content length is 0. Aborting request to remote model"
}

Collaborator

This is not a helpful message, and it's also a 4xx error. At this point, as a customer, I wouldn't have any idea how to fix this issue.

@ylwu-amzn (Collaborator) left a comment

Approved to unblock agent stream PR.
Please add more unit tests to increase coverage in a new PR.

@dhrubo-os (Collaborator)

In the follow-up PR, let's address the comments and also improve the code coverage. Thanks.

@dhrubo-os merged commit 4864f66 into opensearch-project:main Sep 29, 2025
9 of 13 checks passed
@nathaliellenaa (Contributor Author)

Thanks, will address all comments and improve coverage in the agent stream PR.
