Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions Package.swift
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,16 @@ let package = Package(
dependencies: [
.package(url: "https://github.com/ashleymills/Reachability.swift", from: "5.1.0"),
.package(url: "https://github.com/apple/swift-docc-plugin", from: "1.0.0"),
.package(url: "https://github.com/apple/swift-protobuf.git", from: "1.6.0")
.package(url: "https://github.com/apple/swift-protobuf.git", from: "1.6.0"),
.package(url: "https://github.com/apple/swift-log.git", from: "1.0.0")
],
targets: [
.target(
name: "DiscordKitCore",
dependencies: [
.product(name: "Reachability", package: "Reachability.swift", condition: .when(platforms: [.macOS])),
.product(name: "SwiftProtobuf", package: "swift-protobuf")
.product(name: "SwiftProtobuf", package: "swift-protobuf"),
.product(name: "Logging", package: "swift-log")
],
exclude: [
"REST/README.md",
Expand Down
23 changes: 23 additions & 0 deletions Sources/DiscordKitCore/Extensions/Logger+.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
//
// Logger+.swift
//
//
// Created by Vincent Kwok on 25/11/22.
//

import Foundation
import Logging

extension Logger {
/// Create a Logger instance at a specific log level
init(label: String, level: Level?) {
self.init(label: label)
if let level = level {
logLevel = level
} else {
#if DEBUG
logLevel = .trace
#endif
}
}
}
23 changes: 13 additions & 10 deletions Sources/DiscordKitCore/Gateway/DecompressionEngine.swift
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

import Foundation
import Compression
import OSLog
import Logging

/// Decompresses `zlib-stream`-compressed payloads received
/// from the Gateway
Expand All @@ -23,7 +23,7 @@ import OSLog
public class DecompressionEngine {
private static let ZLIB_SUFFIX = Data([0x00, 0x00, 0xff, 0xff]), BUFFER_SIZE = 32_768

private static let log = Logger(subsystem: Bundle.main.bundleIdentifier ?? DiscordREST.subsystem, category: "DecompressionEngine")
private static let log = Logger(label: "DecompressionEngine", level: nil)
private var buf = Data(), stream: compression_stream, status: compression_status,
decompressing = false

Expand Down Expand Up @@ -58,21 +58,21 @@ public class DecompressionEngine {
buf.append(data)

guard buf.count >= 4, buf.suffix(4) == DecompressionEngine.ZLIB_SUFFIX else {
DecompressionEngine.log.debug("Appending to buf, current buf len: \(self.buf.count, privacy: .public)")
Self.log.debug("Appending to buf", metadata: ["buf.count": "\(buf.count)"])
return nil
}

let output = decompress(buf)

buf.removeAll()

return String(decoding: output, as: UTF8.self)
}
}

public extension DecompressionEngine {
fileprivate func decompress(_ data: Data) -> Data {
guard !decompressing else {
DecompressionEngine.log.warning("Another decompression is currently taking place, skipping")
Self.log.warning("Another decompression is currently taking place, skipping")
return Data()
}
decompressing = true
Expand Down Expand Up @@ -114,12 +114,10 @@ public extension DecompressionEngine {

// Perform compression or decompression.
if let srcChunk = srcChunk {
let count = srcChunk.count

srcChunk.withUnsafeBytes {
let baseAddress = $0.bindMemory(to: UInt8.self).baseAddress!

stream.src_ptr = baseAddress.advanced(by: count - stream.src_size)
stream.src_ptr = baseAddress.advanced(by: $0.count - stream.src_size)
status = compression_stream_process(&stream, flags)
}
}
Expand All @@ -136,11 +134,16 @@ public extension DecompressionEngine {
// Reset the stream to receive the next batch of output.
stream.dst_ptr = destinationBufferPointer
stream.dst_size = bufferSize
case COMPRESSION_STATUS_ERROR: return decompressed
// This "error" happens when decompression is done, what a hack
case COMPRESSION_STATUS_ERROR: break // This "error" occurs when decompression is done, what a hack
default: break
}
} while status == COMPRESSION_STATUS_OK

Self.log.warning("Decompressed data", metadata: [
"original.count": "\(buf.count)",
"decompressed.count": "\(decompressed.count)"
])

return decompressed
}
}
58 changes: 37 additions & 21 deletions Sources/DiscordKitCore/Gateway/RobustWebSocket.swift
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@

import Foundation
import Reachability
import OSLog
import Combine
import Logging

/// A robust WebSocket that handles resuming, reconnection and heartbeats
/// with the Discord Gateway
Expand Down Expand Up @@ -49,7 +49,7 @@ public class RobustWebSocket: NSObject {
private let reachability = try! Reachability()

// Logger instance
private static let log = Logger(subsystem: Bundle.main.bundleIdentifier ?? DiscordREST.subsystem, category: "RobustWebSocket")
private static let log = Logger(label: "RobustWebSocket", level: nil)

// Operation queue for the URLSessionWebSocketTask
private let queue: OperationQueue = {
Expand Down Expand Up @@ -111,7 +111,7 @@ public class RobustWebSocket: NSObject {

private func invalidateConnTimeout(reason: String = "") {
if let timer = connTimeout {
Self.log.debug("Invalidating conn timeout, reason: \(reason)")
Self.log.debug("Invalidating conn timeout", metadata: ["reason": "\(reason)"])
timer.invalidate()
connTimeout = nil
}
Expand Down Expand Up @@ -151,9 +151,13 @@ public class RobustWebSocket: NSObject {

let delay = reconnectInterval(code, attempts)
if let delay = delay {
Self.log.info("Retrying connection in \(delay)s, attempt \(String(self.attempts))")
Self.log.info("Retrying connection", metadata: [
"connectIn": "\(delay)",
"attempt": "\(attempts)"
])
DispatchQueue.main.async { [weak self] in
self?.pendingReconnect = Timer.scheduledTimer(withTimeInterval: delay, repeats: false) { [weak self] _ in
Self.log.trace("Attempting reconnection now", metadata: ["attempt": "\(self?.attempts ?? 0)"])
self?.connect()
}
}
Expand All @@ -172,17 +176,17 @@ public class RobustWebSocket: NSObject {
case .data(let data):
if let decompressed = self?.decompressor.push_data(data) {
try self?.handleMessage(with: decompressed)
} else { Self.log.debug("Data has not ended yet") }
} else { Self.log.trace("Decompression did not return any result - compressed packet is not complete") }
case .string(let str): try self?.handleMessage(with: str)
@unknown default: Self.log.warning("Unknown sock message case!")
}
} catch {
Self.log.warning("Error decoding message: \(error.localizedDescription, privacy: .public)")
Self.log.warning("Error decoding message", metadata: ["error": "\(error.localizedDescription)"])
}
self?.attachSockReceiveListener()
case .failure(let error):
// If an error is encountered here, the connection is probably broken
Self.log.error("Error when receiving: \(error.localizedDescription, privacy: .public)")
Self.log.error("Receive error", metadata: ["error": "\(error.localizedDescription)"])
self?.forceClose()
}
}
Expand All @@ -194,7 +198,10 @@ public class RobustWebSocket: NSObject {
socket.cancel()
}

Self.log.info("[CONNECT] \(DiscordKitConfig.default.gateway), version: \(DiscordKitConfig.default.version)")
Self.log.info("[CONNECT]", metadata: [
"ws": "\(DiscordKitConfig.default.gateway)",
"version": "\(DiscordKitConfig.default.version)"
])
pendingReconnect = nil

var gatewayReq = URLRequest(url: URL(string: DiscordKitConfig.default.gateway)!)
Expand All @@ -208,7 +215,7 @@ public class RobustWebSocket: NSObject {
self?.connTimeout = nil
guard self?.connected != true else { return }
// reachability.stopNotifier()
Self.log.warning("Connection timed out after \(self!.timeout)s")
Self.log.warning("Connection timed out", metadata: ["timeout": "\(self?.timeout ?? -1)"])
self?.forceClose()
Self.log.info("[RECONNECT] Preemptively attempting reconnection")
self?.reconnect(code: nil)
Expand Down Expand Up @@ -260,22 +267,25 @@ public class RobustWebSocket: NSObject {
case .hello(let hello):
onHello()
// Start heartbeating and send identify
Self.log.debug("[HELLO] heartbeat interval: \(hello.heartbeat_interval, privacy: .public)")
Self.log.info("[HELLO]", metadata: ["heartbeat_interval": "\(hello.heartbeat_interval)"])
startHeartbeating(interval: Double(hello.heartbeat_interval) / 1000.0)

// Check if we're attempting to and can resume
if canResume, let sessionID = sessionID {
Self.log.info("[RESUME] Resuming session \(sessionID, privacy: .public), seq: \(String(describing: self.seq), privacy: .public)")
Self.log.info("[RESUME] Resuming session", metadata: [
"sessionID": "\(sessionID)",
"seq": "\(self.seq ?? -1)"
])
guard let resume = getResume(seq: seq, sessionID: sessionID)
else { return }
send(.resume, data: resume)
return
}
Self.log.debug("[IDENTIFY] intents: \(self.intents?.rawValue.description ?? "not applicable")")
Self.log.info("[IDENTIFY]", metadata: ["intents": "\(intents?.rawValue.description ?? "none")"])
// Send identify
seq = nil // Clear sequence #
guard let identify = getIdentify() else {
Self.log.debug("Could not get identify")
Self.log.error("Could not get identify!")
close(code: .normalClosure)
onAuthFailure.notify()
return
Expand Down Expand Up @@ -305,7 +315,7 @@ public class RobustWebSocket: NSObject {
case .botReady(let ready):
sessionID = ready.session_id
canResume = true
Self.log.trace("[READY] session: \(ready.session_id)")
Self.log.info("[READY]", metadata: ["session": "\(ready.session_id)"])
fallthrough
case .resumed:
sessionOpen = true
Expand Down Expand Up @@ -383,14 +393,14 @@ public extension RobustWebSocket {
private func setupReachability() {
reachability.whenReachable = { [weak self] _ in
self?.reachable = true
Self.log.info("Reset backoff for reason: connection is reachable")
Self.log.debug("Reset backoff", metadata: ["reason": "connection is reachable"])
self?.clearPendingReconnectIfNeeded()
self?.attempts = 0
self?.reconnect(code: nil)
}
reachability.whenUnreachable = { [weak self] _ in
self?.reachable = false
Self.log.info("Connection unreachable, sending expedited heartbeat")
Self.log.warning("Connection unreachable, sending expedited heartbeat")
self?.sendHeartbeat(4*4)
}
do { try reachability.startNotifier() } catch { Self.log.error("Starting reachability notifier failed!") }
Expand All @@ -402,7 +412,7 @@ public extension RobustWebSocket {
@objc private func sendHeartbeat(_ interval: TimeInterval) {
guard connected else { return }
if let hbTimeout = hbTimeout, hbTimeout.isValid {
Self.log.warning("Skipping sending heartbeat - already waiting for one")
Self.log.warning("Skipping sending heartbeat", metadata: ["reason": "already waiting for one"])
return
}

Expand All @@ -412,14 +422,14 @@ public extension RobustWebSocket {
hbTimeout?.invalidate()
DispatchQueue.main.async { [weak self] in
self?.hbTimeout = Timer.scheduledTimer(withTimeInterval: interval * 0.25, repeats: false) { [weak self] _ in
Self.log.warning("[HEARTBEAT] Force-closing connection, reason: socket timed out")
Self.log.warning("[HEARTBEAT] Force-closing connection", metadata: ["reason": "socket timed out"])
self?.forceClose()
}
}
}

private func startHeartbeating(interval: TimeInterval) {
Self.log.debug("Sending heartbeats every \(interval)s")
Self.log.debug("Start heartbeating", metadata: ["interval": "\(interval)"])

if let hbCancellable = hbCancellable {
Self.log.debug("Cancelling existing heartbeat timer")
Expand Down Expand Up @@ -530,10 +540,16 @@ public extension RobustWebSocket {
guard let encoded = try? DiscordREST.encoder.encode(sendPayload)
else { return }

Self.log.debug("Outgoing Payload: <\(String(describing: opcode), privacy: .public)> \(String(describing: data), privacy: .sensitive(mask: .hash)) [seq: \(String(describing: self.seq), privacy: .public)]")
Self.log.trace("Outgoing Payload", metadata: [
"opcode": "\(opcode)",
"data": "\(data)",
"seq": "\(seq ?? -1)"
])

socket.send(.data(encoded), completionHandler: completionHandler ?? { err in
if let err = err { Self.log.error("Socket send error: \(err.localizedDescription, privacy: .public)") }
if let err = err { Self.log.error("Socket send error", metadata: [
"error": "\(err.localizedDescription)"
]) }
})
}
}