Skip to content

Commit 465f3dd

Browse files
zsheakpavlov
authored andcommitted
Add Streamable Http Transport
1 parent 1397019 commit 465f3dd

File tree

5 files changed

+740
-5
lines changed

5 files changed

+740
-5
lines changed

kotlin-sdk-core/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/shared/Protocol.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -256,7 +256,7 @@ public abstract class Protocol(@PublishedApi internal val options: ProtocolOptio
256256
JSONRPCResponse(
257257
id = request.id,
258258
error = JSONRPCError(
259-
ErrorCode.Defined.MethodNotFound,
259+
code = ErrorCode.Defined.MethodNotFound,
260260
message = "Server does not support ${request.method}",
261261
),
262262
),

kotlin-sdk-core/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/types.kt

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -249,14 +249,14 @@ public data class JSONRPCNotification(
249249
*/
250250
@Serializable
251251
public class JSONRPCResponse(
252-
public val id: RequestId,
252+
public val id: RequestId?,
253253
public val jsonrpc: String = JSONRPC_VERSION,
254254
public val result: RequestResult? = null,
255255
public val error: JSONRPCError? = null,
256256
) : JSONRPCMessage {
257257

258258
public fun copy(
259-
id: RequestId = this.id,
259+
id: RequestId? = this.id,
260260
jsonrpc: String = this.jsonrpc,
261261
result: RequestResult? = this.result,
262262
error: JSONRPCError? = this.error,
@@ -292,8 +292,12 @@ public sealed interface ErrorCode {
292292
* A response to a request that indicates an error occurred.
293293
*/
294294
@Serializable
295-
public data class JSONRPCError(val code: ErrorCode, val message: String, val data: JsonObject = EmptyJsonObject) :
296-
JSONRPCMessage
295+
public data class JSONRPCError(
296+
val id: RequestId? = null,
297+
val code: ErrorCode,
298+
val message: String,
299+
val data: JsonObject = EmptyJsonObject,
300+
) : JSONRPCMessage
297301

298302
/**
299303
* Base interface for notification parameters with optional metadata.

kotlin-sdk-server/api/kotlin-sdk-server.api

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,20 @@ public final class io/modelcontextprotocol/kotlin/sdk/LibVersionKt {
22
public static final field LIB_VERSION Ljava/lang/String;
33
}
44

5+
public abstract interface class io/modelcontextprotocol/kotlin/sdk/server/EventStore {
6+
public abstract fun replayEventsAfter (Ljava/lang/String;Lkotlin/jvm/functions/Function3;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
7+
public abstract fun storeEvent (Ljava/lang/String;Lio/modelcontextprotocol/kotlin/sdk/JSONRPCMessage;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
8+
}
9+
510
public final class io/modelcontextprotocol/kotlin/sdk/server/KtorServerKt {
611
public static final fun MCP (Lio/ktor/server/application/Application;Lkotlin/jvm/functions/Function1;)V
712
public static final fun mcp (Lio/ktor/server/application/Application;Lkotlin/jvm/functions/Function1;)V
813
public static final fun mcp (Lio/ktor/server/routing/Routing;Ljava/lang/String;Lkotlin/jvm/functions/Function1;)V
914
public static final fun mcp (Lio/ktor/server/routing/Routing;Lkotlin/jvm/functions/Function1;)V
15+
public static final fun mcpStatelessStreamableHttp (Lio/ktor/server/application/Application;ZLjava/util/List;Ljava/util/List;Lio/modelcontextprotocol/kotlin/sdk/server/EventStore;Lkotlin/jvm/functions/Function1;)V
16+
public static synthetic fun mcpStatelessStreamableHttp$default (Lio/ktor/server/application/Application;ZLjava/util/List;Ljava/util/List;Lio/modelcontextprotocol/kotlin/sdk/server/EventStore;Lkotlin/jvm/functions/Function1;ILjava/lang/Object;)V
17+
public static final fun mcpStreamableHttp (Lio/ktor/server/application/Application;ZLjava/util/List;Ljava/util/List;Lio/modelcontextprotocol/kotlin/sdk/server/EventStore;Lkotlin/jvm/functions/Function1;)V
18+
public static synthetic fun mcpStreamableHttp$default (Lio/ktor/server/application/Application;ZLjava/util/List;Ljava/util/List;Lio/modelcontextprotocol/kotlin/sdk/server/EventStore;Lkotlin/jvm/functions/Function1;ILjava/lang/Object;)V
1019
}
1120

1221
public final class io/modelcontextprotocol/kotlin/sdk/server/RegisteredPrompt {
@@ -115,6 +124,24 @@ public final class io/modelcontextprotocol/kotlin/sdk/server/StdioServerTranspor
115124
public fun start (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
116125
}
117126

127+
public final class io/modelcontextprotocol/kotlin/sdk/server/StreamableHttpServerTransport : io/modelcontextprotocol/kotlin/sdk/shared/AbstractTransport {
128+
public static final field STANDALONE_SSE_STREAM_ID Ljava/lang/String;
129+
public fun <init> ()V
130+
public fun <init> (ZZLjava/util/List;Ljava/util/List;Lio/modelcontextprotocol/kotlin/sdk/server/EventStore;)V
131+
public synthetic fun <init> (ZZLjava/util/List;Ljava/util/List;Lio/modelcontextprotocol/kotlin/sdk/server/EventStore;ILkotlin/jvm/internal/DefaultConstructorMarker;)V
132+
public fun close (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
133+
public final fun getSessionId ()Ljava/lang/String;
134+
public final fun handleDeleteRequest (Lio/ktor/server/sse/ServerSSESession;Lio/ktor/server/application/ApplicationCall;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
135+
public final fun handleGetRequest (Lio/ktor/server/sse/ServerSSESession;Lio/ktor/server/application/ApplicationCall;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
136+
public final fun handlePostRequest (Lio/ktor/server/sse/ServerSSESession;Lio/ktor/server/application/ApplicationCall;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
137+
public final fun handleRequest (Lio/ktor/server/sse/ServerSSESession;Lio/ktor/server/application/ApplicationCall;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
138+
public fun send (Lio/modelcontextprotocol/kotlin/sdk/JSONRPCMessage;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
139+
public final fun setOnSessionClosed (Lkotlin/jvm/functions/Function1;)V
140+
public final fun setOnSessionInitialized (Lkotlin/jvm/functions/Function1;)V
141+
public final fun setSessionIdGenerator (Lkotlin/jvm/functions/Function0;)V
142+
public fun start (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
143+
}
144+
118145
public final class io/modelcontextprotocol/kotlin/sdk/server/WebSocketMcpKtorServerExtensionsKt {
119146
public static final fun mcpWebSocket (Lio/ktor/server/routing/Route;Lio/modelcontextprotocol/kotlin/sdk/server/ServerOptions;Lkotlin/jvm/functions/Function2;)V
120147
public static final fun mcpWebSocket (Lio/ktor/server/routing/Route;Ljava/lang/String;Lio/modelcontextprotocol/kotlin/sdk/server/ServerOptions;Lkotlin/jvm/functions/Function2;)V

kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/KtorServer.kt

Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import io.github.oshai.kotlinlogging.KotlinLogging
44
import io.ktor.http.HttpStatusCode
55
import io.ktor.server.application.Application
66
import io.ktor.server.application.install
7+
import io.ktor.server.request.header
78
import io.ktor.server.response.respond
89
import io.ktor.server.routing.Routing
910
import io.ktor.server.routing.RoutingContext
@@ -15,6 +16,7 @@ import io.ktor.server.sse.ServerSSESession
1516
import io.ktor.server.sse.sse
1617
import io.ktor.util.collections.ConcurrentMap
1718
import io.ktor.utils.io.KtorDsl
19+
import io.modelcontextprotocol.kotlin.sdk.ErrorCode
1820

1921
private val logger = KotlinLogging.logger {}
2022

@@ -64,6 +66,51 @@ public fun Application.mcp(block: ServerSSESession.() -> Server) {
6466
}
6567
}
6668

69+
@KtorDsl
70+
public fun Application.mcpStreamableHttp(
71+
enableDnsRebindingProtection: Boolean = false,
72+
allowedHosts: List<String>? = null,
73+
allowedOrigins: List<String>? = null,
74+
eventStore: EventStore? = null,
75+
block: RoutingContext.() -> Server,
76+
) {
77+
val transports = ConcurrentMap<String, StreamableHttpServerTransport>()
78+
79+
routing {
80+
post("/mcp") {
81+
mcpStreamableHttpEndpoint(
82+
transports,
83+
enableDnsRebindingProtection,
84+
allowedHosts,
85+
allowedOrigins,
86+
eventStore,
87+
block,
88+
)
89+
}
90+
}
91+
}
92+
93+
@KtorDsl
94+
public fun Application.mcpStatelessStreamableHttp(
95+
enableDnsRebindingProtection: Boolean = false,
96+
allowedHosts: List<String>? = null,
97+
allowedOrigins: List<String>? = null,
98+
eventStore: EventStore? = null,
99+
block: RoutingContext.() -> Server,
100+
) {
101+
routing {
102+
post("/mcp") {
103+
mcpStatelessStreamableHttpEndpoint(
104+
enableDnsRebindingProtection,
105+
allowedHosts,
106+
allowedOrigins,
107+
eventStore,
108+
block,
109+
)
110+
}
111+
}
112+
}
113+
67114
private suspend fun ServerSSESession.mcpSseEndpoint(
68115
postEndpoint: String,
69116
transports: ConcurrentMap<String, SseServerTransport>,
@@ -94,6 +141,88 @@ internal fun ServerSSESession.mcpSseTransport(
94141
return transport
95142
}
96143

144+
private suspend fun RoutingContext.mcpStreamableHttpEndpoint(
145+
transports: ConcurrentMap<String, StreamableHttpServerTransport>,
146+
enableDnsRebindingProtection: Boolean = false,
147+
allowedHosts: List<String>? = null,
148+
allowedOrigins: List<String>? = null,
149+
eventStore: EventStore? = null,
150+
block: RoutingContext.() -> Server,
151+
) {
152+
val sessionId = this.call.request.header(MCP_SESSION_ID_HEADER)
153+
val transport = if (sessionId != null && transports.containsKey(sessionId)) {
154+
transports[sessionId]!!
155+
} else if (sessionId == null) {
156+
val transport = StreamableHttpServerTransport(
157+
enableDnsRebindingProtection = enableDnsRebindingProtection,
158+
allowedHosts = allowedHosts,
159+
allowedOrigins = allowedOrigins,
160+
eventStore = eventStore,
161+
enableJsonResponse = true,
162+
)
163+
164+
transport.setOnSessionInitialized { sessionId ->
165+
transports[sessionId] = transport
166+
167+
logger.info { "New StreamableHttp connection established and stored with sessionId: $sessionId" }
168+
}
169+
170+
val server = block()
171+
server.onClose {
172+
logger.info { "Server connection closed for sessionId: ${transport.sessionId}" }
173+
}
174+
175+
server.connect(transport)
176+
177+
transport
178+
} else {
179+
null
180+
}
181+
182+
if (transport == null) {
183+
this.call.reject(
184+
HttpStatusCode.BadRequest,
185+
ErrorCode.Unknown(-32000),
186+
"Bad Request: No valid session ID provided",
187+
)
188+
return
189+
}
190+
191+
transport.handleRequest(null, this.call)
192+
logger.debug { "Server connected to transport for sessionId: ${transport.sessionId}" }
193+
}
194+
195+
private suspend fun RoutingContext.mcpStatelessStreamableHttpEndpoint(
196+
enableDnsRebindingProtection: Boolean = false,
197+
allowedHosts: List<String>? = null,
198+
allowedOrigins: List<String>? = null,
199+
eventStore: EventStore? = null,
200+
block: RoutingContext.() -> Server,
201+
) {
202+
val transport = StreamableHttpServerTransport(
203+
enableDnsRebindingProtection = enableDnsRebindingProtection,
204+
allowedHosts = allowedHosts,
205+
allowedOrigins = allowedOrigins,
206+
eventStore = eventStore,
207+
enableJsonResponse = true,
208+
)
209+
transport.setSessionIdGenerator(null)
210+
211+
logger.info { "New stateless StreamableHttp connection established without sessionId" }
212+
213+
val server = block()
214+
215+
server.onClose {
216+
logger.info { "Server connection closed without sessionId" }
217+
}
218+
219+
server.connect(transport)
220+
221+
transport.handleRequest(null, this.call)
222+
223+
logger.debug { "Server connected to transport without sessionId" }
224+
}
225+
97226
internal suspend fun RoutingContext.mcpPostEndpoint(transports: ConcurrentMap<String, SseServerTransport>) {
98227
val sessionId: String = call.request.queryParameters["sessionId"]
99228
?: run {

0 commit comments

Comments
 (0)