Skip to content

Commit 32f2a27

Browse files
authored
Merge pull request #165 from php-enqueue/queue-consumer-timeout
[consumption] adjust receive and idle timeouts
2 parents 248cdaf + 0fda676 commit 32f2a27

File tree

3 files changed

+47
-8
lines changed

3 files changed

+47
-8
lines changed

pkg/enqueue/Consumption/QueueConsumer.php

+15-7
Original file line numberDiff line numberDiff line change
@@ -34,23 +34,31 @@ class QueueConsumer
3434
private $boundProcessors;
3535

3636
/**
37-
* @var int
37+
* @var int|float in milliseconds
3838
*/
39-
private $idleMicroseconds;
39+
private $idleTimeout;
40+
41+
/**
42+
* @var int|float in milliseconds
43+
*/
44+
private $receiveTimeout;
4045

4146
/**
4247
* @param PsrContext $psrContext
4348
* @param ExtensionInterface|ChainExtension|null $extension
44-
* @param int $idleMicroseconds 100ms by default
49+
* @param int|float $idleTimeout the time in milliseconds queue consumer waits if no message received
50+
* @param int|float $receiveTimeout the time in milliseconds queue consumer waits for a message (10 ms by default)
4551
*/
4652
public function __construct(
4753
PsrContext $psrContext,
4854
ExtensionInterface $extension = null,
49-
$idleMicroseconds = 100000
55+
$idleTimeout = 0,
56+
$receiveTimeout = 10000
5057
) {
5158
$this->psrContext = $psrContext;
5259
$this->extension = $extension;
53-
$this->idleMicroseconds = $idleMicroseconds;
60+
$this->idleTimeout = $idleTimeout;
61+
$this->receiveTimeout = $receiveTimeout;
5462

5563
$this->boundProcessors = [];
5664
}
@@ -181,7 +189,7 @@ protected function doConsume(ExtensionInterface $extension, Context $context)
181189
throw new ConsumptionInterruptedException();
182190
}
183191

184-
if ($message = $consumer->receive($timeout = 5000)) {
192+
if ($message = $consumer->receive($this->receiveTimeout)) {
185193
$logger->info('Message received from the queue: '.$context->getPsrQueue()->getQueueName());
186194
$logger->debug('Headers: {headers}', ['headers' => new VarExport($message->getHeaders())]);
187195
$logger->debug('Properties: {properties}', ['properties' => new VarExport($message->getProperties())]);
@@ -215,7 +223,7 @@ protected function doConsume(ExtensionInterface $extension, Context $context)
215223

216224
$extension->onPostReceived($context);
217225
} else {
218-
usleep($this->idleMicroseconds);
226+
usleep($this->idleTimeout * 1000);
219227
$extension->onIdle($context);
220228
}
221229

pkg/enqueue/Tests/Consumption/QueueConsumerTest.php

+31
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,37 @@ public function testShouldReturnSelfOnBind()
149149
$this->assertSame($consumer, $consumer->bind(new NullQueue('aQueueName'), $processorMock));
150150
}
151151

152+
public function testShouldSubscribeToGivenQueueWithExpectedTimeout()
153+
{
154+
$expectedQueue = new NullQueue('theQueueName');
155+
156+
$messageConsumerMock = $this->createMock(PsrConsumer::class);
157+
$messageConsumerMock
158+
->expects($this->once())
159+
->method('receive')
160+
->with(12345)
161+
->willReturn(null)
162+
;
163+
164+
$contextMock = $this->createMock(PsrContext::class);
165+
$contextMock
166+
->expects($this->once())
167+
->method('createConsumer')
168+
->with($this->identicalTo($expectedQueue))
169+
->willReturn($messageConsumerMock)
170+
;
171+
172+
$processorMock = $this->createProcessorMock();
173+
$processorMock
174+
->expects($this->never())
175+
->method('process')
176+
;
177+
178+
$queueConsumer = new QueueConsumer($contextMock, new BreakCycleExtension(1), 0, 12345);
179+
$queueConsumer->bind($expectedQueue, $processorMock);
180+
$queueConsumer->consume();
181+
}
182+
152183
public function testShouldSubscribeToGivenQueueAndQuitAfterFifthIdleCycle()
153184
{
154185
$expectedQueue = new NullQueue('theQueueName');

pkg/rdkafka/Tests/Spec/RdKafkaSendToAndReceiveFromTopicTest.php

+1-1
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88

99
/**
1010
* @group functional
11-
* @retry 5
11+
* @retry 10
1212
*/
1313
class RdKafkaSendToAndReceiveFromTopicTest extends SendToAndReceiveFromTopicSpec
1414
{

0 commit comments

Comments
 (0)