Skip to content

Commit 07c3d45

Browse files
author
Soumyajit Saha
committed
Fix: Added removal of subscriptions on complete
Previously there was only a single disposal for all the subsriptions. So, all the subscriptions were getting cancelled when the disposal bag was going out of scope and not when a completion request was received on a particular subscription. So, I added a dictionary of disposal bags with subscription 'id' as key and the disposal bag for that subscription as value. On subscription, a new disposal bag corresponding to the subscription is created and inserted in the dictionary. On completion, the disposal bag is removed from the dictionary. Now, the test for consecutive subscription of same operation is passing. Link: https://app.clickup.com/t/866amdey9
1 parent 237d290 commit 07c3d45

File tree

1 file changed

+14
-2
lines changed

1 file changed

+14
-2
lines changed

Sources/GraphQLTransportWS-DataSync/Server.swift

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,13 @@ public class Server<InitPayload: Equatable & Codable> {
2525

2626
var initialized = false
2727

28-
let disposeBag = DisposeBag()
28+
var disposeBags = [String: DisposeBag]()
2929
let encoder = GraphQLJSONEncoder()
3030
let decoder = JSONDecoder()
31+
32+
enum DuplicateKeyError: Error {
33+
case duplicateKey
34+
}
3135

3236
/// Create a new server
3337
///
@@ -87,6 +91,7 @@ public class Server<InitPayload: Equatable & Codable> {
8791
self.error(.invalidRequestFormat(messageType: .complete))
8892
return
8993
}
94+
self.disposeBags.removeValue(forKey: completeRequest.id)
9095
self.onOperationComplete(completeRequest.id)
9196
case .unknown:
9297
self.error(.invalidType())
@@ -207,6 +212,12 @@ public class Server<InitPayload: Equatable & Codable> {
207212
// swiftlint:disable:next force_cast
208213
let stream = streamOpt as! ObservableSubscriptionEventStream
209214
let observable = stream.observable
215+
guard disposeBags[id] == nil else {
216+
self.sendError(DuplicateKeyError.duplicateKey, id: id)
217+
return
218+
}
219+
let disposeBag = DisposeBag()
220+
disposeBags[id] = disposeBag
210221

211222
observable.subscribe(
212223
onNext: { [weak self] resultFuture in
@@ -226,7 +237,7 @@ public class Server<InitPayload: Equatable & Codable> {
226237
guard let self = self else { return }
227238
self.sendComplete(id: id)
228239
}
229-
).disposed(by: self.disposeBag)
240+
).disposed(by: disposeBag)
230241
}
231242
subscribeFuture.whenFailure { error in
232243
self.sendError(error, id: id)
@@ -274,6 +285,7 @@ public class Server<InitPayload: Equatable & Codable> {
274285
id: id
275286
).toJSON(encoder)
276287
)
288+
self.disposeBags.removeValue(forKey: id)
277289
self.onOperationComplete(id)
278290
}
279291

0 commit comments

Comments
 (0)