Skip to content

Commit 5ce4792

Browse files
fix: Improve subscriptionTask cleanup
1 parent 5ec2935 commit 5ce4792

File tree

1 file changed

+22
-8
lines changed

1 file changed

+22
-8
lines changed

Sources/GraphQLTransportWS/Server.swift

Lines changed: 22 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@ public class Server<
1010
>: @unchecked Sendable where
1111
SubscriptionSequenceType.Element == GraphQLResult
1212
{
13-
1413
// We keep this weak because we strongly inject this object into the messenger callback
1514
weak var messenger: Messenger?
1615

@@ -89,13 +88,17 @@ public class Server<
8988
try await self.error(.invalidRequestFormat(messageType: .complete))
9089
return
9190
}
92-
try await self.onOperationComplete(completeRequest.id)
91+
try await self.onOperationComplete(completeRequest)
9392
case .unknown:
9493
try await self.error(.invalidType())
9594
}
9695
}
9796
}
9897

98+
deinit {
99+
subscriptionTasks.values.forEach { $0.cancel() }
100+
}
101+
99102
/// Define a custom callback run during `connection_init` resolution that allows authorization using the `payload`.
100103
/// Throw from this closure to indicate that authorization has failed.
101104
/// - Parameter callback: The callback to assign
@@ -171,18 +174,15 @@ public class Server<
171174
let stream = try await onSubscribe(graphQLRequest)
172175
for try await event in stream {
173176
try Task.checkCancellation()
174-
do {
175-
try await self.sendNext(event, id: id)
176-
} catch {
177-
try await self.sendError(error, id: id)
178-
throw error
179-
}
177+
try await self.sendNext(event, id: id)
180178
}
181179
} catch {
182180
try await sendError(error, id: id)
181+
subscriptionTasks.removeValue(forKey: id)
183182
throw error
184183
}
185184
try await self.sendComplete(id: id)
185+
subscriptionTasks.removeValue(forKey: id)
186186
}
187187
} else {
188188
do {
@@ -196,6 +196,20 @@ public class Server<
196196
}
197197
}
198198

199+
private func onOperationComplete(_ completeRequest: CompleteRequest) async throws {
200+
guard initialized else {
201+
try await error(.notInitialized())
202+
return
203+
}
204+
205+
let id = completeRequest.id
206+
if let task = subscriptionTasks[id] {
207+
task.cancel()
208+
subscriptionTasks.removeValue(forKey: id)
209+
}
210+
try await onOperationComplete(id)
211+
}
212+
199213
/// Send a `connection_ack` response through the messenger
200214
private func sendConnectionAck(_ payload: [String: Map]? = nil) async throws {
201215
guard let messenger = messenger else { return }

0 commit comments

Comments
 (0)