@@ -38,36 +38,27 @@ final class MockRequestExecutor {
38
38
}
39
39
}
40
40
41
- let eventLoop : EventLoop
42
- let _blockingQueue = BlockingQueue < RequestParts > ( )
43
- let pauseRequestBodyPartStreamAfterASingleWrite : Bool
44
-
45
41
var isCancelled : Bool {
46
- if self . eventLoop. inEventLoop {
47
- return self . _isCancelled
48
- } else {
49
- return try ! self . eventLoop. submit { self . _isCancelled } . wait ( )
50
- }
42
+ self . cancellationLock. value
51
43
}
52
44
53
45
var signalledDemandForResponseBody : Bool {
54
- if self . eventLoop. inEventLoop {
55
- return self . _signaledDemandForResponseBody
56
- } else {
57
- return try ! self . eventLoop. submit { self . _signaledDemandForResponseBody } . wait ( )
58
- }
46
+ self . responseBodyDemandLock. value
59
47
}
60
48
61
49
var requestBodyPartsCount : Int {
62
- return self . _blockingQueue . count
50
+ return self . blockingQueue . count
63
51
}
64
52
53
+ let eventLoop : EventLoop
54
+ let pauseRequestBodyPartStreamAfterASingleWrite : Bool
55
+
56
+ private let blockingQueue = BlockingQueue < RequestParts > ( )
57
+ private let responseBodyDemandLock = ConditionLock ( value: false )
58
+ private let cancellationLock = ConditionLock ( value: false )
59
+
65
60
private var request : HTTPExecutableRequest ?
66
- private var _requestBodyParts = CircularBuffer < RequestParts > ( )
67
61
private var _signaledDemandForRequestBody : Bool = false
68
- private var _signaledDemandForResponseBody : Bool = false
69
- private var _whenWritable : EventLoopPromise < RequestParts > ?
70
- private var _isCancelled : Bool = false
71
62
72
63
init ( pauseRequestBodyPartStreamAfterASingleWrite: Bool = false , eventLoop: EventLoop ) {
73
64
self . pauseRequestBodyPartStreamAfterASingleWrite = pauseRequestBodyPartStreamAfterASingleWrite
@@ -91,13 +82,13 @@ final class MockRequestExecutor {
91
82
request. requestHeadSent ( )
92
83
}
93
84
94
- func receiveRequestBody( deadline: NIODeadline = . now( ) + . seconds( 60 ) , _ verify: ( ByteBuffer ) throws -> Void ) throws {
85
+ func receiveRequestBody( deadline: NIODeadline = . now( ) + . seconds( 5 ) , _ verify: ( ByteBuffer ) throws -> Void ) throws {
95
86
enum ReceiveAction {
96
87
case value( RequestParts )
97
88
case future( EventLoopFuture < RequestParts > )
98
89
}
99
90
100
- switch try self . _blockingQueue . popFirst ( deadline: deadline) {
91
+ switch try self . blockingQueue . popFirst ( deadline: deadline) {
101
92
case . body( . byteBuffer( let buffer) ) :
102
93
try verify ( buffer)
103
94
case . body( . fileRegion) :
@@ -107,13 +98,13 @@ final class MockRequestExecutor {
107
98
}
108
99
}
109
100
110
- func receiveEndOfStream( deadline: NIODeadline = . now( ) + . seconds( 60 ) ) throws {
101
+ func receiveEndOfStream( deadline: NIODeadline = . now( ) + . seconds( 5 ) ) throws {
111
102
enum ReceiveAction {
112
103
case value( RequestParts )
113
104
case future( EventLoopFuture < RequestParts > )
114
105
}
115
106
116
- switch try self . _blockingQueue . popFirst ( deadline: deadline) {
107
+ switch try self . blockingQueue . popFirst ( deadline: deadline) {
117
108
case . body( . byteBuffer) :
118
109
throw Errors . unexpectedByteBuffer
119
110
case . body( . fileRegion) :
@@ -158,17 +149,34 @@ final class MockRequestExecutor {
158
149
}
159
150
160
151
func resetResponseStreamDemandSignal( ) {
161
- if self . eventLoop. inEventLoop {
162
- self . resetResponseStreamDemandSignal0 ( )
163
- } else {
164
- self . eventLoop. execute {
165
- self . resetResponseStreamDemandSignal0 ( )
166
- }
152
+ self . responseBodyDemandLock. lock ( )
153
+ self . responseBodyDemandLock. unlock ( withValue: false )
154
+ }
155
+
156
+ func receiveResponseDemand( deadline: NIODeadline = . now( ) + . seconds( 5 ) ) throws {
157
+ let secondsUntilDeath = deadline - NIODeadline. now ( )
158
+ guard self . responseBodyDemandLock. lock (
159
+ whenValue: true ,
160
+ timeoutSeconds: . init( secondsUntilDeath. nanoseconds / 1_000_000_000 )
161
+ )
162
+ else {
163
+ throw TimeoutError ( )
167
164
}
165
+
166
+ self . responseBodyDemandLock. unlock ( )
168
167
}
169
168
170
- private func resetResponseStreamDemandSignal0( ) {
171
- self . _signaledDemandForResponseBody = false
169
+ func receiveCancellation( deadline: NIODeadline = . now( ) + . seconds( 5 ) ) throws {
170
+ let secondsUntilDeath = deadline - NIODeadline. now ( )
171
+ guard self . cancellationLock. lock (
172
+ whenValue: true ,
173
+ timeoutSeconds: . init( secondsUntilDeath. nanoseconds / 1_000_000_000 )
174
+ )
175
+ else {
176
+ throw TimeoutError ( )
177
+ }
178
+
179
+ self . cancellationLock. unlock ( )
172
180
}
173
181
}
174
182
@@ -192,12 +200,12 @@ extension MockRequestExecutor: HTTPRequestExecutor {
192
200
193
201
let stateChange = { ( ) -> WriteAction in
194
202
var pause = false
195
- if self . _blockingQueue . isEmpty && self . pauseRequestBodyPartStreamAfterASingleWrite && part. isBody {
203
+ if self . blockingQueue . isEmpty && self . pauseRequestBodyPartStreamAfterASingleWrite && part. isBody {
196
204
pause = true
197
205
self . _signaledDemandForRequestBody = false
198
206
}
199
207
200
- self . _blockingQueue . append ( . success( part) )
208
+ self . blockingQueue . append ( . success( part) )
201
209
202
210
return pause ? . pauseBodyStream : . none
203
211
}
@@ -218,29 +226,23 @@ extension MockRequestExecutor: HTTPRequestExecutor {
218
226
}
219
227
220
228
func demandResponseBodyStream( _: HTTPExecutableRequest ) {
221
- if self . eventLoop. inEventLoop {
222
- self . _signaledDemandForResponseBody = true
223
- } else {
224
- self . eventLoop. execute { self . _signaledDemandForResponseBody = true }
225
- }
229
+ self . responseBodyDemandLock. lock ( )
230
+ self . responseBodyDemandLock. unlock ( withValue: true )
226
231
}
227
232
228
233
func cancelRequest( _: HTTPExecutableRequest ) {
229
- if self . eventLoop. inEventLoop {
230
- self . _isCancelled = true
231
- } else {
232
- self . eventLoop. execute { self . _isCancelled = true }
233
- }
234
+ self . cancellationLock. lock ( )
235
+ self . cancellationLock. unlock ( withValue: true )
234
236
}
235
237
}
236
238
237
239
extension MockRequestExecutor {
240
+ public struct TimeoutError : Error { }
241
+
238
242
final class BlockingQueue < Element> {
239
243
private let condition = ConditionLock ( value: false )
240
244
private var buffer = CircularBuffer < Result < Element , Error > > ( )
241
245
242
- public struct TimeoutError : Error { }
243
-
244
246
internal func append( _ element: Result < Element , Error > ) {
245
247
self . condition. lock ( )
246
248
self . buffer. append ( element)
0 commit comments