Skip to content

Commit 4839d57

Browse files
committed
Make detection more robust
Signed-off-by: Ettore Di Giacinto <[email protected]>
1 parent cf0270d commit 4839d57

File tree

3 files changed: 36 additions (+36) and 9 deletions (-9).

core/http/endpoints/openai/chat.go

Lines changed: 34 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -3,6 +3,7 @@ package openai
33
import (
44
"bufio"
55
"bytes"
6+
"context"
67
"encoding/json"
78
"fmt"
89
"net"
@@ -30,21 +31,47 @@ import (
3031
// We access the connection directly via c.Context().Conn() to monitor it
3132
// during ComputeChoices execution, not after the response is sent
3233
// see: https://github.com/mudler/LocalAI/pull/7187#issuecomment-3506720906
33-
func handleConnectionCancellation(c *fiber.Ctx, cancelFunc func()) {
34+
func handleConnectionCancellation(c *fiber.Ctx, cancelFunc func(), requestCtx context.Context) {
3435
var conn net.Conn = c.Context().Conn()
3536
if conn == nil {
3637
return
3738
}
3839

3940
go func() {
41+
defer func() {
42+
// Clear read deadline when goroutine exits
43+
conn.SetReadDeadline(time.Time{})
44+
}()
45+
4046
buf := make([]byte, 1)
47+
// Use a short read deadline to periodically check if connection is closed
48+
// Without a deadline, Read() would block indefinitely waiting for data
49+
// that will never come (client is waiting for response, not sending more data)
50+
ticker := time.NewTicker(100 * time.Millisecond)
51+
defer ticker.Stop()
52+
4153
for {
42-
_, err := conn.Read(buf)
43-
if err != nil {
44-
// Connection closed - cancel the context to stop gRPC call
45-
log.Debug().Msgf("Calling cancellation function")
46-
cancelFunc()
54+
select {
55+
case <-requestCtx.Done():
56+
// Request completed or was cancelled - exit goroutine
4757
return
58+
case <-ticker.C:
59+
// Set a short deadline - if connection is closed, read will fail immediately
60+
// If connection is open but no data, it will timeout and we check again
61+
conn.SetReadDeadline(time.Now().Add(50 * time.Millisecond))
62+
_, err := conn.Read(buf)
63+
if err != nil {
64+
// Check if it's a timeout (connection still open, just no data)
65+
if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
66+
// Timeout is expected - connection is still open, just no data to read
67+
// Continue the loop to check again
68+
continue
69+
}
70+
// Connection closed or other error - cancel the context to stop gRPC call
71+
log.Debug().Msgf("Calling cancellation function")
72+
cancelFunc()
73+
return
74+
}
4875
}
4976
}
5077
}()
@@ -546,7 +573,7 @@ func ChatEndpoint(cl *config.ModelConfigLoader, ml *model.ModelLoader, evaluator
546573

547574
// NOTE: this is a workaround as fasthttp
548575
// context cancellation does not fire in non-streaming requests
549-
handleConnectionCancellation(c, input.Cancel)
576+
handleConnectionCancellation(c, input.Cancel, input.Context)
550577

551578
result, tokenUsage, err := ComputeChoices(
552579
input,

core/http/endpoints/openai/mcp.go

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -80,7 +80,7 @@ func MCPCompletionEndpoint(cl *config.ModelConfigLoader, ml *model.ModelLoader,
8080

8181
ctxWithCancellation, cancel := context.WithCancel(ctx)
8282
defer cancel()
83-
handleConnectionCancellation(c, cancel)
83+
handleConnectionCancellation(c, cancel, ctxWithCancellation)
8484
// TODO: instead of connecting to the API, we should just wire this internally
8585
// and act like completion.go.
8686
// We can do this as cogito expects an interface and we can create one that

core/http/middleware/request.go

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -164,7 +164,7 @@ func (re *RequestExtractor) SetOpenAIRequest(ctx *fiber.Ctx) error {
164164
//c1, cancel := context.WithCancel(re.applicationConfig.Context)
165165
// Use the application context as parent to ensure cancellation on app shutdown
166166
// We'll monitor the Fiber context separately and cancel our context when the request is canceled
167-
c1, cancel := context.WithCancel(ctx.Context())
167+
c1, cancel := context.WithCancel(re.applicationConfig.Context)
168168
// Monitor the Fiber context and cancel our context when it's canceled
169169
// This ensures we respect request cancellation without causing panics
170170
go func() {

0 commit comments

Comments
 (0)