Skip to content

Kafka symfony transport #432

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 16 commits into from
May 1, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions pkg/enqueue-bundle/EnqueueBundle.php
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
use Enqueue\Fs\Symfony\FsTransportFactory;
use Enqueue\Gps\GpsConnectionFactory;
use Enqueue\Gps\Symfony\GpsTransportFactory;
use Enqueue\RdKafka\RdKafkaConnectionFactory;
use Enqueue\RdKafka\Symfony\RdKafkaTransportFactory;
use Enqueue\Redis\RedisConnectionFactory;
use Enqueue\Redis\Symfony\RedisTransportFactory;
use Enqueue\Sqs\SqsConnectionFactory;
Expand Down Expand Up @@ -104,6 +106,12 @@ class_exists(AmqpLibConnectionFactory::class)
$extension->setTransportFactory(new MissingTransportFactory('gps', ['enqueue/gps']));
}

if (class_exists(RdKafkaConnectionFactory::class)) {
$extension->setTransportFactory(new RdKafkaTransportFactory('rdkafka'));
} else {
$extension->setTransportFactory(new MissingTransportFactory('rdkafka', ['enqueue/rdkafka']));
}

$container->addCompilerPass(new AsyncEventsPass(), PassConfig::TYPE_BEFORE_OPTIMIZATION, 100);
$container->addCompilerPass(new AsyncTransformersPass(), PassConfig::TYPE_BEFORE_OPTIMIZATION, 100);
}
Expand Down
6 changes: 6 additions & 0 deletions pkg/enqueue/Symfony/DefaultTransportFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
use Enqueue\Gps\Symfony\GpsTransportFactory;
use Enqueue\Null\NullConnectionFactory;
use Enqueue\Null\Symfony\NullTransportFactory;
use Enqueue\RdKafka\RdKafkaConnectionFactory;
use Enqueue\RdKafka\Symfony\RdKafkaTransportFactory;
use Enqueue\Redis\RedisConnectionFactory;
use Enqueue\Redis\Symfony\RedisTransportFactory;
use Enqueue\Sqs\SqsConnectionFactory;
Expand Down Expand Up @@ -209,6 +211,10 @@ private function findFactory($dsn)
return new StompTransportFactory('default_stomp');
}

if ($factory instanceof RdKafkaConnectionFactory) {
return new RdKafkaTransportFactory('default_kafka');
}

throw new \LogicException(sprintf(
'There is no supported transport factory for the connection factory "%s" created from DSN "%s"',
get_class($factory),
Expand Down
2 changes: 2 additions & 0 deletions pkg/enqueue/Tests/Symfony/DefaultTransportFactoryTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -289,5 +289,7 @@ public static function provideDSNs()
yield ['redis:', 'default_redis'];

yield ['stomp:', 'default_stomp'];

yield ['kafka:', 'default_kafka'];
}
}
167 changes: 167 additions & 0 deletions pkg/rdkafka/Client/RdKafkaDriver.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
<?php

namespace Enqueue\RdKafka\Client;

use Enqueue\Client\Config;
use Enqueue\Client\DriverInterface;
use Enqueue\Client\Message;
use Enqueue\Client\Meta\QueueMetaRegistry;
use Enqueue\RdKafka\RdKafkaContext;
use Interop\Queue\PsrMessage;
use Psr\Log\LoggerInterface;
use Psr\Log\NullLogger;

class RdKafkaDriver implements DriverInterface
{
/**
* @var RdKafkaContext
*/
private $context;

/**
* @var Config
*/
private $config;

/**
* @var QueueMetaRegistry
*/
private $queueMetaRegistry;

/**
* @param RdKafkaContext $context
* @param Config $config
* @param QueueMetaRegistry $queueMetaRegistry
*/
public function __construct(RdKafkaContext $context, Config $config, QueueMetaRegistry $queueMetaRegistry)
{
$this->context = $context;
$this->config = $config;
$this->queueMetaRegistry = $queueMetaRegistry;
}

/**
* {@inheritdoc}
*/
public function createTransportMessage(Message $message)
{
$headers = $message->getHeaders();
$headers['content_type'] = $message->getContentType();

$transportMessage = $this->context->createMessage();
$transportMessage->setBody($message->getBody());
$transportMessage->setHeaders($headers);
$transportMessage->setProperties($message->getProperties());
$transportMessage->setMessageId($message->getMessageId());
$transportMessage->setTimestamp($message->getTimestamp());
$transportMessage->setReplyTo($message->getReplyTo());
$transportMessage->setCorrelationId($message->getCorrelationId());

return $transportMessage;
}

/**
* {@inheritdoc}
*/
public function createClientMessage(PsrMessage $message)
{
$clientMessage = new Message();
$clientMessage->setBody($message->getBody());
$clientMessage->setHeaders($message->getHeaders());
$clientMessage->setProperties($message->getProperties());

$clientMessage->setContentType($message->getHeader('content_type'));

$clientMessage->setTimestamp($message->getTimestamp());
$clientMessage->setMessageId($message->getMessageId());
$clientMessage->setReplyTo($message->getReplyTo());
$clientMessage->setCorrelationId($message->getCorrelationId());

return $clientMessage;
}

/**
* {@inheritdoc}
*/
public function sendToRouter(Message $message)
{
if (false == $message->getProperty(Config::PARAMETER_TOPIC_NAME)) {
throw new \LogicException('Topic name parameter is required but is not set');
}

$topic = $this->createRouterTopic();
$transportMessage = $this->createTransportMessage($message);

$this->context->createProducer()->send($topic, $transportMessage);
}

/**
* {@inheritdoc}
*/
public function sendToProcessor(Message $message)
{
if (false == $message->getProperty(Config::PARAMETER_PROCESSOR_NAME)) {
throw new \LogicException('Processor name parameter is required but is not set');
}

if (false == $queueName = $message->getProperty(Config::PARAMETER_PROCESSOR_QUEUE_NAME)) {
throw new \LogicException('Queue name parameter is required but is not set');
}

$transportMessage = $this->createTransportMessage($message);
$destination = $this->createQueue($queueName);

$this->context->createProducer()->send($destination, $transportMessage);
}

/**
* {@inheritdoc}
*/
public function createQueue($queueName)
{
$transportName = $this->queueMetaRegistry->getQueueMeta($queueName)->getTransportName();

return $this->context->createQueue($transportName);
}

/**
* {@inheritdoc}
*/
public function setupBroker(LoggerInterface $logger = null)
{
$logger = $logger ?: new NullLogger();
$logger->debug('[RdKafkaDriver] setup broker');
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not sure if this is going to be needed.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It will, keep it.

$log = function ($text, ...$args) use ($logger) {
$logger->debug(sprintf('[RdKafkaDriver] '.$text, ...$args));
};

// setup router
$routerQueue = $this->createQueue($this->config->getRouterQueueName());
$log('Create router queue: %s', $routerQueue->getQueueName());
$this->context->createConsumer($routerQueue);

// setup queues
foreach ($this->queueMetaRegistry->getQueuesMeta() as $meta) {
$queue = $this->createQueue($meta->getClientName());
$log('Create processor queue: %s', $queue->getQueueName());
$this->context->createConsumer($queue);
}
}

/**
* {@inheritdoc}
*/
public function getConfig()
{
return $this->config;
}

private function createRouterTopic()
{
$topic = $this->context->createTopic(
$this->config->createTransportRouterTopicName($this->config->getRouterTopicName())
);

return $topic;
}
}
139 changes: 139 additions & 0 deletions pkg/rdkafka/Symfony/RdKafkaTransportFactory.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
<?php

namespace Enqueue\RdKafka\Symfony;

use Enqueue\RdKafka\Client\RdKafkaDriver;
use Enqueue\RdKafka\RdKafkaConnectionFactory;
use Enqueue\RdKafka\RdKafkaContext;
use Enqueue\Symfony\DriverFactoryInterface;
use Enqueue\Symfony\TransportFactoryInterface;
use Symfony\Component\Config\Definition\Builder\ArrayNodeDefinition;
use Symfony\Component\DependencyInjection\ContainerBuilder;
use Symfony\Component\DependencyInjection\Definition;
use Symfony\Component\DependencyInjection\Reference;

class RdKafkaTransportFactory implements TransportFactoryInterface, DriverFactoryInterface
{
/**
* @var string
*/
private $name;

/**
* @param string $name
*/
public function __construct($name = 'rdkafka')
{
$this->name = $name;
}

/**
* {@inheritdoc}
*/
public function addConfiguration(ArrayNodeDefinition $builder)
{
$builder
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The entire configuration needs more work/checking

->beforeNormalization()
->ifString()
->then(function ($v) {
return ['dsn' => $v];
})
->end()
->children()
->scalarNode('dsn')
->info('The kafka DSN. Other parameters are ignored if set')
->end()
->variableNode('global')
->defaultValue([])
->info('The kafka global configuration properties')
->end()
->variableNode('topic')
->defaultValue([])
->info('The kafka topic configuration properties')
->end()
->scalarNode('dr_msg_cb')
->info('Delivery report callback')
->end()
->scalarNode('error_cb')
->info('Error callback')
->end()
->scalarNode('rebalance_cb')
->info('Called after consumer group has been rebalanced')
->end()
->enumNode('partitioner')
->values(['RD_KAFKA_MSG_PARTITIONER_RANDOM', 'RD_KAFKA_MSG_PARTITIONER_CONSISTENT'])
->info('Which partitioner to use')
->end()
->integerNode('log_level')
->info('Logging level (syslog(3) levels)')
->min(0)->max(7)
->end()
->booleanNode('commit_async')
->defaultFalse()
->info('Commit asynchronous')
->end()
;
}

/**
* {@inheritdoc}
*/
public function createConnectionFactory(ContainerBuilder $container, array $config)
{
if (false == empty($config['rdkafka'])) {
$config['rdkafka'] = new Reference($config['rdkafka']);
}

$factory = new Definition(RdKafkaConnectionFactory::class);
$factory->setArguments([isset($config['dsn']) ? $config['dsn'] : $config]);

$factoryId = sprintf('enqueue.transport.%s.connection_factory', $this->getName());
$container->setDefinition($factoryId, $factory);

return $factoryId;
}

/**
* {@inheritdoc}
*/
public function createContext(ContainerBuilder $container, array $config)
{
$factoryId = sprintf('enqueue.transport.%s.connection_factory', $this->getName());

$context = new Definition(RdKafkaContext::class);
$context->setPublic(true);
$context->setFactory([new Reference($factoryId), 'createContext']);

$contextId = sprintf('enqueue.transport.%s.context', $this->getName());
$container->setDefinition($contextId, $context);

return $contextId;
}

/**
* {@inheritdoc}
*/
public function createDriver(ContainerBuilder $container, array $config)
{
$driver = new Definition(RdKafkaDriver::class);
$driver->setPublic(true);
$driver->setArguments([
new Reference(sprintf('enqueue.transport.%s.context', $this->getName())),
new Reference('enqueue.client.config'),
new Reference('enqueue.client.meta.queue_meta_registry'),
]);

$driverId = sprintf('enqueue.client.%s.driver', $this->getName());
$container->setDefinition($driverId, $driver);

return $driverId;
}

/**
* @return string
*/
public function getName()
{
return $this->name;
}
}
Loading