Skip to content

fix(realtime): add max attempts for resubscribing #743

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 46 additions & 28 deletions Sources/Realtime/RealtimeChannelV2.swift
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
}

@MainActor
private var mutableState = MutableState()

Check warning on line 35 in Sources/Realtime/RealtimeChannelV2.swift

View workflow job for this annotation

GitHub Actions / xcodebuild (15) (IOS, 15.2)

stored property 'mutableState' of 'Sendable'-conforming class 'RealtimeChannelV2' is mutable

Check warning on line 35 in Sources/Realtime/RealtimeChannelV2.swift

View workflow job for this annotation

GitHub Actions / xcodebuild (15) (MACOS, 15.2)

stored property 'mutableState' of 'Sendable'-conforming class 'RealtimeChannelV2' is mutable

let topic: String

Expand All @@ -40,6 +40,7 @@

let logger: (any SupabaseLogger)?
let socket: RealtimeClientV2
let maxRetryAttempt = 5

@MainActor var joinRef: String? { mutableState.joinRef }

Expand Down Expand Up @@ -86,6 +87,38 @@
/// Subscribes to the channel
@MainActor
public func subscribe() async {
logger?.debug("Starting subscription to channel '\(topic)' (attempt 1/\(maxRetryAttempt))")

var attempts = 0

while attempts < maxRetryAttempt {
attempts += 1

do {
logger?.debug("Attempting to subscribe to channel '\(topic)' (attempt \(attempts)/\(maxRetryAttempt))")
try await withTimeout(interval: socket.options.timeoutInterval) { [self] in
await _subscribe()
}
logger?.debug("Successfully subscribed to channel '\(topic)'")
return
} catch is TimeoutError {
logger?.debug("Subscribe timed out for channel '\(topic)' (attempt \(attempts)/\(maxRetryAttempt))")
if attempts < maxRetryAttempt {
logger?.debug("Retrying subscription to channel '\(topic)'...")
} else {
logger?.error("Failed to subscribe to channel '\(topic)' after \(maxRetryAttempt) attempts due to timeout")
}
} catch {
logger?.error("Subscribe failed for channel '\(topic)' (attempt \(attempts)/\(maxRetryAttempt)): \(error)")
break
}
}

logger?.error("Subscription to channel '\(topic)' failed after \(attempts) attempts")
}

@MainActor
private func _subscribe() async {
if socket.status != .connected {
if socket.options.connectOnSubscribe != true {
reportIssue(
Expand All @@ -97,7 +130,6 @@
}

status = .subscribing
logger?.debug("Subscribing to channel \(topic)")

config.presence.enabled = callbackManager.callbacks.contains(where: { $0.isPresence })

Expand All @@ -108,35 +140,23 @@
isPrivate: config.isPrivate
)

let accessToken = await socket._getAccessToken()
let payload = RealtimeJoinPayload(
config: joinConfig,
accessToken: await socket._getAccessToken(),
accessToken: accessToken,
version: socket.options.headers[.xClientInfo]
)

let joinRef = socket.makeRef()
mutableState.joinRef = joinRef

logger?.debug("Subscribing to channel with body: \(joinConfig)")

await push(
ChannelEvent.join,
ref: joinRef,
payload: try! JSONObject(payload)
)

do {
try await withTimeout(interval: socket.options.timeoutInterval) { [self] in
_ = await statusChange.first { @Sendable in $0 == .subscribed }
}
} catch {
if error is TimeoutError {
logger?.debug("Subscribe timed out.")
await subscribe()
} else {
logger?.error("Subscribe failed: \(error)")
}
}
_ = await statusChange.first { @Sendable in $0 == .subscribed }
}

public func unsubscribe() async {
Expand All @@ -153,7 +173,6 @@
"manually updating auth token per channel is not recommended, please use `setAuth` in RealtimeClient instead."
)
public func updateAuth(jwt: String?) async {
logger?.debug("Updating auth token for channel \(topic)")
await push(
ChannelEvent.accessToken,
payload: ["access_token": jwt.map { .string($0) } ?? .null]
Expand Down Expand Up @@ -269,7 +288,6 @@
func onMessage(_ message: RealtimeMessageV2) async {
do {
guard let eventType = message._eventType else {
logger?.debug("Received message without event type: \(message)")
return
}

Expand All @@ -280,12 +298,9 @@

case .system:
if message.status == .ok {
logger?.debug("Subscribed to channel \(message.topic)")
status = .subscribed
} else {
logger?.debug(
"Failed to subscribe to channel \(message.topic): \(message.payload)"
)
logger?.error("Failed to subscribe to channel '\(topic)' via system event: \(message.payload)")
}

callbackManager.triggerSystem(message: message)
Expand All @@ -311,13 +326,12 @@

if self.status != .subscribed {
self.status = .subscribed
logger?.debug("Subscribed to channel \(message.topic)")
}
}

case .postgresChanges:
guard let data = message.payload["data"] else {
logger?.debug("Expected \"data\" key in message payload.")
logger?.error("Expected \"data\" key in message payload.")
return
}

Expand Down Expand Up @@ -375,12 +389,11 @@

case .close:
socket._remove(self)
logger?.debug("Unsubscribed from channel \(message.topic)")
status = .unsubscribed

case .error:
logger?.error(
"Received an error in channel \(message.topic). That could be as a result of an invalid access token"
"Received an error in channel '\(topic)'. This could be due to an invalid access token: \(message.payload)"
)

case .presenceDiff:
Expand All @@ -393,7 +406,7 @@
callbackManager.triggerPresenceDiffs(joins: joins, leaves: [:], rawMessage: message)
}
} catch {
logger?.debug("Failed: \(error)")
logger?.error("Failed to process message for channel '\(topic)': \(error)")
}
}

Expand Down Expand Up @@ -578,6 +591,11 @@
@MainActor
private func didReceiveReply(ref: String, status: String) {
let push = mutableState.pushes.removeValue(forKey: ref)
push?.didReceive(status: PushStatus(rawValue: status) ?? .ok)
if let push = push {
let pushStatus = PushStatus(rawValue: status) ?? .ok
push.didReceive(status: pushStatus)
} else {
logger?.error("No push found for reply on channel '\(topic)': ref='\(ref)'")
}
}
}
Loading