Skip to content

Commit d949734

Browse files
committed
Add HTTP2 connection
1 parent b723844 commit d949734

File tree

4 files changed

+589
-2
lines changed

4 files changed

+589
-2
lines changed

Package.swift

+3-2
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,8 @@ let package = Package(
2121
.library(name: "AsyncHTTPClient", targets: ["AsyncHTTPClient"]),
2222
],
2323
dependencies: [
24-
.package(url: "https://github.com/apple/swift-nio.git", from: "2.27.0"),
24+
.package(url: "https://github.com/apple/swift-nio.git", from: "2.29.0"),
25+
.package(url: "https://github.com/apple/swift-nio-http2.git", from: "1.7.0"),
2526
.package(url: "https://github.com/apple/swift-nio-ssl.git", from: "2.13.0"),
2627
.package(url: "https://github.com/apple/swift-nio-extras.git", from: "1.3.0"),
2728
.package(url: "https://github.com/apple/swift-nio-transport-services.git", from: "1.5.1"),
@@ -30,7 +31,7 @@ let package = Package(
3031
targets: [
3132
.target(
3233
name: "AsyncHTTPClient",
33-
dependencies: ["NIO", "NIOHTTP1", "NIOSSL", "NIOConcurrencyHelpers", "NIOHTTPCompression",
34+
dependencies: ["NIO", "NIOHTTP1", "NIOHTTP2", "NIOSSL", "NIOConcurrencyHelpers", "NIOHTTPCompression",
3435
"NIOFoundationCompat", "NIOTransportServices", "Logging"]
3536
),
3637
.testTarget(
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,270 @@
1+
//===----------------------------------------------------------------------===//
2+
//
3+
// This source file is part of the AsyncHTTPClient open source project
4+
//
5+
// Copyright (c) 2021 Apple Inc. and the AsyncHTTPClient project authors
6+
// Licensed under Apache License v2.0
7+
//
8+
// See LICENSE.txt for license information
9+
// See CONTRIBUTORS.txt for the list of AsyncHTTPClient project authors
10+
//
11+
// SPDX-License-Identifier: Apache-2.0
12+
//
13+
//===----------------------------------------------------------------------===//
14+
15+
import Logging
16+
import NIO
17+
import NIOHTTP1
18+
@_implementationOnly import NIOHTTP2
19+
20+
class HTTP2ClientRequestHandler: ChannelDuplexHandler {
21+
typealias OutboundIn = HTTPRequestTask
22+
typealias OutboundOut = HTTPClientRequestPart
23+
typealias InboundIn = HTTPClientResponsePart
24+
25+
var channelContext: ChannelHandlerContext!
26+
27+
var state: HTTP1ConnectionStateMachine = .init() {
28+
didSet {
29+
self.logger.trace("Connection state did change", metadata: [
30+
"state": "\(String(describing: self.state))",
31+
])
32+
}
33+
}
34+
35+
var task: HTTPRequestTask!
36+
37+
let logger: Logger
38+
39+
init(logger: Logger) {
40+
self.logger = logger
41+
}
42+
43+
func channelActive(context: ChannelHandlerContext) {
44+
let action = self.state.channelActive(isWritable: context.channel.isWritable)
45+
self.run(action, context: context)
46+
}
47+
48+
func handlerAdded(context: ChannelHandlerContext) {
49+
self.channelContext = context
50+
51+
let action = self.state.channelActive(isWritable: context.channel.isWritable)
52+
self.run(action, context: context)
53+
}
54+
55+
func handlerRemoved(context: ChannelHandlerContext) {
56+
self.channelContext = nil
57+
}
58+
59+
func channelWritabilityChanged(context: ChannelHandlerContext) {
60+
self.logger.trace("Channel writability changed", metadata: [
61+
"writable": "\(context.channel.isWritable)",
62+
])
63+
64+
let action = self.state.writabilityChanged(writable: context.channel.isWritable)
65+
self.run(action, context: context)
66+
}
67+
68+
func write(context: ChannelHandlerContext, data: NIOAny, promise: EventLoopPromise<Void>?) {
69+
self.logger.trace("Write")
70+
71+
#warning("fixme: We need to have good idle state handling here!")
72+
self.task = self.unwrapOutboundIn(data)
73+
74+
let action = self.state.runNewRequest(idleReadTimeout: self.task!.idleReadTimeout)
75+
self.run(action, context: context)
76+
}
77+
78+
func read(context: ChannelHandlerContext) {
79+
self.logger.trace("Read")
80+
81+
let action = self.state.readEventCaught()
82+
self.run(action, context: context)
83+
}
84+
85+
func channelRead(context: ChannelHandlerContext, data: NIOAny) {
86+
let httpPart = unwrapInboundIn(data)
87+
88+
self.logger.trace("Message received", metadata: [
89+
"message": "\(httpPart)",
90+
])
91+
92+
let action: HTTP1ConnectionStateMachine.Action
93+
switch httpPart {
94+
case .head(let head):
95+
action = self.state.receivedHTTPResponseHead(head)
96+
case .body(let buffer):
97+
action = self.state.receivedHTTPResponseBodyPart(buffer)
98+
case .end:
99+
action = self.state.receivedHTTPResponseEnd()
100+
}
101+
102+
self.run(action, context: context)
103+
}
104+
105+
func errorCaught(context: ChannelHandlerContext, error: Error) {
106+
self.logger.trace("Error caught", metadata: [
107+
"error": "\(error)",
108+
])
109+
}
110+
111+
func produceMoreResponseBodyParts(for task: HTTPRequestTask) {
112+
// ensure the message is received on correct eventLoop
113+
guard self.channelContext.eventLoop.inEventLoop else {
114+
return self.channelContext.eventLoop.execute {
115+
self.produceMoreResponseBodyParts(for: task)
116+
}
117+
}
118+
119+
guard self.task === task else {
120+
// very likely we got threading issues here...
121+
return
122+
}
123+
124+
self.logger.trace("Downstream requests more response body data")
125+
126+
let action = self.state.forwardMoreBodyParts()
127+
self.run(action, context: self.channelContext)
128+
}
129+
130+
// MARK: - Run Actions
131+
132+
func run(_ action: HTTP1ConnectionStateMachine.Action, context: ChannelHandlerContext) {
133+
// switch action {
134+
// case .verifyRequest:
135+
// do {
136+
// let head = try self.verifyRequest(request: self.task.request)
137+
// let action = self.state.requestVerified(head)
138+
// self.run(action, context: context)
139+
// } catch {
140+
// preconditionFailure("Create error here")
141+
// //self.state.failed
142+
// }
143+
// case .sendRequestHead(let head, let andEnd):
144+
// self.sendRequestHead(head, context: context)
145+
//
146+
// case .produceMoreRequestBodyData:
147+
// self.produceNextRequestBodyPart(context: context)
148+
//
149+
// case .sendBodyPart(let part, produceMoreRequestBodyData: let produceMore):
150+
// self.sendRequestBodyPart(part, context: context)
151+
//
152+
// if produceMore {
153+
// self.produceNextRequestBodyPart(context: context)
154+
// }
155+
//
156+
// case .sendRequestEnd:
157+
// self.sendRequestEnd(context: context)
158+
//
159+
// case .read:
160+
// context.read()
161+
//
162+
// case .wait:
163+
// break
164+
//
165+
// case .fireChannelActive:
166+
// break
167+
//
168+
// case .fireChannelInactive:
169+
// break
170+
//
171+
// case .forwardResponseHead(let head):
172+
// self.task.receiveResponseHead(head, source: self)
173+
//
174+
// case .forwardResponseBodyPart(let buffer):
175+
// self.task.receiveResponseBodyPart(buffer)
176+
//
177+
// case .forwardResponseEndAndCloseConnection:
178+
// self.task.receiveResponseEnd()
179+
// self.task = nil
180+
// context.close(mode: .all, promise: nil)
181+
//
182+
// case .forwardResponseEndAndFireTaskCompleted(let read):
183+
// self.task.receiveResponseEnd()
184+
// self.task = nil
185+
//
186+
// if read {
187+
// context.read()
188+
// }
189+
//
190+
// case .forwardError(let error, closeConnection: let closeConnection):
191+
// self.task.fail(error)
192+
// self.task = nil
193+
// if closeConnection {
194+
// context.close(promise: nil)
195+
// }
196+
// }
197+
}
198+
199+
// MARK: - Private Methods -
200+
201+
private func verifyRequest(request: HTTPClient.Request) throws -> HTTPRequestHead {
202+
var headers = request.headers
203+
204+
if !headers.contains(name: "host") {
205+
let port = request.port
206+
var host = request.host
207+
if !(port == 80 && request.scheme == "http"), !(port == 443 && request.scheme == "https") {
208+
host += ":\(port)"
209+
}
210+
headers.add(name: "host", value: host)
211+
}
212+
213+
do {
214+
try headers.validate(method: request.method, body: request.body)
215+
} catch {
216+
preconditionFailure("Unimplemented: We should go for an early exit here!")
217+
}
218+
219+
let head = HTTPRequestHead(
220+
version: .http1_1,
221+
method: request.method,
222+
uri: request.uri,
223+
headers: headers
224+
)
225+
226+
// 3. preparing to send body
227+
228+
if head.headers[canonicalForm: "connection"].map({ $0.lowercased() }).contains("close") {
229+
// self.closing = true
230+
}
231+
// This assert can go away when (if ever!) the above `if` correctly handles other HTTP versions. For example
232+
// in HTTP/1.0, we need to treat the absence of a 'connection: keep-alive' as a close too.
233+
assert(head.version == HTTPVersion(major: 1, minor: 1),
234+
"Sending a request in HTTP version \(head.version) which is unsupported by the above `if`")
235+
236+
return head
237+
}
238+
239+
private func sendRequestHead(_ head: HTTPRequestHead, context: ChannelHandlerContext) {
240+
// context.writeAndFlush(wrapOutboundOut(.head(head)), promise: nil)
241+
//
242+
// let action = self.state.requestHeadSent()
243+
// self.run(action, context: context)
244+
}
245+
246+
private func sendRequestBodyPart(_ part: IOData, context: ChannelHandlerContext) {
247+
context.writeAndFlush(self.wrapOutboundOut(.body(part)), promise: nil)
248+
}
249+
250+
private func sendRequestEnd(context: ChannelHandlerContext) {
251+
context.writeAndFlush(wrapOutboundOut(.end(nil)), promise: nil)
252+
}
253+
254+
private func produceNextRequestBodyPart(context: ChannelHandlerContext) {
255+
// self.task.nextRequestBodyPart(channelEL: context.eventLoop)
256+
// .hop(to: context.eventLoop)
257+
// .whenComplete() { result in
258+
// let action: HTTP1ConnectionStateMachine.Action
259+
// switch result {
260+
// case .success(.some(let part)):
261+
// action = self.state.requestStreamPartReceived(part)
262+
// case .success(.none):
263+
// action = self.state.requestStreamFinished()
264+
// case .failure(let error):
265+
// action = self.state.requestStreamFailed(error)
266+
// }
267+
// self.run(action, context: context)
268+
// }
269+
}
270+
}

0 commit comments

Comments
 (0)