From 4f038541d6aa2790f4bc367ad772522cd5fb0f8f Mon Sep 17 00:00:00 2001 From: Alexander Kozienko Date: Wed, 20 Feb 2019 16:04:16 +0200 Subject: [PATCH 1/7] redis --- pkg/redis/LuaScripts.php | 41 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 41 insertions(+) diff --git a/pkg/redis/LuaScripts.php b/pkg/redis/LuaScripts.php index 4a40d2447..2a1764f64 100644 --- a/pkg/redis/LuaScripts.php +++ b/pkg/redis/LuaScripts.php @@ -6,6 +6,47 @@ class LuaScripts { + /** + * KEYS[1] - The queue we are reading message + * KEYS[2] - The reserved queue we are moving message to + * ARGV[1] - Now timestamp + * ARGV[2] - Redelivery at timestamp + */ + public static function receiveMessage() + { + return << json['headers']['expires_at']) then + return nil + end +end + +json['headers']['attempts'] = json['headers']['attempts'] + 1 + +message = cjson.encode(json) + +redis.call('ZADD', KEYS[2], tonumber(ARGV[2]), message) + +return message +LUA; + } + /** * Get the Lua script to migrate expired messages back onto the queue. * From 37950deae3b0e92ff4badfa19a7f292a72ad3553 Mon Sep 17 00:00:00 2001 From: Alexander Kozienko Date: Thu, 21 Feb 2019 13:10:49 +0200 Subject: [PATCH 2/7] redid --- pkg/redis/RedisBlockingConsumeStrategy.php | 104 ++++++++++++++++++ pkg/redis/RedisConnectionFactory.php | 6 +- pkg/redis/RedisConsumeStrategy.php | 14 +++ pkg/redis/RedisConsumer.php | 21 ++-- pkg/redis/RedisConsumerHelperTrait.php | 102 ++--------------- pkg/redis/RedisContext.php | 30 +++-- pkg/redis/RedisNonBlockingConsumeStrategy.php | 91 +++++++++++++++ pkg/redis/RedisSubscriptionConsumer.php | 21 ++-- pkg/redis/examples/consumer.php | 32 ++++++ pkg/redis/examples/produce.php | 32 ++++++ 10 files changed, 332 insertions(+), 121 deletions(-) create mode 100644 pkg/redis/RedisBlockingConsumeStrategy.php create mode 100644 pkg/redis/RedisConsumeStrategy.php create mode 100644 pkg/redis/RedisNonBlockingConsumeStrategy.php create mode 100644 pkg/redis/examples/consumer.php create mode 100644 pkg/redis/examples/produce.php diff --git a/pkg/redis/RedisBlockingConsumeStrategy.php b/pkg/redis/RedisBlockingConsumeStrategy.php new file mode 100644 index 000000000..4fe0e3253 --- /dev/null +++ b/pkg/redis/RedisBlockingConsumeStrategy.php @@ -0,0 +1,104 @@ +context = $context; + } + + /** + * @param RedisDestination[] $queues + * @param int $timeout + * @param int $redeliveryDelay + * + * @return RedisMessage|null + */ + public function receiveMessage(array $queues, int $timeout, int $redeliveryDelay): ?RedisMessage + { + $startAt = time(); + $thisTimeout = (int) ceil($timeout / 1000); + + if (null === $this->queueNames) { + $this->queueNames = []; + foreach ($queues as $queue) { + $this->queueNames[] = $queue->getName(); + } + } + + while ($thisTimeout > 0) { + $this->migrateExpiredMessages($this->context->getRedis(), $this->queueNames); + + if (false == $result = $this->context->getRedis()->brpop($this->queueNames, $thisTimeout)) { + return null; + } + + $this->pushQueueNameBack($this->queueNames, $result->getKey()); + + if ($message = $this->processResult($result, $redeliveryDelay)) { + return $message; + } + + $thisTimeout -= time() - $startAt; + } + + return null; + } + + public function receiveMessageNoWait(RedisDestination $queue, int $redeliveryDelay): ?RedisMessage + { + $this->migrateExpiredMessages($this->context->getRedis(), [$queue]); + + if ($result = $this->context->getRedis()->rpop($queue->getName())) { + return $this->processResult($result, $redeliveryDelay); + } + + return null; + } + + public function resetState() + { + $this->queueNames = null; + } + + protected function processResult(RedisResult $result, int $redeliveryDelay): ?RedisMessage + { + $message = $this->context->getSerializer()->toMessage($result->getMessage()); + + $now = time(); + + if (0 === $message->getAttempts() && $expiresAt = $message->getHeader('expires_at')) { + if ($now > $expiresAt) { + return null; + } + } + + $message->setHeader('attempts', $message->getAttempts() + 1); + $message->setRedelivered($message->getAttempts() > 1); + $message->setKey($result->getKey()); + $message->setReservedKey($this->context->getSerializer()->toString($message)); + + $reservedQueue = $result->getKey().':reserved'; + $redeliveryAt = $now + $redeliveryDelay; + + $this->context->getRedis()->zadd($reservedQueue, $message->getReservedKey(), $redeliveryAt); + + return $message; + } +} diff --git a/pkg/redis/RedisConnectionFactory.php b/pkg/redis/RedisConnectionFactory.php index a17726ca8..0d5c1c62e 100644 --- a/pkg/redis/RedisConnectionFactory.php +++ b/pkg/redis/RedisConnectionFactory.php @@ -10,6 +10,9 @@ class RedisConnectionFactory implements ConnectionFactory { + const CONSUME_STRATEGY_BLOCKING = 'blocking'; + const CONSUME_STRATEGY_NON_BLOCKING = 'non_blocking'; + /** * @var array */ @@ -87,7 +90,7 @@ public function createContext(): Context if ($this->config['lazy']) { return new RedisContext(function () { return $this->createRedis(); - }, $this->config['redelivery_delay']); + }, $this->config); } return new RedisContext($this->createRedis(), $this->config['redelivery_delay']); @@ -161,6 +164,7 @@ private function defaultConfig(): array 'predis_options' => null, 'ssl' => null, 'redelivery_delay' => 300, + 'consume_strategy' => self::CONSUME_STRATEGY_BLOCKING, ]; } } diff --git a/pkg/redis/RedisConsumeStrategy.php b/pkg/redis/RedisConsumeStrategy.php new file mode 100644 index 000000000..bd39ea443 --- /dev/null +++ b/pkg/redis/RedisConsumeStrategy.php @@ -0,0 +1,14 @@ +context = $context; $this->queue = $queue; + $this->consumeStrategy = $consumeStrategy; } /** @@ -63,8 +70,6 @@ public function getQueue(): Queue */ public function receive(int $timeout = 0): ?Message { - $timeout = (int) ceil($timeout / 1000); - if ($timeout <= 0) { while (true) { if ($message = $this->receive(5000)) { @@ -73,7 +78,7 @@ public function receive(int $timeout = 0): ?Message } } - return $this->receiveMessage([$this->queue], $timeout, $this->redeliveryDelay); + return $this->consumeStrategy->receiveMessage([$this->queue], $timeout, $this->redeliveryDelay); } /** @@ -81,7 +86,7 @@ public function receive(int $timeout = 0): ?Message */ public function receiveNoWait(): ?Message { - return $this->receiveMessageNoWait($this->queue, $this->redeliveryDelay); + return $this->consumeStrategy->receiveMessageNoWait($this->queue, $this->redeliveryDelay); } /** diff --git a/pkg/redis/RedisConsumerHelperTrait.php b/pkg/redis/RedisConsumerHelperTrait.php index 9939986ed..f78106aa1 100644 --- a/pkg/redis/RedisConsumerHelperTrait.php +++ b/pkg/redis/RedisConsumerHelperTrait.php @@ -6,113 +6,29 @@ trait RedisConsumerHelperTrait { - /** - * @var string[] - */ - protected $queueNames; - - abstract protected function getContext(): RedisContext; - - /** - * @param RedisDestination[] $queues - * @param int $timeout - * @param int $redeliveryDelay - * - * @return RedisMessage|null - */ - protected function receiveMessage(array $queues, int $timeout, int $redeliveryDelay): ?RedisMessage - { - $startAt = time(); - $thisTimeout = $timeout; - - if (null === $this->queueNames) { - $this->queueNames = []; - foreach ($queues as $queue) { - $this->queueNames[] = $queue->getName(); - } - } - - while ($thisTimeout > 0) { - $this->migrateExpiredMessages($this->queueNames); - - if (false == $result = $this->getContext()->getRedis()->brpop($this->queueNames, $thisTimeout)) { - return null; - } - - $this->pushQueueNameBack($result->getKey()); - - if ($message = $this->processResult($result, $redeliveryDelay)) { - return $message; - } - - $thisTimeout -= time() - $startAt; - } - - return null; - } - - protected function receiveMessageNoWait(RedisDestination $destination, int $redeliveryDelay): ?RedisMessage - { - $this->migrateExpiredMessages([$destination->getName()]); - - if ($result = $this->getContext()->getRedis()->rpop($destination->getName())) { - return $this->processResult($result, $redeliveryDelay); - } - - return null; - } - - protected function processResult(RedisResult $result, int $redeliveryDelay): ?RedisMessage - { - $message = $this->getContext()->getSerializer()->toMessage($result->getMessage()); - - $now = time(); - - if (0 === $message->getAttempts() && $expiresAt = $message->getHeader('expires_at')) { - if ($now > $expiresAt) { - return null; - } - } - - $message->setHeader('attempts', $message->getAttempts() + 1); - $message->setRedelivered($message->getAttempts() > 1); - $message->setKey($result->getKey()); - $message->setReservedKey($this->getContext()->getSerializer()->toString($message)); - - $reservedQueue = $result->getKey().':reserved'; - $redeliveryAt = $now + $redeliveryDelay; - - $this->getContext()->getRedis()->zadd($reservedQueue, $message->getReservedKey(), $redeliveryAt); - - return $message; - } - - protected function pushQueueNameBack(string $queueName): void + protected function pushQueueNameBack(array &$queueNames, string $queueName): void { - if (count($this->queueNames) <= 1) { + if (count($queueNames) <= 1) { return; } - if (false === $from = array_search($queueName, $this->queueNames, true)) { + if (false === $from = array_search($queueName, $queueNames, true)) { throw new \LogicException(sprintf('Queue name was not found: "%s"', $queueName)); } - $to = count($this->queueNames) - 1; + $to = count($queueNames) - 1; - $out = array_splice($this->queueNames, $from, 1); - array_splice($this->queueNames, $to, 0, $out); + $out = array_splice($queueNames, $from, 1); + array_splice($queueNames, $to, 0, $out); } - protected function migrateExpiredMessages(array $queueNames): void + protected function migrateExpiredMessages(Redis $redis, array $queueNames): void { $now = time(); foreach ($queueNames as $queueName) { - $this->getContext()->getRedis() - ->eval(LuaScripts::migrateExpired(), [$queueName.':delayed', $queueName], [$now]); - - $this->getContext()->getRedis() - ->eval(LuaScripts::migrateExpired(), [$queueName.':reserved', $queueName], [$now]); + $redis->eval(LuaScripts::migrateExpired(), [$queueName.':delayed', $queueName], [$now]); + $redis->eval(LuaScripts::migrateExpired(), [$queueName.':reserved', $queueName], [$now]); } } } diff --git a/pkg/redis/RedisContext.php b/pkg/redis/RedisContext.php index 344bb20c5..5bbea0feb 100644 --- a/pkg/redis/RedisContext.php +++ b/pkg/redis/RedisContext.php @@ -30,17 +30,17 @@ class RedisContext implements Context private $redisFactory; /** - * @var int + * @var array */ - private $redeliveryDelay = 300; + private $config; /** * Callable must return instance of Redis once called. * * @param Redis|callable $redis - * @param int $redeliveryDelay + * @param array $config */ - public function __construct($redis, int $redeliveryDelay) + public function __construct($redis, array $config) { if ($redis instanceof Redis) { $this->redis = $redis; @@ -54,7 +54,7 @@ public function __construct($redis, int $redeliveryDelay) )); } - $this->redeliveryDelay = $redeliveryDelay; + $this->config = $config; $this->setSerializer(new JsonSerializer()); } @@ -124,8 +124,8 @@ public function createConsumer(Destination $destination): Consumer { InvalidDestinationException::assertDestinationInstanceOf($destination, RedisDestination::class); - $consumer = new RedisConsumer($this, $destination); - $consumer->setRedeliveryDelay($this->redeliveryDelay); + $consumer = new RedisConsumer($this, $destination, $this->getConsumeStrategy()); + $consumer->setRedeliveryDelay($this->config['redelivery_delay']); return $consumer; } @@ -135,8 +135,8 @@ public function createConsumer(Destination $destination): Consumer */ public function createSubscriptionConsumer(): SubscriptionConsumer { - $consumer = new RedisSubscriptionConsumer($this); - $consumer->setRedeliveryDelay($this->redeliveryDelay); + $consumer = new RedisSubscriptionConsumer($this, $this->getConsumeStrategy()); + $consumer->setRedeliveryDelay($this->config['redelivery_delay']); return $consumer; } @@ -178,4 +178,16 @@ private function deleteDestination(RedisDestination $destination): void $this->getRedis()->del($destination->getName().':delayed'); $this->getRedis()->del($destination->getName().':reserved'); } + + private function getConsumeStrategy(): RedisConsumeStrategy + { + switch ($this->config['consume_strategy']) { + case RedisConnectionFactory::CONSUME_STRATEGY_BLOCKING: + return new RedisBlockingConsumeStrategy($this); + case RedisConnectionFactory::CONSUME_STRATEGY_NON_BLOCKING: + return new RedisNonBlockingConsumeStrategy($this); + default: + throw new \LogicException(sprintf('Unsupported consume strategy: "%s"', $this->config['consume_strategy'])); + } + } } diff --git a/pkg/redis/RedisNonBlockingConsumeStrategy.php b/pkg/redis/RedisNonBlockingConsumeStrategy.php new file mode 100644 index 000000000..498cda569 --- /dev/null +++ b/pkg/redis/RedisNonBlockingConsumeStrategy.php @@ -0,0 +1,91 @@ +context = $context; + } + + /** + * @param RedisDestination[] $queues + * @param int $timeout + * @param int $redeliveryDelay + * + * @return RedisMessage|null + */ + public function receiveMessage(array $queues, int $timeout, int $redeliveryDelay): ?RedisMessage + { + $endAt = microtime(true) + $timeout/1000; + + if (null === $this->queueNames) { + $this->queueNames = []; + foreach ($queues as $queue) { + $this->queueNames[] = $queue->getName(); + } + } + + while (true) { + $this->migrateExpiredMessages($this->context->getRedis(), $this->queueNames); + + $queue = current($this->queueNames); + $reservedQueue = $queue.':reserved'; + $now = time(); + $redeliveryAt = $now + $redeliveryDelay; + + $this->pushQueueNameBack($this->queueNames, $queue); + + if ($result = $this->context->getRedis()->eval(LuaScripts::receiveMessage(), [$queue, $reservedQueue], [$now, $redeliveryAt])) { + return $this->processResult($result, $queue); + } + + if (microtime(true) > $endAt) { + return null; + } + + usleep(10000); + } + } + + public function receiveMessageNoWait(RedisDestination $queue, int $redeliveryDelay): ?RedisMessage + { + $this->migrateExpiredMessages($this->context->getRedis(), [$queue]); + + if ($result = $this->context->getRedis()->rpop($queue->getName())) { + return $this->processResult($result->getMessage(), $queue->getName()); + } + + return null; + } + + public function resetState() + { + $this->queueNames = null; + } + + protected function processResult(string $result, string $key): ?RedisMessage + { + $message = $this->context->getSerializer()->toMessage($result); + $message->setKey($key); + $message->setReservedKey($result); + $message->setRedelivered($message->getAttempts() > 1); + + return $message; + } +} diff --git a/pkg/redis/RedisSubscriptionConsumer.php b/pkg/redis/RedisSubscriptionConsumer.php index c59cab4da..55827f05a 100644 --- a/pkg/redis/RedisSubscriptionConsumer.php +++ b/pkg/redis/RedisSubscriptionConsumer.php @@ -9,13 +9,16 @@ class RedisSubscriptionConsumer implements SubscriptionConsumer { - use RedisConsumerHelperTrait; - /** * @var RedisContext */ private $context; + /** + * @var RedisConsumeStrategy + */ + private $consumeStrategy; + /** * an item contains an array: [RedisConsumer $consumer, callable $callback];. * @@ -28,12 +31,10 @@ class RedisSubscriptionConsumer implements SubscriptionConsumer */ private $redeliveryDelay = 300; - /** - * @param RedisContext $context - */ - public function __construct(RedisContext $context) + public function __construct(RedisContext $context, RedisConsumeStrategy $consumeStrategy) { $this->context = $context; + $this->consumeStrategy = $consumeStrategy; $this->subscribers = []; } @@ -69,7 +70,7 @@ public function consume(int $timeout = 0): void } while (true) { - if ($message = $this->receiveMessage($queues, $timeout ?: 5, $this->redeliveryDelay)) { + if ($message = $this->consumeStrategy->receiveMessage($queues, $timeout ?: 5, $this->redeliveryDelay)) { list($consumer, $callback) = $this->subscribers[$message->getKey()]; if (false === call_user_func($callback, $message, $consumer)) { @@ -102,7 +103,7 @@ public function subscribe(Consumer $consumer, callable $callback): void } $this->subscribers[$queueName] = [$consumer, $callback]; - $this->queueNames = null; + $this->consumeStrategy->resetState(); } /** @@ -125,13 +126,13 @@ public function unsubscribe(Consumer $consumer): void } unset($this->subscribers[$queueName]); - $this->queueNames = null; + $this->consumeStrategy->resetState(); } public function unsubscribeAll(): void { $this->subscribers = []; - $this->queueNames = null; + $this->consumeStrategy->resetState(); } private function getContext(): RedisContext diff --git a/pkg/redis/examples/consumer.php b/pkg/redis/examples/consumer.php new file mode 100644 index 000000000..6fc0d74ef --- /dev/null +++ b/pkg/redis/examples/consumer.php @@ -0,0 +1,32 @@ +createContext(); + +$queue = $context->createQueue('queue'); + +$consumer = $context->createConsumer($queue); + +while (true) { + if ($m = $consumer->receive(20000)) { + $consumer->acknowledge($m); + echo 'Received message: '.$m->getBody().' '.json_encode($m->getHeaders()).' '.json_encode($m->getProperties()).PHP_EOL; + } +} +echo 'Done'."\n"; diff --git a/pkg/redis/examples/produce.php b/pkg/redis/examples/produce.php new file mode 100644 index 000000000..aea5cae72 --- /dev/null +++ b/pkg/redis/examples/produce.php @@ -0,0 +1,32 @@ +createContext(); + +$queue = $context->createQueue('queue'); + +$message = $context->createMessage('Hello Bar!', ['key' => 'value'], ['key2' => 'value2']); + +while (true) { + $context->createProducer()->send($queue, $message); + echo 'Sent message: '.$message->getBody().PHP_EOL; + sleep(1); +} + +echo 'Done'."\n"; From a897d8e4c2d23565a9b6839ca755d642ab32f676 Mon Sep 17 00:00:00 2001 From: Alexander Kozienko Date: Thu, 21 Feb 2019 14:08:57 +0200 Subject: [PATCH 3/7] redid --- pkg/redis/LuaScripts.php | 8 +- pkg/redis/RedisConsumerHelperTrait.php | 2 +- pkg/redis/RedisNonBlockingConsumeStrategy.php | 2 +- .../RedisConnectionFactoryConfigTest.php | 14 ++ pkg/redis/Tests/RedisConsumerTest.php | 186 +++--------------- pkg/redis/Tests/RedisContextTest.php | 41 ++-- .../Tests/RedisSubscriptionConsumerTest.php | 46 ++++- 7 files changed, 106 insertions(+), 193 deletions(-) diff --git a/pkg/redis/LuaScripts.php b/pkg/redis/LuaScripts.php index 2a1764f64..76280bbe3 100644 --- a/pkg/redis/LuaScripts.php +++ b/pkg/redis/LuaScripts.php @@ -11,10 +11,12 @@ class LuaScripts * KEYS[2] - The reserved queue we are moving message to * ARGV[1] - Now timestamp * ARGV[2] - Redelivery at timestamp + * + * @return string */ - public static function receiveMessage() + public static function receiveMessage(): string { - return <<queueNames) { $this->queueNames = []; diff --git a/pkg/redis/Tests/RedisConnectionFactoryConfigTest.php b/pkg/redis/Tests/RedisConnectionFactoryConfigTest.php index eee32baac..23eed48e1 100644 --- a/pkg/redis/Tests/RedisConnectionFactoryConfigTest.php +++ b/pkg/redis/Tests/RedisConnectionFactoryConfigTest.php @@ -91,6 +91,7 @@ public static function provideConfigs() 'predis_options' => null, 'ssl' => null, 'redelivery_delay' => 300, + 'consume_strategy' => 'blocking', ], ]; @@ -112,6 +113,7 @@ public static function provideConfigs() 'predis_options' => null, 'ssl' => null, 'redelivery_delay' => 300, + 'consume_strategy' => 'blocking', ], ]; @@ -133,6 +135,7 @@ public static function provideConfigs() 'predis_options' => null, 'ssl' => null, 'redelivery_delay' => 300, + 'consume_strategy' => 'blocking', ], ]; @@ -155,6 +158,7 @@ public static function provideConfigs() 'ssl' => null, 'foo' => 'bar', 'redelivery_delay' => 300, + 'consume_strategy' => 'blocking', ], ]; @@ -177,6 +181,7 @@ public static function provideConfigs() 'ssl' => null, 'foo' => 'bar', 'redelivery_delay' => 300, + 'consume_strategy' => 'blocking', ], ]; @@ -199,6 +204,7 @@ public static function provideConfigs() 'ssl' => null, 'foo' => 'bar', 'redelivery_delay' => 300, + 'consume_strategy' => 'blocking', ], ]; @@ -222,6 +228,7 @@ public static function provideConfigs() 'ssl' => null, 'foo' => 'bar', 'redelivery_delay' => 300, + 'consume_strategy' => 'blocking', ], ]; @@ -245,6 +252,7 @@ public static function provideConfigs() 'ssl' => null, 'foo' => 'bar', 'redelivery_delay' => 300, + 'consume_strategy' => 'blocking', ], ]; @@ -268,6 +276,7 @@ public static function provideConfigs() 'ssl' => null, 'foo' => 'bar', 'redelivery_delay' => 300, + 'consume_strategy' => 'blocking', ], ]; @@ -290,6 +299,7 @@ public static function provideConfigs() 'ssl' => null, 'foo' => 'bar', 'redelivery_delay' => 300, + 'consume_strategy' => 'blocking', ], ]; @@ -312,6 +322,7 @@ public static function provideConfigs() 'predis_options' => null, 'ssl' => null, 'redelivery_delay' => 300, + 'consume_strategy' => 'blocking', ], ]; @@ -334,6 +345,7 @@ public static function provideConfigs() 'predis_options' => null, 'ssl' => null, 'redelivery_delay' => 300, + 'consume_strategy' => 'blocking', ], ]; @@ -356,6 +368,7 @@ public static function provideConfigs() 'predis_options' => null, 'ssl' => null, 'redelivery_delay' => 300, + 'consume_strategy' => 'blocking', ], ]; @@ -382,6 +395,7 @@ public static function provideConfigs() 'verify_peer' => '1', ], 'redelivery_delay' => 300, + 'consume_strategy' => 'blocking', ], ]; } diff --git a/pkg/redis/Tests/RedisConsumerTest.php b/pkg/redis/Tests/RedisConsumerTest.php index 8c2b1afe0..1e657801b 100644 --- a/pkg/redis/Tests/RedisConsumerTest.php +++ b/pkg/redis/Tests/RedisConsumerTest.php @@ -5,11 +5,10 @@ use Enqueue\Redis\JsonSerializer; use Enqueue\Redis\Redis; use Enqueue\Redis\RedisConsumer; +use Enqueue\Redis\RedisConsumeStrategy; use Enqueue\Redis\RedisContext; use Enqueue\Redis\RedisDestination; use Enqueue\Redis\RedisMessage; -use Enqueue\Redis\RedisProducer; -use Enqueue\Redis\RedisResult; use Enqueue\Test\ClassExtensionTrait; use Interop\Queue\Consumer; @@ -24,14 +23,14 @@ public function testShouldImplementConsumerInterface() public function testCouldBeConstructedWithContextAndDestinationAndPreFetchCountAsArguments() { - new RedisConsumer($this->createContextMock(), new RedisDestination('aQueue')); + new RedisConsumer($this->createContextMock(), new RedisDestination('aQueue'), $this->createMock(RedisConsumeStrategy::class)); } public function testShouldReturnDestinationSetInConstructorOnGetQueue() { $destination = new RedisDestination('aQueue'); - $consumer = new RedisConsumer($this->createContextMock(), $destination); + $consumer = new RedisConsumer($this->createContextMock(), $destination, $this->createMock(RedisConsumeStrategy::class)); $this->assertSame($destination, $consumer->getQueue()); } @@ -56,7 +55,7 @@ public function testShouldAcknowledgeMessage() $message = new RedisMessage(); $message->setReservedKey('reserved-key'); - $consumer = new RedisConsumer($contextMock, new RedisDestination('aQueue')); + $consumer = new RedisConsumer($contextMock, new RedisDestination('aQueue'), $this->createMock(RedisConsumeStrategy::class)); $consumer->acknowledge($message); } @@ -81,7 +80,7 @@ public function testShouldRejectMessage() $message = new RedisMessage(); $message->setReservedKey('reserved-key'); - $consumer = new RedisConsumer($contextMock, new RedisDestination('aQueue')); + $consumer = new RedisConsumer($contextMock, new RedisDestination('aQueue'), $this->createMock(RedisConsumeStrategy::class)); $consumer->reject($message); } @@ -114,180 +113,51 @@ public function testShouldSendSameMessageToDestinationOnReQueue() $message->setBody('text'); $message->setReservedKey($serializer->toString($message)); - $consumer = new RedisConsumer($contextMock, new RedisDestination('aQueue')); + $consumer = new RedisConsumer($contextMock, new RedisDestination('aQueue'), $this->createMock(RedisConsumeStrategy::class)); $consumer->reject($message, true); } - public function testShouldCallRedisBRPopAndReturnNullIfNothingInQueueOnReceive() + public function testShouldReceiveMessage() { $destination = new RedisDestination('aQueue'); - $redisMock = $this->createRedisMock(); - $redisMock - ->expects($this->once()) - ->method('brpop') - ->with(['aQueue'], 2) - ->willReturn(null) - ; - $contextMock = $this->createContextMock(); - $contextMock - ->expects($this->any()) - ->method('getRedis') - ->willReturn($redisMock) - ; - - $consumer = new RedisConsumer($contextMock, $destination); - - $this->assertNull($consumer->receive(2000)); - } - public function testShouldCallRedisBRPopAndReturnMessageIfOneInQueueOnReceive() - { - $destination = new RedisDestination('aQueue'); - - $serializer = new JsonSerializer(); - - $redisMock = $this->createRedisMock(); - $redisMock + $strategy = $this->createMock(RedisConsumeStrategy::class); + $strategy ->expects($this->once()) - ->method('brpop') - ->with(['aQueue'], 2) - ->willReturn(new RedisResult('aQueue', $serializer->toString(new RedisMessage('aBody')))) - ; - - $contextMock = $this->createContextMock(); - $contextMock - ->expects($this->any()) - ->method('getRedis') - ->willReturn($redisMock) - ; - $contextMock - ->expects($this->any()) - ->method('getSerializer') - ->willReturn($serializer) + ->method('receiveMessage') + ->willReturn($message = new RedisMessage()) ; - $consumer = new RedisConsumer($contextMock, $destination); + $consumer = new RedisConsumer($contextMock, $destination, $strategy); - $message = $consumer->receive(2000); + $result = $consumer->receive(2000); - $this->assertInstanceOf(RedisMessage::class, $message); - $this->assertSame('aBody', $message->getBody()); + $this->assertNotNull($result); + $this->assertSame($message, $result);; } - public function testShouldCallRedisBRPopSeveralTimesWithFiveSecondTimeoutIfZeroTimeoutIsPassed() + public function testShouldReceiveNoWaitMessage() { $destination = new RedisDestination('aQueue'); - $expectedTimeout = 5; - - $serializer = new JsonSerializer(); - - $redisMock = $this->createRedisMock(); - $redisMock - ->expects($this->at(2)) - ->method('brpop') - ->with(['aQueue'], $expectedTimeout) - ->willReturn(null) - ; - $redisMock - ->expects($this->at(5)) - ->method('brpop') - ->with(['aQueue'], $expectedTimeout) - ->willReturn(null) - ; - $redisMock - ->expects($this->at(8)) - ->method('brpop') - ->with(['aQueue'], $expectedTimeout) - ->willReturn(new RedisResult('aQueue', $serializer->toString(new RedisMessage('aBody')))) - ; - $contextMock = $this->createContextMock(); - $contextMock - ->expects($this->atLeastOnce()) - ->method('getRedis') - ->willReturn($redisMock) - ; - $contextMock - ->expects($this->atLeastOnce()) - ->method('getSerializer') - ->willReturn($serializer) - ; - - $consumer = new RedisConsumer($contextMock, $destination); - $message = $consumer->receive(0); - - $this->assertInstanceOf(RedisMessage::class, $message); - $this->assertSame('aBody', $message->getBody()); - } - - public function testShouldCallRedisRPopAndReturnNullIfNothingInQueueOnReceiveNoWait() - { - $destination = new RedisDestination('aQueue'); - - $serializer = new JsonSerializer(); - - $redisMock = $this->createRedisMock(); - $redisMock + $strategy = $this->createMock(RedisConsumeStrategy::class); + $strategy ->expects($this->once()) - ->method('rpop') - ->with('aQueue') - ->willReturn(null) + ->method('receiveMessageNoWait') + ->willReturn($message = new RedisMessage()) ; - $contextMock = $this->createContextMock(); - $contextMock - ->expects($this->any()) - ->method('getRedis') - ->willReturn($redisMock) - ; - $contextMock - ->expects($this->any()) - ->method('getSerializer') - ->willReturn($serializer) - ; + $consumer = new RedisConsumer($contextMock, $destination, $strategy); - $consumer = new RedisConsumer($contextMock, $destination); + $result = $consumer->receiveNoWait(); - $this->assertNull($consumer->receiveNoWait()); - } - - public function testShouldCallRedisRPopAndReturnMessageIfOneInQueueOnReceiveNoWait() - { - $destination = new RedisDestination('aQueue'); - - $serializer = new JsonSerializer(); - - $redisMock = $this->createRedisMock(); - $redisMock - ->expects($this->once()) - ->method('rpop') - ->with('aQueue') - ->willReturn(new RedisResult('aQueue', $serializer->toString(new RedisMessage('aBody')))) - ; - - $contextMock = $this->createContextMock(); - $contextMock - ->expects($this->atLeastOnce()) - ->method('getRedis') - ->willReturn($redisMock) - ; - $contextMock - ->expects($this->any()) - ->method('getSerializer') - ->willReturn($serializer) - ; - - $consumer = new RedisConsumer($contextMock, $destination); - - $message = $consumer->receiveNoWait(); - - $this->assertInstanceOf(RedisMessage::class, $message); - $this->assertSame('aBody', $message->getBody()); + $this->assertNotNull($result); + $this->assertSame($message, $result);; } /** @@ -298,14 +168,6 @@ private function createRedisMock() return $this->createMock(Redis::class); } - /** - * @return \PHPUnit_Framework_MockObject_MockObject|RedisProducer - */ - private function createProducerMock() - { - return $this->createMock(RedisProducer::class); - } - /** * @return \PHPUnit_Framework_MockObject_MockObject|RedisContext */ diff --git a/pkg/redis/Tests/RedisContextTest.php b/pkg/redis/Tests/RedisContextTest.php index 01ea67df5..51adc31b3 100644 --- a/pkg/redis/Tests/RedisContextTest.php +++ b/pkg/redis/Tests/RedisContextTest.php @@ -5,6 +5,7 @@ use Enqueue\Null\NullQueue; use Enqueue\Null\NullTopic; use Enqueue\Redis\Redis; +use Enqueue\Redis\RedisConnectionFactory; use Enqueue\Redis\RedisConsumer; use Enqueue\Redis\RedisContext; use Enqueue\Redis\RedisDestination; @@ -27,26 +28,26 @@ public function testShouldImplementContextInterface() public function testCouldBeConstructedWithRedisAsFirstArgument() { - new RedisContext($this->createRedisMock(), 300); + new RedisContext($this->createRedisMock(), []); } public function testCouldBeConstructedWithRedisFactoryAsFirstArgument() { new RedisContext(function () { return $this->createRedisMock(); - }, 300); + }, []); } public function testThrowIfNeitherRedisNorFactoryGiven() { $this->expectException(\InvalidArgumentException::class); $this->expectExceptionMessage('The $redis argument must be either Enqueue\Redis\Redis or callable that returns Enqueue\Redis\Redis once called.'); - new RedisContext(new \stdClass(), 300); + new RedisContext(new \stdClass(), []); } public function testShouldAllowCreateEmptyMessage() { - $context = new RedisContext($this->createRedisMock(), 300); + $context = new RedisContext($this->createRedisMock(), []); $message = $context->createMessage(); @@ -59,7 +60,7 @@ public function testShouldAllowCreateEmptyMessage() public function testShouldAllowCreateCustomMessage() { - $context = new RedisContext($this->createRedisMock(), 300); + $context = new RedisContext($this->createRedisMock(), []); $message = $context->createMessage('theBody', ['aProp' => 'aPropVal'], ['aHeader' => 'aHeaderVal']); @@ -72,7 +73,7 @@ public function testShouldAllowCreateCustomMessage() public function testShouldCreateQueue() { - $context = new RedisContext($this->createRedisMock(), 300); + $context = new RedisContext($this->createRedisMock(), []); $queue = $context->createQueue('aQueue'); @@ -82,7 +83,7 @@ public function testShouldCreateQueue() public function testShouldAllowCreateTopic() { - $context = new RedisContext($this->createRedisMock(), 300); + $context = new RedisContext($this->createRedisMock(), []); $topic = $context->createTopic('aTopic'); @@ -92,7 +93,7 @@ public function testShouldAllowCreateTopic() public function testThrowNotImplementedOnCreateTmpQueueCall() { - $context = new RedisContext($this->createRedisMock(), 300); + $context = new RedisContext($this->createRedisMock(), []); $this->expectException(TemporaryQueueNotSupportedException::class); @@ -101,7 +102,7 @@ public function testThrowNotImplementedOnCreateTmpQueueCall() public function testShouldCreateProducer() { - $context = new RedisContext($this->createRedisMock(), 300); + $context = new RedisContext($this->createRedisMock(), []); $producer = $context->createProducer(); @@ -110,7 +111,7 @@ public function testShouldCreateProducer() public function testShouldThrowIfNotRedisDestinationGivenOnCreateConsumer() { - $context = new RedisContext($this->createRedisMock(), 300); + $context = new RedisContext($this->createRedisMock(), []); $this->expectException(InvalidDestinationException::class); $this->expectExceptionMessage('The destination must be an instance of Enqueue\Redis\RedisDestination but got Enqueue\Null\NullQueue.'); @@ -121,7 +122,10 @@ public function testShouldThrowIfNotRedisDestinationGivenOnCreateConsumer() public function testShouldCreateConsumer() { - $context = new RedisContext($this->createRedisMock(), 300); + $context = new RedisContext($this->createRedisMock(), [ + 'consume_strategy' => RedisConnectionFactory::CONSUME_STRATEGY_BLOCKING, + 'redelivery_delay' => 12345, + ]); $queue = $context->createQueue('aQueue'); @@ -138,7 +142,7 @@ public function testShouldCallRedisDisconnectOnClose() ->method('disconnect') ; - $context = new RedisContext($redisMock, 300); + $context = new RedisContext($redisMock, []); $context->close(); } @@ -151,7 +155,7 @@ public function testThrowIfNotRedisDestinationGivenOnDeleteQueue() ->method('del') ; - $context = new RedisContext($redisMock, 300); + $context = new RedisContext($redisMock, []); $this->expectException(InvalidDestinationException::class); $context->deleteQueue(new NullQueue('aQueue')); @@ -176,7 +180,7 @@ public function testShouldAllowDeleteQueue() ->with('aQueueName:reserved') ; - $context = new RedisContext($redisMock, 300); + $context = new RedisContext($redisMock, []); $queue = $context->createQueue('aQueueName'); @@ -191,7 +195,7 @@ public function testThrowIfNotRedisDestinationGivenOnDeleteTopic() ->method('del') ; - $context = new RedisContext($redisMock, 300); + $context = new RedisContext($redisMock, []); $this->expectException(InvalidDestinationException::class); $context->deleteTopic(new NullTopic('aTopic')); @@ -216,7 +220,7 @@ public function testShouldAllowDeleteTopic() ->with('aTopicName:reserved') ; - $context = new RedisContext($redisMock, 300); + $context = new RedisContext($redisMock, []); $topic = $context->createTopic('aTopicName'); @@ -225,7 +229,10 @@ public function testShouldAllowDeleteTopic() public function testShouldReturnExpectedSubscriptionConsumerInstance() { - $context = new RedisContext($this->createRedisMock(), 300); + $context = new RedisContext($this->createRedisMock(), [ + 'consume_strategy' => RedisConnectionFactory::CONSUME_STRATEGY_BLOCKING, + 'redelivery_delay' => 12345, + ]); $this->assertInstanceOf(RedisSubscriptionConsumer::class, $context->createSubscriptionConsumer()); } diff --git a/pkg/redis/Tests/RedisSubscriptionConsumerTest.php b/pkg/redis/Tests/RedisSubscriptionConsumerTest.php index 12c377500..5faac7561 100644 --- a/pkg/redis/Tests/RedisSubscriptionConsumerTest.php +++ b/pkg/redis/Tests/RedisSubscriptionConsumerTest.php @@ -3,6 +3,7 @@ namespace Enqueue\Redis\Tests; use Enqueue\Redis\RedisConsumer; +use Enqueue\Redis\RedisConsumeStrategy; use Enqueue\Redis\RedisContext; use Enqueue\Redis\RedisSubscriptionConsumer; use Interop\Queue\Consumer; @@ -21,12 +22,18 @@ public function testShouldImplementSubscriptionConsumerInterface() public function testCouldBeConstructedWithRedisContextAsFirstArgument() { - new RedisSubscriptionConsumer($this->createRedisContextMock()); + new RedisSubscriptionConsumer( + $this->createRedisContextMock(), + $this->createMock(RedisConsumeStrategy::class) + ); } public function testShouldAddConsumerAndCallbackToSubscribersPropertyOnSubscribe() { - $subscriptionConsumer = new RedisSubscriptionConsumer($this->createRedisContextMock()); + $subscriptionConsumer = new RedisSubscriptionConsumer( + $this->createRedisContextMock(), + $this->createMock(RedisConsumeStrategy::class) + ); $fooCallback = function () {}; $fooConsumer = $this->createConsumerStub('foo_queue'); @@ -45,7 +52,10 @@ public function testShouldAddConsumerAndCallbackToSubscribersPropertyOnSubscribe public function testThrowsIfTrySubscribeAnotherConsumerToAlreadySubscribedQueue() { - $subscriptionConsumer = new RedisSubscriptionConsumer($this->createRedisContextMock()); + $subscriptionConsumer = new RedisSubscriptionConsumer( + $this->createRedisContextMock(), + $this->createMock(RedisConsumeStrategy::class) + ); $fooCallback = function () {}; $fooConsumer = $this->createConsumerStub('foo_queue'); @@ -62,7 +72,10 @@ public function testThrowsIfTrySubscribeAnotherConsumerToAlreadySubscribedQueue( public function testShouldAllowSubscribeSameConsumerAndCallbackSecondTime() { - $subscriptionConsumer = new RedisSubscriptionConsumer($this->createRedisContextMock()); + $subscriptionConsumer = new RedisSubscriptionConsumer( + $this->createRedisContextMock(), + $this->createMock(RedisConsumeStrategy::class) + ); $fooCallback = function () {}; $fooConsumer = $this->createConsumerStub('foo_queue'); @@ -73,7 +86,10 @@ public function testShouldAllowSubscribeSameConsumerAndCallbackSecondTime() public function testShouldRemoveSubscribedConsumerOnUnsubscribeCall() { - $subscriptionConsumer = new RedisSubscriptionConsumer($this->createRedisContextMock()); + $subscriptionConsumer = new RedisSubscriptionConsumer( + $this->createRedisContextMock(), + $this->createMock(RedisConsumeStrategy::class) + ); $fooConsumer = $this->createConsumerStub('foo_queue'); $barConsumer = $this->createConsumerStub('bar_queue'); @@ -91,7 +107,10 @@ public function testShouldRemoveSubscribedConsumerOnUnsubscribeCall() public function testShouldDoNothingIfTryUnsubscribeNotSubscribedQueueName() { - $subscriptionConsumer = new RedisSubscriptionConsumer($this->createRedisContextMock()); + $subscriptionConsumer = new RedisSubscriptionConsumer( + $this->createRedisContextMock(), + $this->createMock(RedisConsumeStrategy::class) + ); $subscriptionConsumer->subscribe($this->createConsumerStub('foo_queue'), function () {}); @@ -105,7 +124,10 @@ public function testShouldDoNothingIfTryUnsubscribeNotSubscribedQueueName() public function testShouldDoNothingIfTryUnsubscribeNotSubscribedConsumer() { - $subscriptionConsumer = new RedisSubscriptionConsumer($this->createRedisContextMock()); + $subscriptionConsumer = new RedisSubscriptionConsumer( + $this->createRedisContextMock(), + $this->createMock(RedisConsumeStrategy::class) + ); $subscriptionConsumer->subscribe($this->createConsumerStub('foo_queue'), function () {}); @@ -119,7 +141,10 @@ public function testShouldDoNothingIfTryUnsubscribeNotSubscribedConsumer() public function testShouldRemoveAllSubscriberOnUnsubscribeAllCall() { - $subscriptionConsumer = new RedisSubscriptionConsumer($this->createRedisContextMock()); + $subscriptionConsumer = new RedisSubscriptionConsumer( + $this->createRedisContextMock(), + $this->createMock(RedisConsumeStrategy::class) + ); $subscriptionConsumer->subscribe($this->createConsumerStub('foo_queue'), function () {}); $subscriptionConsumer->subscribe($this->createConsumerStub('bar_queue'), function () {}); @@ -134,7 +159,10 @@ public function testShouldRemoveAllSubscriberOnUnsubscribeAllCall() public function testThrowsIfTryConsumeWithoutSubscribers() { - $subscriptionConsumer = new RedisSubscriptionConsumer($this->createRedisContextMock()); + $subscriptionConsumer = new RedisSubscriptionConsumer( + $this->createRedisContextMock(), + $this->createMock(RedisConsumeStrategy::class) + ); $this->expectException(\LogicException::class); $this->expectExceptionMessage('No subscribers'); From 95ecda0eac0ae06dca2341915dade0a426cf8677 Mon Sep 17 00:00:00 2001 From: Alexander Kozienko Date: Thu, 21 Feb 2019 14:29:37 +0200 Subject: [PATCH 4/7] redid --- pkg/redis/RedisConnectionFactory.php | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/redis/RedisConnectionFactory.php b/pkg/redis/RedisConnectionFactory.php index 0d5c1c62e..ab3866303 100644 --- a/pkg/redis/RedisConnectionFactory.php +++ b/pkg/redis/RedisConnectionFactory.php @@ -43,6 +43,7 @@ class RedisConnectionFactory implements ConnectionFactory * 'ssl' => could be any of http://fi2.php.net/manual/en/context.ssl.php#refsect1-context.ssl-options * 'redelivery_delay' => Default 300 sec. Returns back message into the queue if message was not acknowledged or rejected after this delay. * It could happen if consumer has failed with fatal error or even if message processing is slow and takes more than this time. + * 'consume_strategy' => [blocking|non_blocking] * ]. * * or From 467048c6d6a2e21cb7b614f7a4f3c3d2cd3ef0e3 Mon Sep 17 00:00:00 2001 From: Alexander Kozienko Date: Thu, 21 Feb 2019 14:39:07 +0200 Subject: [PATCH 5/7] redid --- pkg/redis/LuaScripts.php | 2 ++ pkg/redis/RedisBlockingConsumeStrategy.php | 2 +- pkg/redis/RedisConnectionFactory.php | 2 +- pkg/redis/RedisNonBlockingConsumeStrategy.php | 2 +- pkg/redis/Tests/RedisConsumerTest.php | 4 ++-- pkg/redis/Tests/RedisSubscriptionConsumerTest.php | 2 +- 6 files changed, 8 insertions(+), 6 deletions(-) diff --git a/pkg/redis/LuaScripts.php b/pkg/redis/LuaScripts.php index 76280bbe3..de53051ee 100644 --- a/pkg/redis/LuaScripts.php +++ b/pkg/redis/LuaScripts.php @@ -7,6 +7,8 @@ class LuaScripts { /** + * Lua script to receive message. + * * KEYS[1] - The queue we are reading message * KEYS[2] - The reserved queue we are moving message to * ARGV[1] - Now timestamp diff --git a/pkg/redis/RedisBlockingConsumeStrategy.php b/pkg/redis/RedisBlockingConsumeStrategy.php index 4fe0e3253..dcf67f02f 100644 --- a/pkg/redis/RedisBlockingConsumeStrategy.php +++ b/pkg/redis/RedisBlockingConsumeStrategy.php @@ -63,7 +63,7 @@ public function receiveMessage(array $queues, int $timeout, int $redeliveryDelay public function receiveMessageNoWait(RedisDestination $queue, int $redeliveryDelay): ?RedisMessage { - $this->migrateExpiredMessages($this->context->getRedis(), [$queue]); + $this->migrateExpiredMessages($this->context->getRedis(), [$queue->getName()]); if ($result = $this->context->getRedis()->rpop($queue->getName())) { return $this->processResult($result, $redeliveryDelay); diff --git a/pkg/redis/RedisConnectionFactory.php b/pkg/redis/RedisConnectionFactory.php index ab3866303..9dfdff6e1 100644 --- a/pkg/redis/RedisConnectionFactory.php +++ b/pkg/redis/RedisConnectionFactory.php @@ -94,7 +94,7 @@ public function createContext(): Context }, $this->config); } - return new RedisContext($this->createRedis(), $this->config['redelivery_delay']); + return new RedisContext($this->createRedis(), $this->config); } private function createRedis(): Redis diff --git a/pkg/redis/RedisNonBlockingConsumeStrategy.php b/pkg/redis/RedisNonBlockingConsumeStrategy.php index 7374231c4..8bfd59d0e 100644 --- a/pkg/redis/RedisNonBlockingConsumeStrategy.php +++ b/pkg/redis/RedisNonBlockingConsumeStrategy.php @@ -65,7 +65,7 @@ public function receiveMessage(array $queues, int $timeout, int $redeliveryDelay public function receiveMessageNoWait(RedisDestination $queue, int $redeliveryDelay): ?RedisMessage { - $this->migrateExpiredMessages($this->context->getRedis(), [$queue]); + $this->migrateExpiredMessages($this->context->getRedis(), [$queue->getName()]); if ($result = $this->context->getRedis()->rpop($queue->getName())) { return $this->processResult($result->getMessage(), $queue->getName()); diff --git a/pkg/redis/Tests/RedisConsumerTest.php b/pkg/redis/Tests/RedisConsumerTest.php index 1e657801b..89e72b8a1 100644 --- a/pkg/redis/Tests/RedisConsumerTest.php +++ b/pkg/redis/Tests/RedisConsumerTest.php @@ -136,7 +136,7 @@ public function testShouldReceiveMessage() $result = $consumer->receive(2000); $this->assertNotNull($result); - $this->assertSame($message, $result);; + $this->assertSame($message, $result); } public function testShouldReceiveNoWaitMessage() @@ -157,7 +157,7 @@ public function testShouldReceiveNoWaitMessage() $result = $consumer->receiveNoWait(); $this->assertNotNull($result); - $this->assertSame($message, $result);; + $this->assertSame($message, $result); } /** diff --git a/pkg/redis/Tests/RedisSubscriptionConsumerTest.php b/pkg/redis/Tests/RedisSubscriptionConsumerTest.php index 5faac7561..428abb7c2 100644 --- a/pkg/redis/Tests/RedisSubscriptionConsumerTest.php +++ b/pkg/redis/Tests/RedisSubscriptionConsumerTest.php @@ -178,7 +178,7 @@ private function createRedisContextMock() } /** - * @param null|mixed $queueName + * @param mixed|null $queueName * * @return Consumer|\PHPUnit_Framework_MockObject_MockObject */ From 2851ebd5f21f8260fb4da26cfacd0f80f13fc432 Mon Sep 17 00:00:00 2001 From: Alexander Kozienko Date: Fri, 22 Feb 2019 11:56:07 +0200 Subject: [PATCH 6/7] redid --- pkg/redis/RedisConnectionFactory.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/redis/RedisConnectionFactory.php b/pkg/redis/RedisConnectionFactory.php index 9dfdff6e1..afb72d3ad 100644 --- a/pkg/redis/RedisConnectionFactory.php +++ b/pkg/redis/RedisConnectionFactory.php @@ -165,7 +165,7 @@ private function defaultConfig(): array 'predis_options' => null, 'ssl' => null, 'redelivery_delay' => 300, - 'consume_strategy' => self::CONSUME_STRATEGY_BLOCKING, + 'consume_strategy' => self::CONSUME_STRATEGY_NON_BLOCKING, ]; } } From 235d3e061a36661c53e7984eabfffaeae046b6a2 Mon Sep 17 00:00:00 2001 From: Alexander Kozienko Date: Fri, 22 Feb 2019 14:59:21 +0200 Subject: [PATCH 7/7] redid --- .../RedisConnectionFactoryConfigTest.php | 28 +++++++++---------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/pkg/redis/Tests/RedisConnectionFactoryConfigTest.php b/pkg/redis/Tests/RedisConnectionFactoryConfigTest.php index 23eed48e1..828c232d7 100644 --- a/pkg/redis/Tests/RedisConnectionFactoryConfigTest.php +++ b/pkg/redis/Tests/RedisConnectionFactoryConfigTest.php @@ -91,7 +91,7 @@ public static function provideConfigs() 'predis_options' => null, 'ssl' => null, 'redelivery_delay' => 300, - 'consume_strategy' => 'blocking', + 'consume_strategy' => 'non_blocking', ], ]; @@ -113,7 +113,7 @@ public static function provideConfigs() 'predis_options' => null, 'ssl' => null, 'redelivery_delay' => 300, - 'consume_strategy' => 'blocking', + 'consume_strategy' => 'non_blocking', ], ]; @@ -135,7 +135,7 @@ public static function provideConfigs() 'predis_options' => null, 'ssl' => null, 'redelivery_delay' => 300, - 'consume_strategy' => 'blocking', + 'consume_strategy' => 'non_blocking', ], ]; @@ -158,7 +158,7 @@ public static function provideConfigs() 'ssl' => null, 'foo' => 'bar', 'redelivery_delay' => 300, - 'consume_strategy' => 'blocking', + 'consume_strategy' => 'non_blocking', ], ]; @@ -181,7 +181,7 @@ public static function provideConfigs() 'ssl' => null, 'foo' => 'bar', 'redelivery_delay' => 300, - 'consume_strategy' => 'blocking', + 'consume_strategy' => 'non_blocking', ], ]; @@ -204,7 +204,7 @@ public static function provideConfigs() 'ssl' => null, 'foo' => 'bar', 'redelivery_delay' => 300, - 'consume_strategy' => 'blocking', + 'consume_strategy' => 'non_blocking', ], ]; @@ -228,7 +228,7 @@ public static function provideConfigs() 'ssl' => null, 'foo' => 'bar', 'redelivery_delay' => 300, - 'consume_strategy' => 'blocking', + 'consume_strategy' => 'non_blocking', ], ]; @@ -252,7 +252,7 @@ public static function provideConfigs() 'ssl' => null, 'foo' => 'bar', 'redelivery_delay' => 300, - 'consume_strategy' => 'blocking', + 'consume_strategy' => 'non_blocking', ], ]; @@ -276,7 +276,7 @@ public static function provideConfigs() 'ssl' => null, 'foo' => 'bar', 'redelivery_delay' => 300, - 'consume_strategy' => 'blocking', + 'consume_strategy' => 'non_blocking', ], ]; @@ -299,7 +299,7 @@ public static function provideConfigs() 'ssl' => null, 'foo' => 'bar', 'redelivery_delay' => 300, - 'consume_strategy' => 'blocking', + 'consume_strategy' => 'non_blocking', ], ]; @@ -322,7 +322,7 @@ public static function provideConfigs() 'predis_options' => null, 'ssl' => null, 'redelivery_delay' => 300, - 'consume_strategy' => 'blocking', + 'consume_strategy' => 'non_blocking', ], ]; @@ -345,7 +345,7 @@ public static function provideConfigs() 'predis_options' => null, 'ssl' => null, 'redelivery_delay' => 300, - 'consume_strategy' => 'blocking', + 'consume_strategy' => 'non_blocking', ], ]; @@ -368,7 +368,7 @@ public static function provideConfigs() 'predis_options' => null, 'ssl' => null, 'redelivery_delay' => 300, - 'consume_strategy' => 'blocking', + 'consume_strategy' => 'non_blocking', ], ]; @@ -395,7 +395,7 @@ public static function provideConfigs() 'verify_peer' => '1', ], 'redelivery_delay' => 300, - 'consume_strategy' => 'blocking', + 'consume_strategy' => 'non_blocking', ], ]; }