From 0e9dd8c71b4032d519a1319648e228d047cfb27c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dan=20C=C3=AErna=C8=9B?= Date: Thu, 19 Jan 2017 15:16:58 +0100 Subject: [PATCH 1/8] Fix unclear sentences --- docs/quick_tour.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/docs/quick_tour.md b/docs/quick_tour.md index b51c63883..b4cfb8e0e 100644 --- a/docs/quick_tour.md +++ b/docs/quick_tour.md @@ -58,9 +58,9 @@ $consumer->acknowledge($message); ## Consumption Consumption is a layer build on top of a transport functionality. -The goal of the component is to simply message consumption. -The `QueueConsumer` is main piece of the component it allows bind message processors (or callbacks) to queues. -The `consume` method starts the consumption process which last as long as it is interrupted. +The goal of the component is to simply consume messages. +The `QueueConsumer` is main piece of the component it allows binding of message processors (or callbacks) to queues. +The `consume` method starts the consumption process which last as long as it is not interrupted. ```php Date: Fri, 27 Jan 2017 10:58:41 +0200 Subject: [PATCH 2/8] Release 0.2.5 --- CHANGELOG.md | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6029d9be8..7af299ba0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,17 @@ # Change Log +## [0.2.5](https://github.com/php-enqueue/enqueue-dev/tree/0.2.5) (2017-01-27) +[Full Changelog](https://github.com/php-enqueue/enqueue-dev/compare/0.2.4...0.2.5) + +- \[amqp\]\[bug\] Consumer received message targeted for another consumer of this same channel [\#13](https://github.com/php-enqueue/enqueue-dev/issues/13) +- \[amqp\] Put in buffer not our message. Continue consumption. [\#22](https://github.com/php-enqueue/enqueue-dev/pull/22) ([makasim](https://github.com/makasim)) + +- \[travis\] Test against different Symfony versions, at least 2.8, 3.0, 3.1 [\#17](https://github.com/php-enqueue/enqueue-dev/issues/17) +- \[docker\] Build images for all containers that built from Dockerfiles. [\#16](https://github.com/php-enqueue/enqueue-dev/issues/16) + +- \[travis\] Run test with different Symfony versions. 2.8, 3.0 [\#19](https://github.com/php-enqueue/enqueue-dev/pull/19) ([makasim](https://github.com/makasim)) +- \[fs\] Add missing enqueue/psr-queue package to composer.json. [\#18](https://github.com/php-enqueue/enqueue-dev/pull/18) ([makasim](https://github.com/makasim)) + ## [0.2.4](https://github.com/php-enqueue/enqueue-dev/tree/0.2.4) (2017-01-18) [Full Changelog](https://github.com/php-enqueue/enqueue-dev/compare/0.2.3...0.2.4) From 4fdd4d1975881357119d9f963b54435e2639a1d0 Mon Sep 17 00:00:00 2001 From: Maksim Kotlyar Date: Sun, 29 Jan 2017 11:41:00 +0200 Subject: [PATCH 3/8] [doc] add installation instruction. --- docs/amqp_transport.md | 7 +++++++ docs/filesystem_transport.md | 7 +++++++ docs/null_transport.md | 8 ++++++++ docs/stomp_transport.md | 8 ++++++++ 4 files changed, 30 insertions(+) diff --git a/docs/amqp_transport.md b/docs/amqp_transport.md index 420b22e7e..659bdca52 100644 --- a/docs/amqp_transport.md +++ b/docs/amqp_transport.md @@ -3,6 +3,7 @@ Implements [AMQP specifications](https://www.rabbitmq.com/specification.html). Build on top of [php amqp extension](https://github.com/pdezwart/php-amqp). +* [Installation](#installation) * [Create context](#create-context) * [Declare topic](#declare-topic) * [Declare queue](#decalre-queue) @@ -12,6 +13,12 @@ Build on top of [php amqp extension](https://github.com/pdezwart/php-amqp). * [Consume message](#consume-message) * [Purge queue messages](#purge-queue-messages) +## Installation + +```bash +$ composer require enqueue/amqp-ext +``` + ## Create context ```php diff --git a/docs/filesystem_transport.md b/docs/filesystem_transport.md index db3ebb450..d49886078 100644 --- a/docs/filesystem_transport.md +++ b/docs/filesystem_transport.md @@ -5,6 +5,7 @@ It creates a file per queue\topic. A message is a line inside the file. **Limitations** It works only in auto ack mode. Local by nature therefor messages are not visible on other servers. +* [Installation](#installation) * [Create context](#create-context) * [Declare topic](#declare-topic) * [Declare queue](#decalre-queue) @@ -14,6 +15,12 @@ A message is a line inside the file. * [Consume message](#consume-message) * [Purge queue messages](#purge-queue-messages) +## Installation + +```bash +$ composer require enqueue/fs +``` + ## Create context ```php diff --git a/docs/null_transport.md b/docs/null_transport.md index 622bdea8b..ff50b626e 100644 --- a/docs/null_transport.md +++ b/docs/null_transport.md @@ -4,6 +4,14 @@ This a special transport implementation, kind of stub. It does not send nor receive anything. Useful in tests for example. +* [Installation](#installation) +* [Create context](#create-context) + +## Installation + +```bash +$ composer require enqueue/enqueue +``` ## Create context diff --git a/docs/stomp_transport.md b/docs/stomp_transport.md index 03a03ceb3..cb62191fd 100644 --- a/docs/stomp_transport.md +++ b/docs/stomp_transport.md @@ -1,9 +1,17 @@ # STOMP transport +* [Installation](#installation) +* [Create context](#create-context) * [Send message to topic](#send-message-to-topic) * [Send message to queue](#send-message-to-queue) * [Consume message](#consume-message) +## Installation + +```bash +$ composer require enqueue/stomp +``` + ## Create context ```php From 44dd9a8932e9db24bd535894c2e80406c35b6a2a Mon Sep 17 00:00:00 2001 From: Maksim Kotlyar Date: Sun, 29 Jan 2017 12:54:53 +0200 Subject: [PATCH 4/8] [doc] Add docs about message processors. --- docs/bundle/message_processor.md | 81 ++++++++++++++++ docs/consumption/message_processor.md | 133 ++++++++++++++++++++++++++ docs/index.md | 2 + 3 files changed, 216 insertions(+) create mode 100644 docs/bundle/message_processor.md create mode 100644 docs/consumption/message_processor.md diff --git a/docs/bundle/message_processor.md b/docs/bundle/message_processor.md new file mode 100644 index 000000000..1cc8b1d0f --- /dev/null +++ b/docs/bundle/message_processor.md @@ -0,0 +1,81 @@ +# Message processor + +Message processors and usage examples described in [consumption/message_processor](../consumption/message_processor.md) +Here we just show how to register a message processor service to enqueue. Let's say we have app bundle and a message processor there + +* [Container tag](#container-tag) +* [Topic subscriber](#topic-subscriber) + +# Container tag + +```yaml +# src/AppBundle/Resources/services.yml + +services: + app.async.say_hello_processor: + class: 'AppBundle\Async\SayHelloProcessor' + tags: + - { name: 'enqueue.client.message_processor', topicName: 'aTopic' } + +``` + +The tag has some additional options: + +* topicName [Req]: Tells what topic to consume messages from. +* queueName: By default message processor does not require an extra queue on broker side. It reuse a default one. Setting the option you can define a custom queue to be used. +* processorName: By default the service id is used as message processor name. Using the option you can define a custom name. + +# Topic subscriber + +There is a `TopicSubscriber` interface (like [EventSubscriberInterface](https://github.com/symfony/symfony/blob/master/src/Symfony/Component/EventDispatcher/EventSubscriberInterface.php)). +It allows to keep subscription login and process logic closer to each other. + +```php + ['queueName' => 'fooQueue', 'processorName' => 'foo'], + 'anotherTopic' => ['queueName' => 'barQueue', 'processorName' => 'bar'], + ]; + } +} +``` + +In the container you can just add the tag `enqueue.client.message_processor` and omit any other options: + +```yaml +# src/AppBundle/Resources/services.yml + +services: + app.async.say_hello_processor: + class: 'AppBundle\Async\SayHelloProcessor' + tags: + - { name: 'enqueue.client.message_processor'} + +``` + +[back to index](../index.md) \ No newline at end of file diff --git a/docs/consumption/message_processor.md b/docs/consumption/message_processor.md new file mode 100644 index 000000000..69434f467 --- /dev/null +++ b/docs/consumption/message_processor.md @@ -0,0 +1,133 @@ +# Message processor + +The message processor is an object that actually process the message and must return a result status. +Here's example: + +```php +mailer->send('foo@example.com', $message->getBody()); + + return self::ACK; + } +} +``` + +Usually there is no need to catch exceptions. +The message broker can detect consumer has failed and redeliver the message. +Sometimes you have to reject messages explicitly. + +```php +getBody()); + if ($user = $this->userRepository->find($data['userId'])) { + return self::REJECT; + } + + $this->mailer->send($user->getEmail(), $data['text']); + + return self::ACK; + } +} +``` + +It is possible to find out whether the message failed previously or not. +There is `isRedelivered` method for that. +If it returns true than there was attempt to process message. + +```php +isRedelivered()) { + return self::REQUEUE; + } + + $this->mailer->send('foo@example.com', $message->getBody()); + + return self::ACK; + } +} +``` + +The second argument is your context. You can use it to send messages to other queues\topics. + +```php +mailer->send('foo@example.com', $message->getBody()); + + $queue = $context->createQueue('anotherQueue'); + $message = $context->createMessage('Message has been sent'); + $context->createProducer()->send($queue, $message); + + return self::ACK; + } +} +``` + +The consumption component provide some useful extensions, for example there is an extension that makes RPC processing simplier. + +```php +mailer->send('foo@example.com', $message->getBody()); + + $replyMessage = $context->createMessage('Message has been sent'); + + return Result::reply($replyMessage); + } +} + +/** @var \Enqueue\Psr\Context $psrContext */ + +$queueConsumer = new QueueConsumer($psrContext, new ChainExtension([ + new ReplyExtension() +])); + +$queueConsumer->bind('foo', new SendMailProcessor()); + +$queueConsumer->consume(); +``` + +[back to index](../index.md) \ No newline at end of file diff --git a/docs/index.md b/docs/index.md index ada57c0ab..867ce2120 100644 --- a/docs/index.md +++ b/docs/index.md @@ -8,6 +8,7 @@ - [Null](null_transport.md) * Consumption - [Extensions](consumption/extensions.md) + - [Message processor](consumption/message_processor.md) * Client - [Message examples](client/message_examples.md) - [Supported brokers](client/supported_brokers.md) @@ -19,6 +20,7 @@ - [Quick tour](bundle/quick_tour.md) - [Config reference](bundle/config_reference.md) - [Cli commands](bundle/cli_commands.md) + - [Message processor](bundle/message_processor.md) - [Job queue](bundle/job_queue.md) - [Consumption extension](bundle/consumption_extension.md) - [Production settings](bundle/production_settings.md) From f9eb690a05d94f011f3ca5c9d5303e21bb709337 Mon Sep 17 00:00:00 2001 From: Joeri Verdeyen Date: Fri, 17 Feb 2017 10:13:39 +0100 Subject: [PATCH 5/8] Update quick_tour.md add Bundle to AppKernel --- docs/bundle/quick_tour.md | 27 +++++++++++++++++++++++++++ 1 file changed, 27 insertions(+) diff --git a/docs/bundle/quick_tour.md b/docs/bundle/quick_tour.md index 661cf5fa0..42dae48d1 100644 --- a/docs/bundle/quick_tour.md +++ b/docs/bundle/quick_tour.md @@ -9,6 +9,33 @@ It adds easy to use [configuration layer](config_reference.md), register service $ composer require enqueue/enqueue-bundle enqueue/amqp-ext ``` +## Enable the Bundle + +Then, enable the bundle by adding `new Enqueue\Bundle\EnqueueBundle()` to the bundles array of the registerBundles method in your project's `app/AppKernel.php` file: + +```php + Date: Fri, 10 Mar 2017 15:29:55 +0200 Subject: [PATCH 6/8] correct tag name. --- docs/bundle/message_processor.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/docs/bundle/message_processor.md b/docs/bundle/message_processor.md index 1cc8b1d0f..12504c16c 100644 --- a/docs/bundle/message_processor.md +++ b/docs/bundle/message_processor.md @@ -15,7 +15,7 @@ services: app.async.say_hello_processor: class: 'AppBundle\Async\SayHelloProcessor' tags: - - { name: 'enqueue.client.message_processor', topicName: 'aTopic' } + - { name: 'enqueue.client.processor', topicName: 'aTopic' } ``` @@ -74,8 +74,8 @@ services: app.async.say_hello_processor: class: 'AppBundle\Async\SayHelloProcessor' tags: - - { name: 'enqueue.client.message_processor'} + - { name: 'enqueue.client.processor'} ``` -[back to index](../index.md) \ No newline at end of file +[back to index](../index.md) From 96cc12c7f356a4ccaed2714c9d487e998ea4b4c8 Mon Sep 17 00:00:00 2001 From: Alexander Kozienko Date: Tue, 14 Mar 2017 16:36:27 +0200 Subject: [PATCH 7/8] fix simple client --- docs/quick_tour.md | 2 +- pkg/enqueue/Client/SimpleClient.php | 48 ++++++++++++++++++++++++----- 2 files changed, 41 insertions(+), 9 deletions(-) diff --git a/docs/quick_tour.md b/docs/quick_tour.md index b4cfb8e0e..bb0a87684 100644 --- a/docs/quick_tour.md +++ b/docs/quick_tour.md @@ -173,7 +173,7 @@ use Enqueue\Psr\Processor; /** @var \Enqueue\Psr\Context $psrContext */ $client = new SimpleClient($psrContext); -$client->bind('foo_topic', function (Message $message) { +$client->bind('foo_topic', 'processor_name', function (Message $message) { // process message return Processor::ACK; diff --git a/pkg/enqueue/Client/SimpleClient.php b/pkg/enqueue/Client/SimpleClient.php index 12ceac7f0..ad60d2d27 100644 --- a/pkg/enqueue/Client/SimpleClient.php +++ b/pkg/enqueue/Client/SimpleClient.php @@ -72,11 +72,11 @@ public function __construct(AmqpContext $context, Config $config = null) /** * @param string $topic - * @param callback + * @param string $processorName + * @param callback $processor */ - public function bind($topic, callable $processor) + public function bind($topic, $processorName, callable $processor) { - $processorName = uniqid('', true); $queueName = $this->config->getDefaultProcessorQueueName(); $this->topicsMetaRegistry->addProcessor($topic, $processorName); @@ -97,9 +97,7 @@ public function consume(ExtensionInterface $runtimeExtension = null) $processor = $this->getProcessor(); - $queueConsumer = new QueueConsumer($this->context, new ChainExtension([ - new SetRouterPropertiesExtension($this->driver), - ])); + $queueConsumer = $this->getQueueConsumer(); $defaultQueueName = $this->config->getDefaultProcessorQueueName(); $defaultTransportQueueName = $this->config->createTransportQueueName($defaultQueueName); @@ -114,10 +112,44 @@ public function consume(ExtensionInterface $runtimeExtension = null) $queueConsumer->consume($runtimeExtension); } + /** + * @return QueueConsumer + */ + public function getQueueConsumer() + { + return new QueueConsumer($this->context, new ChainExtension([ + new SetRouterPropertiesExtension($this->driver), + ])); + } + + /** + * @return DriverInterface + */ + public function getDriver() + { + return $this->driver; + } + + /** + * @return TopicMetaRegistry + */ + public function getTopicMetaRegistry() + { + return $this->topicsMetaRegistry; + } + + /** + * @return QueueMetaRegistry + */ + public function getQueueMetaRegistry() + { + return $this->queueMetaRegistry; + } + /** * @return MessageProducerInterface */ - private function getProducer() + public function getProducer() { $this->driver->setupBroker(); @@ -127,7 +159,7 @@ private function getProducer() /** * @return DelegateProcessor */ - private function getProcessor() + public function getProcessor() { return new DelegateProcessor($this->processorsRegistry); } From 9cfb22667fe1176d3275349c48aeb21dbfb869c4 Mon Sep 17 00:00:00 2001 From: Alexander Kozienko Date: Tue, 14 Mar 2017 17:03:16 +0200 Subject: [PATCH 8/8] fix tests --- pkg/enqueue/Tests/Functional/Client/SimpleClientTest.php | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/enqueue/Tests/Functional/Client/SimpleClientTest.php b/pkg/enqueue/Tests/Functional/Client/SimpleClientTest.php index eb84cf2bc..346fb7b57 100644 --- a/pkg/enqueue/Tests/Functional/Client/SimpleClientTest.php +++ b/pkg/enqueue/Tests/Functional/Client/SimpleClientTest.php @@ -34,7 +34,7 @@ public function testProduceAndConsumeOneMessage() $actualMessage = null; $client = new SimpleClient($this->context); - $client->bind('foo_topic', function (Message $message) use (&$actualMessage) { + $client->bind('foo_topic', 'foo_processor', function (Message $message) use (&$actualMessage) { $actualMessage = $message; return Result::ACK; @@ -56,12 +56,12 @@ public function testProduceAndRouteToTwoConsumes() $received = 0; $client = new SimpleClient($this->context); - $client->bind('foo_topic', function () use (&$received) { + $client->bind('foo_topic', 'foo_processor1', function () use (&$received) { ++$received; return Result::ACK; }); - $client->bind('foo_topic', function () use (&$received) { + $client->bind('foo_topic', 'foo_processor2', function () use (&$received) { ++$received; return Result::ACK;