-
Notifications
You must be signed in to change notification settings - Fork 465
feat: client-side streamable-http transport supports continuously listening #317
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
WalkthroughThis update enhances the StreamableHTTP transport by introducing continuous server-to-client notifications using a persistent HTTP GET connection, controlled via a new option. The HTTP request logic is unified, error handling is improved, and new logger support is added. Comprehensive tests and a mock server verify the new listening feature and error scenarios. Changes
Possibly related PRs
Suggested labels
Suggested reviewers
Warning There were issues while running some tools. Please review the errors and either fix the tool's configuration or disable the tool if it's a critical failure. 🔧 golangci-lint (1.64.8)Error: you are using a configuration file for golangci-lint v2 with golangci-lint v1: please use golangci-lint v2 📜 Recent review detailsConfiguration used: CodeRabbit UI 📒 Files selected for processing (1)
✅ Files skipped from review due to trivial changes (1)
✨ Finishing Touches
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. 🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
SupportNeed help? Create a ticket on our support page for assistance with any issues or questions. Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
CodeRabbit Configuration File (
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 2
🔭 Outside diff range comments (1)
client/transport/streamable_http.go (1)
202-210
:⚠️ Potential issueGuard against double-closing
initialized
channel
close(c.initialized)
is executed each time aninitialize
request succeeds.
A secondinitialize
(or a retry after a transient 50x) will panic because a closed
channel cannot be closed again.- close(c.initialized) + // close only once + select { + case <-c.initialized: // already closed – nothing to do + default: + close(c.initialized) + }
🧹 Nitpick comments (3)
client/transport/streamable_http.go (3)
258-266
: Avoid settingContent-Type
on GET requestsThe
Content-Type
header is meaningless (and sometimes confusing for proxies)
when the request has no body. It is safer to omit it for GET.- req.Header.Set("Content-Type", "application/json") + if method != http.MethodGet { + req.Header.Set("Content-Type", "application/json") + }
469-476
:resp.Body
is closed twice
createGETConnectionToServer
defersresp.Body.Close()
and passes the same
reader tohandleSSEResponse
, which in turn closes it inreadSSE
. A double
close is harmless but unnecessary and can mislead future maintainers.- defer resp.Body.Close() + // readSSE will close resp.Body for us
57-61
: Nil-logger safety
WithLogger
directly assigns the provided logger. If a caller passesnil
the transport will panic on first call. Either document the contract or add a
nil-fallback:return func(sc *StreamableHTTP) { - sc.logger = logger + if logger != nil { + sc.logger = logger + } }
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
Cache: Disabled due to data retention organization setting
Knowledge Base: Disabled due to data retention organization setting
📒 Files selected for processing (2)
client/transport/streamable_http.go
(11 hunks)client/transport/streamable_http_test.go
(2 hunks)
🔇 Additional comments (1)
client/transport/streamable_http_test.go (1)
563-569
: Dead code: returnednotificationCount
is never usedThe helper returns
notificationCount
but none of the tests read the value.
Drop the return or use it; otherwise it misleads readers and linters.[ suggest_nitpick ]
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
♻️ Duplicate comments (1)
client/transport/streamable_http.go (1)
292-295
: Differentiate 404 "session terminated" from generic 404Blindly treating every 404 as "session terminated" can hide genuine endpoint-routing issues. Consider checking a sentinel header or body message returned by the server before wiping the session id.
🧹 Nitpick comments (4)
client/transport/streamable_http.go (4)
445-466
: Consider specific handling for session termination in listenForeverThe
listenForever
method doesn't specifically check for theerrSessionTerminated
error, which might lead to unnecessary retries when the session is actually terminated.func (c *StreamableHTTP) listenForever() { c.logger.Infof("listening to server forever") for { err := c.createGETConnectionToServer() if errors.Is(err, errGetMethodNotAllowed) { // server does not support listening c.logger.Errorf("server does not support listening") return } + if errors.Is(err, errSessionTerminated) { + // Session was terminated, no need to retry until reinitialization + c.logger.Errorf("session terminated, waiting for reinitialization") + // Wait for reinitialization signal + select { + case <-c.closed: + return + case <-c.initialized: // This would require resetting and recreating this channel on reinitialization + c.logger.Infof("session reinitialized, resuming listening") + continue + } + } select { case <-c.closed: return default: } if err != nil { c.logger.Errorf("failed to listen to server. retry in 1 second: %v", err) } time.Sleep(retryInterval) } }
472-473
: Consider making retryInterval configurableThe
retryInterval
variable is used for testing convenience, but it's not exposed for configuration. Consider making it configurable via an option function to allow users to adjust the retry behavior.+// WithRetryInterval sets the interval between retries for continuous listening. +func WithRetryInterval(interval time.Duration) StreamableHTTPCOption { + return func(sc *StreamableHTTP) { + sc.retryInterval = interval + } +} type StreamableHTTP struct { baseURL *url.URL httpClient *http.Client headers map[string]string headerFunc HTTPHeaderFunc logger util.Logger getListeningEnabled bool + retryInterval time.Duration initialized chan struct{} sessionID atomic.Value // string notificationHandler func(mcp.JSONRPCNotification) notifyMu sync.RWMutex closed chan struct{} } // In NewStreamableHTTP smc := &StreamableHTTP{ baseURL: parsedURL, httpClient: &http.Client{}, headers: make(map[string]string), closed: make(chan struct{}), logger: util.DefaultLogger(), initialized: make(chan struct{}), + retryInterval: 1 * time.Second, } // In listenForever time.Sleep(c.retryInterval)
477-478
: Consider using a context with timeout for GET requestsThe code creates a background context without timeout for GET requests. Consider using a context with timeout to prevent overly long-running requests if there are network issues.
-ctx := context.Background() // the sendHTTP will be automatically canceled when the client is closed +ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) // adjust timeout as needed +defer cancel() // ensure resources are cleaned up resp, err := c.sendHTTP(ctx, http.MethodGet, nil, "text/event-stream")
151-158
: Use logger instead of fmt.Printf for error messagesThere are several instances of
fmt.Printf
in the code that should be replaced with the logger for consistency. This includes lines 151, 157, 321, 329, and 382.-fmt.Printf("failed to create close request\n: %v", err) +c.logger.Errorf("failed to create close request: %v", err) -fmt.Printf("failed to send close request\n: %v", err) +c.logger.Errorf("failed to send close request: %v", err)
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
Cache: Disabled due to data retention organization setting
Knowledge Base: Disabled due to data retention organization setting
📒 Files selected for processing (1)
client/transport/streamable_http.go
(11 hunks)
🧰 Additional context used
🧬 Code Graph Analysis (1)
client/transport/streamable_http.go (4)
server/streamable_http.go (1)
WithLogger
(77-81)util/logger.go (2)
Logger
(8-11)DefaultLogger
(16-20)client/transport/interface.go (1)
HTTPHeaderFunc
(13-13)mcp/types.go (1)
MethodInitialize
(19-19)
🔇 Additional comments (7)
client/transport/streamable_http.go (7)
25-36
: Well-documented option for continuous listeningThe
WithContinuousListening
option is clearly documented with references to the MCP specification. This makes it easy for users to understand the purpose and behavior of this feature.
57-61
: Good addition of logger supportAdding logger support is a good practice for this type of component, especially for a feature that involves long-lived connections and retry logic.
120-126
: Clean implementation of continuous listening initializationThe goroutine that waits for initialization before starting the continuous listening process is a good approach. This ensures the session ID is established before attempting to listen for server notifications.
244-298
: Good refactoring of HTTP request handlingThe
sendHTTP
method nicely centralizes the common HTTP request creation and sending logic that was previously duplicated inSendRequest
andSendNotification
. This improves maintainability and ensures consistent behavior.
186-192
: Good handling of initialize request failuresThe special handling for 404 errors during initialization is a good addition. It correctly distinguishes between genuine routing issues and session termination errors.
468-473
: Clear error definitionsThe error definitions are clear and help distinguish between different error conditions. This makes the error handling logic more readable and maintainable.
1-507
: Overall excellent implementation of continuous listening featureThe implementation of the continuous listening feature is well-designed and follows good practices:
- It correctly implements the MCP specification for listening for server messages
- It handles errors and retries appropriately
- It provides clear configuration options
- It includes proper logging
- It refactors common code to reduce duplication
There are a few minor improvements suggested, but overall this is a solid implementation of the feature.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 4
♻️ Duplicate comments (1)
client/transport/streamable_http.go (1)
284-354
: Addresses previous 404 handling concern with improved logicThe
sendHTTP
method centralizes request handling and addresses the previous review comment about 404 handling. The universal 404 handling clears session ID, but the caller (SendRequest) has special logic for initialization requests to distinguish routing issues from session termination.
🧹 Nitpick comments (2)
client/transport/streamable_http.go (2)
347-351
: Session ID race condition in CompareAndSwapThe
CompareAndSwap
operation could fail if the session ID changed between reading it (line 312) and this comparison, potentially leaving a stale session ID. Consider using a more robust approach.// universal handling for session terminated if resp.StatusCode == http.StatusNotFound { - c.sessionID.CompareAndSwap(sessionID, "") + c.sessionID.Store("") return nil, errSessionTerminated }
548-579
: Consider context handling in long-lived connectionUsing
context.Background()
means the GET connection won't be canceled by request-specific contexts, which is appropriate for a long-lived listening connection. However, consider if this could lead to resource leaks.The comment on line 550 could be more specific:
- ctx := context.Background() // the sendHTTP will be automatically canceled when the client is closed + ctx := context.Background() // sendHTTP will cancel when client.closed is triggered
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
client/transport/streamable_http.go
(11 hunks)
🧰 Additional context used
🧬 Code Graph Analysis (1)
client/transport/streamable_http.go (6)
server/streamable_http.go (1)
WithLogger
(77-81)util/logger.go (2)
Logger
(8-11)DefaultLogger
(16-20)client/client.go (1)
Client
(16-25)client/transport/interface.go (1)
HTTPHeaderFunc
(13-13)mcp/types.go (1)
MethodInitialize
(19-19)client/oauth.go (1)
OAuthAuthorizationRequiredError
(48-48)
🔇 Additional comments (7)
client/transport/streamable_http.go (7)
20-20
: LGTM: Appropriate import additionThe util package import is correctly added to support the new logger functionality.
25-36
: Well-documented continuous listening optionThe option is properly documented with clear explanation of functionality and links to the MCP specification. The warning about server support is helpful.
57-61
: Standard logger option implementationFollows the same pattern as the server-side WithLogger option and properly integrates with the struct.
116-117
: Good default logger initializationProperly initializes the logger with a default and creates the coordination channel.
473-473
: Good refactoring to use centralized HTTP handlingThe SendNotification method correctly uses the new
sendHTTP
method for consistency.
518-539
: Robust retry logic with proper error handlingThe continuous listening implementation properly handles different error scenarios and includes appropriate logging. The check for method not allowed prevents infinite retries when server doesn't support GET.
541-546
: Well-defined error constantsClear error definitions with a configurable retry interval for testing flexibility.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
🔭 Outside diff range comments (1)
client/transport/streamable_http.go (1)
385-386
: 🛠️ Refactor suggestionInconsistent error logging - use structured logger.
Several locations use
fmt.Printf
for error logging instead of the configured logger, which bypasses the logging infrastructure.Replace
fmt.Printf
calls with the structured logger:- fmt.Printf("failed to unmarshal message: %v\n", err) + c.logger.Errorf("failed to unmarshal message: %v", err)- fmt.Printf("failed to unmarshal notification: %v\n", err) + c.logger.Errorf("failed to unmarshal notification: %v", err)- fmt.Printf("SSE stream error: %v\n", err) + c.logger.Errorf("SSE stream error: %v", err)Also applies to: 393-394, 446-447
🧹 Nitpick comments (2)
client/transport/streamable_http.go (2)
292-362
: Excellent refactoring with minor context leak concern.The
sendHTTP
method effectively consolidates HTTP request logic, eliminating duplication. However, there's a potential goroutine leak in the context cancellation logic.In lines 301-308, the goroutine for context cancellation may not be cleaned up if the HTTP request completes quickly. Consider this pattern instead:
- go func() { - select { - case <-c.closed: - cancel() - case <-newCtx.Done(): - // The original context was canceled, no need to do anything - } - }() + go func() { + defer cancel() + select { + case <-c.closed: + case <-newCtx.Done(): + } + }()This ensures the cancel function is always called, cleaning up resources properly.
549-554
: Consider making retry interval configurable.The hardcoded 1-second retry interval may not be suitable for all deployment scenarios.
Consider adding a configuration option for the retry interval:
+func WithRetryInterval(interval time.Duration) StreamableHTTPCOption { + return func(sc *StreamableHTTP) { + sc.retryInterval = interval + } +}And add a
retryInterval
field to the struct with a default value.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
client/transport/streamable_http.go
(10 hunks)
🔇 Additional comments (11)
client/transport/streamable_http.go (11)
20-20
: Import addition looks good.The addition of the
util
package import supports the new logger functionality.
25-36
: Well-documented continuous listening option.The function documentation clearly explains the feature and includes relevant specification links. Good defensive notice about server support variability.
57-61
: Logger option implementation is correct.Simple and straightforward option function for logger configuration.
88-94
: Good synchronization design for initialization coordination.The combination of
initialized
channel andinitializedOnce
provides proper one-time signaling without risk of double-close panics.
118-119
: Proper initialization of new fields.Default logger and initialized channel are correctly set up in the constructor.
139-149
: Excellent fix for goroutine leak prevention.The select statement properly handles both initialization completion and client closure, addressing the previous review concern about potential goroutine leaks.
223-232
: Good differentiation of 404 errors during initialization.This addresses the previous review feedback by treating 404 errors during initialization as routing issues rather than session termination, which is the correct behavior.
261-263
: Proper use of sync.Once for channel coordination.The
initializedOnce.Do()
ensures the channel is closed exactly once, preventing panics and addressing previous review concerns about missing channel closure on error paths.
481-481
: Good refactoring to use common HTTP method.The use of
sendHTTP
eliminates code duplication and ensures consistent behavior across request types.
526-547
: Robust listening implementation with proper error handling.The retry logic and graceful handling of unsupported servers (405 Method Not Allowed) is well-implemented. The infinite loop with proper exit conditions is correct.
556-587
: Well-structured GET connection handling.The method properly handles different response scenarios including method not allowed, status code validation, and content type verification. The error wrapping provides good debugging context.
} | ||
|
||
// Start the transport - this will launch listenForever in a goroutine | ||
if err := trans.Start(context.Background()); err != nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why is the SendRequest#initialize
method called before Start()
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've fixed this.
It will not affect the test results, as start
in http transport is doing nothing for sending
@leavez Please let me know when this PR is ready, and I'd like to review it. Also, it will be great If you can provide some examples in |
Description
Implemented for client-side streamable-http transport
Type of Change
Checklist
MCP Spec Compliance
Summary by CodeRabbit
New Features
Bug Fixes
Tests
Documentation