Skip to content

Commit 9c85560

Browse files
author
Sebastien Stormacq
committed
State : use a struct instead of enum
1 parent 669ad43 commit 9c85560

File tree

1 file changed

+68
-100
lines changed

1 file changed

+68
-100
lines changed

Sources/AWSLambdaRuntime/Lambda+LocalServer+Pool.swift

Lines changed: 68 additions & 100 deletions
Original file line numberDiff line numberDiff line change
@@ -28,51 +28,36 @@ extension LambdaHTTPServer {
2828

2929
typealias Element = T
3030

31-
enum State: ~Copyable {
32-
case buffer(Deque<T>)
33-
// FIFO waiting (for invocations)
34-
case waitingForAny(CheckedContinuation<T, any Error>)
35-
// RequestId-based waiting (for responses)
36-
case waitingForSpecific([String: CheckedContinuation<T, any Error>])
31+
struct State {
32+
var buffer: Deque<T> = []
33+
var waitingForAny: CheckedContinuation<T, any Error>?
34+
var waitingForSpecific: [String: CheckedContinuation<T, any Error>] = [:]
3735
}
3836

39-
private let lock = Mutex<State>(.buffer([]))
37+
private let lock = Mutex<State>(State())
4038

4139
/// enqueue an element, or give it back immediately to the iterator if it is waiting for an element
4240
public func push(_ item: T) {
4341
let continuationToResume = self.lock.withLock { state -> CheckedContinuation<T, any Error>? in
44-
switch consume state {
45-
case .buffer(var buffer):
46-
buffer.append(item)
47-
state = .buffer(buffer)
48-
return nil
49-
50-
case .waitingForAny(let continuation):
51-
// Someone is waiting for any item (FIFO)
52-
state = .buffer([])
42+
// First check if there's a waiting continuation that can handle this item
43+
44+
// Check for FIFO waiter first
45+
if let continuation = state.waitingForAny {
46+
state.waitingForAny = nil
5347
return continuation
48+
}
5449

55-
case .waitingForSpecific(var continuations):
56-
// Check if this item matches any waiting continuation
57-
if let response = item as? LocalServerResponse,
58-
let requestId = response.requestId,
59-
let continuation = continuations.removeValue(forKey: requestId)
60-
{
61-
// Found a matching continuation
62-
if continuations.isEmpty {
63-
state = .buffer([])
64-
} else {
65-
state = .waitingForSpecific(continuations)
66-
}
67-
return continuation
68-
} else {
69-
// No matching continuation, add to buffer
70-
var buffer = Deque<T>()
71-
buffer.append(item)
72-
state = .buffer(buffer)
73-
return nil
74-
}
50+
// Check for specific waiter
51+
if let response = item as? LocalServerResponse,
52+
let requestId = response.requestId,
53+
let continuation = state.waitingForSpecific.removeValue(forKey: requestId)
54+
{
55+
return continuation
7556
}
57+
58+
// No waiting continuation, add to buffer
59+
state.buffer.append(item)
60+
return nil
7661
}
7762

7863
// Resume continuation outside the lock to prevent potential deadlocks
@@ -89,63 +74,44 @@ extension LambdaHTTPServer {
8974
return try await withTaskCancellationHandler {
9075
try await withCheckedThrowingContinuation { (continuation: CheckedContinuation<T, any Error>) in
9176
let nextAction: Result<T, PoolError>? = self.lock.withLock { state -> Result<T, PoolError>? in
92-
switch consume state {
93-
case .buffer(var buffer):
94-
if let requestId = requestId {
95-
// Look for oldest (first) item for this requestId in buffer
96-
if let index = buffer.firstIndex(where: { item in
97-
if let response = item as? LocalServerResponse {
98-
return response.requestId == requestId
99-
}
100-
return false
101-
}) {
102-
let item = buffer.remove(at: index)
103-
state = .buffer(buffer)
104-
return .success(item)
105-
} else {
106-
// No matching item, wait for it
107-
var continuations: [String: CheckedContinuation<T, any Error>] = [:]
108-
continuations[requestId] = continuation
109-
state = .waitingForSpecific(continuations)
110-
return nil
77+
if let requestId = requestId {
78+
// Look for oldest (first) item for this requestId in buffer
79+
if let index = state.buffer.firstIndex(where: { item in
80+
if let response = item as? LocalServerResponse {
81+
return response.requestId == requestId
11182
}
83+
return false
84+
}) {
85+
let item = state.buffer.remove(at: index)
86+
return .success(item)
11287
} else {
113-
// FIFO mode - take first item
114-
if let first = buffer.popFirst() {
115-
state = .buffer(buffer)
116-
return .success(first)
117-
} else {
118-
state = .waitingForAny(continuation)
119-
return nil
88+
// Check for conflicting waiters
89+
if state.waitingForAny != nil {
90+
return .failure(PoolError(cause: .mixedWaitingModes))
12091
}
121-
}
122-
123-
case .waitingForAny(let previousContinuation):
124-
if requestId == nil {
125-
// Another FIFO call while already waiting
126-
state = .buffer([])
127-
return .failure(PoolError(cause: .nextCalledTwice(previousContinuation)))
128-
} else {
129-
// Can't mix FIFO and specific waiting
130-
state = .waitingForAny(previousContinuation)
131-
return .failure(PoolError(cause: .mixedWaitingModes))
132-
}
133-
134-
case .waitingForSpecific(var continuations):
135-
if let requestId = requestId {
136-
if continuations[requestId] != nil {
137-
// Already waiting for this requestId
138-
state = .waitingForSpecific(continuations)
92+
if state.waitingForSpecific[requestId] != nil {
13993
return .failure(PoolError(cause: .duplicateRequestIdWait(requestId)))
140-
} else {
141-
continuations[requestId] = continuation
142-
state = .waitingForSpecific(continuations)
143-
return nil
14494
}
95+
96+
// No matching item, wait for it
97+
state.waitingForSpecific[requestId] = continuation
98+
return nil
99+
}
100+
} else {
101+
// FIFO mode - take first item
102+
if let first = state.buffer.popFirst() {
103+
return .success(first)
145104
} else {
146-
// Can't mix FIFO and specific waiting
147-
state = .waitingForSpecific(continuations)
148-
return .failure(PoolError(cause: .mixedWaitingModes))
105+
// Check for conflicting waiters
106+
if !state.waitingForSpecific.isEmpty {
107+
return .failure(PoolError(cause: .mixedWaitingModes))
108+
}
109+
if state.waitingForAny != nil {
110+
return .failure(PoolError(cause: .nextCalledTwice(state.waitingForAny!)))
111+
}
112+
113+
state.waitingForAny = continuation
114+
return nil
149115
}
150116
}
151117
}
@@ -164,23 +130,25 @@ extension LambdaHTTPServer {
164130
}
165131
}
166132
} onCancel: {
167-
// Ensure we properly handle cancellation by checking if we have a stored continuation
168-
let continuationsToCancel = self.lock.withLock { state -> [String: CheckedContinuation<T, any Error>] in
169-
switch consume state {
170-
case .buffer(let buffer):
171-
state = .buffer(buffer)
172-
return [:]
173-
case .waitingForAny(let continuation):
174-
state = .buffer([])
175-
return ["": continuation] // Use empty string as key for single continuation
176-
case .waitingForSpecific(let continuations):
177-
state = .buffer([])
178-
return continuations
133+
// Ensure we properly handle cancellation by removing stored continuation
134+
let continuationsToCancel = self.lock.withLock { state -> [CheckedContinuation<T, any Error>] in
135+
var toCancel: [CheckedContinuation<T, any Error>] = []
136+
137+
if let continuation = state.waitingForAny {
138+
toCancel.append(continuation)
139+
state.waitingForAny = nil
179140
}
141+
142+
for continuation in state.waitingForSpecific.values {
143+
toCancel.append(continuation)
144+
}
145+
state.waitingForSpecific.removeAll()
146+
147+
return toCancel
180148
}
181149

182150
// Resume all continuations outside the lock to avoid potential deadlocks
183-
for continuation in continuationsToCancel.values {
151+
for continuation in continuationsToCancel {
184152
continuation.resume(throwing: CancellationError())
185153
}
186154
}

0 commit comments

Comments
 (0)