Skip to content

Code clean up to to fix issue #234 - ConnectionsState exposes a setter into internal state #310

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Sep 30, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
174 changes: 174 additions & 0 deletions Sources/AsyncHTTPClient/Connection.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the AsyncHTTPClient open source project
//
// Copyright (c) 2019-2020 Apple Inc. and the AsyncHTTPClient project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of AsyncHTTPClient project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//

import Foundation
import Logging
import NIO
import NIOConcurrencyHelpers
import NIOHTTP1
import NIOHTTPCompression
import NIOTLS
import NIOTransportServices

/// A `Connection` represents a `Channel` in the context of the connection pool
///
/// In the `ConnectionPool`, each `Channel` belongs to a given `HTTP1ConnectionProvider`
/// and has a certain "lease state" (see the `inUse` property).
/// The role of `Connection` is to model this by storing a `Channel` alongside its associated properties
/// so that they can be passed around together and correct provider can be identified when connection is released.
class Connection {
/// The provider this `Connection` belongs to.
///
/// This enables calling methods like `release()` directly on a `Connection` instead of
/// calling `provider.release(connection)`. This gives a more object oriented feel to the API
/// and can avoid having to keep explicit references to the pool at call site.
private let provider: HTTP1ConnectionProvider

/// The `Channel` of this `Connection`
///
/// - Warning: Requests that lease connections from the `ConnectionPool` are responsible
/// for removing the specific handlers they added to the `Channel` pipeline before releasing it to the pool.
let channel: Channel

init(channel: Channel, provider: HTTP1ConnectionProvider) {
self.channel = channel
self.provider = provider
}
}

extension Connection {
/// Release this `Connection` to its associated `HTTP1ConnectionProvider`.
///
/// - Warning: This only releases the connection and doesn't take care of cleaning handlers in the `Channel` pipeline.
func release(closing: Bool, logger: Logger) {
self.channel.eventLoop.assertInEventLoop()
self.provider.release(connection: self, closing: closing, logger: logger)
}

/// Called when channel exceeds idle time in pool.
func timeout(logger: Logger) {
self.channel.eventLoop.assertInEventLoop()
self.provider.timeout(connection: self, logger: logger)
}

/// Called when channel goes inactive while in the pool.
func remoteClosed(logger: Logger) {
self.channel.eventLoop.assertInEventLoop()
self.provider.remoteClosed(connection: self, logger: logger)
}

/// Called from `HTTP1ConnectionProvider.close` when client is shutting down.
func close() -> EventLoopFuture<Void> {
return self.channel.close()
}
}

/// Methods of Connection which are used in ConnectionsState extracted as protocol
/// to facilitate test of ConnectionsState.
protocol PoolManageableConnection: AnyObject {
func cancel() -> EventLoopFuture<Void>
var eventLoop: EventLoop { get }
var isActiveEstimation: Bool { get }
}

/// Implementation of methods used by ConnectionsState and its tests to manage Connection
extension Connection: PoolManageableConnection {
/// Convenience property indicating whether the underlying `Channel` is active or not.
var isActiveEstimation: Bool {
return self.channel.isActive
}

var eventLoop: EventLoop {
return self.channel.eventLoop
}

func cancel() -> EventLoopFuture<Void> {
return self.channel.triggerUserOutboundEvent(TaskCancelEvent())
}
}

extension Connection {
/// Sets idle timeout handler and channel inactivity listener.
func setIdleTimeout(timeout: TimeAmount?, logger: Logger) {
_ = self.channel.pipeline.addHandler(IdleStateHandler(writeTimeout: timeout), position: .first).flatMap { _ in
self.channel.pipeline.addHandler(IdlePoolConnectionHandler(connection: self, logger: logger))
}
}

/// Removes idle timeout handler and channel inactivity listener
func cancelIdleTimeout() -> EventLoopFuture<Void> {
return self.removeHandler(IdleStateHandler.self).flatMap { _ in
self.removeHandler(IdlePoolConnectionHandler.self)
}
}
}

class IdlePoolConnectionHandler: ChannelInboundHandler, RemovableChannelHandler {
typealias InboundIn = NIOAny

let connection: Connection
var eventSent: Bool
let logger: Logger

init(connection: Connection, logger: Logger) {
self.connection = connection
self.eventSent = false
self.logger = logger
}

// this is needed to detect when remote end closes connection while connection is in the pool idling
func channelInactive(context: ChannelHandlerContext) {
if !self.eventSent {
self.eventSent = true
self.connection.remoteClosed(logger: self.logger)
}
}

func userInboundEventTriggered(context: ChannelHandlerContext, event: Any) {
if let idleEvent = event as? IdleStateHandler.IdleStateEvent, idleEvent == .write {
if !self.eventSent {
self.eventSent = true
self.connection.timeout(logger: self.logger)
}
} else {
context.fireUserInboundEventTriggered(event)
}
}
}

extension Connection: CustomStringConvertible {
var description: String {
return "\(self.channel)"
}
}

struct ConnectionKey<ConnectionType>: Hashable where ConnectionType: PoolManageableConnection {
let connection: ConnectionType

init(_ connection: ConnectionType) {
self.connection = connection
}

static func == (lhs: ConnectionKey<ConnectionType>, rhs: ConnectionKey<ConnectionType>) -> Bool {
return ObjectIdentifier(lhs.connection) == ObjectIdentifier(rhs.connection)
}

func hash(into hasher: inout Hasher) {
hasher.combine(ObjectIdentifier(self.connection))
}

func cancel() -> EventLoopFuture<Void> {
return self.connection.cancel()
}
}
142 changes: 7 additions & 135 deletions Sources/AsyncHTTPClient/ConnectionPool.swift
Original file line number Diff line number Diff line change
Expand Up @@ -163,101 +163,6 @@ final class ConnectionPool {
}
}

/// A `Connection` represents a `Channel` in the context of the connection pool
///
/// In the `ConnectionPool`, each `Channel` belongs to a given `HTTP1ConnectionProvider`
/// and has a certain "lease state" (see the `inUse` property).
/// The role of `Connection` is to model this by storing a `Channel` alongside its associated properties
/// so that they can be passed around together and correct provider can be identified when connection is released.
class Connection {
/// The provider this `Connection` belongs to.
///
/// This enables calling methods like `release()` directly on a `Connection` instead of
/// calling `provider.release(connection)`. This gives a more object oriented feel to the API
/// and can avoid having to keep explicit references to the pool at call site.
let provider: HTTP1ConnectionProvider

/// The `Channel` of this `Connection`
///
/// - Warning: Requests that lease connections from the `ConnectionPool` are responsible
/// for removing the specific handlers they added to the `Channel` pipeline before releasing it to the pool.
let channel: Channel

init(channel: Channel, provider: HTTP1ConnectionProvider) {
self.channel = channel
self.provider = provider
}

/// Convenience property indicating wether the underlying `Channel` is active or not.
var isActiveEstimation: Bool {
return self.channel.isActive
}

/// Release this `Connection` to its associated `HTTP1ConnectionProvider`.
///
/// - Warning: This only releases the connection and doesn't take care of cleaning handlers in the `Channel` pipeline.
func release(closing: Bool, logger: Logger) {
self.channel.eventLoop.assertInEventLoop()
self.provider.release(connection: self, closing: closing, logger: logger)
}

/// Called when channel exceeds idle time in pool.
func timeout(logger: Logger) {
self.channel.eventLoop.assertInEventLoop()
self.provider.timeout(connection: self, logger: logger)
}

/// Called when channel goes inactive while in the pool.
func remoteClosed(logger: Logger) {
self.channel.eventLoop.assertInEventLoop()
self.provider.remoteClosed(connection: self, logger: logger)
}

func cancel() -> EventLoopFuture<Void> {
return self.channel.triggerUserOutboundEvent(TaskCancelEvent())
}

/// Called from `HTTP1ConnectionProvider.close` when client is shutting down.
func close() -> EventLoopFuture<Void> {
return self.channel.close()
}

/// Sets idle timeout handler and channel inactivity listener.
func setIdleTimeout(timeout: TimeAmount?, logger: Logger) {
_ = self.channel.pipeline.addHandler(IdleStateHandler(writeTimeout: timeout), position: .first).flatMap { _ in
self.channel.pipeline.addHandler(IdlePoolConnectionHandler(connection: self,
logger: logger))
}
}

/// Removes idle timeout handler and channel inactivity listener
func cancelIdleTimeout() -> EventLoopFuture<Void> {
return self.removeHandler(IdleStateHandler.self).flatMap { _ in
self.removeHandler(IdlePoolConnectionHandler.self)
}
}
}

struct ConnectionKey: Hashable {
let connection: Connection

init(_ connection: Connection) {
self.connection = connection
}

static func == (lhs: ConnectionKey, rhs: ConnectionKey) -> Bool {
return ObjectIdentifier(lhs.connection) == ObjectIdentifier(rhs.connection)
}

func hash(into hasher: inout Hasher) {
hasher.combine(ObjectIdentifier(self.connection))
}

func cancel() -> EventLoopFuture<Void> {
return self.connection.cancel()
}
}

/// A connection provider of `HTTP/1.1` connections with a given `Key` (host, scheme, port)
///
/// On top of enabling connection reuse this provider it also facilitates the creation
Expand Down Expand Up @@ -286,7 +191,7 @@ class HTTP1ConnectionProvider {

var closePromise: EventLoopPromise<Void>

var state: ConnectionsState
var state: ConnectionsState<Connection>

private let backgroundActivityLogger: Logger

Expand Down Expand Up @@ -317,7 +222,7 @@ class HTTP1ConnectionProvider {
self.state.assertInvariants()
}

func execute(_ action: Action, logger: Logger) {
func execute(_ action: Action<Connection>, logger: Logger) {
switch action {
case .lease(let connection, let waiter):
// if connection is became inactive, we create a new one.
Expand Down Expand Up @@ -392,7 +297,7 @@ class HTTP1ConnectionProvider {
func getConnection(preference: HTTPClient.EventLoopPreference,
setupComplete: EventLoopFuture<Void>,
logger: Logger) -> EventLoopFuture<Connection> {
let waiter = Waiter(promise: self.eventLoop.makePromise(), setupComplete: setupComplete, preference: preference)
let waiter = Waiter<Connection>(promise: self.eventLoop.makePromise(), setupComplete: setupComplete, preference: preference)

let action: Action = self.lock.withLock {
self.state.acquire(waiter: waiter)
Expand All @@ -404,10 +309,10 @@ class HTTP1ConnectionProvider {
}

func connect(_ result: Result<Channel, Error>,
waiter: Waiter,
waiter: Waiter<Connection>,
replacing closedConnection: Connection? = nil,
logger: Logger) {
let action: Action
let action: Action<Connection>
switch result {
case .success(let channel):
logger.trace("successfully created connection",
Expand Down Expand Up @@ -573,9 +478,9 @@ class HTTP1ConnectionProvider {
///
/// `Waiter`s are created when `maximumConcurrentConnections` is reached
/// and we cannot create new connections anymore.
struct Waiter {
struct Waiter<ConnectionType: PoolManageableConnection> {
/// The promise to complete once a connection is available
let promise: EventLoopPromise<Connection>
let promise: EventLoopPromise<ConnectionType>

/// Future that will be succeeded when request timeout handler and `TaskHandler` are added to the pipeline.
let setupComplete: EventLoopFuture<Void>
Expand All @@ -586,39 +491,6 @@ class HTTP1ConnectionProvider {
}
}

class IdlePoolConnectionHandler: ChannelInboundHandler, RemovableChannelHandler {
typealias InboundIn = NIOAny

let connection: Connection
var eventSent: Bool
let logger: Logger

init(connection: Connection, logger: Logger) {
self.connection = connection
self.eventSent = false
self.logger = logger
}

// this is needed to detect when remote end closes connection while connection is in the pool idling
func channelInactive(context: ChannelHandlerContext) {
if !self.eventSent {
self.eventSent = true
self.connection.remoteClosed(logger: self.logger)
}
}

func userInboundEventTriggered(context: ChannelHandlerContext, event: Any) {
if let idleEvent = event as? IdleStateHandler.IdleStateEvent, idleEvent == .write {
if !self.eventSent {
self.eventSent = true
self.connection.timeout(logger: self.logger)
}
} else {
context.fireUserInboundEventTriggered(event)
}
}
}

extension CircularBuffer {
mutating func swap(at index: Index, with value: Element) -> Element {
let tmp = self[index]
Expand Down
Loading