From 0c76d8d7d00c609f376f1dccc0d52b08a8bbc899 Mon Sep 17 00:00:00 2001 From: Maksim Kotlyar Date: Fri, 5 May 2017 09:59:45 +0300 Subject: [PATCH 01/15] [doc] Add link to post about liip imagine bundle --- docs/index.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/index.md b/docs/index.md index 03495711f..e5c3ee007 100644 --- a/docs/index.md +++ b/docs/index.md @@ -38,5 +38,5 @@ * [Getting Started with RabbitMQ in PHP](https://blog.forma-pro.com/getting-started-with-rabbitmq-in-php-84d331e20a66) * [Getting Started with RabbitMQ in Symfony](https://blog.forma-pro.com/getting-started-with-rabbitmq-in-symfony-cb06e0b674f1) * [RabbitMQ redelivery pitfalls](https://blog.forma-pro.com/rabbitmq-redelivery-pitfalls-440e0347f4e0) -* [LiipImagineBundle. Resolve cache and apply filter in backgroung job.](https://github.com/php-enqueue/enqueue-sandbox/pull/3) +* [LiipImagineBundle. Process images in background](https://blog.forma-pro.com/liipimaginebundle-process-images-in-background-3838c0ed5234) * [FOSElasticaBundle. Improve performance of fos:elastica:populate command](https://github.com/php-enqueue/enqueue-elastica-bundle) From 5d263120307e4cde09105ead3723fe497dc0ad21 Mon Sep 17 00:00:00 2001 From: Alexander Kozienko Date: Fri, 5 May 2017 10:05:55 +0300 Subject: [PATCH 02/15] client extension --- pkg/enqueue/Client/ChainExtension.php | 39 +++++++++++++++++++++++ pkg/enqueue/Client/ExtensionInterface.php | 21 ++++++++++++ pkg/enqueue/Client/Producer.php | 12 ++++++- 3 files changed, 71 insertions(+), 1 deletion(-) create mode 100644 pkg/enqueue/Client/ChainExtension.php create mode 100644 pkg/enqueue/Client/ExtensionInterface.php diff --git a/pkg/enqueue/Client/ChainExtension.php b/pkg/enqueue/Client/ChainExtension.php new file mode 100644 index 000000000..c202e98e0 --- /dev/null +++ b/pkg/enqueue/Client/ChainExtension.php @@ -0,0 +1,39 @@ +extensions = $extensions; + } + + /** + * {@inheritdoc} + */ + public function onPreSend($topic, Message $message) + { + foreach ($this->extensions as $extension) { + $extension->onPreSend($topic, $message); + } + } + + /** + * {@inheritdoc} + */ + public function onPostSend($topic, Message $message) + { + foreach ($this->extensions as $extension) { + $extension->onPostSend($topic, $message); + } + } +} diff --git a/pkg/enqueue/Client/ExtensionInterface.php b/pkg/enqueue/Client/ExtensionInterface.php new file mode 100644 index 000000000..47cdfd311 --- /dev/null +++ b/pkg/enqueue/Client/ExtensionInterface.php @@ -0,0 +1,21 @@ +driver = $driver; + $this->extension = $extension ?: new ChainExtension([]); } /** @@ -47,6 +53,8 @@ public function send($topic, $message) $message->setPriority(MessagePriority::NORMAL); } + $this->extension->onPreSend($topic, $message); + if (Message::SCOPE_MESSAGE_BUS == $message->getScope()) { if ($message->getProperty(Config::PARAMETER_PROCESSOR_QUEUE_NAME)) { throw new \LogicException(sprintf('The %s property must not be set for messages that are sent to message bus.', Config::PARAMETER_PROCESSOR_QUEUE_NAME)); @@ -68,6 +76,8 @@ public function send($topic, $message) } else { throw new \LogicException(sprintf('The message scope "%s" is not supported.', $message->getScope())); } + + $this->extension->onPostSend($topic, $message); } /** From 2145d1a594b8e07a947cbd8d2e646d00f742f8c3 Mon Sep 17 00:00:00 2001 From: Alexander Kozienko Date: Fri, 5 May 2017 11:01:18 +0300 Subject: [PATCH 03/15] client extension --- pkg/amqp-ext/Client/AmqpDriver.php | 9 +++++++++ pkg/enqueue/Client/ChainExtension.php | 13 +++++++++++++ .../ConsumptionExtension/SetupBrokerExtension.php | 11 ++++++++++- pkg/enqueue/Client/DriverInterface.php | 6 ++++++ pkg/enqueue/Client/ExtensionInterface.php | 10 +++++++++- pkg/enqueue/Symfony/Client/SetupBrokerCommand.php | 13 +++++++++++-- 6 files changed, 58 insertions(+), 4 deletions(-) diff --git a/pkg/amqp-ext/Client/AmqpDriver.php b/pkg/amqp-ext/Client/AmqpDriver.php index ec11765de..b965b6a5d 100644 --- a/pkg/amqp-ext/Client/AmqpDriver.php +++ b/pkg/amqp-ext/Client/AmqpDriver.php @@ -11,6 +11,7 @@ use Enqueue\Client\Message; use Enqueue\Client\Meta\QueueMetaRegistry; use Enqueue\AmqpExt\DeliveryMode; +use Enqueue\Psr\PsrContext; use Enqueue\Psr\PsrMessage; use Psr\Log\LoggerInterface; use Psr\Log\NullLogger; @@ -192,6 +193,14 @@ public function getConfig() return $this->config; } + /** + * {@inheritdoc} + */ + public function getContext() + { + return $this->context; + } + /** * @return AmqpTopic */ diff --git a/pkg/enqueue/Client/ChainExtension.php b/pkg/enqueue/Client/ChainExtension.php index c202e98e0..2a6d0c3d6 100644 --- a/pkg/enqueue/Client/ChainExtension.php +++ b/pkg/enqueue/Client/ChainExtension.php @@ -2,6 +2,9 @@ namespace Enqueue\Client; +use Enqueue\Psr\PsrContext; +use Psr\Log\LoggerInterface; + class ChainExtension implements ExtensionInterface { /** @@ -36,4 +39,14 @@ public function onPostSend($topic, Message $message) $extension->onPostSend($topic, $message); } } + + /** + * {@inheritdoc} + */ + public function onPostSetupBroker(PsrContext $context, LoggerInterface $logger = null) + { + foreach ($this->extensions as $extension) { + $extension->onPostSetupBroker($context, $logger); + } + } } diff --git a/pkg/enqueue/Client/ConsumptionExtension/SetupBrokerExtension.php b/pkg/enqueue/Client/ConsumptionExtension/SetupBrokerExtension.php index 8b6aecbc1..2131a6fdc 100644 --- a/pkg/enqueue/Client/ConsumptionExtension/SetupBrokerExtension.php +++ b/pkg/enqueue/Client/ConsumptionExtension/SetupBrokerExtension.php @@ -2,6 +2,8 @@ namespace Enqueue\Client\ConsumptionExtension; +use Enqueue\Client\ChainExtension; +use Enqueue\Client\ExtensionInterface as ClientExtensionInterface; use Enqueue\Client\DriverInterface; use Enqueue\Consumption\Context; use Enqueue\Consumption\EmptyExtensionTrait; @@ -16,6 +18,11 @@ class SetupBrokerExtension implements ExtensionInterface */ private $driver; + /** + * @var ClientExtensionInterface + */ + private $extension; + /** * @var bool */ @@ -24,9 +31,10 @@ class SetupBrokerExtension implements ExtensionInterface /** * @param DriverInterface $driver */ - public function __construct(DriverInterface $driver) + public function __construct(DriverInterface $driver, ClientExtensionInterface $extension = null) { $this->driver = $driver; + $this->extension = $extension ?: new ChainExtension([]); $this->isDone = false; } @@ -38,6 +46,7 @@ public function onStart(Context $context) if (false == $this->isDone) { $this->isDone = true; $this->driver->setupBroker($context->getLogger()); + $this->extension->onPostSetupBroker($this->driver->getContext(), $context->getLogger()); } } } diff --git a/pkg/enqueue/Client/DriverInterface.php b/pkg/enqueue/Client/DriverInterface.php index 3737165e3..f3b85b0dc 100644 --- a/pkg/enqueue/Client/DriverInterface.php +++ b/pkg/enqueue/Client/DriverInterface.php @@ -2,6 +2,7 @@ namespace Enqueue\Client; +use Enqueue\Psr\PsrContext; use Enqueue\Psr\PsrMessage; use Enqueue\Psr\PsrQueue; use Psr\Log\LoggerInterface; @@ -50,4 +51,9 @@ public function setupBroker(LoggerInterface $logger = null); * @return Config */ public function getConfig(); + + /** + * @return PsrContext + */ + public function getContext(); } diff --git a/pkg/enqueue/Client/ExtensionInterface.php b/pkg/enqueue/Client/ExtensionInterface.php index 47cdfd311..1d1e73333 100644 --- a/pkg/enqueue/Client/ExtensionInterface.php +++ b/pkg/enqueue/Client/ExtensionInterface.php @@ -1,6 +1,9 @@ driver = $driver; + $this->extension = $extension ?: new ChainExtension([]); } /** @@ -44,6 +52,7 @@ protected function execute(InputInterface $input, OutputInterface $output) { $output->writeln('Setup Broker'); - $this->driver->setupBroker(new ConsoleLogger($output)); + $this->driver->setupBroker($logger = new ConsoleLogger($output)); + $this->extension->onPostSetupBroker($this->driver->getContext(), $logger); } } From d771b3ff1f8778b2225624a82e546cf8f60ffceb Mon Sep 17 00:00:00 2001 From: Alexander Kozienko Date: Fri, 5 May 2017 13:32:32 +0300 Subject: [PATCH 04/15] client extension --- pkg/amqp-ext/Client/AmqpDriver.php | 9 -- .../Compiler/BuildClientExtensionsPass.php | 40 ++++++ ...php => BuildConsumptionExtensionsPass.php} | 2 +- pkg/enqueue-bundle/EnqueueBundle.php | 6 +- .../Resources/config/client.yml | 10 +- .../BuildClientExtensionsPassTest.php | 129 ++++++++++++++++++ ...=> BuildConsumptionExtensionsPassTest.php} | 14 +- .../Tests/Unit/EnqueueBundleTest.php | 10 +- pkg/enqueue/Client/ChainExtension.php | 13 -- .../SetupBrokerExtension.php | 11 +- pkg/enqueue/Client/DriverInterface.php | 6 - pkg/enqueue/Client/ExtensionInterface.php | 10 -- pkg/enqueue/Client/Producer.php | 8 +- .../Symfony/Client/SetupBrokerCommand.php | 11 +- pkg/enqueue/Tests/Client/ProducerTest.php | 57 ++++++++ 15 files changed, 261 insertions(+), 75 deletions(-) create mode 100644 pkg/enqueue-bundle/DependencyInjection/Compiler/BuildClientExtensionsPass.php rename pkg/enqueue-bundle/DependencyInjection/Compiler/{BuildExtensionsPass.php => BuildConsumptionExtensionsPass.php} (94%) create mode 100644 pkg/enqueue-bundle/Tests/Unit/DependencyInjection/Compiler/BuildClientExtensionsPassTest.php rename pkg/enqueue-bundle/Tests/Unit/DependencyInjection/Compiler/{BuildExtensionsPassTest.php => BuildConsumptionExtensionsPassTest.php} (90%) diff --git a/pkg/amqp-ext/Client/AmqpDriver.php b/pkg/amqp-ext/Client/AmqpDriver.php index b965b6a5d..ec11765de 100644 --- a/pkg/amqp-ext/Client/AmqpDriver.php +++ b/pkg/amqp-ext/Client/AmqpDriver.php @@ -11,7 +11,6 @@ use Enqueue\Client\Message; use Enqueue\Client\Meta\QueueMetaRegistry; use Enqueue\AmqpExt\DeliveryMode; -use Enqueue\Psr\PsrContext; use Enqueue\Psr\PsrMessage; use Psr\Log\LoggerInterface; use Psr\Log\NullLogger; @@ -193,14 +192,6 @@ public function getConfig() return $this->config; } - /** - * {@inheritdoc} - */ - public function getContext() - { - return $this->context; - } - /** * @return AmqpTopic */ diff --git a/pkg/enqueue-bundle/DependencyInjection/Compiler/BuildClientExtensionsPass.php b/pkg/enqueue-bundle/DependencyInjection/Compiler/BuildClientExtensionsPass.php new file mode 100644 index 000000000..5f83e83e2 --- /dev/null +++ b/pkg/enqueue-bundle/DependencyInjection/Compiler/BuildClientExtensionsPass.php @@ -0,0 +1,40 @@ +hasDefinition('enqueue.client.extensions')) { + return; + } + + $tags = $container->findTaggedServiceIds('enqueue.client.extension'); + + $groupByPriority = []; + foreach ($tags as $serviceId => $tagAttributes) { + foreach ($tagAttributes as $tagAttribute) { + $priority = isset($tagAttribute['priority']) ? (int) $tagAttribute['priority'] : 0; + + $groupByPriority[$priority][] = new Reference($serviceId); + } + } + + krsort($groupByPriority, SORT_NUMERIC); + + $flatExtensions = []; + foreach ($groupByPriority as $extension) { + $flatExtensions = array_merge($flatExtensions, $extension); + } + + $container->getDefinition('enqueue.client.extensions')->replaceArgument(0, $flatExtensions); + } +} diff --git a/pkg/enqueue-bundle/DependencyInjection/Compiler/BuildExtensionsPass.php b/pkg/enqueue-bundle/DependencyInjection/Compiler/BuildConsumptionExtensionsPass.php similarity index 94% rename from pkg/enqueue-bundle/DependencyInjection/Compiler/BuildExtensionsPass.php rename to pkg/enqueue-bundle/DependencyInjection/Compiler/BuildConsumptionExtensionsPass.php index 31dcee799..20f2a3817 100644 --- a/pkg/enqueue-bundle/DependencyInjection/Compiler/BuildExtensionsPass.php +++ b/pkg/enqueue-bundle/DependencyInjection/Compiler/BuildConsumptionExtensionsPass.php @@ -6,7 +6,7 @@ use Symfony\Component\DependencyInjection\ContainerBuilder; use Symfony\Component\DependencyInjection\Reference; -class BuildExtensionsPass implements CompilerPassInterface +class BuildConsumptionExtensionsPass implements CompilerPassInterface { /** * {@inheritdoc} diff --git a/pkg/enqueue-bundle/EnqueueBundle.php b/pkg/enqueue-bundle/EnqueueBundle.php index 4c8e5806d..5b4039104 100644 --- a/pkg/enqueue-bundle/EnqueueBundle.php +++ b/pkg/enqueue-bundle/EnqueueBundle.php @@ -5,8 +5,9 @@ use Enqueue\AmqpExt\AmqpContext; use Enqueue\AmqpExt\Symfony\AmqpTransportFactory; use Enqueue\AmqpExt\Symfony\RabbitMqAmqpTransportFactory; +use Enqueue\Bundle\DependencyInjection\Compiler\BuildClientExtensionsPass; use Enqueue\Bundle\DependencyInjection\Compiler\BuildClientRoutingPass; -use Enqueue\Bundle\DependencyInjection\Compiler\BuildExtensionsPass; +use Enqueue\Bundle\DependencyInjection\Compiler\BuildConsumptionExtensionsPass; use Enqueue\Bundle\DependencyInjection\Compiler\BuildProcessorRegistryPass; use Enqueue\Bundle\DependencyInjection\Compiler\BuildQueueMetaRegistryPass; use Enqueue\Bundle\DependencyInjection\Compiler\BuildTopicMetaSubscribersPass; @@ -34,11 +35,12 @@ class EnqueueBundle extends Bundle */ public function build(ContainerBuilder $container) { - $container->addCompilerPass(new BuildExtensionsPass()); + $container->addCompilerPass(new BuildConsumptionExtensionsPass()); $container->addCompilerPass(new BuildClientRoutingPass()); $container->addCompilerPass(new BuildProcessorRegistryPass()); $container->addCompilerPass(new BuildTopicMetaSubscribersPass()); $container->addCompilerPass(new BuildQueueMetaRegistryPass()); + $container->addCompilerPass(new BuildClientExtensionsPass()); /** @var EnqueueExtension $extension */ $extension = $container->getExtension('enqueue'); diff --git a/pkg/enqueue-bundle/Resources/config/client.yml b/pkg/enqueue-bundle/Resources/config/client.yml index 0c1dab387..c83db9055 100644 --- a/pkg/enqueue-bundle/Resources/config/client.yml +++ b/pkg/enqueue-bundle/Resources/config/client.yml @@ -5,7 +5,15 @@ services: enqueue.client.producer: class: 'Enqueue\Client\Producer' - arguments: ['@enqueue.client.driver'] + arguments: + - '@enqueue.client.driver' + - '@enqueue.client.extensions' + + enqueue.client.extensions: + class: 'Enqueue\Client\ChainExtension' + public: false + arguments: + - [] enqueue.producer: alias: 'enqueue.client.producer' diff --git a/pkg/enqueue-bundle/Tests/Unit/DependencyInjection/Compiler/BuildClientExtensionsPassTest.php b/pkg/enqueue-bundle/Tests/Unit/DependencyInjection/Compiler/BuildClientExtensionsPassTest.php new file mode 100644 index 000000000..5b98ecda6 --- /dev/null +++ b/pkg/enqueue-bundle/Tests/Unit/DependencyInjection/Compiler/BuildClientExtensionsPassTest.php @@ -0,0 +1,129 @@ +assertClassImplements(CompilerPassInterface::class, BuildClientExtensionsPass::class); + } + + public function testCouldBeConstructedWithoutAnyArguments() + { + new BuildClientExtensionsPass(); + } + + public function testShouldReplaceFirstArgumentOfExtensionsServiceConstructorWithTaggsExtensions() + { + $container = new ContainerBuilder(); + + $extensions = new Definition(); + $extensions->addArgument([]); + $container->setDefinition('enqueue.client.extensions', $extensions); + + $extension = new Definition(); + $extension->addTag('enqueue.client.extension'); + $container->setDefinition('foo_extension', $extension); + + $extension = new Definition(); + $extension->addTag('enqueue.client.extension'); + $container->setDefinition('bar_extension', $extension); + + $pass = new BuildClientExtensionsPass(); + $pass->process($container); + + $this->assertEquals( + [new Reference('foo_extension'), new Reference('bar_extension')], + $extensions->getArgument(0) + ); + } + + public function testShouldOrderExtensionsByPriority() + { + $container = new ContainerBuilder(); + + $extensions = new Definition(); + $extensions->addArgument([]); + $container->setDefinition('enqueue.client.extensions', $extensions); + + $extension = new Definition(); + $extension->addTag('enqueue.client.extension', ['priority' => 6]); + $container->setDefinition('foo_extension', $extension); + + $extension = new Definition(); + $extension->addTag('enqueue.client.extension', ['priority' => -5]); + $container->setDefinition('bar_extension', $extension); + + $extension = new Definition(); + $extension->addTag('enqueue.client.extension', ['priority' => 2]); + $container->setDefinition('baz_extension', $extension); + + $pass = new BuildClientExtensionsPass(); + $pass->process($container); + + $orderedExtensions = $extensions->getArgument(0); + + $this->assertEquals(new Reference('foo_extension'), $orderedExtensions[0]); + $this->assertEquals(new Reference('baz_extension'), $orderedExtensions[1]); + $this->assertEquals(new Reference('bar_extension'), $orderedExtensions[2]); + } + + public function testShouldAssumePriorityZeroIfPriorityIsNotSet() + { + $container = new ContainerBuilder(); + + $extensions = new Definition(); + $extensions->addArgument([]); + $container->setDefinition('enqueue.client.extensions', $extensions); + + $extension = new Definition(); + $extension->addTag('enqueue.client.extension'); + $container->setDefinition('foo_extension', $extension); + + $extension = new Definition(); + $extension->addTag('enqueue.client.extension', ['priority' => 1]); + $container->setDefinition('bar_extension', $extension); + + $extension = new Definition(); + $extension->addTag('enqueue.client.extension', ['priority' => -1]); + $container->setDefinition('baz_extension', $extension); + + $pass = new BuildClientExtensionsPass(); + $pass->process($container); + + $orderedExtensions = $extensions->getArgument(0); + + $this->assertEquals(new Reference('bar_extension'), $orderedExtensions[0]); + $this->assertEquals(new Reference('foo_extension'), $orderedExtensions[1]); + $this->assertEquals(new Reference('baz_extension'), $orderedExtensions[2]); + } + + public function testShouldDoesNothingIfClientExtensionServiceIsNotDefined() + { + $container = $this->createMock(ContainerBuilder::class); + $container + ->expects($this->once()) + ->method('hasDefinition') + ->with('enqueue.client.extensions') + ->willReturn(false) + ; + $container + ->expects($this->never()) + ->method('findTaggedServiceIds') + ; + + $pass = new BuildClientExtensionsPass(); + $pass->process($container); + } +} diff --git a/pkg/enqueue-bundle/Tests/Unit/DependencyInjection/Compiler/BuildExtensionsPassTest.php b/pkg/enqueue-bundle/Tests/Unit/DependencyInjection/Compiler/BuildConsumptionExtensionsPassTest.php similarity index 90% rename from pkg/enqueue-bundle/Tests/Unit/DependencyInjection/Compiler/BuildExtensionsPassTest.php rename to pkg/enqueue-bundle/Tests/Unit/DependencyInjection/Compiler/BuildConsumptionExtensionsPassTest.php index 8f02a365b..048e0c467 100644 --- a/pkg/enqueue-bundle/Tests/Unit/DependencyInjection/Compiler/BuildExtensionsPassTest.php +++ b/pkg/enqueue-bundle/Tests/Unit/DependencyInjection/Compiler/BuildConsumptionExtensionsPassTest.php @@ -2,7 +2,7 @@ namespace Enqueue\Bundle\Tests\Unit\DependencyInjection\Compiler; -use Enqueue\Bundle\DependencyInjection\Compiler\BuildExtensionsPass; +use Enqueue\Bundle\DependencyInjection\Compiler\BuildConsumptionExtensionsPass; use Enqueue\Test\ClassExtensionTrait; use Symfony\Component\DependencyInjection\Compiler\CompilerPassInterface; use Symfony\Component\DependencyInjection\ContainerBuilder; @@ -10,18 +10,18 @@ use Symfony\Component\DependencyInjection\Reference; use PHPUnit\Framework\TestCase; -class BuildExtensionsPassTest extends TestCase +class BuildConsumptionExtensionsPassTest extends TestCase { use ClassExtensionTrait; public function testShouldImplementCompilerPass() { - $this->assertClassImplements(CompilerPassInterface::class, BuildExtensionsPass::class); + $this->assertClassImplements(CompilerPassInterface::class, BuildConsumptionExtensionsPass::class); } public function testCouldBeConstructedWithoutAnyArguments() { - new BuildExtensionsPass(); + new BuildConsumptionExtensionsPass(); } public function testShouldReplaceFirstArgumentOfExtensionsServiceConstructorWithTaggsExtensions() @@ -40,7 +40,7 @@ public function testShouldReplaceFirstArgumentOfExtensionsServiceConstructorWith $extension->addTag('enqueue.consumption.extension'); $container->setDefinition('bar_extension', $extension); - $pass = new BuildExtensionsPass(); + $pass = new BuildConsumptionExtensionsPass(); $pass->process($container); $this->assertEquals( @@ -69,7 +69,7 @@ public function testShouldOrderExtensionsByPriority() $extension->addTag('enqueue.consumption.extension', ['priority' => 2]); $container->setDefinition('baz_extension', $extension); - $pass = new BuildExtensionsPass(); + $pass = new BuildConsumptionExtensionsPass(); $pass->process($container); $orderedExtensions = $extensions->getArgument(0); @@ -99,7 +99,7 @@ public function testShouldAssumePriorityZeroIfPriorityIsNotSet() $extension->addTag('enqueue.consumption.extension', ['priority' => -1]); $container->setDefinition('baz_extension', $extension); - $pass = new BuildExtensionsPass(); + $pass = new BuildConsumptionExtensionsPass(); $pass->process($container); $orderedExtensions = $extensions->getArgument(0); diff --git a/pkg/enqueue-bundle/Tests/Unit/EnqueueBundleTest.php b/pkg/enqueue-bundle/Tests/Unit/EnqueueBundleTest.php index ee10168f6..e5bc2f0a0 100644 --- a/pkg/enqueue-bundle/Tests/Unit/EnqueueBundleTest.php +++ b/pkg/enqueue-bundle/Tests/Unit/EnqueueBundleTest.php @@ -4,8 +4,9 @@ use Enqueue\AmqpExt\Symfony\AmqpTransportFactory; use Enqueue\AmqpExt\Symfony\RabbitMqAmqpTransportFactory; +use Enqueue\Bundle\DependencyInjection\Compiler\BuildClientExtensionsPass; use Enqueue\Bundle\DependencyInjection\Compiler\BuildClientRoutingPass; -use Enqueue\Bundle\DependencyInjection\Compiler\BuildExtensionsPass; +use Enqueue\Bundle\DependencyInjection\Compiler\BuildConsumptionExtensionsPass; use Enqueue\Bundle\DependencyInjection\Compiler\BuildProcessorRegistryPass; use Enqueue\Bundle\DependencyInjection\Compiler\BuildQueueMetaRegistryPass; use Enqueue\Bundle\DependencyInjection\Compiler\BuildTopicMetaSubscribersPass; @@ -46,7 +47,7 @@ public function testShouldRegisterExpectedCompilerPasses() $container ->expects($this->at(0)) ->method('addCompilerPass') - ->with($this->isInstanceOf(BuildExtensionsPass::class)) + ->with($this->isInstanceOf(BuildConsumptionExtensionsPass::class)) ; $container ->expects($this->at(1)) @@ -70,6 +71,11 @@ public function testShouldRegisterExpectedCompilerPasses() ; $container ->expects($this->at(5)) + ->method('addCompilerPass') + ->with($this->isInstanceOf(BuildClientExtensionsPass::class)) + ; + $container + ->expects($this->at(6)) ->method('getExtension') ->willReturn($extensionMock) ; diff --git a/pkg/enqueue/Client/ChainExtension.php b/pkg/enqueue/Client/ChainExtension.php index 2a6d0c3d6..c202e98e0 100644 --- a/pkg/enqueue/Client/ChainExtension.php +++ b/pkg/enqueue/Client/ChainExtension.php @@ -2,9 +2,6 @@ namespace Enqueue\Client; -use Enqueue\Psr\PsrContext; -use Psr\Log\LoggerInterface; - class ChainExtension implements ExtensionInterface { /** @@ -39,14 +36,4 @@ public function onPostSend($topic, Message $message) $extension->onPostSend($topic, $message); } } - - /** - * {@inheritdoc} - */ - public function onPostSetupBroker(PsrContext $context, LoggerInterface $logger = null) - { - foreach ($this->extensions as $extension) { - $extension->onPostSetupBroker($context, $logger); - } - } } diff --git a/pkg/enqueue/Client/ConsumptionExtension/SetupBrokerExtension.php b/pkg/enqueue/Client/ConsumptionExtension/SetupBrokerExtension.php index 2131a6fdc..8b6aecbc1 100644 --- a/pkg/enqueue/Client/ConsumptionExtension/SetupBrokerExtension.php +++ b/pkg/enqueue/Client/ConsumptionExtension/SetupBrokerExtension.php @@ -2,8 +2,6 @@ namespace Enqueue\Client\ConsumptionExtension; -use Enqueue\Client\ChainExtension; -use Enqueue\Client\ExtensionInterface as ClientExtensionInterface; use Enqueue\Client\DriverInterface; use Enqueue\Consumption\Context; use Enqueue\Consumption\EmptyExtensionTrait; @@ -18,11 +16,6 @@ class SetupBrokerExtension implements ExtensionInterface */ private $driver; - /** - * @var ClientExtensionInterface - */ - private $extension; - /** * @var bool */ @@ -31,10 +24,9 @@ class SetupBrokerExtension implements ExtensionInterface /** * @param DriverInterface $driver */ - public function __construct(DriverInterface $driver, ClientExtensionInterface $extension = null) + public function __construct(DriverInterface $driver) { $this->driver = $driver; - $this->extension = $extension ?: new ChainExtension([]); $this->isDone = false; } @@ -46,7 +38,6 @@ public function onStart(Context $context) if (false == $this->isDone) { $this->isDone = true; $this->driver->setupBroker($context->getLogger()); - $this->extension->onPostSetupBroker($this->driver->getContext(), $context->getLogger()); } } } diff --git a/pkg/enqueue/Client/DriverInterface.php b/pkg/enqueue/Client/DriverInterface.php index f3b85b0dc..3737165e3 100644 --- a/pkg/enqueue/Client/DriverInterface.php +++ b/pkg/enqueue/Client/DriverInterface.php @@ -2,7 +2,6 @@ namespace Enqueue\Client; -use Enqueue\Psr\PsrContext; use Enqueue\Psr\PsrMessage; use Enqueue\Psr\PsrQueue; use Psr\Log\LoggerInterface; @@ -51,9 +50,4 @@ public function setupBroker(LoggerInterface $logger = null); * @return Config */ public function getConfig(); - - /** - * @return PsrContext - */ - public function getContext(); } diff --git a/pkg/enqueue/Client/ExtensionInterface.php b/pkg/enqueue/Client/ExtensionInterface.php index 1d1e73333..3b0a028e8 100644 --- a/pkg/enqueue/Client/ExtensionInterface.php +++ b/pkg/enqueue/Client/ExtensionInterface.php @@ -1,9 +1,6 @@ setPriority(MessagePriority::NORMAL); } - $this->extension->onPreSend($topic, $message); - if (Message::SCOPE_MESSAGE_BUS == $message->getScope()) { if ($message->getProperty(Config::PARAMETER_PROCESSOR_QUEUE_NAME)) { throw new \LogicException(sprintf('The %s property must not be set for messages that are sent to message bus.', Config::PARAMETER_PROCESSOR_QUEUE_NAME)); @@ -63,7 +61,9 @@ public function send($topic, $message) throw new \LogicException(sprintf('The %s property must not be set for messages that are sent to message bus.', Config::PARAMETER_PROCESSOR_NAME)); } + $this->extension->onPreSend($topic, $message); $this->driver->sendToRouter($message); + $this->extension->onPostSend($topic, $message); } elseif (Message::SCOPE_APP == $message->getScope()) { if (false == $message->getProperty(Config::PARAMETER_PROCESSOR_NAME)) { $message->setProperty(Config::PARAMETER_PROCESSOR_NAME, $this->driver->getConfig()->getRouterProcessorName()); @@ -72,12 +72,12 @@ public function send($topic, $message) $message->setProperty(Config::PARAMETER_PROCESSOR_QUEUE_NAME, $this->driver->getConfig()->getRouterQueueName()); } + $this->extension->onPreSend($topic, $message); $this->driver->sendToProcessor($message); + $this->extension->onPostSend($topic, $message); } else { throw new \LogicException(sprintf('The message scope "%s" is not supported.', $message->getScope())); } - - $this->extension->onPostSend($topic, $message); } /** diff --git a/pkg/enqueue/Symfony/Client/SetupBrokerCommand.php b/pkg/enqueue/Symfony/Client/SetupBrokerCommand.php index 3ceb33196..ee7f7cf53 100644 --- a/pkg/enqueue/Symfony/Client/SetupBrokerCommand.php +++ b/pkg/enqueue/Symfony/Client/SetupBrokerCommand.php @@ -2,9 +2,7 @@ namespace Enqueue\Symfony\Client; -use Enqueue\Client\ChainExtension; use Enqueue\Client\DriverInterface; -use Enqueue\Client\ExtensionInterface; use Symfony\Component\Console\Command\Command; use Symfony\Component\Console\Input\InputInterface; use Symfony\Component\Console\Logger\ConsoleLogger; @@ -17,20 +15,14 @@ class SetupBrokerCommand extends Command */ private $driver; - /** - * @var ExtensionInterface - */ - private $extension; - /** * @param DriverInterface $driver */ - public function __construct(DriverInterface $driver, ExtensionInterface $extension = null) + public function __construct(DriverInterface $driver) { parent::__construct(null); $this->driver = $driver; - $this->extension = $extension ?: new ChainExtension([]); } /** @@ -53,6 +45,5 @@ protected function execute(InputInterface $input, OutputInterface $output) $output->writeln('Setup Broker'); $this->driver->setupBroker($logger = new ConsoleLogger($output)); - $this->extension->onPostSetupBroker($this->driver->getContext(), $logger); } } diff --git a/pkg/enqueue/Tests/Client/ProducerTest.php b/pkg/enqueue/Tests/Client/ProducerTest.php index 9741810cc..79c7a0cb2 100644 --- a/pkg/enqueue/Tests/Client/ProducerTest.php +++ b/pkg/enqueue/Tests/Client/ProducerTest.php @@ -4,6 +4,7 @@ use Enqueue\Client\Config; use Enqueue\Client\DriverInterface; +use Enqueue\Client\ExtensionInterface; use Enqueue\Client\Message; use Enqueue\Client\MessagePriority; use Enqueue\Client\Producer; @@ -541,6 +542,62 @@ public function testThrowIfUnSupportedScopeGivenOnSend() $producer->send('topic', $message); } + public function testShouldCallPreSendPostSendExtensionMethodsWhenSendToRouter() + { + $message = new Message(); + $message->setBody('aBody'); + $message->setScope(Message::SCOPE_MESSAGE_BUS); + + $extension = $this->createMock(ExtensionInterface::class); + $extension + ->expects($this->at(0)) + ->method('onPreSend') + ->with($this->identicalTo('topic'), $this->identicalTo($message)) + ; + $extension + ->expects($this->at(1)) + ->method('onPostSend') + ->with($this->identicalTo('topic'), $this->identicalTo($message)) + ; + + $driver = $this->createDriverStub(); + $driver + ->expects($this->once()) + ->method('sendToRouter') + ; + + $producer = new Producer($driver, $extension); + $producer->send('topic', $message); + } + + public function testShouldCallPreSendPostSendExtensionMethodsWhenSendToProcessor() + { + $message = new Message(); + $message->setBody('aBody'); + $message->setScope(Message::SCOPE_APP); + + $extension = $this->createMock(ExtensionInterface::class); + $extension + ->expects($this->at(0)) + ->method('onPreSend') + ->with($this->identicalTo('topic'), $this->identicalTo($message)) + ; + $extension + ->expects($this->at(1)) + ->method('onPostSend') + ->with($this->identicalTo('topic'), $this->identicalTo($message)) + ; + + $driver = $this->createDriverStub(); + $driver + ->expects($this->once()) + ->method('sendToProcessor') + ; + + $producer = new Producer($driver, $extension); + $producer->send('topic', $message); + } + /** * @return \PHPUnit_Framework_MockObject_MockObject|DriverInterface */ From 1ac0989242a3e139a9a0e87eaabedbf737374a43 Mon Sep 17 00:00:00 2001 From: Alexander Kozienko Date: Fri, 5 May 2017 13:42:05 +0300 Subject: [PATCH 05/15] client extension --- .../Symfony/Client/SetupBrokerCommand.php | 2 +- .../Tests/Client/ChainExtensionTest.php | 76 +++++++++++++++++++ 2 files changed, 77 insertions(+), 1 deletion(-) create mode 100644 pkg/enqueue/Tests/Client/ChainExtensionTest.php diff --git a/pkg/enqueue/Symfony/Client/SetupBrokerCommand.php b/pkg/enqueue/Symfony/Client/SetupBrokerCommand.php index ee7f7cf53..c825bd2c4 100644 --- a/pkg/enqueue/Symfony/Client/SetupBrokerCommand.php +++ b/pkg/enqueue/Symfony/Client/SetupBrokerCommand.php @@ -44,6 +44,6 @@ protected function execute(InputInterface $input, OutputInterface $output) { $output->writeln('Setup Broker'); - $this->driver->setupBroker($logger = new ConsoleLogger($output)); + $this->driver->setupBroker(new ConsoleLogger($output)); } } diff --git a/pkg/enqueue/Tests/Client/ChainExtensionTest.php b/pkg/enqueue/Tests/Client/ChainExtensionTest.php new file mode 100644 index 000000000..3b1d82f9a --- /dev/null +++ b/pkg/enqueue/Tests/Client/ChainExtensionTest.php @@ -0,0 +1,76 @@ +assertClassImplements(ExtensionInterface::class, ChainExtension::class); + } + + public function testCouldBeConstructedWithExtensionsArray() + { + new ChainExtension([$this->createExtension(), $this->createExtension()]); + } + + public function testShouldProxyOnPreSendToAllInternalExtensions() + { + $message = new Message(); + + $fooExtension = $this->createExtension(); + $fooExtension + ->expects($this->once()) + ->method('onPreSend') + ->with('topic', $this->identicalTo($message)) + ; + $barExtension = $this->createExtension(); + $barExtension + ->expects($this->once()) + ->method('onPreSend') + ->with('topic', $this->identicalTo($message)) + ; + + $extensions = new ChainExtension([$fooExtension, $barExtension]); + + $extensions->onPreSend('topic', $message); + } + + public function testShouldProxyOnPostSendToAllInternalExtensions() + { + $message = new Message(); + + $fooExtension = $this->createExtension(); + $fooExtension + ->expects($this->once()) + ->method('onPostSend') + ->with('topic', $this->identicalTo($message)) + ; + $barExtension = $this->createExtension(); + $barExtension + ->expects($this->once()) + ->method('onPostSend') + ->with('topic', $this->identicalTo($message)) + ; + + $extensions = new ChainExtension([$fooExtension, $barExtension]); + + $extensions->onPostSend('topic', $message); + } + + /** + * @return \PHPUnit_Framework_MockObject_MockObject|ExtensionInterface + */ + protected function createExtension() + { + return $this->createMock(ExtensionInterface::class); + } +} From b3bc044c49247da420363fd058dce280042dad90 Mon Sep 17 00:00:00 2001 From: Alexander Kozienko Date: Tue, 9 May 2017 14:33:12 +0300 Subject: [PATCH 06/15] multi transport simple client --- pkg/enqueue/Client/SimpleClient.php | 228 +++++++++++++----- .../Client/SimpleClientContainerExtension.php | 166 +++++++++++++ 2 files changed, 327 insertions(+), 67 deletions(-) create mode 100644 pkg/enqueue/Client/SimpleClientContainerExtension.php diff --git a/pkg/enqueue/Client/SimpleClient.php b/pkg/enqueue/Client/SimpleClient.php index 27f295cdb..36bd37495 100644 --- a/pkg/enqueue/Client/SimpleClient.php +++ b/pkg/enqueue/Client/SimpleClient.php @@ -1,73 +1,133 @@ [ + * 'rabbitmq_amqp' => [], + * 'amqp' => [], + * .... + * ], + * 'client' => [ + * 'prefix' => 'enqueue', + * 'app_name' => 'app', + * 'router_topic' => 'router', + * 'router_queue' => 'default', + * 'default_processor_queue' => 'default', + * 'redelivered_delay_time' => 0 + * ], + * 'extensions' => [ + * 'signal_extension' => true, + * ] + * ] + * + * + * @param string|array $config */ - private $driver; + public function __construct($config) + { + $this->container = $this->buildContainer($config); + } /** - * @var Config + * @param array|string $config + * + * @return ContainerBuilder */ - private $config; + private function buildContainer($config) + { + $config = $this->buildConfig($config); + $extension = $this->buildContainerExtension($config); - /** - * @var ArrayProcessorRegistry - */ - private $processorsRegistry; + $container = new ContainerBuilder(); + $container->registerExtension($extension); + $container->loadFromExtension($extension->getAlias(), $config); - /** - * @var TopicMetaRegistry - */ - private $topicsMetaRegistry; + $container->compile(); - /** - * @var RouterProcessor - */ - private $routerProcessor; + return $container; + } /** - * @param AmqpContext $context - * @param Config|null $config + * @param array $config + * + * @return SimpleClientContainerExtension */ - public function __construct(AmqpContext $context, Config $config = null) + private function buildContainerExtension($config) { - $this->context = $context; - $this->config = $config ?: Config::create(); + $map = [ + 'default' => DefaultTransportFactory::class, + 'amqp' => AmqpTransportFactory::class, + 'rabbitmq_amqp' => RabbitMqAmqpTransportFactory::class, + ]; - $this->queueMetaRegistry = new QueueMetaRegistry($this->config, []); - $this->queueMetaRegistry->add($this->config->getDefaultProcessorQueueName()); - $this->queueMetaRegistry->add($this->config->getRouterQueueName()); + $extension = new SimpleClientContainerExtension(); - $this->topicsMetaRegistry = new TopicMetaRegistry([]); - $this->processorsRegistry = new ArrayProcessorRegistry(); + foreach (array_keys($config['transport']) as $transport) { + if (false == isset($map[$transport])) { + throw new \LogicException(sprintf('Transport is not supported: "%s"', $transport)); + } - $this->driver = new AmqpDriver($context, $this->config, $this->queueMetaRegistry); - $this->routerProcessor = new RouterProcessor($this->driver, []); + $extension->addTransportFactory(new $map[$transport]); + } - $this->processorsRegistry->add($this->config->getRouterProcessorName(), $this->routerProcessor); - $this->queueMetaRegistry->addProcessor($this->config->getRouterQueueName(), $this->routerProcessor); + return $extension; + } + + /** + * @param array|string $config + * + * @return array + */ + private function buildConfig($config) + { + if (is_string($config)) { + $extConfig = [ + 'client' => [], + 'transport' => [ + 'default' => $config, + $config => [], + ], + ]; + } elseif (is_array($config)) { + $extConfig = array_merge_recursive([ + 'client' => [], + 'transport' => [], + ], $config); + + $transport = current(array_keys($extConfig['transport'])); + + if (false == $transport) { + throw new \LogicException('There is no transport configured'); + } + + $extConfig['transport']['default'] = $transport; + } else { + throw new \LogicException('Expects config is string or array'); + } + + return $extConfig; } /** @@ -77,34 +137,39 @@ public function __construct(AmqpContext $context, Config $config = null) */ public function bind($topic, $processorName, callable $processor) { - $queueName = $this->config->getDefaultProcessorQueueName(); - - $this->topicsMetaRegistry->addProcessor($topic, $processorName); - $this->queueMetaRegistry->addProcessor($queueName, $processorName); - $this->processorsRegistry->add($processorName, new CallbackProcessor($processor)); + $queueName = $this->getConfig()->getDefaultProcessorQueueName(); - $this->routerProcessor->add($topic, $queueName, $processorName); + $this->getTopicMetaRegistry()->addProcessor($topic, $processorName); + $this->getQueueMetaRegistry()->addProcessor($queueName, $processorName); + $this->getProcessorRegistry()->add($processorName, new CallbackProcessor($processor)); + $this->getRouterProcessor()->add($topic, $queueName, $processorName); } - public function send($topic, $message) + /** + * @param string $topic + * @param string|array $message + * @param bool $setupBroker + */ + public function send($topic, $message, $setupBroker = false) { - $this->getProducer()->send($topic, $message); + $this->getProducer($setupBroker)->send($topic, $message); } + /** + * @param ExtensionInterface|null $runtimeExtension + */ public function consume(ExtensionInterface $runtimeExtension = null) { - $this->driver->setupBroker(); - - $processor = $this->getProcessor(); - + $this->setupBroker(); + $processor = $this->getDelegateProcessor(); $queueConsumer = $this->getQueueConsumer(); - $defaultQueueName = $this->config->getDefaultProcessorQueueName(); - $defaultTransportQueueName = $this->config->createTransportQueueName($defaultQueueName); + $defaultQueueName = $this->getConfig()->getDefaultProcessorQueueName(); + $defaultTransportQueueName = $this->getConfig()->createTransportQueueName($defaultQueueName); $queueConsumer->bind($defaultTransportQueueName, $processor); - if ($this->config->getRouterQueueName() != $defaultQueueName) { - $routerTransportQueueName = $this->config->createTransportQueueName($this->config->getRouterQueueName()); + if ($this->getConfig()->getRouterQueueName() != $defaultQueueName) { + $routerTransportQueueName = $this->getConfig()->createTransportQueueName($this->getConfig()->getRouterQueueName()); $queueConsumer->bind($routerTransportQueueName, $processor); } @@ -113,11 +178,11 @@ public function consume(ExtensionInterface $runtimeExtension = null) } /** - * @return AmqpContext + * @return PsrContext */ public function getContext() { - return $this->context; + return $this->container->get('enqueue.transport.context'); } /** @@ -125,9 +190,15 @@ public function getContext() */ public function getQueueConsumer() { - return new QueueConsumer($this->context, new ChainExtension([ - new SetRouterPropertiesExtension($this->driver), - ])); + return $this->container->get('enqueue.client.queue_consumer'); + } + + /** + * @return Config + */ + public function getConfig() + { + return $this->container->get('enqueue.client.config'); } /** @@ -135,7 +206,7 @@ public function getQueueConsumer() */ public function getDriver() { - return $this->driver; + return $this->container->get('enqueue.client.driver'); } /** @@ -143,7 +214,7 @@ public function getDriver() */ public function getTopicMetaRegistry() { - return $this->topicsMetaRegistry; + return $this->container->get('enqueue.client.meta.topic_meta_registry'); } /** @@ -151,24 +222,47 @@ public function getTopicMetaRegistry() */ public function getQueueMetaRegistry() { - return $this->queueMetaRegistry; + return $this->container->get('enqueue.client.meta.queue_meta_registry'); } /** + * @param bool $setupBroker + * * @return ProducerInterface */ - public function getProducer() + public function getProducer($setupBroker = false) + { + $setupBroker && $this->setupBroker(); + + return $this->container->get('enqueue.client.producer'); + } + + public function setupBroker() { - $this->driver->setupBroker(); + $this->getDriver()->setupBroker(); + } - return new Producer($this->driver); + /** + * @return ArrayProcessorRegistry + */ + public function getProcessorRegistry() + { + return $this->container->get('enqueue.client.processor_registry'); } /** * @return DelegateProcessor */ - public function getProcessor() + public function getDelegateProcessor() + { + return $this->container->get('enqueue.client.delegate_processor'); + } + + /** + * @return RouterProcessor + */ + public function getRouterProcessor() { - return new DelegateProcessor($this->processorsRegistry); + return $this->container->get('enqueue.client.router_processor'); } } diff --git a/pkg/enqueue/Client/SimpleClientContainerExtension.php b/pkg/enqueue/Client/SimpleClientContainerExtension.php new file mode 100644 index 000000000..87f1b3f44 --- /dev/null +++ b/pkg/enqueue/Client/SimpleClientContainerExtension.php @@ -0,0 +1,166 @@ +factories = []; + } + + /** + * {@inheritdoc} + */ + public function getAlias() + { + return 'enqueue'; + } + + /** + * {@inheritdoc} + */ + private function getConfigTreeBuilder() + { + $tb = new TreeBuilder(); + $rootNode = $tb->root('enqueue'); + + $transportChildren = $rootNode->children() + ->arrayNode('transport')->isRequired()->children(); + + foreach ($this->factories as $factory) { + $factory->addConfiguration( + $transportChildren->arrayNode($factory->getName()) + ); + } + + $rootNode->children() + ->arrayNode('client')->children() + ->scalarNode('prefix')->defaultValue('enqueue')->end() + ->scalarNode('app_name')->defaultValue('app')->end() + ->scalarNode('router_topic')->defaultValue('router')->cannotBeEmpty()->end() + ->scalarNode('router_queue')->defaultValue(Config::DEFAULT_PROCESSOR_QUEUE_NAME)->cannotBeEmpty()->end() + ->scalarNode('default_processor_queue')->defaultValue(Config::DEFAULT_PROCESSOR_QUEUE_NAME)->cannotBeEmpty()->end() + ->integerNode('redelivered_delay_time')->min(0)->defaultValue(0)->end() + ->end()->end() + ->arrayNode('extensions')->addDefaultsIfNotSet()->children() + ->booleanNode('signal_extension')->defaultValue(function_exists('pcntl_signal_dispatch'))->end() + ->end()->end() + ; + + return $tb; + } + + /** + * @param TransportFactoryInterface $transportFactory + */ + public function addTransportFactory(TransportFactoryInterface $transportFactory) + { + $name = $transportFactory->getName(); + + if (empty($name)) { + throw new \LogicException('Transport factory name cannot be empty'); + } + if (array_key_exists($name, $this->factories)) { + throw new \LogicException(sprintf('Transport factory with such name already added. Name %s', $name)); + } + + $this->factories[$name] = $transportFactory; + } + + /** + * {@inheritdoc} + */ + public function load(array $configs, ContainerBuilder $container) + { + $configProcessor = new Processor(); + $config = $configProcessor->process($this->getConfigTreeBuilder()->buildTree(), $configs); + + foreach ($config['transport'] as $name => $transportConfig) { + $this->factories[$name]->createConnectionFactory($container, $transportConfig); + $this->factories[$name]->createContext($container, $transportConfig); + $this->factories[$name]->createDriver($container, $transportConfig); + } + + $container->register('enqueue.client.config', Config::class) + ->setArguments([ + $config['client']['prefix'], + $config['client']['app_name'], + $config['client']['router_topic'], + $config['client']['router_queue'], + $config['client']['default_processor_queue'], + 'enqueue.client.router_processor', + $config['transport'][$config['transport']['default']['alias']], + ]); + + $container->register('enqueue.client.producer', Producer::class) + ->setArguments([ + new Reference('enqueue.client.driver') + ]); + + $container->register('enqueue.client.meta.topic_meta_registry', TopicMetaRegistry::class) + ->setArguments([[]]); + + $container->register('enqueue.client.meta.queue_meta_registry', QueueMetaRegistry::class) + ->setArguments([ + new Reference('enqueue.client.config'), + [], + ]); + + $container->register('enqueue.client.processor_registry', ArrayProcessorRegistry::class); + + $container->register('enqueue.client.delegate_processor', DelegateProcessor::class) + ->setArguments([new Reference('enqueue.client.processor_registry')]); + + $container->register('enqueue.client.queue_consumer', QueueConsumer::class) + ->setArguments([ + new Reference('enqueue.transport.context'), + new Reference('enqueue.consumption.extensions') + ]); + + // router + $container->register('enqueue.client.router_processor', RouterProcessor::class) + ->setArguments([new Reference('enqueue.client.driver'), []]); + $container->getDefinition('enqueue.client.processor_registry') + ->addMethodCall('add', ['enqueue.client.router_processor', new Reference('enqueue.client.router_processor')]); + $container->getDefinition('enqueue.client.meta.queue_meta_registry') + ->addMethodCall('addProcessor', [$config['client']['router_queue'], 'enqueue.client.router_processor']); + + // extensions + $extensions = []; + if ($config['client']['redelivered_delay_time']) { + $container->register('enqueue.client.delay_redelivered_message_extension', DelayRedeliveredMessageExtension::class) + ->setArguments([ + new Reference('enqueue.client.driver'), + $config['client']['redelivered_delay_time'] + ]); + + $extensions[] = new Reference('enqueue.client.delay_redelivered_message_extension'); + } + + $container->register('enqueue.client.extension.set_router_properties', SetRouterPropertiesExtension::class) + ->setArguments([new Reference('enqueue.client.driver')]); + + $extensions[] = new Reference('enqueue.client.extension.set_router_properties'); + + $container->register('enqueue.consumption.extensions', ConsumptionChainExtension::class) + ->setArguments([$extensions]); + } +} From f6c5c85660e784b287fcbe247c4fe720b427cf7e Mon Sep 17 00:00:00 2001 From: Alexander Kozienko Date: Wed, 10 May 2017 12:02:43 +0300 Subject: [PATCH 07/15] multi transport simple client --- composer.json | 5 + phpunit.xml.dist | 4 + .../Tests/Functional/Client/RpcClientTest.php | 19 ++- .../Functional/Client/SimpleClientTest.php | 83 ------------- pkg/enqueue/composer.json | 6 +- pkg/simple-client/.gitignore | 6 + pkg/simple-client/.travis.yml | 21 ++++ .../Functional/SimpleClientTest.php | 113 ++++++++++++++++++ pkg/simple-client/LICENSE | 19 +++ pkg/simple-client/README.md | 1 + .../Client => simple-client}/SimpleClient.php | 22 +++- .../SimpleClientContainerExtension.php | 16 ++- pkg/simple-client/composer.json | 43 +++++++ pkg/simple-client/phpunit.xml.dist | 31 +++++ pkg/stomp/Symfony/StompTransportFactory.php | 1 + 15 files changed, 295 insertions(+), 95 deletions(-) delete mode 100644 pkg/enqueue/Tests/Functional/Client/SimpleClientTest.php create mode 100644 pkg/simple-client/.gitignore create mode 100644 pkg/simple-client/.travis.yml create mode 100644 pkg/simple-client/Functional/SimpleClientTest.php create mode 100644 pkg/simple-client/LICENSE create mode 100644 pkg/simple-client/README.md rename pkg/{enqueue/Client => simple-client}/SimpleClient.php (88%) rename pkg/{enqueue/Client => simple-client}/SimpleClientContainerExtension.php (93%) create mode 100644 pkg/simple-client/composer.json create mode 100644 pkg/simple-client/phpunit.xml.dist diff --git a/composer.json b/composer.json index 2afbec114..5730bb920 100644 --- a/composer.json +++ b/composer.json @@ -15,6 +15,7 @@ "enqueue/sqs": "*@dev", "enqueue/enqueue-bundle": "*@dev", "enqueue/job-queue": "*@dev", + "enqueue/simple-client": "*@dev", "enqueue/test": "*@dev", "phpunit/phpunit": "^5", @@ -77,6 +78,10 @@ { "type": "path", "url": "pkg/sqs" + }, + { + "type": "path", + "url": "pkg/simple-client" } ] } diff --git a/phpunit.xml.dist b/phpunit.xml.dist index f2faadb8e..429ead9d2 100644 --- a/phpunit.xml.dist +++ b/phpunit.xml.dist @@ -56,6 +56,10 @@ pkg/job-queue/Tests + + + pkg/simple-client/Tests + diff --git a/pkg/enqueue/Tests/Functional/Client/RpcClientTest.php b/pkg/enqueue/Tests/Functional/Client/RpcClientTest.php index bf7d69e13..49f6fa20a 100644 --- a/pkg/enqueue/Tests/Functional/Client/RpcClientTest.php +++ b/pkg/enqueue/Tests/Functional/Client/RpcClientTest.php @@ -4,7 +4,7 @@ use Enqueue\AmqpExt\AmqpContext; use Enqueue\Client\RpcClient; -use Enqueue\Client\SimpleClient; +use Enqueue\SimpleClient\SimpleClient; use Enqueue\Consumption\ChainExtension; use Enqueue\Consumption\Extension\LimitConsumedMessagesExtension; use Enqueue\Consumption\Extension\LimitConsumptionTimeExtension; @@ -39,14 +39,27 @@ public function setUp() $this->context = $this->buildAmqpContext(); $this->replyContext = $this->buildAmqpContext(); - $this->removeQueue('default'); + $this->removeQueue('enqueue.app.default'); } public function testProduceAndConsumeOneMessage() { + $config = [ + 'transport' => [ + 'rabbitmq_amqp' => [ + 'host' => getenv('SYMFONY__RABBITMQ__HOST'), + 'port' => getenv('SYMFONY__RABBITMQ__AMQP__PORT'), + 'login' => getenv('SYMFONY__RABBITMQ__USER'), + 'password' => getenv('SYMFONY__RABBITMQ__PASSWORD'), + 'vhost' => getenv('SYMFONY__RABBITMQ__VHOST'), + ], + ], + ]; + $requestMessage = null; - $client = new SimpleClient($this->context); + $client = new SimpleClient($config); + $client->setupBroker(); $client->bind('foo_topic', 'foo_processor', function (PsrMessage $message, PsrContext $context) use (&$requestMessage) { $requestMessage = $message; diff --git a/pkg/enqueue/Tests/Functional/Client/SimpleClientTest.php b/pkg/enqueue/Tests/Functional/Client/SimpleClientTest.php deleted file mode 100644 index f96ba9571..000000000 --- a/pkg/enqueue/Tests/Functional/Client/SimpleClientTest.php +++ /dev/null @@ -1,83 +0,0 @@ -context = $this->buildAmqpContext(); - - $this->removeQueue('default'); - } - - public function testProduceAndConsumeOneMessage() - { - $actualMessage = null; - - $client = new SimpleClient($this->context); - $client->bind('foo_topic', 'foo_processor', function (PsrMessage $message) use (&$actualMessage) { - $actualMessage = $message; - - return Result::ACK; - }); - - $client->send('foo_topic', 'Hello there!'); - - $client->consume(new ChainExtension([ - new LimitConsumptionTimeExtension(new \DateTime('+5sec')), - new LimitConsumedMessagesExtension(2), - ])); - - $this->assertInstanceOf(PsrMessage::class, $actualMessage); - $this->assertSame('Hello there!', $actualMessage->getBody()); - } - - public function testProduceAndRouteToTwoConsumes() - { - $received = 0; - - $client = new SimpleClient($this->context); - $client->bind('foo_topic', 'foo_processor1', function () use (&$received) { - ++$received; - - return Result::ACK; - }); - $client->bind('foo_topic', 'foo_processor2', function () use (&$received) { - ++$received; - - return Result::ACK; - }); - - $client->send('foo_topic', 'Hello there!'); - - $client->consume(new ChainExtension([ - new LimitConsumptionTimeExtension(new \DateTime('+5sec')), - new LimitConsumedMessagesExtension(3), - ])); - - $this->assertSame(2, $received); - } -} diff --git a/pkg/enqueue/composer.json b/pkg/enqueue/composer.json index cfa0e9ca1..0f8e11f75 100644 --- a/pkg/enqueue/composer.json +++ b/pkg/enqueue/composer.json @@ -21,7 +21,8 @@ "symfony/dependency-injection": "^2.8|^3", "symfony/config": "^2.8|^3", "enqueue/null": "^0.3", - "enqueue/test": "^0.3" + "enqueue/test": "^0.3", + "enqueue/simple-client": "^0.3" }, "suggest": { "symfony/console": "^2.8|^3 If you want to use li commands", @@ -31,7 +32,8 @@ "enqueue/stomp": "STOMP transport", "enqueue/fs": "Filesystem transport", "enqueue/redis": "Redis transport", - "enqueue/dbal": "Doctrine DBAL transport" + "enqueue/dbal": "Doctrine DBAL transport", + "enqueue/sqs": "Amazon AWS SQS transport" }, "autoload": { "psr-4": { "Enqueue\\": "" }, diff --git a/pkg/simple-client/.gitignore b/pkg/simple-client/.gitignore new file mode 100644 index 000000000..a770439e5 --- /dev/null +++ b/pkg/simple-client/.gitignore @@ -0,0 +1,6 @@ +*~ +/composer.lock +/composer.phar +/phpunit.xml +/vendor/ +/.idea/ diff --git a/pkg/simple-client/.travis.yml b/pkg/simple-client/.travis.yml new file mode 100644 index 000000000..42374ddc7 --- /dev/null +++ b/pkg/simple-client/.travis.yml @@ -0,0 +1,21 @@ +sudo: false + +git: + depth: 1 + +language: php + +php: + - '5.6' + - '7.0' + +cache: + directories: + - $HOME/.composer/cache + +install: + - composer self-update + - composer install --prefer-source + +script: + - vendor/bin/phpunit --exclude-group=functional diff --git a/pkg/simple-client/Functional/SimpleClientTest.php b/pkg/simple-client/Functional/SimpleClientTest.php new file mode 100644 index 000000000..7cef59d54 --- /dev/null +++ b/pkg/simple-client/Functional/SimpleClientTest.php @@ -0,0 +1,113 @@ +removeQueue('enqueue.app.default'); + } + + public function transportConfigDataProvider() + { + $amqp = [ + 'transport' => [ + 'amqp' => [ + 'host' => getenv('SYMFONY__RABBITMQ__HOST'), + 'port' => getenv('SYMFONY__RABBITMQ__AMQP__PORT'), + 'login' => getenv('SYMFONY__RABBITMQ__USER'), + 'password' => getenv('SYMFONY__RABBITMQ__PASSWORD'), + 'vhost' => getenv('SYMFONY__RABBITMQ__VHOST'), + ], + ], + ]; + + $rabbitmqAmqp = [ + 'transport' => [ + 'rabbitmq_amqp' => [ + 'host' => getenv('SYMFONY__RABBITMQ__HOST'), + 'port' => getenv('SYMFONY__RABBITMQ__AMQP__PORT'), + 'login' => getenv('SYMFONY__RABBITMQ__USER'), + 'password' => getenv('SYMFONY__RABBITMQ__PASSWORD'), + 'vhost' => getenv('SYMFONY__RABBITMQ__VHOST'), + ], + ], + ]; + + return [[$amqp, $rabbitmqAmqp]]; + } + + /** + * @dataProvider transportConfigDataProvider + */ + public function testProduceAndConsumeOneMessage($config) + { + $actualMessage = null; + + $client = new SimpleClient($config); + $client->bind('foo_topic', 'foo_processor', function (PsrMessage $message) use (&$actualMessage) { + $actualMessage = $message; + + return Result::ACK; + }); + + $client->send('foo_topic', 'Hello there!', true); + + $client->consume(new ChainExtension([ + new LimitConsumptionTimeExtension(new \DateTime('+5sec')), + new LimitConsumedMessagesExtension(2), + ])); + + $this->assertInstanceOf(PsrMessage::class, $actualMessage); + $this->assertSame('Hello there!', $actualMessage->getBody()); + } + + /** + * @dataProvider transportConfigDataProvider + */ + public function testProduceAndRouteToTwoConsumes($config) + { + $received = 0; + + $client = new SimpleClient($config); + $client->bind('foo_topic', 'foo_processor1', function () use (&$received) { + ++$received; + + return Result::ACK; + }); + $client->bind('foo_topic', 'foo_processor2', function () use (&$received) { + ++$received; + + return Result::ACK; + }); + + $client->send('foo_topic', 'Hello there!', true); + + $client->consume(new ChainExtension([ + new LimitConsumptionTimeExtension(new \DateTime('+5sec')), + new LimitConsumedMessagesExtension(3), + ])); + + $this->assertSame(2, $received); + } +} diff --git a/pkg/simple-client/LICENSE b/pkg/simple-client/LICENSE new file mode 100644 index 000000000..396bf1908 --- /dev/null +++ b/pkg/simple-client/LICENSE @@ -0,0 +1,19 @@ +Copyright (c) 2016 Kotliar Maksym + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is furnished +to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. \ No newline at end of file diff --git a/pkg/simple-client/README.md b/pkg/simple-client/README.md new file mode 100644 index 000000000..d842aa27d --- /dev/null +++ b/pkg/simple-client/README.md @@ -0,0 +1 @@ +# Message Queue Simple Client. diff --git a/pkg/enqueue/Client/SimpleClient.php b/pkg/simple-client/SimpleClient.php similarity index 88% rename from pkg/enqueue/Client/SimpleClient.php rename to pkg/simple-client/SimpleClient.php index 36bd37495..072ef8c95 100644 --- a/pkg/enqueue/Client/SimpleClient.php +++ b/pkg/simple-client/SimpleClient.php @@ -1,19 +1,31 @@ DefaultTransportFactory::class, 'amqp' => AmqpTransportFactory::class, 'rabbitmq_amqp' => RabbitMqAmqpTransportFactory::class, + 'dbal' => DbalTransportFactory::class, + 'fs' => FsTransportFactory::class, + 'redis' => RedisTransportFactory::class, + 'stomp' => StompTransportFactory::class, + 'rabbitmq_stomp' => RabbitMqStompTransportFactory::class, + 'sqs' => SqsTransportFactory::class, ]; $extension = new SimpleClientContainerExtension(); diff --git a/pkg/enqueue/Client/SimpleClientContainerExtension.php b/pkg/simple-client/SimpleClientContainerExtension.php similarity index 93% rename from pkg/enqueue/Client/SimpleClientContainerExtension.php rename to pkg/simple-client/SimpleClientContainerExtension.php index 87f1b3f44..0784cfb17 100644 --- a/pkg/enqueue/Client/SimpleClientContainerExtension.php +++ b/pkg/simple-client/SimpleClientContainerExtension.php @@ -1,14 +1,20 @@ root('enqueue'); @@ -65,7 +71,7 @@ private function getConfigTreeBuilder() ->end()->end() ; - return $tb; + return $tb->buildTree(); } /** @@ -91,7 +97,7 @@ public function addTransportFactory(TransportFactoryInterface $transportFactory) public function load(array $configs, ContainerBuilder $container) { $configProcessor = new Processor(); - $config = $configProcessor->process($this->getConfigTreeBuilder()->buildTree(), $configs); + $config = $configProcessor->process($this->createConfiguration(), $configs); foreach ($config['transport'] as $name => $transportConfig) { $this->factories[$name]->createConnectionFactory($container, $transportConfig); diff --git a/pkg/simple-client/composer.json b/pkg/simple-client/composer.json new file mode 100644 index 000000000..3e98ca93a --- /dev/null +++ b/pkg/simple-client/composer.json @@ -0,0 +1,43 @@ +{ + "name": "enqueue/simple-client", + "type": "library", + "description": "Message Queue Simple Client", + "keywords": ["messaging", "queue", "amqp", "rabbitmq"], + "license": "MIT", + "repositories": [ + { + "type": "vcs", + "url": "git@github.com:php-enqueue/test.git" + } + ], + "require": { + "php": ">=5.6", + "enqueue/enqueue": "^0.3", + "symfony/dependency-injection": "^2.8|^3", + "symfony/config": "^2.8|^3" + }, + "require-dev": { + "phpunit/phpunit": "~5.5", + "enqueue/test": "^0.3" + }, + "suggest": { + "enqueue/amqp-ext": "AMQP transport (based on php extension)", + "enqueue/stomp": "STOMP transport", + "enqueue/fs": "Filesystem transport", + "enqueue/redis": "Redis transport", + "enqueue/dbal": "Doctrine DBAL transport", + "enqueue/sqs": "Amazon AWS SQS transport" + }, + "autoload": { + "psr-4": { "Enqueue\\SimpleClient\\": "" }, + "exclude-from-classmap": [ + "/Tests/" + ] + }, + "minimum-stability": "dev", + "extra": { + "branch-alias": { + "dev-master": "0.3.x-dev" + } + } +} diff --git a/pkg/simple-client/phpunit.xml.dist b/pkg/simple-client/phpunit.xml.dist new file mode 100644 index 000000000..e86476dec --- /dev/null +++ b/pkg/simple-client/phpunit.xml.dist @@ -0,0 +1,31 @@ + + + + + + + ./Tests + + + + + + . + + ./vendor + ./Resources + ./Tests + + + + diff --git a/pkg/stomp/Symfony/StompTransportFactory.php b/pkg/stomp/Symfony/StompTransportFactory.php index d3d30ecf6..c7ed2f4ec 100644 --- a/pkg/stomp/Symfony/StompTransportFactory.php +++ b/pkg/stomp/Symfony/StompTransportFactory.php @@ -84,6 +84,7 @@ public function createDriver(ContainerBuilder $container, array $config) $driver->setArguments([ new Reference(sprintf('enqueue.transport.%s.context', $this->getName())), new Reference('enqueue.client.config'), + new Reference('enqueue.client.meta.queue_meta_registry'), ]); $driverId = sprintf('enqueue.client.%s.driver', $this->getName()); From c8e111f80d247bd174060ddfc37fed7f580e1a93 Mon Sep 17 00:00:00 2001 From: Alexander Kozienko Date: Wed, 10 May 2017 12:49:25 +0300 Subject: [PATCH 08/15] multi transport simple client --- pkg/simple-client/SimpleClient.php | 20 +++++++++++++++----- pkg/simple-client/composer.json | 3 ++- 2 files changed, 17 insertions(+), 6 deletions(-) diff --git a/pkg/simple-client/SimpleClient.php b/pkg/simple-client/SimpleClient.php index 072ef8c95..af7e3a8ee 100644 --- a/pkg/simple-client/SimpleClient.php +++ b/pkg/simple-client/SimpleClient.php @@ -133,16 +133,26 @@ private function buildConfig($config) 'client' => [], 'transport' => [], ], $config); + } else { + throw new \LogicException('Expects config is string or array'); + } - $transport = current(array_keys($extConfig['transport'])); + if (empty($extConfig['transport']['default'])) { + $defaultTransport = null; + foreach ($extConfig['transport'] as $transport => $config) { + if ('default' === $transport) { + continue; + } - if (false == $transport) { + $defaultTransport = $transport; + break; + } + + if (false == $defaultTransport) { throw new \LogicException('There is no transport configured'); } - $extConfig['transport']['default'] = $transport; - } else { - throw new \LogicException('Expects config is string or array'); + $extConfig['transport']['default'] = $defaultTransport; } return $extConfig; diff --git a/pkg/simple-client/composer.json b/pkg/simple-client/composer.json index 3e98ca93a..9222aa053 100644 --- a/pkg/simple-client/composer.json +++ b/pkg/simple-client/composer.json @@ -14,7 +14,8 @@ "php": ">=5.6", "enqueue/enqueue": "^0.3", "symfony/dependency-injection": "^2.8|^3", - "symfony/config": "^2.8|^3" + "symfony/config": "^2.8|^3", + "symfony/console": "^2.8|^3" }, "require-dev": { "phpunit/phpunit": "~5.5", From 0ed8ffc462b92114883881ce79c40405af0e54ef Mon Sep 17 00:00:00 2001 From: Alexander Kozienko Date: Wed, 10 May 2017 12:55:51 +0300 Subject: [PATCH 09/15] multi transport simple client --- pkg/simple-client/LICENSE | 2 +- pkg/simple-client/SimpleClient.php | 3 --- pkg/simple-client/composer.json | 8 -------- 3 files changed, 1 insertion(+), 12 deletions(-) diff --git a/pkg/simple-client/LICENSE b/pkg/simple-client/LICENSE index 396bf1908..70fa75252 100644 --- a/pkg/simple-client/LICENSE +++ b/pkg/simple-client/LICENSE @@ -1,4 +1,4 @@ -Copyright (c) 2016 Kotliar Maksym +Copyright (c) 2017 Kotliar Maksym Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal diff --git a/pkg/simple-client/SimpleClient.php b/pkg/simple-client/SimpleClient.php index af7e3a8ee..a963cc9ca 100644 --- a/pkg/simple-client/SimpleClient.php +++ b/pkg/simple-client/SimpleClient.php @@ -24,9 +24,6 @@ use Enqueue\Symfony\DefaultTransportFactory; use Symfony\Component\DependencyInjection\ContainerBuilder; -/** - * Use it speedup setup process and learning but consider to switch to custom solution (build your own client). - */ final class SimpleClient { /** diff --git a/pkg/simple-client/composer.json b/pkg/simple-client/composer.json index 9222aa053..7df693ba2 100644 --- a/pkg/simple-client/composer.json +++ b/pkg/simple-client/composer.json @@ -21,14 +21,6 @@ "phpunit/phpunit": "~5.5", "enqueue/test": "^0.3" }, - "suggest": { - "enqueue/amqp-ext": "AMQP transport (based on php extension)", - "enqueue/stomp": "STOMP transport", - "enqueue/fs": "Filesystem transport", - "enqueue/redis": "Redis transport", - "enqueue/dbal": "Doctrine DBAL transport", - "enqueue/sqs": "Amazon AWS SQS transport" - }, "autoload": { "psr-4": { "Enqueue\\SimpleClient\\": "" }, "exclude-from-classmap": [ From fac809632b633278a252d7cb631c02e0b94d8b3b Mon Sep 17 00:00:00 2001 From: Maksim Kotlyar Date: Wed, 10 May 2017 15:54:25 +0300 Subject: [PATCH 10/15] [doc] add docs for simple client. --- docs/client/quick_tour.md | 123 ++++++++++++++++++++++++++++++++++++++ docs/client/rpc_call.md | 16 ++--- docs/quick_tour.md | 30 ++++++---- 3 files changed, 149 insertions(+), 20 deletions(-) create mode 100644 docs/client/quick_tour.md diff --git a/docs/client/quick_tour.md b/docs/client/quick_tour.md new file mode 100644 index 000000000..107f2bbaa --- /dev/null +++ b/docs/client/quick_tour.md @@ -0,0 +1,123 @@ +# Simple client. Quick tour. + +The simple client library takes Enqueue client classes and Symfony components and makes an easy to use client facade. +It reduces the boiler plate code you have to write to start using the Enqueue client features. + +* [Install](#install) +* [Configure](#configure) +* [Producer message](#produce-message) +* [Consume messages](#consume-messages) + +## Install + +```bash +$ composer require enqueue/simple-client enqueue/amqp-ext +``` + +## Configure + +```php + [ + 'default' => 'amqp', + 'amqp' => [ + 'host' => 'localhost', + 'port' => 5672, + 'vhost' => '/', + 'login' => 'guest', + 'password' => 'guest', + ], + ], + 'client' => [ + 'app_name' => 'plain_php', + ], +]); +``` + +## Produce message + +```php +send('a_bar_topic', 'aMessageData'); + +// or an array + +$client->send('a_bar_topic', ['foo', 'bar']); + +// or an json serializable object +$client->send('a_bar_topic', new class() implements \JsonSerializable { + public function jsonSerialize() { + return ['foo', 'bar']; + } +}); +``` + +## Consume messages + +```php +bind('a_bar_topic', 'a_processor_name', function(PsrMessage $psrMessage) { + // processing logic here +}); + +$client->consume(); +``` + +## Cli commands + +```php +#!/usr/bin/env php +add(new SetupBrokerCommand($client->getDriver())); +$application->add(new ProduceMessageCommand($client->getProducer())); +$application->add(new QueuesCommand($client->getQueueMetaRegistry())); +$application->add(new TopicsCommand($client->getTopicMetaRegistry())); +$application->add(new ConsumeMessagesCommand( + $client->getQueueConsumer(), + $client->getDelegateProcessor(), + $client->getQueueMetaRegistry(), + $client->getDriver() +)); + +$application->run(); +``` + +and run to see what is there: + +```bash +$ php bin/enqueue.php +``` + +or consume messages + +```bash +$ php bin/enqueue.php enqueue:consume -vvv --setup-broker +``` + +[back to index](../index.md) diff --git a/docs/client/rpc_call.md b/docs/client/rpc_call.md index 460df5943..cccd4f602 100644 --- a/docs/client/rpc_call.md +++ b/docs/client/rpc_call.md @@ -1,5 +1,7 @@ # Client. RPC call +The client's [quick tour](quick_tour.md) describes how to get the client object. +We use you followed instructions there and have instance of `Enqueue\SimpleClient\SimpleClient` in `$client` var. ## The client side @@ -8,13 +10,10 @@ It allows you to easily send a message and wait for a reply. ```php getProducer(), $context); $replyMessage = $rpcClient->call('greeting_topic', 'Hi Thomas!', 5); @@ -24,13 +23,10 @@ You can perform several requests asynchronously with `callAsync` and request rep ```php getProducer(), $context); $promises = []; @@ -53,7 +49,6 @@ Of course it is possible to implement rpc server side based on transport classes ```php context); +/** @var \Enqueue\SimpleClient\SimpleClient $client */ + $client->bind('greeting_topic', 'greeting_processor', function (PsrMessage $message, PsrContext $context) use (&$requestMessage) { echo $message->getBody(); diff --git a/docs/quick_tour.md b/docs/quick_tour.md index 75f8ba00e..d7749b73a 100644 --- a/docs/quick_tour.md +++ b/docs/quick_tour.md @@ -167,20 +167,30 @@ Here's an example of how you can send and consume messages. ```php bind('foo_topic', 'processor_name', function (PsrMessage $message) { - // process message - - return PsrProcessor::ACK; +$client = new SimpleClient([ + 'transport' => [ + 'default' => 'amqp', + 'amqp' => [ + 'host' => 'localhost', + 'port' => 5672, + 'vhost' => '/', + 'login' => 'guest', + 'password' => 'guest', + ], + ], + 'client' => true, +]); + +$client->setupBroker(); + +$client->bind('a_foo_topic', 'fooProcessor', function(PsrMessage $message) { + // your processing logic here }); -$client->send('foo_topic', 'Hello there!'); +$client->send('a_bar_topic', 'aMessageData'); // in another process you can consume messages. $client->consume(); From c5b04261e387e7d2d010e546d84fd8448ff78028 Mon Sep 17 00:00:00 2001 From: Maksim Kotlyar Date: Wed, 10 May 2017 15:54:42 +0300 Subject: [PATCH 11/15] add simple client to split scrit and release script. --- bin/release | 2 +- bin/subtree-split | 2 ++ 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/bin/release b/bin/release index 511e3053b..87613ea64 100755 --- a/bin/release +++ b/bin/release @@ -13,7 +13,7 @@ fi CURRENT_BRANCH=`git rev-parse --abbrev-ref HEAD` -for REMOTE in origin psr-queue stomp amqp-ext sqs fs redis dbal null enqueue enqueue-bundle job-queue test +for REMOTE in origin psr-queue stomp amqp-ext sqs fs redis dbal null enqueue simple-client enqueue-bundle job-queue test do TMP_DIR="/tmp/enqueue-repo" REMOTE_URL=`git remote get-url $REMOTE` diff --git a/bin/subtree-split b/bin/subtree-split index 53e46cce1..6dbdc0193 100755 --- a/bin/subtree-split +++ b/bin/subtree-split @@ -45,6 +45,7 @@ function remote() remote psr-queue git@github.com:php-enqueue/psr-queue.git remote enqueue git@github.com:php-enqueue/enqueue.git +remote simple-client git@github.com:php-enqueue/simple-client.git remote stomp git@github.com:php-enqueue/stomp.git remote amqp-ext git@github.com:php-enqueue/amqp-ext.git remote fs git@github.com:php-enqueue/fs.git @@ -58,6 +59,7 @@ remote test git@github.com:php-enqueue/test.git split 'pkg/psr-queue' psr-queue split 'pkg/enqueue' enqueue +split 'pkg/simple-client' simple-client split 'pkg/stomp' stomp split 'pkg/amqp-ext' amqp-ext split 'pkg/fs' fs From d77fad601eebba627e4d5293af4d600f9c3ab79d Mon Sep 17 00:00:00 2001 From: Maksim Kotlyar Date: Wed, 10 May 2017 15:54:58 +0300 Subject: [PATCH 12/15] simple client. small fixes. --- pkg/simple-client/SimpleClient.php | 2 +- pkg/simple-client/{ => Tests}/Functional/SimpleClientTest.php | 0 2 files changed, 1 insertion(+), 1 deletion(-) rename pkg/simple-client/{ => Tests}/Functional/SimpleClientTest.php (100%) diff --git a/pkg/simple-client/SimpleClient.php b/pkg/simple-client/SimpleClient.php index a963cc9ca..5879b969d 100644 --- a/pkg/simple-client/SimpleClient.php +++ b/pkg/simple-client/SimpleClient.php @@ -34,7 +34,7 @@ final class SimpleClient /** * $config = [ * 'transport' => [ - * 'rabbitmq_amqp' => [], + * 'default' => 'amqp', * 'amqp' => [], * .... * ], diff --git a/pkg/simple-client/Functional/SimpleClientTest.php b/pkg/simple-client/Tests/Functional/SimpleClientTest.php similarity index 100% rename from pkg/simple-client/Functional/SimpleClientTest.php rename to pkg/simple-client/Tests/Functional/SimpleClientTest.php From 99cefcd724e6a9747db2d5108a7503864e29725e Mon Sep 17 00:00:00 2001 From: Maksim Kotlyar Date: Wed, 10 May 2017 15:58:30 +0300 Subject: [PATCH 13/15] [doc] add simple client quick tour --- docs/index.md | 1 + 1 file changed, 1 insertion(+) diff --git a/docs/index.md b/docs/index.md index e5c3ee007..d8c9258cc 100644 --- a/docs/index.md +++ b/docs/index.md @@ -13,6 +13,7 @@ - [Extensions](consumption/extensions.md) - [Message processor](consumption/message_processor.md) * Client + - [Quick tour](client/quick_tour.md) - [Message examples](client/message_examples.md) - [Supported brokers](client/supported_brokers.md) - [Message bus](client/message_bus.md) From 0460c04aa06785a68001b6ecc5c8b32ed2da1b0d Mon Sep 17 00:00:00 2001 From: Maksim Kotlyar Date: Wed, 10 May 2017 16:16:02 +0300 Subject: [PATCH 14/15] simple client upd readme. --- pkg/simple-client/README.md | 27 ++++++++++++++++++++++++++- 1 file changed, 26 insertions(+), 1 deletion(-) diff --git a/pkg/simple-client/README.md b/pkg/simple-client/README.md index d842aa27d..75007c491 100644 --- a/pkg/simple-client/README.md +++ b/pkg/simple-client/README.md @@ -1 +1,26 @@ -# Message Queue Simple Client. +# Message Queue. Simple client + +[![Gitter](https://badges.gitter.im/php-enqueue/Lobby.svg)](https://gitter.im/php-enqueue/Lobby) +[![Build Status](https://travis-ci.org/php-enqueue/simple-client.png?branch=master)](https://travis-ci.org/php-enqueue/simple-client) +[![Total Downloads](https://poser.pugx.org/enqueue/simple-client/d/total.png)](https://packagist.org/packages/enqueue/simple-client) +[![Latest Stable Version](https://poser.pugx.org/enqueue/simple-client/version.png)](https://packagist.org/packages/enqueue/simple-client) + +The simple client takes Enqueue client classes and Symfony components and combines it to easy to use facade called `SimpleCLient`. + +## Resources + +* [Documentation](https://github.com/php-enqueue/enqueue-dev/blob/master/docs/index.md) +* [Questions](https://gitter.im/php-enqueue/Lobby) +* [Issue Tracker](https://github.com/php-enqueue/enqueue-dev/issues) + +## Developed by Forma-Pro + +Forma-Pro is a full stack development company which interests also spread to open source development. +Being a team of strong professionals we have an aim an ability to help community by developing cutting edge solutions in the areas of e-commerce, docker & microservice oriented architecture where we have accumulated a huge many-years experience. +Our main specialization is Symfony framework based solution, but we are always looking to the technologies that allow us to do our job the best way. We are committed to creating solutions that revolutionize the way how things are developed in aspects of architecture & scalability. + +If you have any questions and inquires about our open source development, this product particularly or any other matter feel free to contact at opensource@forma-pro.com + +## License + +It is released under the [MIT License](LICENSE). From 58aec46703941c810e756aa0b67b1391299a1a33 Mon Sep 17 00:00:00 2001 From: Maksim Kotlyar Date: Wed, 10 May 2017 16:20:34 +0300 Subject: [PATCH 15/15] Release 0.3.8 --- CHANGELOG.md | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 69c1a288e..be4feca42 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,13 @@ # Change Log +## [0.3.8](https://github.com/php-enqueue/enqueue-dev/tree/0.3.8) (2017-05-10) +[Full Changelog](https://github.com/php-enqueue/enqueue-dev/compare/0.3.7...0.3.8) + +- Add support for production extensions [\#70](https://github.com/php-enqueue/enqueue-dev/issues/70) + +- Multi Transport Simple Client [\#75](https://github.com/php-enqueue/enqueue-dev/pull/75) ([ASKozienko](https://github.com/ASKozienko)) +- Client Extensions [\#72](https://github.com/php-enqueue/enqueue-dev/pull/72) ([ASKozienko](https://github.com/ASKozienko)) + ## [0.3.7](https://github.com/php-enqueue/enqueue-dev/tree/0.3.7) (2017-05-04) [Full Changelog](https://github.com/php-enqueue/enqueue-dev/compare/0.3.6...0.3.7)