@@ -23,20 +23,19 @@ import (
23
23
// while sending requests over regular HTTP POST calls. The client handles
24
24
// automatic reconnection and message routing between requests and responses.
25
25
type SSEMCPClient struct {
26
- baseURL * url.URL
27
- endpoint * url.URL
28
- httpClient * http.Client
29
- requestID atomic.Int64
30
- responses map [int64 ]chan RPCResponse
31
- mu sync.RWMutex
32
- done chan struct {}
33
- initialized bool
34
- notifications []func (mcp.JSONRPCNotification )
35
- notifyMu sync.RWMutex
36
- endpointChan chan struct {}
37
- capabilities mcp.ServerCapabilities
38
- headers map [string ]string
39
- sseReadTimeout time.Duration
26
+ baseURL * url.URL
27
+ endpoint * url.URL
28
+ httpClient * http.Client
29
+ requestID atomic.Int64
30
+ responses map [int64 ]chan RPCResponse
31
+ mu sync.RWMutex
32
+ done chan struct {}
33
+ initialized bool
34
+ notifications []func (mcp.JSONRPCNotification )
35
+ notifyMu sync.RWMutex
36
+ endpointChan chan struct {}
37
+ capabilities mcp.ServerCapabilities
38
+ headers map [string ]string
40
39
}
41
40
42
41
type ClientOption func (* SSEMCPClient )
@@ -68,13 +67,12 @@ func NewSSEMCPClient(baseURL string, options ...ClientOption) (*SSEMCPClient, er
68
67
}
69
68
70
69
smc := & SSEMCPClient {
71
- baseURL : parsedURL ,
72
- httpClient : & http.Client {},
73
- responses : make (map [int64 ]chan RPCResponse ),
74
- done : make (chan struct {}),
75
- endpointChan : make (chan struct {}),
76
- sseReadTimeout : 30 * time .Second ,
77
- headers : make (map [string ]string ),
70
+ baseURL : parsedURL ,
71
+ httpClient : & http.Client {},
72
+ responses : make (map [int64 ]chan RPCResponse ),
73
+ done : make (chan struct {}),
74
+ endpointChan : make (chan struct {}),
75
+ headers : make (map [string ]string ),
78
76
}
79
77
80
78
for _ , opt := range options {
@@ -99,6 +97,9 @@ func (c *SSEMCPClient) Start(ctx context.Context) error {
99
97
req .Header .Set ("Accept" , "text/event-stream" )
100
98
req .Header .Set ("Cache-Control" , "no-cache" )
101
99
req .Header .Set ("Connection" , "keep-alive" )
100
+ for k , v := range c .headers {
101
+ req .Header .Set (k , v )
102
+ }
102
103
103
104
resp , err := c .httpClient .Do (req )
104
105
if err != nil {
@@ -134,12 +135,9 @@ func (c *SSEMCPClient) readSSE(reader io.ReadCloser) {
134
135
br := bufio .NewReader (reader )
135
136
var event , data string
136
137
137
- ctx , cancel := context .WithTimeout (context .Background (), c .sseReadTimeout )
138
- defer cancel ()
139
-
140
138
for {
141
139
select {
142
- case <- ctx . Done () :
140
+ case <- c . done :
143
141
return
144
142
default :
145
143
line , err := br .ReadString ('\n' )
@@ -405,7 +403,7 @@ func (c *SSEMCPClient) Initialize(
405
403
err ,
406
404
)
407
405
}
408
- resp .Body .Close ()
406
+ defer resp .Body .Close ()
409
407
410
408
c .initialized = true
411
409
return & result , nil
@@ -416,42 +414,77 @@ func (c *SSEMCPClient) Ping(ctx context.Context) error {
416
414
return err
417
415
}
418
416
419
- func (c * SSEMCPClient ) ListResources (
417
+ // ListResourcesByPage manually list resources by page.
418
+ func (c * SSEMCPClient ) ListResourcesByPage (
420
419
ctx context.Context ,
421
420
request mcp.ListResourcesRequest ,
422
421
) (* mcp.ListResourcesResult , error ) {
423
- response , err := c . sendRequest (ctx , "resources/list" , request . Params )
422
+ result , err := listByPage [mcp. ListResourcesResult ] (ctx , c , request . PaginatedRequest , "resources/list" )
424
423
if err != nil {
425
424
return nil , err
426
425
}
426
+ return result , nil
427
+ }
427
428
428
- var result mcp.ListResourcesResult
429
- if err := json .Unmarshal (* response , & result ); err != nil {
430
- return nil , fmt .Errorf ("failed to unmarshal response: %w" , err )
429
+ func (c * SSEMCPClient ) ListResources (
430
+ ctx context.Context ,
431
+ request mcp.ListResourcesRequest ,
432
+ ) (* mcp.ListResourcesResult , error ) {
433
+ result , err := c .ListResourcesByPage (ctx , request )
434
+ if err != nil {
435
+ return nil , err
431
436
}
437
+ for result .NextCursor != "" {
438
+ select {
439
+ case <- ctx .Done ():
440
+ return nil , ctx .Err ()
441
+ default :
442
+ request .Params .Cursor = result .NextCursor
443
+ newPageRes , err := c .ListResourcesByPage (ctx , request )
444
+ if err != nil {
445
+ return nil , err
446
+ }
447
+ result .Resources = append (result .Resources , newPageRes .Resources ... )
448
+ result .NextCursor = newPageRes .NextCursor
449
+ }
450
+ }
451
+ return result , nil
452
+ }
432
453
433
- return & result , nil
454
+ func (c * SSEMCPClient ) ListResourceTemplatesByPage (
455
+ ctx context.Context ,
456
+ request mcp.ListResourceTemplatesRequest ,
457
+ ) (* mcp.ListResourceTemplatesResult , error ) {
458
+ result , err := listByPage [mcp.ListResourceTemplatesResult ](ctx , c , request .PaginatedRequest , "resources/templates/list" )
459
+ if err != nil {
460
+ return nil , err
461
+ }
462
+ return result , nil
434
463
}
435
464
436
465
func (c * SSEMCPClient ) ListResourceTemplates (
437
466
ctx context.Context ,
438
467
request mcp.ListResourceTemplatesRequest ,
439
468
) (* mcp.ListResourceTemplatesResult , error ) {
440
- response , err := c .sendRequest (
441
- ctx ,
442
- "resources/templates/list" ,
443
- request .Params ,
444
- )
469
+ result , err := c .ListResourceTemplatesByPage (ctx , request )
445
470
if err != nil {
446
471
return nil , err
447
472
}
448
-
449
- var result mcp.ListResourceTemplatesResult
450
- if err := json .Unmarshal (* response , & result ); err != nil {
451
- return nil , fmt .Errorf ("failed to unmarshal response: %w" , err )
473
+ for result .NextCursor != "" {
474
+ select {
475
+ case <- ctx .Done ():
476
+ return nil , ctx .Err ()
477
+ default :
478
+ request .Params .Cursor = result .NextCursor
479
+ newPageRes , err := c .ListResourceTemplatesByPage (ctx , request )
480
+ if err != nil {
481
+ return nil , err
482
+ }
483
+ result .ResourceTemplates = append (result .ResourceTemplates , newPageRes .ResourceTemplates ... )
484
+ result .NextCursor = newPageRes .NextCursor
485
+ }
452
486
}
453
-
454
- return & result , nil
487
+ return result , nil
455
488
}
456
489
457
490
func (c * SSEMCPClient ) ReadResource (
@@ -482,21 +515,40 @@ func (c *SSEMCPClient) Unsubscribe(
482
515
return err
483
516
}
484
517
485
- func (c * SSEMCPClient ) ListPrompts (
518
+ func (c * SSEMCPClient ) ListPromptsByPage (
486
519
ctx context.Context ,
487
520
request mcp.ListPromptsRequest ,
488
521
) (* mcp.ListPromptsResult , error ) {
489
- response , err := c . sendRequest (ctx , "prompts/list" , request . Params )
522
+ result , err := listByPage [mcp. ListPromptsResult ] (ctx , c , request . PaginatedRequest , "prompts/list" )
490
523
if err != nil {
491
524
return nil , err
492
525
}
526
+ return result , nil
527
+ }
493
528
494
- var result mcp.ListPromptsResult
495
- if err := json .Unmarshal (* response , & result ); err != nil {
496
- return nil , fmt .Errorf ("failed to unmarshal response: %w" , err )
529
+ func (c * SSEMCPClient ) ListPrompts (
530
+ ctx context.Context ,
531
+ request mcp.ListPromptsRequest ,
532
+ ) (* mcp.ListPromptsResult , error ) {
533
+ result , err := c .ListPromptsByPage (ctx , request )
534
+ if err != nil {
535
+ return nil , err
497
536
}
498
-
499
- return & result , nil
537
+ for result .NextCursor != "" {
538
+ select {
539
+ case <- ctx .Done ():
540
+ return nil , ctx .Err ()
541
+ default :
542
+ request .Params .Cursor = result .NextCursor
543
+ newPageRes , err := c .ListPromptsByPage (ctx , request )
544
+ if err != nil {
545
+ return nil , err
546
+ }
547
+ result .Prompts = append (result .Prompts , newPageRes .Prompts ... )
548
+ result .NextCursor = newPageRes .NextCursor
549
+ }
550
+ }
551
+ return result , nil
500
552
}
501
553
502
554
func (c * SSEMCPClient ) GetPrompt (
@@ -511,21 +563,40 @@ func (c *SSEMCPClient) GetPrompt(
511
563
return mcp .ParseGetPromptResult (response )
512
564
}
513
565
514
- func (c * SSEMCPClient ) ListTools (
566
+ func (c * SSEMCPClient ) ListToolsByPage (
515
567
ctx context.Context ,
516
568
request mcp.ListToolsRequest ,
517
569
) (* mcp.ListToolsResult , error ) {
518
- response , err := c . sendRequest (ctx , "tools/list" , request . Params )
570
+ result , err := listByPage [mcp. ListToolsResult ] (ctx , c , request . PaginatedRequest , "tools/list" )
519
571
if err != nil {
520
572
return nil , err
521
573
}
574
+ return result , nil
575
+ }
522
576
523
- var result mcp.ListToolsResult
524
- if err := json .Unmarshal (* response , & result ); err != nil {
525
- return nil , fmt .Errorf ("failed to unmarshal response: %w" , err )
577
+ func (c * SSEMCPClient ) ListTools (
578
+ ctx context.Context ,
579
+ request mcp.ListToolsRequest ,
580
+ ) (* mcp.ListToolsResult , error ) {
581
+ result , err := c .ListToolsByPage (ctx , request )
582
+ if err != nil {
583
+ return nil , err
526
584
}
527
-
528
- return & result , nil
585
+ for result .NextCursor != "" {
586
+ select {
587
+ case <- ctx .Done ():
588
+ return nil , ctx .Err ()
589
+ default :
590
+ request .Params .Cursor = result .NextCursor
591
+ newPageRes , err := c .ListToolsByPage (ctx , request )
592
+ if err != nil {
593
+ return nil , err
594
+ }
595
+ result .Tools = append (result .Tools , newPageRes .Tools ... )
596
+ result .NextCursor = newPageRes .NextCursor
597
+ }
598
+ }
599
+ return result , nil
529
600
}
530
601
531
602
func (c * SSEMCPClient ) CallTool (
0 commit comments