diff --git a/CHANGELOG.md b/CHANGELOG.md index ea5b4919c..a59e836fc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,21 @@ # Change Log +## [0.8.34](https://github.com/php-enqueue/enqueue-dev/tree/0.8.34) (2018-08-04) +[Full Changelog](https://github.com/php-enqueue/enqueue-dev/compare/0.8.33...0.8.34) + +- \[sqs\] Messages should not allow empty bodies [\#435](https://github.com/php-enqueue/enqueue-dev/issues/435) +- Use the auto-tagging feature for e.g. processors [\#405](https://github.com/php-enqueue/enqueue-dev/issues/405) + +- \[simple-client\] `sqs:` DSN not working [\#483](https://github.com/php-enqueue/enqueue-dev/issues/483) + +- Adding a signal handler to the consumer [\#485](https://github.com/php-enqueue/enqueue-dev/issues/485) +- Problem with SQS DSN string with + in secret [\#481](https://github.com/php-enqueue/enqueue-dev/issues/481) +- Monitoring interface [\#476](https://github.com/php-enqueue/enqueue-dev/issues/476) + +- simple client dsn issue [\#486](https://github.com/php-enqueue/enqueue-dev/pull/486) ([makasim](https://github.com/makasim)) +- Update SQS DSN doc sample with mention urlencode [\#484](https://github.com/php-enqueue/enqueue-dev/pull/484) ([dgoujard](https://github.com/dgoujard)) +- Prevent SqsProducer from sending messages with empty bodies [\#478](https://github.com/php-enqueue/enqueue-dev/pull/478) ([elazar](https://github.com/elazar)) + ## [0.8.33](https://github.com/php-enqueue/enqueue-dev/tree/0.8.33) (2018-07-26) [Full Changelog](https://github.com/php-enqueue/enqueue-dev/compare/0.8.32...0.8.33) @@ -13,6 +29,7 @@ - \[sqs\] Support using a pre-configured SqsClient [\#443](https://github.com/php-enqueue/enqueue-dev/issues/443) - IronMQ \(iron.io\) provider ? [\#415](https://github.com/php-enqueue/enqueue-dev/issues/415) +- Fix AMQPContext::unsubscribe [\#479](https://github.com/php-enqueue/enqueue-dev/pull/479) ([adrienbrault](https://github.com/adrienbrault)) - Add Localstack Docker container for SQS functional tests [\#473](https://github.com/php-enqueue/enqueue-dev/pull/473) ([elazar](https://github.com/elazar)) - \[consumption\] add process niceness extension [\#467](https://github.com/php-enqueue/enqueue-dev/pull/467) ([ramunasd](https://github.com/ramunasd)) @@ -20,6 +37,7 @@ [Full Changelog](https://github.com/php-enqueue/enqueue-dev/compare/0.8.31...0.8.32) - \[Bundle\] auto-tag services [\#409](https://github.com/php-enqueue/enqueue-dev/issues/409) +- Add autoconfigure for services extending PsrProcess interface [\#452](https://github.com/php-enqueue/enqueue-dev/pull/452) ([mnavarrocarter](https://github.com/mnavarrocarter)) - Add documentation the processor services need to be public [\#406](https://github.com/php-enqueue/enqueue-dev/issues/406) diff --git a/docs/transport/sqs.md b/docs/transport/sqs.md index 9c688f8ec..8f447d40d 100644 --- a/docs/transport/sqs.md +++ b/docs/transport/sqs.md @@ -29,7 +29,7 @@ $factory = new SqsConnectionFactory([ 'region' => 'aRegion', ]); -// same as above but given as DSN string +// same as above but given as DSN string. You may need to url encode secret if it contains special char (like +) $factory = new SqsConnectionFactory('sqs:?key=aKey&secret=aSecret®ion=aRegion'); $psrContext = $factory->createContext(); diff --git a/pkg/amqp-ext/AmqpContext.php b/pkg/amqp-ext/AmqpContext.php index 140bda466..c07ef4485 100644 --- a/pkg/amqp-ext/AmqpContext.php +++ b/pkg/amqp-ext/AmqpContext.php @@ -315,7 +315,7 @@ public function subscribe(InteropAmqpConsumer $consumer, callable $callback) $consumerTag = $extQueue->getConsumerTag(); $consumer->setConsumerTag($consumerTag); - $this->subscribers[$consumerTag] = [$consumer, $callback]; + $this->subscribers[$consumerTag] = [$consumer, $callback, $extQueue]; } /** @@ -327,15 +327,13 @@ public function unsubscribe(InteropAmqpConsumer $consumer) return; } - // seg fault -// $consumerTag = $consumer->getConsumerTag(); -// $consumer->setConsumerTag(null); -// -// $extQueue = new \AMQPQueue($this->getExtChannel()); -// $extQueue->setName($consumer->getQueue()->getQueueName()); -// -// $extQueue->cancel($consumerTag); -// unset($this->subscribers[$consumerTag]); + $consumerTag = $consumer->getConsumerTag(); + $consumer->setConsumerTag(null); + + list($consumer, $callback, $extQueue) = $this->subscribers[$consumerTag]; + + $extQueue->cancel($consumerTag); + unset($this->subscribers[$consumerTag]); } /** diff --git a/pkg/enqueue-bundle/DependencyInjection/EnqueueExtension.php b/pkg/enqueue-bundle/DependencyInjection/EnqueueExtension.php index 7197d8522..15cd06dd0 100644 --- a/pkg/enqueue-bundle/DependencyInjection/EnqueueExtension.php +++ b/pkg/enqueue-bundle/DependencyInjection/EnqueueExtension.php @@ -3,7 +3,9 @@ namespace Enqueue\Bundle\DependencyInjection; use Enqueue\AsyncEventDispatcher\DependencyInjection\AsyncEventDispatcherExtension; +use Enqueue\Client\CommandSubscriberInterface; use Enqueue\Client\Producer; +use Enqueue\Client\TopicSubscriberInterface; use Enqueue\Client\TraceableProducer; use Enqueue\Consumption\QueueConsumer; use Enqueue\JobQueue\Job; @@ -72,6 +74,8 @@ public function load(array $configs, ContainerBuilder $container) $loader = new YamlFileLoader($container, new FileLocator(__DIR__.'/../Resources/config')); $loader->load('services.yml'); + $this->setupAutowiringForProcessors($container); + foreach ($config['transport'] as $name => $transportConfig) { $this->factories[$name]->createConnectionFactory($container, $transportConfig); $this->factories[$name]->createContext($container, $transportConfig); @@ -220,4 +224,19 @@ private function registerJobQueueDoctrineEntityMapping(ContainerBuilder $contain } } } + + private function setupAutowiringForProcessors(ContainerBuilder $container) + { + if (!method_exists($container, 'registerForAutoconfiguration')) { + return; + } + + $container->registerForAutoconfiguration(TopicSubscriberInterface::class) + ->setPublic(true) + ->addTag('enqueue.client.processor'); + + $container->registerForAutoconfiguration(CommandSubscriberInterface::class) + ->setPublic(true) + ->addTag('enqueue.client.processor'); + } } diff --git a/pkg/enqueue-bundle/Tests/Unit/DependencyInjection/EnqueueExtensionTest.php b/pkg/enqueue-bundle/Tests/Unit/DependencyInjection/EnqueueExtensionTest.php index 9f53930ee..4a59dc35b 100644 --- a/pkg/enqueue-bundle/Tests/Unit/DependencyInjection/EnqueueExtensionTest.php +++ b/pkg/enqueue-bundle/Tests/Unit/DependencyInjection/EnqueueExtensionTest.php @@ -6,8 +6,10 @@ use Enqueue\Bundle\DependencyInjection\EnqueueExtension; use Enqueue\Bundle\Tests\Unit\Mocks\FooTransportFactory; use Enqueue\Bundle\Tests\Unit\Mocks\TransportFactoryWithoutDriverFactory; +use Enqueue\Client\CommandSubscriberInterface; use Enqueue\Client\Producer; use Enqueue\Client\ProducerInterface; +use Enqueue\Client\TopicSubscriberInterface; use Enqueue\Client\TraceableProducer; use Enqueue\Consumption\QueueConsumer; use Enqueue\JobQueue\JobRunner; @@ -633,6 +635,31 @@ public function testShouldThrowIfPackageShouldBeInstalledToUseTransport() ]], $container); } + public function testShouldLoadProcessAutoconfigureChildDefinition() + { + if (30300 >= Kernel::VERSION_ID) { + $this->markTestSkipped('The autoconfigure feature is available since Symfony 3.3 version'); + } + + $container = $this->getContainerBuilder(true); + $extension = new EnqueueExtension(); + + $extension->load([[ + 'client' => [], + 'transport' => [], + ]], $container); + + $autoconfigured = $container->getAutoconfiguredInstanceof(); + + self::assertArrayHasKey(CommandSubscriberInterface::class, $autoconfigured); + self::assertTrue($autoconfigured[CommandSubscriberInterface::class]->hasTag('enqueue.client.processor')); + self::assertTrue($autoconfigured[CommandSubscriberInterface::class]->isPublic()); + + self::assertArrayHasKey(TopicSubscriberInterface::class, $autoconfigured); + self::assertTrue($autoconfigured[TopicSubscriberInterface::class]->hasTag('enqueue.client.processor')); + self::assertTrue($autoconfigured[TopicSubscriberInterface::class]->isPublic()); + } + /** * @param bool $debug * diff --git a/pkg/simple-client/SimpleClient.php b/pkg/simple-client/SimpleClient.php index 000743259..8114de2e2 100644 --- a/pkg/simple-client/SimpleClient.php +++ b/pkg/simple-client/SimpleClient.php @@ -377,7 +377,7 @@ class_exists(AmqpLibConnectionFactory::class) */ private function buildConfig($config) { - if (is_string($config) && false !== strpos($config, '://')) { + if (is_string($config) && false !== strpos($config, ':')) { $extConfig = [ 'client' => [], 'transport' => [ diff --git a/pkg/simple-client/Tests/SimpleClientTest.php b/pkg/simple-client/Tests/SimpleClientTest.php new file mode 100644 index 000000000..3ff8badf9 --- /dev/null +++ b/pkg/simple-client/Tests/SimpleClientTest.php @@ -0,0 +1,117 @@ +removeQueue('enqueue.app.default'); + } + + public function transportConfigDataProvider() + { + yield 'amqp' => [[ + 'transport' => [ + 'default' => 'amqp', + 'amqp' => [ + 'driver' => 'ext', + 'host' => getenv('RABBITMQ_HOST'), + 'port' => getenv('RABBITMQ_AMQP__PORT'), + 'user' => getenv('RABBITMQ_USER'), + 'pass' => getenv('RABBITMQ_PASSWORD'), + 'vhost' => getenv('RABBITMQ_VHOST'), + ], + ], + ]]; + + yield 'config_as_dsn_string' => [getenv('AMQP_DSN')]; + + yield 'config_as_dsn_without_host' => ['amqp:?lazy=1']; + + yield 'amqp_dsn' => [[ + 'transport' => [ + 'default' => 'amqp', + 'amqp' => getenv('AMQP_DSN'), + ], + ]]; + + yield 'default_amqp_as_dsn' => [[ + 'transport' => [ + 'default' => getenv('AMQP_DSN'), + ], + ]]; + + yield [[ + 'transport' => [ + 'default' => 'rabbitmq_amqp', + 'rabbitmq_amqp' => [ + 'driver' => 'ext', + 'host' => getenv('RABBITMQ_HOST'), + 'port' => getenv('RABBITMQ_AMQP__PORT'), + 'user' => getenv('RABBITMQ_USER'), + 'pass' => getenv('RABBITMQ_PASSWORD'), + 'vhost' => getenv('RABBITMQ_VHOST'), + ], + ], + ]]; + + yield [[ + 'transport' => [ + 'default' => 'rabbitmq_amqp', + 'rabbitmq_amqp' => [ + 'driver' => 'ext', + 'host' => getenv('RABBITMQ_HOST'), + 'port' => getenv('RABBITMQ_AMQP__PORT'), + 'user' => getenv('RABBITMQ_USER'), + 'pass' => getenv('RABBITMQ_PASSWORD'), + 'vhost' => getenv('RABBITMQ_VHOST'), + ], + ], + ]]; + + yield 'mongodb_dsn' => [[ + 'transport' => [ + 'default' => 'mongodb', + 'mongodb' => getenv('MONGO_DSN'), + ], + ]]; + } + + /** + * @dataProvider transportConfigDataProvider + * + * @param mixed $config + */ + public function testProduceAndConsumeOneMessage($config) + { + $actualMessage = null; + + $client = new SimpleClient($config); + $client->bind('foo_topic', 'foo_processor', function (PsrMessage $message) use (&$actualMessage) { + $actualMessage = $message; + + return Result::ACK; + }); + + $this->assertInstanceOf(PsrContext::class, $client->getContext()); + } +} diff --git a/pkg/sqs/SqsProducer.php b/pkg/sqs/SqsProducer.php index 3acdce608..5be9e48d3 100644 --- a/pkg/sqs/SqsProducer.php +++ b/pkg/sqs/SqsProducer.php @@ -42,11 +42,11 @@ public function send(PsrDestination $destination, PsrMessage $message) InvalidMessageException::assertMessageInstanceOf($message, SqsMessage::class); $body = $message->getBody(); - if (is_scalar($body) || null === $body) { + if (is_scalar($body) && strlen($body) > 0) { $body = (string) $body; } else { throw new InvalidMessageException(sprintf( - 'The message body must be a scalar or null. Got: %s', + 'The message body must be a non-empty string. Got: %s', is_object($body) ? get_class($body) : gettype($body) )); } diff --git a/pkg/sqs/Tests/SqsProducerTest.php b/pkg/sqs/Tests/SqsProducerTest.php index 445b9cef0..2e37c31c6 100644 --- a/pkg/sqs/Tests/SqsProducerTest.php +++ b/pkg/sqs/Tests/SqsProducerTest.php @@ -31,7 +31,7 @@ public function testCouldBeConstructedWithRequiredArguments() public function testShouldThrowIfBodyOfInvalidType() { $this->expectException(InvalidMessageException::class); - $this->expectExceptionMessage('The message body must be a scalar or null. Got: stdClass'); + $this->expectExceptionMessage('The message body must be a non-empty string. Got: stdClass'); $producer = new SqsProducer($this->createSqsContextMock()); @@ -72,7 +72,7 @@ public function testShouldThrowIfSendMessageFailed() ; $destination = new SqsDestination('queue-name'); - $message = new SqsMessage(); + $message = new SqsMessage('foo'); $this->expectException(\RuntimeException::class); $this->expectExceptionMessage('Message was not sent');