Skip to content

[bundle] Extend EventDispatcher instead of container aware one. #129

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 15 commits into from
Jul 6, 2017
3 changes: 1 addition & 2 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,7 @@ install:

script:
# misssing pkg/amqp-ext pkg/job-queue pkg/redis
- if [ "$PHPSTAN" = true ]; then ./bin/phpstan analyse -l 1 -c phpstan.neon pkg/enqueue pkg/psr-queue pkg/fs pkg/simple-client; fi
- if [ "$PHPSTAN" = true ]; then ./bin/phpstan analyse -l 1 -c phpstan.neon pkg/stomp pkg/dbal pkg/enqueue-bundle pkg/null pkg/sqs pkg/test; fi
- if [ "$PHPSTAN" = true ]; then php -d memory_limit=512M bin/phpstan analyse -l 1 -c phpstan.neon pkg/amqp-ext pkg/async-event-dispatcher pkg/dbal pkg/enqueue pkg/enqueue-bundle pkg/fs pkg/gearman pkg/job-queue pkg/null pkg/pheanstalk pkg/psr-queue pkg/redis pkg/simple-client pkg/sqs pkg/stomp pkg/test; fi
- if [ "$PHP_CS_FIXER" = true ]; then IFS=$'\n'; COMMIT_SCA_FILES=($(git diff --name-only --diff-filter=ACMRTUXB "${TRAVIS_COMMIT_RANGE}")); unset IFS; fi
- if [ "$PHP_CS_FIXER" = true ]; then ./bin/php-cs-fixer fix --config=.php_cs.dist -v --dry-run --stop-on-violation --using-cache=no --path-mode=intersection -- "${COMMIT_SCA_FILES[@]}"; fi
- if [ "$UNIT_TESTS" = true ]; then bin/phpunit --exclude-group=functional; fi
Expand Down
5 changes: 5 additions & 0 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
"enqueue/job-queue": "*@dev",
"enqueue/simple-client": "*@dev",
"enqueue/test": "*@dev",
"enqueue/async-event-dispatcher": "*@dev",

"phpunit/phpunit": "^5",
"doctrine/doctrine-bundle": "~1.2",
Expand Down Expand Up @@ -98,6 +99,10 @@
{
"type": "path",
"url": "pkg/simple-client"
},
{
"type": "path",
"url": "pkg/async-event-dispatcher"
}
]
}
112 changes: 112 additions & 0 deletions docs/async_event_dispatcher/quick_tour.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
# Async event dispatcher (Symfony)

The doc shows how you can setup async event dispatching in plain PHP.
If you are looking for the ways to use it in Symfony application [read this post instead](../bundle/async_events.md)

* [Installation](#installation)
* [Configuration](#configuration)
* [Dispatch event](#dispatch-event)
* [Process async events](#process-async-events)

## Installation

You need the async dispatcher library and a one of [the supported transports](../transport)

```bash
$ composer require enqueue/async-event-dispatcher enqueue/fs
```

## Configuration

```php
<?php

// config.php

use Enqueue\AsyncEventDispatcher\AsyncListener;
use Enqueue\AsyncEventDispatcher\AsyncProcessor;
use Enqueue\AsyncEventDispatcher\PhpSerializerEventTransformer;
use Enqueue\AsyncEventDispatcher\AsyncEventDispatcher;
use Enqueue\AsyncEventDispatcher\SimpleRegistry;
use Enqueue\Fs\FsConnectionFactory;
use Symfony\Component\EventDispatcher\EventDispatcher;

require_once __DIR__.'/vendor/autoload.php';

// it could be any other enqueue/psr-queue compatible context.
$context = (new FsConnectionFactory('file://'.__DIR__.'/queues'))->createContext();
$eventQueue = $context->createQueue('symfony_events');

$registry = new SimpleRegistry(
['the_event' => 'default'],
['default' => new PhpSerializerEventTransformer($context, true)]
);

$asyncListener = new AsyncListener($context, $registry, $eventQueue);

$dispatcher = new EventDispatcher();

// the listener sends even as a message through MQ
$dispatcher->addListener('the_event', $asyncListener);

$asyncDispatcher = new AsyncEventDispatcher($dispatcher, $asyncListener);

// the listener is executed on consumer side.
$asyncDispatcher->addListener('the_event', function() {
});

$asyncProcessor = new AsyncProcessor($registry, $asyncDispatcher);
```

## Dispatch event

```php
<?php

// send.php

use Symfony\Component\EventDispatcher\GenericEvent;

require_once __DIR__.'/vendor/autoload.php';

include __DIR__.'/config.php';

$dispatcher->dispatch('the_event', new GenericEvent('theSubject'));
```

## Process async events

```php
<?php

// consume.php

use Enqueue\Psr\PsrProcessor;

require_once __DIR__.'/vendor/autoload.php';
include __DIR__.'/config.php';

$consumer = $context->createConsumer($eventQueue);

while (true) {
if ($message = $consumer->receive(5000)) {
$result = $asyncProcessor->process($message, $context);

switch ((string) $result) {
case PsrProcessor::ACK:
$consumer->acknowledge($message);
break;
case PsrProcessor::REJECT:
$consumer->reject($message);
break;
case PsrProcessor::REQUEUE:
$consumer->reject($message, true);
break;
default:
throw new \LogicException('Result is not supported');
}
}
}
```

[back to index](../index.md)
57 changes: 4 additions & 53 deletions docs/bundle/async_events.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,67 +66,18 @@ You can also add an async listener directly and register a custom message proces

services:
acme.async_foo_listener:
class: 'Enqueue\Bundle\Events\AsyncListener'
class: 'Enqueue\AsyncEventDispatcher\AsyncListener'
public: false
arguments: ['@enqueue.client.producer', '@enqueue.events.registry']
arguments: ['@enqueue.transport.default.context', '@enqueue.events.registry', 'a_queue_name']
tags:
- { name: 'kernel.event_listener', event: 'foo', method: 'onEvent' }
```

The message processor must subscribe to `event.foo` topic. The message queue topics names for event follow this patter `event.{eventName}`.

```php
<?php

use Enqueue\Bundle\Events\Registry;
use Enqueue\Client\TopicSubscriberInterface;
use Enqueue\Psr\PsrContext;
use Enqueue\Psr\PsrMessage;
use Enqueue\Psr\PsrProcessor;

class FooEventProcessor implements PsrProcessor, TopicSubscriberInterface
{
/**
* @var Registry
*/
private $registry;

/**
* @param Registry $registry
*/
public function __construct(Registry $registry)
{
$this->registry = $registry;
}

public function process(PsrMessage $message, PsrContext $context)
{
if (false == $eventName = $message->getProperty('event_name')) {
return self::REJECT;
}
if (false == $transformerName = $message->getProperty('transformer_name')) {
return self::REJECT;
}

// do what you want with the event.
$event = $this->registry->getTransformer($transformerName)->toEvent($eventName, $message);


return self::ACK;
}

public static function getSubscribedTopics()
{
return ['event.foo'];
}
}
```


## Event transformer

The bundle uses [php serializer](https://github.com/php-enqueue/enqueue-dev/blob/master/pkg/enqueue-bundle/Events/PhpSerializerEventTransformer.php) transformer by default to pass events through MQ.
You could create a transformer for the given event type. The transformer must implement `Enqueue\Bundle\Events\EventTransformer` interface.
You could create a transformer for the given event type. The transformer must implement `Enqueue\AsyncEventDispatcher\EventTransformer` interface.
Consider the next example. It shows how to send an event that contains Doctrine entity as a subject

```php
Expand All @@ -140,7 +91,7 @@ use Enqueue\Consumption\Result;
use Enqueue\Psr\PsrMessage;
use Enqueue\Util\JSON;
use Symfony\Component\EventDispatcher\Event;
use Enqueue\Bundle\Events\EventTransformer;
use Enqueue\AsyncEventDispatcher\EventTransformer;
use Doctrine\Bundle\DoctrineBundle\Registry;
use Symfony\Component\EventDispatcher\GenericEvent;

Expand Down
2 changes: 2 additions & 0 deletions docs/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@
- [Production settings](bundle/production_settings.md)
- [Debuging](bundle/debuging.md)
- [Functional testing](bundle/functional_testing.md)
* Async event dispatcher (Symfony)
- [Quick tour](async_event_dispatcher/quick_tour.md)
* Magento
- [Quick tour](magento/quick_tour.md)
- [Cli commands](magento/cli_commands.md)
Expand Down
9 changes: 8 additions & 1 deletion phpstan.neon
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
parameters:
excludes_analyse:
- pkg/enqueue/Util/UUID.php
- pkg/enqueue/Util/UUID.php
- pkg/enqueue-bundle/Tests/Functional/App
- pkg/job-queue/Test/JobRunner.php
- pkg/job-queue/Tests/Functional/app/AppKernel.php
- pkg/redis/PhpRedis.php
- pkg/redis/RedisConnectionFactory.php
- pkg/gearman
- pkg/amqp-ext/AmqpConsumer.php
4 changes: 4 additions & 0 deletions phpunit.xml.dist
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,10 @@
<testsuite name="simple-client">
<directory>pkg/simple-client/Tests</directory>
</testsuite>

<testsuite name="async-event-dispatcher">
<directory>pkg/async-event-dispatcher/Tests</directory>
</testsuite>
</testsuites>

<filter>
Expand Down
7 changes: 7 additions & 0 deletions pkg/async-event-dispatcher/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
*~
/composer.lock
/composer.phar
/phpunit.xml
/vendor/
/.idea/
Tests/Functional/queues
21 changes: 21 additions & 0 deletions pkg/async-event-dispatcher/.travis.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
sudo: false

git:
depth: 1

language: php

php:
- '5.6'
- '7.0'

cache:
directories:
- $HOME/.composer/cache

install:
- composer self-update
- composer install --prefer-source --ignore-platform-reqs

script:
- vendor/bin/phpunit --exclude-group=functional
57 changes: 57 additions & 0 deletions pkg/async-event-dispatcher/AsyncEventDispatcher.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
<?php

namespace Enqueue\AsyncEventDispatcher;

use Symfony\Component\EventDispatcher\Event;
use Symfony\Component\EventDispatcher\EventDispatcher;
use Symfony\Component\EventDispatcher\EventDispatcherInterface;

class AsyncEventDispatcher extends EventDispatcher
{
/**
* @var EventDispatcherInterface
*/
private $trueEventDispatcher;

/**
* @var AsyncListener
*/
private $asyncListener;

/**
* @param EventDispatcherInterface $trueEventDispatcher
* @param AsyncListener $asyncListener
*/
public function __construct(EventDispatcherInterface $trueEventDispatcher, AsyncListener $asyncListener)
{
$this->trueEventDispatcher = $trueEventDispatcher;
$this->asyncListener = $asyncListener;
}

/**
* This method dispatches only those listeners that were marked as async.
*
* @param string $eventName
* @param Event|null $event
*/
public function dispatchAsyncListenersOnly($eventName, Event $event = null)
{
try {
$this->asyncListener->syncMode($eventName);

parent::dispatch($eventName, $event);
} finally {
$this->asyncListener->resetSyncMode();
}
}

/**
* {@inheritdoc}
*/
public function dispatch($eventName, Event $event = null)
{
parent::dispatch($eventName, $event);

$this->trueEventDispatcher->dispatch($eventName, $event);
}
}
Loading