From faa42f99c3880503afaafaa867263f5f999b60c9 Mon Sep 17 00:00:00 2001 From: Guilherme Souza Date: Fri, 4 Jul 2025 07:50:19 -0300 Subject: [PATCH 1/3] fix(realtime): add max attempts for resubscrining # Conflicts: # Sources/Realtime/RealtimeChannelV2.swift --- Sources/Realtime/RealtimeChannelV2.swift | 83 +++++++++++++++--------- 1 file changed, 53 insertions(+), 30 deletions(-) diff --git a/Sources/Realtime/RealtimeChannelV2.swift b/Sources/Realtime/RealtimeChannelV2.swift index 9b6efa14..0877507b 100644 --- a/Sources/Realtime/RealtimeChannelV2.swift +++ b/Sources/Realtime/RealtimeChannelV2.swift @@ -40,6 +40,7 @@ public final class RealtimeChannelV2: Sendable { let logger: (any SupabaseLogger)? let socket: RealtimeClientV2 + let maxRetryAttempt = 5 @MainActor var joinRef: String? { mutableState.joinRef } @@ -86,6 +87,40 @@ public final class RealtimeChannelV2: Sendable { /// Subscribes to the channel @MainActor public func subscribe() async { + logger?.debug("Starting subscription to channel '\(topic)' (attempt 1/\(maxRetryAttempt))") + + var attempts = 0 + + while attempts < maxRetryAttempt { + attempts += 1 + + do { + logger?.debug("Attempting to subscribe to channel '\(topic)' (attempt \(attempts)/\(maxRetryAttempt))") + try await withTimeout(interval: socket.options.timeoutInterval) { [self] in + await _subscribe() + } + logger?.debug("Successfully subscribed to channel '\(topic)'") + return + } catch { + if error is TimeoutError { + logger?.debug("Subscribe timed out for channel '\(topic)' (attempt \(attempts)/\(maxRetryAttempt))") + if attempts < maxRetryAttempt { + logger?.debug("Retrying subscription to channel '\(topic)'...") + } else { + logger?.error("Failed to subscribe to channel '\(topic)' after \(maxRetryAttempt) attempts due to timeout") + } + } else { + logger?.error("Subscribe failed for channel '\(topic)' (attempt \(attempts)/\(maxRetryAttempt)): \(error)") + break + } + } + } + + logger?.error("Subscription to channel '\(topic)' failed after \(attempts) attempts") + } + + @MainActor + private func _subscribe() async { if socket.status != .connected { if socket.options.connectOnSubscribe != true { reportIssue( @@ -97,7 +132,6 @@ public final class RealtimeChannelV2: Sendable { } status = .subscribing - logger?.debug("Subscribing to channel \(topic)") config.presence.enabled = callbackManager.callbacks.contains(where: { $0.isPresence }) @@ -108,35 +142,23 @@ public final class RealtimeChannelV2: Sendable { isPrivate: config.isPrivate ) + let accessToken = await socket._getAccessToken() let payload = RealtimeJoinPayload( config: joinConfig, - accessToken: await socket._getAccessToken(), + accessToken: accessToken, version: socket.options.headers[.xClientInfo] ) let joinRef = socket.makeRef() mutableState.joinRef = joinRef - logger?.debug("Subscribing to channel with body: \(joinConfig)") - await push( ChannelEvent.join, ref: joinRef, payload: try! JSONObject(payload) ) - do { - try await withTimeout(interval: socket.options.timeoutInterval) { [self] in - _ = await statusChange.first { @Sendable in $0 == .subscribed } - } - } catch { - if error is TimeoutError { - logger?.debug("Subscribe timed out.") - await subscribe() - } else { - logger?.error("Subscribe failed: \(error)") - } - } + _ = await statusChange.first { @Sendable in $0 == .subscribed } } public func unsubscribe() async { @@ -153,7 +175,6 @@ public final class RealtimeChannelV2: Sendable { "manually updating auth token per channel is not recommended, please use `setAuth` in RealtimeClient instead." ) public func updateAuth(jwt: String?) async { - logger?.debug("Updating auth token for channel \(topic)") await push( ChannelEvent.accessToken, payload: ["access_token": jwt.map { .string($0) } ?? .null] @@ -269,7 +290,6 @@ public final class RealtimeChannelV2: Sendable { func onMessage(_ message: RealtimeMessageV2) async { do { guard let eventType = message._eventType else { - logger?.debug("Received message without event type: \(message)") return } @@ -280,12 +300,9 @@ public final class RealtimeChannelV2: Sendable { case .system: if message.status == .ok { - logger?.debug("Subscribed to channel \(message.topic)") status = .subscribed } else { - logger?.debug( - "Failed to subscribe to channel \(message.topic): \(message.payload)" - ) + logger?.error("Failed to subscribe to channel '\(topic)' via system event: \(message.payload)") } callbackManager.triggerSystem(message: message) @@ -311,13 +328,12 @@ public final class RealtimeChannelV2: Sendable { if self.status != .subscribed { self.status = .subscribed - logger?.debug("Subscribed to channel \(message.topic)") } } case .postgresChanges: guard let data = message.payload["data"] else { - logger?.debug("Expected \"data\" key in message payload.") + logger?.error("Expected \"data\" key in message payload.") return } @@ -375,12 +391,11 @@ public final class RealtimeChannelV2: Sendable { case .close: socket._remove(self) - logger?.debug("Unsubscribed from channel \(message.topic)") status = .unsubscribed case .error: logger?.error( - "Received an error in channel \(message.topic). That could be as a result of an invalid access token" + "Received an error in channel '\(topic)'. This could be due to an invalid access token: \(message.payload)" ) case .presenceDiff: @@ -393,7 +408,7 @@ public final class RealtimeChannelV2: Sendable { callbackManager.triggerPresenceDiffs(joins: joins, leaves: [:], rawMessage: message) } } catch { - logger?.debug("Failed: \(error)") + logger?.error("Failed to process message for channel '\(topic)': \(error)") } } @@ -559,9 +574,11 @@ public final class RealtimeChannelV2: Sendable { @MainActor @discardableResult func push(_ event: String, ref: String? = nil, payload: JSONObject = [:]) async -> PushStatus { + let messageRef = ref ?? socket.makeRef() + let message = RealtimeMessageV2( joinRef: joinRef, - ref: ref ?? socket.makeRef(), + ref: messageRef, topic: self.topic, event: event, payload: payload @@ -572,12 +589,18 @@ public final class RealtimeChannelV2: Sendable { mutableState.pushes[ref] = push } - return await push.send() + let status = await push.send() + return status } @MainActor private func didReceiveReply(ref: String, status: String) { let push = mutableState.pushes.removeValue(forKey: ref) - push?.didReceive(status: PushStatus(rawValue: status) ?? .ok) + if let push = push { + let pushStatus = PushStatus(rawValue: status) ?? .ok + push.didReceive(status: pushStatus) + } else { + logger?.error("No push found for reply on channel '\(topic)': ref='\(ref)'") + } } } From dc4ea0a0deda39bd52d8ccb19360e95bad6aaecc Mon Sep 17 00:00:00 2001 From: Guilherme Souza Date: Fri, 4 Jul 2025 07:55:17 -0300 Subject: [PATCH 2/3] proper use try catch --- Sources/Realtime/RealtimeChannelV2.swift | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/Sources/Realtime/RealtimeChannelV2.swift b/Sources/Realtime/RealtimeChannelV2.swift index 0877507b..bffba4a2 100644 --- a/Sources/Realtime/RealtimeChannelV2.swift +++ b/Sources/Realtime/RealtimeChannelV2.swift @@ -101,18 +101,16 @@ public final class RealtimeChannelV2: Sendable { } logger?.debug("Successfully subscribed to channel '\(topic)'") return - } catch { - if error is TimeoutError { + } catch is TimeoutError { logger?.debug("Subscribe timed out for channel '\(topic)' (attempt \(attempts)/\(maxRetryAttempt))") if attempts < maxRetryAttempt { logger?.debug("Retrying subscription to channel '\(topic)'...") } else { logger?.error("Failed to subscribe to channel '\(topic)' after \(maxRetryAttempt) attempts due to timeout") } - } else { - logger?.error("Subscribe failed for channel '\(topic)' (attempt \(attempts)/\(maxRetryAttempt)): \(error)") - break - } + } catch { + logger?.error("Subscribe failed for channel '\(topic)' (attempt \(attempts)/\(maxRetryAttempt)): \(error)") + break } } From 8c4f5c421cf4cfee9de6e7de7a6f6024a73b3e54 Mon Sep 17 00:00:00 2001 From: Guilherme Souza Date: Fri, 4 Jul 2025 07:57:15 -0300 Subject: [PATCH 3/3] revert push --- Sources/Realtime/RealtimeChannelV2.swift | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/Sources/Realtime/RealtimeChannelV2.swift b/Sources/Realtime/RealtimeChannelV2.swift index bffba4a2..c37f79fe 100644 --- a/Sources/Realtime/RealtimeChannelV2.swift +++ b/Sources/Realtime/RealtimeChannelV2.swift @@ -572,11 +572,9 @@ public final class RealtimeChannelV2: Sendable { @MainActor @discardableResult func push(_ event: String, ref: String? = nil, payload: JSONObject = [:]) async -> PushStatus { - let messageRef = ref ?? socket.makeRef() - let message = RealtimeMessageV2( joinRef: joinRef, - ref: messageRef, + ref: ref ?? socket.makeRef(), topic: self.topic, event: event, payload: payload @@ -587,8 +585,7 @@ public final class RealtimeChannelV2: Sendable { mutableState.pushes[ref] = push } - let status = await push.send() - return status + return await push.send() } @MainActor