-
Notifications
You must be signed in to change notification settings - Fork 26
feat: Add mcp transport protocol #345
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
twishabansal
wants to merge
55
commits into
client-transport-decouple
Choose a base branch
from
mcp-transport-implement
base: client-transport-decouple
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from all commits
Commits
Show all changes
55 commits
Select commit
Hold shift + click to select a range
47a2a46
try
twishabansal 9cd9a79
version negotiation
twishabansal 5964bcc
small changes
twishabansal 4bac725
lint
twishabansal d8c6efb
fix endpoint
twishabansal b17e3ee
add some todos
twishabansal 5b4d12c
lint
twishabansal 388c7f9
initialise in init
twishabansal ef4e543
lint
twishabansal da384be
add support for 'Mcp-session-id'
twishabansal c2ad274
lint
twishabansal e88dfa7
add todo
twishabansal c9728a9
add mcp protocol version to the latest protocol
twishabansal c66dd26
add test coverage
twishabansal 3cd00ea
small fix
twishabansal 11ac6a2
small fix
twishabansal 02baad7
small fix
twishabansal 6ae38e1
thread fixes
twishabansal fb59bb5
try
twishabansal 765db81
add tests
twishabansal f1c0807
lint
twishabansal 24db78d
change small
twishabansal dcc811a
nit
twishabansal a4a4f55
small debugging
twishabansal 19a1cf2
add todos
twishabansal 914ec46
small bug fixes
twishabansal e922472
add todo
twishabansal 8c14096
remove id field from notifications
twishabansal 6c97083
refactor
twishabansal 9dfa8cb
preprocess tools with empty params
twishabansal 6f74838
fix types
twishabansal 9118a89
fix bugs
twishabansal fbce7e9
better error log
twishabansal b6b2dbe
small cleanup
twishabansal ac2a924
handle notifications
twishabansal 1fd0581
fix unit tests
twishabansal ec17eb8
lint
twishabansal 1cffac1
decouple client from transport
twishabansal cc30a17
lint
twishabansal 2f04c95
use toolbox protocol for e2e tests
twishabansal d80c41f
add e2e tests for mcp
twishabansal baf9d06
lint
twishabansal cd9841e
remove mcp as default protocol
twishabansal 83030dc
remove auth tests from mcp
twishabansal bb8dc97
remove redundant lines
twishabansal 8920538
remove redundant lines
twishabansal f70710f
lint
twishabansal 80c688a
revert some changes
twishabansal 4c42d33
initialise session in a better way
twishabansal 9c119e8
small fix
twishabansal e281556
Made methods private
twishabansal e0a1337
lint
twishabansal 85e5d29
rename base url
twishabansal ac2acfe
resolve comment
twishabansal d061f3e
better readability
twishabansal File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
288 changes: 288 additions & 0 deletions
288
packages/toolbox-core/src/toolbox_core/mcp_transport.py
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,288 @@ | ||
# Copyright 2025 Google LLC | ||
# | ||
# Licensed under the Apache License, Version 2.0 (the "License"); | ||
# you may not use this file except in compliance with the License. | ||
# You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, software | ||
# distributed under the License is distributed on an "AS IS" BASIS, | ||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
# See the License for the specific language governing permissions and | ||
# limitations under the License. | ||
import asyncio | ||
import os | ||
import uuid | ||
from typing import Any, Mapping, Optional, Union | ||
|
||
from aiohttp import ClientSession | ||
|
||
from . import version | ||
from .itransport import ITransport | ||
from .protocol import ( | ||
AdditionalPropertiesSchema, | ||
ManifestSchema, | ||
ParameterSchema, | ||
Protocol, | ||
ToolSchema, | ||
) | ||
|
||
|
||
class McpHttpTransport(ITransport): | ||
"""Transport for the MCP protocol.""" | ||
|
||
def __init__( | ||
self, | ||
base_url: str, | ||
session: Optional[ClientSession] = None, | ||
protocol: Protocol = Protocol.MCP, | ||
): | ||
self.__mcp_base_url = base_url + "/mcp/" | ||
# Will be updated after negotiation | ||
self.__protocol_version = protocol.value | ||
self.__server_version: Optional[str] = None | ||
self.__session_id: Optional[str] = None | ||
|
||
self.__manage_session = session is None | ||
self.__session = session or ClientSession() | ||
self.__init_task = asyncio.create_task(self.__initialize_session()) | ||
|
||
@property | ||
def base_url(self) -> str: | ||
return self.__mcp_base_url | ||
|
||
def __convert_tool_schema(self, tool_data: dict) -> ToolSchema: | ||
parameters = [] | ||
input_schema = tool_data.get("inputSchema", {}) | ||
properties = input_schema.get("properties", {}) | ||
required = input_schema.get("required", []) | ||
|
||
for name, schema in properties.items(): | ||
additional_props = schema.get("additionalProperties") | ||
if isinstance(additional_props, dict): | ||
additional_props = AdditionalPropertiesSchema( | ||
type=additional_props["type"] | ||
) | ||
else: | ||
additional_props = True | ||
parameters.append( | ||
ParameterSchema( | ||
name=name, | ||
type=schema["type"], | ||
description=schema.get("description", ""), | ||
required=name in required, | ||
additionalProperties=additional_props, | ||
) | ||
) | ||
|
||
return ToolSchema(description=tool_data["description"], parameters=parameters) | ||
|
||
async def __list_tools( | ||
self, | ||
toolset_name: Optional[str] = None, | ||
headers: Optional[Mapping[str, str]] = None, | ||
) -> Any: | ||
"""Private helper to fetch the raw tool list from the server.""" | ||
if toolset_name: | ||
url = self.__mcp_base_url + toolset_name | ||
else: | ||
url = self.__mcp_base_url | ||
return await self.__send_request( | ||
url=url, method="tools/list", params={}, headers=headers | ||
) | ||
|
||
async def tool_get( | ||
self, tool_name: str, headers: Optional[Mapping[str, str]] = None | ||
) -> ManifestSchema: | ||
"""Gets a single tool from the server by listing all and filtering.""" | ||
await self.__init_task | ||
|
||
if self.__server_version is None: | ||
raise RuntimeError("Server version not available.") | ||
|
||
result = await self.__list_tools(headers=headers) | ||
tool_def = None | ||
for tool in result.get("tools", []): | ||
if tool.get("name") == tool_name: | ||
tool_def = self.__convert_tool_schema(tool) | ||
break | ||
|
||
if tool_def is None: | ||
raise ValueError(f"Tool '{tool_name}' not found.") | ||
|
||
tool_details = ManifestSchema( | ||
serverVersion=self.__server_version, | ||
tools={tool_name: tool_def}, | ||
) | ||
return tool_details | ||
|
||
async def tools_list( | ||
self, | ||
toolset_name: Optional[str] = None, | ||
headers: Optional[Mapping[str, str]] = None, | ||
) -> ManifestSchema: | ||
"""Lists available tools from the server using the MCP protocol.""" | ||
await self.__init_task | ||
|
||
if self.__server_version is None: | ||
raise RuntimeError("Server version not available.") | ||
|
||
result = await self.__list_tools(toolset_name, headers) | ||
tools = result.get("tools") | ||
|
||
return ManifestSchema( | ||
serverVersion=self.__server_version, | ||
tools={tool["name"]: self.__convert_tool_schema(tool) for tool in tools}, | ||
) | ||
|
||
async def tool_invoke( | ||
self, tool_name: str, arguments: dict, headers: Optional[Mapping[str, str]] | ||
) -> str: | ||
"""Invokes a specific tool on the server using the MCP protocol.""" | ||
await self.__init_task | ||
|
||
url = self.__mcp_base_url | ||
params = {"name": tool_name, "arguments": arguments} | ||
result = await self.__send_request( | ||
url=url, method="tools/call", params=params, headers=headers | ||
) | ||
all_content = result.get("content", result) | ||
content_str = "".join( | ||
content.get("text", "") | ||
for content in all_content | ||
if isinstance(content, dict) | ||
) | ||
return content_str or "null" | ||
|
||
async def close(self): | ||
try: | ||
await self.__init_task | ||
except Exception: | ||
# If initialization failed, we can still try to close the session. | ||
pass | ||
finally: | ||
if self.__manage_session and self.__session and not self.__session.closed: | ||
await self.__session.close() | ||
|
||
async def __initialize_session(self): | ||
"""Initializes the MCP session.""" | ||
if self.__session is None and self.__manage_session: | ||
self.__session = ClientSession() | ||
|
||
url = self.__mcp_base_url | ||
|
||
# Perform version negotitation | ||
client_supported_versions = Protocol.get_supported_mcp_versions() | ||
proposed_protocol_version = self.__protocol_version | ||
params = { | ||
"processId": os.getpid(), | ||
"clientInfo": { | ||
"name": "toolbox-python-sdk", | ||
"version": version.__version__, | ||
}, | ||
"capabilities": {}, | ||
"protocolVersion": proposed_protocol_version, | ||
} | ||
# Send initialize notification | ||
initialize_result = await self.__send_request( | ||
url=url, method="initialize", params=params | ||
) | ||
|
||
# Get the session id if the proposed version requires it | ||
if proposed_protocol_version == "2025-03-26": | ||
self.__session_id = initialize_result.get("Mcp-Session-Id") | ||
if not self.__session_id: | ||
if self.__manage_session: | ||
await self.close() | ||
raise RuntimeError( | ||
"Server did not return a Mcp-Session-Id during initialization." | ||
) | ||
server_info = initialize_result.get("serverInfo") | ||
if not server_info: | ||
raise RuntimeError("Server info not found in initialize response") | ||
|
||
self.__server_version = server_info.get("version") | ||
if not self.__server_version: | ||
raise RuntimeError("Server version not found in initialize response") | ||
|
||
# Perform version negotiation based on server response | ||
server_protcol_version = initialize_result.get("protocolVersion") | ||
if server_protcol_version: | ||
if server_protcol_version not in client_supported_versions: | ||
if self.__manage_session: | ||
await self.close() | ||
raise RuntimeError( | ||
f"MCP version mismatch: client does not support server version {server_protcol_version}" | ||
) | ||
# Update the protocol version to the one agreed upon by the server. | ||
self.__protocol_version = server_protcol_version | ||
else: | ||
if self.__manage_session: | ||
await self.close() | ||
raise RuntimeError("MCP Protocol version not found in initialize response") | ||
|
||
server_capabilities = initialize_result.get("capabilities") | ||
if not server_capabilities or "tools" not in server_capabilities: | ||
if self.__manage_session: | ||
await self.close() | ||
raise RuntimeError("Server does not support the 'tools' capability.") | ||
await self.__send_request( | ||
url=url, method="notifications/initialized", params={} | ||
) | ||
|
||
async def __send_request( | ||
self, | ||
url: str, | ||
method: str, | ||
params: dict, | ||
headers: Optional[Mapping[str, str]] = None, | ||
) -> Any: | ||
"""Sends a JSON-RPC request to the MCP server.""" | ||
|
||
request_params = params.copy() | ||
req_headers = dict(headers or {}) | ||
|
||
# Check based on the NEGOTIATED version (self.__protocol_version) | ||
if ( | ||
self.__protocol_version == "2025-03-26" | ||
and method != "initialize" | ||
and self.__session_id | ||
): | ||
request_params["Mcp-Session-Id"] = self.__session_id | ||
|
||
if self.__protocol_version == "2025-06-18": | ||
req_headers["MCP-Protocol-Version"] = self.__protocol_version | ||
|
||
payload = { | ||
"jsonrpc": "2.0", | ||
"method": method, | ||
"params": request_params, | ||
} | ||
|
||
if not method.startswith("notifications/"): | ||
payload["id"] = str(uuid.uuid4()) | ||
|
||
async with self.__session.post( | ||
url, json=payload, headers=req_headers | ||
) as response: | ||
if not response.ok: | ||
error_text = await response.text() | ||
raise RuntimeError( | ||
f"API request failed with status {response.status} ({response.reason}). Server response: {error_text}" | ||
) | ||
|
||
# Handle potential empty body (e.g. 204 No Content for notifications) | ||
if response.status == 204 or response.content.at_eof(): | ||
return None | ||
|
||
json_response = await response.json() | ||
if "error" in json_response: | ||
error = json_response["error"] | ||
if error["code"] == -32000: | ||
raise RuntimeError(f"MCP version mismatch: {error['message']}") | ||
else: | ||
raise RuntimeError( | ||
f"MCP request failed with code {error['code']}: {error['message']}" | ||
) | ||
return json_response.get("result") |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.