Skip to content

Commit 75abd9a

Browse files
ezynda3opencode
andauthored
fix: SSE parser now correctly handles events without event field (#376)
* fix: SSE parser now correctly handles events without event field According to the W3C SSE specification, the event field is optional. SSE events with only data fields should be processed normally and treated as "message" events (the default event type). This fixes issue #369 where the SSE parser incorrectly required both event and data fields, causing "unexpected nil response" errors when servers sent valid SSE events with only data fields. 🤖 Generated with opencode Co-Authored-By: opencode <[email protected]> * fix: resolve race condition in SSE test Fixed race condition in TestSSE/SSEEventWithoutEventField where multiple goroutines were accessing the same HTTP response writer concurrently. Simplified the test to use channel-based synchronization instead of shared state between HTTP handlers. 🤖 Generated with opencode Co-Authored-By: opencode <[email protected]> * fix: StreamableHTTP SSE parser now correctly handles events without event field - Fixed SSE parser in StreamableHTTP transport to process events with only data field - According to W3C SSE specification, the event field is optional - When no event type is specified, defaults to "message" event type - Added comprehensive test case to verify the fix works correctly - Resolves issue #369 where SSE events without event field caused "unexpected nil response" errors 🤖 Generated with opencode Co-Authored-By: opencode <[email protected]> --------- Co-authored-by: opencode <[email protected]>
1 parent cf6a2e9 commit 75abd9a

File tree

4 files changed

+199
-4
lines changed

4 files changed

+199
-4
lines changed

client/transport/sse.go

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -194,7 +194,11 @@ func (c *SSE) readSSE(reader io.ReadCloser) {
194194
if err != nil {
195195
if err == io.EOF {
196196
// Process any pending event before exit
197-
if event != "" && data != "" {
197+
if data != "" {
198+
// If no event type is specified, use empty string (default event type)
199+
if event == "" {
200+
event = "message"
201+
}
198202
c.handleSSEEvent(event, data)
199203
}
200204
break
@@ -209,7 +213,11 @@ func (c *SSE) readSSE(reader io.ReadCloser) {
209213
line = strings.TrimRight(line, "\r\n")
210214
if line == "" {
211215
// Empty line means end of event
212-
if event != "" && data != "" {
216+
if data != "" {
217+
// If no event type is specified, use empty string (default event type)
218+
if event == "" {
219+
event = "message"
220+
}
213221
c.handleSSEEvent(event, data)
214222
event = ""
215223
data = ""

client/transport/sse_test.go

Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -405,6 +405,109 @@ func TestSSE(t *testing.T) {
405405
}
406406
})
407407

408+
t.Run("SSEEventWithoutEventField", func(t *testing.T) {
409+
// Test that SSE events with only data field (no event field) are processed correctly
410+
// This tests the fix for issue #369
411+
412+
var messageReceived chan struct{}
413+
414+
// Create a custom mock server that sends SSE events without event field
415+
sseHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
416+
w.Header().Set("Content-Type", "text/event-stream")
417+
flusher, ok := w.(http.Flusher)
418+
if !ok {
419+
http.Error(w, "Streaming unsupported", http.StatusInternalServerError)
420+
return
421+
}
422+
423+
// Send initial endpoint event
424+
fmt.Fprintf(w, "event: endpoint\ndata: %s\n\n", "/message")
425+
flusher.Flush()
426+
427+
// Wait for message to be received, then send response
428+
select {
429+
case <-messageReceived:
430+
// Send response via SSE WITHOUT event field (only data field)
431+
// This should be processed as a "message" event according to SSE spec
432+
response := map[string]any{
433+
"jsonrpc": "2.0",
434+
"id": 1,
435+
"result": "test response without event field",
436+
}
437+
responseBytes, _ := json.Marshal(response)
438+
fmt.Fprintf(w, "data: %s\n\n", responseBytes)
439+
flusher.Flush()
440+
case <-r.Context().Done():
441+
return
442+
}
443+
444+
// Keep connection open
445+
<-r.Context().Done()
446+
})
447+
448+
// Create message handler
449+
messageHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
450+
w.Header().Set("Content-Type", "application/json")
451+
w.WriteHeader(http.StatusAccepted)
452+
453+
// Signal that message was received
454+
close(messageReceived)
455+
})
456+
457+
// Initialize the channel
458+
messageReceived = make(chan struct{})
459+
460+
// Create test server
461+
mux := http.NewServeMux()
462+
mux.Handle("/", sseHandler)
463+
mux.Handle("/message", messageHandler)
464+
testServer := httptest.NewServer(mux)
465+
defer testServer.Close()
466+
467+
// Create SSE transport
468+
trans, err := NewSSE(testServer.URL)
469+
if err != nil {
470+
t.Fatal(err)
471+
}
472+
473+
// Start the transport
474+
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
475+
defer cancel()
476+
477+
err = trans.Start(ctx)
478+
if err != nil {
479+
t.Fatalf("Failed to start transport: %v", err)
480+
}
481+
defer trans.Close()
482+
483+
// Send a request
484+
request := JSONRPCRequest{
485+
JSONRPC: "2.0",
486+
ID: mcp.NewRequestId(int64(1)),
487+
Method: "test",
488+
}
489+
490+
// This should succeed because the SSE event without event field should be processed
491+
response, err := trans.SendRequest(ctx, request)
492+
if err != nil {
493+
t.Fatalf("SendRequest failed: %v", err)
494+
}
495+
496+
if response == nil {
497+
t.Fatal("Expected response, got nil")
498+
}
499+
500+
// Verify the response
501+
var result string
502+
if err := json.Unmarshal(response.Result, &result); err != nil {
503+
t.Fatalf("Failed to unmarshal result: %v", err)
504+
}
505+
506+
if result != "test response without event field" {
507+
t.Errorf("Expected 'test response without event field', got '%s'", result)
508+
}
509+
})
510+
408511
}
409512

410513
func TestSSEErrors(t *testing.T) {

client/transport/streamable_http.go

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -380,7 +380,11 @@ func (c *StreamableHTTP) readSSE(ctx context.Context, reader io.ReadCloser, hand
380380
if err != nil {
381381
if err == io.EOF {
382382
// Process any pending event before exit
383-
if event != "" && data != "" {
383+
if data != "" {
384+
// If no event type is specified, use empty string (default event type)
385+
if event == "" {
386+
event = "message"
387+
}
384388
handler(event, data)
385389
}
386390
return
@@ -398,7 +402,11 @@ func (c *StreamableHTTP) readSSE(ctx context.Context, reader io.ReadCloser, hand
398402
line = strings.TrimRight(line, "\r\n")
399403
if line == "" {
400404
// Empty line means end of event
401-
if event != "" && data != "" {
405+
if data != "" {
406+
// If no event type is specified, use empty string (default event type)
407+
if event == "" {
408+
event = "message"
409+
}
402410
handler(event, data)
403411
event = ""
404412
data = ""

client/transport/streamable_http_test.go

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -413,6 +413,82 @@ func TestStreamableHTTP(t *testing.T) {
413413
t.Errorf("Expected JSONRPC '2.0', got '%s'", responseError.JSONRPC)
414414
}
415415
})
416+
417+
t.Run("SSEEventWithoutEventField", func(t *testing.T) {
418+
// Test that SSE events with only data field (no event field) are processed correctly
419+
// This tests the fix for issue #369
420+
421+
// Create a custom mock server that sends SSE events without event field
422+
handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
423+
if r.Method != http.MethodPost {
424+
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
425+
return
426+
}
427+
428+
// Parse incoming JSON-RPC request
429+
var request map[string]any
430+
decoder := json.NewDecoder(r.Body)
431+
if err := decoder.Decode(&request); err != nil {
432+
http.Error(w, fmt.Sprintf("Invalid JSON: %v", err), http.StatusBadRequest)
433+
return
434+
}
435+
436+
// Send response via SSE WITHOUT event field (only data field)
437+
// This should be processed as a "message" event according to SSE spec
438+
w.Header().Set("Content-Type", "text/event-stream")
439+
w.WriteHeader(http.StatusOK)
440+
441+
response := map[string]any{
442+
"jsonrpc": "2.0",
443+
"id": request["id"],
444+
"result": "test response without event field",
445+
}
446+
responseBytes, _ := json.Marshal(response)
447+
// Note: No "event:" field, only "data:" field
448+
fmt.Fprintf(w, "data: %s\n\n", responseBytes)
449+
})
450+
451+
// Create test server
452+
testServer := httptest.NewServer(handler)
453+
defer testServer.Close()
454+
455+
// Create StreamableHTTP transport
456+
trans, err := NewStreamableHTTP(testServer.URL)
457+
if err != nil {
458+
t.Fatal(err)
459+
}
460+
defer trans.Close()
461+
462+
// Send a request
463+
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
464+
defer cancel()
465+
466+
request := JSONRPCRequest{
467+
JSONRPC: "2.0",
468+
ID: mcp.NewRequestId(int64(1)),
469+
Method: "test",
470+
}
471+
472+
// This should succeed because the SSE event without event field should be processed
473+
response, err := trans.SendRequest(ctx, request)
474+
if err != nil {
475+
t.Fatalf("SendRequest failed: %v", err)
476+
}
477+
478+
if response == nil {
479+
t.Fatal("Expected response, got nil")
480+
}
481+
482+
// Verify the response
483+
var result string
484+
if err := json.Unmarshal(response.Result, &result); err != nil {
485+
t.Fatalf("Failed to unmarshal result: %v", err)
486+
}
487+
488+
if result != "test response without event field" {
489+
t.Errorf("Expected 'test response without event field', got '%s'", result)
490+
}
491+
})
416492
}
417493

418494
func TestStreamableHTTPErrors(t *testing.T) {

0 commit comments

Comments
 (0)