diff --git a/CHANGELOG.md b/CHANGELOG.md index 6eeafc0a8..f9370be3d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,25 @@ # Change Log +## [0.9.2](https://github.com/php-enqueue/enqueue-dev/tree/0.9.2) (2018-12-13) +[Full Changelog](https://github.com/php-enqueue/enqueue-dev/compare/0.9.1...0.9.2) + +**Merged pull requests:** + +- Allow 0.8.x Queue Interop \(without deprecated Psr prefixed interfaces\) [\#688](https://github.com/php-enqueue/enqueue-dev/pull/688) ([makasim](https://github.com/makasim)) +- \[dsn\] remove commented out code [\#661](https://github.com/php-enqueue/enqueue-dev/pull/661) ([kunicmarko20](https://github.com/kunicmarko20)) +- \[fs\]: fix: Wrong parameters for Exception [\#678](https://github.com/php-enqueue/enqueue-dev/pull/678) ([ssiergl](https://github.com/ssiergl)) +- \[fs\] Do not throw error in jsonUnserialize on deprecation notice [\#671](https://github.com/php-enqueue/enqueue-dev/pull/671) ([ssiergl](https://github.com/ssiergl)) +- \[mongodb\] polling\_integer type not correctly handled when using DSN [\#673](https://github.com/php-enqueue/enqueue-dev/pull/673) ([jak](https://github.com/jak)) +- \[dbal\] Use ordered bytes time uuid codec on message id decode. [\#665](https://github.com/php-enqueue/enqueue-dev/pull/665) ([makasim](https://github.com/makasim)) +- \[dbal\] fix: Wrong parameters for Exception [\#676](https://github.com/php-enqueue/enqueue-dev/pull/676) ([Nommyde](https://github.com/Nommyde)) +- \[sqs\] Add ability to use another aws account per queue. [\#666](https://github.com/php-enqueue/enqueue-dev/pull/666) ([makasim](https://github.com/makasim)) +- \[sqs\] Multi region support [\#664](https://github.com/php-enqueue/enqueue-dev/pull/664) ([makasim](https://github.com/makasim)) +- \[sqs\] Use a queue created in another AWS account. [\#662](https://github.com/php-enqueue/enqueue-dev/pull/662) ([makasim](https://github.com/makasim)) +- \[job-queue\] Fix tests on newer dbal versions. [\#687](https://github.com/php-enqueue/enqueue-dev/pull/687) ([makasim](https://github.com/makasim)) +- [doc] typo [\#686](https://github.com/php-enqueue/enqueue-dev/pull/686) ([OskarStark](https://github.com/OskarStark)) +- [doc] typo [\#683](https://github.com/php-enqueue/enqueue-dev/pull/683) ([OskarStark](https://github.com/OskarStark)) +- [doc] Fix package name for redis [\#680](https://github.com/php-enqueue/enqueue-dev/pull/680) ([gnumoksha](https://github.com/gnumoksha)) + ## [0.9.1](https://github.com/php-enqueue/enqueue-dev/tree/0.9.1) (2018-11-27) [Full Changelog](https://github.com/php-enqueue/enqueue-dev/compare/0.9.0...0.9.1) diff --git a/README.md b/README.md index 0fb847f2c..e08bdfcf9 100644 --- a/README.md +++ b/README.md @@ -90,7 +90,7 @@ Features: [![Build Status](https://travis-ci.org/php-enqueue/null.png?branch=master)](https://travis-ci.org/php-enqueue/null) [![Total Downloads](https://poser.pugx.org/enqueue/null/d/total.png)](https://packagist.org/packages/enqueue/null/stats) [![Latest Stable Version](https://poser.pugx.org/enqueue/null/version.png)](https://packagist.org/packages/enqueue/null) - * [the others are comming](https://github.com/php-enqueue/enqueue-dev/issues/284) + * [the others are coming](https://github.com/php-enqueue/enqueue-dev/issues/284) * [Symfony bundle](docs/bundle/quick_tour.md) * [Magento1 extension](docs/magento/quick_tour.md) * [Magento2 module](docs/magento2/quick_tour.md) diff --git a/bin/release b/bin/release index 4ef1a760a..41735ff64 100755 --- a/bin/release +++ b/bin/release @@ -22,7 +22,7 @@ git add CHANGELOG.md && git commit -m "Release $1" -S && git push origin "$CURRE ./bin/subtree-split -for REMOTE in origin stomp amqp-ext amqp-lib amqp-bunny amqp-tools pheanstalk gearman sqs gps fs redis dbal null rdkafka enqueue simple-client enqueue-bundle job-queue test async-event-dispatcher mongodb wamp monitoring +for REMOTE in origin stomp amqp-ext amqp-lib amqp-bunny amqp-tools pheanstalk gearman sqs gps fs redis dbal null rdkafka enqueue simple-client enqueue-bundle job-queue test async-event-dispatcher async-command mongodb wamp monitoring dsn do echo "" echo "" diff --git a/composer.json b/composer.json index ff9db427d..ed76384f5 100644 --- a/composer.json +++ b/composer.json @@ -12,7 +12,7 @@ "ext-rdkafka": "^3.0.3", "queue-interop/amqp-interop": "^0.8", - "queue-interop/queue-interop": "^0.7", + "queue-interop/queue-interop": "^0.7|^0.8", "bunny/bunny": "^0.2.4|^0.3|^0.4", "php-amqplib/php-amqplib": "^2.7", "doctrine/dbal": "^2.6", diff --git a/docs/bundle/cli_commands.md b/docs/bundle/cli_commands.md index 88cf5e5b7..c61d4b7c9 100644 --- a/docs/bundle/cli_commands.md +++ b/docs/bundle/cli_commands.md @@ -155,7 +155,7 @@ Help: ``` ./bin/console enqueue:transport:consume --help -Usage:ng mqdev_gearmand_1 ... done +Usage: enqueue:transport:consume [options] [--] Arguments: diff --git a/docs/client/supported_brokers.md b/docs/client/supported_brokers.md index a0d7b08b3..ddc8a3968 100644 --- a/docs/client/supported_brokers.md +++ b/docs/client/supported_brokers.md @@ -19,7 +19,7 @@ Here's the list of transports supported by Enqueue Client: | Doctrine DBAL | [enqueue/dbal](../transport/dbal.md) | mysql: pgsql: pdo_pgsql etc | | Filesystem | [enqueue/fs](../transport/fs.md) | file:///foo/bar | | Google PubSub | [enqueue/gps](../transport/gps.md) | gps: | -| Redis | [enqueue/gps](../transport/redis.md) | redis: | +| Redis | [enqueue/redis](../transport/redis.md) | redis: | | Amazon SQS | [enqueue/sqs](../transport/sqs.md) | sqs: | | STOMP, RabbitMQ | [enqueue/stomp](../transport/stomp.md) | stomp: | | Kafka | [enqueue/rdkafka](../transport/kafka.md) | kafka: | diff --git a/docs/transport/sqs.md b/docs/transport/sqs.md index 8bb06aa13..235df6ccd 100644 --- a/docs/transport/sqs.md +++ b/docs/transport/sqs.md @@ -19,6 +19,7 @@ It uses internally official [aws sdk library](https://packagist.org/packages/aws * [Send delay message](#send-delay-message) * [Consume message](#consume-message) * [Purge queue messages](#purge-queue-messages) +* [Queue from another AWS account](#queue-from-another-aws-account) ## Installation @@ -122,4 +123,42 @@ $fooQueue = $context->createQueue('foo'); $context->purgeQueue($fooQueue); ``` +## Queue from another AWS account + +SQS allows to use queues from another account. You could set it globally for all queues via option `queue_owner_aws_account_id` or +per queue using `SqsDestination::setQueueOwnerAWSAccountId` method. + +```php +createContext(); + +// per queue. +$queue = $context->createQueue('foo'); +$queue->setQueueOwnerAWSAccountId('awsAccountId'); +``` + +## Multi region examples + +Enqueue SQS provides a generic multi-region support. This enables users to specify which AWS Region to send a command to by setting region on SqsDestination. +You might need it to access SQS FIFO queue because they are not available for all regions. +If not specified the default region is used. + +```php +createContext(); + +$queue = $context->createQueue('foo'); +$queue->setRegion('us-west-2'); + +// the request goes to US West (Oregon) Region +$context->declareQueue($queue); +``` + [back to index](../index.md) diff --git a/pkg/amqp-bunny/composer.json b/pkg/amqp-bunny/composer.json index 086db9534..63b6addb0 100644 --- a/pkg/amqp-bunny/composer.json +++ b/pkg/amqp-bunny/composer.json @@ -8,7 +8,7 @@ "require": { "php": "^7.1.3", "queue-interop/amqp-interop": "^0.8", - "queue-interop/queue-interop": "^0.7", + "queue-interop/queue-interop": "^0.7|^0.8", "bunny/bunny": "^0.2.4|^0.3|^0.4", "enqueue/amqp-tools": "^0.9" }, diff --git a/pkg/amqp-ext/composer.json b/pkg/amqp-ext/composer.json index 6dcdd96e2..1c681f17e 100644 --- a/pkg/amqp-ext/composer.json +++ b/pkg/amqp-ext/composer.json @@ -9,7 +9,7 @@ "php": "^7.1.3", "ext-amqp": "^1.9.3", "queue-interop/amqp-interop": "^0.8", - "queue-interop/queue-interop": "^0.7", + "queue-interop/queue-interop": "^0.7|^0.8", "enqueue/amqp-tools": "^0.9" }, "require-dev": { diff --git a/pkg/amqp-lib/composer.json b/pkg/amqp-lib/composer.json index 415ca9f23..100dc583b 100644 --- a/pkg/amqp-lib/composer.json +++ b/pkg/amqp-lib/composer.json @@ -9,6 +9,7 @@ "php": "^7.1.3", "php-amqplib/php-amqplib": "^2.8", "queue-interop/amqp-interop": "^0.8", + "queue-interop/queue-interop": "^0.7|^0.8", "enqueue/amqp-tools": "^0.9" }, "require-dev": { diff --git a/pkg/amqp-tools/composer.json b/pkg/amqp-tools/composer.json index f0c70aa7f..aba5b5af3 100644 --- a/pkg/amqp-tools/composer.json +++ b/pkg/amqp-tools/composer.json @@ -8,7 +8,7 @@ "require": { "php": "^7.1.3", "queue-interop/amqp-interop": "^0.8", - "queue-interop/queue-interop": "^0.7" + "queue-interop/queue-interop": "^0.7|^0.8" }, "require-dev": { "phpunit/phpunit": "~5.4.0", diff --git a/pkg/async-command/composer.json b/pkg/async-command/composer.json index 5d93f47b1..7474c8834 100644 --- a/pkg/async-command/composer.json +++ b/pkg/async-command/composer.json @@ -8,7 +8,7 @@ "require": { "php": ">=7.1", "enqueue/enqueue": "^0.9", - "queue-interop/queue-interop": "^0.7", + "queue-interop/queue-interop": "^0.7|^0.8", "symfony/console": "^3.4|^4", "symfony/process": "^3.4|^4" }, diff --git a/pkg/async-event-dispatcher/composer.json b/pkg/async-event-dispatcher/composer.json index ca63adfb0..d5996f4aa 100644 --- a/pkg/async-event-dispatcher/composer.json +++ b/pkg/async-event-dispatcher/composer.json @@ -8,7 +8,7 @@ "require": { "php": "^7.1.3", "enqueue/enqueue": "^0.9", - "queue-interop/queue-interop": "^0.7", + "queue-interop/queue-interop": "^0.7|^0.8", "symfony/event-dispatcher": "^3.4|^4" }, "require-dev": { diff --git a/pkg/dbal/DbalContext.php b/pkg/dbal/DbalContext.php index 63ce0b1da..431b115d5 100644 --- a/pkg/dbal/DbalContext.php +++ b/pkg/dbal/DbalContext.php @@ -17,7 +17,9 @@ use Interop\Queue\Queue; use Interop\Queue\SubscriptionConsumer; use Interop\Queue\Topic; +use Ramsey\Uuid\Codec\OrderedTimeCodec; use Ramsey\Uuid\Uuid; +use Ramsey\Uuid\UuidFactory; class DbalContext implements Context { @@ -152,7 +154,9 @@ public function convertMessage(array $arrayMessage): DbalMessage ); if (isset($arrayMessage['id'])) { - $message->setMessageId(Uuid::fromBytes($arrayMessage['id'])->toString()); + $uuidCodec = new OrderedTimeCodec((new UuidFactory())->getUuidBuilder()); + + $message->setMessageId($uuidCodec->decodeBytes($arrayMessage['id'])->toString()); } if (isset($arrayMessage['queue'])) { $message->setQueue($arrayMessage['queue']); diff --git a/pkg/dbal/DbalProducer.php b/pkg/dbal/DbalProducer.php index 014c7775c..e18753729 100644 --- a/pkg/dbal/DbalProducer.php +++ b/pkg/dbal/DbalProducer.php @@ -139,7 +139,7 @@ public function send(Destination $destination, Message $message): void 'redeliver_after' => Type::BIGINT, ]); } catch (\Exception $e) { - throw new Exception('The transport fails to send the message due to some internal error.', null, $e); + throw new Exception('The transport fails to send the message due to some internal error.', 0, $e); } } diff --git a/pkg/dbal/composer.json b/pkg/dbal/composer.json index b48985714..253d63d31 100644 --- a/pkg/dbal/composer.json +++ b/pkg/dbal/composer.json @@ -7,7 +7,7 @@ "license": "MIT", "require": { "php": "^7.1.3", - "queue-interop/queue-interop": "^0.7", + "queue-interop/queue-interop": "^0.7|^0.8", "doctrine/dbal": "^2.6", "ramsey/uuid": "^3" }, diff --git a/pkg/dsn/Dsn.php b/pkg/dsn/Dsn.php index 338ec19af..a5cc139b2 100644 --- a/pkg/dsn/Dsn.php +++ b/pkg/dsn/Dsn.php @@ -80,16 +80,6 @@ public function __construct( $this->queryBag = new QueryBag($query); } -// public function __toString(): string -// { -// return $this->dsn; -// } -// -// public function getDsn(): string -// { -// return $this->dsn; -// } - public function getScheme(): string { return $this->scheme; diff --git a/pkg/enqueue-bundle/composer.json b/pkg/enqueue-bundle/composer.json index c74197602..63620a3b5 100644 --- a/pkg/enqueue-bundle/composer.json +++ b/pkg/enqueue-bundle/composer.json @@ -9,7 +9,7 @@ "php": "^7.1.3", "symfony/framework-bundle": "^3.4|^4", "queue-interop/amqp-interop": "^0.8", - "queue-interop/queue-interop": "^0.7", + "queue-interop/queue-interop": "^0.7|^0.8", "enqueue/enqueue": "^0.9", "enqueue/null": "^0.9" }, diff --git a/pkg/enqueue/Tests/Symfony/ContainerProcessorRegistryTest.php b/pkg/enqueue/Tests/Symfony/ContainerProcessorRegistryTest.php index fe84c2e20..bb3cacb53 100644 --- a/pkg/enqueue/Tests/Symfony/ContainerProcessorRegistryTest.php +++ b/pkg/enqueue/Tests/Symfony/ContainerProcessorRegistryTest.php @@ -69,7 +69,7 @@ public function testThrowErrorIfServiceDoesNotImplementProcessorReturnType() $registry = new ContainerProcessorRegistry($containerMock); $this->expectException(\TypeError::class); - $this->expectExceptionMessage('Return value of Enqueue\Symfony\ContainerProcessorRegistry::get() must implement interface Interop\Queue\PsrProcessor, instance of stdClass returned'); + $this->expectExceptionMessage('Return value of Enqueue\Symfony\ContainerProcessorRegistry::get() must implement interface Interop\Queue\Processor, instance of stdClass returned'); $registry->get('processor-name'); } diff --git a/pkg/enqueue/composer.json b/pkg/enqueue/composer.json index 26d147705..05dd1df2c 100644 --- a/pkg/enqueue/composer.json +++ b/pkg/enqueue/composer.json @@ -8,7 +8,7 @@ "require": { "php": "^7.1.3", "queue-interop/amqp-interop": "^0.8", - "queue-interop/queue-interop": "^0.7", + "queue-interop/queue-interop": "^0.7|^0.8", "enqueue/null": "^0.9", "enqueue/dsn": "^0.9", "ramsey/uuid": "^2|^3.5", diff --git a/pkg/fs/FsConsumer.php b/pkg/fs/FsConsumer.php index d9092aaf4..65ff12e61 100644 --- a/pkg/fs/FsConsumer.php +++ b/pkg/fs/FsConsumer.php @@ -137,7 +137,7 @@ public function receiveNoWait(): ?Message $this->preFetchedMessages[] = $fetchedMessage; } catch (\Exception $e) { - throw new \LogicException(sprintf("Cannot decode json message '%s'", $rawMessage), null, $e); + throw new \LogicException(sprintf("Cannot decode json message '%s'", $rawMessage), 0, $e); } } else { return null; diff --git a/pkg/fs/FsContext.php b/pkg/fs/FsContext.php index 0d499526c..480074e03 100644 --- a/pkg/fs/FsContext.php +++ b/pkg/fs/FsContext.php @@ -105,7 +105,7 @@ public function workWithFile(FsDestination $destination, string $mode, callable set_error_handler(function ($severity, $message, $file, $line) { throw new \ErrorException($message, 0, $severity, $file, $line); - }); + }, E_ALL & ~E_USER_DEPRECATED); try { $file = fopen((string) $destination->getFileInfo(), $mode); diff --git a/pkg/fs/composer.json b/pkg/fs/composer.json index 0be58949c..fb95a36f9 100644 --- a/pkg/fs/composer.json +++ b/pkg/fs/composer.json @@ -7,7 +7,7 @@ "license": "MIT", "require": { "php": "^7.1.3", - "queue-interop/queue-interop": "^0.7", + "queue-interop/queue-interop": "^0.7|^0.8", "enqueue/dsn": "^0.9", "symfony/filesystem": "^3.4|^4", "makasim/temp-file": "^0.2@stable" diff --git a/pkg/gearman/composer.json b/pkg/gearman/composer.json index e23162405..102703aed 100644 --- a/pkg/gearman/composer.json +++ b/pkg/gearman/composer.json @@ -8,7 +8,7 @@ "require": { "php": "^7.1.3", "ext-gearman": "^2.0", - "queue-interop/queue-interop": "^0.7" + "queue-interop/queue-interop": "^0.7|^0.8" }, "require-dev": { "phpunit/phpunit": "~5.4.0", diff --git a/pkg/gps/composer.json b/pkg/gps/composer.json index 295ae105f..46f804ebb 100644 --- a/pkg/gps/composer.json +++ b/pkg/gps/composer.json @@ -7,7 +7,7 @@ "license": "MIT", "require": { "php": "^7.1.3", - "queue-interop/queue-interop": "^0.7", + "queue-interop/queue-interop": "^0.7|^0.8", "google/cloud-pubsub": "^1.0", "enqueue/dsn": "^0.9" }, diff --git a/pkg/job-queue/Test/DbalPersistedConnection.php b/pkg/job-queue/Test/DbalPersistedConnection.php index 7c06f5ab0..f1ec03035 100644 --- a/pkg/job-queue/Test/DbalPersistedConnection.php +++ b/pkg/job-queue/Test/DbalPersistedConnection.php @@ -71,10 +71,15 @@ public function rollBack() */ protected function setConnected($connected) { - $isConnected = new \ReflectionProperty('Doctrine\DBAL\Connection', '_isConnected'); - $isConnected->setAccessible(true); - $isConnected->setValue($this, $connected); - $isConnected->setAccessible(false); + $rc = new \ReflectionClass(Connection::class); + $rp = $rc->hasProperty('isConnected') ? + $rc->getProperty('isConnected') : + $rc->getProperty('_isConnected') + ; + + $rp->setAccessible(true); + $rp->setValue($this, $connected); + $rp->setAccessible(false); } /** @@ -134,10 +139,15 @@ protected function getConnectionId() */ private function setTransactionNestingLevel($level) { - $prop = new \ReflectionProperty('Doctrine\DBAL\Connection', '_transactionNestingLevel'); - $prop->setAccessible(true); - - return $prop->setValue($this, $level); + $rc = new \ReflectionClass(Connection::class); + $rp = $rc->hasProperty('transactionNestingLevel') ? + $rc->getProperty('transactionNestingLevel') : + $rc->getProperty('_transactionNestingLevel') + ; + + $rp->setAccessible(true); + $rp->setValue($this, $level); + $rp->setAccessible(false); } /** diff --git a/pkg/job-queue/composer.json b/pkg/job-queue/composer.json index 3ade4f79a..0dab189f1 100644 --- a/pkg/job-queue/composer.json +++ b/pkg/job-queue/composer.json @@ -9,7 +9,7 @@ "php": "^7.1.3", "enqueue/enqueue": "^0.9", "enqueue/null": "0.9.x-dev", - "queue-interop/queue-interop": "^0.7", + "queue-interop/queue-interop": "^0.7|^0.8", "doctrine/orm": "~2.4" }, "require-dev": { diff --git a/pkg/mongodb/MongodbConnectionFactory.php b/pkg/mongodb/MongodbConnectionFactory.php index 28a5fc068..a5148f1c0 100644 --- a/pkg/mongodb/MongodbConnectionFactory.php +++ b/pkg/mongodb/MongodbConnectionFactory.php @@ -98,7 +98,7 @@ public static function parseDsn(string $dsn): array parse_str($parsedUrl['query'], $queryParts); //get enqueue attributes values if (!empty($queryParts['polling_interval'])) { - $config['polling_interval'] = $queryParts['polling_interval']; + $config['polling_interval'] = (int) $queryParts['polling_interval']; } if (!empty($queryParts['enqueue_collection'])) { $config['collection_name'] = $queryParts['enqueue_collection']; diff --git a/pkg/mongodb/Tests/MongodbConnectionFactoryTest.php b/pkg/mongodb/Tests/MongodbConnectionFactoryTest.php index 63ec00cea..c15b0fa43 100644 --- a/pkg/mongodb/Tests/MongodbConnectionFactoryTest.php +++ b/pkg/mongodb/Tests/MongodbConnectionFactoryTest.php @@ -45,6 +45,20 @@ public function testCouldBeConstructedWithCustomConfiguration() $this->assertAttributeEquals($params, 'config', $factory); } + public function testCouldBeConstructedWithCustomConfigurationFromDsn() + { + $params = [ + 'dsn' => 'mongodb://127.0.0.3/test-db-name?enqueue_collection=collection-name&polling_interval=3000', + 'dbname' => 'test-db-name', + 'collection_name' => 'collection-name', + 'polling_interval' => 3000, + ]; + + $factory = new MongodbConnectionFactory($params['dsn']); + + $this->assertAttributeEquals($params, 'config', $factory); + } + public function testShouldCreateContext() { $factory = new MongodbConnectionFactory(); diff --git a/pkg/mongodb/composer.json b/pkg/mongodb/composer.json index b3afedb03..ec5bfe34d 100644 --- a/pkg/mongodb/composer.json +++ b/pkg/mongodb/composer.json @@ -11,7 +11,7 @@ "license": "MIT", "require": { "php": "^7.1.3", - "queue-interop/queue-interop": "^0.7", + "queue-interop/queue-interop": "^0.7|^0.8", "mongodb/mongodb": "^1.2", "ext-mongodb": "^1.3" }, diff --git a/pkg/null/composer.json b/pkg/null/composer.json index cb900e1cd..1fa1a2c22 100644 --- a/pkg/null/composer.json +++ b/pkg/null/composer.json @@ -7,7 +7,7 @@ "license": "MIT", "require": { "php": "^7.1.3", - "queue-interop/queue-interop": "^0.7" + "queue-interop/queue-interop": "^0.7|^0.8" }, "require-dev": { "phpunit/phpunit": "~5.5", diff --git a/pkg/pheanstalk/composer.json b/pkg/pheanstalk/composer.json index 38ea2ee80..a3e34ee10 100644 --- a/pkg/pheanstalk/composer.json +++ b/pkg/pheanstalk/composer.json @@ -8,7 +8,7 @@ "require": { "php": "^7.1.3", "pda/pheanstalk": "^3", - "queue-interop/queue-interop": "^0.7" + "queue-interop/queue-interop": "^0.7|^0.8" }, "require-dev": { "phpunit/phpunit": "~5.4.0", diff --git a/pkg/rdkafka/composer.json b/pkg/rdkafka/composer.json index 9ec0d2f20..e0d55b396 100644 --- a/pkg/rdkafka/composer.json +++ b/pkg/rdkafka/composer.json @@ -8,7 +8,7 @@ "require": { "php": "^7.1.3", "ext-rdkafka": "^3.0.3", - "queue-interop/queue-interop": "^0.7" + "queue-interop/queue-interop": "^0.7|^0.8" }, "require-dev": { "phpunit/phpunit": "~5.4.0", diff --git a/pkg/redis/composer.json b/pkg/redis/composer.json index 0b0a3a37b..7e9294548 100644 --- a/pkg/redis/composer.json +++ b/pkg/redis/composer.json @@ -7,7 +7,7 @@ "license": "MIT", "require": { "php": "^7.1.3", - "queue-interop/queue-interop": "^0.7", + "queue-interop/queue-interop": "^0.7|^0.8", "enqueue/dsn": "^0.9", "ramsey/uuid": "^3" }, diff --git a/pkg/simple-client/composer.json b/pkg/simple-client/composer.json index a4bbc478b..571cca783 100644 --- a/pkg/simple-client/composer.json +++ b/pkg/simple-client/composer.json @@ -9,7 +9,7 @@ "php": "^7.1.3", "enqueue/enqueue": "^0.9", "queue-interop/amqp-interop": "^0.8", - "queue-interop/queue-interop": "^0.7", + "queue-interop/queue-interop": "^0.7|^0.8", "symfony/config": "^3.4|^4" }, "require-dev": { diff --git a/pkg/sqs/SqsClient.php b/pkg/sqs/SqsClient.php new file mode 100644 index 000000000..65cf2fb29 --- /dev/null +++ b/pkg/sqs/SqsClient.php @@ -0,0 +1,148 @@ +inputClient = $inputClient; + } + + public function deleteMessage(array $args): Result + { + return $this->callApi('deleteMessage', $args); + } + + public function receiveMessage(array $args): Result + { + return $this->callApi('receiveMessage', $args); + } + + public function purgeQueue(array $args): Result + { + return $this->callApi('purgeQueue', $args); + } + + public function getQueueUrl(array $args): Result + { + return $this->callApi('getQueueUrl', $args); + } + + public function createQueue(array $args): Result + { + return $this->callApi('createQueue', $args); + } + + public function deleteQueue(array $args): Result + { + return $this->callApi('deleteQueue', $args); + } + + public function sendMessage(array $args): Result + { + return $this->callApi('sendMessage', $args); + } + + public function getAWSClient(): AwsSqsClient + { + $this->resolveClient(); + + if ($this->singleClient) { + return $this->singleClient; + } + + if ($this->multiClient) { + $mr = new \ReflectionMethod($this->multiClient, 'getClientFromPool'); + $mr->setAccessible(true); + $singleClient = $mr->invoke($this->multiClient, $this->multiClient->getRegion()); + $mr->setAccessible(false); + + return $singleClient; + } + + throw new \LogicException('The multi or single client must be set'); + } + + private function callApi(string $name, array $args): Result + { + $this->resolveClient(); + + if ($this->singleClient) { + if (false == empty($args['@region'])) { + throw new \LogicException('Cannot send message to another region because transport is configured with single aws client'); + } + + unset($args['@region']); + + return call_user_func([$this->singleClient, $name], $args); + } + + if ($this->multiClient) { + return call_user_func([$this->multiClient, $name], $args); + } + + throw new \LogicException('The multi or single client must be set'); + } + + private function resolveClient(): void + { + if ($this->singleClient || $this->multiClient) { + return; + } + + $client = $this->inputClient; + if ($client instanceof MultiRegionClient) { + $this->multiClient = $client; + + return; + } elseif ($client instanceof AwsSqsClient) { + $this->singleClient = $client; + + return; + } elseif (is_callable($client)) { + $client = call_user_func($client); + if ($client instanceof MultiRegionClient) { + $this->multiClient = $client; + + return; + } + if ($client instanceof AwsSqsClient) { + $this->singleClient = $client; + + return; + } + } + + throw new \LogicException(sprintf( + 'The input client must be an instance of "%s" or "%s" or a callable that returns one of those. Got "%s"', + AwsSqsClient::class, + MultiRegionClient::class, + is_object($client) ? get_class($client) : gettype($client) + )); + } +} diff --git a/pkg/sqs/SqsConnectionFactory.php b/pkg/sqs/SqsConnectionFactory.php index 5f626c368..4a99c752e 100644 --- a/pkg/sqs/SqsConnectionFactory.php +++ b/pkg/sqs/SqsConnectionFactory.php @@ -4,7 +4,8 @@ namespace Enqueue\Sqs; -use Aws\Sqs\SqsClient; +use Aws\Sdk; +use Aws\Sqs\SqsClient as AwsSqsClient; use Enqueue\Dsn\Dsn; use Interop\Queue\ConnectionFactory; use Interop\Queue\Context; @@ -23,14 +24,15 @@ class SqsConnectionFactory implements ConnectionFactory /** * $config = [ - * 'key' => null - AWS credentials. If no credentials are provided, the SDK will attempt to load them from the environment. - * 'secret' => null, - AWS credentials. If no credentials are provided, the SDK will attempt to load them from the environment. - * 'token' => null, - AWS credentials. If no credentials are provided, the SDK will attempt to load them from the environment. - * 'region' => null, - (string, required) Region to connect to. See http://docs.aws.amazon.com/general/latest/gr/rande.html for a list of available regions. - * 'retries' => 3, - (int, default=int(3)) Configures the maximum number of allowed retries for a client (pass 0 to disable retries). - * 'version' => '2012-11-05', - (string, required) The version of the webservice to utilize - * 'lazy' => true, - Enable lazy connection (boolean) - * 'endpoint' => null - (string, default=null) The full URI of the webservice. This is only required when connecting to a custom endpoint e.g. localstack + * 'key' => null AWS credentials. If no credentials are provided, the SDK will attempt to load them from the environment. + * 'secret' => null, AWS credentials. If no credentials are provided, the SDK will attempt to load them from the environment. + * 'token' => null, AWS credentials. If no credentials are provided, the SDK will attempt to load them from the environment. + * 'region' => null, (string, required) Region to connect to. See http://docs.aws.amazon.com/general/latest/gr/rande.html for a list of available regions. + * 'retries' => 3, (int, default=int(3)) Configures the maximum number of allowed retries for a client (pass 0 to disable retries). + * 'version' => '2012-11-05', (string, required) The version of the webservice to utilize + * 'lazy' => true, Enable lazy connection (boolean) + * 'endpoint' => null (string, default=null) The full URI of the webservice. This is only required when connecting to a custom endpoint e.g. localstack + * 'queue_owner_aws_account_id' The AWS account ID of the account that created the queue. * ]. * * or @@ -42,8 +44,8 @@ class SqsConnectionFactory implements ConnectionFactory */ public function __construct($config = 'sqs:') { - if ($config instanceof SqsClient) { - $this->client = $config; + if ($config instanceof AwsSqsClient) { + $this->client = new SqsClient($config); $this->config = ['lazy' => false] + $this->defaultConfig(); return; @@ -60,7 +62,7 @@ public function __construct($config = 'sqs:') unset($config['dsn']); } } else { - throw new \LogicException(sprintf('The config must be either an array of options, a DSN string, null or instance of %s', SqsClient::class)); + throw new \LogicException(sprintf('The config must be either an array of options, a DSN string, null or instance of %s', AwsSqsClient::class)); } $this->config = array_replace($this->defaultConfig(), $config); @@ -71,13 +73,7 @@ public function __construct($config = 'sqs:') */ public function createContext(): Context { - if ($this->config['lazy']) { - return new SqsContext(function () { - return $this->establishConnection(); - }); - } - - return new SqsContext($this->establishConnection()); + return new SqsContext($this->establishConnection(), $this->config); } private function establishConnection(): SqsClient @@ -107,7 +103,14 @@ private function establishConnection(): SqsClient } } - $this->client = new SqsClient($config); + $establishConnection = function () use ($config) { + return (new Sdk(['Sqs' => $config]))->createMultiRegionSqs(); + }; + + $this->client = $this->config['lazy'] ? + new SqsClient($establishConnection) : + new SqsClient($establishConnection()) + ; return $this->client; } @@ -132,6 +135,7 @@ private function parseDsn(string $dsn): array 'version' => $dsn->getString('version'), 'lazy' => $dsn->getBool('lazy'), 'endpoint' => $dsn->getString('endpoint'), + 'queue_owner_aws_account_id' => $dsn->getString('queue_owner_aws_account_id'), ]), function ($value) { return null !== $value; }); } @@ -146,6 +150,7 @@ private function defaultConfig(): array 'version' => '2012-11-05', 'lazy' => true, 'endpoint' => null, + 'queue_owner_aws_account_id' => null, ]; } } diff --git a/pkg/sqs/SqsConsumer.php b/pkg/sqs/SqsConsumer.php index 1ee37a99d..b6ded7e20 100644 --- a/pkg/sqs/SqsConsumer.php +++ b/pkg/sqs/SqsConsumer.php @@ -119,7 +119,8 @@ public function acknowledge(Message $message): void { InvalidMessageException::assertMessageInstanceOf($message, SqsMessage::class); - $this->context->getClient()->deleteMessage([ + $this->context->getSqsClient()->deleteMessage([ + '@region' => $this->queue->getRegion(), 'QueueUrl' => $this->context->getQueueUrl($this->queue), 'ReceiptHandle' => $message->getReceiptHandle(), ]); @@ -132,7 +133,8 @@ public function reject(Message $message, bool $requeue = false): void { InvalidMessageException::assertMessageInstanceOf($message, SqsMessage::class); - $this->context->getClient()->deleteMessage([ + $this->context->getSqsClient()->deleteMessage([ + '@region' => $this->queue->getRegion(), 'QueueUrl' => $this->context->getQueueUrl($this->queue), 'ReceiptHandle' => $message->getReceiptHandle(), ]); @@ -149,6 +151,7 @@ protected function receiveMessage(int $timeoutSeconds): ?SqsMessage } $arguments = [ + '@region' => $this->queue->getRegion(), 'AttributeNames' => ['All'], 'MessageAttributeNames' => ['All'], 'MaxNumberOfMessages' => $this->maxNumberOfMessages, @@ -160,7 +163,7 @@ protected function receiveMessage(int $timeoutSeconds): ?SqsMessage $arguments['VisibilityTimeout'] = $this->visibilityTimeout; } - $result = $this->context->getClient()->receiveMessage($arguments); + $result = $this->context->getSqsClient()->receiveMessage($arguments); if ($result->hasKey('Messages')) { $this->messages = $result->get('Messages'); diff --git a/pkg/sqs/SqsContext.php b/pkg/sqs/SqsContext.php index cb3a9edfa..8be6fcddb 100644 --- a/pkg/sqs/SqsContext.php +++ b/pkg/sqs/SqsContext.php @@ -4,7 +4,7 @@ namespace Enqueue\Sqs; -use Aws\Sqs\SqsClient; +use Aws\Sqs\SqsClient as AwsSqsClient; use Interop\Queue\Consumer; use Interop\Queue\Context; use Interop\Queue\Destination; @@ -24,34 +24,20 @@ class SqsContext implements Context */ private $client; - /** - * @var callable - */ - private $clientFactory; - /** * @var array */ private $queueUrls; /** - * Callable must return instance of SqsClient once called. - * - * @param SqsClient|callable $client + * @var array */ - public function __construct($client) - { - if ($client instanceof SqsClient) { - $this->client = $client; - } elseif (is_callable($client)) { - $this->clientFactory = $client; - } else { - throw new \InvalidArgumentException(sprintf( - 'The $client argument must be either %s or callable that returns %s once called.', - SqsClient::class, - SqsClient::class - )); - } + private $config; + + public function __construct(SqsClient $client, array $config) + { + $this->client = $client; + $this->config = $config; } /** @@ -114,7 +100,8 @@ public function purgeQueue(Queue $queue): void { InvalidDestinationException::assertDestinationInstanceOf($queue, SqsDestination::class); - $this->getClient()->purgeQueue([ + $this->client->purgeQueue([ + '@region' => $queue->getRegion(), 'QueueUrl' => $this->getQueueUrl($queue), ]); } @@ -124,44 +111,56 @@ public function createSubscriptionConsumer(): SubscriptionConsumer throw SubscriptionConsumerNotSupportedException::providerDoestNotSupportIt(); } - public function getClient(): SqsClient + public function getAwsSqsClient(): AwsSqsClient { - if (false == $this->client) { - $client = call_user_func($this->clientFactory); - if (false == $client instanceof SqsClient) { - throw new \LogicException(sprintf( - 'The factory must return instance of "%s". But it returns %s', - SqsClient::class, - is_object($client) ? get_class($client) : gettype($client) - )); - } - - $this->client = $client; - } + return $this->client->getAWSClient(); + } + public function getSqsClient(): SqsClient + { return $this->client; } + /** + * @deprecated use getAwsSqsClient method + */ + public function getClient(): AwsSqsClient + { + @trigger_error('The method is deprecated since 0.9.2. SqsContext::getAwsSqsClient() method should be used.', E_USER_DEPRECATED); + + return $this->getAwsSqsClient(); + } + public function getQueueUrl(SqsDestination $destination): string { if (isset($this->queueUrls[$destination->getQueueName()])) { return $this->queueUrls[$destination->getQueueName()]; } - $result = $this->getClient()->getQueueUrl([ + $arguments = [ + '@region' => $destination->getRegion(), 'QueueName' => $destination->getQueueName(), - ]); + ]; + + if ($destination->getQueueOwnerAWSAccountId()) { + $arguments['QueueOwnerAWSAccountId'] = $destination->getQueueOwnerAWSAccountId(); + } elseif (false == empty($this->config['queue_owner_aws_account_id'])) { + $arguments['QueueOwnerAWSAccountId'] = $this->config['queue_owner_aws_account_id']; + } + + $result = $this->client->getQueueUrl($arguments); if (false == $result->hasKey('QueueUrl')) { throw new \RuntimeException(sprintf('QueueUrl cannot be resolved. queueName: "%s"', $destination->getQueueName())); } - return $this->queueUrls[$destination->getQueueName()] = $result->get('QueueUrl'); + return $this->queueUrls[$destination->getQueueName()] = (string) $result->get('QueueUrl'); } public function declareQueue(SqsDestination $dest): void { - $result = $this->getClient()->createQueue([ + $result = $this->client->createQueue([ + '@region' => $dest->getRegion(), 'Attributes' => $dest->getAttributes(), 'QueueName' => $dest->getQueueName(), ]); @@ -175,7 +174,7 @@ public function declareQueue(SqsDestination $dest): void public function deleteQueue(SqsDestination $dest): void { - $this->getClient()->deleteQueue([ + $this->client->deleteQueue([ 'QueueUrl' => $this->getQueueUrl($dest), ]); diff --git a/pkg/sqs/SqsDestination.php b/pkg/sqs/SqsDestination.php index 21fc17828..5649d30a3 100644 --- a/pkg/sqs/SqsDestination.php +++ b/pkg/sqs/SqsDestination.php @@ -14,11 +14,21 @@ class SqsDestination implements Topic, Queue */ private $name; + /** + * @var string|null + */ + private $region; + /** * @var array */ private $attributes; + /** + * @var string|null + */ + private $queueOwnerAWSAccountId; + /** * The name of the new queue. * The following limits apply to this name: @@ -187,4 +197,24 @@ public function setContentBasedDeduplication(bool $enable): void unset($this->attributes['ContentBasedDeduplication']); } } + + public function getQueueOwnerAWSAccountId(): ?string + { + return $this->queueOwnerAWSAccountId; + } + + public function setQueueOwnerAWSAccountId(?string $queueOwnerAWSAccountId): void + { + $this->queueOwnerAWSAccountId = $queueOwnerAWSAccountId; + } + + public function setRegion(string $region = null): void + { + $this->region = $region; + } + + public function getRegion(): ?string + { + return $this->region; + } } diff --git a/pkg/sqs/SqsProducer.php b/pkg/sqs/SqsProducer.php index 332bee505..b33d57fb8 100644 --- a/pkg/sqs/SqsProducer.php +++ b/pkg/sqs/SqsProducer.php @@ -44,6 +44,7 @@ public function send(Destination $destination, Message $message): void } $arguments = [ + '@region' => $destination->getRegion(), 'MessageAttributes' => [ 'Headers' => [ 'DataType' => 'String', @@ -70,7 +71,7 @@ public function send(Destination $destination, Message $message): void $arguments['MessageGroupId'] = $message->getMessageGroupId(); } - $result = $this->context->getClient()->sendMessage($arguments); + $result = $this->context->getSqsClient()->sendMessage($arguments); if (false == $result->hasKey('MessageId')) { throw new \RuntimeException('Message was not sent'); diff --git a/pkg/sqs/Tests/Spec/CreateSqsQueueTrait.php b/pkg/sqs/Tests/Spec/CreateSqsQueueTrait.php new file mode 100644 index 000000000..79af8208e --- /dev/null +++ b/pkg/sqs/Tests/Spec/CreateSqsQueueTrait.php @@ -0,0 +1,21 @@ +queue = $context->createQueue($queueName); + $context->declareQueue($this->queue); + + return $this->queue; + } +} diff --git a/pkg/sqs/Tests/Spec/SqsSendAndReceiveDelayedMessageFromQueueTest.php b/pkg/sqs/Tests/Spec/SqsSendAndReceiveDelayedMessageFromQueueTest.php index 2c8f2a21a..710e918f3 100644 --- a/pkg/sqs/Tests/Spec/SqsSendAndReceiveDelayedMessageFromQueueTest.php +++ b/pkg/sqs/Tests/Spec/SqsSendAndReceiveDelayedMessageFromQueueTest.php @@ -17,17 +17,13 @@ class SqsSendAndReceiveDelayedMessageFromQueueTest extends SendAndReceiveDelayed { use RetryTrait; use SqsExtension; + use CreateSqsQueueTrait; /** * @var SqsContext */ private $context; - /** - * @var SqsDestination - */ - private $queue; - protected function tearDown() { parent::tearDown(); @@ -37,26 +33,13 @@ protected function tearDown() } } - /** - * {@inheritdoc} - */ - protected function createContext() + protected function createContext(): SqsContext { return $this->context = $this->buildSqsContext(); } - /** - * {@inheritdoc} - * - * @param SqsContext $context - */ - protected function createQueue(Context $context, $queueName) + protected function createQueue(Context $context, $queueName): SqsDestination { - $queueName = $queueName.time(); - - $this->queue = $context->createQueue($queueName); - $context->declareQueue($this->queue); - - return $this->queue; + return $this->createSqsQueue($context, $queueName); } } diff --git a/pkg/sqs/Tests/Spec/SqsSendToAndReceiveFromQueueTest.php b/pkg/sqs/Tests/Spec/SqsSendToAndReceiveFromQueueTest.php index 934dda60e..1a3cdb54e 100644 --- a/pkg/sqs/Tests/Spec/SqsSendToAndReceiveFromQueueTest.php +++ b/pkg/sqs/Tests/Spec/SqsSendToAndReceiveFromQueueTest.php @@ -4,27 +4,26 @@ use Enqueue\Sqs\SqsContext; use Enqueue\Sqs\SqsDestination; +use Enqueue\Test\RetryTrait; use Enqueue\Test\SqsExtension; use Interop\Queue\Context; use Interop\Queue\Spec\SendToAndReceiveFromQueueSpec; /** * @group functional + * @retry 5 */ class SqsSendToAndReceiveFromQueueTest extends SendToAndReceiveFromQueueSpec { + use RetryTrait; use SqsExtension; + use CreateSqsQueueTrait; /** * @var SqsContext */ private $context; - /** - * @var SqsDestination - */ - private $queue; - protected function tearDown() { parent::tearDown(); @@ -34,26 +33,13 @@ protected function tearDown() } } - /** - * {@inheritdoc} - */ - protected function createContext() + protected function createContext(): SqsContext { return $this->context = $this->buildSqsContext(); } - /** - * {@inheritdoc} - * - * @param SqsContext $context - */ - protected function createQueue(Context $context, $queueName) + protected function createQueue(Context $context, $queueName): SqsDestination { - $queueName = $queueName.time(); - - $this->queue = $context->createQueue($queueName); - $context->declareQueue($this->queue); - - return $this->queue; + return $this->createSqsQueue($context, $queueName); } } diff --git a/pkg/sqs/Tests/Spec/SqsSendToAndReceiveFromTopicTest.php b/pkg/sqs/Tests/Spec/SqsSendToAndReceiveFromTopicTest.php index f21e33903..b66631233 100644 --- a/pkg/sqs/Tests/Spec/SqsSendToAndReceiveFromTopicTest.php +++ b/pkg/sqs/Tests/Spec/SqsSendToAndReceiveFromTopicTest.php @@ -17,17 +17,13 @@ class SqsSendToAndReceiveFromTopicTest extends SendToAndReceiveFromTopicSpec { use SqsExtension; use RetryTrait; + use CreateSqsQueueTrait; /** * @var SqsContext */ private $context; - /** - * @var SqsDestination - */ - private $queue; - protected function tearDown() { parent::tearDown(); @@ -37,26 +33,13 @@ protected function tearDown() } } - /** - * {@inheritdoc} - */ - protected function createContext() + protected function createContext(): SqsContext { return $this->context = $this->buildSqsContext(); } - /** - * {@inheritdoc} - * - * @param SqsContext $context - */ - protected function createTopic(Context $context, $topicName) + protected function createTopic(Context $context, $queueName): SqsDestination { - $topicName = $topicName.time(); - - $this->queue = $context->createTopic($topicName); - $context->declareQueue($this->queue); - - return $this->queue; + return $this->createSqsQueue($context, $queueName); } } diff --git a/pkg/sqs/Tests/Spec/SqsSendToAndReceiveNoWaitFromQueueTest.php b/pkg/sqs/Tests/Spec/SqsSendToAndReceiveNoWaitFromQueueTest.php index 9301c647b..378e65c13 100644 --- a/pkg/sqs/Tests/Spec/SqsSendToAndReceiveNoWaitFromQueueTest.php +++ b/pkg/sqs/Tests/Spec/SqsSendToAndReceiveNoWaitFromQueueTest.php @@ -2,23 +2,44 @@ namespace Enqueue\Sqs\Tests\Spec; +use Enqueue\Sqs\SqsContext; +use Enqueue\Sqs\SqsDestination; +use Enqueue\Test\RetryTrait; +use Enqueue\Test\SqsExtension; +use Interop\Queue\Context; use Interop\Queue\Spec\SendToAndReceiveNoWaitFromQueueSpec; /** * @group functional + * @retry 5 */ class SqsSendToAndReceiveNoWaitFromQueueTest extends SendToAndReceiveNoWaitFromQueueSpec { - public function test() - { - $this->markTestSkipped('The test is fragile. This is how SQS.'); - } + use RetryTrait; + use SqsExtension; + use CreateSqsQueueTrait; /** - * {@inheritdoc} + * @var SqsContext */ - protected function createContext() + private $context; + + protected function tearDown() + { + parent::tearDown(); + + if ($this->context && $this->queue) { + $this->context->deleteQueue($this->queue); + } + } + + protected function createContext(): SqsContext + { + return $this->context = $this->buildSqsContext(); + } + + protected function createQueue(Context $context, $queueName): SqsDestination { - throw new \LogicException('Should not be ever called'); + return $this->createSqsQueue($context, $queueName); } } diff --git a/pkg/sqs/Tests/Spec/SqsSendToAndReceiveNoWaitFromTopicTest.php b/pkg/sqs/Tests/Spec/SqsSendToAndReceiveNoWaitFromTopicTest.php index 3d9149b05..175af9568 100644 --- a/pkg/sqs/Tests/Spec/SqsSendToAndReceiveNoWaitFromTopicTest.php +++ b/pkg/sqs/Tests/Spec/SqsSendToAndReceiveNoWaitFromTopicTest.php @@ -2,23 +2,44 @@ namespace Enqueue\Sqs\Tests\Spec; +use Enqueue\Sqs\SqsContext; +use Enqueue\Sqs\SqsDestination; +use Enqueue\Test\RetryTrait; +use Enqueue\Test\SqsExtension; +use Interop\Queue\Context; use Interop\Queue\Spec\SendToAndReceiveNoWaitFromTopicSpec; /** * @group functional + * @retry 5 */ class SqsSendToAndReceiveNoWaitFromTopicTest extends SendToAndReceiveNoWaitFromTopicSpec { - public function test() - { - $this->markTestSkipped('The test is fragile. This is how SQS.'); - } + use RetryTrait; + use SqsExtension; + use CreateSqsQueueTrait; /** - * {@inheritdoc} + * @var SqsContext */ - protected function createContext() + private $context; + + protected function tearDown() + { + parent::tearDown(); + + if ($this->context && $this->queue) { + $this->context->deleteQueue($this->queue); + } + } + + protected function createContext(): SqsContext + { + return $this->context = $this->buildSqsContext(); + } + + protected function createTopic(Context $context, $queueName): SqsDestination { - throw new \LogicException('Should not be ever called'); + return $this->createSqsQueue($context, $queueName); } } diff --git a/pkg/sqs/Tests/Spec/SqsSendToTopicAndReceiveFromQueueTest.php b/pkg/sqs/Tests/Spec/SqsSendToTopicAndReceiveFromQueueTest.php index a9db45362..2f9c8b638 100644 --- a/pkg/sqs/Tests/Spec/SqsSendToTopicAndReceiveFromQueueTest.php +++ b/pkg/sqs/Tests/Spec/SqsSendToTopicAndReceiveFromQueueTest.php @@ -2,23 +2,49 @@ namespace Enqueue\Sqs\Tests\Spec; +use Enqueue\Sqs\SqsContext; +use Enqueue\Sqs\SqsDestination; +use Enqueue\Test\RetryTrait; +use Enqueue\Test\SqsExtension; +use Interop\Queue\Context; use Interop\Queue\Spec\SendToTopicAndReceiveFromQueueSpec; /** * @group functional + * @retry 5 */ class SqsSendToTopicAndReceiveFromQueueTest extends SendToTopicAndReceiveFromQueueSpec { - public function test() - { - $this->markTestSkipped('The SQS does not support it'); - } + use RetryTrait; + use SqsExtension; + use CreateSqsQueueTrait; /** - * {@inheritdoc} + * @var SqsContext */ - protected function createContext() + private $context; + + protected function tearDown() + { + parent::tearDown(); + + if ($this->context && $this->queue) { + $this->context->deleteQueue($this->queue); + } + } + + protected function createContext(): SqsContext + { + return $this->context = $this->buildSqsContext(); + } + + protected function createTopic(Context $context, $queueName): SqsDestination + { + return $this->createSqsQueue($context, $queueName); + } + + protected function createQueue(Context $context, $queueName): SqsDestination { - throw new \LogicException('Should not be ever called'); + return $this->createSqsQueue($context, $queueName); } } diff --git a/pkg/sqs/Tests/Spec/SqsSendToTopicAndReceiveNoWaitFromQueueTest.php b/pkg/sqs/Tests/Spec/SqsSendToTopicAndReceiveNoWaitFromQueueTest.php index bbb9be63a..b4cc5669f 100644 --- a/pkg/sqs/Tests/Spec/SqsSendToTopicAndReceiveNoWaitFromQueueTest.php +++ b/pkg/sqs/Tests/Spec/SqsSendToTopicAndReceiveNoWaitFromQueueTest.php @@ -2,23 +2,49 @@ namespace Enqueue\Sqs\Tests\Spec; +use Enqueue\Sqs\SqsContext; +use Enqueue\Sqs\SqsDestination; +use Enqueue\Test\RetryTrait; +use Enqueue\Test\SqsExtension; +use Interop\Queue\Context; use Interop\Queue\Spec\SendToTopicAndReceiveNoWaitFromQueueSpec; /** * @group functional + * @retry 5 */ class SqsSendToTopicAndReceiveNoWaitFromQueueTest extends SendToTopicAndReceiveNoWaitFromQueueSpec { - public function test() - { - $this->markTestSkipped('The SQS does not support it'); - } + use RetryTrait; + use SqsExtension; + use CreateSqsQueueTrait; /** - * {@inheritdoc} + * @var SqsContext */ - protected function createContext() + private $context; + + protected function tearDown() + { + parent::tearDown(); + + if ($this->context && $this->queue) { + $this->context->deleteQueue($this->queue); + } + } + + protected function createContext(): SqsContext + { + return $this->context = $this->buildSqsContext(); + } + + protected function createTopic(Context $context, $queueName): SqsDestination + { + return $this->createSqsQueue($context, $queueName); + } + + protected function createQueue(Context $context, $queueName): SqsDestination { - throw new \LogicException('Should not be ever called'); + return $this->createSqsQueue($context, $queueName); } } diff --git a/pkg/sqs/Tests/SqsClientTest.php b/pkg/sqs/Tests/SqsClientTest.php new file mode 100644 index 000000000..2a45edc72 --- /dev/null +++ b/pkg/sqs/Tests/SqsClientTest.php @@ -0,0 +1,299 @@ + [ + 'key' => '', + 'secret' => '', + 'token' => '', + 'region' => '', + 'version' => '2012-11-05', + 'endpoint' => 'http://localhost/', + ]]))->createSqs(); + + $client = new SqsClient($awsClient); + + $this->assertSame($awsClient, $client->getAWSClient()); + } + + public function testShouldAllowGetAwsClientIfMultipleClientProvided() + { + $awsClient = (new Sdk(['Sqs' => [ + 'key' => '', + 'secret' => '', + 'token' => '', + 'region' => '', + 'version' => '2012-11-05', + 'endpoint' => 'http://localhost/', + ]]))->createMultiRegionSqs(); + + $client = new SqsClient($awsClient); + + $this->assertInstanceOf(AwsSqsClient::class, $client->getAWSClient()); + } + + /** + * @dataProvider provideApiCallsSingleClient + * @dataProvider provideApiCallsMultipleClient + */ + public function testApiCall(string $method, array $args, array $result, string $awsClientClass) + { + $awsClient = $this->getMockBuilder($awsClientClass) + ->disableOriginalConstructor() + ->setMethods([$method]) + ->getMock(); + $awsClient + ->expects($this->once()) + ->method($method) + ->with($this->identicalTo($args)) + ->willReturn(new Result($result)); + + $client = new SqsClient($awsClient); + + $actualResult = $client->{$method}($args); + + $this->assertInstanceOf(Result::class, $actualResult); + $this->assertSame($result, $actualResult->toArray()); + } + + /** + * @dataProvider provideApiCallsSingleClient + * @dataProvider provideApiCallsMultipleClient + */ + public function testLazyApiCall(string $method, array $args, array $result, string $awsClientClass) + { + $awsClient = $this->getMockBuilder($awsClientClass) + ->disableOriginalConstructor() + ->setMethods([$method]) + ->getMock(); + $awsClient + ->expects($this->once()) + ->method($method) + ->with($this->identicalTo($args)) + ->willReturn(new Result($result)); + + $client = new SqsClient(function () use ($awsClient) { + return $awsClient; + }); + + $actualResult = $client->{$method}($args); + + $this->assertInstanceOf(Result::class, $actualResult); + $this->assertSame($result, $actualResult->toArray()); + } + + /** + * @dataProvider provideApiCallsSingleClient + * @dataProvider provideApiCallsMultipleClient + */ + public function testThrowIfInvalidInputClientApiCall(string $method, array $args, array $result, string $awsClientClass) + { + $client = new SqsClient(new \stdClass()); + + $this->expectException(\LogicException::class); + $this->expectExceptionMessage('The input client must be an instance of "Aws\Sqs\SqsClient" or "Aws\MultiRegionClient" or a callable that returns one of those. Got "stdClass"'); + $client->{$method}($args); + } + + /** + * @dataProvider provideApiCallsSingleClient + * @dataProvider provideApiCallsMultipleClient + */ + public function testThrowIfInvalidLazyInputClientApiCall(string $method, array $args, array $result, string $awsClientClass) + { + $client = new SqsClient(function () { return new \stdClass(); }); + + $this->expectException(\LogicException::class); + $this->expectExceptionMessage('The input client must be an instance of "Aws\Sqs\SqsClient" or "Aws\MultiRegionClient" or a callable that returns one of those. Got "stdClass"'); + $client->{$method}($args); + } + + /** + * @dataProvider provideApiCallsMultipleClient + */ + public function testApiCallWithMultiClientAndCustomRegion(string $method, array $args, array $result, string $awsClientClass) + { + $args['@region'] = 'theRegion'; + + $awsClient = $this->getMockBuilder($awsClientClass) + ->disableOriginalConstructor() + ->setMethods([$method]) + ->getMock(); + $awsClient + ->expects($this->once()) + ->method($method) + ->with($this->identicalTo($args)) + ->willReturn(new Result($result)); + + $client = new SqsClient($awsClient); + + $actualResult = $client->{$method}($args); + + $this->assertInstanceOf(Result::class, $actualResult); + $this->assertSame($result, $actualResult->toArray()); + } + + /** + * @dataProvider provideApiCallsSingleClient + */ + public function testApiCallWithSingleClientAndCustomRegion(string $method, array $args, array $result, string $awsClientClass) + { + $args['@region'] = 'theRegion'; + + $awsClient = $this->getMockBuilder($awsClientClass) + ->disableOriginalConstructor() + ->setMethods([$method]) + ->getMock(); + $awsClient + ->expects($this->never()) + ->method($method) + ; + + $client = new SqsClient($awsClient); + + $this->expectException(\LogicException::class); + $this->expectExceptionMessage('Cannot send message to another region because transport is configured with single aws client'); + $client->{$method}($args); + } + + /** + * @dataProvider provideApiCallsSingleClient + */ + public function testApiCallWithMultiClientAndEmptyCustomRegion(string $method, array $args, array $result, string $awsClientClass) + { + $expectedArgs = $args; + $args['@region'] = ''; + + $awsClient = $this->getMockBuilder($awsClientClass) + ->disableOriginalConstructor() + ->setMethods([$method]) + ->getMock(); + $awsClient + ->expects($this->once()) + ->method($method) + ->with($this->identicalTo($expectedArgs)) + ->willReturn(new Result($result)); + + $client = new SqsClient($awsClient); + + $actualResult = $client->{$method}($args); + + $this->assertInstanceOf(Result::class, $actualResult); + $this->assertSame($result, $actualResult->toArray()); + } + + public function provideApiCallsSingleClient() + { + yield [ + 'deleteMessage', + ['fooArg' => 'fooArgVal'], + ['bar' => 'barVal'], + AwsSqsClient::class, + ]; + + yield [ + 'receiveMessage', + ['fooArg' => 'fooArgVal'], + ['bar' => 'barVal'], + AwsSqsClient::class, + ]; + + yield [ + 'purgeQueue', + ['fooArg' => 'fooArgVal'], + ['bar' => 'barVal'], + AwsSqsClient::class, + ]; + + yield [ + 'getQueueUrl', + ['fooArg' => 'fooArgVal'], + ['bar' => 'barVal'], + AwsSqsClient::class, + ]; + + yield [ + 'createQueue', + ['fooArg' => 'fooArgVal'], + ['bar' => 'barVal'], + AwsSqsClient::class, + ]; + + yield [ + 'deleteQueue', + ['fooArg' => 'fooArgVal'], + ['bar' => 'barVal'], + AwsSqsClient::class, + ]; + + yield [ + 'sendMessage', + ['fooArg' => 'fooArgVal'], + ['bar' => 'barVal'], + AwsSqsClient::class, + ]; + } + + public function provideApiCallsMultipleClient() + { + yield [ + 'deleteMessage', + ['fooArg' => 'fooArgVal'], + ['bar' => 'barVal'], + MultiRegionClient::class, + ]; + + yield [ + 'receiveMessage', + ['fooArg' => 'fooArgVal'], + ['bar' => 'barVal'], + MultiRegionClient::class, + ]; + + yield [ + 'purgeQueue', + ['fooArg' => 'fooArgVal'], + ['bar' => 'barVal'], + MultiRegionClient::class, + ]; + + yield [ + 'getQueueUrl', + ['fooArg' => 'fooArgVal'], + ['bar' => 'barVal'], + MultiRegionClient::class, + ]; + + yield [ + 'createQueue', + ['fooArg' => 'fooArgVal'], + ['bar' => 'barVal'], + MultiRegionClient::class, + ]; + + yield [ + 'deleteQueue', + ['fooArg' => 'fooArgVal'], + ['bar' => 'barVal'], + MultiRegionClient::class, + ]; + + yield [ + 'sendMessage', + ['fooArg' => 'fooArgVal'], + ['bar' => 'barVal'], + MultiRegionClient::class, + ]; + } +} diff --git a/pkg/sqs/Tests/SqsConnectionFactoryConfigTest.php b/pkg/sqs/Tests/SqsConnectionFactoryConfigTest.php index f2ad15948..375b0d03a 100644 --- a/pkg/sqs/Tests/SqsConnectionFactoryConfigTest.php +++ b/pkg/sqs/Tests/SqsConnectionFactoryConfigTest.php @@ -63,6 +63,7 @@ public static function provideConfigs() 'version' => '2012-11-05', 'lazy' => true, 'endpoint' => null, + 'queue_owner_aws_account_id' => null, ], ]; @@ -77,6 +78,7 @@ public static function provideConfigs() 'version' => '2012-11-05', 'lazy' => true, 'endpoint' => null, + 'queue_owner_aws_account_id' => null, ], ]; @@ -91,6 +93,7 @@ public static function provideConfigs() 'version' => '2012-11-05', 'lazy' => true, 'endpoint' => null, + 'queue_owner_aws_account_id' => null, ], ]; @@ -105,6 +108,7 @@ public static function provideConfigs() 'version' => '2012-11-05', 'lazy' => false, 'endpoint' => null, + 'queue_owner_aws_account_id' => null, ], ]; @@ -119,6 +123,7 @@ public static function provideConfigs() 'version' => '2012-11-05', 'lazy' => false, 'endpoint' => null, + 'queue_owner_aws_account_id' => null, ], ]; @@ -133,6 +138,7 @@ public static function provideConfigs() 'version' => '2012-11-05', 'lazy' => false, 'endpoint' => null, + 'queue_owner_aws_account_id' => null, ], ]; @@ -153,6 +159,7 @@ public static function provideConfigs() 'version' => '2012-11-05', 'lazy' => false, 'endpoint' => 'http://localstack:1111/', + 'queue_owner_aws_account_id' => null, ], ]; } diff --git a/pkg/sqs/Tests/SqsConnectionFactoryTest.php b/pkg/sqs/Tests/SqsConnectionFactoryTest.php index c4544c9e0..056ecc41b 100644 --- a/pkg/sqs/Tests/SqsConnectionFactoryTest.php +++ b/pkg/sqs/Tests/SqsConnectionFactoryTest.php @@ -2,13 +2,15 @@ namespace Enqueue\Sqs\Tests; -use Aws\Sqs\SqsClient; +use Aws\Sqs\SqsClient as AwsSqsClient; +use Enqueue\Sqs\SqsClient; use Enqueue\Sqs\SqsConnectionFactory; use Enqueue\Sqs\SqsContext; use Enqueue\Test\ClassExtensionTrait; use Interop\Queue\ConnectionFactory; +use PHPUnit\Framework\TestCase; -class SqsConnectionFactoryTest extends \PHPUnit\Framework\TestCase +class SqsConnectionFactoryTest extends TestCase { use ClassExtensionTrait; @@ -30,6 +32,7 @@ public function testCouldBeConstructedWithEmptyConfiguration() 'retries' => 3, 'version' => '2012-11-05', 'endpoint' => null, + 'queue_owner_aws_account_id' => null, ], 'config', $factory); } @@ -46,19 +49,23 @@ public function testCouldBeConstructedWithCustomConfiguration() 'retries' => 3, 'version' => '2012-11-05', 'endpoint' => null, + 'queue_owner_aws_account_id' => null, ], 'config', $factory); } public function testCouldBeConstructedWithClient() { - $client = $this->createMock(SqsClient::class); + $awsClient = $this->createMock(AwsSqsClient::class); - $factory = new SqsConnectionFactory($client); + $factory = new SqsConnectionFactory($awsClient); $context = $factory->createContext(); $this->assertInstanceOf(SqsContext::class, $context); - $this->assertAttributeSame($client, 'client', $context); + + $client = $this->readAttribute($context, 'client'); + $this->assertInstanceOf(SqsClient::class, $client); + $this->assertAttributeSame($awsClient, 'inputClient', $client); } public function testShouldCreateLazyContext() @@ -69,7 +76,8 @@ public function testShouldCreateLazyContext() $this->assertInstanceOf(SqsContext::class, $context); - $this->assertAttributeEquals(null, 'client', $context); - $this->assertInternalType('callable', $this->readAttribute($context, 'clientFactory')); + $client = $this->readAttribute($context, 'client'); + $this->assertInstanceOf(SqsClient::class, $client); + $this->assertAttributeInstanceOf(\Closure::class, 'inputClient', $client); } } diff --git a/pkg/sqs/Tests/SqsConsumerTest.php b/pkg/sqs/Tests/SqsConsumerTest.php index c25782682..5d1f1cdeb 100644 --- a/pkg/sqs/Tests/SqsConsumerTest.php +++ b/pkg/sqs/Tests/SqsConsumerTest.php @@ -3,7 +3,7 @@ namespace Enqueue\Sqs\Tests; use Aws\Result; -use Aws\Sqs\SqsClient; +use Enqueue\Sqs\SqsClient; use Enqueue\Sqs\SqsConsumer; use Enqueue\Sqs\SqsContext; use Enqueue\Sqs\SqsDestination; @@ -53,13 +53,17 @@ public function testCouldAcknowledgeMessage() $client ->expects($this->once()) ->method('deleteMessage') - ->with($this->identicalTo(['QueueUrl' => 'theQueueUrl', 'ReceiptHandle' => 'theReceipt'])) + ->with($this->identicalTo([ + '@region' => null, + 'QueueUrl' => 'theQueueUrl', + 'ReceiptHandle' => 'theReceipt', + ])) ; $context = $this->createContextMock(); $context ->expects($this->once()) - ->method('getClient') + ->method('getSqsClient') ->willReturn($client) ; $context @@ -75,6 +79,41 @@ public function testCouldAcknowledgeMessage() $consumer->acknowledge($message); } + public function testCouldAcknowledgeMessageWithCustomRegion() + { + $client = $this->createSqsClientMock(); + $client + ->expects($this->once()) + ->method('deleteMessage') + ->with($this->identicalTo([ + '@region' => 'theRegion', + 'QueueUrl' => 'theQueueUrl', + 'ReceiptHandle' => 'theReceipt', + ])) + ; + + $context = $this->createContextMock(); + $context + ->expects($this->once()) + ->method('getSqsClient') + ->willReturn($client) + ; + $context + ->expects($this->once()) + ->method('getQueueUrl') + ->willReturn('theQueueUrl') + ; + + $message = new SqsMessage(); + $message->setReceiptHandle('theReceipt'); + + $destination = new SqsDestination('queue'); + $destination->setRegion('theRegion'); + + $consumer = new SqsConsumer($context, $destination); + $consumer->acknowledge($message); + } + public function testRejectShouldThrowIfInstanceOfMessageIsInvalid() { $this->expectException(InvalidMessageException::class); @@ -90,13 +129,17 @@ public function testShouldRejectMessage() $client ->expects($this->once()) ->method('deleteMessage') - ->with($this->identicalTo(['QueueUrl' => 'theQueueUrl', 'ReceiptHandle' => 'theReceipt'])) + ->with($this->identicalTo([ + '@region' => null, + 'QueueUrl' => 'theQueueUrl', + 'ReceiptHandle' => 'theReceipt', + ])) ; $context = $this->createContextMock(); $context ->expects($this->once()) - ->method('getClient') + ->method('getSqsClient') ->willReturn($client) ; $context @@ -116,13 +159,56 @@ public function testShouldRejectMessage() $consumer->reject($message); } + public function testShouldRejectMessageWithCustomRegion() + { + $client = $this->createSqsClientMock(); + $client + ->expects($this->once()) + ->method('deleteMessage') + ->with($this->identicalTo([ + '@region' => 'theRegion', + 'QueueUrl' => 'theQueueUrl', + 'ReceiptHandle' => 'theReceipt', + ])) + ; + + $context = $this->createContextMock(); + $context + ->expects($this->once()) + ->method('getSqsClient') + ->willReturn($client) + ; + $context + ->expects($this->once()) + ->method('getQueueUrl') + ->willReturn('theQueueUrl') + ; + $context + ->expects($this->never()) + ->method('createProducer') + ; + + $message = new SqsMessage(); + $message->setReceiptHandle('theReceipt'); + + $destination = new SqsDestination('queue'); + $destination->setRegion('theRegion'); + + $consumer = new SqsConsumer($context, $destination); + $consumer->reject($message); + } + public function testShouldRejectMessageAndRequeue() { $client = $this->createSqsClientMock(); $client ->expects($this->once()) ->method('deleteMessage') - ->with($this->identicalTo(['QueueUrl' => 'theQueueUrl', 'ReceiptHandle' => 'theReceipt'])) + ->with($this->identicalTo([ + '@region' => null, + 'QueueUrl' => 'theQueueUrl', + 'ReceiptHandle' => 'theReceipt', + ])) ; $message = new SqsMessage(); @@ -140,7 +226,7 @@ public function testShouldRejectMessageAndRequeue() $context = $this->createContextMock(); $context ->expects($this->once()) - ->method('getClient') + ->method('getSqsClient') ->willReturn($client) ; $context @@ -161,6 +247,7 @@ public function testShouldRejectMessageAndRequeue() public function testShouldReceiveMessage() { $expectedAttributes = [ + '@region' => null, 'AttributeNames' => ['All'], 'MessageAttributeNames' => ['All'], 'MaxNumberOfMessages' => 1, @@ -193,7 +280,7 @@ public function testShouldReceiveMessage() $context = $this->createContextMock(); $context ->expects($this->once()) - ->method('getClient') + ->method('getSqsClient') ->willReturn($client) ; $context @@ -218,9 +305,67 @@ public function testShouldReceiveMessage() $this->assertEquals('The Receipt', $result->getReceiptHandle()); } + public function testShouldReceiveMessageWithCustomRegion() + { + $expectedAttributes = [ + '@region' => 'theRegion', + 'AttributeNames' => ['All'], + 'MessageAttributeNames' => ['All'], + 'MaxNumberOfMessages' => 1, + 'QueueUrl' => 'theQueueUrl', + 'WaitTimeSeconds' => 0, + ]; + + $client = $this->createSqsClientMock(); + $client + ->expects($this->once()) + ->method('receiveMessage') + ->with($this->identicalTo($expectedAttributes)) + ->willReturn(new Result(['Messages' => [[ + 'Body' => 'The Body', + 'ReceiptHandle' => 'The Receipt', + 'Attributes' => [ + 'ApproximateReceiveCount' => 3, + ], + 'MessageAttributes' => [ + 'Headers' => [ + 'StringValue' => json_encode([['hkey' => 'hvalue'], ['key' => 'value']]), + 'DataType' => 'String', + ], + ], + ]]])) + ; + + $context = $this->createContextMock(); + $context + ->expects($this->once()) + ->method('getSqsClient') + ->willReturn($client) + ; + $context + ->expects($this->once()) + ->method('getQueueUrl') + ->willReturn('theQueueUrl') + ; + $context + ->expects($this->once()) + ->method('createMessage') + ->willReturn(new SqsMessage()) + ; + + $destination = new SqsDestination('queue'); + $destination->setRegion('theRegion'); + + $consumer = new SqsConsumer($context, $destination); + $result = $consumer->receiveNoWait(); + + $this->assertInstanceOf(SqsMessage::class, $result); + } + public function testShouldReturnNullIfThereIsNoNewMessage() { $expectedAttributes = [ + '@region' => null, 'AttributeNames' => ['All'], 'MessageAttributeNames' => ['All'], 'MaxNumberOfMessages' => 1, @@ -239,7 +384,7 @@ public function testShouldReturnNullIfThereIsNoNewMessage() $context = $this->createContextMock(); $context ->expects($this->once()) - ->method('getClient') + ->method('getSqsClient') ->willReturn($client) ; $context @@ -261,7 +406,7 @@ public function testShouldReturnNullIfThereIsNoNewMessage() /** * @return \PHPUnit_Framework_MockObject_MockObject|SqsProducer */ - private function createProducerMock() + private function createProducerMock(): SqsProducer { return $this->createMock(SqsProducer::class); } @@ -269,19 +414,15 @@ private function createProducerMock() /** * @return \PHPUnit_Framework_MockObject_MockObject|SqsClient */ - private function createSqsClientMock() + private function createSqsClientMock(): SqsClient { - return $this->getMockBuilder(SqsClient::class) - ->disableOriginalConstructor() - ->setMethods(['deleteMessage', 'receiveMessage']) - ->getMock() - ; + return $this->createMock(SqsClient::class); } /** * @return \PHPUnit_Framework_MockObject_MockObject|SqsContext */ - private function createContextMock() + private function createContextMock(): SqsContext { return $this->createMock(SqsContext::class); } diff --git a/pkg/sqs/Tests/SqsContextTest.php b/pkg/sqs/Tests/SqsContextTest.php index f6567d870..53dfc4358 100644 --- a/pkg/sqs/Tests/SqsContextTest.php +++ b/pkg/sqs/Tests/SqsContextTest.php @@ -3,7 +3,7 @@ namespace Enqueue\Sqs\Tests; use Aws\Result; -use Aws\Sqs\SqsClient; +use Enqueue\Sqs\SqsClient; use Enqueue\Sqs\SqsConsumer; use Enqueue\Sqs\SqsContext; use Enqueue\Sqs\SqsDestination; @@ -14,8 +14,9 @@ use Interop\Queue\Exception\InvalidDestinationException; use Interop\Queue\Exception\TemporaryQueueNotSupportedException; use Interop\Queue\Queue; +use PHPUnit\Framework\TestCase; -class SqsContextTest extends \PHPUnit\Framework\TestCase +class SqsContextTest extends TestCase { use ClassExtensionTrait; @@ -26,26 +27,12 @@ public function testShouldImplementContextInterface() public function testCouldBeConstructedWithSqsClientAsFirstArgument() { - new SqsContext($this->createSqsClientMock()); - } - - public function testCouldBeConstructedWithSqsClientFactoryAsFirstArgument() - { - new SqsContext(function () { - return $this->createSqsClientMock(); - }); - } - - public function testThrowIfNeitherSqsClientNorFactoryGiven() - { - $this->expectException(\InvalidArgumentException::class); - $this->expectExceptionMessage('The $client argument must be either Aws\Sqs\SqsClient or callable that returns Aws\Sqs\SqsClient once called.'); - new SqsContext(new \stdClass()); + new SqsContext($this->createSqsClientMock(), []); } public function testShouldAllowCreateEmptyMessage() { - $context = new SqsContext($this->createSqsClientMock()); + $context = new SqsContext($this->createSqsClientMock(), []); $message = $context->createMessage(); @@ -58,7 +45,7 @@ public function testShouldAllowCreateEmptyMessage() public function testShouldAllowCreateCustomMessage() { - $context = new SqsContext($this->createSqsClientMock()); + $context = new SqsContext($this->createSqsClientMock(), []); $message = $context->createMessage('theBody', ['aProp' => 'aPropVal'], ['aHeader' => 'aHeaderVal']); @@ -71,7 +58,9 @@ public function testShouldAllowCreateCustomMessage() public function testShouldCreateQueue() { - $context = new SqsContext($this->createSqsClientMock()); + $context = new SqsContext($this->createSqsClientMock(), [ + 'queue_owner_aws_account_id' => null, + ]); $queue = $context->createQueue('aQueue'); @@ -81,7 +70,9 @@ public function testShouldCreateQueue() public function testShouldAllowCreateTopic() { - $context = new SqsContext($this->createSqsClientMock()); + $context = new SqsContext($this->createSqsClientMock(), [ + 'queue_owner_aws_account_id' => null, + ]); $topic = $context->createTopic('aTopic'); @@ -91,7 +82,7 @@ public function testShouldAllowCreateTopic() public function testThrowNotImplementedOnCreateTmpQueueCall() { - $context = new SqsContext($this->createSqsClientMock()); + $context = new SqsContext($this->createSqsClientMock(), []); $this->expectException(TemporaryQueueNotSupportedException::class); @@ -100,7 +91,7 @@ public function testThrowNotImplementedOnCreateTmpQueueCall() public function testShouldCreateProducer() { - $context = new SqsContext($this->createSqsClientMock()); + $context = new SqsContext($this->createSqsClientMock(), []); $producer = $context->createProducer(); @@ -109,7 +100,7 @@ public function testShouldCreateProducer() public function testShouldThrowIfNotSqsDestinationGivenOnCreateConsumer() { - $context = new SqsContext($this->createSqsClientMock()); + $context = new SqsContext($this->createSqsClientMock(), []); $this->expectException(InvalidDestinationException::class); $this->expectExceptionMessage('The destination must be an instance of Enqueue\Sqs\SqsDestination but got Mock_Queue'); @@ -119,7 +110,9 @@ public function testShouldThrowIfNotSqsDestinationGivenOnCreateConsumer() public function testShouldCreateConsumer() { - $context = new SqsContext($this->createSqsClientMock()); + $context = new SqsContext($this->createSqsClientMock(), [ + 'queue_owner_aws_account_id' => null, + ]); $queue = $context->createQueue('aQueue'); @@ -134,24 +127,57 @@ public function testShouldAllowDeclareQueue() $sqsClient ->expects($this->once()) ->method('createQueue') - ->with($this->identicalTo(['Attributes' => [], 'QueueName' => 'aQueueName'])) + ->with($this->identicalTo([ + '@region' => null, + 'Attributes' => [], + 'QueueName' => 'aQueueName', + ])) ->willReturn(new Result(['QueueUrl' => 'theQueueUrl'])) ; - $context = new SqsContext($sqsClient); + $context = new SqsContext($sqsClient, [ + 'queue_owner_aws_account_id' => null, + ]); $queue = $context->createQueue('aQueueName'); $context->declareQueue($queue); } + public function testShouldAllowDeclareQueueWithCustomRegion() + { + $sqsClient = $this->createSqsClientMock(); + $sqsClient + ->expects($this->once()) + ->method('createQueue') + ->with($this->identicalTo([ + '@region' => 'theRegion', + 'Attributes' => [], + 'QueueName' => 'aQueueName', + ])) + ->willReturn(new Result(['QueueUrl' => 'theQueueUrl'])) + ; + + $context = new SqsContext($sqsClient, [ + 'queue_owner_aws_account_id' => null, + ]); + + $queue = $context->createQueue('aQueueName'); + $queue->setRegion('theRegion'); + + $context->declareQueue($queue); + } + public function testShouldAllowDeleteQueue() { $sqsClient = $this->createSqsClientMock(); $sqsClient ->expects($this->once()) ->method('getQueueUrl') - ->with($this->identicalTo(['QueueName' => 'aQueueName'])) + ->with($this->identicalTo([ + '@region' => null, + 'QueueName' => 'aQueueName', + ])) ->willReturn(new Result(['QueueUrl' => 'theQueueUrl'])) ; $sqsClient @@ -161,62 +187,211 @@ public function testShouldAllowDeleteQueue() ->willReturn(new Result()) ; - $context = new SqsContext($sqsClient); + $context = new SqsContext($sqsClient, [ + 'queue_owner_aws_account_id' => null, + ]); $queue = $context->createQueue('aQueueName'); $context->deleteQueue($queue); } + public function testShouldAllowDeleteQueueWithCustomRegion() + { + $sqsClient = $this->createSqsClientMock(); + $sqsClient + ->expects($this->once()) + ->method('getQueueUrl') + ->with($this->identicalTo([ + '@region' => 'theRegion', + 'QueueName' => 'aQueueName', + ])) + ->willReturn(new Result(['QueueUrl' => 'theQueueUrl'])) + ; + $sqsClient + ->expects($this->once()) + ->method('deleteQueue') + ->with($this->identicalTo(['QueueUrl' => 'theQueueUrl'])) + ->willReturn(new Result()) + ; + + $context = new SqsContext($sqsClient, [ + 'queue_owner_aws_account_id' => null, + ]); + + $queue = $context->createQueue('aQueueName'); + $queue->setRegion('theRegion'); + + $context->deleteQueue($queue); + } + public function testShouldAllowPurgeQueue() { $sqsClient = $this->createSqsClientMock(); $sqsClient ->expects($this->once()) ->method('getQueueUrl') - ->with($this->identicalTo(['QueueName' => 'aQueueName'])) + ->with($this->identicalTo([ + '@region' => null, + 'QueueName' => 'aQueueName', + ])) ->willReturn(new Result(['QueueUrl' => 'theQueueUrl'])) ; $sqsClient ->expects($this->once()) ->method('purgeQueue') - ->with($this->identicalTo(['QueueUrl' => 'theQueueUrl'])) + ->with($this->identicalTo([ + '@region' => null, + 'QueueUrl' => 'theQueueUrl', + ])) ->willReturn(new Result()) ; - $context = new SqsContext($sqsClient); + $context = new SqsContext($sqsClient, [ + 'queue_owner_aws_account_id' => null, + ]); $queue = $context->createQueue('aQueueName'); $context->purgeQueue($queue); } + public function testShouldAllowPurgeQueueWithCustomRegion() + { + $sqsClient = $this->createSqsClientMock(); + $sqsClient + ->expects($this->once()) + ->method('getQueueUrl') + ->with($this->identicalTo([ + '@region' => 'theRegion', + 'QueueName' => 'aQueueName', + ])) + ->willReturn(new Result(['QueueUrl' => 'theQueueUrl'])) + ; + $sqsClient + ->expects($this->once()) + ->method('purgeQueue') + ->with($this->identicalTo([ + '@region' => 'theRegion', + 'QueueUrl' => 'theQueueUrl', + ])) + ->willReturn(new Result()) + ; + + $context = new SqsContext($sqsClient, [ + 'queue_owner_aws_account_id' => null, + ]); + + $queue = $context->createQueue('aQueueName'); + $queue->setRegion('theRegion'); + + $context->purgeQueue($queue); + } + public function testShouldAllowGetQueueUrl() { $sqsClient = $this->createSqsClientMock(); $sqsClient ->expects($this->once()) ->method('getQueueUrl') - ->with($this->identicalTo(['QueueName' => 'aQueueName'])) + ->with($this->identicalTo([ + '@region' => null, + 'QueueName' => 'aQueueName', + ])) + ->willReturn(new Result(['QueueUrl' => 'theQueueUrl'])) + ; + + $context = new SqsContext($sqsClient, [ + 'queue_owner_aws_account_id' => null, + ]); + + $context->getQueueUrl(new SqsDestination('aQueueName')); + } + + public function testShouldAllowGetQueueUrlWithCustomRegion() + { + $sqsClient = $this->createSqsClientMock(); + $sqsClient + ->expects($this->once()) + ->method('getQueueUrl') + ->with($this->identicalTo([ + '@region' => 'theRegion', + 'QueueName' => 'aQueueName', + ])) + ->willReturn(new Result(['QueueUrl' => 'theQueueUrl'])) + ; + + $context = new SqsContext($sqsClient, [ + 'queue_owner_aws_account_id' => null, + ]); + + $queue = new SqsDestination('aQueueName'); + $queue->setRegion('theRegion'); + + $context->getQueueUrl($queue); + } + + public function testShouldAllowGetQueueUrlFromAnotherAWSAccountSetGlobally() + { + $sqsClient = $this->createSqsClientMock(); + $sqsClient + ->expects($this->once()) + ->method('getQueueUrl') + ->with($this->identicalTo([ + '@region' => null, + 'QueueName' => 'aQueueName', + 'QueueOwnerAWSAccountId' => 'anotherAWSAccountID', + ])) ->willReturn(new Result(['QueueUrl' => 'theQueueUrl'])) ; - $context = new SqsContext($sqsClient); + $context = new SqsContext($sqsClient, [ + 'queue_owner_aws_account_id' => 'anotherAWSAccountID', + ]); $context->getQueueUrl(new SqsDestination('aQueueName')); } + public function testShouldAllowGetQueueUrlFromAnotherAWSAccountSetPerQueue() + { + $sqsClient = $this->createSqsClientMock(); + $sqsClient + ->expects($this->once()) + ->method('getQueueUrl') + ->with($this->identicalTo([ + '@region' => null, + 'QueueName' => 'aQueueName', + 'QueueOwnerAWSAccountId' => 'anotherAWSAccountID', + ])) + ->willReturn(new Result(['QueueUrl' => 'theQueueUrl'])) + ; + + $context = new SqsContext($sqsClient, [ + 'queue_owner_aws_account_id' => null, + ]); + + $queue = new SqsDestination('aQueueName'); + $queue->setQueueOwnerAWSAccountId('anotherAWSAccountID'); + + $context->getQueueUrl($queue); + } + public function testShouldThrowExceptionIfGetQueueUrlResultHasNoQueueUrlProperty() { $sqsClient = $this->createSqsClientMock(); $sqsClient ->expects($this->once()) ->method('getQueueUrl') - ->with($this->identicalTo(['QueueName' => 'aQueueName'])) + ->with($this->identicalTo([ + '@region' => null, + 'QueueName' => 'aQueueName', + ])) ->willReturn(new Result([])) ; - $context = new SqsContext($sqsClient); + $context = new SqsContext($sqsClient, [ + 'queue_owner_aws_account_id' => null, + ]); $this->expectException(\RuntimeException::class); $this->expectExceptionMessage('QueueUrl cannot be resolved. queueName: "aQueueName"'); @@ -227,12 +402,8 @@ public function testShouldThrowExceptionIfGetQueueUrlResultHasNoQueueUrlProperty /** * @return \PHPUnit_Framework_MockObject_MockObject|SqsClient */ - private function createSqsClientMock() + private function createSqsClientMock(): SqsClient { - return $this->getMockBuilder(SqsClient::class) - ->disableOriginalConstructor() - ->setMethods(['deleteQueue', 'purgeQueue', 'createQueue', 'getQueueUrl']) - ->getMock() - ; + return $this->createMock(SqsClient::class); } } diff --git a/pkg/sqs/Tests/SqsProducerTest.php b/pkg/sqs/Tests/SqsProducerTest.php index fe5e9428b..a33b98f76 100644 --- a/pkg/sqs/Tests/SqsProducerTest.php +++ b/pkg/sqs/Tests/SqsProducerTest.php @@ -3,7 +3,7 @@ namespace Enqueue\Sqs\Tests; use Aws\Result; -use Aws\Sqs\SqsClient; +use Enqueue\Sqs\SqsClient; use Enqueue\Sqs\SqsContext; use Enqueue\Sqs\SqsDestination; use Enqueue\Sqs\SqsMessage; @@ -68,7 +68,7 @@ public function testShouldThrowIfSendMessageFailed() ; $context ->expects($this->once()) - ->method('getClient') + ->method('getSqsClient') ->will($this->returnValue($client)) ; @@ -85,6 +85,7 @@ public function testShouldThrowIfSendMessageFailed() public function testShouldSendMessage() { $expectedArguments = [ + '@region' => null, 'MessageAttributes' => [ 'Headers' => [ 'DataType' => 'String', @@ -103,7 +104,7 @@ public function testShouldSendMessage() ->expects($this->once()) ->method('sendMessage') ->with($this->identicalTo($expectedArguments)) - ->willReturn(new Result()) + ->willReturn(new Result(['MessageId' => 'theMessageId'])) ; $context = $this->createSqsContextMock(); @@ -114,7 +115,7 @@ public function testShouldSendMessage() ; $context ->expects($this->once()) - ->method('getClient') + ->method('getSqsClient') ->will($this->returnValue($client)) ; @@ -124,8 +125,48 @@ public function testShouldSendMessage() $message->setMessageDeduplicationId('theDeduplicationId'); $message->setMessageGroupId('groupId'); - $this->expectException(\RuntimeException::class); - $this->expectExceptionMessage('Message was not sent'); + $producer = new SqsProducer($context); + $producer->send($destination, $message); + } + + public function testShouldSendMessageWithCustomRegion() + { + $expectedArguments = [ + '@region' => 'theRegion', + 'MessageAttributes' => [ + 'Headers' => [ + 'DataType' => 'String', + 'StringValue' => '[[],[]]', + ], + ], + 'MessageBody' => 'theBody', + 'QueueUrl' => 'theQueueUrl', + ]; + + $client = $this->createSqsClientMock(); + $client + ->expects($this->once()) + ->method('sendMessage') + ->with($this->identicalTo($expectedArguments)) + ->willReturn(new Result(['MessageId' => 'theMessageId'])) + ; + + $context = $this->createSqsContextMock(); + $context + ->expects($this->once()) + ->method('getQueueUrl') + ->willReturn('theQueueUrl') + ; + $context + ->expects($this->once()) + ->method('getSqsClient') + ->will($this->returnValue($client)) + ; + + $destination = new SqsDestination('queue-name'); + $destination->setRegion('theRegion'); + + $message = new SqsMessage('theBody'); $producer = new SqsProducer($context); $producer->send($destination, $message); @@ -134,6 +175,7 @@ public function testShouldSendMessage() public function testShouldSendDelayedMessage() { $expectedArguments = [ + '@region' => null, 'MessageAttributes' => [ 'Headers' => [ 'DataType' => 'String', @@ -152,7 +194,7 @@ public function testShouldSendDelayedMessage() ->expects($this->once()) ->method('sendMessage') ->with($this->identicalTo($expectedArguments)) - ->willReturn(new Result()) + ->willReturn(new Result(['MessageId' => 'theMessageId'])) ; $context = $this->createSqsContextMock(); @@ -163,7 +205,7 @@ public function testShouldSendDelayedMessage() ; $context ->expects($this->once()) - ->method('getClient') + ->method('getSqsClient') ->will($this->returnValue($client)) ; @@ -173,9 +215,6 @@ public function testShouldSendDelayedMessage() $message->setMessageDeduplicationId('theDeduplicationId'); $message->setMessageGroupId('groupId'); - $this->expectException(\RuntimeException::class); - $this->expectExceptionMessage('Message was not sent'); - $producer = new SqsProducer($context); $producer->setDeliveryDelay(5000); $producer->send($destination, $message); @@ -184,7 +223,7 @@ public function testShouldSendDelayedMessage() /** * @return \PHPUnit_Framework_MockObject_MockObject|SqsContext */ - private function createSqsContextMock() + private function createSqsContextMock(): SqsContext { return $this->createMock(SqsContext::class); } @@ -192,13 +231,8 @@ private function createSqsContextMock() /** * @return \PHPUnit_Framework_MockObject_MockObject|SqsClient */ - private function createSqsClientMock() + private function createSqsClientMock(): SqsClient { - return $this - ->getMockBuilder(SqsClient::class) - ->disableOriginalConstructor() - ->setMethods(['sendMessage']) - ->getMock() - ; + return $this->createMock(SqsClient::class); } } diff --git a/pkg/sqs/composer.json b/pkg/sqs/composer.json index ec8388c4f..0022bf72f 100644 --- a/pkg/sqs/composer.json +++ b/pkg/sqs/composer.json @@ -7,7 +7,7 @@ "license": "MIT", "require": { "php": "^7.1.3", - "queue-interop/queue-interop": "^0.7", + "queue-interop/queue-interop": "^0.7|^0.8", "enqueue/dsn": "^0.9", "aws/aws-sdk-php": "~3.26" }, diff --git a/pkg/stomp/composer.json b/pkg/stomp/composer.json index 4efde62d7..0f667a8ae 100644 --- a/pkg/stomp/composer.json +++ b/pkg/stomp/composer.json @@ -9,7 +9,7 @@ "php": "^7.1.3", "enqueue/dsn": "^0.9", "stomp-php/stomp-php": "^4", - "queue-interop/queue-interop": "^0.7", + "queue-interop/queue-interop": "^0.7|^0.8", "php-http/guzzle6-adapter": "^1.1", "php-http/client-common": "^1.7@dev", "richardfullmer/rabbitmq-management-api": "^2.0" diff --git a/pkg/wamp/composer.json b/pkg/wamp/composer.json index 7c2f15e64..28c2c7065 100644 --- a/pkg/wamp/composer.json +++ b/pkg/wamp/composer.json @@ -7,7 +7,7 @@ "license": "MIT", "require": { "php": "^7.1.3", - "queue-interop/queue-interop": "^0.7", + "queue-interop/queue-interop": "^0.7|^0.8", "enqueue/dsn": "^0.9", "thruway/pawl-transport": "^0.5.0", "voryx/thruway": "^0.5.3"