diff --git a/Sources/Realtime/RealtimeChannelV2.swift b/Sources/Realtime/RealtimeChannelV2.swift index 9b6efa14..c37f79fe 100644 --- a/Sources/Realtime/RealtimeChannelV2.swift +++ b/Sources/Realtime/RealtimeChannelV2.swift @@ -40,6 +40,7 @@ public final class RealtimeChannelV2: Sendable { let logger: (any SupabaseLogger)? let socket: RealtimeClientV2 + let maxRetryAttempt = 5 @MainActor var joinRef: String? { mutableState.joinRef } @@ -86,6 +87,38 @@ public final class RealtimeChannelV2: Sendable { /// Subscribes to the channel @MainActor public func subscribe() async { + logger?.debug("Starting subscription to channel '\(topic)' (attempt 1/\(maxRetryAttempt))") + + var attempts = 0 + + while attempts < maxRetryAttempt { + attempts += 1 + + do { + logger?.debug("Attempting to subscribe to channel '\(topic)' (attempt \(attempts)/\(maxRetryAttempt))") + try await withTimeout(interval: socket.options.timeoutInterval) { [self] in + await _subscribe() + } + logger?.debug("Successfully subscribed to channel '\(topic)'") + return + } catch is TimeoutError { + logger?.debug("Subscribe timed out for channel '\(topic)' (attempt \(attempts)/\(maxRetryAttempt))") + if attempts < maxRetryAttempt { + logger?.debug("Retrying subscription to channel '\(topic)'...") + } else { + logger?.error("Failed to subscribe to channel '\(topic)' after \(maxRetryAttempt) attempts due to timeout") + } + } catch { + logger?.error("Subscribe failed for channel '\(topic)' (attempt \(attempts)/\(maxRetryAttempt)): \(error)") + break + } + } + + logger?.error("Subscription to channel '\(topic)' failed after \(attempts) attempts") + } + + @MainActor + private func _subscribe() async { if socket.status != .connected { if socket.options.connectOnSubscribe != true { reportIssue( @@ -97,7 +130,6 @@ public final class RealtimeChannelV2: Sendable { } status = .subscribing - logger?.debug("Subscribing to channel \(topic)") config.presence.enabled = callbackManager.callbacks.contains(where: { $0.isPresence }) @@ -108,35 +140,23 @@ public final class RealtimeChannelV2: Sendable { isPrivate: config.isPrivate ) + let accessToken = await socket._getAccessToken() let payload = RealtimeJoinPayload( config: joinConfig, - accessToken: await socket._getAccessToken(), + accessToken: accessToken, version: socket.options.headers[.xClientInfo] ) let joinRef = socket.makeRef() mutableState.joinRef = joinRef - logger?.debug("Subscribing to channel with body: \(joinConfig)") - await push( ChannelEvent.join, ref: joinRef, payload: try! JSONObject(payload) ) - do { - try await withTimeout(interval: socket.options.timeoutInterval) { [self] in - _ = await statusChange.first { @Sendable in $0 == .subscribed } - } - } catch { - if error is TimeoutError { - logger?.debug("Subscribe timed out.") - await subscribe() - } else { - logger?.error("Subscribe failed: \(error)") - } - } + _ = await statusChange.first { @Sendable in $0 == .subscribed } } public func unsubscribe() async { @@ -153,7 +173,6 @@ public final class RealtimeChannelV2: Sendable { "manually updating auth token per channel is not recommended, please use `setAuth` in RealtimeClient instead." ) public func updateAuth(jwt: String?) async { - logger?.debug("Updating auth token for channel \(topic)") await push( ChannelEvent.accessToken, payload: ["access_token": jwt.map { .string($0) } ?? .null] @@ -269,7 +288,6 @@ public final class RealtimeChannelV2: Sendable { func onMessage(_ message: RealtimeMessageV2) async { do { guard let eventType = message._eventType else { - logger?.debug("Received message without event type: \(message)") return } @@ -280,12 +298,9 @@ public final class RealtimeChannelV2: Sendable { case .system: if message.status == .ok { - logger?.debug("Subscribed to channel \(message.topic)") status = .subscribed } else { - logger?.debug( - "Failed to subscribe to channel \(message.topic): \(message.payload)" - ) + logger?.error("Failed to subscribe to channel '\(topic)' via system event: \(message.payload)") } callbackManager.triggerSystem(message: message) @@ -311,13 +326,12 @@ public final class RealtimeChannelV2: Sendable { if self.status != .subscribed { self.status = .subscribed - logger?.debug("Subscribed to channel \(message.topic)") } } case .postgresChanges: guard let data = message.payload["data"] else { - logger?.debug("Expected \"data\" key in message payload.") + logger?.error("Expected \"data\" key in message payload.") return } @@ -375,12 +389,11 @@ public final class RealtimeChannelV2: Sendable { case .close: socket._remove(self) - logger?.debug("Unsubscribed from channel \(message.topic)") status = .unsubscribed case .error: logger?.error( - "Received an error in channel \(message.topic). That could be as a result of an invalid access token" + "Received an error in channel '\(topic)'. This could be due to an invalid access token: \(message.payload)" ) case .presenceDiff: @@ -393,7 +406,7 @@ public final class RealtimeChannelV2: Sendable { callbackManager.triggerPresenceDiffs(joins: joins, leaves: [:], rawMessage: message) } } catch { - logger?.debug("Failed: \(error)") + logger?.error("Failed to process message for channel '\(topic)': \(error)") } } @@ -578,6 +591,11 @@ public final class RealtimeChannelV2: Sendable { @MainActor private func didReceiveReply(ref: String, status: String) { let push = mutableState.pushes.removeValue(forKey: ref) - push?.didReceive(status: PushStatus(rawValue: status) ?? .ok) + if let push = push { + let pushStatus = PushStatus(rawValue: status) ?? .ok + push.didReceive(status: pushStatus) + } else { + logger?.error("No push found for reply on channel '\(topic)': ref='\(ref)'") + } } }