From 0a3b34c36b024797299a65a4d4c670abd1b3d525 Mon Sep 17 00:00:00 2001 From: Maksim Kotlyar Date: Fri, 9 Jun 2017 14:13:59 +0300 Subject: [PATCH 01/10] fix release script. The last released tag logic was not accurate --- bin/release | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/bin/release b/bin/release index 888c6d795..995494da6 100755 --- a/bin/release +++ b/bin/release @@ -30,7 +30,8 @@ do cd $TMP_DIR; git clone $REMOTE_URL . --depth=200 git checkout $CURRENT_BRANCH; - LAST_RELEASE=$(git tag -l [0-9].* | tail -n1) + # gsort comes with coreutils packages. brew install coreutils + LAST_RELEASE=$(git tag -l [0-9].* | gsort -V | tail -n1 ) echo "Last release $LAST_RELEASE"; From bb98a46684a59419a53beca972d404211d3fba87 Mon Sep 17 00:00:00 2001 From: Maksim Kotlyar Date: Fri, 9 Jun 2017 15:16:06 +0300 Subject: [PATCH 02/10] [client] Add ability to send events or commands. * Events 1 to many * Commands 1 to 1 with possibility to get result (optionally) --- .../ExtractProcessorTagSubscriptionsTrait.php | 45 +++++++++++++++- .../Client/CommandSubscriberInterface.php | 27 ++++++++++ pkg/enqueue/Client/Config.php | 1 + pkg/enqueue/Client/Producer.php | 31 ++++++++++- pkg/enqueue/Client/ProducerV2Interface.php | 22 ++++++++ pkg/enqueue/Client/RouterProcessor.php | 52 ++++++++++++++++++- 6 files changed, 174 insertions(+), 4 deletions(-) create mode 100644 pkg/enqueue/Client/CommandSubscriberInterface.php create mode 100644 pkg/enqueue/Client/ProducerV2Interface.php diff --git a/pkg/enqueue-bundle/DependencyInjection/Compiler/ExtractProcessorTagSubscriptionsTrait.php b/pkg/enqueue-bundle/DependencyInjection/Compiler/ExtractProcessorTagSubscriptionsTrait.php index 825c142a5..9fc35e1f1 100644 --- a/pkg/enqueue-bundle/DependencyInjection/Compiler/ExtractProcessorTagSubscriptionsTrait.php +++ b/pkg/enqueue-bundle/DependencyInjection/Compiler/ExtractProcessorTagSubscriptionsTrait.php @@ -2,6 +2,8 @@ namespace Enqueue\Bundle\DependencyInjection\Compiler; +use Enqueue\Client\CommandSubscriberInterface; +use Enqueue\Client\Config; use Enqueue\Client\TopicSubscriberInterface; use Symfony\Component\DependencyInjection\ContainerBuilder; use Symfony\Component\DependencyInjection\Exception\ParameterNotFoundException; @@ -43,7 +45,43 @@ protected function extractSubscriptions(ContainerBuilder $container, $processorS ]; $data = []; + if (is_subclass_of($processorClass, CommandSubscriberInterface::class)) { + /** @var CommandSubscriberInterface $processorClass */ + $params = $processorClass::getSubscribedCommand(); + if (is_string($params)) { + if (empty($params)) { + throw new \LogicException('The command name must not be empty'); + } + + $data[] = [ + 'topicName' => Config::COMMAND_TOPIC, + 'queueName' => $defaultQueueName, + 'queueNameHardcoded' => false, + 'processorName' => $params, + ]; + } elseif (is_array($params)) { + $params = array_replace($subscriptionPrototype, $params); + + if ($processorName = $resolve($params['processorName'])) { + throw new \LogicException('The processor name (it is also the command name) must not be empty.'); + } + + $data[] = [ + 'topicName' => Config::COMMAND_TOPIC, + 'queueName' => $resolve($params['queueName']) ?: $defaultQueueName, + 'queueNameHardcoded' => $resolve($params['queueNameHardcoded']), + 'processorName' => $processorName, + ]; + } else { + throw new \LogicException(sprintf( + 'Command subscriber configuration is invalid. "%s"', + json_encode($processorClass::getSubscribedCommand()) + )); + } + } + if (is_subclass_of($processorClass, TopicSubscriberInterface::class)) { + /** @var TopicSubscriberInterface $processorClass */ foreach ($processorClass::getSubscribedTopics() as $topicName => $params) { if (is_string($params)) { $data[] = [ @@ -68,7 +106,12 @@ protected function extractSubscriptions(ContainerBuilder $container, $processorS )); } } - } else { + } + + if (false == ( + is_subclass_of($processorClass, CommandSubscriberInterface::class) || + is_subclass_of($processorClass, TopicSubscriberInterface::class) + )) { foreach ($tagAttributes as $tagAttribute) { $tagAttribute = array_replace($subscriptionPrototype, $tagAttribute); diff --git a/pkg/enqueue/Client/CommandSubscriberInterface.php b/pkg/enqueue/Client/CommandSubscriberInterface.php new file mode 100644 index 000000000..ee67965d8 --- /dev/null +++ b/pkg/enqueue/Client/CommandSubscriberInterface.php @@ -0,0 +1,27 @@ + 'aCommandName', + * 'queueName' => 'a_client_queue_name', + * 'queueNameHardcoded' => true, + * ] + * + * queueName and queueNameHardcoded are optional. + * + * Note: If you set queueNameHardcoded to true then the queueName is used as is and therefor the driver is not used to create a transport queue name. + * + * @return string|array + */ + public static function getSubscribedCommand(); +} diff --git a/pkg/enqueue/Client/Config.php b/pkg/enqueue/Client/Config.php index 2dd1a0e00..e555b6d0c 100644 --- a/pkg/enqueue/Client/Config.php +++ b/pkg/enqueue/Client/Config.php @@ -8,6 +8,7 @@ class Config const PARAMETER_PROCESSOR_NAME = 'enqueue.processor_name'; const PARAMETER_PROCESSOR_QUEUE_NAME = 'enqueue.processor_queue_name'; const DEFAULT_PROCESSOR_QUEUE_NAME = 'default'; + const COMMAND_TOPIC = '__command__'; /** * @var string diff --git a/pkg/enqueue/Client/Producer.php b/pkg/enqueue/Client/Producer.php index ade90049f..ddc3b1196 100644 --- a/pkg/enqueue/Client/Producer.php +++ b/pkg/enqueue/Client/Producer.php @@ -5,7 +5,7 @@ use Enqueue\Util\JSON; use Enqueue\Util\UUID; -class Producer implements ProducerInterface +class Producer implements ProducerInterface, ProducerV2Interface { /** * @var DriverInterface @@ -18,7 +18,8 @@ class Producer implements ProducerInterface private $extension; /** - * @param DriverInterface $driver + * @param DriverInterface $driver + * @param ExtensionInterface|null $extension */ public function __construct(DriverInterface $driver, ExtensionInterface $extension = null) { @@ -80,6 +81,32 @@ public function send($topic, $message) } } + /** + * {@inheritdoc} + */ + public function sendEvent($topic, $message) + { + $this->send($topic, $message); + } + + /** + * {@inheritdoc} + */ + public function sendCommand($command, $message) + { + if (false == $message instanceof Message) { + $message = new Message($message); + } + + $message->setProperty(Config::PARAMETER_TOPIC_NAME, Config::COMMAND_TOPIC); + $message->setProperty(Config::PARAMETER_PROCESSOR_NAME, $command); + $message->setScope(Message::SCOPE_APP); + + $this->send('__command__', $message); + + // TODO add support of replies + } + /** * @param Message $message */ diff --git a/pkg/enqueue/Client/ProducerV2Interface.php b/pkg/enqueue/Client/ProducerV2Interface.php new file mode 100644 index 000000000..69e0dd1fc --- /dev/null +++ b/pkg/enqueue/Client/ProducerV2Interface.php @@ -0,0 +1,22 @@ +routes[$topicName][] = [$processorName, $queueName]; + if (Config::COMMAND_TOPIC === $topicName) { + $this->commands[$processorName] = $queueName; + } else { + $this->routes[$topicName][] = [$processorName, $queueName]; + } } /** @@ -52,6 +61,22 @@ public function process(PsrMessage $message, PsrContext $context) )); } + if (Config::COMMAND_TOPIC === $topicName) { + return $this->routeCommand($message); + } + + return $this->routeEvent($message); + } + + /** + * @param PsrMessage $message + * + * @return string|Result + */ + private function routeEvent(PsrMessage $message) + { + $topicName = $message->getProperty(Config::PARAMETER_TOPIC_NAME); + if (array_key_exists($topicName, $this->routes)) { foreach ($this->routes[$topicName] as $route) { $processorMessage = clone $message; @@ -64,4 +89,29 @@ public function process(PsrMessage $message, PsrContext $context) return self::ACK; } + + /** + * @param PsrMessage $message + * + * @return string|Result + */ + private function routeCommand(PsrMessage $message) + { + $processorName = $message->getProperty(Config::PARAMETER_PROCESSOR_NAME); + if (false == $processorName) { + return Result::reject(sprintf( + 'Got message without required parameter: "%s"', + Config::PARAMETER_PROCESSOR_NAME + )); + } + + if (isset($this->commands[$processorName])) { + $processorMessage = clone $message; + $processorMessage->setProperty(Config::PARAMETER_PROCESSOR_QUEUE_NAME, $this->commands[$processorName]); + + $this->driver->sendToProcessor($this->driver->createClientMessage($processorMessage)); + } + + return self::ACK; + } } From e84770f15ed562eecf16d5ac758b87bb628466fc Mon Sep 17 00:00:00 2001 From: Maksim Kotlyar Date: Fri, 9 Jun 2017 15:27:03 +0300 Subject: [PATCH 03/10] [client] move stuff to separate class. --- pkg/enqueue/Client/Producer.php | 31 +------------ pkg/enqueue/Client/ProducerV2.php | 53 ++++++++++++++++++++++ pkg/enqueue/Client/ProducerV2Interface.php | 4 ++ 3 files changed, 59 insertions(+), 29 deletions(-) create mode 100644 pkg/enqueue/Client/ProducerV2.php diff --git a/pkg/enqueue/Client/Producer.php b/pkg/enqueue/Client/Producer.php index ddc3b1196..ade90049f 100644 --- a/pkg/enqueue/Client/Producer.php +++ b/pkg/enqueue/Client/Producer.php @@ -5,7 +5,7 @@ use Enqueue\Util\JSON; use Enqueue\Util\UUID; -class Producer implements ProducerInterface, ProducerV2Interface +class Producer implements ProducerInterface { /** * @var DriverInterface @@ -18,8 +18,7 @@ class Producer implements ProducerInterface, ProducerV2Interface private $extension; /** - * @param DriverInterface $driver - * @param ExtensionInterface|null $extension + * @param DriverInterface $driver */ public function __construct(DriverInterface $driver, ExtensionInterface $extension = null) { @@ -81,32 +80,6 @@ public function send($topic, $message) } } - /** - * {@inheritdoc} - */ - public function sendEvent($topic, $message) - { - $this->send($topic, $message); - } - - /** - * {@inheritdoc} - */ - public function sendCommand($command, $message) - { - if (false == $message instanceof Message) { - $message = new Message($message); - } - - $message->setProperty(Config::PARAMETER_TOPIC_NAME, Config::COMMAND_TOPIC); - $message->setProperty(Config::PARAMETER_PROCESSOR_NAME, $command); - $message->setScope(Message::SCOPE_APP); - - $this->send('__command__', $message); - - // TODO add support of replies - } - /** * @param Message $message */ diff --git a/pkg/enqueue/Client/ProducerV2.php b/pkg/enqueue/Client/ProducerV2.php new file mode 100644 index 000000000..f4c1ef9ff --- /dev/null +++ b/pkg/enqueue/Client/ProducerV2.php @@ -0,0 +1,53 @@ +realProducer = $realProducer; + $this->rpcClient = $rpcClient; + } + + /** + * {@inheritdoc} + */ + public function sendEvent($topic, $message) + { + $this->realProducer->send($topic, $message); + } + + /** + * {@inheritdoc} + */ + public function sendCommand($command, $message) + { + if (false == $message instanceof Message) { + $message = new Message($message); + } + + $message->setProperty(Config::PARAMETER_TOPIC_NAME, Config::COMMAND_TOPIC); + $message->setProperty(Config::PARAMETER_PROCESSOR_NAME, $command); + $message->setScope(Message::SCOPE_APP); + + if ($message->getReplyTo()) { + return $this->rpcClient->callAsync(Config::COMMAND_TOPIC, $message, 60); + } + $this->realProducer->send(Config::COMMAND_TOPIC, $message); + } +} diff --git a/pkg/enqueue/Client/ProducerV2Interface.php b/pkg/enqueue/Client/ProducerV2Interface.php index 69e0dd1fc..6c46bf557 100644 --- a/pkg/enqueue/Client/ProducerV2Interface.php +++ b/pkg/enqueue/Client/ProducerV2Interface.php @@ -2,6 +2,8 @@ namespace Enqueue\Client; +use Enqueue\Rpc\Promise; + interface ProducerV2Interface { /** @@ -17,6 +19,8 @@ public function sendEvent($topic, $message); * * @param string $command * @param string|array|Message $message + * + * @return Promise|null the promise is returned if message has reply to set */ public function sendCommand($command, $message); } From db17a6d5f5dabe71e1e1103a031eff632cf04611 Mon Sep 17 00:00:00 2001 From: Maksim Kotlyar Date: Fri, 9 Jun 2017 15:43:36 +0300 Subject: [PATCH 04/10] [client] add `@experimental` to class docblock --- pkg/enqueue/Client/ProducerV2.php | 3 +++ pkg/enqueue/Client/ProducerV2Interface.php | 7 +++---- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/pkg/enqueue/Client/ProducerV2.php b/pkg/enqueue/Client/ProducerV2.php index f4c1ef9ff..fd645503c 100644 --- a/pkg/enqueue/Client/ProducerV2.php +++ b/pkg/enqueue/Client/ProducerV2.php @@ -2,6 +2,9 @@ namespace Enqueue\Client; +/** + * @experimental + */ class ProducerV2 implements ProducerV2Interface { /** diff --git a/pkg/enqueue/Client/ProducerV2Interface.php b/pkg/enqueue/Client/ProducerV2Interface.php index 6c46bf557..9024b24f2 100644 --- a/pkg/enqueue/Client/ProducerV2Interface.php +++ b/pkg/enqueue/Client/ProducerV2Interface.php @@ -4,19 +4,18 @@ use Enqueue\Rpc\Promise; +/** + * @experimental + */ interface ProducerV2Interface { /** - * @experimental - * * @param string $topic * @param string|array|Message $message */ public function sendEvent($topic, $message); /** - * @experimental - * * @param string $command * @param string|array|Message $message * From 95cd03b2ed1200b113269f3315e9465d57689a1e Mon Sep 17 00:00:00 2001 From: Maksim Kotlyar Date: Fri, 9 Jun 2017 15:43:46 +0300 Subject: [PATCH 05/10] [bundle] register producer v2 service. --- pkg/enqueue-bundle/Resources/config/client.yml | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/pkg/enqueue-bundle/Resources/config/client.yml b/pkg/enqueue-bundle/Resources/config/client.yml index 83694931e..637b9d98a 100644 --- a/pkg/enqueue-bundle/Resources/config/client.yml +++ b/pkg/enqueue-bundle/Resources/config/client.yml @@ -23,6 +23,12 @@ services: enqueue.producer: alias: 'enqueue.client.producer' + enqueue.client.producer_v2: + class: 'Enqueue\Client\ProducerV2' + arguments: + - '@enqueue.client.producer' + - '@enqueue.client.rpc_client' + enqueue.spool_producer: alias: 'enqueue.client.spool_producer' From 4c960f3ca3ffaa7fa567c09466e81d0f59c75cca Mon Sep 17 00:00:00 2001 From: Maksim Kotlyar Date: Fri, 9 Jun 2017 15:55:17 +0300 Subject: [PATCH 06/10] [client] add needReply argument --- pkg/enqueue/Client/ProducerV2.php | 5 +++-- pkg/enqueue/Client/ProducerV2Interface.php | 2 +- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/pkg/enqueue/Client/ProducerV2.php b/pkg/enqueue/Client/ProducerV2.php index fd645503c..9c997b77f 100644 --- a/pkg/enqueue/Client/ProducerV2.php +++ b/pkg/enqueue/Client/ProducerV2.php @@ -38,7 +38,7 @@ public function sendEvent($topic, $message) /** * {@inheritdoc} */ - public function sendCommand($command, $message) + public function sendCommand($command, $message, $needReply = false) { if (false == $message instanceof Message) { $message = new Message($message); @@ -48,9 +48,10 @@ public function sendCommand($command, $message) $message->setProperty(Config::PARAMETER_PROCESSOR_NAME, $command); $message->setScope(Message::SCOPE_APP); - if ($message->getReplyTo()) { + if ($needReply) { return $this->rpcClient->callAsync(Config::COMMAND_TOPIC, $message, 60); } + $this->realProducer->send(Config::COMMAND_TOPIC, $message); } } diff --git a/pkg/enqueue/Client/ProducerV2Interface.php b/pkg/enqueue/Client/ProducerV2Interface.php index 9024b24f2..5dfc63640 100644 --- a/pkg/enqueue/Client/ProducerV2Interface.php +++ b/pkg/enqueue/Client/ProducerV2Interface.php @@ -19,7 +19,7 @@ public function sendEvent($topic, $message); * @param string $command * @param string|array|Message $message * - * @return Promise|null the promise is returned if message has reply to set + * @return Promise|null the promise is returned if needReply argument is true */ public function sendCommand($command, $message); } From 3e76da02d8a19f403d6f9cc1549a24da0e61e9b7 Mon Sep 17 00:00:00 2001 From: Maksim Kotlyar Date: Fri, 9 Jun 2017 16:35:27 +0300 Subject: [PATCH 07/10] [client] add tests. --- .../App/TestCommandSubscriberProcessor.php | 28 ++++++ .../Tests/Functional/App/config/config.yml | 5 + .../Functional/Client/ProducerV2Test.php | 97 +++++++++++++++++++ .../Tests/Functional/QueuesCommandTest.php | 12 +++ .../Tests/Functional/TopicsCommandTest.php | 14 +++ pkg/enqueue/Client/ProducerV2Interface.php | 3 +- 6 files changed, 158 insertions(+), 1 deletion(-) create mode 100644 pkg/enqueue-bundle/Tests/Functional/App/TestCommandSubscriberProcessor.php create mode 100644 pkg/enqueue-bundle/Tests/Functional/Client/ProducerV2Test.php diff --git a/pkg/enqueue-bundle/Tests/Functional/App/TestCommandSubscriberProcessor.php b/pkg/enqueue-bundle/Tests/Functional/App/TestCommandSubscriberProcessor.php new file mode 100644 index 000000000..5f95b6be4 --- /dev/null +++ b/pkg/enqueue-bundle/Tests/Functional/App/TestCommandSubscriberProcessor.php @@ -0,0 +1,28 @@ +calls[] = $message; + + return Result::reply( + $context->createMessage($message->getBody().'Reply') + ); + } + + public static function getSubscribedCommand() + { + return 'test_command_subscriber'; + } +} diff --git a/pkg/enqueue-bundle/Tests/Functional/App/config/config.yml b/pkg/enqueue-bundle/Tests/Functional/App/config/config.yml index ff11521f1..cd4dfaab5 100644 --- a/pkg/enqueue-bundle/Tests/Functional/App/config/config.yml +++ b/pkg/enqueue-bundle/Tests/Functional/App/config/config.yml @@ -47,6 +47,11 @@ services: tags: - { name: 'kernel.event_listener', async: true, event: 'test_async', method: 'onEvent' } + test_command_subscriber_processor: + class: 'Enqueue\Bundle\Tests\Functional\App\TestCommandSubscriberProcessor' + tags: + - { name: 'enqueue.client.processor' } + test_async_subscriber: class: 'Enqueue\Bundle\Tests\Functional\App\TestAsyncSubscriber' tags: diff --git a/pkg/enqueue-bundle/Tests/Functional/Client/ProducerV2Test.php b/pkg/enqueue-bundle/Tests/Functional/Client/ProducerV2Test.php new file mode 100644 index 000000000..58bf1aeeb --- /dev/null +++ b/pkg/enqueue-bundle/Tests/Functional/Client/ProducerV2Test.php @@ -0,0 +1,97 @@ +container->get('enqueue.client.producer')->clearTraces(); + } + + public function tearDown() + { + parent::tearDown(); + + $this->container->get('enqueue.client.producer')->clearTraces(); + } + + public function testCouldBeGetFromContainerAsService() + { + $producer = $this->container->get('enqueue.client.producer_v2'); + + $this->assertInstanceOf(ProducerV2Interface::class, $producer); + } + + public function testShouldSendEvent() + { + /** @var ProducerV2Interface $producer */ + $producer = $this->container->get('enqueue.client.producer_v2'); + + $producer->sendEvent('theTopic', 'theMessage'); + + $traces = $this->getTraceableProducer()->getTopicTraces('theTopic'); + + $this->assertCount(1, $traces); + $this->assertEquals('theMessage', $traces[0]['body']); + } + + public function testShouldSendCommandWithoutNeedForReply() + { + /** @var ProducerV2Interface $producer */ + $producer = $this->container->get('enqueue.client.producer_v2'); + + $result = $producer->sendCommand('theCommand', 'theMessage', false); + + $this->assertNull($result); + + $traces = $this->getTraceableProducer()->getTopicTraces(Config::COMMAND_TOPIC); + + $this->assertCount(1, $traces); + $this->assertEquals('theMessage', $traces[0]['body']); + $this->assertEquals([ + 'enqueue.topic_name' => Config::COMMAND_TOPIC, + 'enqueue.processor_name' => 'theCommand', + 'enqueue.processor_queue_name' => 'default', + ], $traces[0]['properties']); + } + + public function testShouldSendCommandWithNeedForReply() + { + /** @var ProducerV2Interface $producer */ + $producer = $this->container->get('enqueue.client.producer_v2'); + + $result = $producer->sendCommand('theCommand', 'theMessage', true); + + $this->assertInstanceOf(Promise::class, $result); + + $traces = $this->getTraceableProducer()->getTopicTraces(Config::COMMAND_TOPIC); + + $this->assertCount(1, $traces); + $this->assertEquals('theMessage', $traces[0]['body']); + $this->assertEquals([ + 'enqueue.topic_name' => Config::COMMAND_TOPIC, + 'enqueue.processor_name' => 'theCommand', + 'enqueue.processor_queue_name' => 'default', + ], $traces[0]['properties']); + } + + /** + * @return TraceableProducer|object + */ + private function getTraceableProducer() + { + return $this->container->get('enqueue.client.producer'); + } +} diff --git a/pkg/enqueue-bundle/Tests/Functional/QueuesCommandTest.php b/pkg/enqueue-bundle/Tests/Functional/QueuesCommandTest.php index 642d7058e..cff1db1aa 100644 --- a/pkg/enqueue-bundle/Tests/Functional/QueuesCommandTest.php +++ b/pkg/enqueue-bundle/Tests/Functional/QueuesCommandTest.php @@ -30,4 +30,16 @@ public function testShouldDisplayRegisteredQueues() $this->assertContains('enqueue.app.default', $display); $this->assertContains('enqueue.client.router_processor', $display); } + + public function testShouldDisplayRegisteredCommand() + { + $command = $this->container->get('enqueue.client.meta.queues_command'); + + $tester = new CommandTester($command); + $tester->execute([]); + + $display = $tester->getDisplay(); + + $this->assertContains('test_command_subscriber', $display); + } } diff --git a/pkg/enqueue-bundle/Tests/Functional/TopicsCommandTest.php b/pkg/enqueue-bundle/Tests/Functional/TopicsCommandTest.php index bb47f7119..84526dbe3 100644 --- a/pkg/enqueue-bundle/Tests/Functional/TopicsCommandTest.php +++ b/pkg/enqueue-bundle/Tests/Functional/TopicsCommandTest.php @@ -2,6 +2,7 @@ namespace Enqueue\Bundle\Tests\Functional; +use Enqueue\Client\Config; use Enqueue\Symfony\Client\Meta\TopicsCommand; use Symfony\Component\Console\Tester\CommandTester; @@ -29,4 +30,17 @@ public function testShouldDisplayRegisteredTopics() $this->assertContains('__router__', $display); $this->assertContains('enqueue.client.router_processor', $display); } + + public function testShouldDisplayCommands() + { + $command = $this->container->get('enqueue.client.meta.topics_command'); + + $tester = new CommandTester($command); + $tester->execute([]); + + $display = $tester->getDisplay(); + + $this->assertContains(Config::COMMAND_TOPIC, $display); + $this->assertContains('test_command_subscriber', $display); + } } diff --git a/pkg/enqueue/Client/ProducerV2Interface.php b/pkg/enqueue/Client/ProducerV2Interface.php index 5dfc63640..4013bdbe3 100644 --- a/pkg/enqueue/Client/ProducerV2Interface.php +++ b/pkg/enqueue/Client/ProducerV2Interface.php @@ -18,8 +18,9 @@ public function sendEvent($topic, $message); /** * @param string $command * @param string|array|Message $message + * @param bool $needReply * * @return Promise|null the promise is returned if needReply argument is true */ - public function sendCommand($command, $message); + public function sendCommand($command, $message, $needReply = false); } From 776a5dcf56220f4fc0f438f6b37cb2875cf7fef9 Mon Sep 17 00:00:00 2001 From: Maksim Kotlyar Date: Fri, 9 Jun 2017 17:13:37 +0300 Subject: [PATCH 08/10] add tests --- .../ExtractProcessorTagSubscriptionsTrait.php | 5 +- .../Compiler/BuildClientRoutingPassTest.php | 57 +++++++++- .../BuildProcessorRegistryPassTest.php | 91 ++++++++++++++- .../BuildQueueMetaRegistryPassTest.php | 46 ++++++++ .../BuildTopicMetaSubscribersPassTest.php | 53 ++++++++- .../Compiler/Mock/EmptyCommandSubscriber.php | 13 +++ .../Mock/InvalidCommandSubscriber.php | 13 +++ .../Mock/OnlyCommandNameSubscriber.php | 13 +++ .../Mock/ProcessorNameCommandSubscriber.php | 16 +++ pkg/enqueue/Client/RouterProcessor.php | 25 +++-- .../Tests/Client/RouterProcessorTest.php | 105 +++++++++++++++++- 11 files changed, 413 insertions(+), 24 deletions(-) create mode 100644 pkg/enqueue-bundle/Tests/Unit/DependencyInjection/Compiler/Mock/EmptyCommandSubscriber.php create mode 100644 pkg/enqueue-bundle/Tests/Unit/DependencyInjection/Compiler/Mock/InvalidCommandSubscriber.php create mode 100644 pkg/enqueue-bundle/Tests/Unit/DependencyInjection/Compiler/Mock/OnlyCommandNameSubscriber.php create mode 100644 pkg/enqueue-bundle/Tests/Unit/DependencyInjection/Compiler/Mock/ProcessorNameCommandSubscriber.php diff --git a/pkg/enqueue-bundle/DependencyInjection/Compiler/ExtractProcessorTagSubscriptionsTrait.php b/pkg/enqueue-bundle/DependencyInjection/Compiler/ExtractProcessorTagSubscriptionsTrait.php index 9fc35e1f1..fbb17e86d 100644 --- a/pkg/enqueue-bundle/DependencyInjection/Compiler/ExtractProcessorTagSubscriptionsTrait.php +++ b/pkg/enqueue-bundle/DependencyInjection/Compiler/ExtractProcessorTagSubscriptionsTrait.php @@ -50,7 +50,7 @@ protected function extractSubscriptions(ContainerBuilder $container, $processorS $params = $processorClass::getSubscribedCommand(); if (is_string($params)) { if (empty($params)) { - throw new \LogicException('The command name must not be empty'); + throw new \LogicException('The processor name (it is also the command name) must not be empty.'); } $data[] = [ @@ -61,8 +61,7 @@ protected function extractSubscriptions(ContainerBuilder $container, $processorS ]; } elseif (is_array($params)) { $params = array_replace($subscriptionPrototype, $params); - - if ($processorName = $resolve($params['processorName'])) { + if (false == $processorName = $resolve($params['processorName'])) { throw new \LogicException('The processor name (it is also the command name) must not be empty.'); } diff --git a/pkg/enqueue-bundle/Tests/Unit/DependencyInjection/Compiler/BuildClientRoutingPassTest.php b/pkg/enqueue-bundle/Tests/Unit/DependencyInjection/Compiler/BuildClientRoutingPassTest.php index 1766460c6..4ed56d36d 100644 --- a/pkg/enqueue-bundle/Tests/Unit/DependencyInjection/Compiler/BuildClientRoutingPassTest.php +++ b/pkg/enqueue-bundle/Tests/Unit/DependencyInjection/Compiler/BuildClientRoutingPassTest.php @@ -4,9 +4,12 @@ use Enqueue\Bundle\DependencyInjection\Compiler\BuildClientRoutingPass; use Enqueue\Bundle\Tests\Unit\DependencyInjection\Compiler\Mock\InvalidTopicSubscriber; +use Enqueue\Bundle\Tests\Unit\DependencyInjection\Compiler\Mock\OnlyCommandNameSubscriber; use Enqueue\Bundle\Tests\Unit\DependencyInjection\Compiler\Mock\OnlyTopicNameTopicSubscriber; +use Enqueue\Bundle\Tests\Unit\DependencyInjection\Compiler\Mock\ProcessorNameCommandSubscriber; use Enqueue\Bundle\Tests\Unit\DependencyInjection\Compiler\Mock\ProcessorNameTopicSubscriber; use Enqueue\Bundle\Tests\Unit\DependencyInjection\Compiler\Mock\QueueNameTopicSubscriber; +use Enqueue\Client\Config; use PHPUnit\Framework\TestCase; use Symfony\Component\DependencyInjection\ContainerBuilder; use Symfony\Component\DependencyInjection\Definition; @@ -213,8 +216,6 @@ public function testShouldBuildRouteFromSubscriberIfQueueNameSpecified() public function testShouldThrowExceptionWhenTopicSubscriberConfigurationIsInvalid() { - $this->setExpectedException(\LogicException::class, 'Topic subscriber configuration is invalid. "[12345]"'); - $container = $this->createContainerBuilder(); $processor = new Definition(InvalidTopicSubscriber::class); @@ -225,8 +226,60 @@ public function testShouldThrowExceptionWhenTopicSubscriberConfigurationIsInvali $router->setArguments(['', '']); $container->setDefinition('enqueue.client.router_processor', $router); + $pass = new BuildClientRoutingPass(); + + $this->expectException(\LogicException::class); + $this->expectExceptionMessage('Topic subscriber configuration is invalid. "[12345]"'); + + $pass->process($container); + } + + public function testShouldBuildRouteFromCommandSubscriberIfOnlyCommandNameSpecified() + { + $container = $this->createContainerBuilder(); + + $processor = new Definition(OnlyCommandNameSubscriber::class); + $processor->addTag('enqueue.client.processor'); + $container->setDefinition('processor-service-id', $processor); + + $router = new Definition(); + $router->setArguments([null, null, null]); + $container->setDefinition('enqueue.client.router_processor', $router); + + $pass = new BuildClientRoutingPass(); + $pass->process($container); + + $expectedRoutes = [ + Config::COMMAND_TOPIC => [ + ['the-command-name', 'aDefaultQueueName'], + ], + ]; + + $this->assertEquals($expectedRoutes, $router->getArgument(1)); + } + + public function testShouldBuildRouteFromCommandSubscriberIfProcessorNameSpecified() + { + $container = $this->createContainerBuilder(); + + $processor = new Definition(ProcessorNameCommandSubscriber::class); + $processor->addTag('enqueue.client.processor'); + $container->setDefinition('processor-service-id', $processor); + + $router = new Definition(); + $router->setArguments([null, null, null]); + $container->setDefinition('enqueue.client.router_processor', $router); + $pass = new BuildClientRoutingPass(); $pass->process($container); + + $expectedRoutes = [ + Config::COMMAND_TOPIC => [ + ['the-command-name', 'the-command-queue-name'], + ], + ]; + + $this->assertEquals($expectedRoutes, $router->getArgument(1)); } /** diff --git a/pkg/enqueue-bundle/Tests/Unit/DependencyInjection/Compiler/BuildProcessorRegistryPassTest.php b/pkg/enqueue-bundle/Tests/Unit/DependencyInjection/Compiler/BuildProcessorRegistryPassTest.php index d2927ffcd..0ad3d3483 100644 --- a/pkg/enqueue-bundle/Tests/Unit/DependencyInjection/Compiler/BuildProcessorRegistryPassTest.php +++ b/pkg/enqueue-bundle/Tests/Unit/DependencyInjection/Compiler/BuildProcessorRegistryPassTest.php @@ -3,8 +3,12 @@ namespace Enqueue\Bundle\Tests\Unit\DependencyInjection\Compiler; use Enqueue\Bundle\DependencyInjection\Compiler\BuildProcessorRegistryPass; +use Enqueue\Bundle\Tests\Unit\DependencyInjection\Compiler\Mock\EmptyCommandSubscriber; +use Enqueue\Bundle\Tests\Unit\DependencyInjection\Compiler\Mock\InvalidCommandSubscriber; use Enqueue\Bundle\Tests\Unit\DependencyInjection\Compiler\Mock\InvalidTopicSubscriber; +use Enqueue\Bundle\Tests\Unit\DependencyInjection\Compiler\Mock\OnlyCommandNameSubscriber; use Enqueue\Bundle\Tests\Unit\DependencyInjection\Compiler\Mock\OnlyTopicNameTopicSubscriber; +use Enqueue\Bundle\Tests\Unit\DependencyInjection\Compiler\Mock\ProcessorNameCommandSubscriber; use Enqueue\Bundle\Tests\Unit\DependencyInjection\Compiler\Mock\ProcessorNameTopicSubscriber; use PHPUnit\Framework\TestCase; use Symfony\Component\DependencyInjection\ContainerBuilder; @@ -152,7 +156,8 @@ public function testShouldBuildRouteFromSubscriberIfProcessorNameSpecified() public function testShouldThrowExceptionWhenTopicSubscriberConfigurationIsInvalid() { - $this->setExpectedException(\LogicException::class, 'Topic subscriber configuration is invalid. "[12345]"'); + $this->expectException(\LogicException::class); + $this->expectExceptionMessage('Topic subscriber configuration is invalid. "[12345]"'); $container = $this->createContainerBuilder(); @@ -168,6 +173,90 @@ public function testShouldThrowExceptionWhenTopicSubscriberConfigurationIsInvali $pass->process($container); } + public function testShouldBuildRouteFromOnlyNameCommandSubscriber() + { + $container = $this->createContainerBuilder(); + + $processor = new Definition(OnlyCommandNameSubscriber::class); + $processor->addTag('enqueue.client.processor'); + $container->setDefinition('processor-id', $processor); + + $processorRegistry = new Definition(); + $processorRegistry->setArguments([]); + $container->setDefinition('enqueue.client.processor_registry', $processorRegistry); + + $pass = new BuildProcessorRegistryPass(); + $pass->process($container); + + $expectedValue = [ + 'the-command-name' => 'processor-id', + ]; + + $this->assertEquals($expectedValue, $processorRegistry->getArgument(0)); + } + + public function testShouldBuildRouteFromProcessorNameCommandSubscriber() + { + $container = $this->createContainerBuilder(); + + $processor = new Definition(ProcessorNameCommandSubscriber::class); + $processor->addTag('enqueue.client.processor'); + $container->setDefinition('processor-id', $processor); + + $processorRegistry = new Definition(); + $processorRegistry->setArguments([]); + $container->setDefinition('enqueue.client.processor_registry', $processorRegistry); + + $pass = new BuildProcessorRegistryPass(); + $pass->process($container); + + $expectedValue = [ + 'the-command-name' => 'processor-id', + ]; + + $this->assertEquals($expectedValue, $processorRegistry->getArgument(0)); + } + + public function testShouldThrowExceptionWhenProcessorNameEmpty() + { + $container = $this->createContainerBuilder(); + + $processor = new Definition(EmptyCommandSubscriber::class); + $processor->addTag('enqueue.client.processor'); + $container->setDefinition('processor-id', $processor); + + $processorRegistry = new Definition(); + $processorRegistry->setArguments([]); + $container->setDefinition('enqueue.client.processor_registry', $processorRegistry); + + $pass = new BuildProcessorRegistryPass(); + + $this->expectException(\LogicException::class); + $this->expectExceptionMessage('The processor name (it is also the command name) must not be empty.'); + + $pass->process($container); + } + + public function testShouldThrowExceptionWhenCommandSubscriberConfigurationIsInvalid() + { + $container = $this->createContainerBuilder(); + + $processor = new Definition(InvalidCommandSubscriber::class); + $processor->addTag('enqueue.client.processor'); + $container->setDefinition('processor-id', $processor); + + $processorRegistry = new Definition(); + $processorRegistry->setArguments([]); + $container->setDefinition('enqueue.client.processor_registry', $processorRegistry); + + $pass = new BuildProcessorRegistryPass(); + + $this->expectException(\LogicException::class); + $this->expectExceptionMessage('Command subscriber configuration is invalid. "12345"'); + + $pass->process($container); + } + /** * @return ContainerBuilder */ diff --git a/pkg/enqueue-bundle/Tests/Unit/DependencyInjection/Compiler/BuildQueueMetaRegistryPassTest.php b/pkg/enqueue-bundle/Tests/Unit/DependencyInjection/Compiler/BuildQueueMetaRegistryPassTest.php index 00bc4cb02..cd98f8b18 100644 --- a/pkg/enqueue-bundle/Tests/Unit/DependencyInjection/Compiler/BuildQueueMetaRegistryPassTest.php +++ b/pkg/enqueue-bundle/Tests/Unit/DependencyInjection/Compiler/BuildQueueMetaRegistryPassTest.php @@ -3,7 +3,9 @@ namespace Enqueue\Bundle\Tests\Unit\DependencyInjection\Compiler; use Enqueue\Bundle\DependencyInjection\Compiler\BuildQueueMetaRegistryPass; +use Enqueue\Bundle\Tests\Unit\DependencyInjection\Compiler\Mock\OnlyCommandNameSubscriber; use Enqueue\Bundle\Tests\Unit\DependencyInjection\Compiler\Mock\OnlyTopicNameTopicSubscriber; +use Enqueue\Bundle\Tests\Unit\DependencyInjection\Compiler\Mock\ProcessorNameCommandSubscriber; use Enqueue\Bundle\Tests\Unit\DependencyInjection\Compiler\Mock\ProcessorNameTopicSubscriber; use Enqueue\Bundle\Tests\Unit\DependencyInjection\Compiler\Mock\QueueNameTopicSubscriber; use PHPUnit\Framework\TestCase; @@ -192,6 +194,50 @@ public function testShouldBuildQueueFromSubscriberIfQueueNameSpecified() $this->assertEquals($expectedQueues, $registry->getArgument(1)); } + public function testShouldBuildQueueFromCommandSubscriber() + { + $container = $this->createContainerBuilder(); + + $processor = new Definition(ProcessorNameCommandSubscriber::class); + $processor->addTag('enqueue.client.processor'); + $container->setDefinition('processor-service-id', $processor); + + $registry = new Definition(); + $registry->setArguments([null, []]); + $container->setDefinition('enqueue.client.meta.queue_meta_registry', $registry); + + $pass = new BuildQueueMetaRegistryPass(); + $pass->process($container); + + $expectedQueues = [ + 'the-command-queue-name' => ['processors' => ['the-command-name']], + ]; + + $this->assertEquals($expectedQueues, $registry->getArgument(1)); + } + + public function testShouldBuildQueueFromOnlyCommandNameSubscriber() + { + $container = $this->createContainerBuilder(); + + $processor = new Definition(OnlyCommandNameSubscriber::class); + $processor->addTag('enqueue.client.processor'); + $container->setDefinition('processor-service-id', $processor); + + $registry = new Definition(); + $registry->setArguments([null, []]); + $container->setDefinition('enqueue.client.meta.queue_meta_registry', $registry); + + $pass = new BuildQueueMetaRegistryPass(); + $pass->process($container); + + $expectedQueues = [ + 'aDefaultQueueName' => ['processors' => ['the-command-name']], + ]; + + $this->assertEquals($expectedQueues, $registry->getArgument(1)); + } + /** * @return ContainerBuilder */ diff --git a/pkg/enqueue-bundle/Tests/Unit/DependencyInjection/Compiler/BuildTopicMetaSubscribersPassTest.php b/pkg/enqueue-bundle/Tests/Unit/DependencyInjection/Compiler/BuildTopicMetaSubscribersPassTest.php index 5236890ec..cb8425abf 100644 --- a/pkg/enqueue-bundle/Tests/Unit/DependencyInjection/Compiler/BuildTopicMetaSubscribersPassTest.php +++ b/pkg/enqueue-bundle/Tests/Unit/DependencyInjection/Compiler/BuildTopicMetaSubscribersPassTest.php @@ -4,8 +4,11 @@ use Enqueue\Bundle\DependencyInjection\Compiler\BuildTopicMetaSubscribersPass; use Enqueue\Bundle\Tests\Unit\DependencyInjection\Compiler\Mock\InvalidTopicSubscriber; +use Enqueue\Bundle\Tests\Unit\DependencyInjection\Compiler\Mock\OnlyCommandNameSubscriber; use Enqueue\Bundle\Tests\Unit\DependencyInjection\Compiler\Mock\OnlyTopicNameTopicSubscriber; +use Enqueue\Bundle\Tests\Unit\DependencyInjection\Compiler\Mock\ProcessorNameCommandSubscriber; use Enqueue\Bundle\Tests\Unit\DependencyInjection\Compiler\Mock\ProcessorNameTopicSubscriber; +use Enqueue\Client\Config; use PHPUnit\Framework\TestCase; use Symfony\Component\DependencyInjection\ContainerBuilder; use Symfony\Component\DependencyInjection\Definition; @@ -285,8 +288,6 @@ public function testShouldBuildMetaFromSubscriberIfProcessorNameSpecified() public function testShouldThrowExceptionWhenTopicSubscriberConfigurationIsInvalid() { - $this->setExpectedException(\LogicException::class, 'Topic subscriber configuration is invalid. "[12345]"'); - $container = $this->createContainerBuilder(); $processor = new Definition(InvalidTopicSubscriber::class); @@ -297,8 +298,56 @@ public function testShouldThrowExceptionWhenTopicSubscriberConfigurationIsInvali $topicMetaRegistry->setArguments([[]]); $container->setDefinition('enqueue.client.meta.topic_meta_registry', $topicMetaRegistry); + $pass = new BuildTopicMetaSubscribersPass(); + + $this->expectException(\LogicException::class); + $this->expectExceptionMessage('Topic subscriber configuration is invalid. "[12345]"'); + + $pass->process($container); + } + + public function testShouldBuildMetaFromCommandSubscriberIfOnlyCommandNameSpecified() + { + $container = $this->createContainerBuilder(); + + $processor = new Definition(OnlyCommandNameSubscriber::class); + $processor->addTag('enqueue.client.processor'); + $container->setDefinition('processor-id', $processor); + + $topicMetaRegistry = new Definition(); + $topicMetaRegistry->setArguments([[]]); + $container->setDefinition('enqueue.client.meta.topic_meta_registry', $topicMetaRegistry); + + $pass = new BuildTopicMetaSubscribersPass(); + $pass->process($container); + + $expectedValue = [ + Config::COMMAND_TOPIC => ['processors' => ['the-command-name']], + ]; + + $this->assertEquals($expectedValue, $topicMetaRegistry->getArgument(0)); + } + + public function testShouldBuildMetaFromCommandSubscriberIfProcessorNameSpecified() + { + $container = $this->createContainerBuilder(); + + $processor = new Definition(ProcessorNameCommandSubscriber::class); + $processor->addTag('enqueue.client.processor'); + $container->setDefinition('processor-id', $processor); + + $topicMetaRegistry = new Definition(); + $topicMetaRegistry->setArguments([[]]); + $container->setDefinition('enqueue.client.meta.topic_meta_registry', $topicMetaRegistry); + $pass = new BuildTopicMetaSubscribersPass(); $pass->process($container); + + $expectedValue = [ + Config::COMMAND_TOPIC => ['processors' => ['the-command-name']], + ]; + + $this->assertEquals($expectedValue, $topicMetaRegistry->getArgument(0)); } /** diff --git a/pkg/enqueue-bundle/Tests/Unit/DependencyInjection/Compiler/Mock/EmptyCommandSubscriber.php b/pkg/enqueue-bundle/Tests/Unit/DependencyInjection/Compiler/Mock/EmptyCommandSubscriber.php new file mode 100644 index 000000000..27fb16138 --- /dev/null +++ b/pkg/enqueue-bundle/Tests/Unit/DependencyInjection/Compiler/Mock/EmptyCommandSubscriber.php @@ -0,0 +1,13 @@ + 'the-command-name', + 'queueName' => 'the-command-queue-name', + ]; + } +} diff --git a/pkg/enqueue/Client/RouterProcessor.php b/pkg/enqueue/Client/RouterProcessor.php index 55ab8b7e2..a08789966 100644 --- a/pkg/enqueue/Client/RouterProcessor.php +++ b/pkg/enqueue/Client/RouterProcessor.php @@ -17,21 +17,24 @@ class RouterProcessor implements PsrProcessor /** * @var array */ - private $routes; + private $eventRoutes; /** * @var array */ - private $commands; + private $commandRoutes; /** * @param DriverInterface $driver - * @param array $routes + * @param array $eventRoutes + * @param array $commandRoutes */ - public function __construct(DriverInterface $driver, array $routes = []) + public function __construct(DriverInterface $driver, array $eventRoutes = [], array $commandRoutes = []) { $this->driver = $driver; - $this->routes = $routes; + + $this->eventRoutes = $eventRoutes; + $this->commandRoutes = $commandRoutes; } /** @@ -42,9 +45,9 @@ public function __construct(DriverInterface $driver, array $routes = []) public function add($topicName, $queueName, $processorName) { if (Config::COMMAND_TOPIC === $topicName) { - $this->commands[$processorName] = $queueName; + $this->commandRoutes[$processorName] = $queueName; } else { - $this->routes[$topicName][] = [$processorName, $queueName]; + $this->eventRoutes[$topicName][] = [$processorName, $queueName]; } } @@ -77,8 +80,8 @@ private function routeEvent(PsrMessage $message) { $topicName = $message->getProperty(Config::PARAMETER_TOPIC_NAME); - if (array_key_exists($topicName, $this->routes)) { - foreach ($this->routes[$topicName] as $route) { + if (array_key_exists($topicName, $this->eventRoutes)) { + foreach ($this->eventRoutes[$topicName] as $route) { $processorMessage = clone $message; $processorMessage->setProperty(Config::PARAMETER_PROCESSOR_NAME, $route[0]); $processorMessage->setProperty(Config::PARAMETER_PROCESSOR_QUEUE_NAME, $route[1]); @@ -105,9 +108,9 @@ private function routeCommand(PsrMessage $message) )); } - if (isset($this->commands[$processorName])) { + if (isset($this->commandRoutes[$processorName])) { $processorMessage = clone $message; - $processorMessage->setProperty(Config::PARAMETER_PROCESSOR_QUEUE_NAME, $this->commands[$processorName]); + $processorMessage->setProperty(Config::PARAMETER_PROCESSOR_QUEUE_NAME, $this->commandRoutes[$processorName]); $this->driver->sendToProcessor($this->driver->createClientMessage($processorMessage)); } diff --git a/pkg/enqueue/Tests/Client/RouterProcessorTest.php b/pkg/enqueue/Tests/Client/RouterProcessorTest.php index 18efa075e..b4ac16613 100644 --- a/pkg/enqueue/Tests/Client/RouterProcessorTest.php +++ b/pkg/enqueue/Tests/Client/RouterProcessorTest.php @@ -27,7 +27,7 @@ public function testCouldBeConstructedWithSessionAndRoutes() $router = new RouterProcessor($this->createDriverMock(), $routes); - $this->assertAttributeEquals($routes, 'routes', $router); + $this->assertAttributeEquals($routes, 'eventRoutes', $router); } public function testShouldRejectIfTopicNameParameterIsNotSet() @@ -41,7 +41,7 @@ public function testShouldRejectIfTopicNameParameterIsNotSet() $this->assertEquals('Got message without required parameter: "enqueue.topic_name"', $result->getReason()); } - public function testShouldRouteOriginalMessageToRecipient() + public function testShouldRouteOriginalMessageToEventRecipient() { $message = new NullMessage(); $message->setBody('theBody'); @@ -85,11 +85,92 @@ public function testShouldRouteOriginalMessageToRecipient() ], $routedMessage->getProperties()); } - public function testShouldAddRoute() + public function testShouldRouteOriginalMessageToCommandRecipient() + { + $message = new NullMessage(); + $message->setBody('theBody'); + $message->setHeaders(['aHeader' => 'aHeaderVal']); + $message->setProperties([ + 'aProp' => 'aPropVal', + Config::PARAMETER_TOPIC_NAME => Config::COMMAND_TOPIC, + Config::PARAMETER_PROCESSOR_NAME => 'theCommandName', + ]); + + $clientMessage = new Message(); + + $routedMessage = null; + + $driver = $this->createDriverMock(); + $driver + ->expects($this->once()) + ->method('sendToProcessor') + ->with($this->identicalTo($clientMessage)) + ; + $driver + ->expects($this->once()) + ->method('createClientMessage') + ->willReturnCallback(function (NullMessage $message) use (&$routedMessage, $clientMessage) { + $routedMessage = $message; + + return $clientMessage; + }) + ; + + $routes = [ + 'theCommandName' => 'aQueueName', + ]; + + $router = new RouterProcessor($driver, [], $routes); + + $result = $router->process($message, new NullContext()); + + $this->assertEquals(Result::ACK, $result); + $this->assertEquals([ + 'aProp' => 'aPropVal', + 'enqueue.topic_name' => Config::COMMAND_TOPIC, + 'enqueue.processor_name' => 'theCommandName', + 'enqueue.processor_queue_name' => 'aQueueName', + ], $routedMessage->getProperties()); + } + + public function testShouldRejectCommandMessageIfProcessorNamePropertyMissing() + { + $message = new NullMessage(); + $message->setBody('theBody'); + $message->setHeaders(['aHeader' => 'aHeaderVal']); + $message->setProperties([ + 'aProp' => 'aPropVal', + Config::PARAMETER_TOPIC_NAME => Config::COMMAND_TOPIC, + ]); + + $driver = $this->createDriverMock(); + $driver + ->expects($this->never()) + ->method('sendToProcessor') + ; + $driver + ->expects($this->never()) + ->method('createClientMessage') + ; + + $routes = [ + 'theCommandName' => 'aQueueName', + ]; + + $router = new RouterProcessor($driver, [], $routes); + + $result = $router->process($message, new NullContext()); + + $this->assertInstanceOf(Result::class, $result); + $this->assertEquals(Result::REJECT, $result->getStatus()); + $this->assertEquals('Got message without required parameter: "enqueue.processor_name"', $result->getReason()); + } + + public function testShouldAddEventRoute() { $router = new RouterProcessor($this->createDriverMock(), []); - $this->assertAttributeSame([], 'routes', $router); + $this->assertAttributeSame([], 'eventRoutes', $router); $router->add('theTopicName', 'theQueueName', 'aProcessorName'); @@ -97,7 +178,21 @@ public function testShouldAddRoute() 'theTopicName' => [ ['aProcessorName', 'theQueueName'], ], - ], 'routes', $router); + ], 'eventRoutes', $router); + + $this->assertAttributeSame([], 'commandRoutes', $router); + } + + public function testShouldAddCommandRoute() + { + $router = new RouterProcessor($this->createDriverMock(), []); + + $this->assertAttributeSame([], 'eventRoutes', $router); + + $router->add(Config::COMMAND_TOPIC, 'theQueueName', 'aProcessorName'); + + $this->assertAttributeSame(['aProcessorName' => 'theQueueName'], 'commandRoutes', $router); + $this->assertAttributeSame([], 'eventRoutes', $router); } /** From 779121158e38e64696f515ebf4b955d3d67f2b45 Mon Sep 17 00:00:00 2001 From: Maksim Kotlyar Date: Fri, 9 Jun 2017 20:59:56 +0300 Subject: [PATCH 09/10] [stomp] fix fragile test --- pkg/stomp/Tests/Functional/StompCommonUseCasesTest.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/stomp/Tests/Functional/StompCommonUseCasesTest.php b/pkg/stomp/Tests/Functional/StompCommonUseCasesTest.php index eae85f4a8..caeaae542 100644 --- a/pkg/stomp/Tests/Functional/StompCommonUseCasesTest.php +++ b/pkg/stomp/Tests/Functional/StompCommonUseCasesTest.php @@ -48,7 +48,7 @@ public function testWaitsForTwoSecondsAndReturnNullOnReceive() $this->assertNull($message); $this->assertGreaterThan(1.5, $endAt - $startAt); - $this->assertLessThan(2.5, $endAt - $startAt); + $this->assertLessThan(3, $endAt - $startAt); } public function testReturnNullImmediatelyOnReceiveNoWait() From 8f2ceaa87dbbb60c72d30ec84c447f13916c63d1 Mon Sep 17 00:00:00 2001 From: Maksim Kotlyar Date: Fri, 9 Jun 2017 21:19:26 +0300 Subject: [PATCH 10/10] Release 0.4.14 --- CHANGELOG.md | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 4380684f5..4c111418c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,10 @@ # Change Log +## [0.4.14](https://github.com/php-enqueue/enqueue-dev/tree/0.4.14) (2017-06-09) +[Full Changelog](https://github.com/php-enqueue/enqueue-dev/compare/0.4.13...0.4.14) + +- \[RFC\]\[client\] Add ability to send events or commands. [\#113](https://github.com/php-enqueue/enqueue-dev/pull/113) ([makasim](https://github.com/makasim)) + ## [0.4.13](https://github.com/php-enqueue/enqueue-dev/tree/0.4.13) (2017-06-09) [Full Changelog](https://github.com/php-enqueue/enqueue-dev/compare/0.4.12...0.4.13)