|
| 1 | +# Async events |
| 2 | + |
| 3 | +The EnqueueBundle allows you to dispatch events asynchronously. |
| 4 | +Behind the scene it replaces your listener with one that sends a message to MQ. |
| 5 | +The message contains the event object. |
| 6 | +The consumer, once it receives the message, restores the event and dispatches it to only async listeners. |
| 7 | + |
| 8 | +Async listeners benefits: |
| 9 | + |
| 10 | +* The response time lesser. It has to do less work. |
| 11 | +* Better fault tolerance. Bugs in async listener does not affect user. Messages will wait till you fix bugs. |
| 12 | +* Better scaling. Add more consumers to meet the load. |
| 13 | + |
| 14 | +## Configuration |
| 15 | + |
| 16 | +I suppose you already [installed the bundle](quick_tour.md#install). |
| 17 | +Now, you have to enable `async_events`. |
| 18 | +If you do not enable it, events will be processed as before: synchronously. |
| 19 | + |
| 20 | +```yaml |
| 21 | +# app/config/config.yml |
| 22 | + |
| 23 | +enqueue: |
| 24 | + async_events: true |
| 25 | + |
| 26 | +# if you'd like to send send messages onTerminate use spool_producer (it makes response time even lesser): |
| 27 | + |
| 28 | +enqueue: |
| 29 | + async_events: |
| 30 | + enabled: true |
| 31 | + spool_producer: true |
| 32 | +``` |
| 33 | +
|
| 34 | +## Usage |
| 35 | +
|
| 36 | +To make your listener async you have add `async: true` attribute to the tag `kernel.event_listener`, like this: |
| 37 | + |
| 38 | +```yaml |
| 39 | +# app/config/config.yml |
| 40 | +
|
| 41 | +service: |
| 42 | + acme.foo_listener: |
| 43 | + class: 'AcmeBundle\Listener\FooListener' |
| 44 | + tags: |
| 45 | + - { name: 'kernel.event_listener', async: true, event: 'foo', method: 'onEvent' } |
| 46 | +``` |
| 47 | + |
| 48 | +You can also add an async listener directly and register a custom message processor for it: |
| 49 | + |
| 50 | +```yaml |
| 51 | +# app/config/config.yml |
| 52 | +
|
| 53 | +service: |
| 54 | + acme.async_foo_listener: |
| 55 | + class: 'Enqueue\Bundle\Events\AsyncListener' |
| 56 | + public: false |
| 57 | + arguments: ['@enqueue.client.producer', '@enqueue.events.registry'] |
| 58 | + tags: |
| 59 | + - { name: 'kernel.event_listener', event: 'foo', method: 'onEvent' } |
| 60 | +``` |
| 61 | + |
| 62 | +The message processor must subscribe to `event.foo` topic. The message queue topics names for event follow this patter `event.{eventName}`. |
| 63 | + |
| 64 | +```php |
| 65 | +<?php |
| 66 | +
|
| 67 | +use Enqueue\Bundle\Events\Registry; |
| 68 | +use Enqueue\Client\TopicSubscriberInterface; |
| 69 | +use Enqueue\Psr\PsrContext; |
| 70 | +use Enqueue\Psr\PsrMessage; |
| 71 | +use Enqueue\Psr\PsrProcessor; |
| 72 | +
|
| 73 | +class FooEventProcessor implements PsrProcessor, TopicSubscriberInterface |
| 74 | +{ |
| 75 | + /** |
| 76 | + * @var Registry |
| 77 | + */ |
| 78 | + private $registry; |
| 79 | +
|
| 80 | + /** |
| 81 | + * @param Registry $registry |
| 82 | + */ |
| 83 | + public function __construct(Registry $registry) |
| 84 | + { |
| 85 | + $this->registry = $registry; |
| 86 | + } |
| 87 | +
|
| 88 | + public function process(PsrMessage $message, PsrContext $context) |
| 89 | + { |
| 90 | + if (false == $eventName = $message->getProperty('event_name')) { |
| 91 | + return self::REJECT; |
| 92 | + } |
| 93 | + if (false == $transformerName = $message->getProperty('transformer_name')) { |
| 94 | + return self::REJECT; |
| 95 | + } |
| 96 | +
|
| 97 | + // do what you want with the event. |
| 98 | + $event = $this->registry->getTransformer($transformerName)->toEvent($eventName, $message); |
| 99 | + |
| 100 | + |
| 101 | + return self::ACK; |
| 102 | + } |
| 103 | +
|
| 104 | + public static function getSubscribedTopics() |
| 105 | + { |
| 106 | + return ['event.foo']; |
| 107 | + } |
| 108 | +} |
| 109 | +``` |
| 110 | + |
| 111 | + |
| 112 | +## Event transformer |
| 113 | + |
| 114 | +The bundle uses [php serializer](https://github.com/php-enqueue/enqueue-dev/blob/master/pkg/enqueue-bundle/Events/PhpSerializerEventTransformer.php) transformer by default to pass events through MQ. |
| 115 | +You could create a transformer for the given event type. The transformer must implement `Enqueue\Bundle\Events\EventTransformer` interface. |
| 116 | +Consider the next example. It shows how to send an event that contains Doctrine entity as subject |
| 117 | + |
| 118 | +```php |
| 119 | +<?php |
| 120 | +namespace AcmeBundle\Listener; |
| 121 | +
|
| 122 | +// src/AcmeBundle/Listener/FooEventTransformer.php |
| 123 | +
|
| 124 | +use Enqueue\Client\Message; |
| 125 | +use Enqueue\Consumption\Result; |
| 126 | +use Enqueue\Psr\PsrMessage; |
| 127 | +use Enqueue\Util\JSON; |
| 128 | +use Symfony\Component\EventDispatcher\Event; |
| 129 | +use Enqueue\Bundle\Events\EventTransformer; |
| 130 | +use Doctrine\Bundle\DoctrineBundle\Registry; |
| 131 | +use Symfony\Component\EventDispatcher\GenericEvent; |
| 132 | +
|
| 133 | +class FooEventTransformer implements EventTransformer |
| 134 | +{ |
| 135 | + /** @var Registry @doctrine */ |
| 136 | + private $doctrine; |
| 137 | +
|
| 138 | + public function __construct(Registry $doctrine) |
| 139 | + { |
| 140 | + $this->doctrine = $doctrine; |
| 141 | + } |
| 142 | +
|
| 143 | + /** |
| 144 | + * {@inheritdoc} |
| 145 | + * |
| 146 | + * @param GenericEvent $event |
| 147 | + */ |
| 148 | + public function toMessage($eventName, Event $event = null) |
| 149 | + { |
| 150 | + $entity = $event->getSubject(); |
| 151 | + $entityClass = get_class($event); |
| 152 | + |
| 153 | + $manager = $this->doctrine->getManagerForClass($entityClass); |
| 154 | + $meta = $manager->getClassMetadata($entityClass); |
| 155 | +
|
| 156 | + $id = $meta->getIdentifierValues($entity); |
| 157 | + |
| 158 | + $message = new Message(); |
| 159 | + $message->setBody([ |
| 160 | + 'entityClass' => $entityClass, |
| 161 | + 'entityId' => $id, |
| 162 | + 'arguments' => $event->getArguments() |
| 163 | + ]); |
| 164 | +
|
| 165 | + return $message; |
| 166 | + } |
| 167 | +
|
| 168 | + /** |
| 169 | + * {@inheritdoc} |
| 170 | + */ |
| 171 | + public function toEvent($eventName, PsrMessage $message) |
| 172 | + { |
| 173 | + $data = JSON::decode($message->getBody()); |
| 174 | + |
| 175 | + $entityClass = $data['entityClass']; |
| 176 | + |
| 177 | + $manager = $this->doctrine->getManagerForClass($entityClass); |
| 178 | + if (false == $entity = $manager->find($entityClass, $data['entityId'])) { |
| 179 | + return Result::reject('The entity could not be found.'); |
| 180 | + } |
| 181 | + |
| 182 | + return new GenericEvent($entity, $data['arguments']); |
| 183 | + } |
| 184 | +} |
| 185 | +``` |
| 186 | + |
| 187 | +and register it: |
| 188 | + |
| 189 | +```yaml |
| 190 | +# app/config/config.yml |
| 191 | +
|
| 192 | +service: |
| 193 | + acme.foo_event_transofrmer: |
| 194 | + class: 'AcmeBundle\Listener\FooEventTransformer' |
| 195 | + arguments: ['@doctrine'] |
| 196 | + tags: |
| 197 | + - {name: 'enqueue.event_transformer', eventName: 'foo' } |
| 198 | +``` |
| 199 | + |
| 200 | +The `eventName` attribute accepts a regexp. You can do next `eventName: '/foo\..*?/'`. |
| 201 | +It uses this transformer for all event with the name beginning with `foo.` |
| 202 | + |
| 203 | +[back to index](../index.md) |
0 commit comments