Skip to content

Commit 5db7719

Browse files
authored
async/await execute (#524)
* async/await execute * remove default length for `HTTPClientRequest.Body` * make redirect logic iterative * move Task creation into `TransactionCancelHandler`
1 parent bf5668c commit 5db7719

File tree

6 files changed

+239
-18
lines changed

6 files changed

+239
-18
lines changed
Lines changed: 168 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,168 @@
1+
//===----------------------------------------------------------------------===//
2+
//
3+
// This source file is part of the AsyncHTTPClient open source project
4+
//
5+
// Copyright (c) 2021 Apple Inc. and the AsyncHTTPClient project authors
6+
// Licensed under Apache License v2.0
7+
//
8+
// See LICENSE.txt for license information
9+
// See CONTRIBUTORS.txt for the list of AsyncHTTPClient project authors
10+
//
11+
// SPDX-License-Identifier: Apache-2.0
12+
//
13+
//===----------------------------------------------------------------------===//
14+
15+
#if compiler(>=5.5) && canImport(_Concurrency)
16+
import struct Foundation.URL
17+
import Logging
18+
import NIOCore
19+
import NIOHTTP1
20+
21+
@available(macOS 12.0, iOS 15.0, watchOS 8.0, tvOS 15.0, *)
22+
extension HTTPClient {
23+
/// Execute arbitrary HTTP requests.
24+
///
25+
/// - Parameters:
26+
/// - request: HTTP request to execute.
27+
/// - deadline: Point in time by which the request must complete.
28+
/// - logger: The logger to use for this request.
29+
/// - Returns: The response to the request. Note that the `body` of the response may not yet have been fully received.
30+
func execute(
31+
_ request: HTTPClientRequest,
32+
deadline: NIODeadline,
33+
logger: Logger
34+
) async throws -> HTTPClientResponse {
35+
try await self.executeAndFollowRedirectsIfNeeded(
36+
request,
37+
deadline: deadline,
38+
logger: logger,
39+
redirectState: RedirectState(self.configuration.redirectConfiguration.mode, initialURL: request.url)
40+
)
41+
}
42+
}
43+
44+
@available(macOS 12.0, iOS 15.0, watchOS 8.0, tvOS 15.0, *)
45+
extension HTTPClient {
46+
private func executeAndFollowRedirectsIfNeeded(
47+
_ request: HTTPClientRequest,
48+
deadline: NIODeadline,
49+
logger: Logger,
50+
redirectState: RedirectState?
51+
) async throws -> HTTPClientResponse {
52+
var currentRequest = request
53+
var currentRedirectState = redirectState
54+
55+
// this loop is there to follow potential redirects
56+
while true {
57+
let preparedRequest = try HTTPClientRequest.Prepared(currentRequest)
58+
let response = try await executeCancellable(preparedRequest, deadline: deadline, logger: logger)
59+
60+
guard var redirectState = currentRedirectState else {
61+
// a `nil` redirectState means we should not follow redirects
62+
return response
63+
}
64+
65+
guard let redirectURL = response.headers.extractRedirectTarget(
66+
status: response.status,
67+
originalURL: preparedRequest.url,
68+
originalScheme: preparedRequest.poolKey.scheme
69+
) else {
70+
// response does not want a redirect
71+
return response
72+
}
73+
74+
// validate that we do not exceed any limits or are running circles
75+
try redirectState.redirect(to: redirectURL.absoluteString)
76+
currentRedirectState = redirectState
77+
78+
let newRequest = preparedRequest.followingRedirect(to: redirectURL, status: response.status)
79+
80+
guard newRequest.body.canBeConsumedMultipleTimes else {
81+
// we already send the request body and it cannot be send again
82+
return response
83+
}
84+
85+
currentRequest = newRequest
86+
}
87+
}
88+
89+
private func executeCancellable(
90+
_ request: HTTPClientRequest.Prepared,
91+
deadline: NIODeadline,
92+
logger: Logger
93+
) async throws -> HTTPClientResponse {
94+
let cancelHandler = TransactionCancelHandler()
95+
96+
return try await withTaskCancellationHandler(operation: { () async throws -> HTTPClientResponse in
97+
try await withCheckedThrowingContinuation { (continuation: CheckedContinuation<HTTPClientResponse, Swift.Error>) -> Void in
98+
let transaction = Transaction(
99+
request: request,
100+
requestOptions: .init(idleReadTimeout: nil),
101+
logger: logger,
102+
connectionDeadline: deadline,
103+
preferredEventLoop: self.eventLoopGroup.next(),
104+
responseContinuation: continuation
105+
)
106+
107+
cancelHandler.registerTransaction(transaction)
108+
109+
self.poolManager.executeRequest(transaction)
110+
}
111+
}, onCancel: {
112+
cancelHandler.cancel()
113+
})
114+
}
115+
}
116+
117+
/// There is currently no good way to asynchronously cancel an object that is initiated inside the `body` closure of `with*Continuation`.
118+
/// As a workaround we use `TransactionCancelHandler` which will take care of the race between instantiation of `Transaction`
119+
/// in the `body` closure and cancelation from the `onCancel` closure of `withTaskCancellationHandler`.
120+
@available(macOS 12.0, iOS 15.0, watchOS 8.0, tvOS 15.0, *)
121+
private actor TransactionCancelHandler {
122+
private enum State {
123+
case initialised
124+
case register(Transaction)
125+
case cancelled
126+
}
127+
128+
private var state: State = .initialised
129+
130+
init() {}
131+
132+
private func _registerTransaction(_ transaction: Transaction) {
133+
switch self.state {
134+
case .initialised:
135+
self.state = .register(transaction)
136+
case .cancelled:
137+
transaction.cancel()
138+
case .register:
139+
preconditionFailure("transaction already set")
140+
}
141+
}
142+
143+
nonisolated func registerTransaction(_ transaction: Transaction) {
144+
Task {
145+
await self._registerTransaction(transaction)
146+
}
147+
}
148+
149+
private func _cancel() {
150+
switch self.state {
151+
case .register(let bag):
152+
self.state = .cancelled
153+
bag.cancel()
154+
case .cancelled:
155+
break
156+
case .initialised:
157+
self.state = .cancelled
158+
}
159+
}
160+
161+
nonisolated func cancel() {
162+
Task {
163+
await self._cancel()
164+
}
165+
}
166+
}
167+
168+
#endif

Sources/AsyncHTTPClient/AsyncAwait/HTTPClientRequest+Prepared.swift

Lines changed: 32 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -13,11 +13,13 @@
1313
//===----------------------------------------------------------------------===//
1414

1515
#if compiler(>=5.5) && canImport(_Concurrency)
16+
import struct Foundation.URL
1617
import NIOHTTP1
1718

1819
@available(macOS 12.0, iOS 15.0, watchOS 8.0, tvOS 15.0, *)
1920
extension HTTPClientRequest {
2021
struct Prepared {
22+
var url: URL
2123
var poolKey: ConnectionPool.Key
2224
var requestFramingMetadata: RequestFramingMetadata
2325
var head: HTTPRequestHead
@@ -28,22 +30,27 @@ extension HTTPClientRequest {
2830
@available(macOS 12.0, iOS 15.0, watchOS 8.0, tvOS 15.0, *)
2931
extension HTTPClientRequest.Prepared {
3032
init(_ request: HTTPClientRequest) throws {
31-
let url = try DeconstructedURL(url: request.url)
33+
guard let url = URL(string: request.url) else {
34+
throw HTTPClientError.invalidURL
35+
}
36+
37+
let deconstructedURL = try DeconstructedURL(url: url)
3238

3339
var headers = request.headers
34-
headers.addHostIfNeeded(for: url)
40+
headers.addHostIfNeeded(for: deconstructedURL)
3541
let metadata = try headers.validateAndSetTransportFraming(
3642
method: request.method,
3743
bodyLength: .init(request.body)
3844
)
3945

4046
self.init(
41-
poolKey: .init(url: url, tlsConfiguration: nil),
47+
url: url,
48+
poolKey: .init(url: deconstructedURL, tlsConfiguration: nil),
4249
requestFramingMetadata: metadata,
4350
head: .init(
4451
version: .http1_1,
4552
method: request.method,
46-
uri: url.uri,
53+
uri: deconstructedURL.uri,
4754
headers: headers
4855
),
4956
body: request.body
@@ -59,12 +66,31 @@ extension RequestBodyLength {
5966
self = .fixed(length: 0)
6067
case .byteBuffer(let buffer):
6168
self = .fixed(length: buffer.readableBytes)
62-
case .sequence(nil, _), .asyncSequence(nil, _):
69+
case .sequence(nil, _, _), .asyncSequence(nil, _):
6370
self = .dynamic
64-
case .sequence(.some(let length), _), .asyncSequence(.some(let length), _):
71+
case .sequence(.some(let length), _, _), .asyncSequence(.some(let length), _):
6572
self = .fixed(length: length)
6673
}
6774
}
6875
}
6976

77+
@available(macOS 12.0, iOS 15.0, watchOS 8.0, tvOS 15.0, *)
78+
extension HTTPClientRequest.Prepared {
79+
func followingRedirect(to redirectURL: URL, status: HTTPResponseStatus) -> HTTPClientRequest {
80+
let (method, headers, body) = transformRequestForRedirect(
81+
from: self.url,
82+
method: self.head.method,
83+
headers: self.head.headers,
84+
body: self.body,
85+
to: redirectURL,
86+
status: status
87+
)
88+
var newRequest = HTTPClientRequest(url: redirectURL.absoluteString)
89+
newRequest.method = method
90+
newRequest.headers = headers
91+
newRequest.body = body
92+
return newRequest
93+
}
94+
}
95+
7096
#endif

Sources/AsyncHTTPClient/AsyncAwait/HTTPClientRequest.swift

Lines changed: 34 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ extension HTTPClientRequest {
3737
struct Body {
3838
internal enum Mode {
3939
case asyncSequence(length: Int?, (ByteBufferAllocator) async throws -> ByteBuffer?)
40-
case sequence(length: Int?, (ByteBufferAllocator) -> ByteBuffer)
40+
case sequence(length: Int?, canBeConsumedMultipleTimes: Bool, (ByteBufferAllocator) -> ByteBuffer)
4141
case byteBuffer(ByteBuffer)
4242
}
4343

@@ -57,10 +57,25 @@ extension HTTPClientRequest.Body {
5757

5858
@inlinable
5959
static func bytes<Bytes: Sequence>(
60-
length: Int? = nil,
60+
length: Int?,
6161
_ bytes: Bytes
6262
) -> Self where Bytes.Element == UInt8 {
63-
self.init(.sequence(length: length) { allocator in
63+
self.init(.sequence(length: length, canBeConsumedMultipleTimes: false) { allocator in
64+
if let buffer = bytes.withContiguousStorageIfAvailable({ allocator.buffer(bytes: $0) }) {
65+
// fastpath
66+
return buffer
67+
}
68+
// potentially really slow path
69+
return allocator.buffer(bytes: bytes)
70+
})
71+
}
72+
73+
@inlinable
74+
static func bytes<Bytes: Collection>(
75+
length: Int?,
76+
_ bytes: Bytes
77+
) -> Self where Bytes.Element == UInt8 {
78+
self.init(.sequence(length: length, canBeConsumedMultipleTimes: true) { allocator in
6479
if let buffer = bytes.withContiguousStorageIfAvailable({ allocator.buffer(bytes: $0) }) {
6580
// fastpath
6681
return buffer
@@ -74,7 +89,7 @@ extension HTTPClientRequest.Body {
7489
static func bytes<Bytes: RandomAccessCollection>(
7590
_ bytes: Bytes
7691
) -> Self where Bytes.Element == UInt8 {
77-
self.init(.sequence(length: bytes.count) { allocator in
92+
self.init(.sequence(length: bytes.count, canBeConsumedMultipleTimes: true) { allocator in
7893
if let buffer = bytes.withContiguousStorageIfAvailable({ allocator.buffer(bytes: $0) }) {
7994
// fastpath
8095
return buffer
@@ -86,7 +101,7 @@ extension HTTPClientRequest.Body {
86101

87102
@inlinable
88103
static func stream<SequenceOfBytes: AsyncSequence>(
89-
length: Int? = nil,
104+
length: Int?,
90105
_ sequenceOfBytes: SequenceOfBytes
91106
) -> Self where SequenceOfBytes.Element == ByteBuffer {
92107
var iterator = sequenceOfBytes.makeAsyncIterator()
@@ -98,11 +113,11 @@ extension HTTPClientRequest.Body {
98113

99114
@inlinable
100115
static func stream<Bytes: AsyncSequence>(
101-
length: Int? = nil,
116+
length: Int?,
102117
_ bytes: Bytes
103118
) -> Self where Bytes.Element == UInt8 {
104119
var iterator = bytes.makeAsyncIterator()
105-
let body = self.init(.asyncSequence(length: nil) { allocator -> ByteBuffer? in
120+
let body = self.init(.asyncSequence(length: length) { allocator -> ByteBuffer? in
106121
var buffer = allocator.buffer(capacity: 1024) // TODO: Magic number
107122
while buffer.writableBytes > 0, let byte = try await iterator.next() {
108123
buffer.writeInteger(byte)
@@ -116,4 +131,16 @@ extension HTTPClientRequest.Body {
116131
}
117132
}
118133

134+
@available(macOS 12.0, iOS 15.0, watchOS 8.0, tvOS 15.0, *)
135+
extension Optional where Wrapped == HTTPClientRequest.Body {
136+
internal var canBeConsumedMultipleTimes: Bool {
137+
switch self?.mode {
138+
case .none: return true
139+
case .byteBuffer: return true
140+
case .sequence(_, let canBeConsumedMultipleTimes, _): return canBeConsumedMultipleTimes
141+
case .asyncSequence: return false
142+
}
143+
}
144+
}
145+
119146
#endif

Sources/AsyncHTTPClient/AsyncAwait/Transaction.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -203,7 +203,7 @@ extension Transaction: HTTPExecutableRequest {
203203
case .none:
204204
break
205205

206-
case .sequence(_, let create):
206+
case .sequence(_, _, let create):
207207
let byteBuffer = create(allocator)
208208
self.writeOnceAndOneTimeOnly(byteBuffer: byteBuffer)
209209
}

Tests/AsyncHTTPClientTests/HTTPClientRequestTests.swift

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -411,7 +411,7 @@ class HTTPClientRequestTests: XCTestCase {
411411
.asAsyncSequence()
412412
.map { ByteBuffer($0) }
413413

414-
request.body = .stream(asyncSequence)
414+
request.body = .stream(length: nil, asyncSequence)
415415
var preparedRequest: PreparedRequest?
416416
XCTAssertNoThrow(preparedRequest = try PreparedRequest(request))
417417
guard let preparedRequest = preparedRequest else { return }
@@ -498,7 +498,7 @@ extension Optional where Wrapped == HTTPClientRequest.Body {
498498
return ByteBuffer()
499499
case .byteBuffer(let buffer):
500500
return buffer
501-
case .sequence(let announcedLength, let generate):
501+
case .sequence(let announcedLength, _, let generate):
502502
let buffer = generate(ByteBufferAllocator())
503503
if let announcedLength = announcedLength,
504504
announcedLength != buffer.readableBytes {

Tests/AsyncHTTPClientTests/TransactionTests.swift

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -189,7 +189,7 @@ final class TransactionTests: XCTestCase {
189189

190190
var request = HTTPClientRequest(url: "https://localhost/")
191191
request.method = .POST
192-
request.body = .stream(streamWriter)
192+
request.body = .stream(length: nil, streamWriter)
193193

194194
var maybePreparedRequest: PreparedRequest?
195195
XCTAssertNoThrow(maybePreparedRequest = try PreparedRequest(request))
@@ -318,7 +318,7 @@ final class TransactionTests: XCTestCase {
318318

319319
var request = HTTPClientRequest(url: "https://localhost/")
320320
request.method = .POST
321-
request.body = .bytes("Hello world!".utf8)
321+
request.body = .bytes(length: nil, "Hello world!".utf8)
322322
var maybePreparedRequest: PreparedRequest?
323323
XCTAssertNoThrow(maybePreparedRequest = try PreparedRequest(request))
324324
guard let preparedRequest = maybePreparedRequest else {

0 commit comments

Comments
 (0)