diff --git a/.gitignore b/.gitignore index ec60c84f8..b9809c577 100644 --- a/.gitignore +++ b/.gitignore @@ -4,4 +4,6 @@ bin/doctrine* bin/php-cs-fixer bin/phpunit bin/sql-formatter -vendor \ No newline at end of file +vendor +.php_cs +.php_cs.cache \ No newline at end of file diff --git a/.php_cs b/.php_cs deleted file mode 100644 index d3414a1f1..000000000 --- a/.php_cs +++ /dev/null @@ -1,21 +0,0 @@ -in(__DIR__) - ->notPath('vendor') -; - -return Symfony\CS\Config::create() - ->level(Symfony\CS\FixerInterface::SYMFONY_LEVEL) - ->fixers([ - 'ordered_use', - 'no_blank_lines_before_namespace', - 'short_array_syntax', - 'unused_use', - 'phpdoc_order', - - - ]) - ->finder($finder) -; - diff --git a/.php_cs.dist b/.php_cs.dist new file mode 100644 index 000000000..99a418499 --- /dev/null +++ b/.php_cs.dist @@ -0,0 +1,28 @@ +setRiskyAllowed(true) + ->setRules(array( + '@Symfony' => true, + '@Symfony:risky' => true, + 'array_syntax' => array('syntax' => 'short'), + 'combine_consecutive_unsets' => true, + // one should use PHPUnit methods to set up expected exception instead of annotations + 'general_phpdoc_annotation_remove' => array('expectedException', 'expectedExceptionMessage', 'expectedExceptionMessageRegExp'), + 'heredoc_to_nowdoc' => true, + 'no_extra_consecutive_blank_lines' => array('break', 'continue', 'extra', 'return', 'throw', 'use', 'parenthesis_brace_block', 'square_brace_block', 'curly_brace_block'), + 'no_unreachable_default_argument_value' => true, + 'no_useless_else' => true, + 'no_useless_return' => true, + 'ordered_class_elements' => true, + 'ordered_imports' => true, + 'phpdoc_add_missing_param_annotation' => true, + 'phpdoc_order' => true, + 'psr4' => true, + 'strict_param' => true, + )) + ->setFinder( + PhpCsFixer\Finder::create() + ->in(__DIR__) + ) +; \ No newline at end of file diff --git a/README.md b/README.md index 9d78ed845..95ff0d539 100644 --- a/README.md +++ b/README.md @@ -8,7 +8,7 @@ This is where all development happens. The repository provides a friendly enviro ## Resources * [Documentation](https://github.com/php-enqueue/enqueue-dev/blob/master/docs/index.md) -* [Questions](https://gitter.im/php-enqueue/enqueue-dev) +* [Questions](https://gitter.im/php-enqueue/Lobby) * [Issue Tracker](https://github.com/php-enqueue/enqueue-dev/issues) ## License diff --git a/bin/dev b/bin/dev index dcfa42f1e..11f87e39d 100755 --- a/bin/dev +++ b/bin/dev @@ -3,7 +3,7 @@ set -x set -e -while getopts "buste" OPTION; do +while getopts "bustef" OPTION; do case $OPTION in b) COMPOSE_PROJECT_NAME=mqdev docker-compose build @@ -17,6 +17,9 @@ while getopts "buste" OPTION; do e) docker exec -it mqdev_dev_1 /bin/bash ;; + f) + ./bin/php-cs-fixer fix + ;; t) COMPOSE_PROJECT_NAME=mqdev docker-compose run --workdir="/mqdev" --rm dev ./bin/test ;; diff --git a/bin/pre-commit b/bin/pre-commit index 9ab08b441..4537cf91a 100755 --- a/bin/pre-commit +++ b/bin/pre-commit @@ -105,11 +105,10 @@ function runPhpCsFixer() $output = ''; $returnCode = null; exec(sprintf( - '%s %s fix %s --config-file=%s', + '%s %s fix %s', $phpBin, $phpCsFixerBin, - $projectRootDir.'/'.$file, - $projectRootDir.'/.php_cs' + $projectRootDir.'/'.$file ), $output, $returnCode); if ($returnCode) { diff --git a/bin/release b/bin/release new file mode 100755 index 000000000..9890f5927 --- /dev/null +++ b/bin/release @@ -0,0 +1,31 @@ +#!/usr/bin/env bash + +set -e +set -x + +if (( "$#" != 1 )) +then + echo "Tag has to be provided" + exit 1 +fi + +./bin/subtree-split + +CURRENT_BRANCH=`git rev-parse --abbrev-ref HEAD` + +for REMOTE in amqp-ext enqueue enqueue-bundle job-queue psr-queue stomp test +do + TMP_DIR="/tmp/enqueue-repo" + REMOTE_URL=`git remote get-url $REMOTE` + + rm -rf $TMP_DIR; + mkdir $TMP_DIR; + + ( + cd $TMP_DIR; + git clone $REMOTE_URL . --depth=10 + git checkout $CURRENT_BRANCH; + git tag $1 -m "Release $1" + git push origin --tags + ) +done diff --git a/bin/subtree-split b/bin/subtree-split index 021177f8c..0d030e9e3 100755 --- a/bin/subtree-split +++ b/bin/subtree-split @@ -3,13 +3,12 @@ set -e set -x -CURRENT_BRANCH=`git name-rev --name-only HEAD` +CURRENT_BRANCH=`git rev-parse --abbrev-ref HEAD` function split() { # split_new_repo $1 $2 - CURRENT_BRANCH=`git name-rev --name-only HEAD` SHA1=`./bin/splitsh-lite --prefix=$1` git push $2 "$SHA1:$CURRENT_BRANCH" @@ -33,8 +32,6 @@ function split_new_repo() git push origin master; ); - CURRENT_BRANCH=`git name-rev --name-only HEAD` - SHA1=`./bin/splitsh-lite --prefix=$1` git fetch $2 git push $2 "$SHA1:$CURRENT_BRANCH" -f diff --git a/composer.json b/composer.json index e0fe925ad..cce8a9151 100644 --- a/composer.json +++ b/composer.json @@ -18,7 +18,7 @@ "symfony/monolog-bundle": "^2.8|^3", "symfony/browser-kit": "^2.8|^3", "symfony/expression-language": "^2.8|^3", - "friendsofphp/php-cs-fixer": "^1" + "friendsofphp/php-cs-fixer": "^2" }, "config": { "bin-dir": "bin" diff --git a/docs/amqp_transport.md b/docs/amqp_transport.md index a673f285f..d777edd4e 100644 --- a/docs/amqp_transport.md +++ b/docs/amqp_transport.md @@ -1,5 +1,8 @@ # AMQP transport +Implements [AMQP specifications](https://www.rabbitmq.com/specification.html). +Build on top of [php amqp extension](https://github.com/pdezwart/php-amqp). + * [Create context](#create-context) * [Declare topic](#declare-topic) * [Declare queue](#decalre-queue) diff --git a/docs/bundle/cli_commands.md b/docs/bundle/cli_commands.md new file mode 100644 index 000000000..5f636128a --- /dev/null +++ b/docs/bundle/cli_commands.md @@ -0,0 +1,167 @@ +# Cli commands + +* [enqueue:consume](#enqueueconsume) +* [enqueue:produce](#enqueueproduce) +* [enqueue:setup-broker](#enqueuesetup-broker) +* [enqueue:queues](#enqueuequeues) +* [enqueue:topics](#enqueuetopics) +* [enqueue:transport:consume](#enqueuetransportconsume) + +## enqueue:consume + +``` +./bin/console enqueue:consume --help +Usage: + enqueue:consume [options] [--] []... + enq:c + +Arguments: + client-queue-names Queues to consume messages from + +Options: + --message-limit=MESSAGE-LIMIT Consume n messages and exit + --time-limit=TIME-LIMIT Consume messages during this time + --memory-limit=MEMORY-LIMIT Consume messages until process reaches this memory limit in MB + --setup-broker Creates queues, topics, exchanges, binding etc on broker side. + -h, --help Display this help message + -q, --quiet Do not output any message + -V, --version Display this application version + --ansi Force ANSI output + --no-ansi Disable ANSI output + -n, --no-interaction Do not ask any interactive question + -e, --env=ENV The environment name [default: "dev"] + --no-debug Switches off debug mode + -v|vv|vvv, --verbose Increase the verbosity of messages: 1 for normal output, 2 for more verbose output and 3 for debug + +Help: + A client's worker that processes messages. By default it connects to default queue. It select an appropriate message processor based on a message headers +``` + +## enqueue:produce + +``` +./bin/console enqueue:produce --help +Usage: + enqueue:produce + enq:p + +Arguments: + topic A topic to send message to + message A message to send + +Options: + -h, --help Display this help message + -q, --quiet Do not output any message + -V, --version Display this application version + --ansi Force ANSI output + --no-ansi Disable ANSI output + -n, --no-interaction Do not ask any interactive question + -e, --env=ENV The environment name [default: "dev"] + --no-debug Switches off debug mode + -v|vv|vvv, --verbose Increase the verbosity of messages: 1 for normal output, 2 for more verbose output and 3 for debug + +Help: + A command to send a message to topic +``` + +## enqueue:setup-broker + +``` +./bin/console enqueue:setup-broker --help +Usage: + enqueue:setup-broker + enq:sb + +Options: + -h, --help Display this help message + -q, --quiet Do not output any message + -V, --version Display this application version + --ansi Force ANSI output + --no-ansi Disable ANSI output + -n, --no-interaction Do not ask any interactive question + -e, --env=ENV The environment name [default: "dev"] + --no-debug Switches off debug mode + -v|vv|vvv, --verbose Increase the verbosity of messages: 1 for normal output, 2 for more verbose output and 3 for debug + +Help: + Creates all required queues +``` + +## enqueue:queues + +``` +/bin/console enqueue:queues --help +Usage: + enqueue:queues + enq:m:q + debug:enqueue:queues + +Options: + -h, --help Display this help message + -q, --quiet Do not output any message + -V, --version Display this application version + --ansi Force ANSI output + --no-ansi Disable ANSI output + -n, --no-interaction Do not ask any interactive question + -e, --env=ENV The environment name [default: "dev"] + --no-debug Switches off debug mode + -v|vv|vvv, --verbose Increase the verbosity of messages: 1 for normal output, 2 for more verbose output and 3 for debug + +Help: + A command shows all available queues and some information about them. +``` + +## enqueue:topics + +``` +./bin/console enqueue:topics --help +Usage: + enqueue:topics + enq:m:t + debug:enqueue:topics + +Options: + -h, --help Display this help message + -q, --quiet Do not output any message + -V, --version Display this application version + --ansi Force ANSI output + --no-ansi Disable ANSI output + -n, --no-interaction Do not ask any interactive question + -e, --env=ENV The environment name [default: "dev"] + --no-debug Switches off debug mode + -v|vv|vvv, --verbose Increase the verbosity of messages: 1 for normal output, 2 for more verbose output and 3 for debug + +Help: + A command shows all available topics and some information about them. +``` + +## enqueue:transport:consume + +``` +./bin/console enqueue:transport:consume --help +Usage: + enqueue:transport:consume [options] [--] + +Arguments: + queue Queues to consume from + processor-service A message processor service + +Options: + --message-limit=MESSAGE-LIMIT Consume n messages and exit + --time-limit=TIME-LIMIT Consume messages during this time + --memory-limit=MEMORY-LIMIT Consume messages until process reaches this memory limit in MB + -h, --help Display this help message + -q, --quiet Do not output any message + -V, --version Display this application version + --ansi Force ANSI output + --no-ansi Disable ANSI output + -n, --no-interaction Do not ask any interactive question + -e, --env=ENV The environment name [default: "dev"] + --no-debug Switches off debug mode + -v|vv|vvv, --verbose Increase the verbosity of messages: 1 for normal output, 2 for more verbose output and 3 for debug + +Help: + A worker that consumes message from a broker. To use this broker you have to explicitly set a queue to consume from and a message processor service +``` + +[back to index](../index.md) \ No newline at end of file diff --git a/docs/bundle/config_reference.md b/docs/bundle/config_reference.md new file mode 100644 index 000000000..e98af73ed --- /dev/null +++ b/docs/bundle/config_reference.md @@ -0,0 +1,105 @@ +# Config reference + +```yaml +# Default configuration for extension with alias: "enqueue" +enqueue: + transport: # Required + default: + alias: ~ # Required + null: [] + stomp: + host: localhost + port: 61613 + login: guest + password: guest + vhost: / + sync: true + connection_timeout: 1 + buffer_size: 1000 + rabbitmq_stomp: + host: localhost + port: 61613 + login: guest + password: guest + vhost: / + sync: true + connection_timeout: 1 + buffer_size: 1000 + + # The option tells whether RabbitMQ broker has management plugin installed or not + management_plugin_installed: false + management_plugin_port: 15672 + + # The option tells whether RabbitMQ broker has delay plugin installed or not + delay_plugin_installed: false + amqp: + + # The host to connect too. Note: Max 1024 characters + host: localhost + + # Port on the host. + port: 5672 + + # The login name to use. Note: Max 128 characters. + login: guest + + # Password. Note: Max 128 characters. + password: guest + + # The virtual host on the host. Note: Max 128 characters. + vhost: / + + # Connection timeout. Note: 0 or greater seconds. May be fractional. + connect_timeout: ~ + + # Timeout in for income activity. Note: 0 or greater seconds. May be fractional. + read_timeout: ~ + + # Timeout in for outcome activity. Note: 0 or greater seconds. May be fractional. + write_timeout: ~ + persisted: false + rabbitmq: + + # The host to connect too. Note: Max 1024 characters + host: localhost + + # Port on the host. + port: 5672 + + # The login name to use. Note: Max 128 characters. + login: guest + + # Password. Note: Max 128 characters. + password: guest + + # The virtual host on the host. Note: Max 128 characters. + vhost: / + + # Connection timeout. Note: 0 or greater seconds. May be fractional. + connect_timeout: ~ + + # Timeout in for income activity. Note: 0 or greater seconds. May be fractional. + read_timeout: ~ + + # Timeout in for outcome activity. Note: 0 or greater seconds. May be fractional. + write_timeout: ~ + persisted: false + + # The option tells whether RabbitMQ broker has delay plugin installed or not + delay_plugin_installed: false + client: + traceable_producer: false + prefix: enqueue + app_name: app + router_topic: router + router_queue: default + router_processor: enqueue.client.router_processor + default_processor_queue: default + redelivered_delay_time: 0 + job: false + extensions: + doctrine_ping_connection_extension: false + doctrine_clear_identity_map_extension: false +``` + +[back to index](../index.md) \ No newline at end of file diff --git a/docs/bundle/consumption_extension.md b/docs/bundle/consumption_extension.md new file mode 100644 index 000000000..38657c58f --- /dev/null +++ b/docs/bundle/consumption_extension.md @@ -0,0 +1,41 @@ +# Consumption extension + +Here, I show how you can crate a custom extension and register it. +Let's first create an extension itself: + +```php +processedMessages += 1; + } +} +``` + +Now we have to register as a Symfony service with special tag: + +```yaml +services: + app.enqueue.count_processed_messages_extension: + class: 'AppBundle\Enqueue\CountProcessedMessagesExtension' + tags: + - { name: 'enqueue.consumption.extension', priority: 10 } +``` + +[back to index](../index.md) \ No newline at end of file diff --git a/docs/bundle/debuging.md b/docs/bundle/debuging.md index 52c951ae2..c853bb3fc 100644 --- a/docs/bundle/debuging.md +++ b/docs/bundle/debuging.md @@ -1,3 +1,71 @@ # Debugging +## Profiler + +It may be useful to see what messages were sent during a http request. +The bundle provides a collector for Symfony [profiler](http://symfony.com/doc/current/profiler.html). +The extension collects all sent messages + +To enable profiler + +```yaml +# app/config/config_dev.yml + +enqueue: + client: + traceable_producer: true +``` + +Now suppose you have this code in an action: + +```php +get('enqueue.message_producer'); + + $producer->send('foo_topic', 'Hello world'); + + $producer->send('bar_topic', ['bar' => 'val']); + + $message = new Message(); + $message->setBody('baz'); + $producer->send('baz_topic', $message); + + // ... + } + +``` + +For this action you may see something like this in the profiler: + + ![Symfony profiler](../images/symfony_profiler.png) + +## Queues and topics available + +There are two console commands `./bin/console enqueue:queues` and `./bin/console enqueue:topics`. +They are here to help you to learn more about existing topics and queues. + +Here's the result: + +![Cli debug commands](../images/cli_debug_commands.png) + +## Consume command verbosity + +By default the commands `enqueu:conume` or `enqueue:transport:consume` does not output anything. +You can add `-vvv` to see more information. + +![Consume command verbosity](../images/consume_command_verbosity.png) + [back to index](../index.md) \ No newline at end of file diff --git a/docs/bundle/job_queue.md b/docs/bundle/job_queue.md index 7dc92e4aa..aded0212c 100644 --- a/docs/bundle/job_queue.md +++ b/docs/bundle/job_queue.md @@ -14,15 +14,14 @@ Guaranty that there is only single job running with such name. ```php getBody(); - return Result::ACK; - // return Result::REJECT; // when the message is broken - // return Result::REQUEUE; // the message is fine but you want to postpone processing + return self::ACK; + // return self::REJECT; // when the message is broken + // return self::REQUEUE; // the message is fine but you want to postpone processing } public static function getSubscribedTopics() @@ -72,15 +71,15 @@ Register it as a container service and subscribe to the topic: ```yaml foo_message_processor: - class: 'FooMessageProcessor' + class: 'FooProcessor' tags: - - { name: 'enqueue.client.message_processor' } + - { name: 'enqueue.client.processor' } ``` Now you can start consuming messages: ```bash -$ ./app/console enqueue:consume +$ ./app/console enqueue:consume --setup-broker ``` _**Note**: Add -vvv to find out what is going while you are consuming messages. There is a lot of valuable debug info there._ diff --git a/docs/client/message_bus.md b/docs/client/message_bus.md new file mode 100644 index 000000000..24d29ccd0 --- /dev/null +++ b/docs/client/message_bus.md @@ -0,0 +1,16 @@ +# Client. Message bus + +Here's a description of message bus from [Enterprise Integration Patterns](http://www.enterpriseintegrationpatterns.com/patterns/messaging/MessageBus.html) + +> A Message Bus is a combination of a common data model, a common command set, and a messaging infrastructure to allow different systems to communicate through a shared set of interfaces. + +If all your applications built on top of Enqueue Client you have to only make sure they send message to a shared topic. +The rest is done under the hood. + +If you'd like to connect another application (written on Python for example ) you have to follow these rules: + +* An application defines its own queue that is connected to the topic as fanout. +* A message sent to message bus topic must have a header `enqueue.topic_name`. +* Once a message is received it could be routed internally. `enqueue.topic_name` header could be used for that. + +[back to index](../index.md) diff --git a/docs/client/message_examples.md b/docs/client/message_examples.md new file mode 100644 index 000000000..f39c1c027 --- /dev/null +++ b/docs/client/message_examples.md @@ -0,0 +1,78 @@ +# Client. Message examples + +## Delay + +Message sent with a delay set is processed after the delay time exceed. +Some brokers may not support it from scratch. +In order to use delay feature with [RabbitMQ](https://www.rabbitmq.com/) you have to install a [delay plugin](https://github.com/rabbitmq/rabbitmq-delayed-message-exchange). + +```php +setDelay(60); // seconds + +/** @var \Enqueue\Client\MessageProducerInterface $producer */ +$producer->send('aTopic', $message); +``` + +## Expiration (TTL) + +The message may have an expiration or TTL (time to live). +The message is removed from the queue if the expiration exceeded but the message has not been consumed. +For example it make sense to send a forgot password email within first few minutes, nobody needs it in an hour. + +```php +setExpire(60); // seconds + +/** @var \Enqueue\Client\MessageProducerInterface $producer */ +$producer->send('aTopic', $message); +``` + +## Priority + +You can set a priority If you want a message to be processed quicker than other messages in the queue. +Client defines five priority constants: `MessagePriority::VERY_LOW`, `MessagePriority::LOW`, `MessagePriority::NORMAL`, `MessagePriority::HIGH`, `MessagePriority::VERY_HIGH`. +The `MessagePriority::NORMAL` is default priority. + +```php +setPriority(MessagePriority::HIGH); + +/** @var \Enqueue\Client\MessageProducerInterface $producer */ +$producer->send('aTopic', $message); +``` + +## Timestamp, Content type, Message id + +Those are self describing things. +Usually they are set by Client so you dont have to worry about them. +If you do not like what Client set you can always set custom values: + +```php +setMessageId('aCustomMessageId'); +$message->setTimestamp(time()); +$message->setContentType('text/plain'); + +/** @var \Enqueue\Client\MessageProducerInterface $producer */ +$producer->send('aTopic', $message); +``` + +[back to index](../index.md) diff --git a/docs/client/supported_brokers.md b/docs/client/supported_brokers.md new file mode 100644 index 000000000..39c92c215 --- /dev/null +++ b/docs/client/supported_brokers.md @@ -0,0 +1,17 @@ +# Client. Supported brokers + +Here's the list of protocols and Client features supported by them + +| Protocol | Priority | Delay | Expiration | Setup broker | Message bus | +|:--------------:|:--------:|:--------:|:----------:|:------------:|:-----------:| +| AMQP | No | No | Yes | Yes | Yes | +| RabbitMQ AMQP | Yes | Yes* | Yes | Yes | Yes | +| STOMP | No | No | Yes | No | Yes** | +| RabbitMQ STOMP | Yes | Yes* | Yes | Yes*** | Yes** | + + +* \* Possible if a RabbitMQ delay plugin is installed. +* \*\* Possible if topics (exchanges) are configured on broker side manually. +* \*\*\* Possible if RabbitMQ Managment Plugin is installed. + +[back to index](../index.md) \ No newline at end of file diff --git a/docs/consumption/extensions.md b/docs/consumption/extensions.md index 99d7f5423..31a47e29e 100644 --- a/docs/consumption/extensions.md +++ b/docs/consumption/extensions.md @@ -1,3 +1,53 @@ # Consumption extensions. -[back to index](../index.md) \ No newline at end of file +You can learn how to register extensions in [quick tour](../quick_tour.md#consumption). +There's dedicated [chapter](../bundle/consumption_extension.md) for how to add extension in Symfony app. + +## [LoggerExtension](https://github.com/php-enqueue/enqueue-dev/blob/master/pkg/enqueue/Consumption/Extension/LoggerExtension.php) + +It sets logger to queue consumer context. All log messages will go to it. + +## [DoctrineClearIdentityMapExtension](https://github.com/php-enqueue/enqueue-dev/blob/master/pkg/enqueue-bundle/Consumption/Extension/DoctrineClearIdentityMapExtension.php) + +It clears Doctrine's identity map after a message is processed. It reduce memory usage. + +## [DoctrinePingConnectionExtension](https://github.com/php-enqueue/enqueue-dev/blob/master/pkg/enqueue-bundle/Consumption/Extension/DoctrinePingConnectionExtension.php) + +It test a database connection and if it is lost it does reconnect. Fixes "MySQL has gone away" errors. + +## [ReplyExtension](https://github.com/php-enqueue/enqueue-dev/blob/master/pkg/enqueue/Consumption/Extension/ReplyExtension.php) + +It comes with RPC code and simplifies reply logic. +It takes care of sending a reply message to reply queue. + +## [SetupBrokerExtension](https://github.com/php-enqueue/enqueue-dev/blob/master/pkg/enqueue/Client/ConsumptionExtension/SetupBrokerExtension.php) + +It responsible for configuring everything at a broker side. queues, topics, bindings and so on. +The extension is added at runtime when `--setup-broker` option is used. + +## [LimitConsumedMessagesExtension](https://github.com/php-enqueue/enqueue-dev/blob/master/pkg/enqueue/Consumption/Extension/LimitConsumedMessagesExtension.php) + +The extension counts processed message and once a limit is reached it interrupts consumption. +The extension is added at runtime when `--message-limit=10` option is used. + +## [LimitConsumerMemoryExtension](https://github.com/php-enqueue/enqueue-dev/blob/master/pkg/enqueue/Consumption/Extension/LimitConsumerMemoryExtension.php) + +The extension interrupts consumption once a memory limit is reached. +The extension is added at runtime when `--memory-limit=512` option is used. +The value is Mb. + +## [LimitConsumptionTimeExtension](https://github.com/php-enqueue/enqueue-dev/blob/master/pkg/enqueue/Consumption/Extension/LimitConsumptionTimeExtension.php) + +The extension interrupts consumption once time limit is reached. +The extension is added at runtime when `--time-limit="now + 2 minutes"` option is used. + +## [SignalExtension](https://github.com/php-enqueue/enqueue-dev/blob/master/pkg/enqueue/Consumption/Extension/SignalExtension.php) + +The extension catch process signals and gracefully stops consumption. Works only on NIX platforms. + +## [DelayRedeliveredMessageExtension](https://github.com/php-enqueue/enqueue-dev/blob/master/pkg/enqueue/Client/ConsumptionExtension/DelayRedeliveredMessageExtension.php) + +The extension checks whether the received message is redelivered (There was attempt to process message but it failed). +If so the extension reject the origin message and creates a copy message with a delay. + +[back to index](../index.md) diff --git a/docs/contribution.md b/docs/contribution.md new file mode 100644 index 000000000..dce0a0f07 --- /dev/null +++ b/docs/contribution.md @@ -0,0 +1,29 @@ +# Contribution + +To contribute you have to fork a [enqueue-dev](https://github.com/php-enqueue/enqueue-dev) repository. +Clone it locally. + +## Setup environment + +``` +composer install +./bin/pre-commit -i +./bin/dev -b +``` + +Once you did it you can work on a feature or bug fix. + +## Testing + +To run tests simply run + +``` +./bin/dev -t +``` + +## Commit + +When you try to commit changes `php-cs-fixer` is run. It fixes all coding style issues. Dont forget to stage them and commit everything. +Once everything is done open a pull request on official repository. + +[back to index](index.md) \ No newline at end of file diff --git a/docs/images/cli_debug_commands.png b/docs/images/cli_debug_commands.png new file mode 100644 index 000000000..d366f7c76 Binary files /dev/null and b/docs/images/cli_debug_commands.png differ diff --git a/docs/images/consume_command_verbosity.png b/docs/images/consume_command_verbosity.png new file mode 100644 index 000000000..9895e1fee Binary files /dev/null and b/docs/images/consume_command_verbosity.png differ diff --git a/docs/images/symfony_profiler.png b/docs/images/symfony_profiler.png new file mode 100644 index 000000000..29f10b648 Binary files /dev/null and b/docs/images/symfony_profiler.png differ diff --git a/docs/index.md b/docs/index.md index 2ad4fc6ba..6dfcaebc9 100644 --- a/docs/index.md +++ b/docs/index.md @@ -7,9 +7,22 @@ - [NULL](null_transport.md) * Consumption - [Extensions](consumption/extensions.md) -* Symfony bundle +* Client + - [Message examples](client/message_examples.md) + - [Supported brokers](client/supported_brokers.md) + - [Message bus](client/message_bus.md) +* Job queue + - [Run unique job](job_queue/run_unique_job.md) + - [Run sub job(s)](job_queue/run_sub_job.md) +* EnqueueBundle (Symfony). - [Quick tour](bundle/quick_tour.md) + - [Config reference](bundle/config_reference.md) + - [Cli commands](bundle/cli_commands.md) - [Job queue](bundle/job_queue.md) + - [Consumption extension](bundle/consumption_extension.md) - [Production settings](bundle/production_settings.md) - [Debuging](bundle/debuging.md) - - [Functional testing](bundle/functional_testing.md) \ No newline at end of file + - [Functional testing](bundle/functional_testing.md) +* Development + - [Contribution](contribution.md) + \ No newline at end of file diff --git a/docs/job_queue/run_sub_job.md b/docs/job_queue/run_sub_job.md new file mode 100644 index 000000000..e0322922e --- /dev/null +++ b/docs/job_queue/run_sub_job.md @@ -0,0 +1,62 @@ +## Job queue. Run sub job + +It shows how you can create and run a sub job, which it is executed separately. +You can create as many sub jobs as you like. +They will be executed in parallel. + +```php +jobRunner->runUnique($message->getMessageId(), 'aJobName', function (JobRunner $runner) { + $runner->createDelayed('aSubJobName1', function (JobRunner $runner, Job $childJob) { + $this->producer->send('aJobTopic', [ + 'jobId' => $childJob->getId(), + // other data required by sub job + ]); + }); + + return true; + }); + + return $result ? self::ACK : self::REJECT; + } +} + +class SubJobProcessor implements Processor +{ + /** @var JobRunner */ + private $jobRunner; + + public function process(Message $message, Context $context) + { + $data = JSON::decode($message->getBody()); + + $result = $this->jobRunner->runDelayed($data['jobId'], function () use ($data) { + // do your job + + return true; + }); + + return $result ? Result::ACK : Result::REJECT; + } +} +``` + +[back to index](../index.md) \ No newline at end of file diff --git a/docs/job_queue/run_unique_job.md b/docs/job_queue/run_unique_job.md new file mode 100644 index 000000000..cfb145d2e --- /dev/null +++ b/docs/job_queue/run_unique_job.md @@ -0,0 +1,38 @@ +## Job queue. Run unique job + +There is job queue component build on top of a transport. It provides some additional features: + +* Stores jobs to a database. So you can query that information and build a UI for it. +* Run unique job feature. If used guarantee that there is not any job with the same name running same time. +* Sub jobs. If used allow split a big job into smaller pieces and process them asynchronously and in parallel. +* Depended job. If used allow send a message when the whole job is finished (including sub jobs). + +Here's some examples. +It shows how you can run unique job using job queue (The configuration is described in a dedicated chapter). + +```php +jobRunner->runUnique($message->getMessageId(), 'aJobName', function () { + // do your job, there is no any other processes executing same job, + + return true; // if you want to ACK message or false to REJECT + }); + + return $result ? self::ACK : self::REJECT; + } +} +``` + +[back to index](../index.md) \ No newline at end of file diff --git a/docs/quick_tour.md b/docs/quick_tour.md index 26cd1aeed..b51c63883 100644 --- a/docs/quick_tour.md +++ b/docs/quick_tour.md @@ -4,7 +4,6 @@ * [Consumption](#consumption) * [Remote Procedure Call (RPC)](#remote-procedure-call-rpc) * [Client](#client) -* [Job queue](#job-queue) * [Cli commands](#cli-commands) ## Transport @@ -66,8 +65,8 @@ The `consume` method starts the consumption process which last as long as it is ```php bind('foo_queue', function(Message $message) { // process messsage - return Result::ACK; + return Processor::ACK; }); $queueConsumer->bind('bar_queue', function(Message $message) { // process messsage - return Result::ACK; + return Processor::ACK; }); $queueConsumer->consume(); ``` -There are bunch of [extensions](consumption_extensions.md) available. +There are bunch of [extensions](consumption/extensions.md) available. This is an example of how you can add them. The `SignalExtension` provides support of process signals, whenever you send SIGTERM for example it will correctly managed. The `LimitConsumptionTimeExtension` interrupts the consumption after given time. @@ -156,27 +155,28 @@ $queueConsumer->consume(); ## Client -It provides a high level abstraction. -The goal of the component is hide as much as possible details from you so you can concentrate on things that really matters. -For example, It configure a broker for you, if needed. -It provides easy to use services for producing and processing messages. -It supports message bus so different applications can send message to each other. +It provides an easy to use high level abstraction. +The goal of the component is hide as much as possible low level details so you can concentrate on things that really matters. +For example, It configure a broker for you by creating queuest, exchanges and bind them. +It provides easy to use services for producing and processing messages. +It supports unified format for setting message expiration, delay, timestamp, correlation id. +It supports message bus so different applications can talk to each other. Here's an example of how you can send and consume messages. ```php bind('foo_topic', function (Message $message) { // process message - return Result::ACK; + return Processor::ACK; }); $client->send('foo_topic', 'Hello there!'); @@ -185,104 +185,6 @@ $client->send('foo_topic', 'Hello there!'); $client->consume(); ``` -## Job queue - -There is job queue component build on top of a transport. It provides some additional features: - -* Stores jobs to a database. So you can query that information and build a UI for it. -* Run unique job feature. If used guarantee that there is not any job with the same name running same time. -* Sub jobs. If used allow split a big job into smaller pieces and process them asynchronously and in parallel. -* Depended job. If used allow send a message when the whole job is finished (including sub jobs). - -Here's some examples. -First shows how you can run unique job using job queue (The configuration is described in a dedicated chapter). - -```php -jobRunner->runUnique($message->getMessageId(), 'aJobName', function () { - // do your job, there is no any other processes executing same job, - - return true; // if you want to ACK message or false to REJECT - }); - - return $result ? Result::ACK : Result::REJECT; - } -} -``` - -Second shows how you can create and run a sub job, which it is executed separately. -You can create as many sub jobs as you like. -They will be executed in parallel. - -```php -jobRunner->runUnique($message->getMessageId(), 'aJobName', function (JobRunner $runner) { - $runner->createDelayed('aSubJobName1', function (JobRunner $runner, Job $childJob) { - $this->producer->send('aJobTopic', [ - 'jobId' => $childJob->getId(), - // other data required by sub job - ]); - }); - - return true; - }); - - return $result ? Result::ACK : Result::REJECT; - } -} - -class SubJobMessageProcessor implements MessageProcessorInterface -{ - /** @var JobRunner */ - private $jobRunner; - - public function process(Message $message, Context $context) - { - $data = JSON::decode($message->getBody()); - - $result = $this->jobRunner->runDelayed($data['jobId'], function () use ($data) { - // do your job - - return true; - }); - - return $result ? Result::ACK : Result::REJECT; - } -} -``` - ## Cli commands The library provides handy commands out of the box. @@ -318,7 +220,7 @@ $app->run(); and starts the consumption from the console: ```bash -$ app.php consume --time-limit="now + 60 sec" --message-limit=10 --memory-limit=256 +$ app.php consume ``` [back to index](index.md) diff --git a/pkg/amqp-ext/AmqpConnectionFactory.php b/pkg/amqp-ext/AmqpConnectionFactory.php index dfe36a35f..ab46fe0f0 100644 --- a/pkg/amqp-ext/AmqpConnectionFactory.php +++ b/pkg/amqp-ext/AmqpConnectionFactory.php @@ -1,4 +1,5 @@ bind($destination, $queue); return new AmqpConsumer($this, $queue); - } else { - return new AmqpConsumer($this, $destination); } + + return new AmqpConsumer($this, $destination); } public function close() diff --git a/pkg/amqp-ext/AmqpMessage.php b/pkg/amqp-ext/AmqpMessage.php index 044c4f19e..d5992237b 100644 --- a/pkg/amqp-ext/AmqpMessage.php +++ b/pkg/amqp-ext/AmqpMessage.php @@ -1,4 +1,5 @@ assertSame(null, $message->getBody()); + $this->assertNull($message->getBody()); $this->assertSame([], $message->getProperties()); $this->assertSame([], $message->getHeaders()); } diff --git a/pkg/amqp-ext/Tests/AmqpQueueTest.php b/pkg/amqp-ext/Tests/AmqpQueueTest.php index 9381c1def..56e234764 100644 --- a/pkg/amqp-ext/Tests/AmqpQueueTest.php +++ b/pkg/amqp-ext/Tests/AmqpQueueTest.php @@ -1,4 +1,5 @@ true]); - $meta = new QueueMetaRegistry($config, ['default' => []]); $driver = new RabbitMqDriver($context, $config, $meta); diff --git a/pkg/amqp-ext/Tests/Functional/AmqpCommonUseCasesTest.php b/pkg/amqp-ext/Tests/Functional/AmqpCommonUseCasesTest.php index 45a884231..be0ef4fef 100644 --- a/pkg/amqp-ext/Tests/Functional/AmqpCommonUseCasesTest.php +++ b/pkg/amqp-ext/Tests/Functional/AmqpCommonUseCasesTest.php @@ -1,4 +1,5 @@ amqpContext->createConsumer($topic); //guard - $this->assertSame(null, $consumer->receive(1)); + $this->assertNull($consumer->receive(1)); $message = $this->amqpContext->createMessage(__METHOD__); diff --git a/pkg/amqp-ext/Tests/Functional/AmqpConsumptionUseCasesTest.php b/pkg/amqp-ext/Tests/Functional/AmqpConsumptionUseCasesTest.php index 15ffb2a2f..76abbfce9 100644 --- a/pkg/amqp-ext/Tests/Functional/AmqpConsumptionUseCasesTest.php +++ b/pkg/amqp-ext/Tests/Functional/AmqpConsumptionUseCasesTest.php @@ -1,16 +1,17 @@ bind($queue, $processor); $queueConsumer->consume(); @@ -81,10 +82,10 @@ public function testConsumeOneMessageAndSendReplyExit() $replyMessage = $this->amqpContext->createMessage(__METHOD__.'.reply'); - $processor = new StubMessageProcessor(); + $processor = new StubProcessor(); $processor->result = Result::reply($replyMessage); - $replyProcessor = new StubMessageProcessor(); + $replyProcessor = new StubProcessor(); $queueConsumer->bind($queue, $processor); $queueConsumer->bind($replyQueue, $replyProcessor); @@ -98,7 +99,7 @@ public function testConsumeOneMessageAndSendReplyExit() } } -class StubMessageProcessor implements MessageProcessorInterface +class StubProcessor implements Processor { public $result = Result::ACK; diff --git a/pkg/amqp-ext/Tests/Functional/AmqpRpcUseCasesTest.php b/pkg/amqp-ext/Tests/Functional/AmqpRpcUseCasesTest.php index bd6d58496..3085e858f 100644 --- a/pkg/amqp-ext/Tests/Functional/AmqpRpcUseCasesTest.php +++ b/pkg/amqp-ext/Tests/Functional/AmqpRpcUseCasesTest.php @@ -1,4 +1,5 @@ =5.6", - "ext-amqp": "*", - "enqueue/psr-queue": "dev-master", + "ext-amqp": "^1.6", + "enqueue/psr-queue": "^0.1", "psr/log": "^1" }, "require-dev": { "phpunit/phpunit": "~5.4.0", - "enqueue/test": "dev-master", - "enqueue/enqueue": "dev-master", + "enqueue/test": "^0.1", + "enqueue/enqueue": "^0.1", "symfony/dependency-injection": "^2.8|^3", "symfony/config": "^2.8|^3" }, diff --git a/pkg/enqueue-bundle/Consumption/Extension/DoctrineClearIdentityMapExtension.php b/pkg/enqueue-bundle/Consumption/Extension/DoctrineClearIdentityMapExtension.php index 4f2a1c0ae..9750fb399 100644 --- a/pkg/enqueue-bundle/Consumption/Extension/DoctrineClearIdentityMapExtension.php +++ b/pkg/enqueue-bundle/Consumption/Extension/DoctrineClearIdentityMapExtension.php @@ -1,4 +1,5 @@ hasDefinition($routerId)) { diff --git a/pkg/enqueue-bundle/DependencyInjection/Compiler/BuildExtensionsPass.php b/pkg/enqueue-bundle/DependencyInjection/Compiler/BuildExtensionsPass.php index ecb0deb33..c86b5a35d 100644 --- a/pkg/enqueue-bundle/DependencyInjection/Compiler/BuildExtensionsPass.php +++ b/pkg/enqueue-bundle/DependencyInjection/Compiler/BuildExtensionsPass.php @@ -1,4 +1,5 @@ hasDefinition($processorRegistryId)) { return; diff --git a/pkg/enqueue-bundle/DependencyInjection/Compiler/BuildQueueMetaRegistryPass.php b/pkg/enqueue-bundle/DependencyInjection/Compiler/BuildQueueMetaRegistryPass.php index 96d99d22d..af9ffea2f 100644 --- a/pkg/enqueue-bundle/DependencyInjection/Compiler/BuildQueueMetaRegistryPass.php +++ b/pkg/enqueue-bundle/DependencyInjection/Compiler/BuildQueueMetaRegistryPass.php @@ -1,4 +1,5 @@ hasDefinition($queueMetaRegistryId)) { return; diff --git a/pkg/enqueue-bundle/DependencyInjection/Compiler/BuildTopicMetaSubscribersPass.php b/pkg/enqueue-bundle/DependencyInjection/Compiler/BuildTopicMetaSubscribersPass.php index cf53cb388..b2a339eaa 100644 --- a/pkg/enqueue-bundle/DependencyInjection/Compiler/BuildTopicMetaSubscribersPass.php +++ b/pkg/enqueue-bundle/DependencyInjection/Compiler/BuildTopicMetaSubscribersPass.php @@ -1,4 +1,5 @@ findTaggedServiceIds($processorTagName) as $serviceId => $tagAttributes) { diff --git a/pkg/enqueue-bundle/DependencyInjection/Compiler/ExtractMessageProcessorTagSubscriptionsTrait.php b/pkg/enqueue-bundle/DependencyInjection/Compiler/ExtractProcessorTagSubscriptionsTrait.php similarity index 98% rename from pkg/enqueue-bundle/DependencyInjection/Compiler/ExtractMessageProcessorTagSubscriptionsTrait.php rename to pkg/enqueue-bundle/DependencyInjection/Compiler/ExtractProcessorTagSubscriptionsTrait.php index d0f9e2ea0..cd15177d9 100644 --- a/pkg/enqueue-bundle/DependencyInjection/Compiler/ExtractMessageProcessorTagSubscriptionsTrait.php +++ b/pkg/enqueue-bundle/DependencyInjection/Compiler/ExtractProcessorTagSubscriptionsTrait.php @@ -1,11 +1,12 @@ arrayNode('extensions')->addDefaultsIfNotSet()->children() ->booleanNode('doctrine_ping_connection_extension')->defaultFalse()->end() ->booleanNode('doctrine_clear_identity_map_extension')->defaultFalse()->end() + ->booleanNode('signal_extension')->defaultValue(function_exists('pcntl_signal_dispatch'))->end() ->end()->end() ; diff --git a/pkg/enqueue-bundle/DependencyInjection/EnqueueExtension.php b/pkg/enqueue-bundle/DependencyInjection/EnqueueExtension.php index dec578ad8..86610e44d 100644 --- a/pkg/enqueue-bundle/DependencyInjection/EnqueueExtension.php +++ b/pkg/enqueue-bundle/DependencyInjection/EnqueueExtension.php @@ -1,8 +1,9 @@ load('extensions/doctrine_clear_identity_map_extension.yml'); } + + if ($config['extensions']['signal_extension']) { + $loader->load('extensions/signal_extension.yml'); + } } /** diff --git a/pkg/enqueue-bundle/EnqueueBundle.php b/pkg/enqueue-bundle/EnqueueBundle.php index d230d3a71..e07191358 100644 --- a/pkg/enqueue-bundle/EnqueueBundle.php +++ b/pkg/enqueue-bundle/EnqueueBundle.php @@ -1,20 +1,21 @@ addCompilerPass(new BuildExtensionsPass()); $container->addCompilerPass(new BuildClientRoutingPass()); - $container->addCompilerPass(new BuildMessageProcessorRegistryPass()); + $container->addCompilerPass(new BuildProcessorRegistryPass()); $container->addCompilerPass(new BuildTopicMetaSubscribersPass()); $container->addCompilerPass(new BuildQueueMetaRegistryPass()); diff --git a/pkg/enqueue-bundle/Entity/Job.php b/pkg/enqueue-bundle/Entity/Job.php index 1d2dc03e2..38abf755e 100644 --- a/pkg/enqueue-bundle/Entity/Job.php +++ b/pkg/enqueue-bundle/Entity/Job.php @@ -1,4 +1,5 @@ container->get('enqueue.client.consume_messages_command'); - $messageProcessor = $this->container->get('test.message.processor'); + $processor = $this->container->get('test.message.processor'); - $this->getMessageProducer()->send(TestMessageProcessor::TOPIC, 'test message body'); + $this->getMessageProducer()->send(TestProcessor::TOPIC, 'test message body'); $tester = new CommandTester($command); $tester->execute([ @@ -45,16 +46,16 @@ public function testClientConsumeMessagesCommandShouldConsumeMessage() '--time-limit' => 'now +10 seconds', ]); - $this->assertInstanceOf(AmqpMessage::class, $messageProcessor->message); - $this->assertEquals('test message body', $messageProcessor->message->getBody()); + $this->assertInstanceOf(AmqpMessage::class, $processor->message); + $this->assertEquals('test message body', $processor->message->getBody()); } public function testClientConsumeMessagesFromExplicitlySetQueue() { $command = $this->container->get('enqueue.client.consume_messages_command'); - $messageProcessor = $this->container->get('test.message.processor'); + $processor = $this->container->get('test.message.processor'); - $this->getMessageProducer()->send(TestMessageProcessor::TOPIC, 'test message body'); + $this->getMessageProducer()->send(TestProcessor::TOPIC, 'test message body'); $tester = new CommandTester($command); $tester->execute([ @@ -63,17 +64,17 @@ public function testClientConsumeMessagesFromExplicitlySetQueue() 'client-queue-names' => ['test'], ]); - $this->assertInstanceOf(AmqpMessage::class, $messageProcessor->message); - $this->assertEquals('test message body', $messageProcessor->message->getBody()); + $this->assertInstanceOf(AmqpMessage::class, $processor->message); + $this->assertEquals('test message body', $processor->message->getBody()); } public function testTransportConsumeMessagesCommandShouldConsumeMessage() { $command = $this->container->get('enqueue.command.consume_messages'); $command->setContainer($this->container); - $messageProcessor = $this->container->get('test.message.processor'); + $processor = $this->container->get('test.message.processor'); - $this->getMessageProducer()->send(TestMessageProcessor::TOPIC, 'test message body'); + $this->getMessageProducer()->send(TestProcessor::TOPIC, 'test message body'); $tester = new CommandTester($command); $tester->execute([ @@ -83,13 +84,8 @@ public function testTransportConsumeMessagesCommandShouldConsumeMessage() 'processor-service' => 'test.message.processor', ]); - $this->assertInstanceOf(AmqpMessage::class, $messageProcessor->message); - $this->assertEquals('test message body', $messageProcessor->message->getBody()); - } - - private function getMessageProducer() - { - return $this->container->get('enqueue.client.message_producer'); + $this->assertInstanceOf(AmqpMessage::class, $processor->message); + $this->assertEquals('test message body', $processor->message->getBody()); } /** @@ -101,4 +97,9 @@ public static function getKernelClass() return AmqpAppKernel::class; } + + private function getMessageProducer() + { + return $this->container->get('enqueue.client.message_producer'); + } } diff --git a/pkg/enqueue-bundle/Tests/Functional/ContextTest.php b/pkg/enqueue-bundle/Tests/Functional/ContextTest.php index 557005797..f11a9b17a 100644 --- a/pkg/enqueue-bundle/Tests/Functional/ContextTest.php +++ b/pkg/enqueue-bundle/Tests/Functional/ContextTest.php @@ -1,4 +1,5 @@ message = $message; - return Result::ACK; + return self::ACK; } public static function getSubscribedTopics() diff --git a/pkg/enqueue-bundle/Tests/Functional/TopicRegistryTest.php b/pkg/enqueue-bundle/Tests/Functional/TopicRegistryTest.php index 38f2fa99e..0db559da0 100644 --- a/pkg/enqueue-bundle/Tests/Functional/TopicRegistryTest.php +++ b/pkg/enqueue-bundle/Tests/Functional/TopicRegistryTest.php @@ -1,4 +1,5 @@ load(__DIR__.'/config/amqp-config.yml'); } + + protected function getContainerClass() + { + return parent::getContainerClass().'BundleAmqp'; + } } diff --git a/pkg/enqueue-bundle/Tests/Functional/app/AppKernel.php b/pkg/enqueue-bundle/Tests/Functional/app/AppKernel.php index 9c84fd9ee..e2ed065a3 100644 --- a/pkg/enqueue-bundle/Tests/Functional/app/AppKernel.php +++ b/pkg/enqueue-bundle/Tests/Functional/app/AppKernel.php @@ -1,4 +1,5 @@ load(__DIR__.'/config/config.yml'); } + + protected function getContainerClass() + { + return parent::getContainerClass().'BundleDefault'; + } } diff --git a/pkg/enqueue-bundle/Tests/Functional/app/config/amqp-config.yml b/pkg/enqueue-bundle/Tests/Functional/app/config/amqp-config.yml index ad130b295..312f81f52 100644 --- a/pkg/enqueue-bundle/Tests/Functional/app/config/amqp-config.yml +++ b/pkg/enqueue-bundle/Tests/Functional/app/config/amqp-config.yml @@ -38,6 +38,6 @@ enqueue: services: test.message.processor: - class: 'Enqueue\Bundle\Tests\Functional\TestMessageProcessor' + class: 'Enqueue\Bundle\Tests\Functional\TestProcessor' tags: - - { name: 'enqueue.client.message_processor' } + - { name: 'enqueue.client.processor' } diff --git a/pkg/enqueue-bundle/Tests/Unit/Consumption/Extension/DoctrineClearIdentityMapExtensionTest.php b/pkg/enqueue-bundle/Tests/Unit/Consumption/Extension/DoctrineClearIdentityMapExtensionTest.php index 93ac073ab..7bf9eaa36 100644 --- a/pkg/enqueue-bundle/Tests/Unit/Consumption/Extension/DoctrineClearIdentityMapExtensionTest.php +++ b/pkg/enqueue-bundle/Tests/Unit/Consumption/Extension/DoctrineClearIdentityMapExtensionTest.php @@ -1,12 +1,13 @@ createMock(PsrContext::class)); $context->setLogger($this->createMock(LoggerInterface::class)); $context->setPsrConsumer($this->createMock(Consumer::class)); - $context->setMessageProcessor($this->createMock(MessageProcessorInterface::class)); + $context->setPsrProcessor($this->createMock(Processor::class)); return $context; } diff --git a/pkg/enqueue-bundle/Tests/Unit/Consumption/Extension/DoctrinePingConnectionExtensionTest.php b/pkg/enqueue-bundle/Tests/Unit/Consumption/Extension/DoctrinePingConnectionExtensionTest.php index 046442add..8e4f40bdf 100644 --- a/pkg/enqueue-bundle/Tests/Unit/Consumption/Extension/DoctrinePingConnectionExtensionTest.php +++ b/pkg/enqueue-bundle/Tests/Unit/Consumption/Extension/DoctrinePingConnectionExtensionTest.php @@ -1,12 +1,13 @@ createMock(PsrContext::class)); $context->setLogger($this->createMock(LoggerInterface::class)); $context->setPsrConsumer($this->createMock(Consumer::class)); - $context->setMessageProcessor($this->createMock(MessageProcessorInterface::class)); + $context->setPsrProcessor($this->createMock(Processor::class)); return $context; } diff --git a/pkg/enqueue-bundle/Tests/Unit/DependencyInjection/Compiler/AddTopicMetaPassTest.php b/pkg/enqueue-bundle/Tests/Unit/DependencyInjection/Compiler/AddTopicMetaPassTest.php index 6bbb330b5..b7efc8551 100644 --- a/pkg/enqueue-bundle/Tests/Unit/DependencyInjection/Compiler/AddTopicMetaPassTest.php +++ b/pkg/enqueue-bundle/Tests/Unit/DependencyInjection/Compiler/AddTopicMetaPassTest.php @@ -1,8 +1,9 @@ createContainerBuilder(); $processor = new Definition(\stdClass::class); - $processor->addTag('enqueue.client.message_processor', [ + $processor->addTag('enqueue.client.processor', [ 'topicName' => 'topic', 'processorName' => 'processor', 'queueName' => 'queue', @@ -49,7 +50,7 @@ public function testThrowIfProcessorClassNameCouldNotBeFound() $container = $this->createContainerBuilder(); $processor = new Definition('notExistingClass'); - $processor->addTag('enqueue.client.message_processor', [ + $processor->addTag('enqueue.client.processor', [ 'processorName' => 'processor', ]); $container->setDefinition('processor', $processor); @@ -70,7 +71,7 @@ public function testShouldThrowExceptionIfTopicNameIsNotSet() $container = $this->createContainerBuilder(); $processor = new Definition(\stdClass::class); - $processor->addTag('enqueue.client.message_processor'); + $processor->addTag('enqueue.client.processor'); $container->setDefinition('processor', $processor); $router = new Definition(); @@ -89,7 +90,7 @@ public function testShouldSetServiceIdAdProcessorIdIfIsNotSetInTag() $container = $this->createContainerBuilder(); $processor = new Definition(\stdClass::class); - $processor->addTag('enqueue.client.message_processor', [ + $processor->addTag('enqueue.client.processor', [ 'topicName' => 'topic', 'queueName' => 'queue', ]); @@ -116,7 +117,7 @@ public function testShouldSetDefaultQueueIfNotSetInTag() $container = $this->createContainerBuilder(); $processor = new Definition(\stdClass::class); - $processor->addTag('enqueue.client.message_processor', [ + $processor->addTag('enqueue.client.processor', [ 'topicName' => 'topic', ]); $container->setDefinition('processor-service-id', $processor); @@ -142,7 +143,7 @@ public function testShouldBuildRouteFromSubscriberIfOnlyTopicNameSpecified() $container = $this->createContainerBuilder(); $processor = new Definition(OnlyTopicNameTopicSubscriber::class); - $processor->addTag('enqueue.client.message_processor'); + $processor->addTag('enqueue.client.processor'); $container->setDefinition('processor-service-id', $processor); $router = new Definition(); @@ -166,7 +167,7 @@ public function testShouldBuildRouteFromSubscriberIfProcessorNameSpecified() $container = $this->createContainerBuilder(); $processor = new Definition(ProcessorNameTopicSubscriber::class); - $processor->addTag('enqueue.client.message_processor'); + $processor->addTag('enqueue.client.processor'); $container->setDefinition('processor-service-id', $processor); $router = new Definition(); @@ -190,7 +191,7 @@ public function testShouldBuildRouteFromSubscriberIfQueueNameSpecified() $container = $this->createContainerBuilder(); $processor = new Definition(QueueNameTopicSubscriber::class); - $processor->addTag('enqueue.client.message_processor'); + $processor->addTag('enqueue.client.processor'); $container->setDefinition('processor-service-id', $processor); $router = new Definition(); @@ -216,7 +217,7 @@ public function testShouldThrowExceptionWhenTopicSubscriberConfigurationIsInvali $container = $this->createContainerBuilder(); $processor = new Definition(InvalidTopicSubscriber::class); - $processor->addTag('enqueue.client.message_processor'); + $processor->addTag('enqueue.client.processor'); $container->setDefinition('processor-service-id', $processor); $router = new Definition(); diff --git a/pkg/enqueue-bundle/Tests/Unit/DependencyInjection/Compiler/BuildExtensionsPassTest.php b/pkg/enqueue-bundle/Tests/Unit/DependencyInjection/Compiler/BuildExtensionsPassTest.php index d45f403a9..8e4aa54bc 100644 --- a/pkg/enqueue-bundle/Tests/Unit/DependencyInjection/Compiler/BuildExtensionsPassTest.php +++ b/pkg/enqueue-bundle/Tests/Unit/DependencyInjection/Compiler/BuildExtensionsPassTest.php @@ -1,8 +1,9 @@ createContainerBuilder(); $processor = new Definition(\stdClass::class); - $processor->addTag('enqueue.client.message_processor', [ + $processor->addTag('enqueue.client.processor', [ 'topicName' => 'topic', 'processorName' => 'processor-name', ]); @@ -28,9 +29,9 @@ public function testShouldBuildRouteRegistry() $processorRegistry = new Definition(); $processorRegistry->setArguments([]); - $container->setDefinition('enqueue.client.message_processor_registry', $processorRegistry); + $container->setDefinition('enqueue.client.processor_registry', $processorRegistry); - $pass = new BuildMessageProcessorRegistryPass(); + $pass = new BuildProcessorRegistryPass(); $pass->process($container); $expectedValue = [ @@ -45,16 +46,16 @@ public function testThrowIfProcessorClassNameCouldNotBeFound() $container = $this->createContainerBuilder(); $processor = new Definition('notExistingClass'); - $processor->addTag('enqueue.client.message_processor', [ + $processor->addTag('enqueue.client.processor', [ 'processorName' => 'processor', ]); $container->setDefinition('processor', $processor); $processorRegistry = new Definition(); $processorRegistry->setArguments([]); - $container->setDefinition('enqueue.client.message_processor_registry', $processorRegistry); + $container->setDefinition('enqueue.client.processor_registry', $processorRegistry); - $pass = new BuildMessageProcessorRegistryPass(); + $pass = new BuildProcessorRegistryPass(); $this->expectException(\LogicException::class); $this->expectExceptionMessage('The class "notExistingClass" could not be found.'); @@ -66,14 +67,14 @@ public function testShouldThrowExceptionIfTopicNameIsNotSet() $container = $this->createContainerBuilder(); $processor = new Definition(\stdClass::class); - $processor->addTag('enqueue.client.message_processor'); + $processor->addTag('enqueue.client.processor'); $container->setDefinition('processor', $processor); $processorRegistry = new Definition(); $processorRegistry->setArguments([]); - $container->setDefinition('enqueue.client.message_processor_registry', $processorRegistry); + $container->setDefinition('enqueue.client.processor_registry', $processorRegistry); - $pass = new BuildMessageProcessorRegistryPass(); + $pass = new BuildProcessorRegistryPass(); $this->expectException(\LogicException::class); $this->expectExceptionMessage('Topic name is not set on message processor tag but it is required.'); @@ -85,16 +86,16 @@ public function testShouldSetServiceIdAdProcessorIdIfIsNotSetInTag() $container = $this->createContainerBuilder(); $processor = new Definition(\stdClass::class); - $processor->addTag('enqueue.client.message_processor', [ + $processor->addTag('enqueue.client.processor', [ 'topicName' => 'topic', ]); $container->setDefinition('processor-id', $processor); $processorRegistry = new Definition(); $processorRegistry->setArguments([]); - $container->setDefinition('enqueue.client.message_processor_registry', $processorRegistry); + $container->setDefinition('enqueue.client.processor_registry', $processorRegistry); - $pass = new BuildMessageProcessorRegistryPass(); + $pass = new BuildProcessorRegistryPass(); $pass->process($container); $expectedValue = [ @@ -109,14 +110,14 @@ public function testShouldBuildRouteFromSubscriberIfOnlyTopicNameSpecified() $container = $this->createContainerBuilder(); $processor = new Definition(OnlyTopicNameTopicSubscriber::class); - $processor->addTag('enqueue.client.message_processor'); + $processor->addTag('enqueue.client.processor'); $container->setDefinition('processor-id', $processor); $processorRegistry = new Definition(); $processorRegistry->setArguments([]); - $container->setDefinition('enqueue.client.message_processor_registry', $processorRegistry); + $container->setDefinition('enqueue.client.processor_registry', $processorRegistry); - $pass = new BuildMessageProcessorRegistryPass(); + $pass = new BuildProcessorRegistryPass(); $pass->process($container); $expectedValue = [ @@ -131,14 +132,14 @@ public function testShouldBuildRouteFromSubscriberIfProcessorNameSpecified() $container = $this->createContainerBuilder(); $processor = new Definition(ProcessorNameTopicSubscriber::class); - $processor->addTag('enqueue.client.message_processor'); + $processor->addTag('enqueue.client.processor'); $container->setDefinition('processor-id', $processor); $processorRegistry = new Definition(); $processorRegistry->setArguments([]); - $container->setDefinition('enqueue.client.message_processor_registry', $processorRegistry); + $container->setDefinition('enqueue.client.processor_registry', $processorRegistry); - $pass = new BuildMessageProcessorRegistryPass(); + $pass = new BuildProcessorRegistryPass(); $pass->process($container); $expectedValue = [ @@ -155,14 +156,14 @@ public function testShouldThrowExceptionWhenTopicSubscriberConfigurationIsInvali $container = $this->createContainerBuilder(); $processor = new Definition(InvalidTopicSubscriber::class); - $processor->addTag('enqueue.client.message_processor'); + $processor->addTag('enqueue.client.processor'); $container->setDefinition('processor-id', $processor); $processorRegistry = new Definition(); $processorRegistry->setArguments([]); - $container->setDefinition('enqueue.client.message_processor_registry', $processorRegistry); + $container->setDefinition('enqueue.client.processor_registry', $processorRegistry); - $pass = new BuildMessageProcessorRegistryPass(); + $pass = new BuildProcessorRegistryPass(); $pass->process($container); } diff --git a/pkg/enqueue-bundle/Tests/Unit/DependencyInjection/Compiler/BuildQueueMetaRegistryPassTest.php b/pkg/enqueue-bundle/Tests/Unit/DependencyInjection/Compiler/BuildQueueMetaRegistryPassTest.php index ad1a85bf7..79665080e 100644 --- a/pkg/enqueue-bundle/Tests/Unit/DependencyInjection/Compiler/BuildQueueMetaRegistryPassTest.php +++ b/pkg/enqueue-bundle/Tests/Unit/DependencyInjection/Compiler/BuildQueueMetaRegistryPassTest.php @@ -1,4 +1,5 @@ createContainerBuilder(); $processor = new Definition(\stdClass::class); - $processor->addTag('enqueue.client.message_processor', [ + $processor->addTag('enqueue.client.processor', [ 'processorName' => 'processor', ]); $container->setDefinition('processor', $processor); @@ -34,7 +35,7 @@ public function testThrowIfProcessorClassNameCouldNotBeFound() $container = $this->createContainerBuilder(); $processor = new Definition('notExistingClass'); - $processor->addTag('enqueue.client.message_processor', [ + $processor->addTag('enqueue.client.processor', [ 'processorName' => 'processor', ]); $container->setDefinition('processor', $processor); @@ -55,7 +56,7 @@ public function testShouldBuildQueueMetaRegistry() $container = $this->createContainerBuilder(); $processor = new Definition(\stdClass::class); - $processor->addTag('enqueue.client.message_processor', [ + $processor->addTag('enqueue.client.processor', [ 'processorName' => 'theProcessorName', 'topicName' => 'aTopicName', ]); @@ -80,7 +81,7 @@ public function testShouldSetServiceIdAdProcessorIdIfIsNotSetInTag() $container = $this->createContainerBuilder(); $processor = new Definition(\stdClass::class); - $processor->addTag('enqueue.client.message_processor', [ + $processor->addTag('enqueue.client.processor', [ 'topicName' => 'aTopicName', ]); $container->setDefinition('processor-service-id', $processor); @@ -104,7 +105,7 @@ public function testShouldSetQueueIfSetInTag() $container = $this->createContainerBuilder(); $processor = new Definition(\stdClass::class); - $processor->addTag('enqueue.client.message_processor', [ + $processor->addTag('enqueue.client.processor', [ 'queueName' => 'theClientQueueName', 'topicName' => 'aTopicName', ]); @@ -129,7 +130,7 @@ public function testShouldBuildQueueFromSubscriberIfOnlyTopicNameSpecified() $container = $this->createContainerBuilder(); $processor = new Definition(OnlyTopicNameTopicSubscriber::class); - $processor->addTag('enqueue.client.message_processor'); + $processor->addTag('enqueue.client.processor'); $container->setDefinition('processor-service-id', $processor); $registry = new Definition(); @@ -151,7 +152,7 @@ public function testShouldBuildQueueFromSubscriberIfProcessorNameSpecified() $container = $this->createContainerBuilder(); $processor = new Definition(ProcessorNameTopicSubscriber::class); - $processor->addTag('enqueue.client.message_processor'); + $processor->addTag('enqueue.client.processor'); $container->setDefinition('processor-service-id', $processor); $registry = new Definition(); @@ -173,7 +174,7 @@ public function testShouldBuildQueueFromSubscriberIfQueueNameSpecified() $container = $this->createContainerBuilder(); $processor = new Definition(QueueNameTopicSubscriber::class); - $processor->addTag('enqueue.client.message_processor'); + $processor->addTag('enqueue.client.processor'); $container->setDefinition('processor-service-id', $processor); $registry = new Definition(); diff --git a/pkg/enqueue-bundle/Tests/Unit/DependencyInjection/Compiler/BuildTopicMetaSubscribersPassTest.php b/pkg/enqueue-bundle/Tests/Unit/DependencyInjection/Compiler/BuildTopicMetaSubscribersPassTest.php index 929f1fba4..affac5854 100644 --- a/pkg/enqueue-bundle/Tests/Unit/DependencyInjection/Compiler/BuildTopicMetaSubscribersPassTest.php +++ b/pkg/enqueue-bundle/Tests/Unit/DependencyInjection/Compiler/BuildTopicMetaSubscribersPassTest.php @@ -1,4 +1,5 @@ createContainerBuilder(); $processor = new Definition(\stdClass::class); - $processor->addTag('enqueue.client.message_processor', [ + $processor->addTag('enqueue.client.processor', [ 'topicName' => 'topic', 'processorName' => 'processor-name', ]); @@ -45,7 +46,7 @@ public function testThrowIfProcessorClassNameCouldNotBeFound() $container = $this->createContainerBuilder(); $processor = new Definition('notExistingClass'); - $processor->addTag('enqueue.client.message_processor', [ + $processor->addTag('enqueue.client.processor', [ 'processorName' => 'processor', ]); $container->setDefinition('processor', $processor); @@ -66,7 +67,7 @@ public function testShouldBuildTopicMetaSubscribersForOneTagAndSameMetaInRegistr $container = $this->createContainerBuilder(); $processor = new Definition(\stdClass::class); - $processor->addTag('enqueue.client.message_processor', [ + $processor->addTag('enqueue.client.processor', [ 'topicName' => 'topic', 'processorName' => 'barProcessorName', ]); @@ -96,7 +97,7 @@ public function testShouldBuildTopicMetaSubscribersForOneTagAndSameMetaInPlusAno $container = $this->createContainerBuilder(); $processor = new Definition(\stdClass::class); - $processor->addTag('enqueue.client.message_processor', [ + $processor->addTag('enqueue.client.processor', [ 'topicName' => 'fooTopic', 'processorName' => 'barProcessorName', ]); @@ -128,14 +129,14 @@ public function testShouldBuildTopicMetaSubscribersForTwoTagAndEmptyRegistry() $container = $this->createContainerBuilder(); $processor = new Definition(\stdClass::class); - $processor->addTag('enqueue.client.message_processor', [ + $processor->addTag('enqueue.client.processor', [ 'topicName' => 'fooTopic', 'processorName' => 'fooProcessorName', ]); $container->setDefinition('processor-id', $processor); $processor = new Definition(\stdClass::class); - $processor->addTag('enqueue.client.message_processor', [ + $processor->addTag('enqueue.client.processor', [ 'topicName' => 'fooTopic', 'processorName' => 'barProcessorName', ]); @@ -162,14 +163,14 @@ public function testShouldBuildTopicMetaSubscribersForTwoTagSameMetaRegistry() $container = $this->createContainerBuilder(); $processor = new Definition(\stdClass::class); - $processor->addTag('enqueue.client.message_processor', [ + $processor->addTag('enqueue.client.processor', [ 'topicName' => 'fooTopic', 'processorName' => 'fooProcessorName', ]); $container->setDefinition('processor-id', $processor); $processor = new Definition(\stdClass::class); - $processor->addTag('enqueue.client.message_processor', [ + $processor->addTag('enqueue.client.processor', [ 'topicName' => 'fooTopic', 'processorName' => 'barProcessorName', ]); @@ -199,7 +200,7 @@ public function testThrowIfTopicNameNotSetOnTagAsAttribute() $container = $this->createContainerBuilder(); $processor = new Definition(\stdClass::class); - $processor->addTag('enqueue.client.message_processor', []); + $processor->addTag('enqueue.client.processor', []); $container->setDefinition('processor', $processor); $topicMetaRegistry = new Definition(); @@ -218,7 +219,7 @@ public function testShouldSetServiceIdAdProcessorIdIfIsNotSetInTag() $container = $this->createContainerBuilder(); $processor = new Definition(\stdClass::class); - $processor->addTag('enqueue.client.message_processor', [ + $processor->addTag('enqueue.client.processor', [ 'topicName' => 'topic', ]); $container->setDefinition('processor-id', $processor); @@ -242,7 +243,7 @@ public function testShouldBuildMetaFromSubscriberIfOnlyTopicNameSpecified() $container = $this->createContainerBuilder(); $processor = new Definition(OnlyTopicNameTopicSubscriber::class); - $processor->addTag('enqueue.client.message_processor'); + $processor->addTag('enqueue.client.processor'); $container->setDefinition('processor-id', $processor); $topicMetaRegistry = new Definition(); @@ -264,7 +265,7 @@ public function testShouldBuildMetaFromSubscriberIfProcessorNameSpecified() $container = $this->createContainerBuilder(); $processor = new Definition(ProcessorNameTopicSubscriber::class); - $processor->addTag('enqueue.client.message_processor'); + $processor->addTag('enqueue.client.processor'); $container->setDefinition('processor-id', $processor); $topicMetaRegistry = new Definition(); @@ -288,7 +289,7 @@ public function testShouldThrowExceptionWhenTopicSubscriberConfigurationIsInvali $container = $this->createContainerBuilder(); $processor = new Definition(InvalidTopicSubscriber::class); - $processor->addTag('enqueue.client.message_processor'); + $processor->addTag('enqueue.client.processor'); $container->setDefinition('processor-id', $processor); $topicMetaRegistry = new Definition(); diff --git a/pkg/enqueue-bundle/Tests/Unit/DependencyInjection/Compiler/Mock/InvalidTopicSubscriber.php b/pkg/enqueue-bundle/Tests/Unit/DependencyInjection/Compiler/Mock/InvalidTopicSubscriber.php index fd6e90438..05ab5361d 100644 --- a/pkg/enqueue-bundle/Tests/Unit/DependencyInjection/Compiler/Mock/InvalidTopicSubscriber.php +++ b/pkg/enqueue-bundle/Tests/Unit/DependencyInjection/Compiler/Mock/InvalidTopicSubscriber.php @@ -1,4 +1,5 @@ processConfiguration($configuration, [[ + 'transport' => [], + ]]); + + $this->assertArraySubset([ + 'extensions' => [ + 'signal_extension' => true, + ], + ], $config); + } + + public function testSignalExtensionCouldBeDisabled() + { + $configuration = new Configuration([]); + + $processor = new Processor(); + $config = $processor->processConfiguration($configuration, [[ + 'transport' => [], + 'extensions' => [ + 'signal_extension' => false, + ], + ]]); + + $this->assertArraySubset([ + 'extensions' => [ + 'signal_extension' => false, + ], + ], $config); + } } diff --git a/pkg/enqueue-bundle/Tests/Unit/DependencyInjection/FormaproMessageQueueExtensionTest.php b/pkg/enqueue-bundle/Tests/Unit/DependencyInjection/EnqueueExtensionTest.php similarity index 93% rename from pkg/enqueue-bundle/Tests/Unit/DependencyInjection/FormaproMessageQueueExtensionTest.php rename to pkg/enqueue-bundle/Tests/Unit/DependencyInjection/EnqueueExtensionTest.php index 9ea21b829..525ed914e 100644 --- a/pkg/enqueue-bundle/Tests/Unit/DependencyInjection/FormaproMessageQueueExtensionTest.php +++ b/pkg/enqueue-bundle/Tests/Unit/DependencyInjection/EnqueueExtensionTest.php @@ -1,15 +1,16 @@ hasDefinition('enqueue.consumption.doctrine_clear_identity_map_extension')); } + + public function testShouldLoadSignalExtensionServiceIfEnabled() + { + $container = new ContainerBuilder(); + $container->setParameter('kernel.debug', true); + + $extension = new EnqueueExtension(); + + $extension->load([[ + 'transport' => [], + 'extensions' => [ + 'signal_extension' => true, + ], + ]], $container); + + self::assertTrue($container->hasDefinition('enqueue.consumption.signal_extension')); + } + + public function testShouldNotLoadSignalExtensionServiceIfDisabled() + { + $container = new ContainerBuilder(); + $container->setParameter('kernel.debug', true); + + $extension = new EnqueueExtension(); + + $extension->load([[ + 'transport' => [], + 'extensions' => [ + 'signal_extension' => false, + ], + ]], $container); + + self::assertFalse($container->hasDefinition('enqueue.consumption.signal_extension')); + } } diff --git a/pkg/enqueue-bundle/Tests/Unit/FormaproMessageQueueBundleTest.php b/pkg/enqueue-bundle/Tests/Unit/EnqueueBundleTest.php similarity index 96% rename from pkg/enqueue-bundle/Tests/Unit/FormaproMessageQueueBundleTest.php rename to pkg/enqueue-bundle/Tests/Unit/EnqueueBundleTest.php index 04a72bea4..72d931360 100644 --- a/pkg/enqueue-bundle/Tests/Unit/FormaproMessageQueueBundleTest.php +++ b/pkg/enqueue-bundle/Tests/Unit/EnqueueBundleTest.php @@ -1,20 +1,21 @@ expects($this->at(2)) ->method('addCompilerPass') - ->with($this->isInstanceOf(BuildMessageProcessorRegistryPass::class)) + ->with($this->isInstanceOf(BuildProcessorRegistryPass::class)) ; $container ->expects($this->at(3)) diff --git a/pkg/enqueue-bundle/Tests/Unit/Mocks/FooTransportFactory.php b/pkg/enqueue-bundle/Tests/Unit/Mocks/FooTransportFactory.php index 7e95d6201..89ee65995 100644 --- a/pkg/enqueue-bundle/Tests/Unit/Mocks/FooTransportFactory.php +++ b/pkg/enqueue-bundle/Tests/Unit/Mocks/FooTransportFactory.php @@ -1,4 +1,5 @@ =5.6", "symfony/framework-bundle": "^2.8|^3", - "enqueue/enqueue": "dev-master" + "enqueue/enqueue": "^0.1" }, "require-dev": { "phpunit/phpunit": "~5.5", - "enqueue/stomp": "dev-master", - "enqueue/amqp-ext": "dev-master", - "enqueue/job-queue": "dev-master", - "enqueue/test": "dev-master", + "enqueue/stomp": "^0.1", + "enqueue/amqp-ext": "^0.1", + "enqueue/job-queue": "^0.1", + "enqueue/test": "^0.1", "doctrine/doctrine-bundle": "~1.2", "symfony/monolog-bundle": "^2.8|^3", "symfony/browser-kit": "^2.8|^3", diff --git a/pkg/enqueue/Client/ArrayMessageProcessorRegistry.php b/pkg/enqueue/Client/ArrayMessageProcessorRegistry.php deleted file mode 100644 index e48693c55..000000000 --- a/pkg/enqueue/Client/ArrayMessageProcessorRegistry.php +++ /dev/null @@ -1,41 +0,0 @@ -processors = $processors; - } - - /** - * @param string $name - * @param MessageProcessorInterface $processor - */ - public function add($name, MessageProcessorInterface $processor) - { - $this->processors[$name] = $processor; - } - - /** - * {@inheritdoc} - */ - public function get($processorName) - { - if (false == isset($this->processors[$processorName])) { - throw new \LogicException(sprintf('MessageProcessor was not found. processorName: "%s"', $processorName)); - } - - return $this->processors[$processorName]; - } -} diff --git a/pkg/enqueue/Client/ArrayProcessorRegistry.php b/pkg/enqueue/Client/ArrayProcessorRegistry.php new file mode 100644 index 000000000..ab8278893 --- /dev/null +++ b/pkg/enqueue/Client/ArrayProcessorRegistry.php @@ -0,0 +1,42 @@ +processors = $processors; + } + + /** + * @param string $name + * @param Processor $processor + */ + public function add($name, Processor $processor) + { + $this->processors[$name] = $processor; + } + + /** + * {@inheritdoc} + */ + public function get($processorName) + { + if (false == isset($this->processors[$processorName])) { + throw new \LogicException(sprintf('Processor was not found. processorName: "%s"', $processorName)); + } + + return $this->processors[$processorName]; + } +} diff --git a/pkg/enqueue/Client/Config.php b/pkg/enqueue/Client/Config.php index fe2a3fab7..66fc07596 100644 --- a/pkg/enqueue/Client/Config.php +++ b/pkg/enqueue/Client/Config.php @@ -1,4 +1,5 @@ registry = $registry; } diff --git a/pkg/enqueue/Client/DriverInterface.php b/pkg/enqueue/Client/DriverInterface.php index 9055a4e7d..9e0c1ede2 100644 --- a/pkg/enqueue/Client/DriverInterface.php +++ b/pkg/enqueue/Client/DriverInterface.php @@ -1,4 +1,5 @@ queueMetaRegistry->add($this->config->getRouterQueueName()); $this->topicsMetaRegistry = new TopicMetaRegistry([]); - $this->processorsRegistry = new ArrayMessageProcessorRegistry(); + $this->processorsRegistry = new ArrayProcessorRegistry(); $this->driver = new AmqpDriver($context, $this->config, $this->queueMetaRegistry); $this->routerProcessor = new RouterProcessor($this->driver, []); @@ -80,7 +81,7 @@ public function bind($topic, callable $processor) $this->topicsMetaRegistry->addProcessor($topic, $processorName); $this->queueMetaRegistry->addProcessor($queueName, $processorName); - $this->processorsRegistry->add($processorName, new CallbackMessageProcessor($processor)); + $this->processorsRegistry->add($processorName, new CallbackProcessor($processor)); $this->routerProcessor->add($topic, $queueName, $processorName); } @@ -124,10 +125,10 @@ private function getProducer() } /** - * @return DelegateMessageProcessor + * @return DelegateProcessor */ private function getProcessor() { - return new DelegateMessageProcessor($this->processorsRegistry); + return new DelegateProcessor($this->processorsRegistry); } } diff --git a/pkg/enqueue/Client/TopicSubscriberInterface.php b/pkg/enqueue/Client/TopicSubscriberInterface.php index ccf8b6617..9d66d2cca 100644 --- a/pkg/enqueue/Client/TopicSubscriberInterface.php +++ b/pkg/enqueue/Client/TopicSubscriberInterface.php @@ -1,4 +1,5 @@ messageProcessor; + return $this->psrProcessor; } /** - * @param MessageProcessorInterface $messageProcessor + * @param Processor $psrProcessor */ - public function setMessageProcessor(MessageProcessorInterface $messageProcessor) + public function setPsrProcessor(Processor $psrProcessor) { - if ($this->messageProcessor) { + if ($this->psrProcessor) { throw new IllegalContextModificationException('The message processor could be set once'); } - $this->messageProcessor = $messageProcessor; + $this->psrProcessor = $psrProcessor; } /** diff --git a/pkg/enqueue/Consumption/EmptyExtensionTrait.php b/pkg/enqueue/Consumption/EmptyExtensionTrait.php index fb10b248a..c98ed8da3 100644 --- a/pkg/enqueue/Consumption/EmptyExtensionTrait.php +++ b/pkg/enqueue/Consumption/EmptyExtensionTrait.php @@ -1,4 +1,5 @@ getResult(); if (false == $result instanceof Result) { - throw new \LogicException('To send a reply an instance of Result class has to returned from a MessageProcessor.'); + throw new \LogicException('To send a reply an instance of Result class has to returned from a Processor.'); } if (false == $result->getReply()) { diff --git a/pkg/enqueue/Consumption/Extension/SignalExtension.php b/pkg/enqueue/Consumption/Extension/SignalExtension.php index add9e3aff..a3bab8500 100644 --- a/pkg/enqueue/Consumption/Extension/SignalExtension.php +++ b/pkg/enqueue/Consumption/Extension/SignalExtension.php @@ -1,4 +1,5 @@ extension = $extension; $this->idleMicroseconds = $idleMicroseconds; - $this->boundMessageProcessors = []; + $this->boundProcessors = []; } /** @@ -62,31 +64,31 @@ public function getPsrContext() } /** - * @param Queue|string $queue - * @param MessageProcessorInterface|callable $messageProcessor + * @param Queue|string $queue + * @param Processor|callable $processor * * @return QueueConsumer */ - public function bind($queue, $messageProcessor) + public function bind($queue, $processor) { if (is_string($queue)) { $queue = $this->psrContext->createQueue($queue); } - if (is_callable($messageProcessor)) { - $messageProcessor = new CallbackMessageProcessor($messageProcessor); + if (is_callable($processor)) { + $processor = new CallbackProcessor($processor); } InvalidArgumentException::assertInstanceOf($queue, Queue::class); - InvalidArgumentException::assertInstanceOf($messageProcessor, MessageProcessorInterface::class); + InvalidArgumentException::assertInstanceOf($processor, Processor::class); if (empty($queue->getQueueName())) { throw new LogicException('The queue name must be not empty.'); } - if (array_key_exists($queue->getQueueName(), $this->boundMessageProcessors)) { + if (array_key_exists($queue->getQueueName(), $this->boundProcessors)) { throw new LogicException(sprintf('The queue was already bound. Queue: %s', $queue->getQueueName())); } - $this->boundMessageProcessors[$queue->getQueueName()] = [$queue, $messageProcessor]; + $this->boundProcessors[$queue->getQueueName()] = [$queue, $processor]; return $this; } @@ -104,7 +106,7 @@ public function consume(ExtensionInterface $runtimeExtension = null) /** @var Consumer[] $messageConsumers */ $messageConsumers = []; /** @var \Enqueue\Psr\Queue $queue */ - foreach ($this->boundMessageProcessors as list($queue, $messageProcessor)) { + foreach ($this->boundProcessors as list($queue, $processor)) { $messageConsumers[$queue->getQueueName()] = $this->psrContext->createConsumer($queue); } @@ -122,7 +124,7 @@ public function consume(ExtensionInterface $runtimeExtension = null) while (true) { try { /** @var Queue $queue */ - foreach ($this->boundMessageProcessors as list($queue, $messageProcessor)) { + foreach ($this->boundProcessors as list($queue, $processor)) { $logger->debug(sprintf('Switch to a queue %s', $queue->getQueueName())); $messageConsumer = $messageConsumers[$queue->getQueueName()]; @@ -131,7 +133,7 @@ public function consume(ExtensionInterface $runtimeExtension = null) $context->setLogger($logger); $context->setPsrQueue($queue); $context->setPsrConsumer($messageConsumer); - $context->setMessageProcessor($messageProcessor); + $context->setPsrProcessor($processor); $this->doConsume($extension, $context); } @@ -171,8 +173,8 @@ public function consume(ExtensionInterface $runtimeExtension = null) */ protected function doConsume(ExtensionInterface $extension, Context $context) { - $messageProcessor = $context->getMessageProcessor(); - $messageConsumer = $context->getPsrConsumer(); + $processor = $context->getPsrProcessor(); + $consumer = $context->getPsrConsumer(); $logger = $context->getLogger(); $extension->onBeforeReceive($context); @@ -181,7 +183,7 @@ protected function doConsume(ExtensionInterface $extension, Context $context) throw new ConsumptionInterruptedException(); } - if ($message = $messageConsumer->receive($timeout = 1)) { + if ($message = $consumer->receive($timeout = 1)) { $logger->info('Message received'); $logger->debug('Headers: {headers}', ['headers' => new VarExport($message->getHeaders())]); $logger->debug('Properties: {properties}', ['properties' => new VarExport($message->getProperties())]); @@ -191,19 +193,19 @@ protected function doConsume(ExtensionInterface $extension, Context $context) $extension->onPreReceived($context); if (!$context->getResult()) { - $result = $messageProcessor->process($message, $this->psrContext); + $result = $processor->process($message, $this->psrContext); $context->setResult($result); } switch ($context->getResult()) { case Result::ACK: - $messageConsumer->acknowledge($message); + $consumer->acknowledge($message); break; case Result::REJECT: - $messageConsumer->reject($message, false); + $consumer->reject($message, false); break; case Result::REQUEUE: - $messageConsumer->reject($message, true); + $consumer->reject($message, true); break; default: throw new \LogicException(sprintf('Status is not supported: %s', $context->getResult())); diff --git a/pkg/enqueue/Consumption/Result.php b/pkg/enqueue/Consumption/Result.php index 3d248afce..74ab6e1d2 100644 --- a/pkg/enqueue/Consumption/Result.php +++ b/pkg/enqueue/Consumption/Result.php @@ -1,26 +1,26 @@ reply; - } - - /** - * @param Message|null $reply - */ - public function setReply(Message $reply = null) - { - $this->reply = $reply; - } - /** * @param string $status * @param string $reason @@ -63,6 +47,14 @@ public function __construct($status, $reason = '') $this->reason = (string) $reason; } + /** + * @return string + */ + public function __toString() + { + return $this->status; + } + /** * @return string */ @@ -79,6 +71,22 @@ public function getReason() return $this->reason; } + /** + * @return PsrMessage|null + */ + public function getReply() + { + return $this->reply; + } + + /** + * @param PsrMessage|null $reply + */ + public function setReply(PsrMessage $reply = null) + { + $this->reply = $reply; + } + /** * @param string $reason * @@ -110,24 +118,16 @@ public static function requeue($reason = '') } /** - * @param Message $replyMessage + * @param PsrMessage $replyMessage * @param string|null $reason * * @return Result */ - public static function reply(Message $replyMessage, $reason = '') + public static function reply(PsrMessage $replyMessage, $reason = '') { $result = static::ack($reason); $result->setReply($replyMessage); return $result; } - - /** - * @return string - */ - public function __toString() - { - return $this->status; - } } diff --git a/pkg/enqueue/README.md b/pkg/enqueue/README.md index 0dfa75c77..32c78d685 100644 --- a/pkg/enqueue/README.md +++ b/pkg/enqueue/README.md @@ -12,7 +12,7 @@ Read more about it in documentation. ## Resources * [Documentation](https://github.com/php-enqueue/enqueue-dev/blob/master/docs/index.md) -* [Questions](https://gitter.im/php-enqueue/enqueue-dev) +* [Questions](https://gitter.im/php-enqueue/Lobby) * [Issue Tracker](https://github.com/php-enqueue/enqueue-dev/issues) ## License diff --git a/pkg/enqueue/Router/Recipient.php b/pkg/enqueue/Router/Recipient.php index 94083a2b0..892c0e511 100644 --- a/pkg/enqueue/Router/Recipient.php +++ b/pkg/enqueue/Router/Recipient.php @@ -1,4 +1,5 @@ send($recipient->getDestination(), $recipient->getMessage()); } - return Result::ACK; + return self::ACK; } } diff --git a/pkg/enqueue/Rpc/Promise.php b/pkg/enqueue/Rpc/Promise.php index ee6188f09..b0265f115 100644 --- a/pkg/enqueue/Rpc/Promise.php +++ b/pkg/enqueue/Rpc/Promise.php @@ -1,4 +1,5 @@ consumer->acknowledge($message); return $message; - } else { - $this->consumer->reject($message, true); } + $this->consumer->reject($message, true); } } diff --git a/pkg/enqueue/Rpc/RpcClient.php b/pkg/enqueue/Rpc/RpcClient.php index e8b4b6268..fad0bcd69 100644 --- a/pkg/enqueue/Rpc/RpcClient.php +++ b/pkg/enqueue/Rpc/RpcClient.php @@ -1,4 +1,5 @@ processors[$processorName])) { - throw new \LogicException(sprintf('MessageProcessor was not found. processorName: "%s"', $processorName)); + throw new \LogicException(sprintf('Processor was not found. processorName: "%s"', $processorName)); } if (null === $this->container) { @@ -47,10 +48,10 @@ public function get($processorName) $processor = $this->container->get($this->processors[$processorName]); - if (false == $processor instanceof MessageProcessorInterface) { + if (false == $processor instanceof Processor) { throw new \LogicException(sprintf( 'Invalid instance of message processor. expected: "%s", got: "%s"', - MessageProcessorInterface::class, + Processor::class, is_object($processor) ? get_class($processor) : gettype($processor) )); } diff --git a/pkg/enqueue/Symfony/Client/Meta/QueuesCommand.php b/pkg/enqueue/Symfony/Client/Meta/QueuesCommand.php index 7bbfe2f40..6f37679dc 100644 --- a/pkg/enqueue/Symfony/Client/Meta/QueuesCommand.php +++ b/pkg/enqueue/Symfony/Client/Meta/QueuesCommand.php @@ -1,4 +1,5 @@ getArgument('queue'); - /** @var MessageProcessorInterface $messageProcessor */ - $messageProcessor = $this->container->get($input->getArgument('processor-service')); - if (!$messageProcessor instanceof MessageProcessorInterface) { + /** @var Processor $processor */ + $processor = $this->container->get($input->getArgument('processor-service')); + if (!$processor instanceof Processor) { throw new \LogicException(sprintf( 'Invalid message processor service given. It must be an instance of %s but %s', - MessageProcessorInterface::class, - get_class($messageProcessor) + Processor::class, + get_class($processor) )); } @@ -78,7 +79,7 @@ protected function execute(InputInterface $input, OutputInterface $output) $queue = $this->consumer->getPsrContext()->createQueue($queueName); // @todo set additional queue options - $this->consumer->bind($queue, $messageProcessor); + $this->consumer->bind($queue, $processor); $this->consumer->consume($runtimeExtensions); } finally { $this->consumer->getPsrContext()->close(); diff --git a/pkg/enqueue/Symfony/Consumption/LimitsExtensionsCommandTrait.php b/pkg/enqueue/Symfony/Consumption/LimitsExtensionsCommandTrait.php index 1f12a957d..34bc76d37 100644 --- a/pkg/enqueue/Symfony/Consumption/LimitsExtensionsCommandTrait.php +++ b/pkg/enqueue/Symfony/Consumption/LimitsExtensionsCommandTrait.php @@ -1,4 +1,5 @@ assertClassImplements(MessageProcessorRegistryInterface::class, ArrayMessageProcessorRegistry::class); - } - - public function testCouldBeConstructedWithoutAnyArgument() - { - new ArrayMessageProcessorRegistry(); - } - - public function testShouldThrowExceptionIfProcessorIsNotSet() - { - $registry = new ArrayMessageProcessorRegistry(); - - $this->expectException(\LogicException::class); - $this->expectExceptionMessage('MessageProcessor was not found. processorName: "processor-name"'); - $registry->get('processor-name'); - } - - public function testShouldAllowGetProcessorAddedViaConstructor() - { - $processor = $this->createMessageProcessorMock(); - - $registry = new ArrayMessageProcessorRegistry(['aFooName' => $processor]); - - $this->assertSame($processor, $registry->get('aFooName')); - } - - public function testShouldAllowGetProcessorAddedViaAddMethod() - { - $processor = $this->createMessageProcessorMock(); - - $registry = new ArrayMessageProcessorRegistry(); - $registry->add('aFooName', $processor); - - $this->assertSame($processor, $registry->get('aFooName')); - } - - /** - * @return \PHPUnit_Framework_MockObject_MockObject|MessageProcessorInterface - */ - protected function createMessageProcessorMock() - { - return $this->createMock(MessageProcessorInterface::class); - } -} diff --git a/pkg/enqueue/Tests/Client/ArrayProcessorRegistryTest.php b/pkg/enqueue/Tests/Client/ArrayProcessorRegistryTest.php new file mode 100644 index 000000000..dce55f4a5 --- /dev/null +++ b/pkg/enqueue/Tests/Client/ArrayProcessorRegistryTest.php @@ -0,0 +1,59 @@ +assertClassImplements(ProcessorRegistryInterface::class, ArrayProcessorRegistry::class); + } + + public function testCouldBeConstructedWithoutAnyArgument() + { + new ArrayProcessorRegistry(); + } + + public function testShouldThrowExceptionIfProcessorIsNotSet() + { + $registry = new ArrayProcessorRegistry(); + + $this->expectException(\LogicException::class); + $this->expectExceptionMessage('Processor was not found. processorName: "processor-name"'); + $registry->get('processor-name'); + } + + public function testShouldAllowGetProcessorAddedViaConstructor() + { + $processor = $this->createProcessorMock(); + + $registry = new ArrayProcessorRegistry(['aFooName' => $processor]); + + $this->assertSame($processor, $registry->get('aFooName')); + } + + public function testShouldAllowGetProcessorAddedViaAddMethod() + { + $processor = $this->createProcessorMock(); + + $registry = new ArrayProcessorRegistry(); + $registry->add('aFooName', $processor); + + $this->assertSame($processor, $registry->get('aFooName')); + } + + /** + * @return \PHPUnit_Framework_MockObject_MockObject|Processor + */ + protected function createProcessorMock() + { + return $this->createMock(Processor::class); + } +} diff --git a/pkg/enqueue/Tests/Client/ConfigTest.php b/pkg/enqueue/Tests/Client/ConfigTest.php index f9957c77d..7d2c0a499 100644 --- a/pkg/enqueue/Tests/Client/ConfigTest.php +++ b/pkg/enqueue/Tests/Client/ConfigTest.php @@ -1,11 +1,12 @@ createMessageProcessorRegistryMock()); + new DelegateProcessor($this->createProcessorRegistryMock()); } public function testShouldThrowExceptionIfProcessorNameIsNotSet() @@ -22,7 +23,7 @@ public function testShouldThrowExceptionIfProcessorNameIsNotSet() 'Got message without required parameter: "enqueue.processor_name"' ); - $processor = new DelegateMessageProcessor($this->createMessageProcessorRegistryMock()); + $processor = new DelegateProcessor($this->createProcessorRegistryMock()); $processor->process(new NullMessage(), $this->createPsrContextMock()); } @@ -34,34 +35,34 @@ public function testShouldProcessMessage() Config::PARAMETER_PROCESSOR_NAME => 'processor-name', ]); - $messageProcessor = $this->createMessageProcessorMock(); - $messageProcessor + $processor = $this->createProcessorMock(); + $processor ->expects($this->once()) ->method('process') ->with($this->identicalTo($message), $this->identicalTo($session)) ->will($this->returnValue('return-value')) ; - $processorRegistry = $this->createMessageProcessorRegistryMock(); + $processorRegistry = $this->createProcessorRegistryMock(); $processorRegistry ->expects($this->once()) ->method('get') ->with('processor-name') - ->will($this->returnValue($messageProcessor)) + ->will($this->returnValue($processor)) ; - $processor = new DelegateMessageProcessor($processorRegistry); + $processor = new DelegateProcessor($processorRegistry); $return = $processor->process($message, $session); $this->assertEquals('return-value', $return); } /** - * @return \PHPUnit_Framework_MockObject_MockObject|MessageProcessorRegistryInterface + * @return \PHPUnit_Framework_MockObject_MockObject|ProcessorRegistryInterface */ - protected function createMessageProcessorRegistryMock() + protected function createProcessorRegistryMock() { - return $this->createMock(MessageProcessorRegistryInterface::class); + return $this->createMock(ProcessorRegistryInterface::class); } /** @@ -73,10 +74,10 @@ protected function createPsrContextMock() } /** - * @return \PHPUnit_Framework_MockObject_MockObject|MessageProcessorInterface + * @return \PHPUnit_Framework_MockObject_MockObject|Processor */ - protected function createMessageProcessorMock() + protected function createProcessorMock() { - return $this->createMock(MessageProcessorInterface::class); + return $this->createMock(Processor::class); } } diff --git a/pkg/enqueue/Tests/Client/MessagePriorityTest.php b/pkg/enqueue/Tests/Client/MessagePriorityTest.php index 9d996a409..5778e0299 100644 --- a/pkg/enqueue/Tests/Client/MessagePriorityTest.php +++ b/pkg/enqueue/Tests/Client/MessagePriorityTest.php @@ -1,4 +1,5 @@ assertClassImplements(MessageProcessorInterface::class, CallbackMessageProcessor::class); + $this->assertClassImplements(Processor::class, CallbackProcessor::class); } public function testCouldBeConstructedWithCallableAsArgument() { - new CallbackMessageProcessor(function () { + new CallbackProcessor(function () { }); } @@ -27,7 +28,7 @@ public function testShouldCallCallbackAndProxyItsReturnedValue() $expectedMessage = new NullMessage(); $expectedContext = new NullContext(); - $processor = new CallbackMessageProcessor(function ($message, $context) use ($expectedMessage, $expectedContext) { + $processor = new CallbackProcessor(function ($message, $context) use ($expectedMessage, $expectedContext) { $this->assertSame($expectedMessage, $message); $this->assertSame($expectedContext, $context); diff --git a/pkg/enqueue/Tests/Consumption/ContextTest.php b/pkg/enqueue/Tests/Consumption/ContextTest.php index 5191e5976..3d891e232 100644 --- a/pkg/enqueue/Tests/Consumption/ContextTest.php +++ b/pkg/enqueue/Tests/Consumption/ContextTest.php @@ -1,12 +1,13 @@ createMessageProcessor(); + $processorMock = $this->createProcessorMock(); $context = new Context($this->createPsrContext()); - $context->setMessageProcessor($messageProcessor); + $context->setPsrProcessor($processorMock); - $this->assertSame($messageProcessor, $context->getMessageProcessor()); + $this->assertSame($processorMock, $context->getPsrProcessor()); } - public function testThrowOnTryToChangeMessageProcessorIfAlreadySet() + public function testThrowOnTryToChangeProcessorIfAlreadySet() { - $messageProcessor = $this->createMessageProcessor(); - $anotherMessageProcessor = $this->createMessageProcessor(); + $processor = $this->createProcessorMock(); + $anotherProcessor = $this->createProcessorMock(); $context = new Context($this->createPsrContext()); - $context->setMessageProcessor($messageProcessor); + $context->setPsrProcessor($processor); $this->expectException(IllegalContextModificationException::class); - $context->setMessageProcessor($anotherMessageProcessor); + $context->setPsrProcessor($anotherProcessor); } public function testShouldAllowGetLoggerPreviouslySet() @@ -247,10 +248,10 @@ protected function createPsrConsumer() } /** - * @return \PHPUnit_Framework_MockObject_MockObject|MessageProcessorInterface + * @return \PHPUnit_Framework_MockObject_MockObject|Processor */ - protected function createMessageProcessor() + protected function createProcessorMock() { - return $this->createMock(MessageProcessorInterface::class); + return $this->createMock(Processor::class); } } diff --git a/pkg/enqueue/Tests/Consumption/Exception/ConsumptionInterruptedExceptionTest.php b/pkg/enqueue/Tests/Consumption/Exception/ConsumptionInterruptedExceptionTest.php index 012192817..f4fa0678d 100644 --- a/pkg/enqueue/Tests/Consumption/Exception/ConsumptionInterruptedExceptionTest.php +++ b/pkg/enqueue/Tests/Consumption/Exception/ConsumptionInterruptedExceptionTest.php @@ -1,4 +1,5 @@ createMock(PsrContext::class)); $context->setLogger($this->createMock(LoggerInterface::class)); $context->setPsrConsumer($this->createMock(Consumer::class)); - $context->setMessageProcessor($this->createMock(MessageProcessorInterface::class)); + $context->setPsrProcessor($this->createMock(Processor::class)); return $context; } diff --git a/pkg/enqueue/Tests/Consumption/Extension/LimitConsumerMemoryExtensionTest.php b/pkg/enqueue/Tests/Consumption/Extension/LimitConsumerMemoryExtensionTest.php index 5a90eeddc..c7f42a207 100644 --- a/pkg/enqueue/Tests/Consumption/Extension/LimitConsumerMemoryExtensionTest.php +++ b/pkg/enqueue/Tests/Consumption/Extension/LimitConsumerMemoryExtensionTest.php @@ -1,11 +1,12 @@ createMock(PsrContext::class)); $context->setLogger($this->createMock(LoggerInterface::class)); $context->setPsrConsumer($this->createMock(Consumer::class)); - $context->setMessageProcessor($this->createMock(MessageProcessorInterface::class)); + $context->setPsrProcessor($this->createMock(Processor::class)); return $context; } diff --git a/pkg/enqueue/Tests/Consumption/Extension/LimitConsumptionTimeExtensionTest.php b/pkg/enqueue/Tests/Consumption/Extension/LimitConsumptionTimeExtensionTest.php index 15c2b8980..2abb75ecd 100644 --- a/pkg/enqueue/Tests/Consumption/Extension/LimitConsumptionTimeExtensionTest.php +++ b/pkg/enqueue/Tests/Consumption/Extension/LimitConsumptionTimeExtensionTest.php @@ -1,11 +1,12 @@ createMock(PsrContext::class)); $context->setLogger($this->createMock(LoggerInterface::class)); $context->setPsrConsumer($this->createMock(Consumer::class)); - $context->setMessageProcessor($this->createMock(MessageProcessorInterface::class)); + $context->setPsrProcessor($this->createMock(Processor::class)); return $context; } diff --git a/pkg/enqueue/Tests/Consumption/Extension/LoggerExtensionTest.php b/pkg/enqueue/Tests/Consumption/Extension/LoggerExtensionTest.php index 4bcdc09b4..62998e82d 100644 --- a/pkg/enqueue/Tests/Consumption/Extension/LoggerExtensionTest.php +++ b/pkg/enqueue/Tests/Consumption/Extension/LoggerExtensionTest.php @@ -1,12 +1,13 @@ setResult('notInstanceOfResult'); $this->expectException(\LogicException::class); - $this->expectExceptionMessage('To send a reply an instance of Result class has to returned from a MessageProcessor.'); + $this->expectExceptionMessage('To send a reply an instance of Result class has to returned from a Processor.'); $extension->onPostReceived($context); } diff --git a/pkg/enqueue/Tests/Consumption/ExtensionsTest.php b/pkg/enqueue/Tests/Consumption/ExtensionsTest.php index aaa7b6502..2f35bc76f 100644 --- a/pkg/enqueue/Tests/Consumption/ExtensionsTest.php +++ b/pkg/enqueue/Tests/Consumption/ExtensionsTest.php @@ -1,4 +1,5 @@ createPsrContextStub(), $this->createExtension()); } - public function testShouldSetEmptyArrayToBoundMessageProcessorsPropertyInConstructor() + public function testShouldSetEmptyArrayToBoundProcessorsPropertyInConstructor() { $consumer = new QueueConsumer($this->createPsrContextStub(), null, 0); - $this->assertAttributeSame([], 'boundMessageProcessors', $consumer); + $this->assertAttributeSame([], 'boundProcessors', $consumer); } public function testShouldAllowGetConnectionSetInConstructor() @@ -52,57 +53,57 @@ public function testShouldAllowGetConnectionSetInConstructor() public function testThrowIfQueueNameEmptyOnBind() { - $messageProcessorMock = $this->createMessageProcessorMock(); + $processorMock = $this->createProcessorMock(); $consumer = new QueueConsumer($this->createPsrContextStub(), null, 0); $this->expectException(\LogicException::class); $this->expectExceptionMessage('The queue name must be not empty.'); - $consumer->bind(new NullQueue(''), $messageProcessorMock); + $consumer->bind(new NullQueue(''), $processorMock); } - public function testThrowIfQueueAlreadyBoundToMessageProcessorOnBind() + public function testThrowIfQueueAlreadyBoundToProcessorOnBind() { - $messageProcessorMock = $this->createMessageProcessorMock(); + $processorMock = $this->createProcessorMock(); $consumer = new QueueConsumer($this->createPsrContextStub(), null, 0); - $consumer->bind(new NullQueue('theQueueName'), $messageProcessorMock); + $consumer->bind(new NullQueue('theQueueName'), $processorMock); $this->expectException(\LogicException::class); $this->expectExceptionMessage('The queue was already bound.'); - $consumer->bind(new NullQueue('theQueueName'), $messageProcessorMock); + $consumer->bind(new NullQueue('theQueueName'), $processorMock); } - public function testShouldAllowBindMessageProcessorToQueue() + public function testShouldAllowBindProcessorToQueue() { $queue = new NullQueue('theQueueName'); - $messageProcessorMock = $this->createMessageProcessorMock(); + $processorMock = $this->createProcessorMock(); $consumer = new QueueConsumer($this->createPsrContextStub(), null, 0); - $consumer->bind($queue, $messageProcessorMock); + $consumer->bind($queue, $processorMock); - $this->assertAttributeSame(['theQueueName' => [$queue, $messageProcessorMock]], 'boundMessageProcessors', $consumer); + $this->assertAttributeSame(['theQueueName' => [$queue, $processorMock]], 'boundProcessors', $consumer); } public function testThrowIfQueueNeitherInstanceOfQueueNorString() { - $messageProcessorMock = $this->createMessageProcessorMock(); + $processorMock = $this->createProcessorMock(); $consumer = new QueueConsumer($this->createPsrContextStub(), null, 0); $this->expectException(InvalidArgumentException::class); $this->expectExceptionMessage('The argument must be an instance of Enqueue\Psr\Queue but got stdClass.'); - $consumer->bind(new \stdClass(), $messageProcessorMock); + $consumer->bind(new \stdClass(), $processorMock); } - public function testThrowIfMessageProcessorNeitherInstanceOfMessageProcessorNorCallable() + public function testThrowIfProcessorNeitherInstanceOfProcessorNorCallable() { $consumer = new QueueConsumer($this->createPsrContextStub(), null, 0); $this->expectException(InvalidArgumentException::class); - $this->expectExceptionMessage('The argument must be an instance of Enqueue\Consumption\MessageProcessorInterface but got stdClass.'); + $this->expectExceptionMessage('The argument must be an instance of Enqueue\Psr\Processor but got stdClass.'); $consumer->bind(new NullQueue(''), new \stdClass()); } @@ -126,7 +127,7 @@ public function testShouldAllowBindCallbackToQueueName() $consumer->bind($queueName, $callback); - $boundProcessors = $this->readAttribute($consumer, 'boundMessageProcessors'); + $boundProcessors = $this->readAttribute($consumer, 'boundProcessors'); $this->assertInternalType('array', $boundProcessors); $this->assertCount(1, $boundProcessors); @@ -135,16 +136,16 @@ public function testShouldAllowBindCallbackToQueueName() $this->assertInternalType('array', $boundProcessors[$queueName]); $this->assertCount(2, $boundProcessors[$queueName]); $this->assertSame($queue, $boundProcessors[$queueName][0]); - $this->assertInstanceOf(CallbackMessageProcessor::class, $boundProcessors[$queueName][1]); + $this->assertInstanceOf(CallbackProcessor::class, $boundProcessors[$queueName][1]); } public function testShouldReturnSelfOnBind() { - $messageProcessorMock = $this->createMessageProcessorMock(); + $processorMock = $this->createProcessorMock(); $consumer = new QueueConsumer($this->createPsrContextStub(), null, 0); - $this->assertSame($consumer, $consumer->bind(new NullQueue('aQueueName'), $messageProcessorMock)); + $this->assertSame($consumer, $consumer->bind(new NullQueue('aQueueName'), $processorMock)); } public function testShouldSubscribeToGivenQueueAndQuitAfterFifthIdleCycle() @@ -166,14 +167,14 @@ public function testShouldSubscribeToGivenQueueAndQuitAfterFifthIdleCycle() ->willReturn($messageConsumerMock) ; - $messageProcessorMock = $this->createMessageProcessorMock(); - $messageProcessorMock + $processorMock = $this->createProcessorMock(); + $processorMock ->expects($this->never()) ->method('process') ; $queueConsumer = new QueueConsumer($contextMock, new BreakCycleExtension(5), 0); - $queueConsumer->bind($expectedQueue, $messageProcessorMock); + $queueConsumer->bind($expectedQueue, $processorMock); $queueConsumer->consume(); } @@ -184,20 +185,20 @@ public function testShouldProcessFiveMessagesAndQuit() $contextStub = $this->createPsrContextStub($messageConsumerStub); - $messageProcessorMock = $this->createMessageProcessorMock(); - $messageProcessorMock + $processorMock = $this->createProcessorMock(); + $processorMock ->expects($this->exactly(5)) ->method('process') ->willReturn(Result::ACK) ; $queueConsumer = new QueueConsumer($contextStub, new BreakCycleExtension(5), 0); - $queueConsumer->bind(new NullQueue('aQueueName'), $messageProcessorMock); + $queueConsumer->bind(new NullQueue('aQueueName'), $processorMock); $queueConsumer->consume(); } - public function testShouldAckMessageIfMessageProcessorReturnSuchStatus() + public function testShouldAckMessageIfProcessorReturnSuchStatus() { $messageMock = $this->createMessageMock(); $messageConsumerStub = $this->createMessageConsumerStub($messageMock); @@ -209,8 +210,8 @@ public function testShouldAckMessageIfMessageProcessorReturnSuchStatus() $contextStub = $this->createPsrContextStub($messageConsumerStub); - $messageProcessorMock = $this->createMessageProcessorMock(); - $messageProcessorMock + $processorMock = $this->createProcessorMock(); + $processorMock ->expects($this->once()) ->method('process') ->with($this->identicalTo($messageMock)) @@ -218,20 +219,20 @@ public function testShouldAckMessageIfMessageProcessorReturnSuchStatus() ; $queueConsumer = new QueueConsumer($contextStub, new BreakCycleExtension(1), 0); - $queueConsumer->bind(new NullQueue('aQueueName'), $messageProcessorMock); + $queueConsumer->bind(new NullQueue('aQueueName'), $processorMock); $queueConsumer->consume(); } - public function testThrowIfMessageProcessorReturnNull() + public function testThrowIfProcessorReturnNull() { $messageMock = $this->createMessageMock(); $messageConsumerStub = $this->createMessageConsumerStub($messageMock); $contextStub = $this->createPsrContextStub($messageConsumerStub); - $messageProcessorMock = $this->createMessageProcessorMock(); - $messageProcessorMock + $processorMock = $this->createProcessorMock(); + $processorMock ->expects($this->once()) ->method('process') ->with($this->identicalTo($messageMock)) @@ -239,14 +240,14 @@ public function testThrowIfMessageProcessorReturnNull() ; $queueConsumer = new QueueConsumer($contextStub, new BreakCycleExtension(1), 0); - $queueConsumer->bind(new NullQueue('aQueueName'), $messageProcessorMock); + $queueConsumer->bind(new NullQueue('aQueueName'), $processorMock); $this->expectException(\LogicException::class); $this->expectExceptionMessage('Status is not supported'); $queueConsumer->consume(); } - public function testShouldRejectMessageIfMessageProcessorReturnSuchStatus() + public function testShouldRejectMessageIfProcessorReturnSuchStatus() { $messageMock = $this->createMessageMock(); $messageConsumerStub = $this->createMessageConsumerStub($messageMock); @@ -258,8 +259,8 @@ public function testShouldRejectMessageIfMessageProcessorReturnSuchStatus() $contextStub = $this->createPsrContextStub($messageConsumerStub); - $messageProcessorMock = $this->createMessageProcessorMock(); - $messageProcessorMock + $processorMock = $this->createProcessorMock(); + $processorMock ->expects($this->once()) ->method('process') ->with($this->identicalTo($messageMock)) @@ -267,12 +268,12 @@ public function testShouldRejectMessageIfMessageProcessorReturnSuchStatus() ; $queueConsumer = new QueueConsumer($contextStub, new BreakCycleExtension(1), 0); - $queueConsumer->bind(new NullQueue('aQueueName'), $messageProcessorMock); + $queueConsumer->bind(new NullQueue('aQueueName'), $processorMock); $queueConsumer->consume(); } - public function testShouldRequeueMessageIfMessageProcessorReturnSuchStatus() + public function testShouldRequeueMessageIfProcessorReturnSuchStatus() { $messageMock = $this->createMessageMock(); $messageConsumerStub = $this->createMessageConsumerStub($messageMock); @@ -284,8 +285,8 @@ public function testShouldRequeueMessageIfMessageProcessorReturnSuchStatus() $contextStub = $this->createPsrContextStub($messageConsumerStub); - $messageProcessorMock = $this->createMessageProcessorMock(); - $messageProcessorMock + $processorMock = $this->createProcessorMock(); + $processorMock ->expects($this->once()) ->method('process') ->with($this->identicalTo($messageMock)) @@ -293,20 +294,20 @@ public function testShouldRequeueMessageIfMessageProcessorReturnSuchStatus() ; $queueConsumer = new QueueConsumer($contextStub, new BreakCycleExtension(1), 0); - $queueConsumer->bind(new NullQueue('aQueueName'), $messageProcessorMock); + $queueConsumer->bind(new NullQueue('aQueueName'), $processorMock); $queueConsumer->consume(); } - public function testThrowIfMessageProcessorReturnInvalidStatus() + public function testThrowIfProcessorReturnInvalidStatus() { $messageMock = $this->createMessageMock(); $messageConsumerStub = $this->createMessageConsumerStub($messageMock); $contextStub = $this->createPsrContextStub($messageConsumerStub); - $messageProcessorMock = $this->createMessageProcessorMock(); - $messageProcessorMock + $processorMock = $this->createProcessorMock(); + $processorMock ->expects($this->once()) ->method('process') ->with($this->identicalTo($messageMock)) @@ -314,14 +315,14 @@ public function testThrowIfMessageProcessorReturnInvalidStatus() ; $queueConsumer = new QueueConsumer($contextStub, new BreakCycleExtension(1), 0); - $queueConsumer->bind(new NullQueue('aQueueName'), $messageProcessorMock); + $queueConsumer->bind(new NullQueue('aQueueName'), $processorMock); $this->expectException(\LogicException::class); $this->expectExceptionMessage('Status is not supported: invalidStatus'); $queueConsumer->consume(); } - public function testShouldNotPassMessageToMessageProcessorIfItWasProcessedByExtension() + public function testShouldNotPassMessageToProcessorIfItWasProcessedByExtension() { $extension = $this->createExtension(); $extension @@ -338,15 +339,15 @@ public function testShouldNotPassMessageToMessageProcessorIfItWasProcessedByExte $contextStub = $this->createPsrContextStub($messageConsumerStub); - $messageProcessorMock = $this->createMessageProcessorMock(); - $messageProcessorMock + $processorMock = $this->createProcessorMock(); + $processorMock ->expects($this->never()) ->method('process') ; $chainExtensions = new ChainExtension([$extension, new BreakCycleExtension(1)]); $queueConsumer = new QueueConsumer($contextStub, $chainExtensions, 0); - $queueConsumer->bind(new NullQueue('aQueueName'), $messageProcessorMock); + $queueConsumer->bind(new NullQueue('aQueueName'), $processorMock); $queueConsumer->consume(); } @@ -357,7 +358,7 @@ public function testShouldCallOnStartExtensionMethod() $contextStub = $this->createPsrContextStub($messageConsumerStub); - $messageProcessorMock = $this->createMessageProcessorMock(); + $processorMock = $this->createProcessorMock(); $extension = $this->createExtension(); $extension @@ -367,11 +368,11 @@ public function testShouldCallOnStartExtensionMethod() ->willReturnCallback(function (Context $context) use ( $contextStub, $messageConsumerStub, - $messageProcessorMock + $processorMock ) { $this->assertSame($contextStub, $context->getPsrContext()); $this->assertNull($context->getPsrConsumer()); - $this->assertNull($context->getMessageProcessor()); + $this->assertNull($context->getPsrProcessor()); $this->assertNull($context->getLogger()); $this->assertNull($context->getPsrMessage()); $this->assertNull($context->getException()); @@ -383,7 +384,7 @@ public function testShouldCallOnStartExtensionMethod() $chainExtensions = new ChainExtension([$extension, new BreakCycleExtension(1)]); $queueConsumer = new QueueConsumer($contextStub, $chainExtensions, 0); - $queueConsumer->bind(new NullQueue('aQueueName'), $messageProcessorMock); + $queueConsumer->bind(new NullQueue('aQueueName'), $processorMock); $queueConsumer->consume(); } @@ -394,7 +395,7 @@ public function testShouldCallOnIdleExtensionMethod() $contextStub = $this->createPsrContextStub($messageConsumerStub); - $messageProcessorMock = $this->createMessageProcessorMock(); + $processorMock = $this->createProcessorMock(); $extension = $this->createExtension(); $extension @@ -404,11 +405,11 @@ public function testShouldCallOnIdleExtensionMethod() ->willReturnCallback(function (Context $context) use ( $contextStub, $messageConsumerStub, - $messageProcessorMock + $processorMock ) { $this->assertSame($contextStub, $context->getPsrContext()); $this->assertSame($messageConsumerStub, $context->getPsrConsumer()); - $this->assertSame($messageProcessorMock, $context->getMessageProcessor()); + $this->assertSame($processorMock, $context->getPsrProcessor()); $this->assertInstanceOf(NullLogger::class, $context->getLogger()); $this->assertNull($context->getPsrMessage()); $this->assertNull($context->getException()); @@ -419,7 +420,7 @@ public function testShouldCallOnIdleExtensionMethod() $chainExtensions = new ChainExtension([$extension, new BreakCycleExtension(1)]); $queueConsumer = new QueueConsumer($contextStub, $chainExtensions, 0); - $queueConsumer->bind(new NullQueue('aQueueName'), $messageProcessorMock); + $queueConsumer->bind(new NullQueue('aQueueName'), $processorMock); $queueConsumer->consume(); } @@ -431,7 +432,7 @@ public function testShouldCallOnBeforeReceiveExtensionMethod() $contextStub = $this->createPsrContextStub($messageConsumerStub); - $messageProcessorMock = $this->createMessageProcessorStub(); + $processorMock = $this->createProcessorStub(); $queue = new NullQueue('aQueueName'); @@ -443,13 +444,13 @@ public function testShouldCallOnBeforeReceiveExtensionMethod() ->willReturnCallback(function (Context $context) use ( $contextStub, $messageConsumerStub, - $messageProcessorMock, + $processorMock, $expectedMessage, $queue ) { $this->assertSame($contextStub, $context->getPsrContext()); $this->assertSame($messageConsumerStub, $context->getPsrConsumer()); - $this->assertSame($messageProcessorMock, $context->getMessageProcessor()); + $this->assertSame($processorMock, $context->getPsrProcessor()); $this->assertInstanceOf(NullLogger::class, $context->getLogger()); $this->assertNull($context->getPsrMessage()); $this->assertNull($context->getException()); @@ -461,7 +462,7 @@ public function testShouldCallOnBeforeReceiveExtensionMethod() $chainExtensions = new ChainExtension([$extension, new BreakCycleExtension(1)]); $queueConsumer = new QueueConsumer($contextStub, $chainExtensions, 0); - $queueConsumer->bind($queue, $messageProcessorMock); + $queueConsumer->bind($queue, $processorMock); $queueConsumer->consume(); } @@ -473,7 +474,7 @@ public function testShouldCallOnPreReceivedAndPostReceivedExtensionMethods() $contextStub = $this->createPsrContextStub($messageConsumerStub); - $messageProcessorMock = $this->createMessageProcessorStub(); + $processorMock = $this->createProcessorStub(); $extension = $this->createExtension(); $extension @@ -483,12 +484,12 @@ public function testShouldCallOnPreReceivedAndPostReceivedExtensionMethods() ->willReturnCallback(function (Context $context) use ( $contextStub, $messageConsumerStub, - $messageProcessorMock, + $processorMock, $expectedMessage ) { $this->assertSame($contextStub, $context->getPsrContext()); $this->assertSame($messageConsumerStub, $context->getPsrConsumer()); - $this->assertSame($messageProcessorMock, $context->getMessageProcessor()); + $this->assertSame($processorMock, $context->getPsrProcessor()); $this->assertSame($expectedMessage, $context->getPsrMessage()); $this->assertInstanceOf(NullLogger::class, $context->getLogger()); $this->assertNull($context->getException()); @@ -503,12 +504,12 @@ public function testShouldCallOnPreReceivedAndPostReceivedExtensionMethods() ->willReturnCallback(function (Context $context) use ( $contextStub, $messageConsumerStub, - $messageProcessorMock, + $processorMock, $expectedMessage ) { $this->assertSame($contextStub, $context->getPsrContext()); $this->assertSame($messageConsumerStub, $context->getPsrConsumer()); - $this->assertSame($messageProcessorMock, $context->getMessageProcessor()); + $this->assertSame($processorMock, $context->getPsrProcessor()); $this->assertSame($expectedMessage, $context->getPsrMessage()); $this->assertInstanceOf(NullLogger::class, $context->getLogger()); $this->assertNull($context->getException()); @@ -519,7 +520,7 @@ public function testShouldCallOnPreReceivedAndPostReceivedExtensionMethods() $chainExtensions = new ChainExtension([$extension, new BreakCycleExtension(1)]); $queueConsumer = new QueueConsumer($contextStub, $chainExtensions, 0); - $queueConsumer->bind(new NullQueue('aQueueName'), $messageProcessorMock); + $queueConsumer->bind(new NullQueue('aQueueName'), $processorMock); $queueConsumer->consume(); } @@ -530,7 +531,7 @@ public function testShouldAllowInterruptConsumingOnIdle() $contextStub = $this->createPsrContextStub($messageConsumerStub); - $messageProcessorMock = $this->createMessageProcessorMock(); + $processorMock = $this->createProcessorMock(); $extension = $this->createExtension(); $extension @@ -548,11 +549,11 @@ public function testShouldAllowInterruptConsumingOnIdle() ->willReturnCallback(function (Context $context) use ( $contextStub, $messageConsumerStub, - $messageProcessorMock + $processorMock ) { $this->assertSame($contextStub, $context->getPsrContext()); $this->assertSame($messageConsumerStub, $context->getPsrConsumer()); - $this->assertSame($messageProcessorMock, $context->getMessageProcessor()); + $this->assertSame($processorMock, $context->getPsrProcessor()); $this->assertInstanceOf(NullLogger::class, $context->getLogger()); $this->assertNull($context->getPsrMessage()); $this->assertNull($context->getException()); @@ -563,7 +564,7 @@ public function testShouldAllowInterruptConsumingOnIdle() $chainExtensions = new ChainExtension([$extension, new BreakCycleExtension(1)]); $queueConsumer = new QueueConsumer($contextStub, $chainExtensions, 0); - $queueConsumer->bind(new NullQueue('aQueueName'), $messageProcessorMock); + $queueConsumer->bind(new NullQueue('aQueueName'), $processorMock); $queueConsumer->consume(); } @@ -578,7 +579,7 @@ public function testShouldCloseSessionWhenConsumptionInterrupted() ->method('close') ; - $messageProcessorMock = $this->createMessageProcessorMock(); + $processorMock = $this->createProcessorMock(); $extension = $this->createExtension(); $extension @@ -592,7 +593,7 @@ public function testShouldCloseSessionWhenConsumptionInterrupted() $chainExtensions = new ChainExtension([$extension, new BreakCycleExtension(1)]); $queueConsumer = new QueueConsumer($contextStub, $chainExtensions, 0); - $queueConsumer->bind(new NullQueue('aQueueName'), $messageProcessorMock); + $queueConsumer->bind(new NullQueue('aQueueName'), $processorMock); $queueConsumer->consume(); } @@ -609,15 +610,15 @@ public function testShouldCloseSessionWhenConsumptionInterruptedByException() ->method('close') ; - $messageProcessorMock = $this->createMessageProcessorMock(); - $messageProcessorMock + $processorMock = $this->createProcessorMock(); + $processorMock ->expects($this->once()) ->method('process') ->willThrowException($expectedException) ; $queueConsumer = new QueueConsumer($contextStub, new BreakCycleExtension(1), 0); - $queueConsumer->bind(new NullQueue('aQueueName'), $messageProcessorMock); + $queueConsumer->bind(new NullQueue('aQueueName'), $processorMock); try { $queueConsumer->consume(); @@ -640,8 +641,8 @@ public function testShouldSetMainExceptionAsPreviousToExceptionThrownOnInterrupt $contextStub = $this->createPsrContextStub($messageConsumerStub); - $messageProcessorMock = $this->createMessageProcessorMock(); - $messageProcessorMock + $processorMock = $this->createProcessorMock(); + $processorMock ->expects($this->once()) ->method('process') ->willThrowException($mainException) @@ -656,7 +657,7 @@ public function testShouldSetMainExceptionAsPreviousToExceptionThrownOnInterrupt $chainExtensions = new ChainExtension([$extension, new BreakCycleExtension(1)]); $queueConsumer = new QueueConsumer($contextStub, $chainExtensions, 0); - $queueConsumer->bind(new NullQueue('aQueueName'), $messageProcessorMock); + $queueConsumer->bind(new NullQueue('aQueueName'), $processorMock); try { $queueConsumer->consume(); @@ -677,8 +678,8 @@ public function testShouldAllowInterruptConsumingOnPreReceiveButProcessCurrentMe $contextStub = $this->createPsrContextStub($messageConsumerStub); - $messageProcessorMock = $this->createMessageProcessorMock(); - $messageProcessorMock + $processorMock = $this->createProcessorMock(); + $processorMock ->expects($this->once()) ->method('process') ->willReturn(Result::ACK) @@ -700,12 +701,12 @@ public function testShouldAllowInterruptConsumingOnPreReceiveButProcessCurrentMe ->willReturnCallback(function (Context $context) use ( $contextStub, $messageConsumerStub, - $messageProcessorMock, + $processorMock, $expectedMessage ) { $this->assertSame($contextStub, $context->getPsrContext()); $this->assertSame($messageConsumerStub, $context->getPsrConsumer()); - $this->assertSame($messageProcessorMock, $context->getMessageProcessor()); + $this->assertSame($processorMock, $context->getPsrProcessor()); $this->assertSame($expectedMessage, $context->getPsrMessage()); $this->assertInstanceOf(NullLogger::class, $context->getLogger()); $this->assertNull($context->getException()); @@ -716,7 +717,7 @@ public function testShouldAllowInterruptConsumingOnPreReceiveButProcessCurrentMe $chainExtensions = new ChainExtension([$extension, new BreakCycleExtension(1)]); $queueConsumer = new QueueConsumer($contextStub, $chainExtensions, 0); - $queueConsumer->bind(new NullQueue('aQueueName'), $messageProcessorMock); + $queueConsumer->bind(new NullQueue('aQueueName'), $processorMock); $queueConsumer->consume(); } @@ -728,8 +729,8 @@ public function testShouldAllowInterruptConsumingOnPostReceive() $contextStub = $this->createPsrContextStub($messageConsumerStub); - $messageProcessorMock = $this->createMessageProcessorMock(); - $messageProcessorMock + $processorMock = $this->createProcessorMock(); + $processorMock ->expects($this->once()) ->method('process') ->willReturn(Result::ACK) @@ -751,12 +752,12 @@ public function testShouldAllowInterruptConsumingOnPostReceive() ->willReturnCallback(function (Context $context) use ( $contextStub, $messageConsumerStub, - $messageProcessorMock, + $processorMock, $expectedMessage ) { $this->assertSame($contextStub, $context->getPsrContext()); $this->assertSame($messageConsumerStub, $context->getPsrConsumer()); - $this->assertSame($messageProcessorMock, $context->getMessageProcessor()); + $this->assertSame($processorMock, $context->getPsrProcessor()); $this->assertSame($expectedMessage, $context->getPsrMessage()); $this->assertInstanceOf(NullLogger::class, $context->getLogger()); $this->assertNull($context->getException()); @@ -767,7 +768,7 @@ public function testShouldAllowInterruptConsumingOnPostReceive() $chainExtensions = new ChainExtension([$extension, new BreakCycleExtension(1)]); $queueConsumer = new QueueConsumer($contextStub, $chainExtensions, 0); - $queueConsumer->bind(new NullQueue('aQueueName'), $messageProcessorMock); + $queueConsumer->bind(new NullQueue('aQueueName'), $processorMock); $queueConsumer->consume(); } @@ -780,8 +781,8 @@ public function testShouldCallOnInterruptedIfExceptionThrow() $contextStub = $this->createPsrContextStub($messageConsumerStub); - $messageProcessorMock = $this->createMessageProcessorMock(); - $messageProcessorMock + $processorMock = $this->createProcessorMock(); + $processorMock ->expects($this->once()) ->method('process') ->willThrowException($expectedException) @@ -795,13 +796,13 @@ public function testShouldCallOnInterruptedIfExceptionThrow() ->willReturnCallback(function (Context $context) use ( $contextStub, $messageConsumerStub, - $messageProcessorMock, + $processorMock, $expectedMessage, $expectedException ) { $this->assertSame($contextStub, $context->getPsrContext()); $this->assertSame($messageConsumerStub, $context->getPsrConsumer()); - $this->assertSame($messageProcessorMock, $context->getMessageProcessor()); + $this->assertSame($processorMock, $context->getPsrProcessor()); $this->assertSame($expectedMessage, $context->getPsrMessage()); $this->assertSame($expectedException, $context->getException()); $this->assertInstanceOf(NullLogger::class, $context->getLogger()); @@ -812,7 +813,7 @@ public function testShouldCallOnInterruptedIfExceptionThrow() $chainExtensions = new ChainExtension([$extension, new BreakCycleExtension(1)]); $queueConsumer = new QueueConsumer($contextStub, $chainExtensions, 0); - $queueConsumer->bind(new NullQueue('aQueueName'), $messageProcessorMock); + $queueConsumer->bind(new NullQueue('aQueueName'), $processorMock); $this->expectException(\Exception::class); $this->expectExceptionMessage('Process failed'); @@ -826,8 +827,8 @@ public function testShouldCallExtensionPassedOnRuntime() $contextStub = $this->createPsrContextStub($messageConsumerStub); - $messageProcessorMock = $this->createMessageProcessorMock(); - $messageProcessorMock + $processorMock = $this->createProcessorMock(); + $processorMock ->expects($this->once()) ->method('process') ->willReturn(Result::ACK) @@ -856,7 +857,7 @@ public function testShouldCallExtensionPassedOnRuntime() ; $queueConsumer = new QueueConsumer($contextStub, new BreakCycleExtension(1), 0); - $queueConsumer->bind(new NullQueue('aQueueName'), $messageProcessorMock); + $queueConsumer->bind(new NullQueue('aQueueName'), $processorMock); $queueConsumer->consume(new ChainExtension([$runtimeExtension])); } @@ -868,8 +869,8 @@ public function testShouldChangeLoggerOnStart() $contextStub = $this->createPsrContextStub($messageConsumerStub); - $messageProcessorMock = $this->createMessageProcessorMock(); - $messageProcessorMock + $processorMock = $this->createProcessorMock(); + $processorMock ->expects($this->once()) ->method('process') ->willReturn(Result::ACK) @@ -906,7 +907,7 @@ public function testShouldChangeLoggerOnStart() $chainExtensions = new ChainExtension([$extension, new BreakCycleExtension(1)]); $queueConsumer = new QueueConsumer($contextStub, $chainExtensions, 0); - $queueConsumer->bind(new NullQueue('aQueueName'), $messageProcessorMock); + $queueConsumer->bind(new NullQueue('aQueueName'), $processorMock); $queueConsumer->consume(); } @@ -918,8 +919,8 @@ public function testShouldCallEachQueueOneByOne() $contextStub = $this->createPsrContextStub($messageConsumerStub); - $messageProcessorMock = $this->createMessageProcessorStub(); - $anotherMessageProcessorMock = $this->createMessageProcessorStub(); + $processorMock = $this->createProcessorStub(); + $anotherProcessorMock = $this->createProcessorStub(); $queue1 = new NullQueue('aQueueName'); $queue2 = new NullQueue('aAnotherQueueName'); @@ -929,8 +930,8 @@ public function testShouldCallEachQueueOneByOne() ->expects($this->at(1)) ->method('onBeforeReceive') ->with($this->isInstanceOf(Context::class)) - ->willReturnCallback(function (Context $context) use ($messageProcessorMock, $queue1) { - $this->assertSame($messageProcessorMock, $context->getMessageProcessor()); + ->willReturnCallback(function (Context $context) use ($processorMock, $queue1) { + $this->assertSame($processorMock, $context->getPsrProcessor()); $this->assertSame($queue1, $context->getPsrQueue()); }) ; @@ -938,16 +939,16 @@ public function testShouldCallEachQueueOneByOne() ->expects($this->at(4)) ->method('onBeforeReceive') ->with($this->isInstanceOf(Context::class)) - ->willReturnCallback(function (Context $context) use ($anotherMessageProcessorMock, $queue2) { - $this->assertSame($anotherMessageProcessorMock, $context->getMessageProcessor()); + ->willReturnCallback(function (Context $context) use ($anotherProcessorMock, $queue2) { + $this->assertSame($anotherProcessorMock, $context->getPsrProcessor()); $this->assertSame($queue2, $context->getPsrQueue()); }) ; $queueConsumer = new QueueConsumer($contextStub, new BreakCycleExtension(2), 0); $queueConsumer - ->bind($queue1, $messageProcessorMock) - ->bind($queue2, $anotherMessageProcessorMock) + ->bind($queue1, $processorMock) + ->bind($queue2, $anotherProcessorMock) ; $queueConsumer->consume(new ChainExtension([$extension])); @@ -955,6 +956,7 @@ public function testShouldCallEachQueueOneByOne() /** * @return \PHPUnit_Framework_MockObject_MockObject|Consumer + * @param null|mixed $message */ protected function createMessageConsumerStub($message = null) { @@ -970,6 +972,7 @@ protected function createMessageConsumerStub($message = null) /** * @return \PHPUnit_Framework_MockObject_MockObject|PsrContext + * @param null|mixed $messageConsumer */ protected function createPsrContextStub($messageConsumer = null) { @@ -993,26 +996,26 @@ protected function createPsrContextStub($messageConsumer = null) } /** - * @return \PHPUnit_Framework_MockObject_MockObject|MessageProcessorInterface + * @return \PHPUnit_Framework_MockObject_MockObject|Processor */ - protected function createMessageProcessorMock() + protected function createProcessorMock() { - return $this->createMock(MessageProcessorInterface::class); + return $this->createMock(Processor::class); } /** - * @return \PHPUnit_Framework_MockObject_MockObject|MessageProcessorInterface + * @return \PHPUnit_Framework_MockObject_MockObject|Processor */ - protected function createMessageProcessorStub() + protected function createProcessorStub() { - $messageProcessorMock = $this->createMessageProcessorMock(); - $messageProcessorMock + $processorMock = $this->createProcessorMock(); + $processorMock ->expects($this->any()) ->method('process') ->willReturn(Result::ACK) ; - return $messageProcessorMock; + return $processorMock; } /** diff --git a/pkg/enqueue/Tests/Consumption/ResultTest.php b/pkg/enqueue/Tests/Consumption/ResultTest.php index 3dce4be1b..20b675593 100644 --- a/pkg/enqueue/Tests/Consumption/ResultTest.php +++ b/pkg/enqueue/Tests/Consumption/ResultTest.php @@ -1,4 +1,5 @@ assertClassImplements(MessageProcessorInterface::class, RouteRecipientListProcessor::class); + $this->assertClassImplements(Processor::class, RouteRecipientListProcessor::class); } public function testCouldBeConstructedWithRouterAsFirstArgument() diff --git a/pkg/enqueue/Tests/Rpc/PromiseTest.php b/pkg/enqueue/Tests/Rpc/PromiseTest.php index e8267e401..1e7057b1a 100644 --- a/pkg/enqueue/Tests/Rpc/PromiseTest.php +++ b/pkg/enqueue/Tests/Rpc/PromiseTest.php @@ -1,4 +1,5 @@ createQueueConsumerMock(), - $this->createDelegateMessageProcessorMock(), + $this->createDelegateProcessorMock(), $this->createQueueMetaRegistry([]), $this->createDriverMock() ); @@ -28,7 +29,7 @@ public function testShouldHaveCommandName() { $command = new ConsumeMessagesCommand( $this->createQueueConsumerMock(), - $this->createDelegateMessageProcessorMock(), + $this->createDelegateProcessorMock(), $this->createQueueMetaRegistry([]), $this->createDriverMock() ); @@ -40,7 +41,7 @@ public function testShouldHaveCommandAliases() { $command = new ConsumeMessagesCommand( $this->createQueueConsumerMock(), - $this->createDelegateMessageProcessorMock(), + $this->createDelegateProcessorMock(), $this->createQueueMetaRegistry([]), $this->createDriverMock() ); @@ -52,7 +53,7 @@ public function testShouldHaveExpectedOptions() { $command = new ConsumeMessagesCommand( $this->createQueueConsumerMock(), - $this->createDelegateMessageProcessorMock(), + $this->createDelegateProcessorMock(), $this->createQueueMetaRegistry([]), $this->createDriverMock() ); @@ -70,7 +71,7 @@ public function testShouldHaveExpectedAttributes() { $command = new ConsumeMessagesCommand( $this->createQueueConsumerMock(), - $this->createDelegateMessageProcessorMock(), + $this->createDelegateProcessorMock(), $this->createQueueMetaRegistry([]), $this->createDriverMock() ); @@ -85,7 +86,7 @@ public function testShouldExecuteConsumptionAndUseDefaultQueueName() { $queue = new NullQueue(''); - $processor = $this->createDelegateMessageProcessorMock(); + $processor = $this->createDelegateProcessorMock(); $context = $this->createPsrContextMock(); $context @@ -132,7 +133,7 @@ public function testShouldExecuteConsumptionAndUseCustomClientDestinationName() { $queue = new NullQueue(''); - $processor = $this->createDelegateMessageProcessorMock(); + $processor = $this->createDelegateProcessorMock(); $context = $this->createPsrContextMock(); $context @@ -205,11 +206,11 @@ private function createPsrContextMock() } /** - * @return \PHPUnit_Framework_MockObject_MockObject|DelegateMessageProcessor + * @return \PHPUnit_Framework_MockObject_MockObject|DelegateProcessor */ - private function createDelegateMessageProcessorMock() + private function createDelegateProcessorMock() { - return $this->createMock(DelegateMessageProcessor::class); + return $this->createMock(DelegateProcessor::class); } /** diff --git a/pkg/enqueue/Tests/Symfony/Client/ContainerAwareMessageProcessorRegistryTest.php b/pkg/enqueue/Tests/Symfony/Client/ContainerAwareProcessorRegistryTest.php similarity index 52% rename from pkg/enqueue/Tests/Symfony/Client/ContainerAwareMessageProcessorRegistryTest.php rename to pkg/enqueue/Tests/Symfony/Client/ContainerAwareProcessorRegistryTest.php index 80a3c503d..aa2f744b4 100644 --- a/pkg/enqueue/Tests/Symfony/Client/ContainerAwareMessageProcessorRegistryTest.php +++ b/pkg/enqueue/Tests/Symfony/Client/ContainerAwareProcessorRegistryTest.php @@ -1,34 +1,35 @@ assertClassImplements(MessageProcessorRegistryInterface::class, ContainerAwareMessageProcessorRegistry::class); + $this->assertClassImplements(ProcessorRegistryInterface::class, ContainerAwareProcessorRegistry::class); } public function testCouldBeConstructedWithoutAnyArgument() { - new ContainerAwareMessageProcessorRegistry(); + new ContainerAwareProcessorRegistry(); } public function testShouldThrowExceptionIfProcessorIsNotSet() { $this->setExpectedException( \LogicException::class, - 'MessageProcessor was not found. processorName: "processor-name"' + 'Processor was not found. processorName: "processor-name"' ); - $registry = new ContainerAwareMessageProcessorRegistry(); + $registry = new ContainerAwareProcessorRegistry(); $registry->get('processor-name'); } @@ -36,13 +37,13 @@ public function testShouldThrowExceptionIfContainerIsNotSet() { $this->setExpectedException(\LogicException::class, 'Container was not set'); - $registry = new ContainerAwareMessageProcessorRegistry(); + $registry = new ContainerAwareProcessorRegistry(); $registry->set('processor-name', 'processor-id'); $registry->get('processor-name'); } - public function testShouldThrowExceptionIfInstanceOfMessageProcessorIsInvalid() + public function testShouldThrowExceptionIfInstanceOfProcessorIsInvalid() { $this->setExpectedException(\LogicException::class, 'Container was not set'); @@ -51,32 +52,32 @@ public function testShouldThrowExceptionIfInstanceOfMessageProcessorIsInvalid() $container = new Container(); $container->set('processor-id', $processor); - $registry = new ContainerAwareMessageProcessorRegistry(); + $registry = new ContainerAwareProcessorRegistry(); $registry->set('processor-name', 'processor-id'); $registry->get('processor-name'); } - public function testShouldReturnInstanceOfMessageProcessor() + public function testShouldReturnInstanceOfProcessor() { $this->setExpectedException(\LogicException::class, 'Container was not set'); - $processor = $this->createMessageProcessorMock(); + $processor = $this->createProcessorMock(); $container = new Container(); $container->set('processor-id', $processor); - $registry = new ContainerAwareMessageProcessorRegistry(); + $registry = new ContainerAwareProcessorRegistry(); $registry->set('processor-name', 'processor-id'); $this->assertSame($processor, $registry->get('processor-name')); } /** - * @return \PHPUnit_Framework_MockObject_MockObject|MessageProcessorInterface + * @return \PHPUnit_Framework_MockObject_MockObject|Processor */ - protected function createMessageProcessorMock() + protected function createProcessorMock() { - return $this->createMock(MessageProcessorInterface::class); + return $this->createMock(Processor::class); } } diff --git a/pkg/enqueue/Tests/Symfony/Client/Meta/QueuesCommandTest.php b/pkg/enqueue/Tests/Symfony/Client/Meta/QueuesCommandTest.php index ff50642e5..048e7d3fe 100644 --- a/pkg/enqueue/Tests/Symfony/Client/Meta/QueuesCommandTest.php +++ b/pkg/enqueue/Tests/Symfony/Client/Meta/QueuesCommandTest.php @@ -1,4 +1,5 @@ extension; + } + protected function configure() { parent::configure(); @@ -26,9 +32,4 @@ protected function execute(InputInterface $input, OutputInterface $output) { $this->extension = $this->getSetupBrokerExtension($input, new NullDriver(new NullContext(), Config::create())); } - - public function getExtension() - { - return $this->extension; - } } diff --git a/pkg/enqueue/Tests/Symfony/Client/ProduceMessageCommandTest.php b/pkg/enqueue/Tests/Symfony/Client/ProduceMessageCommandTest.php index cd3bed816..6cbfbfc1d 100644 --- a/pkg/enqueue/Tests/Symfony/Client/ProduceMessageCommandTest.php +++ b/pkg/enqueue/Tests/Symfony/Client/ProduceMessageCommandTest.php @@ -1,4 +1,5 @@ setExpectedException(\LogicException::class, 'Invalid message processor service given.'. - ' It must be an instance of Enqueue\Consumption\MessageProcessorInterface but stdClass'); + ' It must be an instance of Enqueue\Psr\Processor but stdClass'); $container = new Container(); $container->set('processor-service', new \stdClass()); @@ -67,7 +68,7 @@ public function testShouldThrowExceptionIfProcessorInstanceHasWrongClass() public function testShouldExecuteConsumption() { - $processor = $this->createMessageProcessor(); + $processor = $this->createProcessor(); $queue = $this->createQueueMock(); @@ -129,11 +130,11 @@ protected function createQueueMock() } /** - * @return \PHPUnit_Framework_MockObject_MockObject|MessageProcessorInterface + * @return \PHPUnit_Framework_MockObject_MockObject|Processor */ - protected function createMessageProcessor() + protected function createProcessor() { - return $this->createMock(MessageProcessorInterface::class); + return $this->createMock(Processor::class); } /** diff --git a/pkg/enqueue/Tests/Symfony/Consumption/LimitsExtensionsCommandTraitTest.php b/pkg/enqueue/Tests/Symfony/Consumption/LimitsExtensionsCommandTraitTest.php index 742027211..90293d094 100644 --- a/pkg/enqueue/Tests/Symfony/Consumption/LimitsExtensionsCommandTraitTest.php +++ b/pkg/enqueue/Tests/Symfony/Consumption/LimitsExtensionsCommandTraitTest.php @@ -1,4 +1,5 @@ extensions; + } + protected function configure() { parent::configure(); @@ -23,9 +29,4 @@ protected function execute(InputInterface $input, OutputInterface $output) { $this->extensions = $this->getLimitsExtensions($input, $output); } - - public function getExtensions() - { - return $this->extensions; - } } diff --git a/pkg/enqueue/Tests/Symfony/DefaultTransportFactoryTest.php b/pkg/enqueue/Tests/Symfony/DefaultTransportFactoryTest.php index 3fbc5c59a..917ad5d2b 100644 --- a/pkg/enqueue/Tests/Symfony/DefaultTransportFactoryTest.php +++ b/pkg/enqueue/Tests/Symfony/DefaultTransportFactoryTest.php @@ -1,4 +1,5 @@ assertInstanceOf(NullMessage::class, $message); - $this->assertSame(null, $message->getBody()); + $this->assertNull($message->getBody()); $this->assertSame([], $message->getHeaders()); $this->assertSame([], $message->getProperties()); } diff --git a/pkg/enqueue/Tests/Transport/Null/NullMessageTest.php b/pkg/enqueue/Tests/Transport/Null/NullMessageTest.php index d8d2569bc..ddf9cd2d5 100644 --- a/pkg/enqueue/Tests/Transport/Null/NullMessageTest.php +++ b/pkg/enqueue/Tests/Transport/Null/NullMessageTest.php @@ -1,4 +1,5 @@ assertSame(null, $message->getBody()); + $this->assertNull($message->getBody()); } public function testShouldNewMessageReturnEmptyProperties() diff --git a/pkg/enqueue/Tests/Transport/Null/NullProducerTest.php b/pkg/enqueue/Tests/Transport/Null/NullProducerTest.php index 39c05898f..1e90bf349 100644 --- a/pkg/enqueue/Tests/Transport/Null/NullProducerTest.php +++ b/pkg/enqueue/Tests/Transport/Null/NullProducerTest.php @@ -1,4 +1,5 @@ toString(); - } else { - return RhumsaaUuid::uuid4()->toString(); } + + return RhumsaaUuid::uuid4()->toString(); } } diff --git a/pkg/enqueue/Util/VarExport.php b/pkg/enqueue/Util/VarExport.php index 897c664f3..4a48afadd 100644 --- a/pkg/enqueue/Util/VarExport.php +++ b/pkg/enqueue/Util/VarExport.php @@ -1,4 +1,5 @@ =5.6", - "enqueue/psr-queue": "dev-master", + "enqueue/psr-queue": "^0.1", "ramsey/uuid": "^2|^3.5" }, "require-dev": { @@ -20,7 +20,7 @@ "symfony/console": "^2.8|^3", "symfony/dependency-injection": "^2.8|^3", "symfony/config": "^2.8|^3", - "enqueue/test": "dev-master" + "enqueue/test": "^0.1" }, "suggest": { "symfony/console": "^2.8|^3 If you want to use li commands", diff --git a/pkg/job-queue/CalculateRootJobStatusProcessor.php b/pkg/job-queue/CalculateRootJobStatusProcessor.php index a970a4399..82cb8e897 100644 --- a/pkg/job-queue/CalculateRootJobStatusProcessor.php +++ b/pkg/job-queue/CalculateRootJobStatusProcessor.php @@ -1,16 +1,17 @@ isRoot() ? $job : $job->getRootJob(); $stopStatuses = [Job::STATUS_SUCCESS, Job::STATUS_FAILED, Job::STATUS_CANCELLED]; - if (in_array($rootJob->getStatus(), $stopStatuses)) { + if (in_array($rootJob->getStatus(), $stopStatuses, true)) { return; } $rootStopped = false; $this->jobStorage->saveJob($rootJob, function (Job $rootJob) use ($stopStatuses, &$rootStopped) { - if (in_array($rootJob->getStatus(), $stopStatuses)) { + if (in_array($rootJob->getStatus(), $stopStatuses, true)) { return; } @@ -47,7 +48,7 @@ public function calculate(Job $job) $rootJob->setStatus($status); - if (in_array($status, $stopStatuses)) { + if (in_array($status, $stopStatuses, true)) { $rootStopped = true; if (!$rootJob->getStoppedAt()) { $rootJob->setStoppedAt(new \DateTime()); diff --git a/pkg/job-queue/DependentJobContext.php b/pkg/job-queue/DependentJobContext.php index 6393eed5f..34deba656 100644 --- a/pkg/job-queue/DependentJobContext.php +++ b/pkg/job-queue/DependentJobContext.php @@ -1,4 +1,5 @@ logger->critical(sprintf( - '[DependentJobMessageProcessor] Got invalid message. body: "%s"', + '[DependentJobProcessor] Got invalid message. body: "%s"', $message->getBody() )); @@ -59,7 +60,7 @@ public function process(PsrMessage $message, Context $context) $job = $this->jobStorage->findJobById($data['jobId']); if (!$job) { $this->logger->critical(sprintf( - '[DependentJobMessageProcessor] Job was not found. id: "%s"', + '[DependentJobProcessor] Job was not found. id: "%s"', $data['jobId'] )); @@ -68,7 +69,7 @@ public function process(PsrMessage $message, Context $context) if (!$job->isRoot()) { $this->logger->critical(sprintf( - '[DependentJobMessageProcessor] Expected root job but got child. id: "%s"', + '[DependentJobProcessor] Expected root job but got child. id: "%s"', $data['jobId'] )); @@ -86,7 +87,7 @@ public function process(PsrMessage $message, Context $context) foreach ($dependentJobs as $dependentJob) { if (!isset($dependentJob['topic']) || !isset($dependentJob['message'])) { $this->logger->critical(sprintf( - '[DependentJobMessageProcessor] Got invalid dependent job data. job: "%s", dependentJob: "%s"', + '[DependentJobProcessor] Got invalid dependent job data. job: "%s", dependentJob: "%s"', $job->getId(), JSON::encode($dependentJob) )); diff --git a/pkg/job-queue/DependentJobService.php b/pkg/job-queue/DependentJobService.php index e61bc44ba..e0d42b961 100644 --- a/pkg/job-queue/DependentJobService.php +++ b/pkg/job-queue/DependentJobService.php @@ -1,4 +1,5 @@ jobStorage->findJobById($job->getId()); - if (!in_array($job->getStatus(), [Job::STATUS_NEW, Job::STATUS_RUNNING])) { + if (!in_array($job->getStatus(), [Job::STATUS_NEW, Job::STATUS_RUNNING], true)) { throw new \LogicException(sprintf( 'Can cancel only new or running jobs. id: "%s", status: "%s"', $job->getId(), diff --git a/pkg/job-queue/JobRunner.php b/pkg/job-queue/JobRunner.php index c0adb97b0..bfca429a6 100644 --- a/pkg/job-queue/JobRunner.php +++ b/pkg/job-queue/JobRunner.php @@ -1,4 +1,5 @@ wrapTransactionNestingLevel('rollBack'); } - /** - * @param int $level - */ - private function setTransactionNestingLevel($level) - { - $prop = new \ReflectionProperty('Doctrine\DBAL\Connection', '_transactionNestingLevel'); - $prop->setAccessible(true); - - return $prop->setValue($this, $level); - } - - /** - * @param string $method - * - * @throws \Exception - */ - private function wrapTransactionNestingLevel($method) - { - $e = null; - - $this->setTransactionNestingLevel($this->getPersistedTransactionNestingLevel()); - - try { - call_user_func(['parent', $method]); - } catch (\Exception $e) { - } - - $this->persistTransactionNestingLevel($this->getTransactionNestingLevel()); - - if ($e) { - throw $e; - } - } - /** * @param bool $connected */ @@ -161,4 +128,38 @@ protected function getConnectionId() { return md5(serialize($this->getParams())); } + + /** + * @param int $level + */ + private function setTransactionNestingLevel($level) + { + $prop = new \ReflectionProperty('Doctrine\DBAL\Connection', '_transactionNestingLevel'); + $prop->setAccessible(true); + + return $prop->setValue($this, $level); + } + + /** + * @param string $method + * + * @throws \Exception + */ + private function wrapTransactionNestingLevel($method) + { + $e = null; + + $this->setTransactionNestingLevel($this->getPersistedTransactionNestingLevel()); + + try { + call_user_func(['parent', $method]); + } catch (\Exception $e) { + } + + $this->persistTransactionNestingLevel($this->getTransactionNestingLevel()); + + if ($e) { + throw $e; + } + } } diff --git a/pkg/job-queue/Test/JobRunner.php b/pkg/job-queue/Test/JobRunner.php index fe2d3cb77..62d9f9a52 100644 --- a/pkg/job-queue/Test/JobRunner.php +++ b/pkg/job-queue/Test/JobRunner.php @@ -1,4 +1,5 @@ assertEquals( [Topics::ROOT_JOB_STOPPED], - DependentJobMessageProcessor::getSubscribedTopics() + DependentJobProcessor::getSubscribedTopics() ); } @@ -32,13 +33,13 @@ public function testShouldLogCriticalAndRejectMessageIfJobIdIsNotSet() $logger ->expects($this->once()) ->method('critical') - ->with('[DependentJobMessageProcessor] Got invalid message. body: "{"key":"value"}"') + ->with('[DependentJobProcessor] Got invalid message. body: "{"key":"value"}"') ; $message = new NullMessage(); $message->setBody(json_encode(['key' => 'value'])); - $processor = new DependentJobMessageProcessor($jobStorage, $producer, $logger); + $processor = new DependentJobProcessor($jobStorage, $producer, $logger); $result = $processor->process($message, $this->createContextMock()); @@ -60,13 +61,13 @@ public function testShouldLogCriticalAndRejectMessageIfJobEntityWasNotFound() $logger ->expects($this->once()) ->method('critical') - ->with('[DependentJobMessageProcessor] Job was not found. id: "12345"') + ->with('[DependentJobProcessor] Job was not found. id: "12345"') ; $message = new NullMessage(); $message->setBody(json_encode(['jobId' => 12345])); - $processor = new DependentJobMessageProcessor($jobStorage, $producer, $logger); + $processor = new DependentJobProcessor($jobStorage, $producer, $logger); $result = $processor->process($message, $this->createContextMock()); @@ -92,13 +93,13 @@ public function testShouldLogCriticalAndRejectMessageIfJobIsNotRoot() $logger ->expects($this->once()) ->method('critical') - ->with('[DependentJobMessageProcessor] Expected root job but got child. id: "12345"') + ->with('[DependentJobProcessor] Expected root job but got child. id: "12345"') ; $message = new NullMessage(); $message->setBody(json_encode(['jobId' => 12345])); - $processor = new DependentJobMessageProcessor($jobStorage, $producer, $logger); + $processor = new DependentJobProcessor($jobStorage, $producer, $logger); $result = $processor->process($message, $this->createContextMock()); @@ -128,7 +129,7 @@ public function testShouldDoNothingIfDependentJobsAreMissing() $message = new NullMessage(); $message->setBody(json_encode(['jobId' => 12345])); - $processor = new DependentJobMessageProcessor($jobStorage, $producer, $logger); + $processor = new DependentJobProcessor($jobStorage, $producer, $logger); $result = $processor->process($message, $this->createContextMock()); @@ -163,13 +164,13 @@ public function testShouldLogCriticalAndRejectMessageIfDependentJobTopicIsMissin $logger ->expects($this->once()) ->method('critical') - ->with('[DependentJobMessageProcessor] Got invalid dependent job data. job: "123", dependentJob: "[]"') + ->with('[DependentJobProcessor] Got invalid dependent job data. job: "123", dependentJob: "[]"') ; $message = new NullMessage(); $message->setBody(json_encode(['jobId' => 12345])); - $processor = new DependentJobMessageProcessor($jobStorage, $producer, $logger); + $processor = new DependentJobProcessor($jobStorage, $producer, $logger); $result = $processor->process($message, $this->createContextMock()); @@ -206,14 +207,14 @@ public function testShouldLogCriticalAndRejectMessageIfDependentJobMessageIsMiss $logger ->expects($this->once()) ->method('critical') - ->with('[DependentJobMessageProcessor] Got invalid dependent job data. '. + ->with('[DependentJobProcessor] Got invalid dependent job data. '. 'job: "123", dependentJob: "{"topic":"topic-name"}"') ; $message = new NullMessage(); $message->setBody(json_encode(['jobId' => 12345])); - $processor = new DependentJobMessageProcessor($jobStorage, $producer, $logger); + $processor = new DependentJobProcessor($jobStorage, $producer, $logger); $result = $processor->process($message, $this->createContextMock()); @@ -257,7 +258,7 @@ public function testShouldPublishDependentMessage() $message = new NullMessage(); $message->setBody(json_encode(['jobId' => 12345])); - $processor = new DependentJobMessageProcessor($jobStorage, $producer, $logger); + $processor = new DependentJobProcessor($jobStorage, $producer, $logger); $result = $processor->process($message, $this->createContextMock()); @@ -305,7 +306,7 @@ public function testShouldPublishDependentMessageWithPriority() $message = new NullMessage(); $message->setBody(json_encode(['jobId' => 12345])); - $processor = new DependentJobMessageProcessor($jobStorage, $producer, $logger); + $processor = new DependentJobProcessor($jobStorage, $producer, $logger); $result = $processor->process($message, $this->createContextMock()); diff --git a/pkg/job-queue/Tests/DependentJobServiceTest.php b/pkg/job-queue/Tests/DependentJobServiceTest.php index 1c91dc78e..581c463b6 100644 --- a/pkg/job-queue/Tests/DependentJobServiceTest.php +++ b/pkg/job-queue/Tests/DependentJobServiceTest.php @@ -1,4 +1,5 @@ load(__DIR__.'/config/config.yml'); } + + protected function getContainerClass() + { + return parent::getContainerClass().'JobQueue'; + } } diff --git a/pkg/job-queue/Tests/JobProcessorTest.php b/pkg/job-queue/Tests/JobProcessorTest.php index 85c7f06f8..31ce0a7d8 100644 --- a/pkg/job-queue/Tests/JobProcessorTest.php +++ b/pkg/job-queue/Tests/JobProcessorTest.php @@ -1,12 +1,13 @@ =5.6", "symfony/framework-bundle": "^2.8|^3", - "enqueue/enqueue": "dev-master", + "enqueue/enqueue": "^0.1", "doctrine/orm": "~2.4" }, "require-dev": { "phpunit/phpunit": "~5.5", - "enqueue/test": "dev-master", + "enqueue/test": "^0.1", "doctrine/doctrine-bundle": "~1.2", "symfony/browser-kit": "^2.8|^3", "symfony/expression-language": "^2.8|^3" diff --git a/pkg/psr-queue/ConnectionFactory.php b/pkg/psr-queue/ConnectionFactory.php index a34f99048..5c7a2b0be 100644 --- a/pkg/psr-queue/ConnectionFactory.php +++ b/pkg/psr-queue/ConnectionFactory.php @@ -1,4 +1,5 @@ expectException(InvalidDestinationException::class); $this->expectExceptionMessage( - 'The destination must be an instance of Enqueue\Psr\Tests\Exception\DestinationBar'. - ' but got Enqueue\Psr\Tests\Exception\DestinationFoo.' + 'The destination must be an instance of Enqueue\Psr\Tests\DestinationBar'. + ' but got Enqueue\Psr\Tests\DestinationFoo.' ); InvalidDestinationException::assertDestinationInstanceOf(new DestinationFoo(), DestinationBar::class); diff --git a/pkg/psr-queue/Tests/Exception/InvalidMessageExceptionTest.php b/pkg/psr-queue/Tests/InvalidMessageExceptionTest.php similarity index 92% rename from pkg/psr-queue/Tests/Exception/InvalidMessageExceptionTest.php rename to pkg/psr-queue/Tests/InvalidMessageExceptionTest.php index 882967ebb..82f44571b 100644 --- a/pkg/psr-queue/Tests/Exception/InvalidMessageExceptionTest.php +++ b/pkg/psr-queue/Tests/InvalidMessageExceptionTest.php @@ -1,5 +1,6 @@ $value) { + if (0 === strpos($key, self::PROPERTY_PREFIX)) { + $encodedProperties[substr($key, $prefixLength)] = $value; + } else { + $encodedHeaders[$key] = $value; + } + } + + $decodedHeaders = self::doDecode($encodedHeaders); + $decodedProperties = self::doDecode($encodedProperties); + + return [$decodedHeaders, $decodedProperties]; + } + /** * @param array $headers * @@ -72,32 +99,6 @@ private static function doEncode($headers = []) return $encoded; } - /** - * @param array $headers - * - * @return array [[headers], [properties]] - */ - public static function decode(array $headers = []) - { - $encodedHeaders = []; - $encodedProperties = []; - $prefixLength = strlen(self::PROPERTY_PREFIX); - - // separate headers/properties - foreach ($headers as $key => $value) { - if (0 === strpos($key, self::PROPERTY_PREFIX)) { - $encodedProperties[substr($key, $prefixLength)] = $value; - } else { - $encodedHeaders[$key] = $value; - } - } - - $decodedHeaders = self::doDecode($encodedHeaders); - $decodedProperties = self::doDecode($encodedProperties); - - return [$decodedHeaders, $decodedProperties]; - } - /** * @param array $headers * diff --git a/pkg/stomp/StompMessage.php b/pkg/stomp/StompMessage.php index 5eb1c0843..ed90f6317 100644 --- a/pkg/stomp/StompMessage.php +++ b/pkg/stomp/StompMessage.php @@ -1,4 +1,5 @@ bind($queue, $processor); $queueConsumer->consume(); @@ -78,10 +79,10 @@ public function testConsumeOneMessageAndSendReplyExit() $replyMessage = $this->stompContext->createMessage(__METHOD__.'.reply'); - $processor = new StubMessageProcessor(); + $processor = new StubProcessor(); $processor->result = Result::reply($replyMessage); - $replyProcessor = new StubMessageProcessor(); + $replyProcessor = new StubProcessor(); $queueConsumer->bind($queue, $processor); $queueConsumer->bind($replyQueue, $replyProcessor); @@ -95,7 +96,7 @@ public function testConsumeOneMessageAndSendReplyExit() } } -class StubMessageProcessor implements MessageProcessorInterface +class StubProcessor implements Processor { public $result = Result::ACK; diff --git a/pkg/stomp/Tests/Functional/StompRpcUseCasesTest.php b/pkg/stomp/Tests/Functional/StompRpcUseCasesTest.php index 6b95d1f47..ec1ad55a5 100644 --- a/pkg/stomp/Tests/Functional/StompRpcUseCasesTest.php +++ b/pkg/stomp/Tests/Functional/StompRpcUseCasesTest.php @@ -1,12 +1,13 @@ =5.6", "stomp-php/stomp-php": "^4", - "enqueue/psr-queue": "dev-master", + "enqueue/psr-queue": "^0.1", "php-http/guzzle6-adapter": "^1.1", "richardfullmer/rabbitmq-management-api": "^2.0", "psr/log": "^1" }, "require-dev": { "phpunit/phpunit": "~5.4.0", - "enqueue/test": "dev-master", - "enqueue/enqueue": "dev-master", + "enqueue/test": "^0.1", + "enqueue/enqueue": "^0.1", "symfony/dependency-injection": "^2.8|^3", "symfony/config": "^2.8|^3" }, diff --git a/pkg/test/ClassExtensionTrait.php b/pkg/test/ClassExtensionTrait.php index f5406ebc4..9bd326a7f 100644 --- a/pkg/test/ClassExtensionTrait.php +++ b/pkg/test/ClassExtensionTrait.php @@ -1,4 +1,5 @@