@@ -25,9 +25,13 @@ public class Server<InitPayload: Equatable & Codable> {
25
25
26
26
var initialized = false
27
27
28
- let disposeBag = DisposeBag ( )
28
+ var disposeBags = [ String : DisposeBag] ( )
29
29
let encoder = GraphQLJSONEncoder ( )
30
30
let decoder = JSONDecoder ( )
31
+
32
+ enum DuplicateKeyError : Error {
33
+ case duplicateKey
34
+ }
31
35
32
36
/// Create a new server
33
37
///
@@ -87,6 +91,7 @@ public class Server<InitPayload: Equatable & Codable> {
87
91
self . error ( . invalidRequestFormat( messageType: . complete) )
88
92
return
89
93
}
94
+ self . disposeBags. removeValue ( forKey: completeRequest. id)
90
95
self . onOperationComplete ( completeRequest. id)
91
96
case . unknown:
92
97
self . error ( . invalidType( ) )
@@ -207,6 +212,12 @@ public class Server<InitPayload: Equatable & Codable> {
207
212
// swiftlint:disable:next force_cast
208
213
let stream = streamOpt as! ObservableSubscriptionEventStream
209
214
let observable = stream. observable
215
+ guard disposeBags [ id] == nil else {
216
+ self . sendError ( DuplicateKeyError . duplicateKey, id: id)
217
+ return
218
+ }
219
+ let disposeBag = DisposeBag ( )
220
+ disposeBags [ id] = disposeBag
210
221
211
222
observable. subscribe (
212
223
onNext: { [ weak self] resultFuture in
@@ -226,7 +237,7 @@ public class Server<InitPayload: Equatable & Codable> {
226
237
guard let self = self else { return }
227
238
self . sendComplete ( id: id)
228
239
}
229
- ) . disposed ( by: self . disposeBag)
240
+ ) . disposed ( by: disposeBag)
230
241
}
231
242
subscribeFuture. whenFailure { error in
232
243
self . sendError ( error, id: id)
@@ -274,6 +285,7 @@ public class Server<InitPayload: Equatable & Codable> {
274
285
id: id
275
286
) . toJSON ( encoder)
276
287
)
288
+ self . disposeBags. removeValue ( forKey: id)
277
289
self . onOperationComplete ( id)
278
290
}
279
291
0 commit comments