-
Notifications
You must be signed in to change notification settings - Fork 439
Kafka symfony transport #432
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
makasim
merged 16 commits into
php-enqueue:master
from
dheineman:kafka-symfony-transport
May 1, 2018
Merged
Changes from all commits
Commits
Show all changes
16 commits
Select commit
Hold shift + click to select a range
4912bbe
Add symfony intergation for kafka transport
dheineman d8d51e6
Remove expiration, delay and priority from the RdKafka driver
dheineman 728b3d5
Fixed a typo
dheineman c0be066
Renamed the dsn in the kafka transport factory test
dheineman 86b56d8
Added a new line to the end of the kafka driver and transport factory
dheineman 161418a
Renamed another dsn in the kafka transport factory test
dheineman 311abf9
Use inheritdoc in the kafka transport factory
dheineman 81c23e0
Fixed some code styling
dheineman 23f2038
Updated the description on some of the kafka symfony configuration nodes
dheineman ea2a998
No need to check as the default return value is null
dheineman a4becbe
Add the rdkafka transport factory to the EnqueueBundle class
dheineman d636cab
Made the rdkafka global config node a variableNode with an empty arra…
dheineman 2eaed0f
Updated the kafka symfony configuration
dheineman 574f96b
Expect the correct response in rdkafka tests
dheineman b9464ec
Add the broker setup for the rdkafka driver
dheineman b97bc53
Fix the rdkafka should setupd driver test
dheineman File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,167 @@ | ||
<?php | ||
|
||
namespace Enqueue\RdKafka\Client; | ||
|
||
use Enqueue\Client\Config; | ||
use Enqueue\Client\DriverInterface; | ||
use Enqueue\Client\Message; | ||
use Enqueue\Client\Meta\QueueMetaRegistry; | ||
use Enqueue\RdKafka\RdKafkaContext; | ||
use Interop\Queue\PsrMessage; | ||
use Psr\Log\LoggerInterface; | ||
use Psr\Log\NullLogger; | ||
|
||
class RdKafkaDriver implements DriverInterface | ||
{ | ||
/** | ||
* @var RdKafkaContext | ||
*/ | ||
private $context; | ||
|
||
/** | ||
* @var Config | ||
*/ | ||
private $config; | ||
|
||
/** | ||
* @var QueueMetaRegistry | ||
*/ | ||
private $queueMetaRegistry; | ||
|
||
/** | ||
* @param RdKafkaContext $context | ||
* @param Config $config | ||
* @param QueueMetaRegistry $queueMetaRegistry | ||
*/ | ||
public function __construct(RdKafkaContext $context, Config $config, QueueMetaRegistry $queueMetaRegistry) | ||
{ | ||
$this->context = $context; | ||
$this->config = $config; | ||
$this->queueMetaRegistry = $queueMetaRegistry; | ||
} | ||
|
||
/** | ||
* {@inheritdoc} | ||
*/ | ||
public function createTransportMessage(Message $message) | ||
{ | ||
$headers = $message->getHeaders(); | ||
$headers['content_type'] = $message->getContentType(); | ||
|
||
$transportMessage = $this->context->createMessage(); | ||
$transportMessage->setBody($message->getBody()); | ||
$transportMessage->setHeaders($headers); | ||
$transportMessage->setProperties($message->getProperties()); | ||
$transportMessage->setMessageId($message->getMessageId()); | ||
$transportMessage->setTimestamp($message->getTimestamp()); | ||
$transportMessage->setReplyTo($message->getReplyTo()); | ||
$transportMessage->setCorrelationId($message->getCorrelationId()); | ||
|
||
return $transportMessage; | ||
} | ||
|
||
/** | ||
* {@inheritdoc} | ||
*/ | ||
public function createClientMessage(PsrMessage $message) | ||
{ | ||
$clientMessage = new Message(); | ||
$clientMessage->setBody($message->getBody()); | ||
$clientMessage->setHeaders($message->getHeaders()); | ||
$clientMessage->setProperties($message->getProperties()); | ||
|
||
$clientMessage->setContentType($message->getHeader('content_type')); | ||
|
||
$clientMessage->setTimestamp($message->getTimestamp()); | ||
$clientMessage->setMessageId($message->getMessageId()); | ||
$clientMessage->setReplyTo($message->getReplyTo()); | ||
$clientMessage->setCorrelationId($message->getCorrelationId()); | ||
|
||
return $clientMessage; | ||
} | ||
|
||
/** | ||
* {@inheritdoc} | ||
*/ | ||
public function sendToRouter(Message $message) | ||
{ | ||
if (false == $message->getProperty(Config::PARAMETER_TOPIC_NAME)) { | ||
throw new \LogicException('Topic name parameter is required but is not set'); | ||
} | ||
|
||
$topic = $this->createRouterTopic(); | ||
$transportMessage = $this->createTransportMessage($message); | ||
|
||
$this->context->createProducer()->send($topic, $transportMessage); | ||
} | ||
|
||
/** | ||
* {@inheritdoc} | ||
*/ | ||
public function sendToProcessor(Message $message) | ||
{ | ||
if (false == $message->getProperty(Config::PARAMETER_PROCESSOR_NAME)) { | ||
throw new \LogicException('Processor name parameter is required but is not set'); | ||
} | ||
|
||
if (false == $queueName = $message->getProperty(Config::PARAMETER_PROCESSOR_QUEUE_NAME)) { | ||
throw new \LogicException('Queue name parameter is required but is not set'); | ||
} | ||
|
||
$transportMessage = $this->createTransportMessage($message); | ||
$destination = $this->createQueue($queueName); | ||
|
||
$this->context->createProducer()->send($destination, $transportMessage); | ||
} | ||
|
||
/** | ||
* {@inheritdoc} | ||
*/ | ||
public function createQueue($queueName) | ||
{ | ||
$transportName = $this->queueMetaRegistry->getQueueMeta($queueName)->getTransportName(); | ||
|
||
return $this->context->createQueue($transportName); | ||
} | ||
|
||
/** | ||
* {@inheritdoc} | ||
*/ | ||
public function setupBroker(LoggerInterface $logger = null) | ||
{ | ||
$logger = $logger ?: new NullLogger(); | ||
$logger->debug('[RdKafkaDriver] setup broker'); | ||
$log = function ($text, ...$args) use ($logger) { | ||
$logger->debug(sprintf('[RdKafkaDriver] '.$text, ...$args)); | ||
}; | ||
|
||
// setup router | ||
$routerQueue = $this->createQueue($this->config->getRouterQueueName()); | ||
$log('Create router queue: %s', $routerQueue->getQueueName()); | ||
$this->context->createConsumer($routerQueue); | ||
|
||
// setup queues | ||
foreach ($this->queueMetaRegistry->getQueuesMeta() as $meta) { | ||
$queue = $this->createQueue($meta->getClientName()); | ||
$log('Create processor queue: %s', $queue->getQueueName()); | ||
$this->context->createConsumer($queue); | ||
} | ||
} | ||
|
||
/** | ||
* {@inheritdoc} | ||
*/ | ||
public function getConfig() | ||
{ | ||
return $this->config; | ||
} | ||
|
||
private function createRouterTopic() | ||
{ | ||
$topic = $this->context->createTopic( | ||
$this->config->createTransportRouterTopicName($this->config->getRouterTopicName()) | ||
); | ||
|
||
return $topic; | ||
} | ||
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,139 @@ | ||
<?php | ||
|
||
namespace Enqueue\RdKafka\Symfony; | ||
|
||
use Enqueue\RdKafka\Client\RdKafkaDriver; | ||
use Enqueue\RdKafka\RdKafkaConnectionFactory; | ||
use Enqueue\RdKafka\RdKafkaContext; | ||
use Enqueue\Symfony\DriverFactoryInterface; | ||
use Enqueue\Symfony\TransportFactoryInterface; | ||
use Symfony\Component\Config\Definition\Builder\ArrayNodeDefinition; | ||
use Symfony\Component\DependencyInjection\ContainerBuilder; | ||
use Symfony\Component\DependencyInjection\Definition; | ||
use Symfony\Component\DependencyInjection\Reference; | ||
|
||
class RdKafkaTransportFactory implements TransportFactoryInterface, DriverFactoryInterface | ||
{ | ||
/** | ||
* @var string | ||
*/ | ||
private $name; | ||
|
||
/** | ||
* @param string $name | ||
*/ | ||
public function __construct($name = 'rdkafka') | ||
{ | ||
$this->name = $name; | ||
} | ||
|
||
/** | ||
* {@inheritdoc} | ||
*/ | ||
public function addConfiguration(ArrayNodeDefinition $builder) | ||
{ | ||
$builder | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The entire configuration needs more work/checking |
||
->beforeNormalization() | ||
->ifString() | ||
->then(function ($v) { | ||
return ['dsn' => $v]; | ||
}) | ||
->end() | ||
->children() | ||
->scalarNode('dsn') | ||
->info('The kafka DSN. Other parameters are ignored if set') | ||
->end() | ||
->variableNode('global') | ||
->defaultValue([]) | ||
->info('The kafka global configuration properties') | ||
->end() | ||
->variableNode('topic') | ||
->defaultValue([]) | ||
->info('The kafka topic configuration properties') | ||
->end() | ||
->scalarNode('dr_msg_cb') | ||
->info('Delivery report callback') | ||
->end() | ||
->scalarNode('error_cb') | ||
->info('Error callback') | ||
->end() | ||
->scalarNode('rebalance_cb') | ||
->info('Called after consumer group has been rebalanced') | ||
->end() | ||
->enumNode('partitioner') | ||
->values(['RD_KAFKA_MSG_PARTITIONER_RANDOM', 'RD_KAFKA_MSG_PARTITIONER_CONSISTENT']) | ||
->info('Which partitioner to use') | ||
->end() | ||
->integerNode('log_level') | ||
->info('Logging level (syslog(3) levels)') | ||
->min(0)->max(7) | ||
->end() | ||
->booleanNode('commit_async') | ||
->defaultFalse() | ||
->info('Commit asynchronous') | ||
->end() | ||
; | ||
} | ||
|
||
/** | ||
* {@inheritdoc} | ||
*/ | ||
public function createConnectionFactory(ContainerBuilder $container, array $config) | ||
{ | ||
if (false == empty($config['rdkafka'])) { | ||
$config['rdkafka'] = new Reference($config['rdkafka']); | ||
} | ||
|
||
$factory = new Definition(RdKafkaConnectionFactory::class); | ||
$factory->setArguments([isset($config['dsn']) ? $config['dsn'] : $config]); | ||
|
||
$factoryId = sprintf('enqueue.transport.%s.connection_factory', $this->getName()); | ||
$container->setDefinition($factoryId, $factory); | ||
|
||
return $factoryId; | ||
} | ||
|
||
/** | ||
* {@inheritdoc} | ||
*/ | ||
public function createContext(ContainerBuilder $container, array $config) | ||
{ | ||
$factoryId = sprintf('enqueue.transport.%s.connection_factory', $this->getName()); | ||
|
||
$context = new Definition(RdKafkaContext::class); | ||
$context->setPublic(true); | ||
$context->setFactory([new Reference($factoryId), 'createContext']); | ||
|
||
$contextId = sprintf('enqueue.transport.%s.context', $this->getName()); | ||
$container->setDefinition($contextId, $context); | ||
|
||
return $contextId; | ||
} | ||
|
||
/** | ||
* {@inheritdoc} | ||
*/ | ||
public function createDriver(ContainerBuilder $container, array $config) | ||
{ | ||
$driver = new Definition(RdKafkaDriver::class); | ||
$driver->setPublic(true); | ||
$driver->setArguments([ | ||
new Reference(sprintf('enqueue.transport.%s.context', $this->getName())), | ||
new Reference('enqueue.client.config'), | ||
new Reference('enqueue.client.meta.queue_meta_registry'), | ||
]); | ||
|
||
$driverId = sprintf('enqueue.client.%s.driver', $this->getName()); | ||
$container->setDefinition($driverId, $driver); | ||
|
||
return $driverId; | ||
} | ||
|
||
/** | ||
* @return string | ||
*/ | ||
public function getName() | ||
{ | ||
return $this->name; | ||
} | ||
} |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
not sure if this is going to be needed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It will, keep it.