@@ -18,20 +18,6 @@ import (
18
18
"github.com/mark3labs/mcp-go/mcp"
19
19
)
20
20
21
- type Option func (* SSEMCPClient )
22
-
23
- func WithHeaders (headers map [string ]string ) Option {
24
- return func (sc * SSEMCPClient ) {
25
- sc .headers = headers
26
- }
27
- }
28
-
29
- func WithSSEReadTimeout (timeout time.Duration ) Option {
30
- return func (sc * SSEMCPClient ) {
31
- sc .sseReadTimeout = timeout
32
- }
33
- }
34
-
35
21
// SSEMCPClient implements the MCPClient interface using Server-Sent Events (SSE).
36
22
// It maintains a persistent HTTP connection to receive server-pushed events
37
23
// while sending requests over regular HTTP POST calls. The client handles
@@ -40,7 +26,6 @@ type SSEMCPClient struct {
40
26
baseURL * url.URL
41
27
endpoint * url.URL
42
28
httpClient * http.Client
43
- headers map [string ]string
44
29
requestID atomic.Int64
45
30
responses map [int64 ]chan RPCResponse
46
31
mu sync.RWMutex
@@ -50,18 +35,33 @@ type SSEMCPClient struct {
50
35
notifyMu sync.RWMutex
51
36
endpointChan chan struct {}
52
37
capabilities mcp.ServerCapabilities
38
+ headers map [string ]string
53
39
sseReadTimeout time.Duration
54
40
}
55
41
42
+ type ClientOption func (* SSEMCPClient )
43
+
44
+ func WithHeaders (headers map [string ]string ) ClientOption {
45
+ return func (sc * SSEMCPClient ) {
46
+ sc .headers = headers
47
+ }
48
+ }
49
+
50
+ func WithSSEReadTimeout (timeout time.Duration ) ClientOption {
51
+ return func (sc * SSEMCPClient ) {
52
+ sc .sseReadTimeout = timeout
53
+ }
54
+ }
55
+
56
56
// NewSSEMCPClient creates a new SSE-based MCP client with the given base URL.
57
57
// Returns an error if the URL is invalid.
58
- func NewSSEMCPClient (baseURL string , options ... Option ) (* SSEMCPClient , error ) {
58
+ func NewSSEMCPClient (baseURL string , options ... ClientOption ) (* SSEMCPClient , error ) {
59
59
parsedURL , err := url .Parse (baseURL )
60
60
if err != nil {
61
61
return nil , fmt .Errorf ("invalid URL: %w" , err )
62
62
}
63
63
64
- sc := & SSEMCPClient {
64
+ smc := & SSEMCPClient {
65
65
baseURL : parsedURL ,
66
66
httpClient : & http.Client {},
67
67
responses : make (map [int64 ]chan RPCResponse ),
@@ -72,10 +72,10 @@ func NewSSEMCPClient(baseURL string, options ...Option) (*SSEMCPClient, error) {
72
72
}
73
73
74
74
for _ , opt := range options {
75
- opt (sc )
75
+ opt (smc )
76
76
}
77
77
78
- return sc , nil
78
+ return smc , nil
79
79
}
80
80
81
81
// Start initiates the SSE connection to the server and waits for the endpoint information.
@@ -125,31 +125,37 @@ func (c *SSEMCPClient) Start(ctx context.Context) error {
125
125
func (c * SSEMCPClient ) readSSE (reader io.ReadCloser ) {
126
126
defer reader .Close ()
127
127
128
- ctx , cancel := context .WithTimeout (context .Background (), c .sseReadTimeout )
129
- defer cancel ()
130
-
131
- scanner := bufio .NewScanner (reader )
128
+ br := bufio .NewReader (reader )
132
129
var event , data string
133
130
134
- var done bool
135
- for {
136
- if done {
137
- break
138
- }
131
+ ctx , cancel := context .WithTimeout (context .Background (), 30 * time .Second )
132
+ defer cancel ()
139
133
134
+ for {
140
135
select {
141
136
case <- ctx .Done ():
142
- if err := ctx .Err (); err != nil {
143
- fmt .Printf ("SSE read timed out: %v\n " , err )
144
- return
145
- }
137
+ return
146
138
default :
147
- if ! scanner .Scan () {
148
- done = true
149
- break
139
+ line , err := br .ReadString ('\n' )
140
+ if err != nil {
141
+ if err == io .EOF {
142
+ // Process any pending event before exit
143
+ if event != "" && data != "" {
144
+ c .handleSSEEvent (event , data )
145
+ }
146
+ break
147
+ }
148
+ select {
149
+ case <- c .done :
150
+ return
151
+ default :
152
+ fmt .Printf ("SSE stream error: %v\n " , err )
153
+ return
154
+ }
150
155
}
151
- line := scanner .Text ()
152
156
157
+ // Remove only newline markers
158
+ line = strings .TrimRight (line , "\r \n " )
153
159
if line == "" {
154
160
// Empty line means end of event
155
161
if event != "" && data != "" {
@@ -167,15 +173,6 @@ func (c *SSEMCPClient) readSSE(reader io.ReadCloser) {
167
173
}
168
174
}
169
175
}
170
-
171
- if err := scanner .Err (); err != nil {
172
- select {
173
- case <- c .done :
174
- return
175
- default :
176
- fmt .Printf ("SSE stream error: %v\n " , err )
177
- }
178
- }
179
176
}
180
177
181
178
// handleSSEEvent processes SSE events based on their type.
0 commit comments