Skip to content

Commit 1d6a781

Browse files
committed
fix(realtime): add max attempts for resubscrining
1 parent 09f047b commit 1d6a781

File tree

1 file changed

+54
-31
lines changed

1 file changed

+54
-31
lines changed

Sources/Realtime/RealtimeChannelV2.swift

Lines changed: 54 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ public final class RealtimeChannelV2: Sendable {
3838
let config: RealtimeChannelConfig
3939
let logger: (any SupabaseLogger)?
4040
let socket: RealtimeClientV2
41+
let maxRetryAttempt = 5
4142

4243
@MainActor var joinRef: String? { mutableState.joinRef }
4344

@@ -84,6 +85,40 @@ public final class RealtimeChannelV2: Sendable {
8485
/// Subscribes to the channel
8586
@MainActor
8687
public func subscribe() async {
88+
logger?.debug("Starting subscription to channel '\(topic)' (attempt 1/\(maxRetryAttempt))")
89+
90+
var attempts = 0
91+
92+
while attempts < maxRetryAttempt {
93+
attempts += 1
94+
95+
do {
96+
logger?.debug("Attempting to subscribe to channel '\(topic)' (attempt \(attempts)/\(maxRetryAttempt))")
97+
try await withTimeout(interval: socket.options.timeoutInterval) { [self] in
98+
await _subscribe()
99+
}
100+
logger?.debug("Successfully subscribed to channel '\(topic)'")
101+
return
102+
} catch {
103+
if error is TimeoutError {
104+
logger?.debug("Subscribe timed out for channel '\(topic)' (attempt \(attempts)/\(maxRetryAttempt))")
105+
if attempts < maxRetryAttempt {
106+
logger?.debug("Retrying subscription to channel '\(topic)'...")
107+
} else {
108+
logger?.error("Failed to subscribe to channel '\(topic)' after \(maxRetryAttempt) attempts due to timeout")
109+
}
110+
} else {
111+
logger?.error("Subscribe failed for channel '\(topic)' (attempt \(attempts)/\(maxRetryAttempt)): \(error)")
112+
break
113+
}
114+
}
115+
}
116+
117+
logger?.error("Subscription to channel '\(topic)' failed after \(attempts) attempts")
118+
}
119+
120+
@MainActor
121+
private func _subscribe() async {
87122
if socket.status != .connected {
88123
if socket.options.connectOnSubscribe != true {
89124
reportIssue(
@@ -95,7 +130,6 @@ public final class RealtimeChannelV2: Sendable {
95130
}
96131

97132
status = .subscribing
98-
logger?.debug("Subscribing to channel \(topic)")
99133

100134
let joinConfig = RealtimeJoinConfig(
101135
broadcast: config.broadcast,
@@ -104,35 +138,23 @@ public final class RealtimeChannelV2: Sendable {
104138
isPrivate: config.isPrivate
105139
)
106140

141+
let accessToken = await socket._getAccessToken()
107142
let payload = RealtimeJoinPayload(
108143
config: joinConfig,
109-
accessToken: await socket._getAccessToken(),
144+
accessToken: accessToken,
110145
version: socket.options.headers[.xClientInfo]
111146
)
112147

113148
let joinRef = socket.makeRef()
114149
mutableState.joinRef = joinRef
115150

116-
logger?.debug("Subscribing to channel with body: \(joinConfig)")
117-
118151
await push(
119152
ChannelEvent.join,
120153
ref: joinRef,
121154
payload: try! JSONObject(payload)
122155
)
123156

124-
do {
125-
try await withTimeout(interval: socket.options.timeoutInterval) { [self] in
126-
_ = await statusChange.first { @Sendable in $0 == .subscribed }
127-
}
128-
} catch {
129-
if error is TimeoutError {
130-
logger?.debug("Subscribe timed out.")
131-
await subscribe()
132-
} else {
133-
logger?.error("Subscribe failed: \(error)")
134-
}
135-
}
157+
_ = await statusChange.first { @Sendable in $0 == .subscribed }
136158
}
137159

138160
public func unsubscribe() async {
@@ -149,7 +171,6 @@ public final class RealtimeChannelV2: Sendable {
149171
"manually updating auth token per channel is not recommended, please use `setAuth` in RealtimeClient instead."
150172
)
151173
public func updateAuth(jwt: String?) async {
152-
logger?.debug("Updating auth token for channel \(topic)")
153174
await push(
154175
ChannelEvent.accessToken,
155176
payload: ["access_token": jwt.map { .string($0) } ?? .null]
@@ -264,7 +285,6 @@ public final class RealtimeChannelV2: Sendable {
264285
func onMessage(_ message: RealtimeMessageV2) async {
265286
do {
266287
guard let eventType = message._eventType else {
267-
logger?.debug("Received message without event type: \(message)")
268288
return
269289
}
270290

@@ -275,12 +295,9 @@ public final class RealtimeChannelV2: Sendable {
275295

276296
case .system:
277297
if message.status == .ok {
278-
logger?.debug("Subscribed to channel \(message.topic)")
279298
status = .subscribed
280299
} else {
281-
logger?.debug(
282-
"Failed to subscribe to channel \(message.topic): \(message.payload)"
283-
)
300+
logger?.error("Failed to subscribe to channel '\(topic)' via system event: \(message.payload)")
284301
}
285302

286303
callbackManager.triggerSystem(message: message)
@@ -306,13 +323,12 @@ public final class RealtimeChannelV2: Sendable {
306323

307324
if self.status != .subscribed {
308325
self.status = .subscribed
309-
logger?.debug("Subscribed to channel \(message.topic)")
310326
}
311327
}
312328

313329
case .postgresChanges:
314330
guard let data = message.payload["data"] else {
315-
logger?.debug("Expected \"data\" key in message payload.")
331+
logger?.error("Expected \"data\" key in message payload.")
316332
return
317333
}
318334

@@ -370,12 +386,11 @@ public final class RealtimeChannelV2: Sendable {
370386

371387
case .close:
372388
socket._remove(self)
373-
logger?.debug("Unsubscribed from channel \(message.topic)")
374389
status = .unsubscribed
375390

376391
case .error:
377-
logger?.debug(
378-
"Received an error in channel \(message.topic). That could be as a result of an invalid access token"
392+
logger?.error(
393+
"Received an error in channel '\(topic)'. This could be due to an invalid access token: \(message.payload)"
379394
)
380395

381396
case .presenceDiff:
@@ -388,7 +403,7 @@ public final class RealtimeChannelV2: Sendable {
388403
callbackManager.triggerPresenceDiffs(joins: joins, leaves: [:], rawMessage: message)
389404
}
390405
} catch {
391-
logger?.debug("Failed: \(error)")
406+
logger?.error("Failed to process message for channel '\(topic)': \(error)")
392407
}
393408
}
394409

@@ -543,9 +558,11 @@ public final class RealtimeChannelV2: Sendable {
543558
@MainActor
544559
@discardableResult
545560
func push(_ event: String, ref: String? = nil, payload: JSONObject = [:]) async -> PushStatus {
561+
let messageRef = ref ?? socket.makeRef()
562+
546563
let message = RealtimeMessageV2(
547564
joinRef: joinRef,
548-
ref: ref ?? socket.makeRef(),
565+
ref: messageRef,
549566
topic: self.topic,
550567
event: event,
551568
payload: payload
@@ -556,12 +573,18 @@ public final class RealtimeChannelV2: Sendable {
556573
mutableState.pushes[ref] = push
557574
}
558575

559-
return await push.send()
576+
let status = await push.send()
577+
return status
560578
}
561579

562580
@MainActor
563581
private func didReceiveReply(ref: String, status: String) {
564582
let push = mutableState.pushes.removeValue(forKey: ref)
565-
push?.didReceive(status: PushStatus(rawValue: status) ?? .ok)
583+
if let push = push {
584+
let pushStatus = PushStatus(rawValue: status) ?? .ok
585+
push.didReceive(status: pushStatus)
586+
} else {
587+
logger?.error("No push found for reply on channel '\(topic)': ref='\(ref)'")
588+
}
566589
}
567590
}

0 commit comments

Comments
 (0)