Skip to content

Commit fbfd147

Browse files
committed
upd
1 parent 09b4afa commit fbfd147

10 files changed

+164
-61
lines changed

DependencyInjection/Compiler/AsyncTransformersPass.php

-30
This file was deleted.

EnqueueBundle.php

+6-4
Original file line numberDiff line numberDiff line change
@@ -5,15 +5,15 @@
55
use Enqueue\AmqpExt\AmqpContext;
66
use Enqueue\AmqpExt\Symfony\AmqpTransportFactory;
77
use Enqueue\AmqpExt\Symfony\RabbitMqAmqpTransportFactory;
8-
use Enqueue\Bundle\DependencyInjection\Compiler\AsyncEventsPass;
9-
use Enqueue\Bundle\DependencyInjection\Compiler\AsyncTransformersPass;
108
use Enqueue\Bundle\DependencyInjection\Compiler\BuildClientExtensionsPass;
119
use Enqueue\Bundle\DependencyInjection\Compiler\BuildClientRoutingPass;
1210
use Enqueue\Bundle\DependencyInjection\Compiler\BuildConsumptionExtensionsPass;
1311
use Enqueue\Bundle\DependencyInjection\Compiler\BuildProcessorRegistryPass;
1412
use Enqueue\Bundle\DependencyInjection\Compiler\BuildQueueMetaRegistryPass;
1513
use Enqueue\Bundle\DependencyInjection\Compiler\BuildTopicMetaSubscribersPass;
1614
use Enqueue\Bundle\DependencyInjection\EnqueueExtension;
15+
use Enqueue\Bundle\Events\DependencyInjection\AsyncEventsPass;
16+
use Enqueue\Bundle\Events\DependencyInjection\AsyncTransformersPass;
1717
use Enqueue\Dbal\DbalContext;
1818
use Enqueue\Dbal\Symfony\DbalTransportFactory;
1919
use Enqueue\Fs\FsContext;
@@ -25,6 +25,7 @@
2525
use Enqueue\Stomp\StompContext;
2626
use Enqueue\Stomp\Symfony\RabbitMqStompTransportFactory;
2727
use Enqueue\Stomp\Symfony\StompTransportFactory;
28+
use Symfony\Component\DependencyInjection\Compiler\PassConfig;
2829
use Symfony\Component\DependencyInjection\ContainerBuilder;
2930
use Symfony\Component\HttpKernel\Bundle\Bundle;
3031

@@ -41,8 +42,6 @@ public function build(ContainerBuilder $container)
4142
$container->addCompilerPass(new BuildTopicMetaSubscribersPass());
4243
$container->addCompilerPass(new BuildQueueMetaRegistryPass());
4344
$container->addCompilerPass(new BuildClientExtensionsPass());
44-
$container->addCompilerPass(new AsyncEventsPass());
45-
$container->addCompilerPass(new AsyncTransformersPass());
4645

4746
/** @var EnqueueExtension $extension */
4847
$extension = $container->getExtension('enqueue');
@@ -72,5 +71,8 @@ public function build(ContainerBuilder $container)
7271
if (class_exists(SqsContext::class)) {
7372
$extension->addTransportFactory(new SqsTransportFactory());
7473
}
74+
75+
$container->addCompilerPass(new AsyncEventsPass(), PassConfig::TYPE_BEFORE_OPTIMIZATION, 100);
76+
$container->addCompilerPass(new AsyncTransformersPass(), PassConfig::TYPE_BEFORE_OPTIMIZATION, 100);
7577
}
7678
}

Events/AsyncListener.php

+7-2
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
namespace Enqueue\Bundle\Events;
44

5+
use Enqueue\Client\Message;
56
use Enqueue\Client\ProducerInterface;
67
use Symfony\Component\EventDispatcher\Event;
78

@@ -40,10 +41,14 @@ public function syncMode($eventName)
4041
public function onEvent(Event $event, $eventName)
4142
{
4243
if (false == isset($this->syncMode[$eventName])) {
43-
$message = $this->registry->getTransformer($eventName)->toMessage($eventName, $event);
44+
$transformerName = $this->registry->getTransformerNameForEvent($eventName);
45+
46+
$message = $this->registry->getTransformer($transformerName)->toMessage($eventName, $event);
47+
$message->setScope(Message::SCOPE_APP);
4448
$message->setProperty('event_name', $eventName);
49+
$message->setProperty('transformer_name', $transformerName);
4550

46-
$this->producer->send('symfony_events', $message);
51+
$this->producer->send('event.'.$eventName, $message);
4752
}
4853
}
4954
}

Events/AsyncProcessor.php

+5-13
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,11 @@
22

33
namespace Enqueue\Bundle\Events;
44

5-
use Enqueue\Client\TopicSubscriberInterface;
65
use Enqueue\Psr\PsrContext;
76
use Enqueue\Psr\PsrMessage;
87
use Enqueue\Psr\PsrProcessor;
98

10-
class AsyncProcessor implements PsrProcessor, TopicSubscriberInterface
9+
class AsyncProcessor implements PsrProcessor
1110
{
1211
/**
1312
* @var Registry
@@ -37,22 +36,15 @@ public function process(PsrMessage $message, PsrContext $context)
3736
if (false == $eventName = $message->getProperty('event_name')) {
3837
return self::REJECT;
3938
}
39+
if (false == $transformerName = $message->getProperty('transformer_name')) {
40+
return self::REJECT;
41+
}
4042

41-
// TODO set transformer's name explicitly when sending a message.
42-
43-
$event = $this->registry->getTransformer($eventName)->toEvent($eventName, $message);
43+
$event = $this->registry->getTransformer($transformerName)->toEvent($eventName, $message);
4444

4545
$this->eventDispatcher->syncMode($eventName);
4646
$this->eventDispatcher->dispatch($eventName, $event);
4747

4848
return self::ACK;
4949
}
50-
51-
/**
52-
* {@inheritdoc}
53-
*/
54-
public static function getSubscribedTopics()
55-
{
56-
return ['symfony_events'];
57-
}
5850
}

Events/ContainerAwareRegistry.php

+52-8
Original file line numberDiff line numberDiff line change
@@ -12,27 +12,71 @@ class ContainerAwareRegistry implements Registry, ContainerAwareInterface
1212
/**
1313
* @var string[]
1414
*/
15-
private $transformersMap;
15+
private $eventNamesMap;
1616

1717
/**
18-
* @param string[] $transformersMap
18+
* @var string[]
1919
*/
20-
public function __construct(array $transformersMap)
20+
private $transformerIdsMap;
21+
22+
/**
23+
* @param string[] $eventNamesMap
24+
* @param string[] $transformerIdsMap
25+
*/
26+
public function __construct(array $eventNamesMap, array $transformerIdsMap)
2127
{
22-
$this->transformersMap = $transformersMap;
28+
$this->eventNamesMap = $eventNamesMap;
29+
$this->transformerIdsMap = $transformerIdsMap;
2330
}
2431

2532
/**
2633
* {@inheritdoc}
2734
*/
28-
public function getTransformer($eventName)
35+
public function getTransformerNameForEvent($eventName)
2936
{
30-
if (false == array_key_exists($eventName, $this->transformersMap)) {
37+
$transformerName = null;
38+
if (array_key_exists($eventName, $this->eventNamesMap)) {
39+
$transformerName = $this->eventNamesMap[$eventName];
40+
} else {
41+
foreach ($this->eventNamesMap as $eventNamePattern => $name) {
42+
if ('/' != $eventNamePattern[0]) {
43+
continue;
44+
}
45+
46+
if (preg_match($eventNamePattern, $eventName)) {
47+
$transformerName = $name;
48+
49+
break;
50+
}
51+
}
52+
}
53+
54+
if (empty($transformerName)) {
3155
throw new \LogicException(sprintf('There is no transformer registered for the given event %s', $eventName));
3256
}
3357

34-
// TODO add check container returns instance of EventTransformer interface.
58+
return $transformerName;
59+
}
60+
61+
/**
62+
* {@inheritdoc}
63+
*/
64+
public function getTransformer($name)
65+
{
66+
if (false == array_key_exists($name, $this->transformerIdsMap)) {
67+
throw new \LogicException(sprintf('There is no transformer named %s', $name));
68+
}
69+
70+
$transformer = $this->container->get($this->transformerIdsMap[$name]);
71+
72+
if (false == $transformer instanceof EventTransformer) {
73+
throw new \LogicException(sprintf(
74+
'The container must return instance of %s but got %s',
75+
EventTransformer::class,
76+
is_object($transformer) ? get_class($transformer) : gettype($transformer)
77+
));
78+
}
3579

36-
return $this->container->get($this->transformersMap[$eventName]);
80+
return $transformer;
3781
}
3882
}

DependencyInjection/Compiler/AsyncEventsPass.php renamed to Events/DependencyInjection/AsyncEventsPass.php

+11-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
<?php
22

3-
namespace Enqueue\Bundle\DependencyInjection\Compiler;
3+
namespace Enqueue\Bundle\Events\DependencyInjection;
44

55
use Symfony\Component\DependencyInjection\Compiler\CompilerPassInterface;
66
use Symfony\Component\DependencyInjection\ContainerBuilder;
@@ -17,6 +17,10 @@ public function process(ContainerBuilder $container)
1717
return;
1818
}
1919

20+
if (false == $container->hasDefinition('enqueue.events.registry')) {
21+
return;
22+
}
23+
2024
$registeredToEvent = [];
2125
foreach ($container->findTaggedServiceIds('kernel.event_listener') as $serviceId => $tagAttributes) {
2226
foreach ($tagAttributes as $tagAttribute) {
@@ -37,6 +41,12 @@ public function process(ContainerBuilder $container)
3741
])
3842
;
3943

44+
$container->getDefinition('enqueue.events.async_processor')
45+
->addTag('enqueue.client.processor', [
46+
'topicName' => 'event.'.$tagAttribute['event'],
47+
])
48+
;
49+
4050
$registeredToEvent[$tagAttribute['event']] = true;
4151
}
4252
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
<?php
2+
3+
namespace Enqueue\Bundle\Events\DependencyInjection;
4+
5+
use Symfony\Component\DependencyInjection\Compiler\CompilerPassInterface;
6+
use Symfony\Component\DependencyInjection\ContainerBuilder;
7+
8+
class AsyncTransformersPass implements CompilerPassInterface
9+
{
10+
/**
11+
* {@inheritdoc}
12+
*/
13+
public function process(ContainerBuilder $container)
14+
{
15+
if (false == $container->hasDefinition('enqueue.events.registry')) {
16+
return;
17+
}
18+
19+
$transformerIdsMap = [];
20+
$eventNamesMap = [];
21+
foreach ($container->findTaggedServiceIds('enqueue.event_transformer') as $serviceId => $tagAttributes) {
22+
foreach ($tagAttributes as $tagAttribute) {
23+
if (false == isset($tagAttribute['eventName'])) {
24+
throw new \LogicException('The eventName attribute must be set');
25+
}
26+
27+
$eventName = $tagAttribute['eventName'];
28+
29+
$transformerName = isset($tagAttribute['transformerName']) ? $tagAttribute['transformerName'] : $serviceId;
30+
31+
$eventNamesMap[$eventName] = $transformerName;
32+
$transformerIdsMap[$transformerName] = $serviceId;
33+
}
34+
}
35+
36+
$container->getDefinition('enqueue.events.registry')
37+
->replaceArgument(0, $eventNamesMap)
38+
->replaceArgument(1, $transformerIdsMap)
39+
;
40+
}
41+
}
+29
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
<?php
2+
3+
namespace Enqueue\Bundle\Events;
4+
5+
use Enqueue\Client\Message;
6+
use Enqueue\Psr\PsrMessage;
7+
use Symfony\Component\EventDispatcher\Event;
8+
9+
class PhpSerializerEventTransformer implements EventTransformer
10+
{
11+
/**
12+
* {@inheritdoc}
13+
*/
14+
public function toMessage($eventName, Event $event = null)
15+
{
16+
$message = new Message();
17+
$message->setBody(serialize($event));
18+
19+
return $message;
20+
}
21+
22+
/**
23+
* {@inheritdoc}
24+
*/
25+
public function toEvent($eventName, PsrMessage $message)
26+
{
27+
return unserialize($message->getBody());
28+
}
29+
}

Events/Registry.php

+8-1
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,14 @@ interface Registry
77
/**
88
* @param string $eventName
99
*
10+
* @return string
11+
*/
12+
public function getTransformerNameForEvent($eventName);
13+
14+
/**
15+
* @param string $name
16+
*
1017
* @return EventTransformer
1118
*/
12-
public function getTransformer($eventName);
19+
public function getTransformer($name);
1320
}

Resources/config/events.yml

+5-2
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ services:
22
enqueue.events.registry:
33
class: 'Enqueue\Bundle\Events\ContainerAwareRegistry'
44
public: false
5-
arguments: [[]]
5+
arguments: [[], []]
66
calls:
77
- ['setContainer', ['@service_container']]
88

@@ -24,5 +24,8 @@ services:
2424
arguments:
2525
- '@enqueue.events.registry'
2626
- '@enqueue.events.event_dispatcher'
27+
28+
enqueue.events.php_serializer_event_transofrmer:
29+
class: 'Enqueue\Bundle\Events\PhpSerializerEventTransformer'
2730
tags:
28-
- {name: 'enqueue.client.processor' }
31+
- {name: 'enqueue.event_transformer', eventName: '/.*/', transformerName: 'php_serializer' }

0 commit comments

Comments
 (0)