Skip to content

Commit 0bce647

Browse files
authored
[HTTPConnectionPool] Fix timer action races (AsyncHttpClient#434)
- `_idleTimer` and `_backoffTimer` are protected by `stateLock` - Added a new `struct Actions` that splits up actions from the state machine into actions that need to be executed inside the `stateLock` and outside in `stateLock` - Add HTTP/1.1 connection pool stress test
1 parent d45fa9a commit 0bce647

File tree

3 files changed

+198
-73
lines changed

3 files changed

+198
-73
lines changed

Sources/AsyncHTTPClient/ConnectionPool/HTTPConnectionPool.swift

Lines changed: 134 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -24,15 +24,17 @@ protocol HTTPConnectionPoolDelegate {
2424
final class HTTPConnectionPool {
2525
private let stateLock = Lock()
2626
private var _state: StateMachine
27+
/// The connection idle timeout timers. Protected by the stateLock
28+
private var _idleTimer = [Connection.ID: Scheduled<Void>]()
29+
/// The connection backoff timeout timers. Protected by the stateLock
30+
private var _backoffTimer = [Connection.ID: Scheduled<Void>]()
2731

2832
private static let fallbackConnectTimeout: TimeAmount = .seconds(30)
2933

3034
let key: ConnectionPool.Key
3135

3236
private let timerLock = Lock()
3337
private var _requestTimer = [Request.ID: Scheduled<Void>]()
34-
private var _idleTimer = [Connection.ID: Scheduled<Void>]()
35-
private var _backoffTimer = [Connection.ID: Scheduled<Void>]()
3638

3739
private var logger: Logger
3840

@@ -75,32 +77,91 @@ final class HTTPConnectionPool {
7577
}
7678

7779
func executeRequest(_ request: HTTPSchedulableRequest) {
78-
let action = self.stateLock.withLock { () -> StateMachine.Action in
79-
self._state.executeRequest(.init(request))
80-
}
81-
self.run(action: action)
80+
self.modifyStateAndRunActions { $0.executeRequest(.init(request)) }
8281
}
8382

8483
func shutdown() {
85-
let action = self.stateLock.withLock { () -> StateMachine.Action in
86-
self._state.shutdown()
84+
self.modifyStateAndRunActions { $0.shutdown() }
85+
}
86+
87+
// MARK: - Private Methods -
88+
89+
// MARK: Actions
90+
91+
/// An `HTTPConnectionPool` internal action type that matches the `StateMachine`'s action.
92+
/// However it splits up the actions into actions that need to be executed inside the `stateLock`
93+
/// and outside the `stateLock`.
94+
private struct Actions {
95+
enum ConnectionAction {
96+
enum Unlocked {
97+
case createConnection(Connection.ID, on: EventLoop)
98+
case closeConnection(Connection, isShutdown: StateMachine.ConnectionAction.IsShutdown)
99+
case cleanupConnections(CleanupContext, isShutdown: StateMachine.ConnectionAction.IsShutdown)
100+
case none
101+
}
102+
103+
enum Locked {
104+
case scheduleBackoffTimer(Connection.ID, backoff: TimeAmount, on: EventLoop)
105+
case cancelBackoffTimers([Connection.ID])
106+
case scheduleTimeoutTimer(Connection.ID, on: EventLoop)
107+
case cancelTimeoutTimer(Connection.ID)
108+
case none
109+
}
110+
}
111+
112+
struct Locked {
113+
var connection: ConnectionAction.Locked
114+
}
115+
116+
struct Unlocked {
117+
var connection: ConnectionAction.Unlocked
118+
var request: StateMachine.RequestAction
119+
}
120+
121+
var locked: Locked
122+
var unlocked: Unlocked
123+
124+
init(from stateMachineAction: StateMachine.Action) {
125+
self.locked = Locked(connection: .none)
126+
self.unlocked = Unlocked(connection: .none, request: stateMachineAction.request)
127+
128+
switch stateMachineAction.connection {
129+
case .createConnection(let connectionID, on: let eventLoop):
130+
self.unlocked.connection = .createConnection(connectionID, on: eventLoop)
131+
case .scheduleBackoffTimer(let connectionID, backoff: let backoff, on: let eventLoop):
132+
self.locked.connection = .scheduleBackoffTimer(connectionID, backoff: backoff, on: eventLoop)
133+
case .scheduleTimeoutTimer(let connectionID, on: let eventLoop):
134+
self.locked.connection = .scheduleTimeoutTimer(connectionID, on: eventLoop)
135+
case .cancelTimeoutTimer(let connectionID):
136+
self.locked.connection = .cancelTimeoutTimer(connectionID)
137+
case .closeConnection(let connection, isShutdown: let isShutdown):
138+
self.unlocked.connection = .closeConnection(connection, isShutdown: isShutdown)
139+
case .cleanupConnections(var cleanupContext, isShutdown: let isShutdown):
140+
//
141+
self.locked.connection = .cancelBackoffTimers(cleanupContext.connectBackoff)
142+
cleanupContext.connectBackoff = []
143+
self.unlocked.connection = .cleanupConnections(cleanupContext, isShutdown: isShutdown)
144+
case .none:
145+
break
146+
}
87147
}
88-
self.run(action: action)
89148
}
90149

91150
// MARK: Run actions
92151

93-
private func run(action: StateMachine.Action) {
94-
self.runConnectionAction(action.connection)
95-
self.runRequestAction(action.request)
152+
private func modifyStateAndRunActions(_ closure: (inout StateMachine) -> StateMachine.Action) {
153+
let unlockedActions = self.stateLock.withLock { () -> Actions.Unlocked in
154+
let stateMachineAction = closure(&self._state)
155+
let poolAction = Actions(from: stateMachineAction)
156+
self.runLockedActions(poolAction.locked)
157+
return poolAction.unlocked
158+
}
159+
self.runUnlockedActions(unlockedActions)
96160
}
97161

98-
private func runConnectionAction(_ action: StateMachine.ConnectionAction) {
99-
switch action {
100-
case .createConnection(let connectionID, let eventLoop):
101-
self.createConnection(connectionID, on: eventLoop)
102-
103-
case .scheduleBackoffTimer(let connectionID, let backoff, on: let eventLoop):
162+
private func runLockedActions(_ actions: Actions.Locked) {
163+
switch actions.connection {
164+
case .scheduleBackoffTimer(let connectionID, backoff: let backoff, on: let eventLoop):
104165
self.scheduleConnectionStartBackoffTimer(connectionID, backoff, on: eventLoop)
105166

106167
case .scheduleTimeoutTimer(let connectionID, on: let eventLoop):
@@ -109,6 +170,26 @@ final class HTTPConnectionPool {
109170
case .cancelTimeoutTimer(let connectionID):
110171
self.cancelIdleTimerForConnection(connectionID)
111172

173+
case .cancelBackoffTimers(let connectionIDs):
174+
for connectionID in connectionIDs {
175+
self.cancelConnectionStartBackoffTimer(connectionID)
176+
}
177+
178+
case .none:
179+
break
180+
}
181+
}
182+
183+
private func runUnlockedActions(_ actions: Actions.Unlocked) {
184+
self.runUnlockedConnectionAction(actions.connection)
185+
self.runUnlockedRequestAction(actions.request)
186+
}
187+
188+
private func runUnlockedConnectionAction(_ action: Actions.ConnectionAction.Unlocked) {
189+
switch action {
190+
case .createConnection(let connectionID, let eventLoop):
191+
self.createConnection(connectionID, on: eventLoop)
192+
112193
case .closeConnection(let connection, isShutdown: let isShutdown):
113194
self.logger.trace("close connection", metadata: [
114195
"ahc-connection-id": "\(connection.id)",
@@ -143,7 +224,7 @@ final class HTTPConnectionPool {
143224
}
144225
}
145226

146-
private func runRequestAction(_ action: StateMachine.RequestAction) {
227+
private func runUnlockedRequestAction(_ action: StateMachine.RequestAction) {
147228
// The order of execution fail/execute request vs cancelling the request timeout timer does
148229
// not matter in the actions here. The actions don't cause any side effects that will be
149230
// reported back to the state machine and are not dependent on each other.
@@ -215,11 +296,9 @@ final class HTTPConnectionPool {
215296
guard timeoutFired else { return }
216297

217298
// 3. Tell the state machine about the timeout
218-
let action = self.stateLock.withLock {
219-
self._state.timeoutRequest(requestID)
299+
self.modifyStateAndRunActions {
300+
$0.timeoutRequest(requestID)
220301
}
221-
222-
self.run(action: action)
223302
}
224303

225304
self.timerLock.withLockVoid {
@@ -254,34 +333,27 @@ final class HTTPConnectionPool {
254333
let scheduled = eventLoop.scheduleTask(in: self.idleConnectionTimeout) {
255334
// there might be a race between a cancelTimer call and the triggering
256335
// of this scheduled task. both want to acquire the lock
257-
let timerExisted = self.timerLock.withLock {
258-
self._idleTimer.removeValue(forKey: connectionID) != nil
336+
self.modifyStateAndRunActions { stateMachine in
337+
if self._idleTimer.removeValue(forKey: connectionID) != nil {
338+
// The timer still exists. State Machines assumes it is alive
339+
return stateMachine.connectionIdleTimeout(connectionID)
340+
}
341+
return .none
259342
}
260-
261-
guard timerExisted else { return }
262-
263-
let action = self.stateLock.withLock {
264-
self._state.connectionIdleTimeout(connectionID)
265-
}
266-
self.run(action: action)
267343
}
268344

269-
self.timerLock.withLock {
270-
assert(self._idleTimer[connectionID] == nil)
271-
self._idleTimer[connectionID] = scheduled
272-
}
345+
assert(self._idleTimer[connectionID] == nil)
346+
self._idleTimer[connectionID] = scheduled
273347
}
274348

275349
private func cancelIdleTimerForConnection(_ connectionID: Connection.ID) {
276350
self.logger.trace("Cancel idle connection timeout timer", metadata: [
277351
"ahc-connection-id": "\(connectionID)",
278352
])
279-
280-
let cancelTimer = self.timerLock.withLock {
281-
self._idleTimer.removeValue(forKey: connectionID)
353+
guard let cancelTimer = self._idleTimer.removeValue(forKey: connectionID) else {
354+
preconditionFailure("Expected to have an idle timer for connection \(connectionID) at this point.")
282355
}
283-
284-
cancelTimer?.cancel()
356+
cancelTimer.cancel()
285357
}
286358

287359
private func scheduleConnectionStartBackoffTimer(
@@ -295,30 +367,24 @@ final class HTTPConnectionPool {
295367

296368
let scheduled = eventLoop.scheduleTask(in: timeAmount) {
297369
// there might be a race between a backoffTimer and the pool shutting down.
298-
let timerExisted = self.timerLock.withLock {
299-
self._backoffTimer.removeValue(forKey: connectionID) != nil
300-
}
301-
302-
guard timerExisted else { return }
303-
304-
let action = self.stateLock.withLock {
305-
self._state.connectionCreationBackoffDone(connectionID)
370+
self.modifyStateAndRunActions { stateMachine in
371+
if self._backoffTimer.removeValue(forKey: connectionID) != nil {
372+
// The timer still exists. State Machines assumes it is alive
373+
return stateMachine.connectionCreationBackoffDone(connectionID)
374+
}
375+
return .none
306376
}
307-
self.run(action: action)
308377
}
309378

310-
self.timerLock.withLock {
311-
assert(self._backoffTimer[connectionID] == nil)
312-
self._backoffTimer[connectionID] = scheduled
313-
}
379+
assert(self._backoffTimer[connectionID] == nil)
380+
self._backoffTimer[connectionID] = scheduled
314381
}
315382

316383
private func cancelConnectionStartBackoffTimer(_ connectionID: Connection.ID) {
317-
let backoffTimer = self.timerLock.withLock {
318-
self._backoffTimer[connectionID]
384+
guard let backoffTimer = self._backoffTimer.removeValue(forKey: connectionID) else {
385+
preconditionFailure("Expected to have a backoff timer for connection \(connectionID) at this point.")
319386
}
320-
321-
backoffTimer?.cancel()
387+
backoffTimer.cancel()
322388
}
323389
}
324390

@@ -330,10 +396,9 @@ extension HTTPConnectionPool: HTTPConnectionRequester {
330396
"ahc-connection-id": "\(connection.id)",
331397
"ahc-http-version": "http/1.1",
332398
])
333-
let action = self.stateLock.withLock {
334-
self._state.newHTTP1ConnectionCreated(.http1_1(connection))
399+
self.modifyStateAndRunActions {
400+
$0.newHTTP1ConnectionCreated(.http1_1(connection))
335401
}
336-
self.run(action: action)
337402
}
338403

339404
func http2ConnectionCreated(_ connection: HTTP2Connection, maximumStreams: Int) {
@@ -356,10 +421,9 @@ extension HTTPConnectionPool: HTTPConnectionRequester {
356421
"ahc-error": "\(error)",
357422
"ahc-connection-id": "\(connectionID)",
358423
])
359-
let action = self.stateLock.withLock {
360-
self._state.failedToCreateNewConnection(error, connectionID: connectionID)
424+
self.modifyStateAndRunActions {
425+
$0.failedToCreateNewConnection(error, connectionID: connectionID)
361426
}
362-
self.run(action: action)
363427
}
364428
}
365429

@@ -369,21 +433,19 @@ extension HTTPConnectionPool: HTTP1ConnectionDelegate {
369433
"ahc-connection-id": "\(connection.id)",
370434
"ahc-http-version": "http/1.1",
371435
])
372-
let action = self.stateLock.withLock {
373-
self._state.connectionClosed(connection.id)
436+
self.modifyStateAndRunActions {
437+
$0.connectionClosed(connection.id)
374438
}
375-
self.run(action: action)
376439
}
377440

378441
func http1ConnectionReleased(_ connection: HTTP1Connection) {
379442
self.logger.trace("releasing connection", metadata: [
380443
"ahc-connection-id": "\(connection.id)",
381444
"ahc-http-version": "http/1.1",
382445
])
383-
let action = self.stateLock.withLock {
384-
self._state.http1ConnectionReleased(connection.id)
446+
self.modifyStateAndRunActions {
447+
$0.http1ConnectionReleased(connection.id)
385448
}
386-
self.run(action: action)
387449
}
388450
}
389451

@@ -416,10 +478,9 @@ extension HTTPConnectionPool: HTTP2ConnectionDelegate {
416478
extension HTTPConnectionPool: HTTPRequestScheduler {
417479
func cancelRequest(_ request: HTTPSchedulableRequest) {
418480
let requestID = Request(request).id
419-
let action = self.stateLock.withLock {
420-
self._state.cancelRequest(requestID)
481+
self.modifyStateAndRunActions {
482+
$0.cancelRequest(requestID)
421483
}
422-
self.run(action: action)
423484
}
424485
}
425486

Tests/AsyncHTTPClientTests/HTTPConnectionPoolTests+XCTest.swift

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ extension HTTPConnectionPoolTests {
3232
("testConnectionCreationIsRetriedUntilPoolIsShutdown", testConnectionCreationIsRetriedUntilPoolIsShutdown),
3333
("testConnectionCreationIsRetriedUntilRequestIsCancelled", testConnectionCreationIsRetriedUntilRequestIsCancelled),
3434
("testConnectionShutdownIsCalledOnActiveConnections", testConnectionShutdownIsCalledOnActiveConnections),
35+
("testConnectionPoolStressResistanceHTTP1", testConnectionPoolStressResistanceHTTP1),
3536
]
3637
}
3738
}

0 commit comments

Comments
 (0)