Skip to content

Commit 1fa5847

Browse files
committed
Reowrk router processor, remove client routing compiler pass.
1 parent 6a66b9f commit 1fa5847

File tree

3 files changed

+49
-404
lines changed

3 files changed

+49
-404
lines changed

DependencyInjection/Compiler/BuildClientRoutingPass.php

-47
This file was deleted.

DependencyInjection/Compiler/ExtractProcessorTagSubscriptionsTrait.php

+49-39
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
namespace Enqueue\Bundle\DependencyInjection\Compiler;
44

55
use Enqueue\Client\CommandSubscriberInterface;
6-
use Enqueue\Client\Config;
76
use Enqueue\Client\TopicSubscriberInterface;
87
use Symfony\Component\DependencyInjection\ContainerBuilder;
98
use Symfony\Component\DependencyInjection\Exception\ParameterNotFoundException;
@@ -31,47 +30,45 @@ protected function extractSubscriptions(ContainerBuilder $container, $processorS
3130
}
3231
};
3332

34-
$processorClass = $container->getDefinition($processorServiceId)->getClass();
35-
if (false == class_exists($processorClass)) {
36-
throw new \LogicException(sprintf('The class "%s" could not be found.', $processorClass));
33+
$processorDefinition = $container->getDefinition($processorServiceId);
34+
$processorClass = $processorDefinition->getClass();
35+
if (false == $processorDefinition->getFactory() && false == class_exists($processorClass)) {
36+
throw new \LogicException(sprintf('The processor class "%s" could not be found.', $processorClass));
3737
}
3838

3939
$defaultQueueName = $resolve($container->getParameter('enqueue.client.default_queue_name'));
40-
$subscriptionPrototype = [
41-
'topicName' => null,
42-
'queueName' => null,
43-
'queueNameHardcoded' => false,
44-
'processorName' => null,
45-
'exclusive' => false,
46-
];
4740

4841
$data = [];
49-
if (is_subclass_of($processorClass, CommandSubscriberInterface::class)) {
42+
if ($processorClass && is_subclass_of($processorClass, CommandSubscriberInterface::class)) {
5043
/** @var CommandSubscriberInterface $processorClass */
5144
$params = $processorClass::getSubscribedCommand();
5245
if (is_string($params)) {
5346
if (empty($params)) {
54-
throw new \LogicException('The processor name (it is also the command name) must not be empty.');
47+
throw new \LogicException('The command name must not be empty.');
5548
}
5649

5750
$data[] = [
58-
'topicName' => Config::COMMAND_TOPIC,
51+
'commandName' => $params,
5952
'queueName' => $defaultQueueName,
6053
'queueNameHardcoded' => false,
61-
'processorName' => $params,
54+
'processorName' => $processorServiceId,
55+
'exclusive' => false,
6256
];
6357
} elseif (is_array($params)) {
64-
$params = array_replace($subscriptionPrototype, $params);
65-
if (false == $processorName = $resolve($params['processorName'])) {
66-
throw new \LogicException('The processor name (it is also the command name) must not be empty.');
58+
if (empty($params['commandName'])) {
59+
throw new \LogicException('The commandName must be set.');
60+
}
61+
62+
if (false == $commandName = $resolve($params['commandName'])) {
63+
throw new \LogicException('The commandName must not be empty.');
6764
}
6865

6966
$data[] = [
70-
'topicName' => Config::COMMAND_TOPIC,
71-
'queueName' => $resolve($params['queueName']) ?: $defaultQueueName,
72-
'queueNameHardcoded' => $resolve($params['queueNameHardcoded']),
73-
'processorName' => $processorName,
74-
'exclusive' => array_key_exists('exclusive', $params) ? $params['exclusive'] : false,
67+
'commandName' => $commandName,
68+
'queueName' => $resolve($params['queueName'] ?? $defaultQueueName),
69+
'queueNameHardcoded' => $params['queueNameHardcoded'] ?? false,
70+
'processorName' => $params['processorName'] ?? $commandName,
71+
'exclusive' => $params['exclusive'] ?? false,
7572
];
7673
} else {
7774
throw new \LogicException(sprintf(
@@ -81,7 +78,7 @@ protected function extractSubscriptions(ContainerBuilder $container, $processorS
8178
}
8279
}
8380

84-
if (is_subclass_of($processorClass, TopicSubscriberInterface::class)) {
81+
if ($processorClass && is_subclass_of($processorClass, TopicSubscriberInterface::class)) {
8582
/** @var TopicSubscriberInterface $processorClass */
8683
$topics = $processorClass::getSubscribedTopics();
8784
if (!is_array($topics)) {
@@ -99,15 +96,15 @@ protected function extractSubscriptions(ContainerBuilder $container, $processorS
9996
'queueName' => $defaultQueueName,
10097
'queueNameHardcoded' => false,
10198
'processorName' => $processorServiceId,
99+
'exclusive' => false,
102100
];
103101
} elseif (is_array($params)) {
104-
$params = array_replace($subscriptionPrototype, $params);
105-
106102
$data[] = [
107103
'topicName' => $topicName,
108-
'queueName' => $resolve($params['queueName']) ?: $defaultQueueName,
109-
'queueNameHardcoded' => $resolve($params['queueNameHardcoded']),
104+
'queueName' => $resolve($params['queueName'] ?? $defaultQueueName),
105+
'queueNameHardcoded' => $params['queueNameHardcoded'] ?? false,
110106
'processorName' => $resolve($params['processorName']) ?: $processorServiceId,
107+
'exclusive' => false,
111108
];
112109
} else {
113110
throw new \LogicException(sprintf(
@@ -120,24 +117,37 @@ protected function extractSubscriptions(ContainerBuilder $container, $processorS
120117
}
121118

122119
if (false == (
120+
$processorClass ||
123121
is_subclass_of($processorClass, CommandSubscriberInterface::class) ||
124122
is_subclass_of($processorClass, TopicSubscriberInterface::class)
125123
)) {
126124
foreach ($tagAttributes as $tagAttribute) {
127-
$tagAttribute = array_replace($subscriptionPrototype, $tagAttribute);
125+
if (empty($tagAttribute['commandName']) && empty($tagAttribute['topicName'])) {
126+
throw new \LogicException('Either commandName or topicName attribute must be set');
127+
}
128+
129+
$topicName = $tagAttribute['topicName'] ?? null;
130+
$commandName = $tagAttribute['commandName'] ?? null;
128131

129-
if (false == $tagAttribute['topicName']) {
130-
throw new \LogicException(sprintf('Topic name is not set on message processor tag but it is required. Service %s', $processorServiceId));
132+
if ($topicName) {
133+
$data[] = [
134+
'topicName' => $resolve($tagAttribute['topicName']),
135+
'queueName' => $resolve($tagAttribute['queueName'] ?? $defaultQueueName),
136+
'queueNameHardcoded' => $tagAttribute['queueNameHardcoded'],
137+
'processorName' => $resolve($tagAttribute['processorName'] ?? $processorServiceId),
138+
'exclusive' => false,
139+
];
131140
}
132141

133-
$data[] = [
134-
'topicName' => $resolve($tagAttribute['topicName']),
135-
'queueName' => $resolve($tagAttribute['queueName']) ?: $defaultQueueName,
136-
'queueNameHardcoded' => $resolve($tagAttribute['queueNameHardcoded']),
137-
'processorName' => $resolve($tagAttribute['processorName']) ?: $processorServiceId,
138-
'exclusive' => Config::COMMAND_TOPIC == $resolve($tagAttribute['topicName']) &&
139-
array_key_exists('exclusive', $tagAttribute) ? $tagAttribute['exclusive'] : false,
140-
];
142+
if ($commandName) {
143+
$data[] = [
144+
'commandName' => $resolve($tagAttribute['commandName']),
145+
'queueName' => $resolve($tagAttribute['queueName'] ?? $defaultQueueName),
146+
'queueNameHardcoded' => $tagAttribute['queueNameHardcoded'],
147+
'processorName' => $resolve($tagAttribute['processorName'] ?? $processorServiceId),
148+
'exclusive' => $tagAttribute['exclusive'] ?? false,
149+
];
150+
}
141151
}
142152
}
143153

0 commit comments

Comments
 (0)