diff --git a/CHANGELOG.md b/CHANGELOG.md index 69c1a288e..be4feca42 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,13 @@ # Change Log +## [0.3.8](https://github.com/php-enqueue/enqueue-dev/tree/0.3.8) (2017-05-10) +[Full Changelog](https://github.com/php-enqueue/enqueue-dev/compare/0.3.7...0.3.8) + +- Add support for production extensions [\#70](https://github.com/php-enqueue/enqueue-dev/issues/70) + +- Multi Transport Simple Client [\#75](https://github.com/php-enqueue/enqueue-dev/pull/75) ([ASKozienko](https://github.com/ASKozienko)) +- Client Extensions [\#72](https://github.com/php-enqueue/enqueue-dev/pull/72) ([ASKozienko](https://github.com/ASKozienko)) + ## [0.3.7](https://github.com/php-enqueue/enqueue-dev/tree/0.3.7) (2017-05-04) [Full Changelog](https://github.com/php-enqueue/enqueue-dev/compare/0.3.6...0.3.7) diff --git a/bin/release b/bin/release index 511e3053b..87613ea64 100755 --- a/bin/release +++ b/bin/release @@ -13,7 +13,7 @@ fi CURRENT_BRANCH=`git rev-parse --abbrev-ref HEAD` -for REMOTE in origin psr-queue stomp amqp-ext sqs fs redis dbal null enqueue enqueue-bundle job-queue test +for REMOTE in origin psr-queue stomp amqp-ext sqs fs redis dbal null enqueue simple-client enqueue-bundle job-queue test do TMP_DIR="/tmp/enqueue-repo" REMOTE_URL=`git remote get-url $REMOTE` diff --git a/bin/subtree-split b/bin/subtree-split index 53e46cce1..6dbdc0193 100755 --- a/bin/subtree-split +++ b/bin/subtree-split @@ -45,6 +45,7 @@ function remote() remote psr-queue git@github.com:php-enqueue/psr-queue.git remote enqueue git@github.com:php-enqueue/enqueue.git +remote simple-client git@github.com:php-enqueue/simple-client.git remote stomp git@github.com:php-enqueue/stomp.git remote amqp-ext git@github.com:php-enqueue/amqp-ext.git remote fs git@github.com:php-enqueue/fs.git @@ -58,6 +59,7 @@ remote test git@github.com:php-enqueue/test.git split 'pkg/psr-queue' psr-queue split 'pkg/enqueue' enqueue +split 'pkg/simple-client' simple-client split 'pkg/stomp' stomp split 'pkg/amqp-ext' amqp-ext split 'pkg/fs' fs diff --git a/composer.json b/composer.json index 2afbec114..5730bb920 100644 --- a/composer.json +++ b/composer.json @@ -15,6 +15,7 @@ "enqueue/sqs": "*@dev", "enqueue/enqueue-bundle": "*@dev", "enqueue/job-queue": "*@dev", + "enqueue/simple-client": "*@dev", "enqueue/test": "*@dev", "phpunit/phpunit": "^5", @@ -77,6 +78,10 @@ { "type": "path", "url": "pkg/sqs" + }, + { + "type": "path", + "url": "pkg/simple-client" } ] } diff --git a/docs/client/quick_tour.md b/docs/client/quick_tour.md new file mode 100644 index 000000000..107f2bbaa --- /dev/null +++ b/docs/client/quick_tour.md @@ -0,0 +1,123 @@ +# Simple client. Quick tour. + +The simple client library takes Enqueue client classes and Symfony components and makes an easy to use client facade. +It reduces the boiler plate code you have to write to start using the Enqueue client features. + +* [Install](#install) +* [Configure](#configure) +* [Producer message](#produce-message) +* [Consume messages](#consume-messages) + +## Install + +```bash +$ composer require enqueue/simple-client enqueue/amqp-ext +``` + +## Configure + +```php + [ + 'default' => 'amqp', + 'amqp' => [ + 'host' => 'localhost', + 'port' => 5672, + 'vhost' => '/', + 'login' => 'guest', + 'password' => 'guest', + ], + ], + 'client' => [ + 'app_name' => 'plain_php', + ], +]); +``` + +## Produce message + +```php +send('a_bar_topic', 'aMessageData'); + +// or an array + +$client->send('a_bar_topic', ['foo', 'bar']); + +// or an json serializable object +$client->send('a_bar_topic', new class() implements \JsonSerializable { + public function jsonSerialize() { + return ['foo', 'bar']; + } +}); +``` + +## Consume messages + +```php +bind('a_bar_topic', 'a_processor_name', function(PsrMessage $psrMessage) { + // processing logic here +}); + +$client->consume(); +``` + +## Cli commands + +```php +#!/usr/bin/env php +add(new SetupBrokerCommand($client->getDriver())); +$application->add(new ProduceMessageCommand($client->getProducer())); +$application->add(new QueuesCommand($client->getQueueMetaRegistry())); +$application->add(new TopicsCommand($client->getTopicMetaRegistry())); +$application->add(new ConsumeMessagesCommand( + $client->getQueueConsumer(), + $client->getDelegateProcessor(), + $client->getQueueMetaRegistry(), + $client->getDriver() +)); + +$application->run(); +``` + +and run to see what is there: + +```bash +$ php bin/enqueue.php +``` + +or consume messages + +```bash +$ php bin/enqueue.php enqueue:consume -vvv --setup-broker +``` + +[back to index](../index.md) diff --git a/docs/client/rpc_call.md b/docs/client/rpc_call.md index 460df5943..cccd4f602 100644 --- a/docs/client/rpc_call.md +++ b/docs/client/rpc_call.md @@ -1,5 +1,7 @@ # Client. RPC call +The client's [quick tour](quick_tour.md) describes how to get the client object. +We use you followed instructions there and have instance of `Enqueue\SimpleClient\SimpleClient` in `$client` var. ## The client side @@ -8,13 +10,10 @@ It allows you to easily send a message and wait for a reply. ```php getProducer(), $context); $replyMessage = $rpcClient->call('greeting_topic', 'Hi Thomas!', 5); @@ -24,13 +23,10 @@ You can perform several requests asynchronously with `callAsync` and request rep ```php getProducer(), $context); $promises = []; @@ -53,7 +49,6 @@ Of course it is possible to implement rpc server side based on transport classes ```php context); +/** @var \Enqueue\SimpleClient\SimpleClient $client */ + $client->bind('greeting_topic', 'greeting_processor', function (PsrMessage $message, PsrContext $context) use (&$requestMessage) { echo $message->getBody(); diff --git a/docs/index.md b/docs/index.md index 03495711f..d8c9258cc 100644 --- a/docs/index.md +++ b/docs/index.md @@ -13,6 +13,7 @@ - [Extensions](consumption/extensions.md) - [Message processor](consumption/message_processor.md) * Client + - [Quick tour](client/quick_tour.md) - [Message examples](client/message_examples.md) - [Supported brokers](client/supported_brokers.md) - [Message bus](client/message_bus.md) @@ -38,5 +39,5 @@ * [Getting Started with RabbitMQ in PHP](https://blog.forma-pro.com/getting-started-with-rabbitmq-in-php-84d331e20a66) * [Getting Started with RabbitMQ in Symfony](https://blog.forma-pro.com/getting-started-with-rabbitmq-in-symfony-cb06e0b674f1) * [RabbitMQ redelivery pitfalls](https://blog.forma-pro.com/rabbitmq-redelivery-pitfalls-440e0347f4e0) -* [LiipImagineBundle. Resolve cache and apply filter in backgroung job.](https://github.com/php-enqueue/enqueue-sandbox/pull/3) +* [LiipImagineBundle. Process images in background](https://blog.forma-pro.com/liipimaginebundle-process-images-in-background-3838c0ed5234) * [FOSElasticaBundle. Improve performance of fos:elastica:populate command](https://github.com/php-enqueue/enqueue-elastica-bundle) diff --git a/docs/quick_tour.md b/docs/quick_tour.md index 75f8ba00e..d7749b73a 100644 --- a/docs/quick_tour.md +++ b/docs/quick_tour.md @@ -167,20 +167,30 @@ Here's an example of how you can send and consume messages. ```php bind('foo_topic', 'processor_name', function (PsrMessage $message) { - // process message - - return PsrProcessor::ACK; +$client = new SimpleClient([ + 'transport' => [ + 'default' => 'amqp', + 'amqp' => [ + 'host' => 'localhost', + 'port' => 5672, + 'vhost' => '/', + 'login' => 'guest', + 'password' => 'guest', + ], + ], + 'client' => true, +]); + +$client->setupBroker(); + +$client->bind('a_foo_topic', 'fooProcessor', function(PsrMessage $message) { + // your processing logic here }); -$client->send('foo_topic', 'Hello there!'); +$client->send('a_bar_topic', 'aMessageData'); // in another process you can consume messages. $client->consume(); diff --git a/phpunit.xml.dist b/phpunit.xml.dist index f2faadb8e..429ead9d2 100644 --- a/phpunit.xml.dist +++ b/phpunit.xml.dist @@ -56,6 +56,10 @@ pkg/job-queue/Tests + + + pkg/simple-client/Tests + diff --git a/pkg/enqueue-bundle/DependencyInjection/Compiler/BuildClientExtensionsPass.php b/pkg/enqueue-bundle/DependencyInjection/Compiler/BuildClientExtensionsPass.php new file mode 100644 index 000000000..5f83e83e2 --- /dev/null +++ b/pkg/enqueue-bundle/DependencyInjection/Compiler/BuildClientExtensionsPass.php @@ -0,0 +1,40 @@ +hasDefinition('enqueue.client.extensions')) { + return; + } + + $tags = $container->findTaggedServiceIds('enqueue.client.extension'); + + $groupByPriority = []; + foreach ($tags as $serviceId => $tagAttributes) { + foreach ($tagAttributes as $tagAttribute) { + $priority = isset($tagAttribute['priority']) ? (int) $tagAttribute['priority'] : 0; + + $groupByPriority[$priority][] = new Reference($serviceId); + } + } + + krsort($groupByPriority, SORT_NUMERIC); + + $flatExtensions = []; + foreach ($groupByPriority as $extension) { + $flatExtensions = array_merge($flatExtensions, $extension); + } + + $container->getDefinition('enqueue.client.extensions')->replaceArgument(0, $flatExtensions); + } +} diff --git a/pkg/enqueue-bundle/DependencyInjection/Compiler/BuildExtensionsPass.php b/pkg/enqueue-bundle/DependencyInjection/Compiler/BuildConsumptionExtensionsPass.php similarity index 94% rename from pkg/enqueue-bundle/DependencyInjection/Compiler/BuildExtensionsPass.php rename to pkg/enqueue-bundle/DependencyInjection/Compiler/BuildConsumptionExtensionsPass.php index 31dcee799..20f2a3817 100644 --- a/pkg/enqueue-bundle/DependencyInjection/Compiler/BuildExtensionsPass.php +++ b/pkg/enqueue-bundle/DependencyInjection/Compiler/BuildConsumptionExtensionsPass.php @@ -6,7 +6,7 @@ use Symfony\Component\DependencyInjection\ContainerBuilder; use Symfony\Component\DependencyInjection\Reference; -class BuildExtensionsPass implements CompilerPassInterface +class BuildConsumptionExtensionsPass implements CompilerPassInterface { /** * {@inheritdoc} diff --git a/pkg/enqueue-bundle/EnqueueBundle.php b/pkg/enqueue-bundle/EnqueueBundle.php index 4c8e5806d..5b4039104 100644 --- a/pkg/enqueue-bundle/EnqueueBundle.php +++ b/pkg/enqueue-bundle/EnqueueBundle.php @@ -5,8 +5,9 @@ use Enqueue\AmqpExt\AmqpContext; use Enqueue\AmqpExt\Symfony\AmqpTransportFactory; use Enqueue\AmqpExt\Symfony\RabbitMqAmqpTransportFactory; +use Enqueue\Bundle\DependencyInjection\Compiler\BuildClientExtensionsPass; use Enqueue\Bundle\DependencyInjection\Compiler\BuildClientRoutingPass; -use Enqueue\Bundle\DependencyInjection\Compiler\BuildExtensionsPass; +use Enqueue\Bundle\DependencyInjection\Compiler\BuildConsumptionExtensionsPass; use Enqueue\Bundle\DependencyInjection\Compiler\BuildProcessorRegistryPass; use Enqueue\Bundle\DependencyInjection\Compiler\BuildQueueMetaRegistryPass; use Enqueue\Bundle\DependencyInjection\Compiler\BuildTopicMetaSubscribersPass; @@ -34,11 +35,12 @@ class EnqueueBundle extends Bundle */ public function build(ContainerBuilder $container) { - $container->addCompilerPass(new BuildExtensionsPass()); + $container->addCompilerPass(new BuildConsumptionExtensionsPass()); $container->addCompilerPass(new BuildClientRoutingPass()); $container->addCompilerPass(new BuildProcessorRegistryPass()); $container->addCompilerPass(new BuildTopicMetaSubscribersPass()); $container->addCompilerPass(new BuildQueueMetaRegistryPass()); + $container->addCompilerPass(new BuildClientExtensionsPass()); /** @var EnqueueExtension $extension */ $extension = $container->getExtension('enqueue'); diff --git a/pkg/enqueue-bundle/Resources/config/client.yml b/pkg/enqueue-bundle/Resources/config/client.yml index 0c1dab387..c83db9055 100644 --- a/pkg/enqueue-bundle/Resources/config/client.yml +++ b/pkg/enqueue-bundle/Resources/config/client.yml @@ -5,7 +5,15 @@ services: enqueue.client.producer: class: 'Enqueue\Client\Producer' - arguments: ['@enqueue.client.driver'] + arguments: + - '@enqueue.client.driver' + - '@enqueue.client.extensions' + + enqueue.client.extensions: + class: 'Enqueue\Client\ChainExtension' + public: false + arguments: + - [] enqueue.producer: alias: 'enqueue.client.producer' diff --git a/pkg/enqueue-bundle/Tests/Unit/DependencyInjection/Compiler/BuildClientExtensionsPassTest.php b/pkg/enqueue-bundle/Tests/Unit/DependencyInjection/Compiler/BuildClientExtensionsPassTest.php new file mode 100644 index 000000000..5b98ecda6 --- /dev/null +++ b/pkg/enqueue-bundle/Tests/Unit/DependencyInjection/Compiler/BuildClientExtensionsPassTest.php @@ -0,0 +1,129 @@ +assertClassImplements(CompilerPassInterface::class, BuildClientExtensionsPass::class); + } + + public function testCouldBeConstructedWithoutAnyArguments() + { + new BuildClientExtensionsPass(); + } + + public function testShouldReplaceFirstArgumentOfExtensionsServiceConstructorWithTaggsExtensions() + { + $container = new ContainerBuilder(); + + $extensions = new Definition(); + $extensions->addArgument([]); + $container->setDefinition('enqueue.client.extensions', $extensions); + + $extension = new Definition(); + $extension->addTag('enqueue.client.extension'); + $container->setDefinition('foo_extension', $extension); + + $extension = new Definition(); + $extension->addTag('enqueue.client.extension'); + $container->setDefinition('bar_extension', $extension); + + $pass = new BuildClientExtensionsPass(); + $pass->process($container); + + $this->assertEquals( + [new Reference('foo_extension'), new Reference('bar_extension')], + $extensions->getArgument(0) + ); + } + + public function testShouldOrderExtensionsByPriority() + { + $container = new ContainerBuilder(); + + $extensions = new Definition(); + $extensions->addArgument([]); + $container->setDefinition('enqueue.client.extensions', $extensions); + + $extension = new Definition(); + $extension->addTag('enqueue.client.extension', ['priority' => 6]); + $container->setDefinition('foo_extension', $extension); + + $extension = new Definition(); + $extension->addTag('enqueue.client.extension', ['priority' => -5]); + $container->setDefinition('bar_extension', $extension); + + $extension = new Definition(); + $extension->addTag('enqueue.client.extension', ['priority' => 2]); + $container->setDefinition('baz_extension', $extension); + + $pass = new BuildClientExtensionsPass(); + $pass->process($container); + + $orderedExtensions = $extensions->getArgument(0); + + $this->assertEquals(new Reference('foo_extension'), $orderedExtensions[0]); + $this->assertEquals(new Reference('baz_extension'), $orderedExtensions[1]); + $this->assertEquals(new Reference('bar_extension'), $orderedExtensions[2]); + } + + public function testShouldAssumePriorityZeroIfPriorityIsNotSet() + { + $container = new ContainerBuilder(); + + $extensions = new Definition(); + $extensions->addArgument([]); + $container->setDefinition('enqueue.client.extensions', $extensions); + + $extension = new Definition(); + $extension->addTag('enqueue.client.extension'); + $container->setDefinition('foo_extension', $extension); + + $extension = new Definition(); + $extension->addTag('enqueue.client.extension', ['priority' => 1]); + $container->setDefinition('bar_extension', $extension); + + $extension = new Definition(); + $extension->addTag('enqueue.client.extension', ['priority' => -1]); + $container->setDefinition('baz_extension', $extension); + + $pass = new BuildClientExtensionsPass(); + $pass->process($container); + + $orderedExtensions = $extensions->getArgument(0); + + $this->assertEquals(new Reference('bar_extension'), $orderedExtensions[0]); + $this->assertEquals(new Reference('foo_extension'), $orderedExtensions[1]); + $this->assertEquals(new Reference('baz_extension'), $orderedExtensions[2]); + } + + public function testShouldDoesNothingIfClientExtensionServiceIsNotDefined() + { + $container = $this->createMock(ContainerBuilder::class); + $container + ->expects($this->once()) + ->method('hasDefinition') + ->with('enqueue.client.extensions') + ->willReturn(false) + ; + $container + ->expects($this->never()) + ->method('findTaggedServiceIds') + ; + + $pass = new BuildClientExtensionsPass(); + $pass->process($container); + } +} diff --git a/pkg/enqueue-bundle/Tests/Unit/DependencyInjection/Compiler/BuildExtensionsPassTest.php b/pkg/enqueue-bundle/Tests/Unit/DependencyInjection/Compiler/BuildConsumptionExtensionsPassTest.php similarity index 90% rename from pkg/enqueue-bundle/Tests/Unit/DependencyInjection/Compiler/BuildExtensionsPassTest.php rename to pkg/enqueue-bundle/Tests/Unit/DependencyInjection/Compiler/BuildConsumptionExtensionsPassTest.php index 8f02a365b..048e0c467 100644 --- a/pkg/enqueue-bundle/Tests/Unit/DependencyInjection/Compiler/BuildExtensionsPassTest.php +++ b/pkg/enqueue-bundle/Tests/Unit/DependencyInjection/Compiler/BuildConsumptionExtensionsPassTest.php @@ -2,7 +2,7 @@ namespace Enqueue\Bundle\Tests\Unit\DependencyInjection\Compiler; -use Enqueue\Bundle\DependencyInjection\Compiler\BuildExtensionsPass; +use Enqueue\Bundle\DependencyInjection\Compiler\BuildConsumptionExtensionsPass; use Enqueue\Test\ClassExtensionTrait; use Symfony\Component\DependencyInjection\Compiler\CompilerPassInterface; use Symfony\Component\DependencyInjection\ContainerBuilder; @@ -10,18 +10,18 @@ use Symfony\Component\DependencyInjection\Reference; use PHPUnit\Framework\TestCase; -class BuildExtensionsPassTest extends TestCase +class BuildConsumptionExtensionsPassTest extends TestCase { use ClassExtensionTrait; public function testShouldImplementCompilerPass() { - $this->assertClassImplements(CompilerPassInterface::class, BuildExtensionsPass::class); + $this->assertClassImplements(CompilerPassInterface::class, BuildConsumptionExtensionsPass::class); } public function testCouldBeConstructedWithoutAnyArguments() { - new BuildExtensionsPass(); + new BuildConsumptionExtensionsPass(); } public function testShouldReplaceFirstArgumentOfExtensionsServiceConstructorWithTaggsExtensions() @@ -40,7 +40,7 @@ public function testShouldReplaceFirstArgumentOfExtensionsServiceConstructorWith $extension->addTag('enqueue.consumption.extension'); $container->setDefinition('bar_extension', $extension); - $pass = new BuildExtensionsPass(); + $pass = new BuildConsumptionExtensionsPass(); $pass->process($container); $this->assertEquals( @@ -69,7 +69,7 @@ public function testShouldOrderExtensionsByPriority() $extension->addTag('enqueue.consumption.extension', ['priority' => 2]); $container->setDefinition('baz_extension', $extension); - $pass = new BuildExtensionsPass(); + $pass = new BuildConsumptionExtensionsPass(); $pass->process($container); $orderedExtensions = $extensions->getArgument(0); @@ -99,7 +99,7 @@ public function testShouldAssumePriorityZeroIfPriorityIsNotSet() $extension->addTag('enqueue.consumption.extension', ['priority' => -1]); $container->setDefinition('baz_extension', $extension); - $pass = new BuildExtensionsPass(); + $pass = new BuildConsumptionExtensionsPass(); $pass->process($container); $orderedExtensions = $extensions->getArgument(0); diff --git a/pkg/enqueue-bundle/Tests/Unit/EnqueueBundleTest.php b/pkg/enqueue-bundle/Tests/Unit/EnqueueBundleTest.php index ee10168f6..e5bc2f0a0 100644 --- a/pkg/enqueue-bundle/Tests/Unit/EnqueueBundleTest.php +++ b/pkg/enqueue-bundle/Tests/Unit/EnqueueBundleTest.php @@ -4,8 +4,9 @@ use Enqueue\AmqpExt\Symfony\AmqpTransportFactory; use Enqueue\AmqpExt\Symfony\RabbitMqAmqpTransportFactory; +use Enqueue\Bundle\DependencyInjection\Compiler\BuildClientExtensionsPass; use Enqueue\Bundle\DependencyInjection\Compiler\BuildClientRoutingPass; -use Enqueue\Bundle\DependencyInjection\Compiler\BuildExtensionsPass; +use Enqueue\Bundle\DependencyInjection\Compiler\BuildConsumptionExtensionsPass; use Enqueue\Bundle\DependencyInjection\Compiler\BuildProcessorRegistryPass; use Enqueue\Bundle\DependencyInjection\Compiler\BuildQueueMetaRegistryPass; use Enqueue\Bundle\DependencyInjection\Compiler\BuildTopicMetaSubscribersPass; @@ -46,7 +47,7 @@ public function testShouldRegisterExpectedCompilerPasses() $container ->expects($this->at(0)) ->method('addCompilerPass') - ->with($this->isInstanceOf(BuildExtensionsPass::class)) + ->with($this->isInstanceOf(BuildConsumptionExtensionsPass::class)) ; $container ->expects($this->at(1)) @@ -70,6 +71,11 @@ public function testShouldRegisterExpectedCompilerPasses() ; $container ->expects($this->at(5)) + ->method('addCompilerPass') + ->with($this->isInstanceOf(BuildClientExtensionsPass::class)) + ; + $container + ->expects($this->at(6)) ->method('getExtension') ->willReturn($extensionMock) ; diff --git a/pkg/enqueue/Client/ChainExtension.php b/pkg/enqueue/Client/ChainExtension.php new file mode 100644 index 000000000..c202e98e0 --- /dev/null +++ b/pkg/enqueue/Client/ChainExtension.php @@ -0,0 +1,39 @@ +extensions = $extensions; + } + + /** + * {@inheritdoc} + */ + public function onPreSend($topic, Message $message) + { + foreach ($this->extensions as $extension) { + $extension->onPreSend($topic, $message); + } + } + + /** + * {@inheritdoc} + */ + public function onPostSend($topic, Message $message) + { + foreach ($this->extensions as $extension) { + $extension->onPostSend($topic, $message); + } + } +} diff --git a/pkg/enqueue/Client/ExtensionInterface.php b/pkg/enqueue/Client/ExtensionInterface.php new file mode 100644 index 000000000..3b0a028e8 --- /dev/null +++ b/pkg/enqueue/Client/ExtensionInterface.php @@ -0,0 +1,19 @@ +driver = $driver; + $this->extension = $extension ?: new ChainExtension([]); } /** @@ -55,7 +61,9 @@ public function send($topic, $message) throw new \LogicException(sprintf('The %s property must not be set for messages that are sent to message bus.', Config::PARAMETER_PROCESSOR_NAME)); } + $this->extension->onPreSend($topic, $message); $this->driver->sendToRouter($message); + $this->extension->onPostSend($topic, $message); } elseif (Message::SCOPE_APP == $message->getScope()) { if (false == $message->getProperty(Config::PARAMETER_PROCESSOR_NAME)) { $message->setProperty(Config::PARAMETER_PROCESSOR_NAME, $this->driver->getConfig()->getRouterProcessorName()); @@ -64,7 +72,9 @@ public function send($topic, $message) $message->setProperty(Config::PARAMETER_PROCESSOR_QUEUE_NAME, $this->driver->getConfig()->getRouterQueueName()); } + $this->extension->onPreSend($topic, $message); $this->driver->sendToProcessor($message); + $this->extension->onPostSend($topic, $message); } else { throw new \LogicException(sprintf('The message scope "%s" is not supported.', $message->getScope())); } diff --git a/pkg/enqueue/Client/SimpleClient.php b/pkg/enqueue/Client/SimpleClient.php deleted file mode 100644 index 27f295cdb..000000000 --- a/pkg/enqueue/Client/SimpleClient.php +++ /dev/null @@ -1,174 +0,0 @@ -context = $context; - $this->config = $config ?: Config::create(); - - $this->queueMetaRegistry = new QueueMetaRegistry($this->config, []); - $this->queueMetaRegistry->add($this->config->getDefaultProcessorQueueName()); - $this->queueMetaRegistry->add($this->config->getRouterQueueName()); - - $this->topicsMetaRegistry = new TopicMetaRegistry([]); - $this->processorsRegistry = new ArrayProcessorRegistry(); - - $this->driver = new AmqpDriver($context, $this->config, $this->queueMetaRegistry); - $this->routerProcessor = new RouterProcessor($this->driver, []); - - $this->processorsRegistry->add($this->config->getRouterProcessorName(), $this->routerProcessor); - $this->queueMetaRegistry->addProcessor($this->config->getRouterQueueName(), $this->routerProcessor); - } - - /** - * @param string $topic - * @param string $processorName - * @param callback $processor - */ - public function bind($topic, $processorName, callable $processor) - { - $queueName = $this->config->getDefaultProcessorQueueName(); - - $this->topicsMetaRegistry->addProcessor($topic, $processorName); - $this->queueMetaRegistry->addProcessor($queueName, $processorName); - $this->processorsRegistry->add($processorName, new CallbackProcessor($processor)); - - $this->routerProcessor->add($topic, $queueName, $processorName); - } - - public function send($topic, $message) - { - $this->getProducer()->send($topic, $message); - } - - public function consume(ExtensionInterface $runtimeExtension = null) - { - $this->driver->setupBroker(); - - $processor = $this->getProcessor(); - - $queueConsumer = $this->getQueueConsumer(); - - $defaultQueueName = $this->config->getDefaultProcessorQueueName(); - $defaultTransportQueueName = $this->config->createTransportQueueName($defaultQueueName); - - $queueConsumer->bind($defaultTransportQueueName, $processor); - if ($this->config->getRouterQueueName() != $defaultQueueName) { - $routerTransportQueueName = $this->config->createTransportQueueName($this->config->getRouterQueueName()); - - $queueConsumer->bind($routerTransportQueueName, $processor); - } - - $queueConsumer->consume($runtimeExtension); - } - - /** - * @return AmqpContext - */ - public function getContext() - { - return $this->context; - } - - /** - * @return QueueConsumer - */ - public function getQueueConsumer() - { - return new QueueConsumer($this->context, new ChainExtension([ - new SetRouterPropertiesExtension($this->driver), - ])); - } - - /** - * @return DriverInterface - */ - public function getDriver() - { - return $this->driver; - } - - /** - * @return TopicMetaRegistry - */ - public function getTopicMetaRegistry() - { - return $this->topicsMetaRegistry; - } - - /** - * @return QueueMetaRegistry - */ - public function getQueueMetaRegistry() - { - return $this->queueMetaRegistry; - } - - /** - * @return ProducerInterface - */ - public function getProducer() - { - $this->driver->setupBroker(); - - return new Producer($this->driver); - } - - /** - * @return DelegateProcessor - */ - public function getProcessor() - { - return new DelegateProcessor($this->processorsRegistry); - } -} diff --git a/pkg/enqueue/Tests/Client/ChainExtensionTest.php b/pkg/enqueue/Tests/Client/ChainExtensionTest.php new file mode 100644 index 000000000..3b1d82f9a --- /dev/null +++ b/pkg/enqueue/Tests/Client/ChainExtensionTest.php @@ -0,0 +1,76 @@ +assertClassImplements(ExtensionInterface::class, ChainExtension::class); + } + + public function testCouldBeConstructedWithExtensionsArray() + { + new ChainExtension([$this->createExtension(), $this->createExtension()]); + } + + public function testShouldProxyOnPreSendToAllInternalExtensions() + { + $message = new Message(); + + $fooExtension = $this->createExtension(); + $fooExtension + ->expects($this->once()) + ->method('onPreSend') + ->with('topic', $this->identicalTo($message)) + ; + $barExtension = $this->createExtension(); + $barExtension + ->expects($this->once()) + ->method('onPreSend') + ->with('topic', $this->identicalTo($message)) + ; + + $extensions = new ChainExtension([$fooExtension, $barExtension]); + + $extensions->onPreSend('topic', $message); + } + + public function testShouldProxyOnPostSendToAllInternalExtensions() + { + $message = new Message(); + + $fooExtension = $this->createExtension(); + $fooExtension + ->expects($this->once()) + ->method('onPostSend') + ->with('topic', $this->identicalTo($message)) + ; + $barExtension = $this->createExtension(); + $barExtension + ->expects($this->once()) + ->method('onPostSend') + ->with('topic', $this->identicalTo($message)) + ; + + $extensions = new ChainExtension([$fooExtension, $barExtension]); + + $extensions->onPostSend('topic', $message); + } + + /** + * @return \PHPUnit_Framework_MockObject_MockObject|ExtensionInterface + */ + protected function createExtension() + { + return $this->createMock(ExtensionInterface::class); + } +} diff --git a/pkg/enqueue/Tests/Client/ProducerTest.php b/pkg/enqueue/Tests/Client/ProducerTest.php index 9741810cc..79c7a0cb2 100644 --- a/pkg/enqueue/Tests/Client/ProducerTest.php +++ b/pkg/enqueue/Tests/Client/ProducerTest.php @@ -4,6 +4,7 @@ use Enqueue\Client\Config; use Enqueue\Client\DriverInterface; +use Enqueue\Client\ExtensionInterface; use Enqueue\Client\Message; use Enqueue\Client\MessagePriority; use Enqueue\Client\Producer; @@ -541,6 +542,62 @@ public function testThrowIfUnSupportedScopeGivenOnSend() $producer->send('topic', $message); } + public function testShouldCallPreSendPostSendExtensionMethodsWhenSendToRouter() + { + $message = new Message(); + $message->setBody('aBody'); + $message->setScope(Message::SCOPE_MESSAGE_BUS); + + $extension = $this->createMock(ExtensionInterface::class); + $extension + ->expects($this->at(0)) + ->method('onPreSend') + ->with($this->identicalTo('topic'), $this->identicalTo($message)) + ; + $extension + ->expects($this->at(1)) + ->method('onPostSend') + ->with($this->identicalTo('topic'), $this->identicalTo($message)) + ; + + $driver = $this->createDriverStub(); + $driver + ->expects($this->once()) + ->method('sendToRouter') + ; + + $producer = new Producer($driver, $extension); + $producer->send('topic', $message); + } + + public function testShouldCallPreSendPostSendExtensionMethodsWhenSendToProcessor() + { + $message = new Message(); + $message->setBody('aBody'); + $message->setScope(Message::SCOPE_APP); + + $extension = $this->createMock(ExtensionInterface::class); + $extension + ->expects($this->at(0)) + ->method('onPreSend') + ->with($this->identicalTo('topic'), $this->identicalTo($message)) + ; + $extension + ->expects($this->at(1)) + ->method('onPostSend') + ->with($this->identicalTo('topic'), $this->identicalTo($message)) + ; + + $driver = $this->createDriverStub(); + $driver + ->expects($this->once()) + ->method('sendToProcessor') + ; + + $producer = new Producer($driver, $extension); + $producer->send('topic', $message); + } + /** * @return \PHPUnit_Framework_MockObject_MockObject|DriverInterface */ diff --git a/pkg/enqueue/Tests/Functional/Client/RpcClientTest.php b/pkg/enqueue/Tests/Functional/Client/RpcClientTest.php index bf7d69e13..49f6fa20a 100644 --- a/pkg/enqueue/Tests/Functional/Client/RpcClientTest.php +++ b/pkg/enqueue/Tests/Functional/Client/RpcClientTest.php @@ -4,7 +4,7 @@ use Enqueue\AmqpExt\AmqpContext; use Enqueue\Client\RpcClient; -use Enqueue\Client\SimpleClient; +use Enqueue\SimpleClient\SimpleClient; use Enqueue\Consumption\ChainExtension; use Enqueue\Consumption\Extension\LimitConsumedMessagesExtension; use Enqueue\Consumption\Extension\LimitConsumptionTimeExtension; @@ -39,14 +39,27 @@ public function setUp() $this->context = $this->buildAmqpContext(); $this->replyContext = $this->buildAmqpContext(); - $this->removeQueue('default'); + $this->removeQueue('enqueue.app.default'); } public function testProduceAndConsumeOneMessage() { + $config = [ + 'transport' => [ + 'rabbitmq_amqp' => [ + 'host' => getenv('SYMFONY__RABBITMQ__HOST'), + 'port' => getenv('SYMFONY__RABBITMQ__AMQP__PORT'), + 'login' => getenv('SYMFONY__RABBITMQ__USER'), + 'password' => getenv('SYMFONY__RABBITMQ__PASSWORD'), + 'vhost' => getenv('SYMFONY__RABBITMQ__VHOST'), + ], + ], + ]; + $requestMessage = null; - $client = new SimpleClient($this->context); + $client = new SimpleClient($config); + $client->setupBroker(); $client->bind('foo_topic', 'foo_processor', function (PsrMessage $message, PsrContext $context) use (&$requestMessage) { $requestMessage = $message; diff --git a/pkg/enqueue/Tests/Functional/Client/SimpleClientTest.php b/pkg/enqueue/Tests/Functional/Client/SimpleClientTest.php deleted file mode 100644 index f96ba9571..000000000 --- a/pkg/enqueue/Tests/Functional/Client/SimpleClientTest.php +++ /dev/null @@ -1,83 +0,0 @@ -context = $this->buildAmqpContext(); - - $this->removeQueue('default'); - } - - public function testProduceAndConsumeOneMessage() - { - $actualMessage = null; - - $client = new SimpleClient($this->context); - $client->bind('foo_topic', 'foo_processor', function (PsrMessage $message) use (&$actualMessage) { - $actualMessage = $message; - - return Result::ACK; - }); - - $client->send('foo_topic', 'Hello there!'); - - $client->consume(new ChainExtension([ - new LimitConsumptionTimeExtension(new \DateTime('+5sec')), - new LimitConsumedMessagesExtension(2), - ])); - - $this->assertInstanceOf(PsrMessage::class, $actualMessage); - $this->assertSame('Hello there!', $actualMessage->getBody()); - } - - public function testProduceAndRouteToTwoConsumes() - { - $received = 0; - - $client = new SimpleClient($this->context); - $client->bind('foo_topic', 'foo_processor1', function () use (&$received) { - ++$received; - - return Result::ACK; - }); - $client->bind('foo_topic', 'foo_processor2', function () use (&$received) { - ++$received; - - return Result::ACK; - }); - - $client->send('foo_topic', 'Hello there!'); - - $client->consume(new ChainExtension([ - new LimitConsumptionTimeExtension(new \DateTime('+5sec')), - new LimitConsumedMessagesExtension(3), - ])); - - $this->assertSame(2, $received); - } -} diff --git a/pkg/enqueue/composer.json b/pkg/enqueue/composer.json index cfa0e9ca1..0f8e11f75 100644 --- a/pkg/enqueue/composer.json +++ b/pkg/enqueue/composer.json @@ -21,7 +21,8 @@ "symfony/dependency-injection": "^2.8|^3", "symfony/config": "^2.8|^3", "enqueue/null": "^0.3", - "enqueue/test": "^0.3" + "enqueue/test": "^0.3", + "enqueue/simple-client": "^0.3" }, "suggest": { "symfony/console": "^2.8|^3 If you want to use li commands", @@ -31,7 +32,8 @@ "enqueue/stomp": "STOMP transport", "enqueue/fs": "Filesystem transport", "enqueue/redis": "Redis transport", - "enqueue/dbal": "Doctrine DBAL transport" + "enqueue/dbal": "Doctrine DBAL transport", + "enqueue/sqs": "Amazon AWS SQS transport" }, "autoload": { "psr-4": { "Enqueue\\": "" }, diff --git a/pkg/simple-client/.gitignore b/pkg/simple-client/.gitignore new file mode 100644 index 000000000..a770439e5 --- /dev/null +++ b/pkg/simple-client/.gitignore @@ -0,0 +1,6 @@ +*~ +/composer.lock +/composer.phar +/phpunit.xml +/vendor/ +/.idea/ diff --git a/pkg/simple-client/.travis.yml b/pkg/simple-client/.travis.yml new file mode 100644 index 000000000..42374ddc7 --- /dev/null +++ b/pkg/simple-client/.travis.yml @@ -0,0 +1,21 @@ +sudo: false + +git: + depth: 1 + +language: php + +php: + - '5.6' + - '7.0' + +cache: + directories: + - $HOME/.composer/cache + +install: + - composer self-update + - composer install --prefer-source + +script: + - vendor/bin/phpunit --exclude-group=functional diff --git a/pkg/simple-client/LICENSE b/pkg/simple-client/LICENSE new file mode 100644 index 000000000..70fa75252 --- /dev/null +++ b/pkg/simple-client/LICENSE @@ -0,0 +1,19 @@ +Copyright (c) 2017 Kotliar Maksym + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is furnished +to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. \ No newline at end of file diff --git a/pkg/simple-client/README.md b/pkg/simple-client/README.md new file mode 100644 index 000000000..75007c491 --- /dev/null +++ b/pkg/simple-client/README.md @@ -0,0 +1,26 @@ +# Message Queue. Simple client + +[![Gitter](https://badges.gitter.im/php-enqueue/Lobby.svg)](https://gitter.im/php-enqueue/Lobby) +[![Build Status](https://travis-ci.org/php-enqueue/simple-client.png?branch=master)](https://travis-ci.org/php-enqueue/simple-client) +[![Total Downloads](https://poser.pugx.org/enqueue/simple-client/d/total.png)](https://packagist.org/packages/enqueue/simple-client) +[![Latest Stable Version](https://poser.pugx.org/enqueue/simple-client/version.png)](https://packagist.org/packages/enqueue/simple-client) + +The simple client takes Enqueue client classes and Symfony components and combines it to easy to use facade called `SimpleCLient`. + +## Resources + +* [Documentation](https://github.com/php-enqueue/enqueue-dev/blob/master/docs/index.md) +* [Questions](https://gitter.im/php-enqueue/Lobby) +* [Issue Tracker](https://github.com/php-enqueue/enqueue-dev/issues) + +## Developed by Forma-Pro + +Forma-Pro is a full stack development company which interests also spread to open source development. +Being a team of strong professionals we have an aim an ability to help community by developing cutting edge solutions in the areas of e-commerce, docker & microservice oriented architecture where we have accumulated a huge many-years experience. +Our main specialization is Symfony framework based solution, but we are always looking to the technologies that allow us to do our job the best way. We are committed to creating solutions that revolutionize the way how things are developed in aspects of architecture & scalability. + +If you have any questions and inquires about our open source development, this product particularly or any other matter feel free to contact at opensource@forma-pro.com + +## License + +It is released under the [MIT License](LICENSE). diff --git a/pkg/simple-client/SimpleClient.php b/pkg/simple-client/SimpleClient.php new file mode 100644 index 000000000..5879b969d --- /dev/null +++ b/pkg/simple-client/SimpleClient.php @@ -0,0 +1,293 @@ + [ + * 'default' => 'amqp', + * 'amqp' => [], + * .... + * ], + * 'client' => [ + * 'prefix' => 'enqueue', + * 'app_name' => 'app', + * 'router_topic' => 'router', + * 'router_queue' => 'default', + * 'default_processor_queue' => 'default', + * 'redelivered_delay_time' => 0 + * ], + * 'extensions' => [ + * 'signal_extension' => true, + * ] + * ] + * + * + * @param string|array $config + */ + public function __construct($config) + { + $this->container = $this->buildContainer($config); + } + + /** + * @param array|string $config + * + * @return ContainerBuilder + */ + private function buildContainer($config) + { + $config = $this->buildConfig($config); + $extension = $this->buildContainerExtension($config); + + $container = new ContainerBuilder(); + $container->registerExtension($extension); + $container->loadFromExtension($extension->getAlias(), $config); + + $container->compile(); + + return $container; + } + + /** + * @param array $config + * + * @return SimpleClientContainerExtension + */ + private function buildContainerExtension($config) + { + $map = [ + 'default' => DefaultTransportFactory::class, + 'amqp' => AmqpTransportFactory::class, + 'rabbitmq_amqp' => RabbitMqAmqpTransportFactory::class, + 'dbal' => DbalTransportFactory::class, + 'fs' => FsTransportFactory::class, + 'redis' => RedisTransportFactory::class, + 'stomp' => StompTransportFactory::class, + 'rabbitmq_stomp' => RabbitMqStompTransportFactory::class, + 'sqs' => SqsTransportFactory::class, + ]; + + $extension = new SimpleClientContainerExtension(); + + foreach (array_keys($config['transport']) as $transport) { + if (false == isset($map[$transport])) { + throw new \LogicException(sprintf('Transport is not supported: "%s"', $transport)); + } + + $extension->addTransportFactory(new $map[$transport]); + } + + return $extension; + } + + /** + * @param array|string $config + * + * @return array + */ + private function buildConfig($config) + { + if (is_string($config)) { + $extConfig = [ + 'client' => [], + 'transport' => [ + 'default' => $config, + $config => [], + ], + ]; + } elseif (is_array($config)) { + $extConfig = array_merge_recursive([ + 'client' => [], + 'transport' => [], + ], $config); + } else { + throw new \LogicException('Expects config is string or array'); + } + + if (empty($extConfig['transport']['default'])) { + $defaultTransport = null; + foreach ($extConfig['transport'] as $transport => $config) { + if ('default' === $transport) { + continue; + } + + $defaultTransport = $transport; + break; + } + + if (false == $defaultTransport) { + throw new \LogicException('There is no transport configured'); + } + + $extConfig['transport']['default'] = $defaultTransport; + } + + return $extConfig; + } + + /** + * @param string $topic + * @param string $processorName + * @param callback $processor + */ + public function bind($topic, $processorName, callable $processor) + { + $queueName = $this->getConfig()->getDefaultProcessorQueueName(); + + $this->getTopicMetaRegistry()->addProcessor($topic, $processorName); + $this->getQueueMetaRegistry()->addProcessor($queueName, $processorName); + $this->getProcessorRegistry()->add($processorName, new CallbackProcessor($processor)); + $this->getRouterProcessor()->add($topic, $queueName, $processorName); + } + + /** + * @param string $topic + * @param string|array $message + * @param bool $setupBroker + */ + public function send($topic, $message, $setupBroker = false) + { + $this->getProducer($setupBroker)->send($topic, $message); + } + + /** + * @param ExtensionInterface|null $runtimeExtension + */ + public function consume(ExtensionInterface $runtimeExtension = null) + { + $this->setupBroker(); + $processor = $this->getDelegateProcessor(); + $queueConsumer = $this->getQueueConsumer(); + + $defaultQueueName = $this->getConfig()->getDefaultProcessorQueueName(); + $defaultTransportQueueName = $this->getConfig()->createTransportQueueName($defaultQueueName); + + $queueConsumer->bind($defaultTransportQueueName, $processor); + if ($this->getConfig()->getRouterQueueName() != $defaultQueueName) { + $routerTransportQueueName = $this->getConfig()->createTransportQueueName($this->getConfig()->getRouterQueueName()); + + $queueConsumer->bind($routerTransportQueueName, $processor); + } + + $queueConsumer->consume($runtimeExtension); + } + + /** + * @return PsrContext + */ + public function getContext() + { + return $this->container->get('enqueue.transport.context'); + } + + /** + * @return QueueConsumer + */ + public function getQueueConsumer() + { + return $this->container->get('enqueue.client.queue_consumer'); + } + + /** + * @return Config + */ + public function getConfig() + { + return $this->container->get('enqueue.client.config'); + } + + /** + * @return DriverInterface + */ + public function getDriver() + { + return $this->container->get('enqueue.client.driver'); + } + + /** + * @return TopicMetaRegistry + */ + public function getTopicMetaRegistry() + { + return $this->container->get('enqueue.client.meta.topic_meta_registry'); + } + + /** + * @return QueueMetaRegistry + */ + public function getQueueMetaRegistry() + { + return $this->container->get('enqueue.client.meta.queue_meta_registry'); + } + + /** + * @param bool $setupBroker + * + * @return ProducerInterface + */ + public function getProducer($setupBroker = false) + { + $setupBroker && $this->setupBroker(); + + return $this->container->get('enqueue.client.producer'); + } + + public function setupBroker() + { + $this->getDriver()->setupBroker(); + } + + /** + * @return ArrayProcessorRegistry + */ + public function getProcessorRegistry() + { + return $this->container->get('enqueue.client.processor_registry'); + } + + /** + * @return DelegateProcessor + */ + public function getDelegateProcessor() + { + return $this->container->get('enqueue.client.delegate_processor'); + } + + /** + * @return RouterProcessor + */ + public function getRouterProcessor() + { + return $this->container->get('enqueue.client.router_processor'); + } +} diff --git a/pkg/simple-client/SimpleClientContainerExtension.php b/pkg/simple-client/SimpleClientContainerExtension.php new file mode 100644 index 000000000..0784cfb17 --- /dev/null +++ b/pkg/simple-client/SimpleClientContainerExtension.php @@ -0,0 +1,172 @@ +factories = []; + } + + /** + * {@inheritdoc} + */ + public function getAlias() + { + return 'enqueue'; + } + + /** + * @return NodeInterface + */ + private function createConfiguration() + { + $tb = new TreeBuilder(); + $rootNode = $tb->root('enqueue'); + + $transportChildren = $rootNode->children() + ->arrayNode('transport')->isRequired()->children(); + + foreach ($this->factories as $factory) { + $factory->addConfiguration( + $transportChildren->arrayNode($factory->getName()) + ); + } + + $rootNode->children() + ->arrayNode('client')->children() + ->scalarNode('prefix')->defaultValue('enqueue')->end() + ->scalarNode('app_name')->defaultValue('app')->end() + ->scalarNode('router_topic')->defaultValue('router')->cannotBeEmpty()->end() + ->scalarNode('router_queue')->defaultValue(Config::DEFAULT_PROCESSOR_QUEUE_NAME)->cannotBeEmpty()->end() + ->scalarNode('default_processor_queue')->defaultValue(Config::DEFAULT_PROCESSOR_QUEUE_NAME)->cannotBeEmpty()->end() + ->integerNode('redelivered_delay_time')->min(0)->defaultValue(0)->end() + ->end()->end() + ->arrayNode('extensions')->addDefaultsIfNotSet()->children() + ->booleanNode('signal_extension')->defaultValue(function_exists('pcntl_signal_dispatch'))->end() + ->end()->end() + ; + + return $tb->buildTree(); + } + + /** + * @param TransportFactoryInterface $transportFactory + */ + public function addTransportFactory(TransportFactoryInterface $transportFactory) + { + $name = $transportFactory->getName(); + + if (empty($name)) { + throw new \LogicException('Transport factory name cannot be empty'); + } + if (array_key_exists($name, $this->factories)) { + throw new \LogicException(sprintf('Transport factory with such name already added. Name %s', $name)); + } + + $this->factories[$name] = $transportFactory; + } + + /** + * {@inheritdoc} + */ + public function load(array $configs, ContainerBuilder $container) + { + $configProcessor = new Processor(); + $config = $configProcessor->process($this->createConfiguration(), $configs); + + foreach ($config['transport'] as $name => $transportConfig) { + $this->factories[$name]->createConnectionFactory($container, $transportConfig); + $this->factories[$name]->createContext($container, $transportConfig); + $this->factories[$name]->createDriver($container, $transportConfig); + } + + $container->register('enqueue.client.config', Config::class) + ->setArguments([ + $config['client']['prefix'], + $config['client']['app_name'], + $config['client']['router_topic'], + $config['client']['router_queue'], + $config['client']['default_processor_queue'], + 'enqueue.client.router_processor', + $config['transport'][$config['transport']['default']['alias']], + ]); + + $container->register('enqueue.client.producer', Producer::class) + ->setArguments([ + new Reference('enqueue.client.driver') + ]); + + $container->register('enqueue.client.meta.topic_meta_registry', TopicMetaRegistry::class) + ->setArguments([[]]); + + $container->register('enqueue.client.meta.queue_meta_registry', QueueMetaRegistry::class) + ->setArguments([ + new Reference('enqueue.client.config'), + [], + ]); + + $container->register('enqueue.client.processor_registry', ArrayProcessorRegistry::class); + + $container->register('enqueue.client.delegate_processor', DelegateProcessor::class) + ->setArguments([new Reference('enqueue.client.processor_registry')]); + + $container->register('enqueue.client.queue_consumer', QueueConsumer::class) + ->setArguments([ + new Reference('enqueue.transport.context'), + new Reference('enqueue.consumption.extensions') + ]); + + // router + $container->register('enqueue.client.router_processor', RouterProcessor::class) + ->setArguments([new Reference('enqueue.client.driver'), []]); + $container->getDefinition('enqueue.client.processor_registry') + ->addMethodCall('add', ['enqueue.client.router_processor', new Reference('enqueue.client.router_processor')]); + $container->getDefinition('enqueue.client.meta.queue_meta_registry') + ->addMethodCall('addProcessor', [$config['client']['router_queue'], 'enqueue.client.router_processor']); + + // extensions + $extensions = []; + if ($config['client']['redelivered_delay_time']) { + $container->register('enqueue.client.delay_redelivered_message_extension', DelayRedeliveredMessageExtension::class) + ->setArguments([ + new Reference('enqueue.client.driver'), + $config['client']['redelivered_delay_time'] + ]); + + $extensions[] = new Reference('enqueue.client.delay_redelivered_message_extension'); + } + + $container->register('enqueue.client.extension.set_router_properties', SetRouterPropertiesExtension::class) + ->setArguments([new Reference('enqueue.client.driver')]); + + $extensions[] = new Reference('enqueue.client.extension.set_router_properties'); + + $container->register('enqueue.consumption.extensions', ConsumptionChainExtension::class) + ->setArguments([$extensions]); + } +} diff --git a/pkg/simple-client/Tests/Functional/SimpleClientTest.php b/pkg/simple-client/Tests/Functional/SimpleClientTest.php new file mode 100644 index 000000000..7cef59d54 --- /dev/null +++ b/pkg/simple-client/Tests/Functional/SimpleClientTest.php @@ -0,0 +1,113 @@ +removeQueue('enqueue.app.default'); + } + + public function transportConfigDataProvider() + { + $amqp = [ + 'transport' => [ + 'amqp' => [ + 'host' => getenv('SYMFONY__RABBITMQ__HOST'), + 'port' => getenv('SYMFONY__RABBITMQ__AMQP__PORT'), + 'login' => getenv('SYMFONY__RABBITMQ__USER'), + 'password' => getenv('SYMFONY__RABBITMQ__PASSWORD'), + 'vhost' => getenv('SYMFONY__RABBITMQ__VHOST'), + ], + ], + ]; + + $rabbitmqAmqp = [ + 'transport' => [ + 'rabbitmq_amqp' => [ + 'host' => getenv('SYMFONY__RABBITMQ__HOST'), + 'port' => getenv('SYMFONY__RABBITMQ__AMQP__PORT'), + 'login' => getenv('SYMFONY__RABBITMQ__USER'), + 'password' => getenv('SYMFONY__RABBITMQ__PASSWORD'), + 'vhost' => getenv('SYMFONY__RABBITMQ__VHOST'), + ], + ], + ]; + + return [[$amqp, $rabbitmqAmqp]]; + } + + /** + * @dataProvider transportConfigDataProvider + */ + public function testProduceAndConsumeOneMessage($config) + { + $actualMessage = null; + + $client = new SimpleClient($config); + $client->bind('foo_topic', 'foo_processor', function (PsrMessage $message) use (&$actualMessage) { + $actualMessage = $message; + + return Result::ACK; + }); + + $client->send('foo_topic', 'Hello there!', true); + + $client->consume(new ChainExtension([ + new LimitConsumptionTimeExtension(new \DateTime('+5sec')), + new LimitConsumedMessagesExtension(2), + ])); + + $this->assertInstanceOf(PsrMessage::class, $actualMessage); + $this->assertSame('Hello there!', $actualMessage->getBody()); + } + + /** + * @dataProvider transportConfigDataProvider + */ + public function testProduceAndRouteToTwoConsumes($config) + { + $received = 0; + + $client = new SimpleClient($config); + $client->bind('foo_topic', 'foo_processor1', function () use (&$received) { + ++$received; + + return Result::ACK; + }); + $client->bind('foo_topic', 'foo_processor2', function () use (&$received) { + ++$received; + + return Result::ACK; + }); + + $client->send('foo_topic', 'Hello there!', true); + + $client->consume(new ChainExtension([ + new LimitConsumptionTimeExtension(new \DateTime('+5sec')), + new LimitConsumedMessagesExtension(3), + ])); + + $this->assertSame(2, $received); + } +} diff --git a/pkg/simple-client/composer.json b/pkg/simple-client/composer.json new file mode 100644 index 000000000..7df693ba2 --- /dev/null +++ b/pkg/simple-client/composer.json @@ -0,0 +1,36 @@ +{ + "name": "enqueue/simple-client", + "type": "library", + "description": "Message Queue Simple Client", + "keywords": ["messaging", "queue", "amqp", "rabbitmq"], + "license": "MIT", + "repositories": [ + { + "type": "vcs", + "url": "git@github.com:php-enqueue/test.git" + } + ], + "require": { + "php": ">=5.6", + "enqueue/enqueue": "^0.3", + "symfony/dependency-injection": "^2.8|^3", + "symfony/config": "^2.8|^3", + "symfony/console": "^2.8|^3" + }, + "require-dev": { + "phpunit/phpunit": "~5.5", + "enqueue/test": "^0.3" + }, + "autoload": { + "psr-4": { "Enqueue\\SimpleClient\\": "" }, + "exclude-from-classmap": [ + "/Tests/" + ] + }, + "minimum-stability": "dev", + "extra": { + "branch-alias": { + "dev-master": "0.3.x-dev" + } + } +} diff --git a/pkg/simple-client/phpunit.xml.dist b/pkg/simple-client/phpunit.xml.dist new file mode 100644 index 000000000..e86476dec --- /dev/null +++ b/pkg/simple-client/phpunit.xml.dist @@ -0,0 +1,31 @@ + + + + + + + ./Tests + + + + + + . + + ./vendor + ./Resources + ./Tests + + + + diff --git a/pkg/stomp/Symfony/StompTransportFactory.php b/pkg/stomp/Symfony/StompTransportFactory.php index d3d30ecf6..c7ed2f4ec 100644 --- a/pkg/stomp/Symfony/StompTransportFactory.php +++ b/pkg/stomp/Symfony/StompTransportFactory.php @@ -84,6 +84,7 @@ public function createDriver(ContainerBuilder $container, array $config) $driver->setArguments([ new Reference(sprintf('enqueue.transport.%s.context', $this->getName())), new Reference('enqueue.client.config'), + new Reference('enqueue.client.meta.queue_meta_registry'), ]); $driverId = sprintf('enqueue.client.%s.driver', $this->getName());