From 04cf80b907432ea98c020bf4be5b3adb4b473c01 Mon Sep 17 00:00:00 2001 From: Max Kotliar Date: Thu, 19 Apr 2018 09:19:18 +0300 Subject: [PATCH 01/21] Update .travis.yml --- .travis.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.travis.yml b/.travis.yml index 633ddc8be..328164818 100644 --- a/.travis.yml +++ b/.travis.yml @@ -7,8 +7,8 @@ language: php matrix: include: - - php: 5.6 - env: SYMFONY_VERSION=2.8.* UNIT_TESTS=true +# - php: 5.6 +# env: SYMFONY_VERSION=2.8.* UNIT_TESTS=true - php: 7.1 env: SYMFONY_VERSION=3.0.* PHPSTAN=true - php: 7.1 From 9adbf015d3be1994ff172939de75cec872d4d866 Mon Sep 17 00:00:00 2001 From: Max Kotliar Date: Fri, 20 Apr 2018 08:30:25 +0300 Subject: [PATCH 02/21] Update README.md --- README.md | 1 + 1 file changed, 1 insertion(+) diff --git a/README.md b/README.md index 0a915c762..59b0ae68e 100644 --- a/README.md +++ b/README.md @@ -65,6 +65,7 @@ Features: [![Build Status](https://travis-ci.org/php-enqueue/null.png?branch=master)](https://travis-ci.org/php-enqueue/null) [![Total Downloads](https://poser.pugx.org/enqueue/null/d/total.png)](https://packagist.org/packages/enqueue/null) [![Latest Stable Version](https://poser.pugx.org/enqueue/null/version.png)](https://packagist.org/packages/enqueue/null) + * [the others are comming](https://github.com/php-enqueue/enqueue-dev/issues/284) * [Symfony bundle](docs/bundle/quick_tour.md) * [Magento1 extension](docs/magento/quick_tour.md) * [Magento2 module](docs/magento2/quick_tour.md) From 4912bbe3175cb2d6f4c907fdc8bd6b36a4d4f5be Mon Sep 17 00:00:00 2001 From: Dave Heineman Date: Thu, 26 Apr 2018 15:03:47 +0200 Subject: [PATCH 03/21] Add symfony intergation for kafka transport --- .../Symfony/DefaultTransportFactory.php | 6 + .../Symfony/DefaultTransportFactoryTest.php | 2 + pkg/rdkafka/Client/RdKafkaDriver.php | 166 +++++++++ .../Symfony/RdKafkaTransportFactory.php | 148 ++++++++ .../Tests/Client/RdKafkaDriverTest.php | 340 ++++++++++++++++++ .../RdKafkaConnectionFactoryConfigTest.php | 92 +++++ .../Symfony/RdKafkaTransportFactoryTest.php | 146 ++++++++ pkg/rdkafka/composer.json | 4 +- 8 files changed, 903 insertions(+), 1 deletion(-) create mode 100644 pkg/rdkafka/Client/RdKafkaDriver.php create mode 100644 pkg/rdkafka/Symfony/RdKafkaTransportFactory.php create mode 100644 pkg/rdkafka/Tests/Client/RdKafkaDriverTest.php create mode 100644 pkg/rdkafka/Tests/RdKafkaConnectionFactoryConfigTest.php create mode 100644 pkg/rdkafka/Tests/Symfony/RdKafkaTransportFactoryTest.php diff --git a/pkg/enqueue/Symfony/DefaultTransportFactory.php b/pkg/enqueue/Symfony/DefaultTransportFactory.php index 20fa1c91a..65b877a30 100644 --- a/pkg/enqueue/Symfony/DefaultTransportFactory.php +++ b/pkg/enqueue/Symfony/DefaultTransportFactory.php @@ -10,6 +10,8 @@ use Enqueue\Gps\Symfony\GpsTransportFactory; use Enqueue\Null\NullConnectionFactory; use Enqueue\Null\Symfony\NullTransportFactory; +use Enqueue\RdKafka\RdKafkaConnectionFactory; +use Enqueue\RdKafka\Symfony\RdKafkaTransportFactory; use Enqueue\Redis\RedisConnectionFactory; use Enqueue\Redis\Symfony\RedisTransportFactory; use Enqueue\Sqs\SqsConnectionFactory; @@ -209,6 +211,10 @@ private function findFactory($dsn) return new StompTransportFactory('default_stomp'); } + if ($factory instanceof RdKafkaConnectionFactory) { + return new RdKafkaTransportFactory('default_kafka'); + } + throw new \LogicException(sprintf( 'There is no supported transport factory for the connection factory "%s" created from DSN "%s"', get_class($factory), diff --git a/pkg/enqueue/Tests/Symfony/DefaultTransportFactoryTest.php b/pkg/enqueue/Tests/Symfony/DefaultTransportFactoryTest.php index 9a013fdb9..8f2df545b 100644 --- a/pkg/enqueue/Tests/Symfony/DefaultTransportFactoryTest.php +++ b/pkg/enqueue/Tests/Symfony/DefaultTransportFactoryTest.php @@ -289,5 +289,7 @@ public static function provideDSNs() yield ['redis:', 'default_redis']; yield ['stomp:', 'default_stomp']; + + yield ['kafka:', 'default_kafka']; } } diff --git a/pkg/rdkafka/Client/RdKafkaDriver.php b/pkg/rdkafka/Client/RdKafkaDriver.php new file mode 100644 index 000000000..e3450b368 --- /dev/null +++ b/pkg/rdkafka/Client/RdKafkaDriver.php @@ -0,0 +1,166 @@ +context = $context; + $this->config = $config; + $this->queueMetaRegistry = $queueMetaRegistry; + } + + /** + * {@inheritdoc} + */ + public function createTransportMessage(Message $message) + { + $headers = $message->getHeaders(); + $headers['content_type'] = $message->getContentType(); + + $transportMessage = $this->context->createMessage(); + $transportMessage->setBody($message->getBody()); + $transportMessage->setHeaders($headers); + $transportMessage->setProperties($message->getProperties()); + $transportMessage->setMessageId($message->getMessageId()); + $transportMessage->setTimestamp($message->getTimestamp()); + $transportMessage->setReplyTo($message->getReplyTo()); + $transportMessage->setCorrelationId($message->getCorrelationId()); + + return $transportMessage; + } + + /** + * {@inheritdoc} + */ + public function createClientMessage(PsrMessage $message) + { + $clientMessage = new Message(); + $clientMessage->setBody($message->getBody()); + $clientMessage->setHeaders($message->getHeaders()); + $clientMessage->setProperties($message->getProperties()); + + $clientMessage->setTimestamp($message->getTimestamp()); + $clientMessage->setMessageId($message->getMessageId()); + $clientMessage->setReplyTo($message->getReplyTo()); + $clientMessage->setCorrelationId($message->getCorrelationId()); + + if ($contentType = $message->getHeader('content_type')) { + $clientMessage->setContentType($contentType); + } + + if ($expiration = $message->getHeader('expiration')) { + $clientMessage->setExpire($expiration); + } + + if ($delay = $message->getHeader('delay')) { + $clientMessage->setDelay($delay); + } + + if ($priority = $message->getHeader('priority')) { + $clientMessage->setPriority($priority); + } + + return $clientMessage; + } + + /** + * {@inheritdoc} + */ + public function sendToRouter(Message $message) + { + if (false == $message->getProperty(Config::PARAMETER_TOPIC_NAME)) { + throw new \LogicException('Topic name parameter is required but is not set'); + } + + $topic = $this->createRouterTopic(); + $transportMessage = $this->createTransportMessage($message); + + $this->context->createProducer()->send($topic, $transportMessage); + } + + /** + * {@inheritdoc} + */ + public function sendToProcessor(Message $message) + { + if (false == $message->getProperty(Config::PARAMETER_PROCESSOR_NAME)) { + throw new \LogicException('Processor name parameter is required but is not set'); + } + + if (false == $queueName = $message->getProperty(Config::PARAMETER_PROCESSOR_QUEUE_NAME)) { + throw new \LogicException('Queue name parameter is required but is not set'); + } + + $transportMessage = $this->createTransportMessage($message); + $destination = $this->createQueue($queueName); + + $this->context->createProducer()->send($destination, $transportMessage); + } + + /** + * {@inheritdoc} + */ + public function createQueue($queueName) + { + $transportName = $this->queueMetaRegistry->getQueueMeta($queueName)->getTransportName(); + + return $this->context->createQueue($transportName); + } + + /** + * {@inheritdoc} + */ + public function setupBroker(LoggerInterface $logger = null) + { + $logger = $logger ?: new NullLogger(); + $logger->debug('[RdKasfkaDriver] setup broker'); + } + + /** + * {@inheritdoc} + */ + public function getConfig() + { + return $this->config; + } + + private function createRouterTopic() + { + $topic = $this->context->createTopic( + $this->config->createTransportRouterTopicName($this->config->getRouterTopicName()) + ); + + return $topic; + } +} \ No newline at end of file diff --git a/pkg/rdkafka/Symfony/RdKafkaTransportFactory.php b/pkg/rdkafka/Symfony/RdKafkaTransportFactory.php new file mode 100644 index 000000000..26e904631 --- /dev/null +++ b/pkg/rdkafka/Symfony/RdKafkaTransportFactory.php @@ -0,0 +1,148 @@ +name = $name; + } + + /** + * @param ArrayNodeDefinition $builder + */ + public function addConfiguration(ArrayNodeDefinition $builder) + { + $builder + ->beforeNormalization() + ->ifString() + ->then(function ($v) { + return ['dsn' => $v]; + }) + ->end() + ->fixXmlConfig('topic') + ->children() + ->scalarNode('dsn') + ->info('The kafka DSN. Other parameters are ignored if set') + ->end() + ->arrayNode('global') + ->children() + ->scalarNode('metadata.broker.list')->end() + ->end() + ->end() + ->arrayNode('topics') + ->prototype('scalar')->end() + ->end() + ->scalarNode('dr_msq_cb') + ->info('todo') + ->end() + ->scalarNode('error_cb') + ->info('todo') + ->end() + ->scalarNode('rebalance_cb') + ->info('todo') + ->end() + ->enumNode('partitioner') + ->values(['RD_KAFKA_MSG_PARTITIONER_RANDOM', 'RD_KAFKA_MSG_PARTITIONER_CONSISTENT']) + ->info('todo') + ->end() + ->scalarNode('log_level') + ->info('todo') + ->end() + ->booleanNode('commit_async') + ->defaultFalse() + ->info('todo') + ->end() + ; + } + + /** + * @param ContainerBuilder $container + * @param array $config + * + * @return string The method must return a factory service id + */ + public function createConnectionFactory(ContainerBuilder $container, array $config) + { + if (false == empty($config['rdkafka'])) { + $config['rdkafka'] = new Reference($config['rdkafka']); + } + + $factory = new Definition(RdKafkaConnectionFactory::class); + $factory->setArguments([isset($config['dsn']) ? $config['dsn'] : $config]); + + $factoryId = sprintf('enqueue.transport.%s.connection_factory', $this->getName()); + $container->setDefinition($factoryId, $factory); + + return $factoryId; + } + + /** + * @param ContainerBuilder $container + * @param array $config + * + * @return string The method must return a context service id + */ + public function createContext(ContainerBuilder $container, array $config) + { + $factoryId = sprintf('enqueue.transport.%s.connection_factory', $this->getName()); + + $context = new Definition(RdKafkaContext::class); + $context->setPublic(true); + $context->setFactory([new Reference($factoryId), 'createContext']); + + $contextId = sprintf('enqueue.transport.%s.context', $this->getName()); + $container->setDefinition($contextId, $context); + + return $contextId; + } + + /** + * @param ContainerBuilder $container + * @param array $config + * + * @return string The method must return a driver service id + */ + public function createDriver(ContainerBuilder $container, array $config) + { + $driver = new Definition(RdKafkaDriver::class); + $driver->setPublic(true); + $driver->setArguments([ + new Reference(sprintf('enqueue.transport.%s.context', $this->getName())), + new Reference('enqueue.client.config'), + new Reference('enqueue.client.meta.queue_meta_registry'), + ]); + + $driverId = sprintf('enqueue.client.%s.driver', $this->getName()); + $container->setDefinition($driverId, $driver); + + return $driverId; + } + + /** + * @return string + */ + public function getName() + { + return $this->name; + } +} \ No newline at end of file diff --git a/pkg/rdkafka/Tests/Client/RdKafkaDriverTest.php b/pkg/rdkafka/Tests/Client/RdKafkaDriverTest.php new file mode 100644 index 000000000..9c0b9f1ee --- /dev/null +++ b/pkg/rdkafka/Tests/Client/RdKafkaDriverTest.php @@ -0,0 +1,340 @@ +assertClassImplements(DriverInterface::class, RdKafkaDriver::class); + } + + public function testCouldBeConstructedWithRequiredArguments() + { + new RdKafkaDriver( + $this->createPsrContextMock(), + $this->createDummyConfig(), + $this->createDummyQueueMetaRegistry() + ); + } + + public function testShouldReturnConfigObject() + { + $config = $this->createDummyConfig(); + + $driver = new RdKafkaDriver( + $this->createPsrContextMock(), + $config, + $this->createDummyQueueMetaRegistry() + ); + + $this->assertSame($config, $driver->getConfig()); + } + + public function testShouldCreateAndReturnQueueInstance() + { + $expectedQueue = new RdKafkaTopic('aName'); + + $context = $this->createPsrContextMock(); + $context + ->expects($this->once()) + ->method('createQueue') + ->with('aprefix.afooqueue') + ->willReturn($expectedQueue) + ; + + $driver = new RdKafkaDriver($context, $this->createDummyConfig(), $this->createDummyQueueMetaRegistry()); + + $queue = $driver->createQueue('aFooQueue'); + + $this->assertSame($expectedQueue, $queue); + } + + public function testShouldCreateAndReturnQueueInstanceWithHardcodedTransportName() + { + $expectedQueue = new RdKafkaTopic('aName'); + + $context = $this->createPsrContextMock(); + $context + ->expects($this->once()) + ->method('createQueue') + ->with('aBarQueue') + ->willReturn($expectedQueue) + ; + + $driver = new RdKafkaDriver($context, $this->createDummyConfig(), $this->createDummyQueueMetaRegistry()); + + $queue = $driver->createQueue('aBarQueue'); + + $this->assertSame($expectedQueue, $queue); + } + + public function testShouldConvertTransportMessageToClientMessage() + { + $transportMessage = new RdKafkaMessage(); + $transportMessage->setBody('body'); + $transportMessage->setHeaders(['hkey' => 'hval']); + $transportMessage->setProperties(['key' => 'val']); + $transportMessage->setHeader('content_type', 'ContentType'); + $transportMessage->setMessageId('MessageId'); + $transportMessage->setTimestamp(1000); + + $driver = new RdKafkaDriver( + $this->createPsrContextMock(), + $this->createDummyConfig(), + $this->createDummyQueueMetaRegistry() + ); + + $clientMessage = $driver->createClientMessage($transportMessage); + + $this->assertInstanceOf(Message::class, $clientMessage); + $this->assertSame('body', $clientMessage->getBody()); + $this->assertSame([ + 'hkey' => 'hval', + 'content_type' => 'ContentType', + 'message_id' => 'MessageId', + 'timestamp' => 1000, + ], $clientMessage->getHeaders()); + $this->assertSame([ + 'key' => 'val', + ], $clientMessage->getProperties()); + $this->assertSame('MessageId', $clientMessage->getMessageId()); + $this->assertSame('ContentType', $clientMessage->getContentType()); + $this->assertSame(1000, $clientMessage->getTimestamp()); + + $this->assertNull($clientMessage->getExpire()); + } + + public function testShouldConvertClientMessageToTransportMessage() + { + $clientMessage = new Message(); + $clientMessage->setBody('body'); + $clientMessage->setHeaders(['hkey' => 'hval']); + $clientMessage->setProperties(['key' => 'val']); + $clientMessage->setContentType('ContentType'); + $clientMessage->setExpire(123); + $clientMessage->setMessageId('MessageId'); + $clientMessage->setTimestamp(1000); + + $context = $this->createPsrContextMock(); + $context + ->expects($this->once()) + ->method('createMessage') + ->willReturn(new RdKafkaMessage()) + ; + + $driver = new RdKafkaDriver( + $context, + $this->createDummyConfig(), + $this->createDummyQueueMetaRegistry() + ); + + $transportMessage = $driver->createTransportMessage($clientMessage); + + $this->assertInstanceOf(RdKafkaMessage::class, $transportMessage); + $this->assertSame('body', $transportMessage->getBody()); + $this->assertSame([ + 'hkey' => 'hval', + 'content_type' => 'ContentType', + 'message_id' => 'MessageId', + 'timestamp' => 1000, + 'reply_to' => null, + 'correlation_id' => '', + ], $transportMessage->getHeaders()); + $this->assertSame([ + 'key' => 'val', + ], $transportMessage->getProperties()); + $this->assertSame('MessageId', $transportMessage->getMessageId()); + $this->assertSame(1000, $transportMessage->getTimestamp()); + } + + public function testShouldSendMessageToRouter() + { + $topic = new RdKafkaTopic('queue-name'); + $transportMessage = new RdKafkaMessage(); + + $producer = $this->createPsrProducerMock(); + $producer + ->expects($this->once()) + ->method('send') + ->with($this->identicalTo($topic), $this->identicalTo($transportMessage)) + ; + $context = $this->createPsrContextMock(); + $context + ->expects($this->once()) + ->method('createTopic') + ->with('aprefix.router') + ->willReturn($topic) + ; + $context + ->expects($this->once()) + ->method('createProducer') + ->willReturn($producer) + ; + $context + ->expects($this->once()) + ->method('createMessage') + ->willReturn($transportMessage) + ; + + $driver = new RdKafkaDriver( + $context, + $this->createDummyConfig(), + $this->createDummyQueueMetaRegistry() + ); + + $message = new Message(); + $message->setProperty(Config::PARAMETER_TOPIC_NAME, 'topic'); + + $driver->sendToRouter($message); + } + + public function testShouldThrowExceptionIfTopicParameterIsNotSet() + { + $driver = new RdKafkaDriver( + $this->createPsrContextMock(), + $this->createDummyConfig(), + $this->createDummyQueueMetaRegistry() + ); + + $this->expectException(\LogicException::class); + $this->expectExceptionMessage('Topic name parameter is required but is not set'); + + $driver->sendToRouter(new Message()); + } + + public function testShouldSendMessageToProcessor() + { + $queue = new RdKafkaTopic('queue-name'); + $transportMessage = new RdKafkaMessage(); + + $producer = $this->createPsrProducerMock(); + $producer + ->expects($this->once()) + ->method('send') + ->with($this->identicalTo($queue), $this->identicalTo($transportMessage)) + ; + $context = $this->createPsrContextMock(); + $context + ->expects($this->once()) + ->method('createQueue') + ->willReturn($queue) + ; + $context + ->expects($this->once()) + ->method('createProducer') + ->willReturn($producer) + ; + $context + ->expects($this->once()) + ->method('createMessage') + ->willReturn($transportMessage) + ; + + $driver = new RdKafkaDriver( + $context, + $this->createDummyConfig(), + $this->createDummyQueueMetaRegistry() + ); + + $message = new Message(); + $message->setProperty(Config::PARAMETER_PROCESSOR_NAME, 'processor'); + $message->setProperty(Config::PARAMETER_PROCESSOR_QUEUE_NAME, 'aFooQueue'); + + $driver->sendToProcessor($message); + } + + public function testShouldThrowExceptionIfProcessorNameParameterIsNotSet() + { + $driver = new RdKafkaDriver( + $this->createPsrContextMock(), + $this->createDummyConfig(), + $this->createDummyQueueMetaRegistry() + ); + + $this->expectException(\LogicException::class); + $this->expectExceptionMessage('Processor name parameter is required but is not set'); + + $driver->sendToProcessor(new Message()); + } + + public function testShouldThrowExceptionIfProcessorQueueNameParameterIsNotSet() + { + $driver = new RdKafkaDriver( + $this->createPsrContextMock(), + $this->createDummyConfig(), + $this->createDummyQueueMetaRegistry() + ); + + $this->expectException(\LogicException::class); + $this->expectExceptionMessage('Queue name parameter is required but is not set'); + + $message = new Message(); + $message->setProperty(Config::PARAMETER_PROCESSOR_NAME, 'processor'); + + $driver->sendToProcessor($message); + } + + public function testShouldSetupBroker() + { + $context = $this->createPsrContextMock(); + + $driver = new RdKafkaDriver( + $context, + $this->createDummyConfig(), + $this->createDummyQueueMetaRegistry() + ); + + $driver->setupBroker(); + } + + /** + * @return \PHPUnit_Framework_MockObject_MockObject|RdKafkaContext + */ + private function createPsrContextMock() + { + return $this->createMock(RdKafkaContext::class); + } + + /** + * @return \PHPUnit_Framework_MockObject_MockObject|PsrProducer + */ + private function createPsrProducerMock() + { + return $this->createMock(PsrProducer::class); + } + + /** + * @return QueueMetaRegistry + */ + private function createDummyQueueMetaRegistry() + { + $registry = new QueueMetaRegistry($this->createDummyConfig(), []); + $registry->add('default'); + $registry->add('aFooQueue'); + $registry->add('aBarQueue', 'aBarQueue'); + + return $registry; + } + + /** + * @return Config + */ + private function createDummyConfig() + { + return Config::create('aPrefix'); + } +} diff --git a/pkg/rdkafka/Tests/RdKafkaConnectionFactoryConfigTest.php b/pkg/rdkafka/Tests/RdKafkaConnectionFactoryConfigTest.php new file mode 100644 index 000000000..179599455 --- /dev/null +++ b/pkg/rdkafka/Tests/RdKafkaConnectionFactoryConfigTest.php @@ -0,0 +1,92 @@ +expectException(\LogicException::class); + $this->expectExceptionMessage('The config must be either an array of options, a DSN string or null'); + + new RdKafkaConnectionFactory(new \stdClass()); + } + + public function testThrowIfSchemeIsNotSupported() + { + $this->expectException(\LogicException::class); + $this->expectExceptionMessage('The given DSN scheme "http" is not supported. Could be "kafka" only.'); + + new RdKafkaConnectionFactory('http://example.com/'); + } + + /** + * @dataProvider provideConfigs + * + * @param mixed $config + * @param mixed $expectedConfig + */ + public function testShouldParseConfigurationAsExpected($config, $expectedConfig) + { + $factory = new RdKafkaConnectionFactory($config); + + $config = $this->getObjectAttribute($factory, 'config'); + + $this->assertNotEmpty($config['global']['group.id']); + + $config['global']['group.id'] = 'group-id'; + $this->assertSame($expectedConfig, $config); + } + + public static function provideConfigs() + { + yield [ + null, + [ + 'global' => [ + 'group.id' => 'group-id', + 'metadata.broker.list' => 'localhost:9092' + ] + ], + ]; + + yield [ + 'kafka:', + [ + 'global' => [ + 'group.id' => 'group-id', + 'metadata.broker.list' => 'localhost:9092' + ] + ], + ]; + + yield [ + 'kafka://user:pass@host:10000/db', + [ + 'global' => [ + 'group.id' => 'group-id', + 'metadata.broker.list' => 'host:10000' + ] + ], + ]; + + yield [ + [], + [ + 'global' => [ + 'group.id' => 'group-id', + 'metadata.broker.list' => 'localhost:9092' + ] + ], + ]; + } +} diff --git a/pkg/rdkafka/Tests/Symfony/RdKafkaTransportFactoryTest.php b/pkg/rdkafka/Tests/Symfony/RdKafkaTransportFactoryTest.php new file mode 100644 index 000000000..e81a125e5 --- /dev/null +++ b/pkg/rdkafka/Tests/Symfony/RdKafkaTransportFactoryTest.php @@ -0,0 +1,146 @@ +assertClassImplements(TransportFactoryInterface::class, RdKafkaTransportFactory::class); + } + + public function testCouldBeConstructedWithDefaultName() + { + $transport = new RdKafkaTransportFactory(); + + $this->assertEquals('rdkafka', $transport->getName()); + } + + public function testCouldBeConstructedWithCustomName() + { + $transport = new RdKafkaTransportFactory('theCustomName'); + + $this->assertEquals('theCustomName', $transport->getName()); + } + + public function testShouldAllowAddConfiguration() + { + $transport = new RdKafkaTransportFactory(); + $tb = new TreeBuilder(); + $rootNode = $tb->root('foo'); + + $transport->addConfiguration($rootNode); + $processor = new Processor(); + $config = $processor->process($tb->buildTree(), [[ + ]]); + + $this->assertEquals([ + 'topics' => [], + 'commit_async' => false + ], $config); + } + + public function testShouldAllowAddConfigurationAsString() + { + $transport = new RdKafkaTransportFactory(); + $tb = new TreeBuilder(); + $rootNode = $tb->root('foo'); + + $transport->addConfiguration($rootNode); + $processor = new Processor(); + $config = $processor->process($tb->buildTree(), ['fileDSN']); + + $this->assertEquals([ + 'dsn' => 'fileDSN', + 'topics' => [], + 'commit_async' => false + ], $config); + } + + public function testShouldCreateConnectionFactory() + { + $container = new ContainerBuilder(); + + $transport = new RdKafkaTransportFactory(); + + $serviceId = $transport->createConnectionFactory($container, [ + ]); + + $this->assertTrue($container->hasDefinition($serviceId)); + $factory = $container->getDefinition($serviceId); + $this->assertEquals(RdKafkaConnectionFactory::class, $factory->getClass()); + $this->assertSame([[ + ]], $factory->getArguments()); + } + + public function testShouldCreateConnectionFactoryFromDsnString() + { + $container = new ContainerBuilder(); + + $transport = new RdKafkaTransportFactory(); + + $serviceId = $transport->createConnectionFactory($container, [ + 'dsn' => 'theFileDSN', + ]); + + $this->assertTrue($container->hasDefinition($serviceId)); + $factory = $container->getDefinition($serviceId); + $this->assertEquals(RdKafkaConnectionFactory::class, $factory->getClass()); + $this->assertSame(['theFileDSN'], $factory->getArguments()); + } + + public function testShouldCreateContext() + { + $container = new ContainerBuilder(); + + $transport = new RdKafkaTransportFactory(); + + $serviceId = $transport->createContext($container, [ + ]); + + $this->assertEquals('enqueue.transport.rdkafka.context', $serviceId); + $this->assertTrue($container->hasDefinition($serviceId)); + + $context = $container->getDefinition('enqueue.transport.rdkafka.context'); + $this->assertInstanceOf(Reference::class, $context->getFactory()[0]); + $this->assertEquals('enqueue.transport.rdkafka.connection_factory', (string) $context->getFactory()[0]); + $this->assertEquals('createContext', $context->getFactory()[1]); + } + + public function testShouldCreateDriver() + { + $container = new ContainerBuilder(); + + $transport = new RdKafkaTransportFactory(); + + $serviceId = $transport->createDriver($container, []); + + $this->assertEquals('enqueue.client.rdkafka.driver', $serviceId); + $this->assertTrue($container->hasDefinition($serviceId)); + + $driver = $container->getDefinition($serviceId); + $this->assertSame(RdKafkaDriver::class, $driver->getClass()); + + $this->assertInstanceOf(Reference::class, $driver->getArgument(0)); + $this->assertEquals('enqueue.transport.rdkafka.context', (string) $driver->getArgument(0)); + + $this->assertInstanceOf(Reference::class, $driver->getArgument(1)); + $this->assertEquals('enqueue.client.config', (string) $driver->getArgument(1)); + + $this->assertInstanceOf(Reference::class, $driver->getArgument(2)); + $this->assertEquals('enqueue.client.meta.queue_meta_registry', (string) $driver->getArgument(2)); + } +} diff --git a/pkg/rdkafka/composer.json b/pkg/rdkafka/composer.json index 5339cd980..09909b592 100644 --- a/pkg/rdkafka/composer.json +++ b/pkg/rdkafka/composer.json @@ -16,7 +16,9 @@ "enqueue/enqueue": "^0.8@dev", "enqueue/null": "^0.8@dev", "queue-interop/queue-spec": "^0.5.3@dev", - "kwn/php-rdkafka-stubs": "^1.0.2" + "kwn/php-rdkafka-stubs": "^1.0.2", + "symfony/dependency-injection": "^2.8|^3|^4", + "symfony/config": "^2.8|^3|^4" }, "support": { "email": "opensource@forma-pro.com", From d8d51e6b3d0c47a69788ba2d7aede1f3812468fc Mon Sep 17 00:00:00 2001 From: Dave Heineman Date: Thu, 26 Apr 2018 15:07:35 +0200 Subject: [PATCH 04/21] Remove expiration, delay and priority from the RdKafka driver --- pkg/rdkafka/Client/RdKafkaDriver.php | 12 ------------ 1 file changed, 12 deletions(-) diff --git a/pkg/rdkafka/Client/RdKafkaDriver.php b/pkg/rdkafka/Client/RdKafkaDriver.php index e3450b368..fcb72051a 100644 --- a/pkg/rdkafka/Client/RdKafkaDriver.php +++ b/pkg/rdkafka/Client/RdKafkaDriver.php @@ -79,18 +79,6 @@ public function createClientMessage(PsrMessage $message) $clientMessage->setContentType($contentType); } - if ($expiration = $message->getHeader('expiration')) { - $clientMessage->setExpire($expiration); - } - - if ($delay = $message->getHeader('delay')) { - $clientMessage->setDelay($delay); - } - - if ($priority = $message->getHeader('priority')) { - $clientMessage->setPriority($priority); - } - return $clientMessage; } From 728b3d577d4ce1ca52ea97d229faefc5b6f0b693 Mon Sep 17 00:00:00 2001 From: Dave Heineman Date: Thu, 26 Apr 2018 15:21:15 +0200 Subject: [PATCH 05/21] Fixed a typo --- pkg/rdkafka/Client/RdKafkaDriver.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/rdkafka/Client/RdKafkaDriver.php b/pkg/rdkafka/Client/RdKafkaDriver.php index fcb72051a..ffd19fbfd 100644 --- a/pkg/rdkafka/Client/RdKafkaDriver.php +++ b/pkg/rdkafka/Client/RdKafkaDriver.php @@ -132,7 +132,7 @@ public function createQueue($queueName) public function setupBroker(LoggerInterface $logger = null) { $logger = $logger ?: new NullLogger(); - $logger->debug('[RdKasfkaDriver] setup broker'); + $logger->debug('[RdKafkaDriver] setup broker'); } /** From c0be06668c5c4f9e1ed18099816cf0166c1dbf79 Mon Sep 17 00:00:00 2001 From: Dave Heineman Date: Thu, 26 Apr 2018 15:24:19 +0200 Subject: [PATCH 06/21] Renamed the dsn in the kafka transport factory test --- pkg/rdkafka/Tests/Symfony/RdKafkaTransportFactoryTest.php | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/rdkafka/Tests/Symfony/RdKafkaTransportFactoryTest.php b/pkg/rdkafka/Tests/Symfony/RdKafkaTransportFactoryTest.php index e81a125e5..802a966a3 100644 --- a/pkg/rdkafka/Tests/Symfony/RdKafkaTransportFactoryTest.php +++ b/pkg/rdkafka/Tests/Symfony/RdKafkaTransportFactoryTest.php @@ -61,10 +61,10 @@ public function testShouldAllowAddConfigurationAsString() $transport->addConfiguration($rootNode); $processor = new Processor(); - $config = $processor->process($tb->buildTree(), ['fileDSN']); + $config = $processor->process($tb->buildTree(), ['kafkaDSN']); $this->assertEquals([ - 'dsn' => 'fileDSN', + 'dsn' => 'kafkaDSN', 'topics' => [], 'commit_async' => false ], $config); From 86b56d891fc55111a0979b6305c3bc74826b7852 Mon Sep 17 00:00:00 2001 From: Dave Heineman Date: Thu, 26 Apr 2018 15:24:52 +0200 Subject: [PATCH 07/21] Added a new line to the end of the kafka driver and transport factory --- pkg/rdkafka/Client/RdKafkaDriver.php | 2 +- pkg/rdkafka/Symfony/RdKafkaTransportFactory.php | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/rdkafka/Client/RdKafkaDriver.php b/pkg/rdkafka/Client/RdKafkaDriver.php index ffd19fbfd..4530dc52d 100644 --- a/pkg/rdkafka/Client/RdKafkaDriver.php +++ b/pkg/rdkafka/Client/RdKafkaDriver.php @@ -151,4 +151,4 @@ private function createRouterTopic() return $topic; } -} \ No newline at end of file +} diff --git a/pkg/rdkafka/Symfony/RdKafkaTransportFactory.php b/pkg/rdkafka/Symfony/RdKafkaTransportFactory.php index 26e904631..6a903aedb 100644 --- a/pkg/rdkafka/Symfony/RdKafkaTransportFactory.php +++ b/pkg/rdkafka/Symfony/RdKafkaTransportFactory.php @@ -145,4 +145,4 @@ public function getName() { return $this->name; } -} \ No newline at end of file +} From 161418ab5a4331cebfb41452c01bc01d0da0a82f Mon Sep 17 00:00:00 2001 From: Dave Heineman Date: Thu, 26 Apr 2018 15:26:14 +0200 Subject: [PATCH 08/21] Renamed another dsn in the kafka transport factory test --- pkg/rdkafka/Tests/Symfony/RdKafkaTransportFactoryTest.php | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/rdkafka/Tests/Symfony/RdKafkaTransportFactoryTest.php b/pkg/rdkafka/Tests/Symfony/RdKafkaTransportFactoryTest.php index 802a966a3..a98a514e7 100644 --- a/pkg/rdkafka/Tests/Symfony/RdKafkaTransportFactoryTest.php +++ b/pkg/rdkafka/Tests/Symfony/RdKafkaTransportFactoryTest.php @@ -93,13 +93,13 @@ public function testShouldCreateConnectionFactoryFromDsnString() $transport = new RdKafkaTransportFactory(); $serviceId = $transport->createConnectionFactory($container, [ - 'dsn' => 'theFileDSN', + 'dsn' => 'theKafkaDSN', ]); $this->assertTrue($container->hasDefinition($serviceId)); $factory = $container->getDefinition($serviceId); $this->assertEquals(RdKafkaConnectionFactory::class, $factory->getClass()); - $this->assertSame(['theFileDSN'], $factory->getArguments()); + $this->assertSame(['theKafkaDSN'], $factory->getArguments()); } public function testShouldCreateContext() From 311abf9eb2040fa6b8e76a370b3dc5e45da34467 Mon Sep 17 00:00:00 2001 From: Dave Heineman Date: Thu, 26 Apr 2018 15:52:09 +0200 Subject: [PATCH 09/21] Use inheritdoc in the kafka transport factory --- pkg/rdkafka/Symfony/RdKafkaTransportFactory.php | 17 ++++------------- 1 file changed, 4 insertions(+), 13 deletions(-) diff --git a/pkg/rdkafka/Symfony/RdKafkaTransportFactory.php b/pkg/rdkafka/Symfony/RdKafkaTransportFactory.php index 6a903aedb..88088c2ff 100644 --- a/pkg/rdkafka/Symfony/RdKafkaTransportFactory.php +++ b/pkg/rdkafka/Symfony/RdKafkaTransportFactory.php @@ -28,7 +28,7 @@ public function __construct($name = 'rdkafka') } /** - * @param ArrayNodeDefinition $builder + * {@inheritdoc} */ public function addConfiguration(ArrayNodeDefinition $builder) { @@ -76,10 +76,7 @@ public function addConfiguration(ArrayNodeDefinition $builder) } /** - * @param ContainerBuilder $container - * @param array $config - * - * @return string The method must return a factory service id + * {@inheritdoc} */ public function createConnectionFactory(ContainerBuilder $container, array $config) { @@ -97,10 +94,7 @@ public function createConnectionFactory(ContainerBuilder $container, array $conf } /** - * @param ContainerBuilder $container - * @param array $config - * - * @return string The method must return a context service id + * {@inheritdoc} */ public function createContext(ContainerBuilder $container, array $config) { @@ -117,10 +111,7 @@ public function createContext(ContainerBuilder $container, array $config) } /** - * @param ContainerBuilder $container - * @param array $config - * - * @return string The method must return a driver service id + * {@inheritdoc} */ public function createDriver(ContainerBuilder $container, array $config) { From 81c23e087ebd982b056c77fbf14359cbfbe35293 Mon Sep 17 00:00:00 2001 From: Dave Heineman Date: Thu, 26 Apr 2018 16:01:48 +0200 Subject: [PATCH 10/21] Fixed some code styling --- pkg/rdkafka/Client/RdKafkaDriver.php | 4 ++-- .../Tests/RdKafkaConnectionFactoryConfigTest.php | 16 ++++++++-------- .../Symfony/RdKafkaTransportFactoryTest.php | 4 ++-- 3 files changed, 12 insertions(+), 12 deletions(-) diff --git a/pkg/rdkafka/Client/RdKafkaDriver.php b/pkg/rdkafka/Client/RdKafkaDriver.php index 4530dc52d..7a5fd4a9a 100644 --- a/pkg/rdkafka/Client/RdKafkaDriver.php +++ b/pkg/rdkafka/Client/RdKafkaDriver.php @@ -29,8 +29,8 @@ class RdKafkaDriver implements DriverInterface private $queueMetaRegistry; /** - * @param RdKafkaContext $context - * @param Config $config + * @param RdKafkaContext $context + * @param Config $config * @param QueueMetaRegistry $queueMetaRegistry */ public function __construct(RdKafkaContext $context, Config $config, QueueMetaRegistry $queueMetaRegistry) diff --git a/pkg/rdkafka/Tests/RdKafkaConnectionFactoryConfigTest.php b/pkg/rdkafka/Tests/RdKafkaConnectionFactoryConfigTest.php index 179599455..d3a6a5dab 100644 --- a/pkg/rdkafka/Tests/RdKafkaConnectionFactoryConfigTest.php +++ b/pkg/rdkafka/Tests/RdKafkaConnectionFactoryConfigTest.php @@ -54,8 +54,8 @@ public static function provideConfigs() [ 'global' => [ 'group.id' => 'group-id', - 'metadata.broker.list' => 'localhost:9092' - ] + 'metadata.broker.list' => 'localhost:9092', + ], ], ]; @@ -64,8 +64,8 @@ public static function provideConfigs() [ 'global' => [ 'group.id' => 'group-id', - 'metadata.broker.list' => 'localhost:9092' - ] + 'metadata.broker.list' => 'localhost:9092', + ], ], ]; @@ -74,8 +74,8 @@ public static function provideConfigs() [ 'global' => [ 'group.id' => 'group-id', - 'metadata.broker.list' => 'host:10000' - ] + 'metadata.broker.list' => 'host:10000', + ], ], ]; @@ -84,8 +84,8 @@ public static function provideConfigs() [ 'global' => [ 'group.id' => 'group-id', - 'metadata.broker.list' => 'localhost:9092' - ] + 'metadata.broker.list' => 'localhost:9092', + ], ], ]; } diff --git a/pkg/rdkafka/Tests/Symfony/RdKafkaTransportFactoryTest.php b/pkg/rdkafka/Tests/Symfony/RdKafkaTransportFactoryTest.php index a98a514e7..b49f643e9 100644 --- a/pkg/rdkafka/Tests/Symfony/RdKafkaTransportFactoryTest.php +++ b/pkg/rdkafka/Tests/Symfony/RdKafkaTransportFactoryTest.php @@ -49,7 +49,7 @@ public function testShouldAllowAddConfiguration() $this->assertEquals([ 'topics' => [], - 'commit_async' => false + 'commit_async' => false, ], $config); } @@ -66,7 +66,7 @@ public function testShouldAllowAddConfigurationAsString() $this->assertEquals([ 'dsn' => 'kafkaDSN', 'topics' => [], - 'commit_async' => false + 'commit_async' => false, ], $config); } From 23f203807afbe398ee7728b6b93ccab1a386f1b6 Mon Sep 17 00:00:00 2001 From: Dave Heineman Date: Fri, 27 Apr 2018 16:45:53 +0200 Subject: [PATCH 11/21] Updated the description on some of the kafka symfony configuration nodes --- pkg/rdkafka/Symfony/RdKafkaTransportFactory.php | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/pkg/rdkafka/Symfony/RdKafkaTransportFactory.php b/pkg/rdkafka/Symfony/RdKafkaTransportFactory.php index 88088c2ff..260cac337 100644 --- a/pkg/rdkafka/Symfony/RdKafkaTransportFactory.php +++ b/pkg/rdkafka/Symfony/RdKafkaTransportFactory.php @@ -52,25 +52,26 @@ public function addConfiguration(ArrayNodeDefinition $builder) ->arrayNode('topics') ->prototype('scalar')->end() ->end() - ->scalarNode('dr_msq_cb') - ->info('todo') + ->scalarNode('dr_msg_cb') + ->info('Delivery report callback') ->end() ->scalarNode('error_cb') - ->info('todo') + ->info('Error callback') ->end() ->scalarNode('rebalance_cb') - ->info('todo') + ->info('Called after consumer group has been rebalanced') ->end() ->enumNode('partitioner') ->values(['RD_KAFKA_MSG_PARTITIONER_RANDOM', 'RD_KAFKA_MSG_PARTITIONER_CONSISTENT']) - ->info('todo') + ->info('Which partitioner to use') ->end() - ->scalarNode('log_level') - ->info('todo') + ->integerNode('log_level') + ->info('Logging level (syslog(3) levels)') + ->min(0)->max(7) ->end() ->booleanNode('commit_async') ->defaultFalse() - ->info('todo') + ->info('Commit asynchronous') ->end() ; } From ea2a998cc7e55a464165efe7333e9ffee83e2d85 Mon Sep 17 00:00:00 2001 From: Dave Heineman Date: Fri, 27 Apr 2018 17:21:00 +0200 Subject: [PATCH 12/21] No need to check as the default return value is null --- pkg/rdkafka/Client/RdKafkaDriver.php | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/pkg/rdkafka/Client/RdKafkaDriver.php b/pkg/rdkafka/Client/RdKafkaDriver.php index 7a5fd4a9a..a881d6d70 100644 --- a/pkg/rdkafka/Client/RdKafkaDriver.php +++ b/pkg/rdkafka/Client/RdKafkaDriver.php @@ -70,15 +70,13 @@ public function createClientMessage(PsrMessage $message) $clientMessage->setHeaders($message->getHeaders()); $clientMessage->setProperties($message->getProperties()); + $clientMessage->setContentType($message->getHeader('content_type')); + $clientMessage->setTimestamp($message->getTimestamp()); $clientMessage->setMessageId($message->getMessageId()); $clientMessage->setReplyTo($message->getReplyTo()); $clientMessage->setCorrelationId($message->getCorrelationId()); - if ($contentType = $message->getHeader('content_type')) { - $clientMessage->setContentType($contentType); - } - return $clientMessage; } From 6a7190f5f9ef9c2ef4ae6be2f5a5f8105265768f Mon Sep 17 00:00:00 2001 From: Max Kotliar Date: Fri, 27 Apr 2018 19:02:06 +0300 Subject: [PATCH 13/21] drop 5.6 tests --- .travis.yml | 4 ---- 1 file changed, 4 deletions(-) diff --git a/.travis.yml b/.travis.yml index 328164818..287eab5f0 100644 --- a/.travis.yml +++ b/.travis.yml @@ -7,16 +7,12 @@ language: php matrix: include: -# - php: 5.6 -# env: SYMFONY_VERSION=2.8.* UNIT_TESTS=true - php: 7.1 env: SYMFONY_VERSION=3.0.* PHPSTAN=true - php: 7.1 env: SYMFONY_VERSION=3.0.* PHP_CS_FIXER=true - php: 7.0 env: SYMFONY_VERSION=2.8.* UNIT_TESTS=true - - php: 5.6 - env: SYMFONY_VERSION=3.0.* UNIT_TESTS=true - php: 7.0 env: SYMFONY_VERSION=3.0.* UNIT_TESTS=true - php: 7.1 From a4becbef003a5c37614169c375f1d57084a0dbbd Mon Sep 17 00:00:00 2001 From: Dave Heineman Date: Mon, 30 Apr 2018 09:04:02 +0200 Subject: [PATCH 14/21] Add the rdkafka transport factory to the EnqueueBundle class --- pkg/enqueue-bundle/EnqueueBundle.php | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/pkg/enqueue-bundle/EnqueueBundle.php b/pkg/enqueue-bundle/EnqueueBundle.php index e007b97dc..a62f2db2b 100644 --- a/pkg/enqueue-bundle/EnqueueBundle.php +++ b/pkg/enqueue-bundle/EnqueueBundle.php @@ -21,6 +21,8 @@ use Enqueue\Fs\Symfony\FsTransportFactory; use Enqueue\Gps\GpsConnectionFactory; use Enqueue\Gps\Symfony\GpsTransportFactory; +use Enqueue\RdKafka\RdKafkaConnectionFactory; +use Enqueue\RdKafka\Symfony\RdKafkaTransportFactory; use Enqueue\Redis\RedisConnectionFactory; use Enqueue\Redis\Symfony\RedisTransportFactory; use Enqueue\Sqs\SqsConnectionFactory; @@ -104,6 +106,12 @@ class_exists(AmqpLibConnectionFactory::class) $extension->setTransportFactory(new MissingTransportFactory('gps', ['enqueue/gps'])); } + if (class_exists(RdKafkaConnectionFactory::class)) { + $extension->setTransportFactory(new RdKafkaTransportFactory('rdkafka')); + } else { + $extension->setTransportFactory(new MissingTransportFactory('rdkafka', ['enqueue/rdkafka'])); + } + $container->addCompilerPass(new AsyncEventsPass(), PassConfig::TYPE_BEFORE_OPTIMIZATION, 100); $container->addCompilerPass(new AsyncTransformersPass(), PassConfig::TYPE_BEFORE_OPTIMIZATION, 100); } From d636cab60df254dbcf82f1bf513f60b75dc47426 Mon Sep 17 00:00:00 2001 From: Dave Heineman Date: Mon, 30 Apr 2018 09:08:50 +0200 Subject: [PATCH 15/21] Made the rdkafka global config node a variableNode with an empty array as default --- pkg/rdkafka/Symfony/RdKafkaTransportFactory.php | 6 +----- pkg/rdkafka/Tests/Symfony/RdKafkaTransportFactoryTest.php | 2 ++ 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/pkg/rdkafka/Symfony/RdKafkaTransportFactory.php b/pkg/rdkafka/Symfony/RdKafkaTransportFactory.php index 260cac337..042dfe4a6 100644 --- a/pkg/rdkafka/Symfony/RdKafkaTransportFactory.php +++ b/pkg/rdkafka/Symfony/RdKafkaTransportFactory.php @@ -44,11 +44,7 @@ public function addConfiguration(ArrayNodeDefinition $builder) ->scalarNode('dsn') ->info('The kafka DSN. Other parameters are ignored if set') ->end() - ->arrayNode('global') - ->children() - ->scalarNode('metadata.broker.list')->end() - ->end() - ->end() + ->variableNode('global')->defaultValue([])->end() ->arrayNode('topics') ->prototype('scalar')->end() ->end() diff --git a/pkg/rdkafka/Tests/Symfony/RdKafkaTransportFactoryTest.php b/pkg/rdkafka/Tests/Symfony/RdKafkaTransportFactoryTest.php index b49f643e9..42e697a58 100644 --- a/pkg/rdkafka/Tests/Symfony/RdKafkaTransportFactoryTest.php +++ b/pkg/rdkafka/Tests/Symfony/RdKafkaTransportFactoryTest.php @@ -50,6 +50,7 @@ public function testShouldAllowAddConfiguration() $this->assertEquals([ 'topics' => [], 'commit_async' => false, + 'global' => [], ], $config); } @@ -67,6 +68,7 @@ public function testShouldAllowAddConfigurationAsString() 'dsn' => 'kafkaDSN', 'topics' => [], 'commit_async' => false, + 'global' => [], ], $config); } From 2eaed0f19e1966af590032c262df7a4a8aabd714 Mon Sep 17 00:00:00 2001 From: Dave Heineman Date: Mon, 30 Apr 2018 18:00:15 +0200 Subject: [PATCH 16/21] Updated the kafka symfony configuration --- pkg/rdkafka/Symfony/RdKafkaTransportFactory.php | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/pkg/rdkafka/Symfony/RdKafkaTransportFactory.php b/pkg/rdkafka/Symfony/RdKafkaTransportFactory.php index 042dfe4a6..f03971fba 100644 --- a/pkg/rdkafka/Symfony/RdKafkaTransportFactory.php +++ b/pkg/rdkafka/Symfony/RdKafkaTransportFactory.php @@ -39,14 +39,17 @@ public function addConfiguration(ArrayNodeDefinition $builder) return ['dsn' => $v]; }) ->end() - ->fixXmlConfig('topic') ->children() ->scalarNode('dsn') ->info('The kafka DSN. Other parameters are ignored if set') ->end() - ->variableNode('global')->defaultValue([])->end() - ->arrayNode('topics') - ->prototype('scalar')->end() + ->variableNode('global') + ->defaultValue([]) + ->info('The kafka global configuration properties') + ->end() + ->variableNode('topic') + ->defaultValue([]) + ->info('The kafka topic configuration properties') ->end() ->scalarNode('dr_msg_cb') ->info('Delivery report callback') From 574f96bfffaf78d0b6d3b824e4a61cb9c8dc9979 Mon Sep 17 00:00:00 2001 From: Dave Heineman Date: Tue, 1 May 2018 10:21:20 +0200 Subject: [PATCH 17/21] Expect the correct response in rdkafka tests --- pkg/rdkafka/Tests/Symfony/RdKafkaTransportFactoryTest.php | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/rdkafka/Tests/Symfony/RdKafkaTransportFactoryTest.php b/pkg/rdkafka/Tests/Symfony/RdKafkaTransportFactoryTest.php index 42e697a58..950034205 100644 --- a/pkg/rdkafka/Tests/Symfony/RdKafkaTransportFactoryTest.php +++ b/pkg/rdkafka/Tests/Symfony/RdKafkaTransportFactoryTest.php @@ -48,7 +48,7 @@ public function testShouldAllowAddConfiguration() ]]); $this->assertEquals([ - 'topics' => [], + 'topic' => [], 'commit_async' => false, 'global' => [], ], $config); @@ -66,7 +66,7 @@ public function testShouldAllowAddConfigurationAsString() $this->assertEquals([ 'dsn' => 'kafkaDSN', - 'topics' => [], + 'topic' => [], 'commit_async' => false, 'global' => [], ], $config); From b9464ec3960795955cbe70bc44d771f054feff3b Mon Sep 17 00:00:00 2001 From: Dave Heineman Date: Tue, 1 May 2018 13:36:35 +0200 Subject: [PATCH 18/21] Add the broker setup for the rdkafka driver --- pkg/rdkafka/Client/RdKafkaDriver.php | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/pkg/rdkafka/Client/RdKafkaDriver.php b/pkg/rdkafka/Client/RdKafkaDriver.php index a881d6d70..416725366 100644 --- a/pkg/rdkafka/Client/RdKafkaDriver.php +++ b/pkg/rdkafka/Client/RdKafkaDriver.php @@ -131,6 +131,21 @@ public function setupBroker(LoggerInterface $logger = null) { $logger = $logger ?: new NullLogger(); $logger->debug('[RdKafkaDriver] setup broker'); + $log = function ($text, ...$args) use ($logger) { + $logger->debug(sprintf('[RdKafkaDriver] '.$text, ...$args)); + }; + + // setup router + $routerQueue = $this->createQueue($this->config->getRouterQueueName()); + $log('Create router queue: %s', $routerQueue->getQueueName()); + $this->context->createConsumer($routerQueue); + + // setup queues + foreach ($this->queueMetaRegistry->getQueuesMeta() as $meta) { + $queue = $this->createQueue($meta->getClientName()); + $log('Create processor queue: %s', $queue->getQueueName()); + $this->context->createConsumer($queue); + } } /** From b97bc5343fec39df5c27d98a6932a3e14ade482f Mon Sep 17 00:00:00 2001 From: Dave Heineman Date: Tue, 1 May 2018 13:55:06 +0200 Subject: [PATCH 19/21] Fix the rdkafka should setupd driver test --- .../Tests/Client/RdKafkaDriverTest.php | 27 ++++++++++++++++++- 1 file changed, 26 insertions(+), 1 deletion(-) diff --git a/pkg/rdkafka/Tests/Client/RdKafkaDriverTest.php b/pkg/rdkafka/Tests/Client/RdKafkaDriverTest.php index 9c0b9f1ee..c0cfab616 100644 --- a/pkg/rdkafka/Tests/Client/RdKafkaDriverTest.php +++ b/pkg/rdkafka/Tests/Client/RdKafkaDriverTest.php @@ -290,12 +290,37 @@ public function testShouldThrowExceptionIfProcessorQueueNameParameterIsNotSet() public function testShouldSetupBroker() { + $routerTopic = new RdKafkaTopic(''); + $routerQueue = new RdKafkaTopic(''); + + $processorTopic = new RdKafkaTopic(''); + $context = $this->createPsrContextMock(); + $context + ->expects($this->at(0)) + ->method('createQueue') + ->willReturn($routerTopic) + ; + $context + ->expects($this->at(1)) + ->method('createQueue') + ->willReturn($routerQueue) + ; + $context + ->expects($this->at(2)) + ->method('createQueue') + ->willReturn($processorTopic) + ; + + $meta = new QueueMetaRegistry($this->createDummyConfig(), [ + 'default' => [], + ]); + $driver = new RdKafkaDriver( $context, $this->createDummyConfig(), - $this->createDummyQueueMetaRegistry() + $meta ); $driver->setupBroker(); From 668c9e616eb5c24f1b4fc32b0339baa6bc6dbb3a Mon Sep 17 00:00:00 2001 From: Maksim Kotlyar Date: Tue, 1 May 2018 20:09:19 +0300 Subject: [PATCH 20/21] [doc][skip ci] update doc. --- docs/bundle/config_reference.md | 51 +++++++++++++++++++++++++++----- docs/client/supported_brokers.md | 7 +++-- 2 files changed, 48 insertions(+), 10 deletions(-) diff --git a/docs/bundle/config_reference.md b/docs/bundle/config_reference.md index f9c8d4cc6..7b54dd23f 100644 --- a/docs/bundle/config_reference.md +++ b/docs/bundle/config_reference.md @@ -21,8 +21,6 @@ enqueue: connection_timeout: 1 buffer_size: 1000 lazy: true - - # Should be true if you want to use secure connections. False by default ssl_on: false rabbitmq_stomp: host: localhost @@ -34,6 +32,7 @@ enqueue: connection_timeout: 1 buffer_size: 1000 lazy: true + ssl_on: false # The option tells whether RabbitMQ broker has management plugin installed or not management_plugin_installed: false @@ -42,7 +41,7 @@ enqueue: # The option tells whether RabbitMQ broker has delay plugin installed or not delay_plugin_installed: false amqp: - driver: ~ # One of "ext"; "lib"; "bunny" + driver: ~ # The connection to AMQP broker set as a string. Other parameters could be used as defaults dsn: ~ @@ -106,7 +105,7 @@ enqueue: # Path to local private key file on filesystem in case of separate files for certificate (local_cert) and private key. A string. ssl_key: ~ rabbitmq_amqp: - driver: ~ # One of "ext"; "lib"; "bunny" + driver: ~ # The connection to AMQP broker set as a string. Other parameters could be used as defaults dsn: ~ @@ -190,18 +189,27 @@ enqueue: polling_interval: 100 redis: + # The redis connection given as DSN. For example redis://host:port?vendor=predis + dsn: ~ + # can be a host, or the path to a unix domain socket - host: ~ # Required + host: ~ port: ~ # The library used internally to interact with Redis server - vendor: ~ # One of "phpredis"; "predis", Required + vendor: ~ # One of "phpredis"; "predis"; "custom" + + # A custom redis service id, used with vendor true only + redis: ~ # bool, Whether it use single persisted connection or open a new one for every context persisted: false # the connection will be performed as later as possible, if the option set to true lazy: true + + # Database index to select when connected. + database: 0 dbal: # The Doctrine DBAL DSN. Other parameters are ignored if set @@ -229,6 +237,7 @@ enqueue: # the connection will be performed as later as possible, if the option set to true lazy: true + endpoint: null gps: # The connection to Google Pub/Sub broker set as a string. Other parameters are ignored if set @@ -248,8 +257,36 @@ enqueue: # The connection will be performed as later as possible, if the option set to true lazy: true + rdkafka: + + # The kafka DSN. Other parameters are ignored if set + dsn: ~ + + # The kafka global configuration properties + global: [] + + # The kafka topic configuration properties + topic: [] + + # Delivery report callback + dr_msg_cb: ~ + + # Error callback + error_cb: ~ + + # Called after consumer group has been rebalanced + rebalance_cb: ~ + + # Which partitioner to use + partitioner: ~ # One of "RD_KAFKA_MSG_PARTITIONER_RANDOM"; "RD_KAFKA_MSG_PARTITIONER_CONSISTENT" + + # Logging level (syslog(3) levels) + log_level: ~ + + # Commit asynchronous + commit_async: false client: - traceable_producer: false + traceable_producer: true prefix: enqueue app_name: app router_topic: default diff --git a/docs/client/supported_brokers.md b/docs/client/supported_brokers.md index 3be5dc334..f734b660c 100644 --- a/docs/client/supported_brokers.md +++ b/docs/client/supported_brokers.md @@ -13,6 +13,7 @@ Here's the list of transports supported by Enqueue Client: | Redis | [enqueue/gps](../transport/redis.md) | redis: | | Amazon SQS | [enqueue/sqs](../transport/sqs.md) | sqs: | | STOMP, RabbitMQ | [enqueue/stomp](../transport/stomp.md) | stomp: | +| Kafka | [enqueue/stomp](../transport/kafka.md) | kafka: | | Null | [enqueue/null](../transport/null.md) | null: | Here's the list of protocols and Client features supported by them @@ -20,16 +21,16 @@ Here's the list of protocols and Client features supported by them | Protocol | Priority | Delay | Expiration | Setup broker | Message bus | |:--------------:|:--------:|:--------:|:----------:|:------------:|:-----------:| | AMQP | No | No | Yes | Yes | Yes | -| RabbitMQ AMQP | Yes | Yes* | Yes | Yes | Yes | +| RabbitMQ AMQP | Yes | Yes | Yes | Yes | Yes | | STOMP | No | No | Yes | No | Yes** | -| RabbitMQ STOMP | Yes | Yes* | Yes | Yes*** | Yes** | +| RabbitMQ STOMP | Yes | Yes | Yes | Yes*** | Yes** | | Filesystem | No | No | No | Yes | No | | Redis | No | No | No | Not needed | No | | Doctrine DBAL | Yes | Yes | No | Yes | No | | Amazon SQS | No | Yes | No | Yes | Not impl | +| Kafka | No | No | No | Yes | No | | Google PubSub | Not impl | Not impl | Not impl | Yes | Not impl | -* \* Possible if a RabbitMQ delay plugin is installed. * \*\* Possible if topics (exchanges) are configured on broker side manually. * \*\*\* Possible if RabbitMQ Management Plugin is installed. From cea4d84dc9ed1ae8805d4d4fbfadc54ef6f0e732 Mon Sep 17 00:00:00 2001 From: Maksim Kotlyar Date: Tue, 1 May 2018 20:10:41 +0300 Subject: [PATCH 21/21] Release 0.8.27 --- CHANGELOG.md | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 2913c58aa..ac0c21719 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,13 @@ # Change Log +## [0.8.27](https://github.com/php-enqueue/enqueue-dev/tree/0.8.27) (2018-05-01) +[Full Changelog](https://github.com/php-enqueue/enqueue-dev/compare/0.8.26...0.8.27) + +- Kafka symfony transport [\#432](https://github.com/php-enqueue/enqueue-dev/pull/432) ([dheineman](https://github.com/dheineman)) +- Drop PHP5 support, Drop Symfony 2.X support. [\#419](https://github.com/php-enqueue/enqueue-dev/pull/419) ([makasim](https://github.com/makasim)) + +- How can I use the Symfony Bundle with Kafka? [\#428](https://github.com/php-enqueue/enqueue-dev/issues/428) + ## [0.8.26](https://github.com/php-enqueue/enqueue-dev/tree/0.8.26) (2018-04-19) [Full Changelog](https://github.com/php-enqueue/enqueue-dev/compare/0.8.25...0.8.26)